forked from TheAlgorithms/Python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
multi_level_feedback_queue.py
312 lines (283 loc) · 12.1 KB
/
multi_level_feedback_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
from collections import deque
class Process:
    """
    A process as seen by the scheduler.

    ``burst_time`` is the *remaining* CPU time and is decreased while the process
    runs; ``stop_time`` starts at the arrival time and is overwritten with the
    time of the last interruption or of completion.

    >>> p = Process("P1", 2, 10)
    >>> (p.stop_time, p.burst_time, p.waiting_time, p.turnaround_time)
    (2, 10, 0, 0)
    >>> p
    Process(process_name='P1', arrival_time=2, burst_time=10)
    """

    def __init__(self, process_name: str, arrival_time: int, burst_time: int) -> None:
        self.process_name = process_name  # process name
        self.arrival_time = arrival_time  # arrival time of the process
        # completion time of finished process or last interrupted time
        self.stop_time = arrival_time
        self.burst_time = burst_time  # remaining burst time
        self.waiting_time = 0  # total time the process has waited in ready queue
        self.turnaround_time = 0  # time from arrival time to completion time

    def __repr__(self) -> str:
        # burst_time is the remaining burst time, so this reflects current state
        return (
            f"{type(self).__name__}(process_name={self.process_name!r}, "
            f"arrival_time={self.arrival_time!r}, burst_time={self.burst_time!r})"
        )
