forked from peterhinch/micropython-async
-
Notifications
You must be signed in to change notification settings - Fork 1
/
asyntest.py
152 lines (128 loc) · 4.25 KB
/
asyntest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# asyntest.py Test/demo of the 'micro' Lock, Event, Barrier and Semaphore classes
# Author: Peter Hinch
# Copyright Peter Hinch 2017 Released under the MIT license
# Tests:
# ack_test()
# event_test()
# barrier_test()
# semaphore_test() Pass True to test BoundedSemaphore class
# Issue ctrl-D after running each test
# CPython 3.5 compatibility
# (ignore RuntimeWarning: coroutine '_g' was never awaited)
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from asyn import Lock, Event, Semaphore, BoundedSemaphore, Barrier
# ************ Test Event class ************
# Demo use of acknowledge event
async def event_wait(event, ack_event, n):
    """Wait on ``event``, print the value it carries, then acknowledge.

    event     -- awaitable object that also provides ``value()``
    ack_event -- object whose ``set()`` is called once the value is printed
    n         -- identifier of this waiter, used only in the printed message
    """
    await event
    received = event.value()
    print('Eventwait {} got event with value {}'.format(n, received))
    # Tell the producer that this waiter has handled the event.
    ack_event.set()
async def run_ack():
    """Endlessly demonstrate an event acknowledged by two waiters.

    Each pass schedules two ``event_wait`` tasks, sets the shared event
    with an incrementing value, awaits and clears each acknowledge event
    in turn, clears the shared event and then pauses for one second.
    This coroutine never returns.
    """
    loop = asyncio.get_event_loop()
    event = Event()
    ack1 = Event()
    ack2 = Event()
    sequence = 0  # Value handed to the waiters on the next pass
    while True:
        for waiter_id, ack in ((1, ack1), (2, ack2)):
            loop.create_task(event_wait(event, ack, waiter_id))
        event.set(sequence)
        sequence += 1
        print('event was set')
        # Acknowledgements are collected strictly in order: ack1 then ack2.
        for ack, cleared_msg in ((ack1, 'Cleared ack1'), (ack2, 'Cleared ack2')):
            await ack
            ack.clear()
            print(cleared_msg)
        event.clear()
        print('Cleared event')
        await asyncio.sleep(1)
async def ack_coro(delay):
    """Sleep for ``delay`` seconds, then print the two closing lines.

    ``ack_test`` runs the loop until this coroutine completes, so
    ``delay`` bounds the duration of that demo.
    """
    await asyncio.sleep(delay)
    farewell = (
        "I've seen starships burn off the shoulder of Orion...",
        "Time to die...",
    )
    for line in farewell:
        print(line)
def ack_test():
    """Run the acknowledge-event demo for ten seconds.

    ``run_ack`` is scheduled as a background task; the loop runs until
    ``ack_coro(10)`` finishes.
    """
    loop = asyncio.get_event_loop()
    background = run_ack()
    loop.create_task(background)
    terminator = ack_coro(10)
    loop.run_until_complete(terminator)
# ************ Test Lock and Event classes ************
async def run_lock(n, lock):
    """Acquire ``lock``, hold it for one second, then release it.

    n    -- identifier of this coroutine, used only in printed messages
    lock -- object usable as an asynchronous context manager
    """
    waiting = 'run_lock {} waiting for lock'.format(n)
    print(waiting)
    async with lock:
        acquired = 'run_lock {} acquired lock'.format(n)
        print(acquired)
        # Hold the lock for a while so other coros can be seen waiting.
        await asyncio.sleep(1)
    released = 'run_lock {} released lock'.format(n)
    print(released)
async def eventset(event, delay=5):
    """Set ``event`` after a pause.

    event -- object providing ``set()``
    delay -- seconds to sleep before setting the event (default 5, which
             reproduces the original fixed behaviour and message)
    """
    # The announced delay and the actual sleep share one value so they
    # cannot drift apart.
    print('Waiting {} secs before setting event'.format(delay))
    await asyncio.sleep(delay)
    event.set()
    print('event was set')
async def eventwait(event):
    """Wait on ``event``, report that it arrived, then clear it.

    event -- awaitable object that also provides ``clear()``
    """
    print('waiting for event')
    await event
    print('got event')
    # Clear the event here; the caller checks its state afterwards.
    event.clear()
