Skip to content

Commit

Permalink
🐛 Source SalesForce: better detect API type (airbytehq#16086)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored and robbinhan committed Sep 29, 2022
1 parent 0fa284d commit 733c07c
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 1.0.14
dockerImageTag: 1.0.15
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8654,7 +8654,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:1.0.14"
- dockerImage: "airbyte/source-salesforce:1.0.15"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.14
LABEL io.airbyte.version=1.0.15
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#

import concurrent.futures
import logging
from typing import Any, List, Mapping, Optional, Tuple

import requests # type: ignore[import]
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from requests import adapters as request_adapters
from requests.exceptions import HTTPError, RequestException # type: ignore[import]
Expand Down Expand Up @@ -178,9 +178,12 @@


class Salesforce:
logger = AirbyteLogger()
logger = logging.getLogger("airbyte")
version = "v52.0"
parallel_tasks_size = 100
# https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_api.htm
# Request Size Limits
REQUEST_SIZE_LIMITS = 16_384

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->

return True, None

@classmethod
def _get_api_type(cls, stream_name, properties, stream_state):
    """Pick which Salesforce API ("rest" or "bulk") should serve this stream.

    Returns "rest", "bulk", or None when the stream's constraints rule out
    both APIs (the caller treats None as an unprocessable stream).
    """
    # Salesforce BULK API currently does not support loading fields with data type base64 and compound data
    bulk_incompatible_fields = {
        field: schema
        for field, schema in properties.items()
        if schema.get("format") == "base64" or "object" in schema["type"]
    }
    # Length of the comma-separated field list a REST query would place in its URL.
    field_list_length = len(",".join(properties))

    rest_required = bool(bulk_incompatible_fields) or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
    # If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
    # For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
    bulk_required = field_list_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS

    if rest_required == bulk_required:
        if rest_required:
            # Both constraints apply at once — no API can serve this stream.
            return None
        # Neither constraint applies: prefer REST for stateful (incremental) syncs.
        return "rest" if stream_state else "bulk"
    return "rest" if rest_required else "bulk"

@classmethod
def generate_streams(
cls,
Expand All @@ -51,19 +71,15 @@ def generate_streams(
for stream_name, sobject_options in stream_objects.items():
streams_kwargs = {"sobject_options": sobject_options}
stream_state = state.get(stream_name, {}) if state else {}

selected_properties = stream_properties.get(stream_name, {}).get("properties", {})
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
properties_not_supported_by_bulk = {
key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"]
}

if stream_state or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk:
# Use REST API
api_type = cls._get_api_type(stream_name, selected_properties, stream_state)
if api_type == "rest":
full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
else:
# Use BULK API
elif api_type == "bulk":
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
else:
raise Exception(f"Stream {stream_name} cannot be processed by REST or BULK API.")

json_schema = stream_properties.get(stream_name, {})
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
| 1.0.15 | 2022-08-30 | [16086](https://github.com/airbytehq/airbyte/pull/16086) | Improve API type detection |
| 1.0.14 | 2022-08-29 | [16119](https://github.com/airbytehq/airbyte/pull/16119) | Exclude `KnowledgeArticleVersion` from using bulk API |
| 1.0.13 | 2022-08-23 | [15901](https://github.com/airbytehq/airbyte/pull/15901) | Exclude `KnowledgeArticle` from using bulk API |
| 1.0.12 | 2022-08-09 | [15444](https://github.com/airbytehq/airbyte/pull/15444) | Fixed bug when `Bulk Job` was timeout by the connector, but remained running on the server |
Expand Down

0 comments on commit 733c07c

Please sign in to comment.