✨ [source-zendesk] upgrade to CDK v6 to use concurrency for incremental streams #48379
…update manifest to not use stream_state
@@ -183,7 +183,7 @@ def __init__(
         self._parser = DatetimeParser()

     def output_format(self, timestamp: datetime) -> str:
-        return timestamp.strftime(self._datetime_format)
+        return self._parser.format(timestamp, self._datetime_format)
Similar to #48361: although it adds a small dependency on the declarative framework, it's safer to retain the exact behavior by using the parser. We can always move this code into the concurrent cursor later.
from source_zendesk_support import SourceZendeskSupport


def _get_source(args: List[str]):
It feels like this method should be imported and reused from the CDK rather than reimplemented in each source / destination separately.
Are we considering this for the future?
Yes, I think it would make sense given the repetition thus far.
launch(source, sys.argv[1:])
init_uncaught_exception_handler(logger)
_args = sys.argv[1:]
source = _get_source(_args)
`_init_source` is a better name for this, wdyt?
Here are the CDK changes to support the change: #48389
61a12ec to 79fbd5b
@@ -39,7 +39,7 @@ acceptance_tests:
       - config_path: "secrets/config.json"
         configured_catalog_path: "integration_tests/incremental_catalog.json"
         future_state:
-          future_state_path: "integration_tests/abnormal_state.json"
+          bypass_reason: "This test does not make sense using Concurrent CDK"
However, this no longer holds true in the concurrent framework, because the final state we emit is either the latest synced record of a date window or the lower bound of the window. We don’t want to use the upper bound of the window because it’s possible we’ve missed delayed records in that window. This was fixed in the PR. As a result, the mentioned test will now always fail: instead of emitting the date impossibly far in the future, we emit the bottom of the window, which is typically the start date.
We intend to delete this test eventually anyway. For more information, see https://airbytehq-team.slack.com/archives/C02U9R3AF37/p1730329381148769
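The state rule described in this comment can be sketched roughly as follows. This is a minimal illustration, not the CDK implementation: `state_for_window` is a hypothetical helper, and it assumes a window is summarized by its lower bound plus the cursor values of the records synced in it.

```python
from datetime import datetime, timezone
from typing import Iterable, Optional


def state_for_window(lower_bound: datetime, record_cursors: Iterable[datetime]) -> datetime:
    """Hypothetical helper: pick the cursor value to checkpoint for one date window."""
    # Use the latest synced record if there is one; otherwise fall back to the
    # lower bound. The upper bound is never used, since delayed records in the
    # window could still arrive.
    latest: Optional[datetime] = max(record_cursors, default=None)
    return latest if latest is not None else lower_bound


start = datetime(2020, 1, 1, tzinfo=timezone.utc)

# No records synced in the window: the emitted state is the lower bound.
empty_window_state = state_for_window(start, [])

# Records synced: the emitted state is the latest record's cursor value.
synced_state = state_for_window(
    start,
    [datetime(2020, 1, 5, tzinfo=timezone.utc), datetime(2020, 1, 3, tzinfo=timezone.utc)],
)
```

Under this rule the emitted state can never be later than the data actually seen, which is why a test that expects a far-future state to be echoed back cannot pass.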
     assert result._token == expected


 @pytest.mark.parametrize(
     "response, start_date, check_passed",
-    [({"active_features": {"organization_access_enabled": True}}, "2020-01-01T00:00:00Z", True), ({}, "2020-01-00T00:00:00Z", False)],
+    [([{"active_features": {"organization_access_enabled": True}}], "2020-01-01T00:00:00Z", True), ([], "2020-01-01T00:00:00Z", False)],
Changed two things:
- Mocked `UserSettingsStream.read_records` instead of `UserSettingsStream.get_settings` to have better test coverage
- Fixed the datetime `2020-01-00T00:00:00Z`, which is not a valid date
`2020-01-00T00:00:00Z` == `2019-12-31T00:00:00Z`. everybody knows this
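For reference, a quick way to check a fixture date like the one fixed in this thread is to parse it with the connector's datetime format. The sketch below uses only the standard library and the format string that appears in the traces on this page; `is_valid_datetime` is a hypothetical helper, not part of the test suite.

```python
from datetime import datetime

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def is_valid_datetime(value: str) -> bool:
    """Hypothetical helper: True if `value` parses with DATETIME_FORMAT."""
    try:
        datetime.strptime(value, DATETIME_FORMAT)
    except ValueError:
        # strptime rejects a day of 00 rather than rolling back to the previous month.
        return False
    return True


old_fixture_ok = is_valid_datetime("2020-01-00T00:00:00Z")
new_fixture_ok = is_valid_datetime("2020-01-01T00:00:00Z")
```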
@@ -159,26 +152,25 @@ def test_default_start_date():
    [
        (TEST_CONFIG, "aW50ZWdyYXRpb24tdGVzdEBhaXJieXRlLmlvL3Rva2VuOmFwaV90b2tlbg=="),
        (TEST_CONFIG_OAUTH, "test_access_token"),
        (TEST_OLD_CONFIG, "aW50ZWdyYXRpb24tdGVzdEBhaXJieXRlLmlvL3Rva2VuOmFwaV90b2tlbg=="),
This one is interesting. Using the latest version of the source and this config:
{
"subdomain": "d3v-airbyte",
"start_date": "2022-06-01T00:00:00Z",
"ignore_pagination": true,
"auth_method": {
"auth_method": "api_token",
"email": <redacted>,
"api_token": <redacted>
}
}
... I would get the following error during a read:
{
"type": "TRACE",
"trace": {
"type": "ERROR",
"emitted_at": 1731073703195,
"error": {
"message": "Something went wrong in the connector. See the logs for more details.",
"internal_message": "The path from `authenticator_selection_path` is not found in the config.",
"stack_trace": "Traceback (most recent call last):
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/auth/selective_authenticator.py\", line 30, in __new__
selected_key = str(dpath.get(config, authenticator_selection_path))
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/dpath/__init__.py\", line 192, in get
raise KeyError(glob)
KeyError: ['credentials', 'credentials']
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/main.py\", line 8, in <module>
run()
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/run.py\", line 14, in run
launch(source, sys.argv[1:])
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 263, in launch
for message in source_entrypoint.run(parsed_args):
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 133, in run
yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.read(source_spec, config, config_catalog, state))
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 191, in read
for message in self.source.read(self.logger, config, catalog, state):
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py\", line 165, in read
yield from super().read(logger, config, catalog, state)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 98, in read
stream_instances = {s.name: s for s in self.streams(config)}
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py\", line 176, in streams
declarative_streams = super().streams(args)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py\", line 95, in streams
source_streams = [
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py\", line 96, in <listcomp>
self._constructor.create_component(
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 292, in create_component
return self._create_component_from_model(model=declarative_component_model, config=config, **kwargs)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 300, in _create_component_from_model
return component_constructor(model=model, config=config, **kwargs)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 738, in create_declarative_stream
retriever = self._create_component_from_model(
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 300, in _create_component_from_model
return component_constructor(model=model, config=config, **kwargs)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 1224, in create_simple_retriever
requester = self._create_component_from_model(model=model.requester, decoder=decoder, config=config, name=name)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 300, in _create_component_from_model
return component_constructor(model=model, config=config, **kwargs)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 897, in create_http_requester
self._create_component_from_model(model=model.authenticator, config=config, url_base=model.url_base, name=name, decoder=decoder)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 300, in _create_component_from_model
return component_constructor(model=model, config=config, **kwargs)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 1186, in create_selective_authenticator
return SelectiveAuthenticator( # type: ignore[abstract]
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/auth/selective_authenticator.py\", line 32, in __new__
raise ValueError(\"The path from `authenticator_selection_path` is not found in the config.\") from err
ValueError: The path from `authenticator_selection_path` is not found in the config.
",
"failure_type": "system_error"
}
}
}
The config was updated in our GSM on 3/15/22, 7:49 AM. I would assume it was because of this change. However, the manifest.yaml never took the old config format into account (see first commit here). Hence, I assume that old configs have not worked since the introduction of manifest.yaml for any source that was low-code.
Why is this breaking now? It is because we now call `streams` in `__init__`, which instantiates those streams in order to dispatch them as `concurrent_streams` or `sequential_streams`. We made this conscious decision with @brianjlai knowing the consequences, because otherwise the variables `concurrent_streams` and `sequential_streams` would have been stateful (sometimes instantiated, other times not), which would have added complexity in `ConcurrentDeclarativeSource`.
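To illustrate why moving stream instantiation into `__init__` changes when an old-format config fails, here is a simplified sketch. All names (`build_streams`, `LazySource`, `EagerSource`) are hypothetical stand-ins and not CDK classes; the only detail taken from the trace above is the `['credentials', 'credentials']` lookup path.

```python
from typing import Any, List, Mapping


def build_streams(config: Mapping[str, Any]) -> List[str]:
    # Hypothetical stand-in for stream instantiation: like the selective
    # authenticator in the trace, it needs config["credentials"]["credentials"].
    selected = config["credentials"]["credentials"]
    return [f"tickets ({selected})"]


class LazySource:
    """Simplified old behavior: streams are only built when requested."""

    def __init__(self, config: Mapping[str, Any]) -> None:
        self._config = config

    def streams(self) -> List[str]:
        return build_streams(self._config)


class EagerSource:
    """Simplified new behavior: streams are built and grouped in __init__."""

    def __init__(self, config: Mapping[str, Any]) -> None:
        streams = build_streams(config)  # raises here for an old-format config
        self._concurrent_streams: List[str] = streams
        self._sequential_streams: List[str] = []


old_format_config = {"subdomain": "example", "auth_method": {"auth_method": "api_token"}}

lazy = LazySource(old_format_config)  # construction succeeds; failure is deferred
try:
    EagerSource(old_format_config)
    eager_construction_failed = False
except KeyError:
    eager_construction_failed = True
```

The trade-off is that the eager version always has both stream groups populated after construction, at the cost of surfacing config problems as soon as the source is created.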
Thanks for the detailed writeup. I think we can just accept that if a connector is still using a 2 1/2 year old config, we don't have to guarantee it's going to work, and that this type of failure might exist.
    def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
        # Before 2024-11-11, the config was being modified in `streams`. We can't do that anymore because `ConcurrentDeclarativeSource` use
        # the config to make concurrent components. Hence, the "main" config needs to be the declarative one and if Python sources need
        # something a bit different, it needs to deduce it from the declarative config
Without those lines, we would get the following error:
{
"type": "TRACE",
"trace": {
"type": "ERROR",
"emitted_at": 1731074703862,
"error": {
"message": "Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: time data 'None' does not match format '%Y-%m-%dT%H:%M:%SZ'",
"stack_trace": "Traceback (most recent call last):
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/run.py\", line 22, in _get_source
return SourceZendeskSupport(
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py\", line 47, in __init__
super().__init__(catalog=catalog, config=config, state=state, **{\"path_to_yaml\": \"manifest.yaml\"})
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 31, in __init__
super().__init__(
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/concurrent_declarative_source.py\", line 65, in __init__
self._concurrent_streams, self._synchronous_streams = self._group_streams(config=config or {})
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/concurrent_declarative_source.py\", line 164, in _group_streams
cursor, connector_state_converter = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py\", line 564, in create_concurrent_cursor_from_datetime_based_cursor
start_date = interpolated_start_date.get_datetime(config=config)
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py\", line 58, in get_datetime
time = self._parser.parse(str(self.datetime.eval(config, **additional_parameters)), datetime_format) # type: ignore # datetime is always cast to an interpolated string
File \"/Users/maxime/devel/code/airbyte/airbyte-integrations/connectors/source-zendesk-support/.venv/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/datetime/datetime_parser.py\", line 35, in parse
parsed_datetime = datetime.datetime.strptime(str(date), format)
File \"/Users/maxime/.pyenv/versions/3.10.11/lib/python3.10/_strptime.py\", line 568, in _strptime_datetime
tt, fraction, gmtoff_fraction = _strptime(data_string, format)
File \"/Users/maxime/.pyenv/versions/3.10.11/lib/python3.10/_strptime.py\", line 349, in _strptime
raise ValueError(\"time data %r does not match format %r\" %
ValueError: time data 'None' does not match format '%Y-%m-%dT%H:%M:%SZ'
"
}
}
}
The reason is that `start_date` is optional in the config, but the child source (`SourceZendeskSupport` in this case) modifies the config to instantiate the declarative streams. `ConcurrentDeclarativeSource` does not see those changes, which can cause errors when grouping the streams as part of `ConcurrentDeclarativeSource.__init__`.
What still isn't obvious to me is why we have to inject the optional 2 year start date in code.
I'm assuming that this is failing for the semi incremental cursor here:
incremental_sync:
  type: DatetimeBasedCursor
  cursor_datetime_formats:
    - "%Y-%m-%dT%H:%M:%SZ"
    - "%Y-%m-%dT%H:%M:%S%z"
  datetime_format: "%Y-%m-%dT%H:%M:%SZ"
  cursor_field: "{{ parameters.get('cursor_field', 'updated_at') }}"
  start_datetime:
    datetime: "{{ config.get('start_date') }}"
I feel like we should be able to just add a condition to the interpolation to look back 2 years, which would simplify the Python code that's required. Something like the following. I'll experiment to find out why we didn't just do this instead of the extra config transformation logic:
  start_datetime:
    datetime: "{{ config.get('start_date') or day_delta(-730, '%Y-%m-%dT%H:%M:%SZ') }}"
Noting that I removed this block of code because we can account for this using interpolation instead of custom Python code. In `semi_incremental_stream.incremental_stream`:
"{{ config.get('start_date') or day_delta(-730, '%Y-%m-%dT%H:%M:%SZ') }}"
If `start_date` is missing, we default to two years earlier. I tested this by removing it from a config, and the sync no longer hits this error.
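In plain Python terms, the interpolated default behaves roughly like the sketch below. It assumes `day_delta(-730, fmt)` means "current UTC time minus 730 days, formatted with `fmt`"; `resolve_start_date` and its `now` parameter are hypothetical and exist only to make the example deterministic.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping, Optional

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def resolve_start_date(config: Mapping[str, Any], now: Optional[datetime] = None) -> str:
    """Hypothetical equivalent of: config.get('start_date') or day_delta(-730, fmt)."""
    now = now or datetime.now(timezone.utc)
    # `or` picks the fallback when start_date is missing, None, or empty.
    return config.get("start_date") or (now - timedelta(days=730)).strftime(DATETIME_FORMAT)


fixed_now = datetime(2024, 11, 11, tzinfo=timezone.utc)
with_start = resolve_start_date({"start_date": "2022-06-01T00:00:00Z"}, fixed_now)
without_start = resolve_start_date({}, fixed_now)
```

Keeping the default in the manifest means the declarative components see the same start date without any config rewriting in the source's Python code.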
""" | ||
        return {
            "subdomain": config["subdomain"],
            "start_date": config.get("start_date", cls.get_default_start_date()),
            "auth_type": config.get("auth_type"),
I didn't see any reference to `auth_type` in the manifest, which is why I removed this.
@@ -1,5 +0,0 @@
-{
@maxi297 I think you meant to delete the `abnormal_state.json` file for `source-zendesk-support` instead of sunshine haha
Oops! I'm clearly not the sharpest shooter in the west
no worries, i'll fix this as i catch myself back up to this PR
… from the interpolation
…g regression test results
…i incremental syncs
Regression test results analysis:
Analyzing results of: https://github.com/airbytehq/airbyte/actions/runs/11924723431/job/33235567364
After a few bug fixes and analyzing the latest results, there are mismatches. However, after inspecting the results and doing some local testing, these look to be expected mismatches. The main issue with the current regression results is that new data for the target is added so rapidly that we rarely get the exact counts to align, since streams are no longer run at almost the same time. I’ve added some notes during my testing. The main manual validation I did was adding an end date filter to stop new records from causing mismatched counts.
Of the streams with mismatches that have been converted to concurrent streams:
Stream mismatches for other streams:
@@ -87,7 +87,7 @@ definitions:
       datetime_format: "%Y-%m-%dT%H:%M:%SZ"
       cursor_field: "{{ parameters.get('cursor_field', 'updated_at') }}"
       start_datetime:
-        datetime: "{{ config.get('start_date') }}"
+        datetime: "{{ config.get('start_date') or day_delta(-730, '%Y-%m-%dT%H:%M:%SZ') }}"
TIL about using `day_delta` instead of hardcoding a default start date. /cc @btkcodedev this is a cool trick!
@@ -98,7 +98,7 @@ definitions:
       datetime_format: "%s"
       cursor_field: "{{ parameters.get('cursor_field', 'updated_at') }}"
       start_datetime:
-        datetime: "{{ timestamp(config.get('start_date')) | int or day_delta(-730, '%Y-%m-%dT%H:%M:%SZ') }}"
+        datetime: "{{ timestamp(config.get('start_date')) | int if config.get('start_date') else day_delta(-730, '%s') }}"
Weird, did this bug out previously? Seems unrelated to concurrency work.
We need to do this because I got rid of the extra Python code `convert_config_to_declarative_stream_args()` so we're more declarative and have less custom code.
And because we no longer automatically inject `start_date`, we can't just use `or`, because the `None | int` would fail types or something along those lines.
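One plausible reading of the failure, sketched in plain Python rather than in the interpolation language: with `or`, the timestamp conversion is evaluated on the missing value before the fallback can be chosen, whereas the conditional form checks for the value first. `to_timestamp` is a hypothetical stand-in for the `timestamp` macro, and the exact error raised by the real macro may differ.

```python
from datetime import datetime, timezone
from typing import Any, Mapping

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def to_timestamp(value: str) -> float:
    # Hypothetical stand-in for the `timestamp` macro: parses a UTC datetime string.
    return datetime.strptime(value, DATETIME_FORMAT).replace(tzinfo=timezone.utc).timestamp()


def start_with_or(config: Mapping[str, Any], fallback: int) -> int:
    # Shape of the old expression: the conversion runs before `or` can pick the fallback.
    return int(to_timestamp(config.get("start_date"))) or fallback


def start_with_conditional(config: Mapping[str, Any], fallback: int) -> int:
    # Shape of the new expression: only convert when start_date is present.
    start_date = config.get("start_date")
    return int(to_timestamp(start_date)) if start_date else fallback


try:
    start_with_or({}, fallback=0)
    or_version_failed = False
except TypeError:
    or_version_failed = True

resolved = start_with_conditional({"start_date": "1970-01-02T00:00:00Z"}, fallback=0)
defaulted = start_with_conditional({}, fallback=1234)
```

The diff also switches the fallback's format to `'%s'`, so both branches produce a value matching the cursor's `datetime_format`.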
/approve-regression-tests Confirmed that the only other stream to validate
LGTM! Thanks for being so diligent on this @brianjlai !
NOTE: This PR also includes the CDK changes needed to fix a few bugs related to stream state converting. I need them in the branch to build a version of zendesk with the local cdk. Once the changes are vetted with regression testing, I will create a separate PR to release the CDK changes
What
Upgrades `source-zendesk-support` to the latest CDK
How
- Updated run.py, source.py to take in the required arguments
- replace `stream_state` with `stream_interval` in manifest which is thread safe
- add concurrency level based on the API docs from zendesk
- fix tests (i still need to fix 3 more)
- todo: look into regression test failures
User Impact
Potential rate limiting since we are syncing at a faster rate by default
Can this PR be safely reverted and rolled back?
We still emit state in the legacy format to allow for rollbacks