Skip to content

Commit

Permalink
Properly handle event streams
Browse files Browse the repository at this point in the history
  • Loading branch information
SamRemis committed Feb 15, 2025
1 parent f8ca829 commit 30a7bcd
Showing 1 changed file with 98 additions and 82 deletions.
180 changes: 98 additions & 82 deletions botocore/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,21 @@ def _has_unknown_tagged_union_member(self, shape, value):
def _handle_unknown_tagged_union_member(self, tag):
return {'SDK_UNKNOWN_MEMBER': {'name': tag}}

def _do_query_compatible_error_parse(self, code, headers, error):
"""
Error response may contain an x-amzn-query-error header to translate
errors codes from former `query` services into other protocols. We use this
to do our lookup in the errorfactory for modeled errors.
"""
query_error = headers['x-amzn-query-error']
query_error_components = query_error.split(';')

if len(query_error_components) == 2 and query_error_components[0]:
error['Error']['QueryErrorCode'] = code
error['Error']['Type'] = query_error_components[1]
return query_error_components[0]
return code


class BaseXMLResponseParser(ResponseParser):
def __init__(self, timestamp_parser=None, blob_parser=None):
Expand Down Expand Up @@ -719,20 +734,6 @@ def _do_error_parse(self, response, shape):
self._inject_response_metadata(error, response['headers'])
return error

def _do_query_compatible_error_parse(self, code, headers, error):
"""
Error response may contain an x-amzn-query-error header to translate
errors codes from former `query` services into `json`. We use this to
do our lookup in the errorfactory for modeled errors.
"""
query_error = headers['x-amzn-query-error']
query_error_components = query_error.split(';')

if len(query_error_components) == 2 and query_error_components[0]:
error['Error']['QueryErrorCode'] = code
error['Error']['Type'] = query_error_components[1]
return query_error_components[0]
return code

def _inject_response_metadata(self, parsed, headers):
if 'x-amzn-requestid' in headers:
Expand All @@ -752,42 +753,13 @@ def _parse_body_as_json(self, body_contents):
# the literal string as the message
return {'message': body}

# TODO sort methods, make sure they're all actually on the right clasess too
# TODO audit service responses error messages
# TODO sort methods, make sure they're all actually on the right classes too


class BaseCBORParser(ResponseParser):
def _do_error_parse(self, response, shape):
# TODO do we add another NotImplementedError on initial body parse?
body = self._initial_body_parse(response['body'])
error = {
"Error": {
"Message": body.get('message', body.get('Message', '')),
"Code": '',
},
"ResponseMetadata": {},
}
headers = response['headers']
code = body.get(
'__type',
response.get('status_code') and str(response['status_code']),
)
if code:
code = code.rsplit('#', 1)[-1]
if 'x-amzn-query-error' in headers:
# TODO this isn't implemented yet
code = self._do_query_compatible_error_parse(
code, headers, error
)
error['Error']['Code'] = code
if 'x-amzn-requestid' in headers:
error.setdefault('ResponseMetadata', {})['RequestId'] = headers[
'x-amzn-requestid'
]
return error

def parse_data_item(self, stream):
initial_byte = self._read_byte(stream)
initial_byte = self._read_next_byte_as_int(stream)
major_type = initial_byte >> 5
additional_info = initial_byte & 0b00011111

Expand All @@ -808,7 +780,7 @@ def parse_data_item(self, stream):
elif major_type == 7:
return self._parse_simple_and_float(stream, additional_info)
else:
raise ValueError(f"Unsupported major type: {major_type}")
raise ResponseParserError(f"Unsupported major type: {major_type}")

def _parse_tag(self, stream, additional_info):
tag = self._parse_unsigned_integer(stream, additional_info)
Expand All @@ -822,7 +794,7 @@ def _parse_datetime(self, value):
if isinstance(value, (int, float)):
return self._timestamp_parser(value)
else:
raise ResponseParserError("Invalid value for datetime tag")
raise ResponseParserError(f"Unable to parse datetime value: {value}")

