Skip to content

Commit

Permalink
Change headers format to str:json (#319)
Browse files Browse the repository at this point in the history
* Header in ErrorMessage is not changed
* ClientHandshake always send b'\0\0' (empty ext headers)
* ServerHandshake no longer has ext headers
* Fixup passing of capability bitmask
* Use signed int for implicit limit
* Fix handling of capabilities

Co-authored-by: Elvis Pranskevichus <elvis@edgedb.com>
  • Loading branch information
fantix and elprans authored Jun 18, 2022
1 parent b2c9e4b commit f17a4fd
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 119 deletions.
3 changes: 1 addition & 2 deletions edgedb/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,7 @@ def free_size(self) -> int:
async def _query(self, query_context: abstract.QueryContext):
con = await self._impl.acquire()
try:
result, _ = await con.raw_query(query_context)
return result
return await con.raw_query(query_context)
finally:
await self._impl.release(con)

Expand Down
7 changes: 0 additions & 7 deletions edgedb/protocol/consts.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,3 @@ DEF PROTO_VER_MINOR = 0

DEF LEGACY_PROTO_VER_MAJOR = 0
DEF LEGACY_PROTO_VER_MINOR_MIN = 13

DEF QUERY_OPT_IMPLICIT_LIMIT = 0xFF01
DEF QUERY_OPT_INLINE_TYPENAMES = 0xFF02
DEF QUERY_OPT_INLINE_TYPEIDS = 0xFF03
DEF QUERY_OPT_ALLOW_CAPABILITIES = 0xFF04

DEF SERVER_HEADER_CAPABILITIES = 0x1001
13 changes: 2 additions & 11 deletions edgedb/protocol/protocol.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ cdef class SansIOProtocol:

readonly bytes last_status
readonly bytes last_details
readonly object last_capabilities
readonly tuple protocol_version

readonly bint is_legacy
Expand All @@ -118,8 +119,7 @@ cdef class SansIOProtocol:
)

cdef inline ignore_headers(self)
cdef dict parse_headers(self)
cdef write_headers(self, WriteBuffer buf, dict headers)
cdef dict parse_error_headers(self)

cdef parse_error_message(self)

Expand All @@ -133,15 +133,6 @@ cdef class SansIOProtocol:

cdef fallthrough(self)

cdef write_execute_headers(
self,
WriteBuffer buf,
int implicit_limit,
bint inline_typenames,
bint inline_typeids,
uint64_t allow_capabilities,
)

cdef ensure_connected(self)


Expand Down
105 changes: 36 additions & 69 deletions edgedb/protocol/protocol.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -182,51 +182,16 @@ cdef class SansIOProtocol:

cdef inline ignore_headers(self):
cdef uint16_t num_fields = <uint16_t>self.buffer.read_int16()
while num_fields:
self.buffer.read_int16()
self.buffer.read_len_prefixed_bytes()
num_fields -= 1

cdef write_headers(self, buf: WriteBuffer, headers: dict):
buf.write_int16(len(headers))
for k, v in headers.items():
buf.write_int16(<int16_t><uint16_t>k)
if isinstance(v, bytes):
buf.write_len_prefixed_bytes(v)
else:
buf.write_len_prefixed_utf8(str(v))

cdef write_execute_headers(
self,
WriteBuffer buf,
int implicit_limit,
bint inline_typenames,
bint inline_typeids,
uint64_t allow_capabilities,
):
cdef bytes val
if (
implicit_limit or
inline_typenames or inline_typeids or
allow_capabilities != ALL_CAPABILITIES
):
headers = {}
if implicit_limit:
headers[QUERY_OPT_IMPLICIT_LIMIT] = implicit_limit
if inline_typenames:
headers[QUERY_OPT_INLINE_TYPENAMES] = True
if inline_typeids:
headers[QUERY_OPT_INLINE_TYPEIDS] = True
if allow_capabilities != ALL_CAPABILITIES:
val = cpython.PyBytes_FromStringAndSize(NULL, sizeof(uint64_t))
hton.pack_int64(
cpython.PyBytes_AsString(val),
<int64_t><uint64_t>allow_capabilities
)
headers[QUERY_OPT_ALLOW_CAPABILITIES] = val
self.write_headers(buf, headers)
if self.is_legacy:
while num_fields:
self.buffer.read_int16() # key
self.buffer.read_len_prefixed_bytes() # value
num_fields -= 1
else:
buf.write_int16(0) # no headers
while num_fields:
self.buffer.read_len_prefixed_bytes() # key
self.buffer.read_len_prefixed_bytes() # value
num_fields -= 1

cdef ensure_connected(self):
if self.cancelled:
Expand Down Expand Up @@ -274,7 +239,7 @@ cdef class SansIOProtocol:
buf.write_int16(0) # no headers
buf.write_int64(<int64_t><uint64_t>allow_capabilities)
buf.write_int64(<int64_t><uint64_t>compilation_flags)
buf.write_int64(<int64_t><uint64_t>implicit_limit)
buf.write_int64(<int64_t>implicit_limit)
buf.write_byte(output_format)
buf.write_byte(CARDINALITY_ONE if expect_one else CARDINALITY_MANY)
buf.write_len_prefixed_utf8(query)
Expand All @@ -294,7 +259,6 @@ cdef class SansIOProtocol:
self.write(packet)

result = datatypes.set_new(0)
attrs = None
re_exec = False
exc = None
while True:
Expand Down Expand Up @@ -345,7 +309,7 @@ cdef class SansIOProtocol:
self.buffer.discard_message()

