Source twilio: implement rolling windows #12555
Conversation
Thank you @alejandroRosano-muttdata for this contribution! Could you please sign the CLA?
Hi @alafanechere.
I don't think so. This change was introduced in v0.35.31, but I don't think it will be incompatible with your current environment: the latest CDK version is bundled in the connector container and is agnostic of the Airbyte version you are using. The only requirement is to branch your PR from our
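For illustration, a hypothetical excerpt (not taken from this PR) of how a Python connector pins the CDK in its own setup.py, which is why the CDK version ships inside the connector image rather than with the Airbyte platform:

```python
# Hypothetical setup.py excerpt for a Python source connector.
# The airbyte-cdk requirement declared here is resolved when the connector's
# Docker image is built, so the CDK version travels with the connector and is
# independent of the Airbyte platform version running it.
from setuptools import find_packages, setup

setup(
    name="source_twilio",
    packages=find_packages(),
    install_requires=[
        "airbyte-cdk~=0.1",  # bundled into the connector container at build time
        "requests",
    ],
)
```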
Co-authored-by: Gonzalo Villafañe Tapia <gvillafanetapia@gmail.com>
@alafanechere ready for review 😁
/test connector=connectors/source-twilio
@gvillafanetapia could you please make sure the code is properly linted and formatted by running
./gradlew format
and run the acceptance tests with ./gradlew :airbyte-integrations:connectors:source-twilio:integrationTest.
The test I launched above failed because of an error during the flake check: you have a bunch of unused imports.
./source_twilio/streams.py:7:1: F401 'asyncio.log.logger' imported but unused
./source_twilio/streams.py:10:1: F401 'typing.List' imported but unused
./source_twilio/streams.py:19:1: F401 'typing_extensions.Self' imported but unused
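A minimal sketch of the fix, assuming the rest of the import block (only the three flagged names come from the flake8 output above):

```python
# streams.py — remove the imports flake8 flags as F401 (imported but unused):
#   from asyncio.log import logger
#   from typing_extensions import Self
# and drop List from the typing import, keeping only names the module uses, e.g.:
from typing import Any, Iterable, Mapping, Optional
```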
/test connector=connectors/source-twilio
Having issues with Gradle picking up the wrong version of Python.
Did you already work in the Airbyte repo in the past? I think you might need to delete all the old virtualenvs that may have been created before we bumped the Python version to 3.9. I personally used this command, but be careful, it is destructive 😄:
Hey @alejandroRosano-muttdata, did you try to recreate the virtualenv?
Co-authored-by: Augustin <augustin@airbyte.io>
/test connector=connectors/source-twilio
Build Failed
Test summary info:
},
"lookback_window": {
  "title": "Lookback window",
  "description": "Any data between sync time and sync time minus lookback window setting in minutes will be replicated (in minutes).",
A bit confused by this description.
Could we adjust this to be clearer and/or link to some documentation that elaborates, possibly with clearer examples?
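For context, the behaviour the description is trying to convey could be sketched like this (hypothetical function and variable names; only lookback_window and its unit, minutes, come from the spec above):

```python
from datetime import datetime, timedelta, timezone

def sync_lower_bound(saved_cursor: datetime, lookback_window: int) -> datetime:
    """Records updated after (saved cursor - lookback_window minutes) are re-replicated."""
    return saved_cursor - timedelta(minutes=lookback_window)

# With a 1440-minute (24 h) lookback and a saved cursor of 2022-06-02T12:00Z,
# the next sync re-reads anything updated since 2022-06-01T12:00Z.
print(sync_lower_bound(datetime(2022, 6, 2, 12, tzinfo=timezone.utc), 1440))
```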
Hey @alejandroRosano-muttdata, I tried to run the acceptance tests locally but they are failing due to multiple typos in your code.
I added comments for you to spot these.
Unfortunately, even after fixing these typos locally, the tests are still failing.
Could you please make the fixes required for the tests to pass after running:
./gradlew :airbyte-integrations:connectors:source-twilio:integrationTest
My current test output:
> Task :airbyte-integrations:connectors:source-twilio:sourceAcceptanceTest
============================= test session starts ==============================
platform linux -- Python 3.9.11, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
rootdir: /test_input
plugins: timeout-1.4.2, sugar-0.9.4
collected 24 items
test_core.py ..................F. [ 83%]
test_full_refresh.py . [ 87%]
test_incremental.py FF. [100%]
=================================== FAILURES ===================================
_______________________ TestBasicRead.test_read[inputs0] _______________________
self = <source_acceptance_test.tests.test_core.TestBasicRead object at 0xffff83567970>
connector_config = SecretDict(******)
configured_catalog = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='accounts', json_schema={'properti...l_refresh'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)])
inputs = BasicReadTestConfig(config_path='secrets/config.json', configured_catalog_path='integration_tests/no_empty_streams_cat...rds=None, validate_schema=True, validate_data_points=False, expect_trace_message_on_failure=True, timeout_seconds=None)
expected_records = []
docker_runner = <source_acceptance_test.utils.connector_runner.ConnectorRunner object at 0xffff835677f0>
detailed_logger = <Logger detailed_logger /test_input/acceptance_tests_logs/test_core.py__TestBasicRead__test_read[inputs0].txt (DEBUG)>
def test_read(
self,
connector_config,
configured_catalog,
inputs: BasicReadTestConfig,
expected_records: List[AirbyteRecordMessage],
docker_runner: ConnectorRunner,
detailed_logger,
):
> output = docker_runner.call_read(connector_config, configured_catalog)
/usr/local/lib/python3.9/site-packages/source_acceptance_test/tests/test_core.py:390:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.9/site-packages/source_acceptance_test/utils/connector_runner.py:83: in call_read
output = list(self.run(cmd=cmd, config=config, catalog=catalog, **kwargs))
/usr/local/lib/python3.9/site-packages/source_acceptance_test/utils/connector_runner.py:106: in run
for line in self.read(container, command=cmd, with_ext=raise_container_error):
/usr/local/lib/python3.9/site-packages/source_acceptance_test/utils/connector_runner.py:119: in read
for chunk in container.logs(stdout=True, stderr=True, stream=True, follow=True):
/usr/local/lib/python3.9/site-packages/docker/types/daemon.py:32: in __next__
return next(self._stream)
/usr/local/lib/python3.9/site-packages/docker/api/client.py:386: in _multiplexed_response_stream_helper
header = response.raw.read(STREAM_HEADER_SIZE_BYTES)
/usr/local/lib/python3.9/site-packages/urllib3/response.py:522: in read
data = self._fp.read(amt) if not fp_closed else b""
/usr/local/lib/python3.9/http/client.py:463: in read
n = self.readinto(b)
/usr/local/lib/python3.9/http/client.py:497: in readinto
return self._readinto_chunked(b)
/usr/local/lib/python3.9/http/client.py:592: in _readinto_chunked
chunk_left = self._get_chunk_left()
/usr/local/lib/python3.9/http/client.py:560: in _get_chunk_left
chunk_left = self._read_next_chunk_size()
/usr/local/lib/python3.9/http/client.py:520: in _read_next_chunk_size
line = self.fp.readline(_MAXLINE + 1)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <socket.SocketIO object at 0xffff8371d340>
b = <memory at 0xffff839eaa00>
def readinto(self, b):
"""Read up to len(b) bytes into the writable buffer *b* and return
the number of bytes read. If the socket is non-blocking and no bytes
are available, None is returned.
If *b* is non-empty, a 0 return value indicates that the connection
was shutdown at the other end.
"""
self._checkClosed()
self._checkReadable()
if self._timeout_occurred:
raise OSError("cannot read from timed out object")
while True:
try:
> return self._sock.recv_into(b)
E Failed: Timeout >300.0s
/usr/local/lib/python3.9/socket.py:704: Failed
______________ TestIncremental.test_two_sequential_reads[inputs0] ______________
self = <source_acceptance_test.tests.test_incremental.TestIncremental object at 0xffff825b2760>
inputs = IncrementalConfig(config_path='secrets/config.json', configured_catalog_path='integration_tests/no_empty_streams_no_us...gration_tests/abnormal_state.json', timeout_seconds=None, threshold_days=0, skip_comprehensive_incremental_tests=False)
connector_config = SecretDict(******)
configured_catalog_for_incremental = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='calls', json_schema={'properties'...tal'>, cursor_field=['date_updated'], destination_sync_mode=<DestinationSyncMode.append: 'append'>, primary_key=None)])
cursor_paths = {'alerts': ['date_updated'], 'calls': ['end_time'], 'conferences': ['date_updated'], 'message_media': ['date_created'], ...}
docker_runner = <source_acceptance_test.utils.connector_runner.ConnectorRunner object at 0xffff825b2250>
def test_two_sequential_reads(
self,
inputs: IncrementalConfig,
connector_config: SecretDict,
configured_catalog_for_incremental: ConfiguredAirbyteCatalog,
cursor_paths: dict[str, list[str]],
docker_runner: ConnectorRunner,
):
threshold_days = getattr(inputs, "threshold_days") or 0
stream_mapping = {stream.stream.name: stream for stream in configured_catalog_for_incremental.streams}
output = docker_runner.call_read(connector_config, configured_catalog_for_incremental)
records_1 = filter_output(output, type_=Type.RECORD)
states_1 = filter_output(output, type_=Type.STATE)
assert states_1, "Should produce at least one state"
assert records_1, "Should produce at least one record"
latest_state = states_1[-1].state.data
for record_value, state_value, stream_name in records_with_state(records_1, latest_state, stream_mapping, cursor_paths):
> assert (
record_value <= state_value
), f"First incremental sync should produce records younger or equal to cursor value from the state. Stream: {stream_name}"
E AssertionError: First incremental sync should produce records younger or equal to cursor value from the state. Stream: calls
E assert DateTime(2022, 6, 2, 12, 54, 5, tzinfo=Timezone('UTC')) <= DateTime(2022, 3, 13, 23, 56, 37, tzinfo=Timezone('+00:00'))
/usr/local/lib/python3.9/site-packages/source_acceptance_test/tests/test_incremental.py:133: AssertionError
_____________ TestIncremental.test_read_sequential_slices[inputs0] _____________
self = <source_acceptance_test.tests.test_incremental.TestIncremental object at 0xffff82bbe1f0>
inputs = IncrementalConfig(config_path='secrets/config.json', configured_catalog_path='integration_tests/no_empty_streams_no_us...gration_tests/abnormal_state.json', timeout_seconds=None, threshold_days=0, skip_comprehensive_incremental_tests=False)
connector_config = SecretDict(******)
configured_catalog_for_incremental = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='calls', json_schema={'properties'...tal'>, cursor_field=['date_updated'], destination_sync_mode=<DestinationSyncMode.append: 'append'>, primary_key=None)])
cursor_paths = {'alerts': ['date_updated'], 'calls': ['end_time'], 'conferences': ['date_updated'], 'message_media': ['date_created'], ...}
docker_runner = <source_acceptance_test.utils.connector_runner.ConnectorRunner object at 0xffff82bbea00>
def test_read_sequential_slices(
self, inputs: IncrementalConfig, connector_config, configured_catalog_for_incremental, cursor_paths, docker_runner: ConnectorRunner
):
"""
Incremental test that makes calls the read method without a state checkpoint. Then we partition the results by stream and
slice checkpoints resulting in batches of messages that look like:
<state message>
<record message>
...
<record message>
Using these batches, we then make additional read method calls using the state message and verify the correctness of the
messages in the response.
"""
if inputs.skip_comprehensive_incremental_tests:
pytest.skip("Skipping new incremental test based on acceptance-test-config.yml")
return
threshold_days = getattr(inputs, "threshold_days") or 0
stream_mapping = {stream.stream.name: stream for stream in configured_catalog_for_incremental.streams}
output = docker_runner.call_read(connector_config, configured_catalog_for_incremental)
records_1 = filter_output(output, type_=Type.RECORD)
states_1 = filter_output(output, type_=Type.STATE)
assert states_1, "Should produce at least one state"
assert records_1, "Should produce at least one record"
latest_state = states_1[-1].state.data
for record_value, state_value, stream_name in records_with_state(records_1, latest_state, stream_mapping, cursor_paths):
> assert (
record_value <= state_value
), f"First incremental sync should produce records younger or equal to cursor value from the state. Stream: {stream_name}"
E AssertionError: First incremental sync should produce records younger or equal to cursor value from the state. Stream: calls
E assert DateTime(2022, 6, 2, 12, 54, 5, tzinfo=Timezone('UTC')) <= DateTime(2022, 3, 13, 23, 56, 37, tzinfo=Timezone('+00:00'))
/usr/local/lib/python3.9/site-packages/source_acceptance_test/tests/test_incremental.py:175: AssertionError
=========================== short test summary info ============================
FAILED test_core.py::TestBasicRead::test_read[inputs0] - Failed: Timeout >300.0s
FAILED test_incremental.py::TestIncremental::test_two_sequential_reads[inputs0]
FAILED test_incremental.py::TestIncremental::test_read_sequential_slices[inputs0]
=================== 3 failed, 21 passed in 593.61s (0:09:53) ===================
Co-authored-by: Augustin <augustin@airbyte.io>
I will be working on resolving this today.
Hi @alafanechere, in my last commit, when I run:
I really don't know where to go from here; as far as I understand, it is caused by the lookback_window implementation.
@alejandroRosano-muttdata I'm going to try to debug it locally. I'll keep you updated.
@alejandroRosano-muttdata I spent a good amount of time figuring out the root cause of the test failure.
Closing in favour of #13896
What
Describe what the change is solving
It helps to add screenshots if it affects the frontend.
- Add a lookback setting that provides a rolling window, so records can be updated based on the date_updated field
- Add a page size setting configurable from the UI
- Update to IncrementalMixin (get_updated_state is deprecated)
How
Describe the solution
Carried out changes on the Twilio connector to support setting a lookback (rolling window), so that data can be pulled with a window of, for example, 24 hours back and updated records get re-replicated. Also added the option to set the page size from the UI.
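A minimal sketch of the idea, with hypothetical class and field names rather than the actual connector code, showing how a state property (the IncrementalMixin pattern that replaces the deprecated get_updated_state) combines with a lookback window and a configurable page size:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

class RollingWindowStream:
    """Simplified stand-in for an incremental stream; not the real Twilio streams."""
    cursor_field = "date_updated"

    def __init__(self, start_date: datetime, lookback_window: int = 0, page_size: int = 1000):
        self._start_date = start_date
        self._lookback_window = lookback_window  # minutes, the new spec option
        self.page_size = page_size               # the new UI-configurable page size
        self._cursor_value: Optional[datetime] = None

    @property
    def state(self) -> dict:
        # With IncrementalMixin the framework reads/writes this property instead of
        # calling get_updated_state() per record.
        cursor = self._cursor_value or self._start_date
        return {self.cursor_field: cursor.isoformat()}

    @state.setter
    def state(self, value: dict):
        self._cursor_value = datetime.fromisoformat(value[self.cursor_field])

    def sync_start(self) -> datetime:
        # Rolling window: start lookback_window minutes before the saved cursor so
        # recently updated records are picked up again on the next sync.
        cursor = self._cursor_value or self._start_date
        return cursor - timedelta(minutes=self._lookback_window)


stream = RollingWindowStream(
    start_date=datetime(2022, 1, 1, tzinfo=timezone.utc),
    lookback_window=60 * 24,  # 24 hours
)
stream.state = {"date_updated": "2022-06-02T12:00:00+00:00"}
print(stream.sync_start())  # 2022-06-01 12:00:00+00:00
```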
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter
- Secrets in the spec are annotated with airbyte_secret
- Integration tests are passing: ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- Documentation updated:
  - connector README.md
  - connector bootstrap.md (see description and examples)
  - docs/SUMMARY.md
  - docs/integrations/<source or destination>/<name>.md, including changelog (see changelog example)
  - docs/integrations/README.md
  - airbyte-integrations/builds.md

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- The /test connector=connectors/<name> command is passing
- New connector version published with the /publish command described here

Updating a connector

Community member or Airbyter
- Secrets in the spec are annotated with airbyte_secret
- Integration tests are passing: ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- Documentation updated:
  - connector README.md
  - connector bootstrap.md (see description and examples)
  - docs/integrations/<source or destination>/<name>.md, including changelog (see changelog example)

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- The /test connector=connectors/<name> command is passing
- New connector version published with the /publish command described here

Connector Generator
- Connector templates (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates, then checking in your changes

Tests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.