CHANGES
-------
01/08/2017 Some refinements to the abide() function. You can now have it
reserve a dedicated thread. This allows it to work with things
like Condition variables. For example::
    cond = threading.Condition()    # Foreign condition variable
    ...
    async with abide(cond, reserve_thread=True) as c:
        # c is an async-wrapper around (cond)
        # The following operation uses the same thread that was
        # used to acquire the lock.
        await c.wait()
    ...
abide() also prefers to use the block_in_thread() function that
makes it much more efficient when synchronizing with basic locks
and events.
01/08/2017 Some reworking of internals related to thread/process workers and
task cancellation. One issue with launching work into a thread
worker is that threads have no mechanism for cancellation. They
run fully to completion no matter what. Thus, if you perform some
work like this:
    await run_in_thread(callable, args)
and the calling task gets cancelled, it's impossible to find out
what happened with the thread. Basically, it's lost to the sands
of time. However, you can now supply an optional call_on_cancel
argument to the function and use it like this:
    def cancelled_result(future):
        result = future.result()
        ...

    await run_in_thread(callable, args, call_on_cancel=cancelled_result)
The call_on_cancel function is a normal synchronous
function. It receives the Future instance that was being used
to receive the result of the threaded operation. This
Future is guaranteed to have the result/exception set.
Be aware that there is no way to know when the call_on_cancel
function might be triggered. It might be far in the future.
The Curio kernel might not even be running. Thus, it's
generally not safe to make too many assumptions about it.
The only guarantee is that the call_on_cancel function is
called after a result is computed and it's called in the
same thread.
The main purpose of this feature is to have better support
for cleanup of failed synchronization operations involving
threads.
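The callback contract described above can be sketched with the standard library alone. This is not Curio's implementation: run_detached() is a hypothetical helper, and the caller is assumed to have already been cancelled, so the callback is always invoked. It only illustrates that the callback runs in the worker thread after the Future has its result or exception set.

```python
import threading
from concurrent.futures import Future

def run_detached(func, *args, call_on_cancel=None):
    """Hypothetical helper: run func in a thread nobody is waiting on."""
    future = Future()

    def target():
        try:
            future.set_result(func(*args))
        except Exception as exc:
            future.set_exception(exc)
        if call_on_cancel is not None:
            # Runs in the worker thread, after the result/exception is set.
            call_on_cancel(future)

    thread = threading.Thread(target=target)
    thread.start()
    return thread

recovered = []

def cancelled_result(future):
    # A normal synchronous function; the Future is already complete here.
    recovered.append(future.result())

worker = run_detached(pow, 2, 10, call_on_cancel=cancelled_result)
worker.join()
```

Because the callback may fire long after the caller has moved on, it should only touch state that is safe to use from the worker thread.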
01/06/2017 New function: block_in_thread(). This works like run_in_thread()
except that it's used with the expectation that whatever operation
is being performed is likely going to block for an undetermined
time period. The underlying operation is handled more efficiently.
For each unique callable, there is at most 1 background thread
being used regardless of how many tasks might be trying to
perform the same operation. For example, suppose you were
trying to synchronize with a foreign queue:
    import queue

    work_q = queue.Queue()     # Standard thread queue

    async def worker():
        while True:
            item = await block_in_thread(work_q.get)
            ...

    # Spin up a huge number of workers
    for n in range(1000):
        await spawn(worker())
In this code, there is one queue and 1000 worker tasks trying to
read items. The block_in_thread() function only uses 1 background
thread to handle it. If you used run_in_thread() instead, it
would consume all available worker threads and you'd probably deadlock.
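The idea of one background thread serving many waiting tasks can be approximated with asyncio and a single-thread executor. This is an analogy using the standard library, not block_in_thread() itself; blocking_get() and the worker count of 50 are assumptions made for the sketch.

```python
import asyncio
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

work_q = queue.Queue()      # standard thread queue
threads_used = set()        # names of threads that ran the blocking call

def blocking_get():
    threads_used.add(threading.current_thread().name)
    return work_q.get()     # blocks until an item is available

async def worker(loop, pool, results):
    # Every task funnels its blocking call through the same executor.
    item = await loop.run_in_executor(pool, blocking_get)
    results.append(item)

async def main(nworkers=50):
    loop = asyncio.get_running_loop()
    results = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        tasks = [asyncio.ensure_future(worker(loop, pool, results))
                 for _ in range(nworkers)]
        for n in range(nworkers):
            work_q.put(n)   # unbounded queue, so put() never blocks
        await asyncio.gather(*tasks)
    return results

results = asyncio.run(main())
```

With a larger pool and run_in_executor() per task, every idle worker would pin a thread while waiting on the queue, which is the exhaustion scenario the entry warns about.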
01/05/2017 Experimental new feature--asynchronous threads! An async thread
is an actual real-life thread where it is safe to call Curio
coroutines and use its various synchronization features.
As an example, suppose you had some code like this:
    async def handler(client, addr):
        async with client:
            async for data in client.as_stream():
                n = int(data)
                time.sleep(n)
                await client.sendall(b'Awake!\n')
        print('Connection closed')

    run(tcp_server('', 25000, handler))
Imagine that the time.sleep() function represents some kind of
synchronous, blocking operation. In the above code, it would
block the Curio kernel, preventing all other tasks from running.
Not a problem, change the handler() function to an async thread
and use the await() function like this:
    from curio.thread import await, async_thread

    @async_thread
    def handler(client, addr):
        with client:
            for data in client.as_stream():
                n = int(data)
                time.sleep(n)
                await(client.sendall(b'Awake!\n'))
        print('Connection closed')

    run(tcp_server('', 25000, handler))
You'll find that the above code works fine and doesn't block
the kernel.
Asynchronous threads only work in the context of Curio. They
may use all of Curio's features. Everywhere you would normally
use await, you use the await() function. with and for statements
will work with objects supporting asynchronous operation.
01/04/2017 Modified enable_cancellation() and disable_cancellation() so that
they can also be used as functions. This makes it easier to
shield a single operation. For example:
    await disable_cancellation(coro())
Functionally, it is the same as this:
    async with disable_cancellation():
        await coro()
This is mostly a convenience feature.
01/04/2017 Two tasks that attempt to wait on the same file descriptor
now result in an exception. Closes issue #104.
01/04/2017 Modified the monitor so that killing the Curio process also
kills the monitor thread and disconnects any connected clients.
Addresses issue #108.
01/04/2017 Modified task.cancel() so that it also cancels any pending
timeout. This prevents the delivery of a timeout exception
(if any) in code that might be executing to clean up from
task cancellation.
01/03/2017 Added a TaskCancelled exception. This is now what gets
raised when tasks are cancelled using the task.cancel()
method. It is a subclass of CancelledError. This change
makes CancelledError more of an abstract exception class
for cancellation. The TaskCancelled, TaskTimeout, and
TimeoutCancellationError exceptions are more specific
subclasses that indicate exactly what has happened.
01/02/2017 Major reorganization of how task cancellation works. There
are two major parts to it.
Kernel:
Every task has a boolean flag "task.allow_cancel" that
determines whether or not cancellation exceptions (which
includes cancellation and timeouts) can be raised in the
task or not. The flag acts as a simple mask. If set True,
a cancellation results in an exception being raised in the
task. If set False, the cancellation-related exception is
placed into "task.cancel_pending" instead. That attribute
holds onto the exception indefinitely, waiting for the task
to reenable cancellations. Once reenabled, the exception
is raised immediately the next time the task performs a
blocking operation.
Coroutines:
From coroutines, control of the cancellation flag is
performed by two functions which are used as context
managers:
To disable cancellation, use the following construct:
    async def coro():
        async with disable_cancellation():
            ...
            await something1()
            await something2()
            ...

        await blocking_op()      # Cancellation raised here (if any)
Within a disable_cancellation() block, it is illegal for
a CancelledError exception to be raised--even manually. Doing
so causes a RuntimeError.
To reenable cancellation in specific regions of code, use
enable_cancellation() like this:
    async def coro():
        async with disable_cancellation():
            while True:
                await something1()
                await something2()
                async with enable_cancellation() as c:
                    await blocking_op()

                if c.cancel_pending:
                    # Cancellation is pending right now. Must bail out.
                    break

        await blocking_op()   # Cancellation raised here (if any)
Use of enable_cancellation() is never allowed outside of an
enclosing disable_cancellation() block. Doing so will
cause a RuntimeError exception. Within an
enable_cancellation() block, all of the normal cancellation
rules apply. This includes raising of exceptions,
timeouts, and so forth. However, CancelledError exceptions
will never escape the block. Instead, they turn back into
a pending exception which can be checked as shown above.
Normally cancellations are only delivered on blocking
operations. If you want to force a check, you can use
check_cancellation() like this:
    if await check_cancellation():
        # Cancellation is pending, but not allowed right now
        ...
Depending on the setting of the allow_cancel flag,
check_cancellation() will either raise the cancellation
exception immediately or report that it is pending.
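The masking behavior of the allow_cancel flag described in this entry can be modeled in a few lines. This is a toy model, not kernel code: ToyTask, deliver() and blocking_op() are hypothetical names, and CancelledError is a local stand-in for Curio's exception.

```python
class CancelledError(Exception):
    """Local stand-in for Curio's CancelledError."""

class ToyTask:
    def __init__(self):
        self.allow_cancel = True      # the mask
        self.cancel_pending = None    # held exception, if any

    def deliver(self, exc):
        """Deliver a cancellation: raise it now, or hold it if masked."""
        if self.allow_cancel:
            raise exc
        self.cancel_pending = exc

    def blocking_op(self):
        """Stand-in for a blocking call, where held exceptions surface."""
        if self.allow_cancel and self.cancel_pending is not None:
            exc, self.cancel_pending = self.cancel_pending, None
            raise exc

events = []
task = ToyTask()
task.allow_cancel = False                 # like entering disable_cancellation()
task.deliver(CancelledError('stop'))      # masked: stored, not raised
events.append(task.cancel_pending is not None)
task.blocking_op()                        # still masked: nothing happens
task.allow_cancel = True                  # cancellation reenabled
try:
    task.blocking_op()                    # held exception raised here
except CancelledError:
    events.append('raised')
```

The held exception is raised only at the next blocking operation after the flag is set back to True, matching the rule stated in the Kernel section.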
12/27/2016 Modified timeout_after(None) so that it leaves any prior timeout
setting in place (if any). However, if a timeout occurs, it
will appear as a TimeoutCancellationError instead of the usual
TaskTimeout exception. This is subtle, but it means that the
timeout occurred due to an outer timeout setting. This
change makes it easier to write functions that accept optional
timeout settings. For example:
    async def func(args, timeout=None):
        try:
            async with timeout_after(timeout):
                statements
                ...
        except TaskTimeout as e:
            # Timeout specifically due to timeout setting supplied
            ...
        except CancelledError as e:
            # Function cancelled for some other reason
            # (possibly an outer timeout)
            ...
12/23/2016 Added further information to cancellation/timeout exceptions
where partial I/O may have been performed. For readall() and
read_exactly() methods, the bytes_read attribute contains
all data read so far. The readlines() method attaches a
lines_read attribute. For write() and writelines(), a bytes_written
attribute is added to the exception. For example:
    try:
        data = await timeout_after(5, s.readall())
    except TaskTimeout as e:
        data = e.bytes_read     # Data received prior to timeout
Here is a sending example:
    try:
        await timeout_after(5, s.write(data))
    except TaskTimeout as e:
        nwritten = e.bytes_written
The primary purpose of these attributes is to allow more
robust recovery in the event of cancellation.
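The general pattern of attaching partial progress to an in-flight exception can be sketched without Curio. Everything here is hypothetical: TaskTimeout is a local stand-in class, read_exactly() is a simplified synchronous function, and flaky_source() simulates a timeout on its third call.

```python
class TaskTimeout(Exception):
    """Local stand-in for Curio's TaskTimeout."""

def read_exactly(source, nbytes):
    """Read nbytes via source(n); on timeout, attach what was read so far."""
    buf = bytearray()
    try:
        while len(buf) < nbytes:
            buf += source(nbytes - len(buf))
    except TaskTimeout as e:
        e.bytes_read = bytes(buf)    # partial data travels with the exception
        raise
    return bytes(buf)

calls = 0

def flaky_source(n):
    # Returns two 3-byte chunks, then simulates a timeout.
    global calls
    calls += 1
    if calls > 2:
        raise TaskTimeout()
    return b'abc'

try:
    data = read_exactly(flaky_source, 10)
except TaskTimeout as e:
    data = e.bytes_read              # data received prior to the timeout
```

The caller decides whether to keep, retry, or discard the partial data; the exception only carries it.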
12/23/2016 The timeout arguments to subprocess related functions have been
removed. Use the curio timeout_after() function instead to deal
with this case. For example:
    try:
        out = await timeout_after(5, subprocess.check_output(args))
    except TaskTimeout as e:
        # Get the partially read output
        partial_stdout = e.stdout
        partial_stderr = e.stderr
        ... other recovery ...
If there is an exception, the stdout and stderr
attributes contain any partially read data on standard output
and standard error. These attributes mirror those present
on the CalledProcessError exception raised if there is an error.
12/03/2016 Added a parentid attribute to Task instances so you can find parent
tasks. Nothing else is done with this internally.
12/03/2016 Withdrew the pdb and crash_handler arguments to Kernel() and the
run() function. Added a pdb() method to tasks that can be used
to enter the debugger on a crashed task. High-level handling
of crashed/terminated tasks is being rethought. The old
crash_handler() callback was next to useless since no useful
actions could be performed (i.e., there was no ability to respawn
tasks or execute any kind of coroutine in response to a crash).
11/05/2016 Pulled time related functionality into the kernel as a new call.
Use the following to get the current value of the kernel clock:
await curio.clock()
Timeout related functions such as timeout_after() and ignore_after()
now rely on the kernel clock instead of using time.monotonic().
This change consolidates all use of the clock into one place
and makes it easier (later) to reconfigure timing should it be
desired. For example, perhaps changing the scale of the clock
to slow down or speed up time handling (for debugging, testing, etc.)
10/29/2016 If the sendall() method of a socket is aborted with a CancelledError,
the resulting exception object now gets a bytes_sent attribute set to
indicate how much progress was made. For example:
    try:
        await sock.sendall(data)
    except CancelledError as e:
        print(e.bytes_sent, 'bytes sent')
10/29/2016 Added timeout_at() and ignore_at() functions that allow timeouts
to be specified at absolute clock values. The usage is the
same as for timeout_after() and ignore_after().
10/29/2016 Modified TaskTimeout exception so that it subclasses CancelledError.
This makes it easier to write code that handles any kind of
cancellation (explicit cancellation, cancellation by timeout, etc.)
10/17/2016 Added shutdown() method to sockets. It is an async function
to mirror the async implementation of close():
    await sock.shutdown(how)
10/17/2016 Added writeable() method to sockets. It can be used to
quickly test if a socket will accept more data before
doing a send(). See Issue #83.
    await sock.writeable()
    nsent = await sock.send(data)
10/17/2016 More precisely defined the semantics of nested timeouts
and exception handling. Consider the following arrangement
of timeout blocks:
    # B1
    async with timeout_after(time1):
        # B2
        async with timeout_after(time2):
            await coro()
Here are the rules:
1. If time2 expires before time1, then block B2 receives
a TaskTimeout exception.
2. If time1 expires before time2, then block B2 receives
a TimeoutCancellationError exception and block B1
receives a TaskTimeout exception. This reflects the
fact that the inner timeout didn't actually occur
and thus it shouldn't be reported as such. The inner
block is still cancelled however in order to satisfy
the outer timeout.
3. If time2 expires before time1 and the resulting
TaskTimeout is NOT caught, but allowed to propagate out
to B1, then block B1 receives an UncaughtTimeoutError
exception. A block should never report a TaskTimeout
unless its specified time interval has actually expired.
Reporting a timeout early because of an uncaught
exception in an inner block should be considered to be
an operational error. This exception reflects that.
4. If time1 and time2 both expire simultaneously, the
outer timeout takes precedence and time1 is considered
to have expired first.
See Issue #82 for further details about the rationale for
this change. https://github.com/dabeaz/curio/issues/82
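The four rules above can be restated as a small decision function. This is only an encoding of the rules as written, not Curio code: nested_timeout_outcome() is a hypothetical name, and it assumes coro() runs longer than both timeouts.

```python
def nested_timeout_outcome(time1, time2, inner_caught=True):
    """Return (exception seen in B2, exception seen in B1) as names.

    time1 is the outer (B1) timeout, time2 the inner (B2) timeout.
    None means the block sees no exception.
    """
    if time1 <= time2:
        # Rules 2 and 4: the outer timeout expires first; ties go to the outer.
        return ('TimeoutCancellationError', 'TaskTimeout')
    if inner_caught:
        # Rule 1: the inner timeout expires first and B2 handles it.
        return ('TaskTimeout', None)
    # Rule 3: the inner TaskTimeout escapes uncaught into B1.
    return ('TaskTimeout', 'UncaughtTimeoutError')

outcomes = {
    'inner first': nested_timeout_outcome(10, 5),
    'outer first': nested_timeout_outcome(5, 10),
    'inner uncaught': nested_timeout_outcome(10, 5, inner_caught=False),
    'simultaneous': nested_timeout_outcome(5, 5),
}
```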
08/16/2016 Modified the Queue class so that the put() method can be used from either
synchronous or asynchronous code. For example:
    from curio import Queue
    queue = Queue()

    def spam():
        # Create some item
        ...
        queue.put(item)

    async def consumer():
        while True:
            item = await queue.get()
            # Consume the item
            ...

    async def coro():
        ...
        spam()       # Note: Normal synchronous function call
        ...

    async def main():
        await spawn(coro())
        await spawn(consumer())

    run(main())
The main purpose of adding this is to make it easier for normal
synchronous code to communicate to async tasks without knowing
too much about what they are. Note: The put() method is never
allowed to block in synchronous mode. If the queue has a bounded
size and it fills up, an exception is raised.
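The never-blocks rule for synchronous puts has a close analog in the standard library: asyncio.Queue.put_nowait() is a plain method that raises asyncio.QueueFull on a full bounded queue. The sketch below uses asyncio in place of Curio, so it shows the behavior pattern rather than Curio's Queue; spam() and the queue size are assumptions.

```python
import asyncio

async def main():
    q = asyncio.Queue(maxsize=2)     # bounded queue
    overflowed = False

    def spam(item):
        # Ordinary synchronous function: no await, never blocks.
        q.put_nowait(item)

    spam('a')
    spam('b')
    try:
        spam('c')                    # queue is full
    except asyncio.QueueFull:
        overflowed = True

    # An async consumer drains the queue as usual.
    items = [await q.get() for _ in range(2)]
    return items, overflowed

items, overflowed = asyncio.run(main())
```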
08/16/2016 Modified the Event class so that events can also be set from synchronous
code. For example:
    from curio import Event
    evt = Event()

    async def coro():
        print('Waiting for something')
        await evt.wait()
        print('It happened')

    # A normal synchronous function. No async/await here.
    def spam():
        print('About to signal')
        evt.set()

    async def main():
        await spawn(coro())
        await sleep(5)
        spam()        # Note: Normal synchronous function call

    run(main())
The main motivation for adding this is that it is very easy for
control flow to escape the world of "async/await". However,
that code may still want to signal or coordinate with async
tasks in some way. By allowing a synchronous set(), it
makes it possible to do this. It should be noted that within
a coroutine, you have to use await when triggering an event.
For example:
    evt = Event()

    def foo():
        evt.set()           # Synchronous use

    async def bar():
        await evt.set()     # Asynchronous use
08/04/2016 Added a new KernelExit exception that can be used to
make the kernel terminate execution. For example:
    async def coro():
        ...
        if something_bad:
            raise KernelExit('Something bad')
This causes the kernel to simply stop, aborting the
currently executing task. The exception will propagate
out of the run() function so if you need to catch it, do
this:
    try:
        run(coro())
    except KernelExit as e:
        print('Going away because of:', e)
KernelExit by itself does not do anything to other
running tasks. However, the run() function will
separately issue a shutdown request causing all
remaining tasks to cancel.
08/04/2016 Added a new TaskExit exception that can be used to make a
single task terminate. For example:
    async def coro():
        ...
        if something_bad:
            raise TaskExit('Goodbye')
        ...
Think of TaskExit as a kind of self-cancellation.
08/04/2016 Some refinements to kernel shutdown. The shutdown process is
more carefully supervised and fixes a few very subtle errors
related to task scheduling.
07/22/2016 Added support for asynchronous access to files as might be
opened by the builtin open() function. Use the new aopen()
function with an async-context manager like this:
    async with aopen(filename, 'r') as f:
        data = await f.read()
Note: a file opened in this manner provides an asynchronous API
that will prevent the Curio kernel from blocking on things
like disk seeks. However, the underlying implementation is
not specified. In the initial version, thread pools are
used to carry out each I/O operation.
07/18/2016 Some changes to Kernel cleanup and resource management. The
proper way to shut down the kernel is to use Kernel.run(shutdown=True).
Alternatively, the kernel can now be used as a context manager:
    with Kernel() as kern:
        kern.run(coro())
Note: The plain run() method properly shuts down the Kernel
if you're only running a single coroutine.
The Kernel.__del__() method now raises an exception if the
kernel is deleted without being properly shut down.
06/30/2016 Added alpn_protocols keyword argument to open_connection()
function to make it easier to use TLS ALPN with clients. For
example to open a connection and have it negotiate HTTP/2
or HTTP/1.1 as a protocol, you can do this:
    sock = await open_connection(host, port, ssl=True,
                                 server_hostname=host,
                                 alpn_protocols=['h2', 'http/1.1'])
    print('selected protocol:', sock.selected_alpn_protocol())
06/30/2016 Changed internal clock handling to use absolute values of
the monotonic clock. New wakeat() function utilizes this
to allow more controlled sleeping for periodic timers
and other applications. For example, here is a loop that
precisely wakes up on a specified time interval:
    import time
    from curio import wakeat

    async def pulse(interval):
        next_wake = time.monotonic()
        while True:
            await wakeat(next_wake)
            print('Tick', time.asctime())
            next_wake += interval
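The same absolute-deadline technique can be tried with asyncio, which has no wakeat() but exposes the loop's monotonic clock. This is an analogous sketch, not Curio code; the count parameter and the returned list of deadlines are additions so the loop terminates and can be inspected.

```python
import asyncio

async def pulse(interval, count):
    loop = asyncio.get_running_loop()
    next_wake = loop.time()          # absolute value of the loop's clock
    deadlines = []
    for _ in range(count):
        # Sleep until the absolute deadline rather than for a fixed
        # interval, so time spent in the loop body does not add drift.
        await asyncio.sleep(max(0.0, next_wake - loop.time()))
        deadlines.append(next_wake)
        next_wake += interval
    return deadlines

deadlines = asyncio.run(pulse(0.01, 5))
```

Sleeping for a relative interval on each pass would let processing time accumulate; computing each deadline from the previous one keeps the schedule fixed.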
06/16/2016 Fixed Issue #55. Exceptions occurring in code executed by
run_in_process() now include a RemoteTraceback exception
that shows the traceback from the remote process. This
should make debugging a bit easier.
06/11/2016 Fixed Issue #53. curio.run() was swallowing all exceptions. It now
reports a TaskError exception if the given coroutine fails. This is
a chained exception where __cause__ contains the actual cause of
failure. This is meant to be consistent with the join() method
of Tasks.
06/09/2016 Experimental new wait() function added. It can be used to wait for
more than one task at a time and to return them in completion order.
For example:
    task1 = await spawn(coro())
    task2 = await spawn(coro())
    task3 = await spawn(coro())

    # Get results from all tasks as they complete
    async for task in wait([task1, task2, task3]):
        result = await task.join()

    # Get the first result and cancel remaining tasks
    async with wait([task1, task2, task3]) as w:
        task = await w.next_done()
        result = await task.join()
    # Other tasks cancelled here
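For comparison, the standard library offers completion-order iteration through asyncio.as_completed(). The sketch below uses asyncio instead of Curio's wait(), so the API differs; job() and the delays are made up for illustration.

```python
import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [
        asyncio.ensure_future(job('a', 0.15)),
        asyncio.ensure_future(job('b', 0.05)),
        asyncio.ensure_future(job('c', 0.10)),
    ]
    order = []
    # Results arrive in completion order, not submission order.
    for fut in asyncio.as_completed(tasks):
        order.append(await fut)
    return order

order = asyncio.run(main())
```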
06/09/2016 Refined the behavior of timeouts. First, a timeout is not allowed
to extend the time expiration of a previously set timeout. For
example, if code previously set a 5 second timeout, an attempt
to now set a 10 second timeout still results in a 5 second timeout.
Second, when restoring a previous timeout, if the timeout period has
expired, Curio arranges for a TaskTimeout exception to be raised on
the next blocking call. Without this, it's too easy for timeouts
to disappear or not have any effect. Setting a timeout of None
disables timeouts regardless of any prior setting.
06/07/2016 Changed trap names (e.g., '_trap_io') to int enums. This is
a low-level change that shouldn't affect existing code.
05/23/2016 Fixed Issue #52 (Problem with ignore_after context manager).
There was a possibility that a task would be marked for
timeout at precisely the same time some other operation had
completed and the task was sitting on the ready queue. To fix,
the timeout is deferred and retried the next time the kernel
blocks.
05/20/2016 Added asyncobject class to curio/meta.py. This allows you
to write classes with an asynchronous __init__ method. For example:
    from curio.meta import asyncobject

    class Spam(asyncobject):
        async def __init__(self):
            ...
            self.value = await coro()
            ...
Instances can only be created via await. For example:
    s = await Spam()
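One way an asynchronous __init__ can be made to work is a metaclass whose __call__ is itself a coroutine. The sketch below is a minimal illustration of that technique, run under asyncio; AsyncInitMeta and AsyncObject are hypothetical names and this is not necessarily how curio.meta implements asyncobject.

```python
import asyncio

class AsyncInitMeta(type):
    async def __call__(cls, *args, **kwargs):
        # Calling the class returns a coroutine; awaiting it builds the
        # instance and then awaits the asynchronous __init__.
        self = cls.__new__(cls)
        await self.__init__(*args, **kwargs)
        return self

class AsyncObject(metaclass=AsyncInitMeta):
    async def __init__(self):
        pass

class Spam(AsyncObject):
    async def __init__(self, value):
        await asyncio.sleep(0)       # any awaitable work can go here
        self.value = value

async def main():
    s = await Spam(42)               # instances are created via await
    return s.value

result = asyncio.run(main())
```

Calling Spam(42) without await yields only a coroutine object, which is why instances can only be created via await.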
05/15/2016 Fixed Issue #50. Undefined variable n in io.py
Reported by Wolfgang Langner
Version 0.4 : May 13, 2016
--------------------------
05/13/2016 Fixed a subtle bug with futures/cancellation.
Version 0.3 : May 13, 2016
--------------------------
05/13/2016 Bug fixes to the run_in_process() and run_in_thread()
functions so that exceptions are reported properly.
Also fixed a logic bug related to kernel task initialization.
05/13/2016 Modification to the abide() function to allow it to work
with RLocks from the threading module. One caveat: Such
locks are NOT reentrant from within curio itself.
Version 0.2 : May 11, 2016
--------------------------
05/05/2016 Refactoring of stream I/O classes. There is now FileStream
and SocketStream. The old Stream class is gone.
04/30/2016 The run_blocking() and run_cpu_bound() calls are now
called run_in_thread() and run_in_process().
04/23/2016 Changed the new_task() function to spawn().
04/22/2016 Removed parent/child task relationship and associated
tracking. It's an added complexity that's not really
needed in the kernel and it can be done easily enough by
the user in cases where it might be needed.
04/18/2016 Major refactoring of timeout handling. Virtually all
operations in curio support cancellation and timeouts.
However, putting an explicit "timeout" argument on
every API function/method greatly complicates the
underlying implementation (and introduces performance
overhead in cases where timeouts aren't used). To
put a timeout on an operation, use the timeout_after()
function instead. For example:
    await timeout_after(5, sock.recv(1024))
This will cause a timeout to be raised after the
specified time interval.
04/01/2016 Improved management of the I/O selector. The number of
register/unregister operations is reduced for tasks
that constantly perform I/O on the same resources. This
could offer a nice performance boost in certain cases.
03/31/2016 Switched test suite to py.test. All of the tests are in the
top-level tests directory. Use 'python3 -m pytest' to test.
03/30/2016 Improved the curio monitor. Instead of relying on the
console TTY (and invoked via Ctrl-C), it now uses a socket
to which you must connect via a different session. To
enable the monitor either use:
    kernel = Kernel(with_monitor=True)
or run with an environment variable:
    env CURIOMONITOR=TRUE python3 yourprogram.py
To connect to the monitor, use the following command:
    python3 -m curio.monitor
02/15/2016 Fixed Issue #37 where scheduling multiple tasks for sleeping
could potentially cause a crash in rare circumstances.
Version 0.1 : October 31, 2015
------------------------------
Initial version