elif mtype == COMMAND_COMPLETE_MSG:
attrs = self.parse_command_complete_message()
self.parse_command_complete_message()

elif mtype == ERROR_RESPONSE_MSG:
exc = self.parse_error_message()
Expand Down Expand Up @@ -396,7 +360,7 @@ cdef class SansIOProtocol:
allow_re_exec=False,
)
else:
return result, attrs
return result

async def execute(
self,
Expand All @@ -412,7 +376,7 @@ cdef class SansIOProtocol:
implicit_limit: int = 0,
inline_typenames: bool = False,
inline_typeids: bool = False,
allow_capabilities: typing.Optional[int] = None,
allow_capabilities: enums.Capability = enums.Capability.ALL,
):
cdef:
BaseCodec in_dc
Expand Down Expand Up @@ -467,9 +431,9 @@ cdef class SansIOProtocol:
implicit_limit: int = 0,
inline_typenames: bool = False,
inline_typeids: bool = False,
allow_capabilities: typing.Optional[int] = None,
allow_capabilities: enums.Capability = enums.Capability.ALL,
):
ret, attrs = await self.execute(
ret = await self.execute(
query=query,
args=args,
kwargs=kwargs,
Expand All @@ -487,27 +451,27 @@ cdef class SansIOProtocol:
if expect_one:
if ret or not required_one:
if ret:
return ret[0], attrs
return ret[0]
else:
if output_format == OutputFormat.JSON:
return 'null', attrs
return 'null'
else:
return None, attrs
return None
else:
methname = _QUERY_SINGLE_METHOD[required_one][output_format]
raise errors.NoDataError(
f'query executed via {methname}() returned no data')
else:
if ret:
if output_format == OutputFormat.JSON:
return ret[0], attrs
return ret[0]
else:
return ret, attrs
return ret
else:
if output_format == OutputFormat.JSON:
return '[]', attrs
return '[]'
else:
return ret, attrs
return ret

async def dump(self, header_callback, block_callback):
cdef:
Expand Down Expand Up @@ -710,8 +674,7 @@ cdef class SansIOProtocol:
handshake_buf.write_len_prefixed_utf8(k)
handshake_buf.write_len_prefixed_utf8(v)

# no extensions requested
handshake_buf.write_int16(0)
handshake_buf.write_int16(0) # reserved
handshake_buf.end_message()

self.write(handshake_buf)
Expand All @@ -726,15 +689,18 @@ cdef class SansIOProtocol:
# means protocol negotiation.
major = self.buffer.read_int16()
minor = self.buffer.read_int16()
self.parse_headers()

# TODO: drop this branch when dropping protocol_v0
if major == LEGACY_PROTO_VER_MAJOR:
self.is_legacy = True
self.ignore_headers()

self.buffer.finish_message()

if (
if major != PROTO_VER_MAJOR and not (
major == LEGACY_PROTO_VER_MAJOR and
minor >= LEGACY_PROTO_VER_MINOR_MIN
):
self.is_legacy = True
elif major != PROTO_VER_MAJOR:
raise errors.ClientConnectionError(
f'the server requested an unsupported version of '
f'the protocol: {major}.{minor}'
Expand Down Expand Up @@ -930,7 +896,7 @@ cdef class SansIOProtocol:
code = <uint32_t>self.buffer.read_int32()
message = self.buffer.read_len_prefixed_utf8()
# Ignore any headers: not yet specified for log messages.
self.parse_headers()
self.ignore_headers()
self.buffer.finish_message()

msg = errors.EdgeDBMessage._from_code(code, severity, message)
Expand Down Expand Up @@ -983,6 +949,7 @@ cdef class SansIOProtocol:
bytes cardinality

try:
self.ignore_headers()
capabilities = self.buffer.read_int64()
cardinality = self.buffer.read_byte()
in_dc, out_dc = self.parse_type_data(reg)
Expand Down Expand Up @@ -1075,8 +1042,8 @@ cdef class SansIOProtocol:

cdef parse_command_complete_message(self):
assert self.buffer.get_message_type() == COMMAND_COMPLETE_MSG
self.parse_headers()
self.buffer.read_int64()
self.ignore_headers()
self.last_capabilities = enums.Capability(self.buffer.read_int64())
self.last_status = self.buffer.read_len_prefixed_bytes()
self.buffer.finish_message()

Expand Down Expand Up @@ -1118,7 +1085,7 @@ cdef class SansIOProtocol:

return exc

cdef dict parse_headers(self):
cdef dict parse_error_headers(self):
cdef:
dict attrs
uint16_t num_fields
Expand Down Expand Up @@ -1146,7 +1113,7 @@ cdef class SansIOProtocol:
severity = <uint8_t>self.buffer.read_byte()
code = <uint32_t>self.buffer.read_int32()
msg = self.buffer.read_len_prefixed_utf8()
attrs = self.parse_headers()
attrs = self.parse_error_headers()

# It's safe to always map error codes as we don't reuse them
code = OLD_ERROR_CODES.get(code, code)
Expand Down
10 changes: 10 additions & 0 deletions edgedb/protocol/protocol_v0.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,13 @@
cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol):
cdef parse_legacy_describe_type_message(self, CodecsRegistry reg)
cdef parse_legacy_command_complete_message(self)
cdef legacy_write_headers(self, WriteBuffer buf, dict headers)
cdef legacy_write_execute_headers(
self,
WriteBuffer buf,
int implicit_limit,
bint inline_typenames,
bint inline_typeids,
uint64_t allow_capabilities,
)
cdef dict legacy_parse_headers(self)
Loading

0 comments on commit f17a4fd

Please sign in to comment.