async def run_event_test(lp):
    """Exercise the Lock and Event classes.

    Schedules three ``run_lock`` tasks sharing one lock, then creates an
    ``Event(lp)``, schedules ``eventset`` on it and waits for it through
    ``eventwait``. Finally reports whether the event ended up cleared.

    lp -- passed unchanged to the ``Event`` constructor
    """
    print('Test Lock class')
    loop = asyncio.get_event_loop()
    lock = Lock()
    for task_id in (1, 2, 3):
        loop.create_task(run_lock(task_id, lock))
    print('Test Event class')
    event = Event(lp)
    print('got here')
    loop.create_task(eventset(event))
    print('gh1')
    # Nothing above awaits, so this is the first point where the
    # scheduled tasks get a chance to run.
    await eventwait(event)
    # eventwait clears the event, so it should no longer be set.
    status = 'Incorrect' if event.is_set() else 'OK'
    print('Event status {}'.format(status))
    print('Tasks complete')
def event_test(lp=True):
    """Run the Lock/Event demo to completion.

    lp -- option to use low priority scheduling; forwarded to
          ``run_event_test``
    """
    loop = asyncio.get_event_loop()
    main_coro = run_event_test(lp)
    loop.run_until_complete(main_coro)
# ************ Barrier test ************
async def killer(duration):
    """Complete after ``duration`` seconds.

    Used as the coroutine that bounds how long a demo's event loop runs.
    """
    pause = asyncio.sleep(duration)
    await pause
def callback(text):
    """Print ``text``; passed to the module-level Barrier as its callback."""
    print(text)
# Barrier shared by the report() coroutines: constructed for 3
# participants, with callback and its argument tuple ('Synch',).
barrier = Barrier(3, callback, ('Synch',))


async def report():
    """Print the numbers 0..4, awaiting the shared barrier after each."""
    for step in range(5):
        label = '{} '.format(step)
        print(label, end='')  # No newline after the number
        await barrier
def barrier_test():
    """Run three ``report`` coroutines against the shared barrier.

    The loop runs until ``killer(2)`` completes (two seconds) and is
    then closed.
    """
    loop = asyncio.get_event_loop()
    participants = [report() for _ in range(3)]
    for coro in participants:
        loop.create_task(coro)
    loop.run_until_complete(killer(2))
    loop.close()
# ************ Semaphore test ************
async def run_sema(n, sema, barrier):
    """Hold ``sema`` for one second, then wait on ``barrier``.

    n       -- identifier of this coroutine, used only in printed messages
    sema    -- object usable as an asynchronous context manager
    barrier -- awaitable awaited after the semaphore has been released
    """
    trying = 'run_sema {} trying to access semaphore'.format(n)
    print(trying)
    async with sema:
        acquired = 'run_sema {} acquired semaphore'.format(n)
        print(acquired)
        # Hold the semaphore for a while so other coros can be seen waiting.
        await asyncio.sleep(1)
    released = 'run_sema {} has released semaphore'.format(n)
    print(released)
    await barrier
async def run_sema_test(bounded):
    """Run five ``run_sema`` coroutines against a semaphore of value 3.

    bounded -- when true a ``BoundedSemaphore`` is used, otherwise a
               ``Semaphore``

    After every coroutine has reached the barrier, the semaphore is
    released one extra time; a ``ValueError`` from that release is
    reported as a successful bounded-semaphore test.
    """
    num_coros = 5
    loop = asyncio.get_event_loop()
    # One extra participant: this coroutine waits on the barrier as well.
    done = Barrier(num_coros + 1)
    sema_class = BoundedSemaphore if bounded else Semaphore
    semaphore = sema_class(3)
    for index in range(num_coros):
        loop.create_task(run_sema(index, semaphore, done))
    await done  # Quit when all coros complete
    try:
        semaphore.release()
    except ValueError:
        print('Bounded semaphore exception test OK')
def semaphore_test(bounded=False):
    """Run the semaphore demo to completion.

    bounded -- pass True to test the BoundedSemaphore class
    """
    loop = asyncio.get_event_loop()
    main_coro = run_sema_test(bounded)
    loop.run_until_complete(main_coro)