Skip to content

Commit

Permalink
Source Salesforce: add get_error_display_message for ConnectionError (#18753)
Browse files Browse the repository at this point in the history

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored and nataly committed Nov 3, 2022
1 parent e242dd6 commit c0f93a6
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 1.0.22
dockerImageTag: 1.0.23
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10698,7 +10698,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:1.0.22"
- dockerImage: "airbyte/source-salesforce:1.0.23"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.22
LABEL io.airbyte.version=1.0.23
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream:
stream_cls = type("a", (object,), {"name": stream_name})
configured_stream_cls = type("b", (object,), {"stream": stream_cls()})
catalog_cls = type("c", (object,), {"streams": [configured_stream_cls()]})
return SourceSalesforce().streams(input_config, catalog_cls())[0]
source = SourceSalesforce()
source.catalog = catalog_cls()
return source.streams(input_config)[0]


def get_any_real_stream(input_config: Mapping[str, Any]) -> Stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,33 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.utils.schema_helpers import split_config
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests import codes, exceptions # type: ignore[import]

from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalSalesforceStream, SalesforceStream


class AirbyteStopSync(AirbyteTracedException):
    """Signal that the sync should stop early and still finish successfully.

    Raised by ``SourceSalesforce._read_stream`` when Salesforce answers with a
    403 ``REQUEST_LIMIT_EXCEEDED`` error, and caught by ``SourceSalesforce.read``
    so that the sync ends without reporting a failure.
    """


class SourceSalesforce(AbstractSource):
def __init__(self, *args, **kwargs):
    """Initialise the source; no configured catalog is attached yet."""
    super().__init__(*args, **kwargs)
    # Filled in by read() so that streams(), which only receives the config,
    # can still restrict itself to the streams selected in the catalog.
    self.catalog: Optional[ConfiguredAirbyteCatalog] = None

@staticmethod
def _get_sf_object(config: Mapping[str, Any]) -> Salesforce:
sf = Salesforce(**config)
Expand Down Expand Up @@ -94,56 +104,41 @@ def generate_streams(

return streams

def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]:
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
sf = self._get_sf_object(config)
stream_objects = sf.get_validated_streams(config=config, catalog=catalog)
stream_objects = sf.get_validated_streams(config=config, catalog=self.catalog)
streams = self.generate_streams(config, stream_objects, sf)
streams.append(Describe(sf_api=sf, catalog=catalog))
streams.append(Describe(sf_api=sf, catalog=self.catalog))
return streams

def read(
self,
logger: AirbyteLogger,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Iterator[AirbyteMessage]:
"""
Overwritten to dynamically receive only those streams that are necessary for reading for significant speed gains
(Salesforce has a strict API limit on requests).
"""
config, internal_config = split_config(config)
# get the streams once in case the connector needs to make any queries to generate them
logger.info("Starting generating streams")
stream_instances = {s.name: s for s in self.streams(config, catalog=catalog)}
state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
logger.info(f"Starting syncing {self.name}")
self._stream_to_instance_map = stream_instances
for configured_stream in catalog.streams:
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance:
raise KeyError(
f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
)

try:
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
state_manager=state_manager,
internal_config=internal_config,
)
except exceptions.HTTPError as error:
error_data = error.response.json()[0]
error_code = error_data.get("errorCode")
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
break # if got 403 rate limit response, finish the sync with success.
raise error

except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
raise e
# save for use inside streams method
self.catalog = catalog
try:
yield from super().read(logger, config, catalog, state)
except AirbyteStopSync:
logger.info(f"Finished syncing {self.name}")

logger.info(f"Finished syncing {self.name}")
def _read_stream(
    self,
    logger: logging.Logger,
    stream_instance: Stream,
    configured_stream: ConfiguredAirbyteStream,
    state_manager: ConnectorStateManager,
    internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
    """Read one stream, turning a Salesforce rate-limit error into a graceful stop.

    Delegates to the parent implementation and forwards every message it yields.

    :param logger: logger used to report the rate-limit condition.
    :param stream_instance: stream to read from.
    :param configured_stream: catalog entry describing how the stream is synced.
    :param state_manager: state manager passed through to the parent implementation.
    :param internal_config: internal config passed through to the parent implementation.
    :raises AirbyteStopSync: when the API answers 403 with ``REQUEST_LIMIT_EXCEEDED``.
    :raises requests.exceptions.HTTPError: for any other HTTP error (re-raised unchanged).
    """
    try:
        yield from super()._read_stream(logger, stream_instance, configured_stream, state_manager, internal_config)
    except exceptions.HTTPError as error:
        response = error.response
        error_data = {}
        if response is not None:
            # The error body is expected to be a JSON list of error objects, but
            # a non-JSON or differently shaped body must not hide the HTTPError.
            try:
                payload = response.json()
            except ValueError:
                payload = None
            if isinstance(payload, list) and payload and isinstance(payload[0], dict):
                error_data = payload[0]

        is_rate_limited = (
            response is not None
            and response.status_code == codes.FORBIDDEN
            and error_data.get("errorCode") == "REQUEST_LIMIT_EXCEEDED"
        )
        if is_rate_limited:
            logger.warning(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
            raise AirbyteStopSync()  # if got 403 rate limit response, finish the sync with success.
        # Bare raise keeps the original traceback intact.
        raise
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ def read_records(
return
raise error

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
    """Return a user-facing message for ``exception``.

    Connection errors get a Salesforce-specific explanation that mentions
    ``self.max_retries``; every other exception is handed to the parent class.
    """
    if not isinstance(exception, exceptions.ConnectionError):
        return super().get_error_display_message(exception)

    return f"After {self.max_retries} retries the connector has failed with a network error. It looks like Salesforce API experienced temporary instability, please try again later."


class BulkSalesforceStream(SalesforceStream):
page_size = 15000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,9 @@ def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_
],
},
)
streams = SourceSalesforce().streams(config=stream_config, catalog=catalog)
source = SourceSalesforce()
source.catalog = catalog
streams = source.streams(config=stream_config)
expected_names = catalog_stream_names if catalog else stream_names
assert not set(expected_names).symmetric_difference(set(stream.name for stream in streams)), "doesn't match excepted streams"

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------- |
| 1.0.23 | 2022-11-01 | [18753](https://github.com/airbytehq/airbyte/pull/18753) | Add error_display_message for ConnectionError |
| 1.0.22 | 2022-10-12 | [17615](https://github.com/airbytehq/airbyte/pull/17615) | Make paging work, if `cursor_field` is not changed inside one page |
| 1.0.21 | 2022-10-10 | [17778](https://github.com/airbytehq/airbyte/pull/17778) | Add `EventWhoRelation` to the list of unsupported Bulk API objects. |
| 1.0.20 | 2022-09-30 | [17453](https://github.com/airbytehq/airbyte/pull/17453) | Check objects that are not supported by the Bulk API (v52.0) |
Expand Down

0 comments on commit c0f93a6

Please sign in to comment.