Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source SalesForce: better detect API type #16086

Merged
merged 14 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 1.0.14
dockerImageTag: 1.0.15
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8654,7 +8654,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:1.0.14"
- dockerImage: "airbyte/source-salesforce:1.0.15"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.14
LABEL io.airbyte.version=1.0.15
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#

import concurrent.futures
import logging
from typing import Any, List, Mapping, Optional, Tuple

import requests # type: ignore[import]
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from requests import adapters as request_adapters
from requests.exceptions import HTTPError, RequestException # type: ignore[import]
Expand Down Expand Up @@ -178,9 +178,12 @@


class Salesforce:
logger = AirbyteLogger()
logger = logging.getLogger("airbyte")
version = "v52.0"
parallel_tasks_size = 100
# https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_api.htm
# Request Size Limits
REQUEST_SIZE_LIMITS = 16_384

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->

return True, None

@classmethod
def _get_api_type(cls, stream_name, properties, stream_state):
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
properties_not_supported_by_bulk = {
key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
}
properties_length = len(",".join(p for p in properties))

rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
# If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
# For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
bulk_required = properties_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS
edgao marked this conversation as resolved.
Show resolved Hide resolved

if bulk_required and not rest_required:
edgao marked this conversation as resolved.
Show resolved Hide resolved
return "bulk"
elif rest_required and not bulk_required:
return "rest"
elif not bulk_required and not rest_required:
return "rest" if stream_state else "bulk"

@classmethod
def generate_streams(
cls,
Expand All @@ -51,19 +71,15 @@ def generate_streams(
for stream_name, sobject_options in stream_objects.items():
streams_kwargs = {"sobject_options": sobject_options}
stream_state = state.get(stream_name, {}) if state else {}

selected_properties = stream_properties.get(stream_name, {}).get("properties", {})
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
properties_not_supported_by_bulk = {
key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"]
}

if stream_state or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk:
# Use REST API
api_type = cls._get_api_type(stream_name, selected_properties, stream_state)
if api_type == "rest":
full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
else:
# Use BULK API
elif api_type == "bulk":
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
else:
raise Exception(f"Stream {stream_name} cannot be processed by REST or BULK API.")

json_schema = stream_properties.get(stream_name, {})
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
| 1.0.15 | 2022-08-30 | [16086](https://github.com/airbytehq/airbyte/pull/16086) | Improve API type detection |
| 1.0.14 | 2022-08-29 | [16119](https://github.com/airbytehq/airbyte/pull/16119) | Exclude `KnowledgeArticleVersion` from using bulk API |
| 1.0.13 | 2022-08-23 | [15901](https://github.com/airbytehq/airbyte/pull/15901) | Exclude `KnowledgeArticle` from using bulk API |
| 1.0.12 | 2022-08-09 | [15444](https://github.com/airbytehq/airbyte/pull/15444) | Fixed bug when `Bulk Job` was timeout by the connector, but remained running on the server |
Expand Down