
🐛 Source SalesForce: Fix date filter behaviour #34107

Closed · wants to merge 2 commits

@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: b117307c-14b6-41aa-9422-947e34922962
-  dockerImageTag: 2.2.2
+  dockerImageTag: 2.2.3
   dockerRepository: airbyte/source-salesforce
   documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
   githubIssueLabel: source-salesforce

@@ -721,9 +721,9 @@ def request_params(
         where_conditions = []

         if start_date:
-            where_conditions.append(f"{self.cursor_field} >= {start_date}")
+            where_conditions.append(f"{self.cursor_field} > {start_date}")
         if end_date:
-            where_conditions.append(f"{self.cursor_field} < {end_date}")
+            where_conditions.append(f"{self.cursor_field} <= {end_date}")

         where_clause = f"WHERE {' AND '.join(where_conditions)}"
         query = f"SELECT {select_fields} FROM {table_name} {where_clause}"
@@ -744,9 +744,9 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
         max_possible_value = min(latest_record_value, slice_max_value)
         if current_stream_state.get(self.cursor_field):
             if latest_record_value > slice_max_value:
-                return {self.cursor_field: max_possible_value.isoformat()}
+                return {self.cursor_field: max_possible_value.isoformat(timespec="milliseconds")}
             max_possible_value = max(latest_record_value, pendulum.parse(current_stream_state[self.cursor_field]))
-        return {self.cursor_field: max_possible_value.isoformat()}
+        return {self.cursor_field: max_possible_value.isoformat(timespec="milliseconds")}
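The companion change serializes checkpoint values with explicit millisecond precision, so state like `2021-10-05T00:00:00+00:00` becomes `2021-10-05T00:00:00.000+00:00` and compares consistently against Salesforce's millisecond-precision timestamps. A small sketch of the effect; the sample timestamp is illustrative, and `pendulum` is what the connector already uses:

```python
import pendulum

# pendulum.DateTime subclasses datetime.datetime, so the standard
# isoformat(timespec=...) parameter is available.
ts = pendulum.parse("2021-10-05T00:00:00+00:00")
print(ts.isoformat())                         # 2021-10-05T00:00:00+00:00
print(ts.isoformat(timespec="milliseconds"))  # 2021-10-05T00:00:00.000+00:00
```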


 class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
@@ -755,15 +755,9 @@ class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
     def request_params(
         self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
     ) -> MutableMapping[str, Any]:
-        start_date = stream_slice["start_date"]
-        end_date = stream_slice["end_date"]
-
+        start_date, end_date = stream_slice["start_date"], stream_slice["end_date"]
         select_fields = self.get_query_select_fields()
-        table_name = self.name
-        where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"]
-
-        where_clause = f"WHERE {' AND '.join(where_conditions)}"
-        query = f"SELECT {select_fields} FROM {table_name} {where_clause}"
+        query = f"SELECT {select_fields} FROM {self.name} WHERE {self.cursor_field} > {start_date} AND {self.cursor_field} <= {end_date}"
         return {"q": query}



@@ -420,7 +420,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state):
     assert len(records) == 6  # stream page size: 6

     state_record = [item for item in result if item.type == Type.STATE][0]
-    assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-10-05T00:00:00+00:00"  # state checkpoint interval is 5.
+    assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-10-05T00:00:00.000+00:00"  # state checkpoint interval is 5.


 def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state):
@@ -489,7 +489,7 @@ def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state):
     assert len(records) == 5

     state_record = [item for item in result if item.type == Type.STATE][0]
-    assert state_record.state.data["KnowledgeArticle"]["LastModifiedDate"] == "2021-11-17T00:00:00+00:00"
+    assert state_record.state.data["KnowledgeArticle"]["LastModifiedDate"] == "2021-11-17T00:00:00.000+00:00"


 def test_pagination_rest(stream_config, stream_api):
@@ -943,29 +943,29 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
     actual_state_values = [item.state.data.get("Account").get(stream.cursor_field) for item in result if item.type == Type.STATE]
     # assert request params
     assert (
-        "LastModifiedDate >= 2023-01-01T10:10:10.000+00:00 AND LastModifiedDate < 2023-01-31T10:10:10.000+00:00"
+        "LastModifiedDate > 2023-01-01T10:10:10.000+00:00 AND LastModifiedDate <= 2023-01-31T10:10:10.000+00:00"
         in queries_history.request_history[0].text
     )
     assert (
-        "LastModifiedDate >= 2023-01-31T10:10:10.000+00:00 AND LastModifiedDate < 2023-03-02T10:10:10.000+00:00"
+        "LastModifiedDate > 2023-01-31T10:10:10.000+00:00 AND LastModifiedDate <= 2023-03-02T10:10:10.000+00:00"
         in queries_history.request_history[1].text
     )
     assert (
-        "LastModifiedDate >= 2023-03-02T10:10:10.000+00:00 AND LastModifiedDate < 2023-04-01T00:00:00.000+00:00"
+        "LastModifiedDate > 2023-03-02T10:10:10.000+00:00 AND LastModifiedDate <= 2023-04-01T00:00:00.000+00:00"
         in queries_history.request_history[2].text
     )

     # assert states
     # if connector meets record with cursor `2023-04-01` out of current slice range 2023-01-31 <> 2023-03-02, we ignore all other values and set state to slice end_date
-    expected_state_values = ["2023-01-15T00:00:00+00:00", "2023-03-02T10:10:10+00:00", "2023-04-01T00:00:00+00:00"]
+    expected_state_values = ["2023-01-15T00:00:00.000+00:00", "2023-03-02T10:10:10.000+00:00", "2023-04-01T00:00:00.000+00:00"]
     assert actual_state_values == expected_state_values
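The comment above describes the clamping rule these assertions exercise: when a record's cursor overshoots the current slice, state is capped at the slice's end_date instead of jumping ahead. A sketch with the test's own values (illustrative, not a verbatim copy of `get_updated_state`):

```python
import pendulum

# Values mirror the test above.
record_cursor = pendulum.parse("2023-04-01T00:00:00+00:00")  # out-of-slice record
slice_end = pendulum.parse("2023-03-02T10:10:10+00:00")      # current slice end_date

# When the record overshoots the slice, state is capped at the slice end
# so records between slice_end and record_cursor are not skipped later.
state_value = min(record_cursor, slice_end)
print(state_value.isoformat(timespec="milliseconds"))  # 2023-03-02T10:10:10.000+00:00
```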


 def test_request_params_incremental(stream_config_date_format, stream_api):
     stream = generate_stream("ContentDocument", stream_config_date_format, stream_api)
     params = stream.request_params(stream_state={}, stream_slice={'start_date': '2020', 'end_date': '2021'})

-    assert params == {'q': 'SELECT LastModifiedDate, Id FROM ContentDocument WHERE LastModifiedDate >= 2020 AND LastModifiedDate < 2021'}
+    assert params == {'q': 'SELECT LastModifiedDate, Id FROM ContentDocument WHERE LastModifiedDate > 2020 AND LastModifiedDate <= 2021'}


 def test_request_params_substream(stream_config_date_format, stream_api):

@@ -3,6 +3,7 @@
 #

 import json
+import os
 from unittest.mock import Mock

 import pytest
@@ -19,14 +20,14 @@ def time_sleep_mock(mocker):

 @pytest.fixture(scope="module")
 def bulk_catalog():
-    with open("unit_tests/bulk_catalog.json") as f:
+    with open(f"{os.path.dirname(__file__)}/bulk_catalog.json") as f:
         data = json.loads(f.read())
     return ConfiguredAirbyteCatalog.parse_obj(data)


 @pytest.fixture(scope="module")
 def rest_catalog():
-    with open("unit_tests/rest_catalog.json") as f:
+    with open(f"{os.path.dirname(__file__)}/rest_catalog.json") as f:
         data = json.loads(f.read())
     return ConfiguredAirbyteCatalog.parse_obj(data)
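Anchoring the fixture paths to the test module's directory via `os.path.dirname(__file__)` lets the catalogs load regardless of the working directory pytest is launched from. A generic sketch of the pattern, assuming a hypothetical `bulk_catalog.json` sitting next to the module:

```python
import json
import os

def load_fixture(filename: str) -> dict:
    # Anchor the path to this module's directory, not the process CWD,
    # so the file is found whether tests run from the repo root or the
    # connector directory.
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path) as f:
        return json.load(f)

# Example (hypothetical file): catalog = load_fixture("bulk_catalog.json")
```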

docs/integrations/sources/salesforce.md (4 changes: 2 additions & 2 deletions)

@@ -105,7 +105,6 @@ To obtain these credentials, follow [this walkthrough](https://medium.com/@bpmme

 The Salesforce source connector supports the following sync modes:

-- (Recommended)[ Incremental Sync - Append + Deduped](https://docs.airbyte.com/understanding-airbyte/connections/incremental-append-deduped)
 - [Full Refresh - Overwrite](https://docs.airbyte.com/understanding-airbyte/connections/full-refresh-overwrite/)
 - [Full Refresh - Append](https://docs.airbyte.com/understanding-airbyte/connections/full-refresh-append)
 - [Incremental Sync - Append](https://docs.airbyte.com/understanding-airbyte/connections/incremental-append)
@@ -193,7 +192,8 @@ Now that you have set up the Salesforce source connector, check out the following

 | Version | Date | Pull Request | Subject |
 |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
-| 2.2.2 | 2024-01-04 | [33936](https://github.com/airbytehq/airbyte/pull/33936) | Prepare for airbyte-lib |
+| 2.2.3 | 2024-01-10 | [34107](https://github.com/airbytehq/airbyte/pull/34107) | Fix date filter behaviour |
+| 2.2.2 | 2024-01-04 | [33936](https://github.com/airbytehq/airbyte/pull/33936) | Prepare for airbyte-lib |
 | 2.2.1 | 2023-12-12 | [33342](https://github.com/airbytehq/airbyte/pull/33342) | Added new ContentDocumentLink stream |
 | 2.2.0 | 2023-12-12 | [33350](https://github.com/airbytehq/airbyte/pull/33350) | Sync streams concurrently on full refresh |
 | 2.1.6 | 2023-11-28 | [32535](https://github.com/airbytehq/airbyte/pull/32535) | Run full refresh syncs concurrently |