diff --git a/airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py b/airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
index dd6298c2630b..b787fe5d43c9 100644
--- a/airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
+++ b/airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
@@ -31,7 +31,6 @@
     AirbyteMessage,
     AirbyteTraceMessage,
     ConfiguredAirbyteCatalog,
-    Level,
     OrchestratorType,
     TraceType,
 )
@@ -126,7 +125,6 @@ def _get_message_groups(
         current_slice_pages: List[StreamReadPages] = []
         current_page_request: Optional[HttpRequest] = None
         current_page_response: Optional[HttpResponse] = None
-        had_error = False
 
         while records_count < limit and (message := next(messages, None)):
             json_object = self._parse_json(message.log) if message.type == MessageType.LOG else None
@@ -134,7 +132,7 @@
                 raise ValueError(f"Expected log message to be a dict, got {json_object} of type {type(json_object)}")
             json_message: Optional[Dict[str, JsonType]] = json_object
             if self._need_to_close_page(at_least_one_page_in_group, message, json_message):
-                self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records, True)
+                self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
                 current_page_request = None
                 current_page_response = None
 
@@ -172,12 +170,9 @@
                         current_page_request = self._create_request_from_log_message(json_message)
                         current_page_response = self._create_response_from_log_message(json_message)
                 else:
-                    if message.log.level == Level.ERROR:
-                        had_error = True
                     yield message.log
             elif message.type == MessageType.TRACE:
                 if message.trace.type == TraceType.ERROR:
-                    had_error = True
                     yield message.trace
             elif message.type == MessageType.RECORD:
                 current_page_records.append(message.record.data)
@@ -187,8 +182,9 @@
             elif message.type == MessageType.CONTROL and message.control.type == OrchestratorType.CONNECTOR_CONFIG:
                 yield message.control
         else:
-            self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records, validate_page_complete=not had_error)
-            yield StreamReadSlices(pages=current_slice_pages, slice_descriptor=current_slice_descriptor)
+            if current_page_request or current_page_response or current_page_records:
+                self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
+                yield StreamReadSlices(pages=current_slice_pages, slice_descriptor=current_slice_descriptor)
 
     @staticmethod
     def _need_to_close_page(at_least_one_page_in_group: bool, message: AirbyteMessage, json_message: Optional[Dict[str, Any]]) -> bool:
@@ -224,15 +220,10 @@ def _is_auxiliary_http_request(message: Optional[Dict[str, Any]]) -> bool:
         return is_http and message.get("http", {}).get("is_auxiliary", False)
 
     @staticmethod
-    def _close_page(current_page_request: Optional[HttpRequest], current_page_response: Optional[HttpResponse], current_slice_pages: List[StreamReadPages], current_page_records: List[Mapping[str, Any]], validate_page_complete: bool) -> None:
+    def _close_page(current_page_request: Optional[HttpRequest], current_page_response: Optional[HttpResponse], current_slice_pages: List[StreamReadPages], current_page_records: List[Mapping[str, Any]]) -> None:
         """
         Close a page when parsing message groups
-        @param validate_page_complete: in some cases, we expect the CDK to not return a response. As of today, this will only happen before
-        an uncaught exception and therefore, the assumption is that `validate_page_complete=True` only on the last page that is being closed
         """
-        if validate_page_complete and (not current_page_request or not current_page_response):
-            raise ValueError("Every message grouping should have at least one request and response")
-
         current_slice_pages.append(
             StreamReadPages(request=current_page_request, response=current_page_response, records=deepcopy(current_page_records))  # type: ignore
         )
diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py
index 4245ccc9d129..48d5884d03d0 100644
--- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py
+++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py
@@ -510,9 +510,7 @@ def check_config_against_spec(self):
     response = read_stream(source, TEST_READ_CONFIG, ConfiguredAirbyteCatalog.parse_obj(CONFIGURED_CATALOG), limits)
 
     expected_stream_read = StreamRead(logs=[LogMessage("error_message - a stack trace", "ERROR")],
-                                      slices=[StreamReadSlices(
-                                          pages=[StreamReadPages(records=[], request=None, response=None)],
-                                          slice_descriptor=None, state=None)],
+                                      slices=[],
                                       test_read_limit_reached=False,
                                       auxiliary_requests=[],
                                       inferred_schema=None,
diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py b/airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py
index 015863c7e87d..67d437dfac91 100644
--- a/airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py
+++ b/airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py
@@ -367,25 +367,6 @@ def test_get_grouped_messages_no_records(mock_entrypoint_read: Mock) -> None:
         assert actual_page == expected_pages[i]
 
 
-@patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
-def test_get_grouped_messages_invalid_group_format(mock_entrypoint_read: Mock) -> None:
-    response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'}
-
-    mock_source = make_mock_source(mock_entrypoint_read, iter(
-        [
-            response_log_message(response),
-            record_message("hashiras", {"name": "Shinobu Kocho"}),
-            record_message("hashiras", {"name": "Muichiro Tokito"}),
-        ]
-    )
-    )
-
-    api = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
-
-    with pytest.raises(ValueError):
-        api.get_message_groups(source=mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras"))
-
-
 @pytest.mark.parametrize(
     "log_message, expected_response",
     [
@@ -588,7 +569,7 @@ def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_ha
 
 
 @patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
-def test_given_auxiliary_requests_then_return_global_request(mock_entrypoint_read: Mock) -> None:
+def test_given_auxiliary_requests_then_return_auxiliary_request(mock_entrypoint_read: Mock) -> None:
     mock_source = make_mock_source(mock_entrypoint_read, iter(
         any_request_and_response_with_a_record() + [
@@ -603,6 +584,17 @@ def test_given_auxiliary_requests_then_return_auxiliary_rea
     assert len(stream_read.auxiliary_requests) == 1
 
 
+@patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
+def test_given_no_slices_then_return_empty_slices(mock_entrypoint_read: Mock) -> None:
+    mock_source = make_mock_source(mock_entrypoint_read, iter([auxiliary_request_log_message()]))
+    connector_builder_handler = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
+    stream_read: StreamRead = connector_builder_handler.get_message_groups(
+        source=mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras")
+    )
+
+    assert len(stream_read.slices) == 0
+
+
 def make_mock_source(mock_entrypoint_read: Mock, return_value: Iterator[AirbyteMessage]) -> MagicMock:
     mock_source = MagicMock()
     mock_entrypoint_read.return_value = return_value
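
Illustrative sketch (not part of the patch): with this change, _close_page no longer raises when a request or response is missing, and the end-of-read branch only closes the final page and yields a trailing StreamReadSlices when that page actually holds a request, a response, or records. The Python snippet below mirrors that logic using plain dicts as stand-ins for StreamReadPages/StreamReadSlices; the helper names close_page and finalize are hypothetical and exist only for illustration.

from copy import deepcopy
from typing import Any, Dict, Iterator, List, Mapping, Optional


def close_page(request: Optional[Dict[str, Any]], response: Optional[Dict[str, Any]],
               slice_pages: List[Dict[str, Any]], page_records: List[Mapping[str, Any]]) -> None:
    # Mirrors the new _close_page: append whatever was captured, with no
    # "every page must have a request and response" validation.
    slice_pages.append({"request": request, "response": response, "records": deepcopy(page_records)})


def finalize(request: Optional[Dict[str, Any]], response: Optional[Dict[str, Any]],
             slice_pages: List[Dict[str, Any]], page_records: List[Mapping[str, Any]],
             slice_descriptor: Optional[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    # Mirrors the new while/else block: emit the trailing slice only if the
    # last page has any content; an error-only read yields nothing.
    if request or response or page_records:
        close_page(request, response, slice_pages, page_records)
        yield {"pages": slice_pages, "slice_descriptor": slice_descriptor}


# No request, no response, no records: no slice is emitted, matching the
# updated expected_stream_read (slices=[]) and the new
# test_given_no_slices_then_return_empty_slices above.
assert list(finalize(None, None, [], [], None)) == []
# A partially captured page is still surfaced instead of raising ValueError.
assert len(list(finalize({"url": "https://example.com"}, None, [], [], None))) == 1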