Skip to content

Commit

Permalink
Revert "Stop treating ReadyForQuery as a universal result indicator"
Browse files Browse the repository at this point in the history
This reverts commit 7a81613.

This has caused race regressions and will need to be rethought.
  • Loading branch information
elprans committed Nov 8, 2018
1 parent 8c5c1bf commit 04b6748
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 75 deletions.
4 changes: 2 additions & 2 deletions asyncpg/protocol/coreproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ cdef enum ProtocolState:
PROTOCOL_CANCELLED = 3

PROTOCOL_AUTH = 10
PROTOCOL_PARSE_DESCRIBE = 11
PROTOCOL_PREPARE = 11
PROTOCOL_BIND_EXECUTE = 12
PROTOCOL_BIND_EXECUTE_MANY = 13
PROTOCOL_CLOSE_STMT_PORTAL = 14
Expand Down Expand Up @@ -105,7 +105,7 @@ cdef class CoreProtocol:
bint result_execute_completed

cdef _process__auth(self, char mtype)
cdef _process__parse_describe(self, char mtype)
cdef _process__prepare(self, char mtype)
cdef _process__bind_execute(self, char mtype)
cdef _process__bind_execute_many(self, char mtype)
cdef _process__close_stmt_portal(self, char mtype)
Expand Down
208 changes: 145 additions & 63 deletions asyncpg/protocol/coreproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -55,47 +55,11 @@ cdef class CoreProtocol:
# 'N' - NoticeResponse
self._on_notice(self._parse_msg_error_response(False))

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)
# In all cases, except Auth, ErrorResponse will
# be followed by a ReadyForQuery, which is when
# _push_result() will be called.
if state == PROTOCOL_AUTH:
self._push_result()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()

if state != PROTOCOL_BIND_EXECUTE_MANY:
self._push_result()

else:
if self.result_type == RESULT_FAILED:
self._push_result()
else:
try:
buf = <WriteBuffer>next(self._execute_iter)
except StopIteration:
self._push_result()
except Exception as e:
self.result_type = RESULT_FAILED
self.result = e
self._push_result()
else:
# Next iteration over the executemany()
# arg sequence.
self._send_bind_message(
self._execute_portal_name,
self._execute_stmt_name,
buf, 0)

elif state == PROTOCOL_AUTH:
self._process__auth(mtype)

elif state == PROTOCOL_PARSE_DESCRIBE:
self._process__parse_describe(mtype)
elif state == PROTOCOL_PREPARE:
self._process__prepare(mtype)

elif state == PROTOCOL_BIND_EXECUTE:
self._process__bind_execute(mtype)
Expand Down Expand Up @@ -130,26 +94,42 @@ cdef class CoreProtocol:

elif state == PROTOCOL_CANCELLED:
# discard all messages until the sync message
self.buffer.discard_message()
if mtype == b'E':
self._parse_msg_error_response(True)
elif mtype == b'Z':
self._parse_msg_ready_for_query()
self._push_result()
else:
self.buffer.discard_message()

elif state == PROTOCOL_ERROR_CONSUME:
# Error in protocol (on asyncpg side);
# discard all messages until sync message
self.buffer.discard_message()

if mtype == b'Z':
# Sync point, self to push the result
if self.result_type != RESULT_FAILED:
self.result_type = RESULT_FAILED
self.result = apg_exc.InternalClientError(
'unknown error in protocol implementation')

self._push_result()

else:
self.buffer.discard_message()

else:
raise apg_exc.InternalClientError(
'protocol is in an unknown state {}'.format(state))

except Exception as ex:
self.state = PROTOCOL_ERROR_CONSUME
self.result_type = RESULT_FAILED
self.result = ex

if mtype == b'Z':
# This should only happen if _parse_msg_ready_for_query()
# has failed.
self._push_result()
else:
self.state = PROTOCOL_ERROR_CONSUME

finally:
self.buffer.finish_message()
Expand All @@ -171,27 +151,43 @@ cdef class CoreProtocol:
# BackendKeyData
self._parse_msg_backend_key_data()

# push_result() will be initiated by handling
# ReadyForQuery or ErrorResponse in the main loop.
elif mtype == b'E':
# ErrorResponse
self.con_status = CONNECTION_BAD
self._parse_msg_error_response(True)
self._push_result()

