From c0f93a6ef7423a537ee96eff60591b717bf689ae Mon Sep 17 00:00:00 2001 From: Serhii Chvaliuk Date: Tue, 1 Nov 2022 15:48:04 +0200 Subject: [PATCH] Source Salesforce: add get_error_display_message for ConnectionError (#18753) Signed-off-by: Sergey Chvalyuk --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-salesforce/Dockerfile | 2 +- .../integration_tests/bulk_error_test.py | 4 +- .../source_salesforce/source.py | 83 +++++++++---------- .../source_salesforce/streams.py | 5 ++ .../source-salesforce/unit_tests/api_test.py | 4 +- docs/integrations/sources/salesforce.md | 1 + 8 files changed, 54 insertions(+), 49 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index ec5957656d4a..dace40db9293 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1073,7 +1073,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 1.0.22 + dockerImageTag: 1.0.23 documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 2788eaf7c4c2..9d52680a839b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -10698,7 +10698,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:1.0.22" +- dockerImage: "airbyte/source-salesforce:1.0.23" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 72ddaf32b2d9..567f40689cff 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -13,5 +13,5 @@ RUN pip install . ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.0.22 +LABEL io.airbyte.version=1.0.23 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py b/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py index bc75b186d4a7..c7755e638aac 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py @@ -33,7 +33,9 @@ def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream: stream_cls = type("a", (object,), {"name": stream_name}) configured_stream_cls = type("b", (object,), {"stream": stream_cls()}) catalog_cls = type("c", (object,), {"streams": [configured_stream_cls()]}) - return SourceSalesforce().streams(input_config, catalog_cls())[0] + source = SourceSalesforce() + source.catalog = catalog_cls() + return source.streams(input_config)[0] def get_any_real_stream(input_config: Mapping[str, Any]) -> Stream: diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 743d8ac8c4e8..753819c56c14 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -2,23 +2,33 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import logging from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union import requests from airbyte_cdk import AirbyteLogger -from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator -from airbyte_cdk.sources.utils.schema_helpers import split_config +from airbyte_cdk.sources.utils.schema_helpers import InternalConfig +from airbyte_cdk.utils.traced_exception import AirbyteTracedException from requests import codes, exceptions # type: ignore[import] from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalSalesforceStream, SalesforceStream +class AirbyteStopSync(AirbyteTracedException): + pass + + class SourceSalesforce(AbstractSource): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.catalog = None + @staticmethod def _get_sf_object(config: Mapping[str, Any]) -> Salesforce: sf = Salesforce(**config) @@ -94,56 +104,41 @@ def generate_streams( return streams - def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]: + def streams(self, config: Mapping[str, Any]) -> List[Stream]: sf = self._get_sf_object(config) - stream_objects = sf.get_validated_streams(config=config, catalog=catalog) + stream_objects = sf.get_validated_streams(config=config, catalog=self.catalog) streams = self.generate_streams(config, stream_objects, sf) - streams.append(Describe(sf_api=sf, catalog=catalog)) + streams.append(Describe(sf_api=sf, catalog=self.catalog)) return streams def read( self, - logger: AirbyteLogger, + logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None, ) -> Iterator[AirbyteMessage]: - """ - Overwritten to dynamically receive only those streams that are necessary for reading for significant speed gains - (Salesforce has a strict API limit on requests). - """ - config, internal_config = split_config(config) - # get the streams once in case the connector needs to make any queries to generate them - logger.info("Starting generating streams") - stream_instances = {s.name: s for s in self.streams(config, catalog=catalog)} - state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state) - logger.info(f"Starting syncing {self.name}") - self._stream_to_instance_map = stream_instances - for configured_stream in catalog.streams: - stream_instance = stream_instances.get(configured_stream.stream.name) - if not stream_instance: - raise KeyError( - f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}" - ) - - try: - yield from self._read_stream( - logger=logger, - stream_instance=stream_instance, - configured_stream=configured_stream, - state_manager=state_manager, - internal_config=internal_config, - ) - except exceptions.HTTPError as error: - error_data = error.response.json()[0] - error_code = error_data.get("errorCode") - if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": - logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'") - break # if got 403 rate limit response, finish the sync with success. - raise error - - except Exception as e: - logger.exception(f"Encountered an exception while reading stream {self.name}") - raise e + # save for use inside streams method + self.catalog = catalog + try: + yield from super().read(logger, config, catalog, state) + except AirbyteStopSync: + logger.info(f"Finished syncing {self.name}") - logger.info(f"Finished syncing {self.name}") + def _read_stream( + self, + logger: logging.Logger, + stream_instance: Stream, + configured_stream: ConfiguredAirbyteStream, + state_manager: ConnectorStateManager, + internal_config: InternalConfig, + ) -> Iterator[AirbyteMessage]: + try: + yield from super()._read_stream(logger, stream_instance, configured_stream, state_manager, internal_config) + except exceptions.HTTPError as error: + error_data = error.response.json()[0] + error_code = error_data.get("errorCode") + if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": + logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'") + raise AirbyteStopSync() # if got 403 rate limit response, finish the sync with success. + raise error diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 89d24aba86a3..d141ce57a76e 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -130,6 +130,11 @@ def read_records( return raise error + def get_error_display_message(self, exception: BaseException) -> Optional[str]: + if isinstance(exception, exceptions.ConnectionError): + return f"After {self.max_retries} retries the connector has failed with a network error. It looks like Salesforce API experienced temporary instability, please try again later." + return super().get_error_display_message(exception) + class BulkSalesforceStream(SalesforceStream): page_size = 15000 diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index db977d9c13a1..1a997e00bb10 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -498,7 +498,9 @@ def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_ ], }, ) - streams = SourceSalesforce().streams(config=stream_config, catalog=catalog) + source = SourceSalesforce() + source.catalog = catalog + streams = source.streams(config=stream_config) expected_names = catalog_stream_names if catalog else stream_names assert not set(expected_names).symmetric_difference(set(stream.name for stream in streams)), "doesn't match excepted streams" diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 900b17d4a174..0c51319d0605 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -121,6 +121,7 @@ Now that you have set up the Salesforce source connector, check out the followin | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------- | +| 1.0.23 | 2022-11-01 | [18753](https://github.com/airbytehq/airbyte/pull/18753) | Add error_display_message for ConnectionError | | 1.0.22 | 2022-10-12 | [17615](https://github.com/airbytehq/airbyte/pull/17615) | Make paging work, if `cursor_field` is not changed inside one page | | 1.0.21 | 2022-10-10 | [17778](https://github.com/airbytehq/airbyte/pull/17778) | Add `EventWhoRelation` to the list of unsupported Bulk API objects. | | 1.0.20 | 2022-09-30 | [17453](https://github.com/airbytehq/airbyte/pull/17453) | Check objects that are not supported by the Bulk API (v52.0) |