Skip to content

Commit 2431717

Browse files
committed
Handle PROTOCOL_CANCELLED (state 3) in _dispatch_result.
Prior to this patch the state machine did not handle results arriving after cancellation. This add explicit support for that scenario by keeping track of the state prior the cancellation (cancelled_from_state), and using cancelled_from_state when consuming the results. The alternative option is just silently dropping the results (similar to PROTOCOL_TERMINATING).
1 parent 9e42642 commit 2431717

File tree

3 files changed

+23
-12
lines changed

3 files changed

+23
-12
lines changed

asyncpg/protocol/coreproto.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ cdef class CoreProtocol:
8080

8181
ConnectionStatus con_status
8282
ProtocolState state
83+
ProtocolState cancelled_from_state
8384
TransactionStatus xact_status
8485

8586
str encoding

asyncpg/protocol/coreproto.pyx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ cdef class CoreProtocol:
3333
self.con_params = con_params
3434
self.con_status = CONNECTION_BAD
3535
self.state = PROTOCOL_IDLE
36+
self.cancelled_from_state = PROTOCOL_IDLE
3637
self.xact_status = PQTRANS_IDLE
3738
self.encoding = 'utf-8'
3839
# type of `scram` is `SCRAMAuthentcation`
@@ -835,11 +836,13 @@ cdef class CoreProtocol:
835836
pass
836837
else:
837838
self.state = new_state
839+
self.cancelled_from_state = PROTOCOL_IDLE
838840

839841
elif new_state == PROTOCOL_FAILED:
840842
self.state = PROTOCOL_FAILED
841843

842844
elif new_state == PROTOCOL_CANCELLED:
845+
self.cancelled_from_state = self.state
843846
self.state = PROTOCOL_CANCELLED
844847

845848
elif new_state == PROTOCOL_TERMINATING:

asyncpg/protocol/protocol.pyx

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -851,39 +851,46 @@ cdef class BaseProtocol(CoreProtocol):
851851
waiter.set_exception(exc)
852852
return
853853

854+
state = self.state
855+
if state == PROTOCOL_CANCELLED:
856+
state = self.cancelled_from_state
857+
if state == PROTOCOL_IDLE:
858+
waiter.set_exception(asyncio.CancelledError())
859+
return
860+
854861
try:
855-
if self.state == PROTOCOL_AUTH:
862+
if state == PROTOCOL_AUTH:
856863
self._on_result__connect(waiter)
857864

858-
elif self.state == PROTOCOL_PREPARE:
865+
elif state == PROTOCOL_PREPARE:
859866
self._on_result__prepare(waiter)
860867

861-
elif self.state == PROTOCOL_BIND_EXECUTE:
868+
elif state == PROTOCOL_BIND_EXECUTE:
862869
self._on_result__bind_and_exec(waiter)
863870

864-
elif self.state == PROTOCOL_BIND_EXECUTE_MANY:
871+
elif state == PROTOCOL_BIND_EXECUTE_MANY:
865872
self._on_result__bind_and_exec(waiter)
866873

867-
elif self.state == PROTOCOL_EXECUTE:
874+
elif state == PROTOCOL_EXECUTE:
868875
self._on_result__bind_and_exec(waiter)
869876

870-
elif self.state == PROTOCOL_BIND:
877+
elif state == PROTOCOL_BIND:
871878
self._on_result__bind(waiter)
872879

873-
elif self.state == PROTOCOL_CLOSE_STMT_PORTAL:
880+
elif state == PROTOCOL_CLOSE_STMT_PORTAL:
874881
self._on_result__close_stmt_or_portal(waiter)
875882

876-
elif self.state == PROTOCOL_SIMPLE_QUERY:
883+
elif state == PROTOCOL_SIMPLE_QUERY:
877884
self._on_result__simple_query(waiter)
878885

879-
elif (self.state == PROTOCOL_COPY_OUT_DATA or
880-
self.state == PROTOCOL_COPY_OUT_DONE):
886+
elif (state == PROTOCOL_COPY_OUT_DATA or
887+
state == PROTOCOL_COPY_OUT_DONE):
881888
self._on_result__copy_out(waiter)
882889

883-
elif self.state == PROTOCOL_COPY_IN_DATA:
890+
elif state == PROTOCOL_COPY_IN_DATA:
884891
self._on_result__copy_in(waiter)
885892

886-
elif self.state == PROTOCOL_TERMINATING:
893+
elif state == PROTOCOL_TERMINATING:
887894
# We are waiting for the connection to drop, so
888895
# ignore any stray results at this point.
889896
pass

0 commit comments

Comments
 (0)