From 4df21201707bc61d3eeddc19570c6996a3dc371a Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 23 Aug 2024 15:16:11 +0000 Subject: [PATCH 1/4] duplicating file to base --- google/api_core/{rest_streaming.py => _rest_streaming_base.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename google/api_core/{rest_streaming.py => _rest_streaming_base.py} (100%) diff --git a/google/api_core/rest_streaming.py b/google/api_core/_rest_streaming_base.py similarity index 100% rename from google/api_core/rest_streaming.py rename to google/api_core/_rest_streaming_base.py From 26f52a57334a25a7b6e40ce1a9b4f5d39d2460f1 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 23 Aug 2024 15:20:31 +0000 Subject: [PATCH 2/4] restore original file --- google/api_core/rest_streaming.py | 132 ++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 google/api_core/rest_streaming.py diff --git a/google/api_core/rest_streaming.py b/google/api_core/rest_streaming.py new file mode 100644 index 00000000..88bcb31b --- /dev/null +++ b/google/api_core/rest_streaming.py @@ -0,0 +1,132 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for server-side streaming in REST.""" + +from collections import deque +import string +from typing import Deque, Union + +import proto +import requests +import google.protobuf.message +from google.protobuf.json_format import Parse + + +class ResponseIterator: + """Iterator over REST API responses. + + Args: + response (requests.Response): An API response object. + response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response + class expected to be returned from an API. + + Raises: + ValueError: If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`. + """ + + def __init__( + self, + response: requests.Response, + response_message_cls: Union[proto.Message, google.protobuf.message.Message], + ): + self._response = response + self._response_message_cls = response_message_cls + # Inner iterator over HTTP response's content. + self._response_itr = self._response.iter_content(decode_unicode=True) + # Contains a list of JSON responses ready to be sent to user. + self._ready_objs: Deque[str] = deque() + # Current JSON response being built. + self._obj = "" + # Keeps track of the nesting level within a JSON object. + self._level = 0 + # Keeps track whether HTTP response is currently sending values + # inside of a string value. + self._in_string = False + # Whether an escape symbol "\" was encountered. + self._escape_next = False + + def cancel(self): + """Cancel existing streaming operation.""" + self._response.close() + + def _process_chunk(self, chunk: str): + if self._level == 0: + if chunk[0] != "[": + raise ValueError( + "Can only parse array of JSON objects, instead got %s" % chunk + ) + for char in chunk: + if char == "{": + if self._level == 1: + # Level 1 corresponds to the outermost JSON object + # (i.e. the one we care about). + self._obj = "" + if not self._in_string: + self._level += 1 + self._obj += char + elif char == "}": + self._obj += char + if not self._in_string: + self._level -= 1 + if not self._in_string and self._level == 1: + self._ready_objs.append(self._obj) + elif char == '"': + # Helps to deal with an escaped quotes inside of a string. + if not self._escape_next: + self._in_string = not self._in_string + self._obj += char + elif char in string.whitespace: + if self._in_string: + self._obj += char + elif char == "[": + if self._level == 0: + self._level += 1 + else: + self._obj += char + elif char == "]": + if self._level == 1: + self._level -= 1 + else: + self._obj += char + else: + self._obj += char + self._escape_next = not self._escape_next if char == "\\" else False + + def __next__(self): + while not self._ready_objs: + try: + chunk = next(self._response_itr) + self._process_chunk(chunk) + except StopIteration as e: + if self._level > 0: + raise ValueError("Unfinished stream: %s" % self._obj) + raise e + return self._grab() + + def _grab(self): + # Add extra quotes to make json.loads happy. + if issubclass(self._response_message_cls, proto.Message): + return self._response_message_cls.from_json( + self._ready_objs.popleft(), ignore_unknown_fields=True + ) + elif issubclass(self._response_message_cls, google.protobuf.message.Message): + return Parse(self._ready_objs.popleft(), self._response_message_cls()) + else: + raise ValueError( + "Response message class must be a subclass of proto.Message or google.protobuf.message.Message." + ) + + def __iter__(self): + return self From 204920a3f2da427329f22f0a77dea03dd05b8610 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 23 Aug 2024 15:22:10 +0000 Subject: [PATCH 3/4] duplicate file to async --- google/api_core/{rest_streaming.py => rest_streaming_async.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename google/api_core/{rest_streaming.py => rest_streaming_async.py} (100%) diff --git a/google/api_core/rest_streaming.py b/google/api_core/rest_streaming_async.py similarity index 100% rename from google/api_core/rest_streaming.py rename to google/api_core/rest_streaming_async.py From 2014ef6b63f4e1ea9d3fdd04f40306fbdc76ae17 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 23 Aug 2024 15:22:35 +0000 Subject: [PATCH 4/4] restore original file --- google/api_core/rest_streaming.py | 132 ++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 google/api_core/rest_streaming.py diff --git a/google/api_core/rest_streaming.py b/google/api_core/rest_streaming.py new file mode 100644 index 00000000..88bcb31b --- /dev/null +++ b/google/api_core/rest_streaming.py @@ -0,0 +1,132 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for server-side streaming in REST.""" + +from collections import deque +import string +from typing import Deque, Union + +import proto +import requests +import google.protobuf.message +from google.protobuf.json_format import Parse + + +class ResponseIterator: + """Iterator over REST API responses. + + Args: + response (requests.Response): An API response object. + response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response + class expected to be returned from an API. + + Raises: + ValueError: If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`. + """ + + def __init__( + self, + response: requests.Response, + response_message_cls: Union[proto.Message, google.protobuf.message.Message], + ): + self._response = response + self._response_message_cls = response_message_cls + # Inner iterator over HTTP response's content. + self._response_itr = self._response.iter_content(decode_unicode=True) + # Contains a list of JSON responses ready to be sent to user. + self._ready_objs: Deque[str] = deque() + # Current JSON response being built. + self._obj = "" + # Keeps track of the nesting level within a JSON object. + self._level = 0 + # Keeps track whether HTTP response is currently sending values + # inside of a string value. + self._in_string = False + # Whether an escape symbol "\" was encountered. + self._escape_next = False + + def cancel(self): + """Cancel existing streaming operation.""" + self._response.close() + + def _process_chunk(self, chunk: str): + if self._level == 0: + if chunk[0] != "[": + raise ValueError( + "Can only parse array of JSON objects, instead got %s" % chunk + ) + for char in chunk: + if char == "{": + if self._level == 1: + # Level 1 corresponds to the outermost JSON object + # (i.e. the one we care about). + self._obj = "" + if not self._in_string: + self._level += 1 + self._obj += char + elif char == "}": + self._obj += char + if not self._in_string: + self._level -= 1 + if not self._in_string and self._level == 1: + self._ready_objs.append(self._obj) + elif char == '"': + # Helps to deal with an escaped quotes inside of a string. + if not self._escape_next: + self._in_string = not self._in_string + self._obj += char + elif char in string.whitespace: + if self._in_string: + self._obj += char + elif char == "[": + if self._level == 0: + self._level += 1 + else: + self._obj += char + elif char == "]": + if self._level == 1: + self._level -= 1 + else: + self._obj += char + else: + self._obj += char + self._escape_next = not self._escape_next if char == "\\" else False + + def __next__(self): + while not self._ready_objs: + try: + chunk = next(self._response_itr) + self._process_chunk(chunk) + except StopIteration as e: + if self._level > 0: + raise ValueError("Unfinished stream: %s" % self._obj) + raise e + return self._grab() + + def _grab(self): + # Add extra quotes to make json.loads happy. + if issubclass(self._response_message_cls, proto.Message): + return self._response_message_cls.from_json( + self._ready_objs.popleft(), ignore_unknown_fields=True + ) + elif issubclass(self._response_message_cls, google.protobuf.message.Message): + return Parse(self._ready_objs.popleft(), self._response_message_cls()) + else: + raise ValueError( + "Response message class must be a subclass of proto.Message or google.protobuf.message.Message." + ) + + def __iter__(self): + return self