- Notifications
You must be signed in to change notification settings - Fork 7k
/
Copy pathhsm.py
177 lines (127 loc) · 5.07 KB
/
hsm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""
Implementation of the HSM (hierarchical state machine) or
NFSM (nested finite state machine) C++ example from
http://www.eventhelix.com/RealtimeMantra/HierarchicalStateMachine.htm#.VwqLVEL950w
in Python
- a single 'message type' string is the only trigger for state transitions
- only the message type is considered; the message payload (see the
  "message ignored" comments) is left out to avoid complexity
"""
class UnsupportedMessageType(Exception):
    """Raised when a message type has no handler registered for it.

    Derives from ``Exception`` rather than ``BaseException`` so that ordinary
    ``except Exception`` handlers see it; ``BaseException`` is meant for
    interpreter-exit signals such as ``KeyboardInterrupt``.
    """
class UnsupportedState(Exception):
    """Raised when a transition targets a state name that is not known.

    Derives from ``Exception`` rather than ``BaseException`` so that ordinary
    ``except Exception`` handlers see it; ``BaseException`` is meant for
    interpreter-exit signals such as ``KeyboardInterrupt``.
    """
class UnsupportedTransition(Exception):
    """Raised when the current state does not handle the received message.

    Derives from ``Exception`` rather than ``BaseException`` so that ordinary
    ``except Exception`` handlers see it; ``BaseException`` is meant for
    interpreter-exit signals such as ``KeyboardInterrupt``.
    """
class HierachicalStateMachine:
    """State machine that forwards message types to its current state.

    The machine owns one instance of each leaf state (active, standby,
    suspect, failed), starts in standby, and exposes the actions the states
    invoke while handling a message.
    """

    # Message type -> name of the state method that handles it.
    _HANDLER_NAMES = {
        "fault trigger": "on_fault_trigger",
        "switchover": "on_switchover",
        "diagnostics passed": "on_diagnostics_passed",
        "diagnostics failed": "on_diagnostics_failed",
        "operator inservice": "on_operator_inservice",
    }

    def __init__(self):
        self._active_state = Active(self)  # Unit.Inservice.Active()
        self._standby_state = Standby(self)  # Unit.Inservice.Standby()
        self._suspect_state = Suspect(self)  # Unit.OutOfService.Suspect()
        self._failed_state = Failed(self)  # Unit.OutOfService.Failed()

        self._current_state = self._standby_state

        # State name -> state instance, used by _next_state().
        self.states = {
            "active": self._active_state,
            "standby": self._standby_state,
            "suspect": self._suspect_state,
            "failed": self._failed_state,
        }

        # Message type -> zero-argument callable that runs the handler.
        self.message_types = self._build_message_types()

    def _build_message_types(self):
        """Return the message type -> dispatcher table.

        Each dispatcher looks its handler up on ``self._current_state`` when
        it is *called*. Storing the bound methods directly would pin every
        message to the state that was current when the table was built.
        """
        return {
            message_type: self._make_dispatcher(handler_name)
            for message_type, handler_name in self._HANDLER_NAMES.items()
        }

    def _make_dispatcher(self, handler_name):
        """Return a callable invoking ``handler_name`` on the current state."""

        def dispatch():
            return getattr(self._current_state, handler_name)()

        return dispatch

    def _next_state(self, state):
        """Make the state registered under the name ``state`` current.

        Raises:
            UnsupportedState: if ``state`` is not a key of ``self.states``.
        """
        try:
            self._current_state = self.states[state]
        except KeyError:
            raise UnsupportedState(state) from None

    # The actions below only return a label describing the action.

    def _send_diagnostics_request(self):
        return "send diagnostic request"

    def _raise_alarm(self):
        return "raise alarm"

    def _clear_alarm(self):
        return "clear alarm"

    def _perform_switchover(self):
        return "perform switchover"

    def _send_switchover_response(self):
        return "send switchover response"

    def _send_operator_inservice_response(self):
        return "send operator inservice response"

    def _send_diagnostics_failure_report(self):
        return "send diagnostics failure report"

    def _send_diagnostics_pass_report(self):
        return "send diagnostics pass report"

    def _abort_diagnostics(self):
        return "abort diagnostics"

    def _check_mate_status(self):
        return "check mate status"

    def on_message(self, message_type):  # message ignored
        """Run the current state's handler for ``message_type``.

        Raises:
            UnsupportedMessageType: if ``message_type`` is not a key of
                ``self.message_types``.
        """
        # Look the dispatcher up first and call it outside the try block, so
        # a KeyError raised inside a handler is not mistaken for an unknown
        # message type.
        try:
            dispatch = self.message_types[message_type]
        except KeyError:
            raise UnsupportedMessageType(message_type) from None
        dispatch()
class Unit:
    """Root of the state hierarchy; rejects every message by default.

    Subclasses override the ``on_*`` handlers for the messages they accept.
    Any handler left untouched raises ``UnsupportedTransition``.
    """

    def __init__(self, HierachicalStateMachine):
        # The subclasses in this module reach the machine through ``_hsm``;
        # ``hsm`` is kept so existing users of that attribute still work.
        self._hsm = HierachicalStateMachine
        self.hsm = HierachicalStateMachine

    def on_switchover(self):
        raise UnsupportedTransition

    def on_fault_trigger(self):
        raise UnsupportedTransition

    def on_diagnostics_failed(self):
        raise UnsupportedTransition

    def on_diagnostics_passed(self):
        raise UnsupportedTransition

    def on_operator_inservice(self):
        raise UnsupportedTransition
class Inservice(Unit):
    """Parent state holding the handlers shared by its sub-states."""

    def __init__(self, HierachicalStateMachine):
        self._hsm = HierachicalStateMachine

    def on_fault_trigger(self):
        """Move the machine to "suspect", request diagnostics, raise alarm."""
        machine = self._hsm
        machine._next_state("suspect")
        machine._send_diagnostics_request()
        machine._raise_alarm()

    def on_switchover(self):
        """Run the switchover actions; sub-states pick the next state."""
        machine = self._hsm
        machine._perform_switchover()
        machine._check_mate_status()
        machine._send_switchover_response()
class Active(Inservice):
    """In-service state of the unit registered under the name "active"."""

    def __init__(self, HierachicalStateMachine):
        self._hsm = HierachicalStateMachine

    def on_fault_trigger(self):
        """Perform a switchover, then apply the in-service fault handling."""
        # The switchover action lives on the state machine, not on the
        # parent state, so it has to be reached through ``self._hsm``.
        self._hsm._perform_switchover()
        super().on_fault_trigger()

    def on_switchover(self):
        """Run the shared switchover actions, then move to "standby"."""
        # The shared handler is inherited from the parent state; the state
        # machine itself has no ``on_switchover``.
        super().on_switchover()  # message ignored
        self._hsm._next_state("standby")
class Standby(Inservice):
    """In-service state of the unit registered under the name "standby"."""

    def __init__(self, HierachicalStateMachine):
        self._hsm = HierachicalStateMachine

    def on_switchover(self):
        """Run the inherited switchover actions, then move to "active"."""
        # No message payload is forwarded to the parent handler.
        super().on_switchover()  # message ignored
        machine = self._hsm
        machine._next_state("active")
class OutOfService(Unit):
    """Parent state holding the handler shared by its sub-states."""

    def __init__(self, HierachicalStateMachine):
        self._hsm = HierachicalStateMachine

    def on_operator_inservice(self):
        """Acknowledge the operator, request diagnostics, go to "suspect"."""
        self._hsm._send_operator_inservice_response()
        # NOTE(review): the state machine has no ``on_switchover`` method, so
        # the call that used to sit here could only fail. A diagnostics
        # request is sent instead, matching what the fault-trigger path does
        # when it enters "suspect" - confirm against the referenced
        # EventHelix example.
        self._hsm._send_diagnostics_request()  # message ignored
        self._hsm._next_state("suspect")
class Suspect(OutOfService):
    """Out-of-service state registered under the name "suspect"."""

    def __init__(self, HierachicalStateMachine):
        self._hsm = HierachicalStateMachine

    # The report, alarm and transition actions are methods of the state
    # machine, not of the parent states, so they are reached via ``_hsm``.

    def on_diagnostics_failed(self):
        """Report the diagnostics failure and move to "failed"."""
        self._hsm._send_diagnostics_failure_report()
        self._hsm._next_state("failed")

    def on_diagnostics_passed(self):
        """Report the pass, clear the alarm and move to "standby"."""
        self._hsm._send_diagnostics_pass_report()
        self._hsm._clear_alarm()  # loss of redundancy alarm
        self._hsm._next_state("standby")

    def on_operator_inservice(self):
        """Abort diagnostics, then defer to the out-of-service handler."""
        self._hsm._abort_diagnostics()
        super().on_operator_inservice()  # message ignored
class Failed(OutOfService):
    """Out-of-service state registered under the name "failed".

    Every handler is inherited unchanged; only the constructor is defined.
    """

    def __init__(self, HierachicalStateMachine):
        # Keep the owning state machine for the inherited handlers.
        self._hsm = HierachicalStateMachine