forked from TheAlgorithms/Python
- Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbankers_algorithm.py
228 lines (210 loc) · 8.36 KB
/
bankers_algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# A Python implementation of the Banker's Algorithm in Operating Systems using
# Processes and Resources
# {
# "Author": "Biney Kingsley (bluedistro@github.io), bineykingsley36@gmail.com",
# "Date": 28-10-2018
# }
"""
The Banker's algorithm is a resource allocation and deadlock avoidance algorithm
developed by Edsger Dijkstra that tests for safety by simulating the allocation of
predetermined maximum possible amounts of all resources, and then makes a "s-state"
check to test for possible deadlock conditions for all other pending activities,
before deciding whether allocation should be allowed to continue.
| [Source] Wikipedia
| [Credit] Rosetta Code C implementation helped very much.
| (https://rosettacode.org/wiki/Banker%27s_algorithm)
"""
from __future__ importannotations
importnumpyasnp
# Example data used by the doctests in this module.
# It is passed positionally to BankersAlgorithm as
# (claim_vector, allocated_resources_table, maximum_claim_table).

# One entry per resource type.
test_claim_vector = [8, 5, 9, 7]

# One row per process (P1..P5), one column per resource type:
# what each process currently holds.
test_allocated_res_table = [
    [2, 0, 1, 1],  # P1
    [0, 1, 2, 1],  # P2
    [4, 0, 0, 3],  # P3
    [0, 2, 1, 0],  # P4
    [1, 0, 3, 0],  # P5
]

