Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

# 

#    Licensed under the Apache License, Version 2.0 (the "License"); you may 

#    not use this file except in compliance with the License. You may obtain 

#    a copy of the License at 

# 

#         http://www.apache.org/licenses/LICENSE-2.0 

# 

#    Unless required by applicable law or agreed to in writing, software 

#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

#    License for the specific language governing permissions and limitations 

#    under the License. 

 

import contextlib 

 

from oslo_config import cfg 

import oslo_messaging as messaging 

from oslo_utils import excutils 

 

from magnum.common import exception 

from magnum.conductor.api import ListenerAPI 

from magnum import objects 

from magnum.openstack.common._i18n import _LI 

from magnum.openstack.common._i18n import _LW 

from magnum.openstack.common import log as logging 

 

 

# Pull in the [conductor] options that BayLock.conductor_alive reads from
# cfg.CONF (topic and conductor_life_check_timeout); both are declared in
# magnum.conductor.config.
cfg.CONF.import_opt('topic', 'magnum.conductor.config',

                    group='conductor')

cfg.CONF.import_opt('conductor_life_check_timeout', 'magnum.conductor.config',

                    group='conductor')




# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

 

 

class BayLock(object):
    """Lock on a bay, held on behalf of one conductor.

    The lock itself is stored through ``objects.BayLock``; this class adds
    the acquire/steal/release policy on top of it.
    """

    def __init__(self, context, bay, conductor_id):
        """Remember who is locking what.

        :param context: request context, passed on to the conductor
                        liveness check.
        :param bay: the bay to lock; its ``uuid`` and ``name`` attributes
                    are read by :meth:`acquire`.
        :param conductor_id: id of the conductor that will own the lock.
        """
        self.context = context
        self.bay = bay
        self.conductor_id = conductor_id

    @staticmethod
    def conductor_alive(context, conductor_id):
        """Ping a conductor to find out whether it is still running.

        :param context: request context used for the ping.
        :param conductor_id: id of the conductor to ping; used as the
                             ``server`` the listener API talks to.
        :returns: whatever ``ListenerAPI.ping_conductor`` returns, or
                  ``False`` if the ping times out.
        """
        topic = cfg.CONF.conductor.topic
        timeout = cfg.CONF.conductor.conductor_life_check_timeout
        listener_api = ListenerAPI(context=context, topic=topic,
                                   server=conductor_id, timeout=timeout)
        try:
            return listener_api.ping_conductor()
        except messaging.MessagingTimeout:
            # No answer within the configured timeout: treat as dead.
            return False

    def acquire(self, retry=True):
        """Acquire a lock on the bay.

        If the lock is currently held by a conductor that does not answer
        a ping, the lock is considered stale and this conductor tries to
        steal it.

        :param retry: When True, retry if lock was released while stealing.
        :returns: None once the lock is held by this conductor.
        :raises: exception.OperationInProgress if the lock is held by this
                 conductor already, by another live conductor, or could
                 not be stolen.
        """
        # create() yields None when the lock was taken, otherwise the id
        # of the conductor currently holding it.
        lock_conductor_id = objects.BayLock.create(self.bay.uuid,
                                                   self.conductor_id)
        if lock_conductor_id is None:
            LOG.debug("Conductor %(conductor)s acquired lock on bay "
                      "%(bay)s", {'conductor': self.conductor_id,
                                  'bay': self.bay.uuid})
            return

        # Only ping the holder when it is some other conductor.
        if (lock_conductor_id == self.conductor_id or
                self.conductor_alive(self.context, lock_conductor_id)):
            LOG.debug("Lock on bay %(bay)s is owned by conductor "
                      "%(conductor)s", {'bay': self.bay.uuid,
                                        'conductor': lock_conductor_id})
            raise exception.OperationInProgress(bay_name=self.bay.name)

        LOG.info(_LI("Stale lock detected on bay %(bay)s.  Conductor "
                     "%(conductor)s will attempt to steal the lock"),
                 {'bay': self.bay.uuid, 'conductor': self.conductor_id})

        # steal() yields None on success, True when the lock was released
        # in the meantime, otherwise the id of the conductor that got
        # there first.
        result = objects.BayLock.steal(self.bay.uuid,
                                       lock_conductor_id,
                                       self.conductor_id)

        if result is None:
            LOG.info(_LI("Conductor %(conductor)s successfully stole the "
                         "lock on bay %(bay)s"),
                     {'conductor': self.conductor_id,
                      'bay': self.bay.uuid})
            return

        if result is True:
            if retry:
                LOG.info(_LI("The lock on bay %(bay)s was released while "
                             "conductor %(conductor)s was stealing it. "
                             "Trying again"),
                         {'bay': self.bay.uuid,
                          'conductor': self.conductor_id})
                # A single retry only; the nested call will not retry.
                return self.acquire(retry=False)
        else:
            new_lock_conductor_id = result
            LOG.info(_LI("Failed to steal lock on bay %(bay)s. "
                         "Conductor %(conductor)s stole the lock first"),
                     {'bay': self.bay.uuid,
                      'conductor': new_lock_conductor_id})

        # Reached when the steal lost a race or the retry budget is spent.
        raise exception.OperationInProgress(bay_name=self.bay.name)

    def release(self, bay_uuid):
        """Release a bay lock.

        :param bay_uuid: uuid of the bay whose lock is released.
        """
        # Only the conductor that owns the lock will be releasing it.
        result = objects.BayLock.release(bay_uuid, self.conductor_id)
        # release() yields True when there was no lock left to release.
        if result is True:
            LOG.warn(_LW("Lock was already released on bay %s!"), bay_uuid)
        else:
            LOG.debug("Conductor %(conductor)s released lock on bay "
                      "%(bay)s", {'conductor': self.conductor_id,
                                  'bay': bay_uuid})

    @contextlib.contextmanager
    def thread_lock(self, bay_uuid):
        """Acquire a lock and release it only if there is an exception.

        The release method still needs to be scheduled to be run at the
        end of the thread using the Thread.link method.

        :param bay_uuid: uuid passed to :meth:`release` on failure. Note
                         that :meth:`acquire` locks ``self.bay.uuid``, so
                         callers are presumably expected to pass that same
                         uuid here.
        :raises: exception.OperationInProgress, re-raised without
                 releasing anything; any other exception is re-raised
                 after the lock has been released.
        """
        try:
            self.acquire()
            yield
        except exception.OperationInProgress:
            raise
        except:  # noqa
            # Deliberately bare so the lock is released whatever was
            # raised; the original exception is re-raised afterwards.
            with excutils.save_and_reraise_exception():
                self.release(bay_uuid)