class MLFQ:
    """
    MLFQ(Multi Level Feedback Queue)
    https://en.wikipedia.org/wiki/Multilevel_feedback_queue

    An MLFQ has several queues with different priorities.
    In this MLFQ,
    the first queue (0) up to the second-to-last queue (N-2) use Round Robin,
    the last queue (N-1) uses First Come, First Served.
    """

    def __init__(
        self,
        number_of_queues: int,
        time_slices: list[int],
        queue: deque[Process],
        current_time: int,
    ) -> None:
        """
        :param number_of_queues: total number of queues (round robin ones + FCFS)
        :param time_slices: time slice of every round robin queue; entry ``i`` is
            used for queue ``i``, so ``number_of_queues - 1`` entries are read
        :param queue: unfinished processes in scheduling order
        :param current_time: clock value at which scheduling starts
        """
        # total number of mlfq's queues
        self.number_of_queues = number_of_queues
        # time slice of queues that round robin algorithm applied
        self.time_slices = time_slices
        # unfinished process is in this ready_queue
        self.ready_queue = queue
        # current time
        self.current_time = current_time
        # finished process is in this sequence queue
        self.finish_queue: deque[Process] = deque()

    def calculate_sequence_of_finish_queue(self) -> list[str]:
        """
        This method returns the names of finished processes in completion order
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_sequence_of_finish_queue()
        ['P2', 'P4', 'P1', 'P3']
        """
        return [process.process_name for process in self.finish_queue]

    def calculate_waiting_time(self, queue: list[Process]) -> list[int]:
        """
        This method returns the waiting time of every process in ``queue``
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_waiting_time([P1, P2, P3, P4])
        [83, 17, 94, 101]
        """
        return [process.waiting_time for process in queue]

    def calculate_turnaround_time(self, queue: list[Process]) -> list[int]:
        """
        This method returns the turnaround time of every process in ``queue``
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_turnaround_time([P1, P2, P3, P4])
        [136, 34, 162, 125]
        """
        return [process.turnaround_time for process in queue]

    def calculate_completion_time(self, queue: list[Process]) -> list[int]:
        """
        This method returns the completion (stop) time of every process in ``queue``
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_completion_time([P1, P2, P3, P4])
        [136, 34, 162, 125]
        """
        return [process.stop_time for process in queue]

    def calculate_remaining_burst_time_of_processes(
        self, queue: deque[Process]
    ) -> list[int]:
        """
        This method returns the remaining burst time of every process in ``queue``
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> finish_queue, ready_queue = mlfq.round_robin(deque([P1, P2, P3, P4]), 17)
        >>> mlfq.calculate_remaining_burst_time_of_processes(mlfq.finish_queue)
        [0]
        >>> mlfq.calculate_remaining_burst_time_of_processes(ready_queue)
        [36, 51, 7]
        >>> finish_queue, ready_queue = mlfq.round_robin(ready_queue, 25)
        >>> mlfq.calculate_remaining_burst_time_of_processes(mlfq.finish_queue)
        [0, 0]
        >>> mlfq.calculate_remaining_burst_time_of_processes(ready_queue)
        [11, 26]
        """
        return [process.burst_time for process in queue]

    def update_waiting_time(self, process: Process) -> int:
        """
        This method adds the time elapsed since the process last stopped to its
        waiting time and returns the new total
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> mlfq.current_time = 10
        >>> P1.stop_time = 5
        >>> mlfq.update_waiting_time(P1)
        5
        """
        process.waiting_time += self.current_time - process.stop_time
        return process.waiting_time

    def _idle_until_arrival(self, process: Process) -> None:
        """Move the clock forward to the process's arrival if it has not arrived yet"""
        # the CPU idles until the process arrives, so the clock jumps *to* the
        # arrival time; adding the arrival time would overshoot unless the clock is 0
        if self.current_time < process.arrival_time:
            self.current_time = process.arrival_time

    def first_come_first_served(self, ready_queue: deque[Process]) -> deque[Process]:
        """
        FCFS(First Come, First Served)
        FCFS will be applied to MLFQ's last queue
        The process at the front of the queue runs to completion first
        The given queue is emptied and the finished processes are returned
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> _ = mlfq.first_come_first_served(mlfq.ready_queue)
        >>> mlfq.calculate_sequence_of_finish_queue()
        ['P1', 'P2', 'P3', 'P4']

        A process that arrives while the CPU is idle starts at its arrival time
        >>> P1 = Process("P1", 0, 5)
        >>> P2 = Process("P2", 10, 3)
        >>> mlfq = MLFQ(1, [], deque([P1, P2]), 0)
        >>> _ = mlfq.first_come_first_served(mlfq.ready_queue)
        >>> mlfq.calculate_completion_time([P1, P2])
        [5, 13]
        >>> mlfq.calculate_waiting_time([P1, P2])
        [0, 0]
        """
        finished: deque[Process] = deque()  # sequence deque of finished process
        while ready_queue:
            cp = ready_queue.popleft()  # current process

            # if process's arrival time is later than current time, wait for it
            self._idle_until_arrival(cp)
            # update waiting time of current process
            self.update_waiting_time(cp)
            # run the process to completion
            self.current_time += cp.burst_time
            # finish the process and set the process's burst-time 0
            cp.burst_time = 0
            # set the process's turnaround time because it is finished
            cp.turnaround_time = self.current_time - cp.arrival_time
            # set the completion time
            cp.stop_time = self.current_time
            # add the process to queue that has finished queue
            finished.append(cp)

        self.finish_queue.extend(finished)  # add finished process to finish queue
        # FCFS will finish all remaining processes
        return finished

    def round_robin(
        self, ready_queue: deque[Process], time_slice: int
    ) -> tuple[deque[Process], deque[Process]]:
        """
        RR(Round Robin)
        RR will be applied to MLFQ's all queues except last queue
        No process can use the CPU for longer than time_slice
        If a process uses up its time_slice, it goes back to the ready queue
        Returns the processes finished in this cycle and the (same) ready queue
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> finish_queue, ready_queue = mlfq.round_robin(mlfq.ready_queue, 17)
        >>> mlfq.calculate_sequence_of_finish_queue()
        ['P2']

        A process that arrives while the CPU is idle starts at its arrival time
        >>> P1 = Process("P1", 0, 3)
        >>> P2 = Process("P2", 10, 6)
        >>> mlfq = MLFQ(2, [4], deque([P1, P2]), 0)
        >>> finish_queue, ready_queue = mlfq.round_robin(mlfq.ready_queue, 4)
        >>> mlfq.current_time
        14
        """
        finished: deque[Process] = deque()  # sequence deque of terminated process
        # just for 1 cycle and unfinished processes will go back to queue
        for _ in range(len(ready_queue)):
            cp = ready_queue.popleft()  # current process

            # if process's arrival time is later than current time, wait for it
            self._idle_until_arrival(cp)
            # update waiting time of unfinished processes
            self.update_waiting_time(cp)

            if cp.burst_time > time_slice:
                # use CPU for only time-slice
                self.current_time += time_slice
                # update remaining burst time
                cp.burst_time -= time_slice
                # update end point time
                cp.stop_time = self.current_time
                # locate the process behind the queue because it is not finished
                ready_queue.append(cp)
            else:
                # use CPU for remaining burst time
                self.current_time += cp.burst_time
                # set burst time 0 because the process is finished
                cp.burst_time = 0
                # set the finish time
                cp.stop_time = self.current_time
                # update the process' turnaround time because it is finished
                cp.turnaround_time = self.current_time - cp.arrival_time
                # add the process to queue that has finished queue
                finished.append(cp)

        self.finish_queue.extend(finished)  # add finished process to finish queue
        # return finished processes queue and remaining processes queue
        return finished, ready_queue

    def multi_level_feedback_queue(self) -> deque[Process]:
        """
        MLFQ(Multi Level Feedback Queue)
        Runs one round robin cycle per time slice, then FCFS on what is left,
        and returns the queue of finished processes
        >>> P1 = Process("P1", 0, 53)
        >>> P2 = Process("P2", 0, 17)
        >>> P3 = Process("P3", 0, 68)
        >>> P4 = Process("P4", 0, 24)
        >>> mlfq = MLFQ(3, [17, 25], deque([P1, P2, P3, P4]), 0)
        >>> finish_queue = mlfq.multi_level_feedback_queue()
        >>> mlfq.calculate_sequence_of_finish_queue()
        ['P2', 'P4', 'P1', 'P3']
        """
        # all queues except last one have round_robin algorithm
        for i in range(self.number_of_queues - 1):
            _, self.ready_queue = self.round_robin(
                self.ready_queue, self.time_slices[i]
            )
        # the last queue has first_come_first_served algorithm
        self.first_come_first_served(self.ready_queue)

        return self.finish_queue