cdef _process__parse_describe(self, char mtype):
if mtype == b'1':
# ParseComplete
self.buffer.discard_message()
elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self.con_status = CONNECTION_OK
self._push_result()

elif mtype == b't':
# ParameterDescription
cdef _process__prepare(self, char mtype):
if mtype == b't':
# Parameters description
self.result_param_desc = self.buffer.consume_message()

elif mtype == b'1':
# ParseComplete
self.buffer.discard_message()

elif mtype == b'T':
# RowDescription
# Row description
self.result_row_desc = self.buffer.consume_message()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

elif mtype == b'n':
# NoData
self.buffer.discard_message()
self._push_result()

cdef _process__bind_execute(self, char mtype):
if mtype == b'D':
Expand All @@ -201,22 +197,28 @@ cdef class CoreProtocol:
elif mtype == b's':
# PortalSuspended
self.buffer.discard_message()
self._push_result()

elif mtype == b'C':
# CommandComplete
self.result_execute_completed = True
self._parse_msg_command_complete()
self._push_result()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'2':
# BindComplete
self.buffer.discard_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

elif mtype == b'I':
# EmptyQueryResponse
self.buffer.discard_message()
self._push_result()

cdef _process__bind_execute_many(self, char mtype):
cdef WriteBuffer buf
Expand All @@ -233,24 +235,64 @@ cdef class CoreProtocol:
# CommandComplete
self._parse_msg_command_complete()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'2':
# BindComplete
self.buffer.discard_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
if self.result_type == RESULT_FAILED:
self._push_result()
else:
try:
buf = <WriteBuffer>next(self._execute_iter)
except StopIteration:
self._push_result()
except Exception as e:
self.result_type = RESULT_FAILED
self.result = e
self._push_result()
else:
# Next iteration over the executemany() arg sequence
self._send_bind_message(
self._execute_portal_name, self._execute_stmt_name,
buf, 0)

elif mtype == b'I':
# EmptyQueryResponse
self.buffer.discard_message()

cdef _process__bind(self, char mtype):
if mtype == b'2':
if mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'2':
# BindComplete
self.buffer.discard_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__close_stmt_portal(self, char mtype):
if mtype == b'3':
if mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'3':
# CloseComplete
self.buffer.discard_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__simple_query(self, char mtype):
Expand All @@ -260,21 +302,42 @@ cdef class CoreProtocol:
# 'T' - RowDescription
self.buffer.discard_message()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

elif mtype == b'C':
# CommandComplete
self._parse_msg_command_complete()

else:
# We don't really care about COPY IN etc
self.buffer.discard_message()

cdef _process__copy_out(self, char mtype):
if mtype == b'H':
if mtype == b'E':
self._parse_msg_error_response(True)

elif mtype == b'H':
# CopyOutResponse
self._set_state(PROTOCOL_COPY_OUT_DATA)
self.buffer.discard_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__copy_out_data(self, char mtype):
if mtype == b'd':
if mtype == b'E':
self._parse_msg_error_response(True)

elif mtype == b'd':
# CopyData
self._parse_copy_data_msgs()

Expand All @@ -286,18 +349,37 @@ cdef class CoreProtocol:
elif mtype == b'C':
# CommandComplete
self._parse_msg_command_complete()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__copy_in(self, char mtype):
if mtype == b'G':
if mtype == b'E':
self._parse_msg_error_response(True)

elif mtype == b'G':
# CopyInResponse
self._set_state(PROTOCOL_COPY_IN_DATA)
self.buffer.discard_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__copy_in_data(self, char mtype):
if mtype == b'C':
if mtype == b'E':
self._parse_msg_error_response(True)

elif mtype == b'C':
# CommandComplete
self._parse_msg_command_complete()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _parse_msg_command_complete(self):
Expand Down Expand Up @@ -659,7 +741,7 @@ cdef class CoreProtocol:
WriteBuffer buf

self._ensure_connected()
self._set_state(PROTOCOL_PARSE_DESCRIBE)
self._set_state(PROTOCOL_PREPARE)

buf = WriteBuffer.new_message(b'P')
buf.write_str(stmt_name, self.encoding)
Expand Down
Loading

0 comments on commit 04b6748

Please sign in to comment.