-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source s3 cursor with history adapter #29028
Conversation
Before Merging a Connector Pull RequestWow! What a great pull request you have here! 🎉 To merge this PR, ensure the following has been done/considered for each connector added or updated:
If the checklist is complete, but the CI check is failing,
|
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
Code format checks | ❌ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
e462779
to
5dd8d06
Compare
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
Code format checks | ❌ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ❌ |
Code format checks | ❌ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
ebe3cd9
to
7a6a6b9
Compare
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Code format checks | ❌ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
efd77d9
to
10ce5fe
Compare
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Code format checks | ❌ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
10ce5fe
to
0a23e96
Compare
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Code format checks | ❌ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
0a23e96
to
fb0cf97
Compare
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Code format checks | ✅ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like test coverage does not guarantee the behavior we would like. Can we test public methods instead?
return True | ||
else: | ||
return False | ||
if cursor := stream_state.get(self.cursor_field): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like an edge case but if the cursor_field change, the state will be considered as V3. Is that ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call-out, and I'm okay with that since it's an edge case, and we will hopefully be deprecating v3 before too long.
airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
Show resolved
Hide resolved
], | ||
) | ||
def test_convert_dates(input_history: MutableMapping[str, Any], expected_output: MutableMapping[str, Any]) -> None: | ||
assert Cursor._convert_legacy_history(input_history) == expected_output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this test does not guarantee that the public interface is working as expected i.e. set_initial_state
could do something completely unexpected and we won't know with this coverage. Should we test set_initial_state
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good call, updated tests to use that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw this change caught a bug. Updated code in airbyte-integrations/connectors/source-s3/source_s3/v4/cursor.py if you want to take another look.
airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
Show resolved
Hide resolved
timestamp_millis = stream_state[self.cursor_field].split("_")[0] | ||
converted_state[self.cursor_field] = self._get_ts_from_millis_ts(timestamp_millis, "%Y-%m-%dT%H:%M:%SZ") | ||
if "history" in stream_state: | ||
converted_state["history"] = converted_history |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think we can always set converted_state["history"] = converted_history
. It'll just be an empty object if the stream_state doesn't have a history field
def get_source(args: List[str]): | ||
catalog_path = AirbyteEntrypoint.extract_catalog(args) | ||
try: | ||
return FileBasedSource(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you elaborate on why or what scenario we need this additional try/catch a opposed to just initializing it the way we had been doing before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes so I added this in order to make the CATs pass. When check
errors, they expect the error to be output as an AirbyteMessage
. But if we initialize FileBasedSource
outside of the try
that won't happen.
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Code format checks | ✅ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚢 🚢 🚢
0127a8f
to
78dcbe8
Compare
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Code format checks | ✅ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ❌ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just small nits, nothing major so looks good!
if not timestamp: | ||
return timestamp | ||
try: | ||
timestamp_millis = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: but since this is used in mroe than one place in the file (also on line 460), can we make this a constant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 done.
|
||
for filename in filenames: | ||
if filename in converted_history: | ||
if date_obj > datetime.strptime(converted_history[filename], "%Y-%m-%dT%H:%M:%S.%fZ"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here let's use a constant since its used 3 times
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 done.
78dcbe8
to
c14d044
Compare
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Code format checks | ✅ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
c14d044
to
5e32482
Compare
source-s3 test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-s3/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Code format checks | ✅ |
Connector package install | ✅ |
Build source-s3 docker image for platform linux/x86_64 | ✅ |
Unit tests | ✅ |
Integration tests | ✅ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-s3 test
Creates a Cursor object for source S3 that adapts state message in the old format to the new format.
Depends on #29027.