# Same layout as the allocation table: the most each process may ever claim.
test_maximum_claim_table = [
    [3, 2, 1, 4],  # P1
    [0, 2, 5, 2],  # P2
    [5, 1, 0, 5],  # P3
    [1, 5, 3, 0],  # P4
    [3, 0, 3, 3],  # P5
]
class BankersAlgorithm:
    """
    Simulate the Banker's algorithm on a fixed set of processes and resources.

    The simulation repeatedly looks for a pending process whose remaining need
    fits in the currently available resources, "runs" it, and returns the
    resources it was holding to the available pool. Progress is reported on
    standard output by :meth:`main`.
    """

    def __init__(
        self,
        claim_vector: list[int],
        allocated_resources_table: list[list[int]],
        maximum_claim_table: list[list[int]],
    ) -> None:
        """
        :param claim_vector: A list with one entry per resource type giving the
            total amount of that resource (eg. memory, interface, semaphores,
            etc.) in the system. Available resources are computed as this
            vector minus everything currently allocated.
        :param allocated_resources_table: A list with one row per process and
            one column per resource type depicting the amount of each resource
            each process is currently holding
        :param maximum_claim_table: A list with the same layout as
            ``allocated_resources_table`` depicting the maximum amount of each
            resource each process may claim
        """
        self.__claim_vector = claim_vector
        self.__allocated_resources_table = allocated_resources_table
        self.__maximum_claim_table = maximum_claim_table

    def __processes_resource_summation(self) -> list[int]:
        """
        Sum the allocated resources column by column, i.e. per resource type.

        :Return: one total per resource type; an empty list when there are no
            processes at all.
        """
        # zip(*table) walks the table column-wise and yields nothing for an
        # empty table, so no special-casing of table[0] is needed.
        return [sum(column) for column in zip(*self.__allocated_resources_table)]

    def __available_resources(self) -> np.ndarray:
        """
        Compute the resources that are currently free: the claim vector minus
        the per-resource totals held by all processes.

        :Return: a numpy array with one entry per resource type.
        """
        claim = np.array(self.__claim_vector)
        allocated = self.__processes_resource_summation()
        if not allocated:
            # No process holds anything, so everything is still available.
            return claim
        return claim - np.array(allocated)

    def __need(self) -> list[list[int]]:
        """
        Compute what each process may still request:
        ``need[i][j] = max_claim[i][j] - alloc_table[i][j]``

        :Return: one list per process, in the order of the allocation table.
        """
        return [
            list(np.array(self.__maximum_claim_table[i]) - np.array(allocated_resource))
            for i, allocated_resource in enumerate(self.__allocated_resources_table)
        ]

    def __need_index_manager(self) -> dict[int, list[int]]:
        """
        This function builds an index control dictionary to track original ids/indices
        of processes when altered during execution of method "main"

        :Return: {0: [a: int, b: int], 1: [c: int, d: int]}

        >>> index_control = BankersAlgorithm(
        ...     test_claim_vector, test_allocated_res_table, test_maximum_claim_table
        ... )._BankersAlgorithm__need_index_manager()
        >>> {key: [int(x) for x in value] for key, value
        ...     in index_control.items()}  # doctest: +NORMALIZE_WHITESPACE
        {0: [1, 2, 0, 3], 1: [0, 1, 3, 1], 2: [1, 1, 0, 2], 3: [1, 3, 2, 0],
         4: [2, 0, 0, 3]}

        Processes with identical needs keep their own entries:

        >>> sorted(BankersAlgorithm(
        ...     [4, 4], [[1, 0], [0, 1]], [[2, 1], [1, 2]]
        ... )._BankersAlgorithm__need_index_manager())
        [0, 1]
        """
        # Key by position rather than by list.index(): index() returns the first
        # equal row, which merges processes whose need vectors are identical.
        return dict(enumerate(self.__need()))

    def main(self, **kwargs) -> None:
        """
        Utilize various methods in this class to simulate the Banker's algorithm

        :param kwargs: if any keyword argument is passed with the value ``True``
            (eg. ``describe=True``) the input tables are printed once before the
            simulation starts.
        :Return: None

        The expected output below is compared with NORMALIZE_WHITESPACE, so only
        the tokens matter, not the column alignment.

        >>> BankersAlgorithm(
        ...     test_claim_vector, test_allocated_res_table, test_maximum_claim_table
        ... ).main(describe=True)  # doctest: +NORMALIZE_WHITESPACE
                 Allocated Resource Table
        P1       2        0        1        1
        <BLANKLINE>
        P2       0        1        2        1
        <BLANKLINE>
        P3       4        0        0        3
        <BLANKLINE>
        P4       0        2        1        0
        <BLANKLINE>
        P5       1        0        3        0
        <BLANKLINE>
                 System Resource Table
        P1       3        2        1        4
        <BLANKLINE>
        P2       0        2        5        2
        <BLANKLINE>
        P3       5        1        0        5
        <BLANKLINE>
        P4       1        5        3        0
        <BLANKLINE>
        P5       3        0        3        3
        <BLANKLINE>
        Current Usage by Active Processes: 8 5 9 7
        Initial Available Resources: 1 2 2 2
        __________________________________________________
        <BLANKLINE>
        Process 3 is executing.
        Updated available resource stack for processes: 5 2 2 5
        The process is in a safe state.
        <BLANKLINE>
        Process 1 is executing.
        Updated available resource stack for processes: 7 2 3 6
        The process is in a safe state.
        <BLANKLINE>
        Process 2 is executing.
        Updated available resource stack for processes: 7 3 5 7
        The process is in a safe state.
        <BLANKLINE>
        Process 4 is executing.
        Updated available resource stack for processes: 7 5 6 7
        The process is in a safe state.
        <BLANKLINE>
        Process 5 is executing.
        Updated available resource stack for processes: 8 5 9 7
        The process is in a safe state.
        <BLANKLINE>
        """
        # (original process index, need vector) pairs still waiting to run.
        # Carrying the index with the need keeps processes with equal needs apart.
        pending = list(self.__need_index_manager().items())
        alloc_resources_table = self.__allocated_resources_table
        available_resources = self.__available_resources()

        # Print the tables once, however many flags were passed as True.
        if any(kw and val is True for kw, val in kwargs.items()):
            self.__pretty_data()
        print("_" * 50 + "\n")

        while pending:
            # Scan from the start each round and take the first process whose
            # whole need fits into what is currently available.
            for position, (process_number, need) in enumerate(pending):
                if all(
                    wanted <= free for wanted, free in zip(need, available_resources)
                ):
                    break
            else:
                # Nobody can run with the resources at hand.
                print("System in unsafe state. Aborting...\n")
                break

            # remove the process run from stack
            del pending[position]
            print(f"Process {process_number + 1} is executing.")
            # update available/freed resources stack: the finished process
            # gives back everything it was holding
            available_resources = np.array(available_resources) + np.array(
                alloc_resources_table[process_number]
            )
            print(
                "Updated available resource stack for processes: "
                + " ".join([str(x) for x in available_resources])
            )
            print("The process is in a safe state.\n")

    def __pretty_data(self):
        """
        Properly align display of the algorithm's solution
        """
        print(" " * 9 + "Allocated Resource Table")
        # enumerate() labels rows by position; list.index() would give every
        # duplicate row the number of its first occurrence.
        for process_number, item in enumerate(self.__allocated_resources_table, 1):
            print(
                f"P{process_number}"
                + " ".join(f"{it:>8}" for it in item)
                + "\n"
            )
        print(" " * 9 + "System Resource Table")
        for process_number, item in enumerate(self.__maximum_claim_table, 1):
            print(
                f"P{process_number}"
                + " ".join(f"{it:>8}" for it in item)
                + "\n"
            )
        print(
            "Current Usage by Active Processes: "
            + " ".join(str(x) for x in self.__claim_vector)
        )
        print(
            "Initial Available Resources: "
            + " ".join(str(x) for x in self.__available_resources())
        )
if __name__ == "__main__":
    # When executed as a script, run the doctests embedded in the docstrings
    # of this module.
    from doctest import testmod

    testmod()