def _parse_unsigned_integer(self, stream, additional_info):
return self._parse_integer(stream, additional_info)
Expand All @@ -834,7 +806,7 @@ def _parse_integer(self, stream, additional_info):
if additional_info < 24:
return additional_info
elif additional_info == 24:
return self._read_byte(stream)
return self._read_next_byte_as_int(stream)
elif additional_info == 25:
return struct.unpack('>H', self._read_from_stream(stream, 2))[0]
elif additional_info == 26:
Expand Down Expand Up @@ -866,7 +838,7 @@ def _parse_array(self, stream, additional_info):
if additional_info == 31:
items = []
while True:
initial_byte = self._read_byte(stream)
initial_byte = self._read_next_byte_as_int(stream)
if initial_byte == 0xFF:
break
stream.seek(-1, 1)
Expand All @@ -880,12 +852,13 @@ def _parse_map(self, stream, additional_info):
if additional_info == 31:
items = {}
while True:
initial_byte = self._read_byte(stream)
initial_byte = self._read_next_byte_as_int(stream)
if initial_byte == 0xFF:
break
stream.seek(-1, 1)
key = self.parse_data_item(stream)
value = self.parse_data_item(stream)
#TODO is this breaking? Do other parsers behave this way?
if value is not None:
items[key] = value
return items
Expand All @@ -906,12 +879,10 @@ def _parse_simple_and_float(self, stream, additional_info):
return False
elif additional_info == 21:
return True
elif additional_info == 22:
elif additional_info in [22, 23]:
return None
elif additional_info == 23:
return "undefined"
elif additional_info == 24:
return self._read_byte(stream)
return self._read_next_byte_as_int(stream)
elif additional_info == 25:
return struct.unpack('>e', self._read_from_stream(stream, 2))[0]
elif additional_info == 26:
Expand All @@ -921,29 +892,29 @@ def _parse_simple_and_float(self, stream, additional_info):
elif additional_info == 31:
return self._parse_indefinite_length(stream)
else:
raise ValueError(
"Invalid additional information for simple and floating point types"
raise ResponseParserError(
"Invalid additional information for simple or floating point types"
)

def _parse_indefinite_length(self, stream):
items = []
while True:
initial_byte = self._read_byte(stream)
initial_byte = self._read_next_byte_as_int(stream)
if initial_byte == 0xFF:
break
stream.seek(-1, 1)
items.append(self.parse_data_item(stream))
return items

def _read_chunk(self, stream):
initial_byte = self._read_byte(stream)
initial_byte = self._read_next_byte_as_int(stream)
if initial_byte == 0xFF:
return None
additional_info = initial_byte & 0b00011111
length = self._parse_unsigned_integer(stream, additional_info)
return self._read_from_stream(stream, length)

def _read_byte(self, stream):
def _read_next_byte_as_int(self, stream):
byte = stream.read(1)
if not byte:
raise ResponseParserError("End of stream reached")
Expand Down Expand Up @@ -1053,11 +1024,9 @@ def _initial_body_parse(self, xml_string):

class EventStreamCBORParser(BaseEventStreamParser, BaseCBORParser):
def _initial_body_parse(self, body_contents):
# TODO test both code paths
if not body_contents:
if body_contents == b'':
return {}
return self._parse_payload(body_contents)

return self.parse_data_item(io.BytesIO(body_contents))

class JSONParser(BaseJSONParser):
EVENT_STREAM_PARSER_CLS = EventStreamJSONParser
Expand Down Expand Up @@ -1196,12 +1165,12 @@ def _parse_header_map(self, shape, headers):
parsed[name] = headers[header_name]
return parsed

def _initial_body_parse(self, body_contents):
# This method should do the initial xml/json parsing of the
# body. We still need to walk the parsed body in order
# to convert types, but this method will do the first round
# of parsing.
raise NotImplementedError("_initial_body_parse")
# def _initial_body_parse(self, body_contents):
# # This method should do the initial xml/json parsing of the
# # body. We still need to walk the parsed body in order
# # to convert types, but this method will do the first round
# # of parsing.
# raise NotImplementedError("_initial_body_parse")

def _handle_string(self, shape, value):
parsed = value
Expand All @@ -1223,18 +1192,22 @@ def _handle_list(self, shape, node):
# places; go through this for all three new classes
class BaseRpcV2Parser(ResponseParser):
def _do_parse(self, response, shape):
final_parsed = {}
final_parsed['ResponseMetadata'] = self._populate_response_metadata(
parsed = {}
if shape is not None:
event_name = shape.event_stream_name
if event_name:
parsed = self._handle_event_stream(response, shape, event_name)
else:
parsed = self._handle_cbor_body(response, shape)
parsed['ResponseMetadata'] = self._populate_response_metadata(
response
)
self._add_modeled_parse(response, shape, final_parsed)
return final_parsed
return parsed

def _add_modeled_parse(self, response, shape, final_parsed):
if shape is None:
return final_parsed
member_shapes = shape.members
self._parse_payload(response, shape, member_shapes, final_parsed)
self._parse_payload(response, shape, final_parsed)

def _do_modeled_error_parse(self, response, shape):
final_parsed = {}
Expand All @@ -1244,21 +1217,18 @@ def _do_modeled_error_parse(self, response, shape):
def _populate_response_metadata(self, response):
metadata = {}
headers = response['headers']
# TODO confirm this should still be here
if 'x-amzn-requestid' in headers:
metadata['RequestId'] = headers['x-amzn-requestid']
elif 'x-amz-request-id' in headers:
metadata['RequestId'] = headers['x-amz-request-id']
return metadata

def _parse_payload(self, response, shape, member_shapes, final_parsed):
def _parse_payload(self, response, shape, final_parsed):
original_parsed = self._initial_body_parse(response['body'])
body_parsed = self._parse_shape(shape, original_parsed)
final_parsed.update(body_parsed)

# TODO do we need to add NotImplemented to parse_shape, etc.?
def _initial_body_parse(self, body_contents):
# This method should do the initial CBOR parsing of the
# This method should do the initial parsing of the
# body. We still need to walk the parsed body in order
# to convert types, but this method will do the first round
# of parsing.
Expand Down Expand Up @@ -1302,11 +1272,57 @@ def _initial_body_parse(self, body_contents):
if body_contents == b'':
return body_contents
body_contents_stream = io.BytesIO(body_contents)
# TODO consider adding a check that the stream is empty?
return self.parse_data_item(body_contents_stream)

# TODO do we need any overrides from the base cbor parser? Probably not?
def _do_error_parse(self, response, shape):
body = self._initial_body_parse(response['body'])
error = {
"Error": {
"Message": body.get('message', body.get('Message', '')),
"Code": '',
},
"ResponseMetadata": {},
}
headers = response['headers']
code = body.get(
'__type',
response.get('status_code') and str(response['status_code']),
)
if code:
code = code.rsplit('#', 1)[-1]
if 'x-amzn-query-error' in headers:
# TODO test this
code = self._do_query_compatible_error_parse(
code, headers, error
)
error['Error']['Code'] = code
if 'x-amzn-requestid' in headers:
error.setdefault('ResponseMetadata', {})['RequestId'] = headers[
'x-amzn-requestid'
]
return error

#TODO this should probably go on the cbor parser?
def _handle_event_stream(self, response, shape, event_name):
event_stream_shape = shape.members[event_name]
event_stream = self._create_event_stream(response, event_stream_shape)
try:
event = event_stream.get_initial_response()
except NoInitialResponseError:
error_msg = 'First event was not of type initial-response'
raise ResponseParserError(error_msg)
parsed = self._initial_body_parse(event.payload)
parsed[event_name] = event_stream
return parsed

def _handle_cbor_body(self, response, shape):
#TODO add a comment explaining this method (similar to _handle_json_body)
if shape is None:
return b''
parsed = {}
#TODO is parse_payload on the right place? This is CBOR...
self._parse_payload(response, shape, parsed)
return self._parse_shape(shape, parsed)

class RestXMLParser(BaseRestParser, BaseXMLResponseParser):
EVENT_STREAM_PARSER_CLS = EventStreamXMLParser
Expand Down

0 comments on commit 30a7bcd

Please sign in to comment.