if __name__ == "__main__":
    import doctest

    number_of_queues = 3
    time_slices = [17, 25]
    # every queue except the last (FCFS) one needs its own round robin time slice;
    # exit with a message and a non-zero status instead of silently reporting success
    if len(time_slices) != number_of_queues - 1:
        raise SystemExit(
            f"expected {number_of_queues - 1} time slices, got {len(time_slices)}"
        )

    P1 = Process("P1", 0, 53)
    P2 = Process("P2", 0, 17)
    P3 = Process("P3", 0, 68)
    P4 = Process("P4", 0, 24)
    queue = deque([P1, P2, P3, P4])

    doctest.testmod(extraglobs={"queue": deque([P1, P2, P3, P4])})

    # start the demo run from fresh, untouched processes
    P1 = Process("P1", 0, 53)
    P2 = Process("P2", 0, 17)
    P3 = Process("P3", 0, 68)
    P4 = Process("P4", 0, 24)
    processes = [P1, P2, P3, P4]
    queue = deque(processes)
    mlfq = MLFQ(number_of_queues, time_slices, queue, 0)
    finish_queue = mlfq.multi_level_feedback_queue()

    # each label is a single literal so the output does not depend on how the
    # source happens to be indented

    # print total waiting times of processes(P1, P2, P3, P4)
    print(f"waiting time:\t\t\t{mlfq.calculate_waiting_time(processes)}")
    # print completion times of processes(P1, P2, P3, P4)
    print(f"completion time:\t\t{mlfq.calculate_completion_time(processes)}")
    # print total turnaround times of processes(P1, P2, P3, P4)
    print(f"turnaround time:\t\t{mlfq.calculate_turnaround_time(processes)}")
    # print sequence of finished processes
    print(
        f"sequence of finished processes:{mlfq.calculate_sequence_of_finish_queue()}"
    )