From dd1991ea52ebb5da55ef7820edad09bd10706a7c Mon Sep 17 00:00:00 2001 From: Brian Leonard Date: Thu, 22 Feb 2024 10:49:06 -0800 Subject: [PATCH] Handle seeing uncompressed sendgrid contact data (#35343) --- .../integration_tests/expected_records.jsonl | 12 ++++----- .../connectors/source-sendgrid/metadata.yaml | 2 +- .../connectors/source-sendgrid/pyproject.toml | 2 +- .../source_sendgrid/streams.py | 8 +++++- .../source-sendgrid/unit_tests/unit_test.py | 25 +++++++++++++++++-- docs/integrations/sources/sendgrid.md | 1 + 6 files changed, 39 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-sendgrid/integration_tests/expected_records.jsonl b/airbyte-integrations/connectors/source-sendgrid/integration_tests/expected_records.jsonl index 188db682c2c6..acf7830c5b8e 100644 --- a/airbyte-integrations/connectors/source-sendgrid/integration_tests/expected_records.jsonl +++ b/airbyte-integrations/connectors/source-sendgrid/integration_tests/expected_records.jsonl @@ -184,12 +184,12 @@ {"stream": "suppression_group_members", "data": {"email": "test-forsuppressiongroup number8@example.com", "group_id": 14772, "group_name": "Test Suggestions Group 12", "created_at": 1612363238}, "emitted_at": 1631093393000} {"stream": "suppression_group_members", "data": {"email": "test-forsuppressiongroup number9@example.com", "group_id": 14772, "group_name": "Test Suggestions Group 12", "created_at": 1612363238}, "emitted_at": 1631093393000} {"stream": "suppression_group_members", "data": {"email": "avida.d3@gmail.com", "group_id": 14780, "group_name": "Test Suggestions Group 20", "created_at": 1631093329}, "emitted_at": 1631093393000} -{"stream": "bounces", "data": { "created": 1621442821, "email": "vadym.hevlich@zazmicinvalid", "reason": "Invalid Domain", "status": "" }, "emitted_at": 1678792680684} -{"stream": "bounces", "data": { "created": 1621441107, "email": "vadym.hevlich@zazmiccom2", "reason": "Invalid Domain", "status": "" }, "emitted_at": 
1678792680684} -{"stream": "bounces", "data": { "created": 1621442883, "email": "vadym.hevlich@zazmic_com", "reason": "Invalid Domain", "status": "" }, "emitted_at": 1678792680684} -{"stream": "bounces", "data": { "created": 1621441104, "email": "vadym.hevlich@zazmiccom1", "reason": "Invalid Domain", "status": "" }, "emitted_at": 1678792680684} -{"stream": "bounces", "data": { "created": 1621442811, "email": "vadym.hevlich@zazmicio", "reason": "Invalid Domain", "status": "" }, "emitted_at": 1678792680685} -{"stream": "bounces", "data": { "created": 1621430037, "email": "vadym.hevlich@zazmiccom", "reason": "Invalid Domain", "status": "" }, "emitted_at": 1678792680685} +{"stream": "bounces", "data": {"status": "", "reason": "Invalid Domain", "email": "vadym.hevlich@zazmic_com", "created": 1621439283}, "emitted_at": 1708535996116} +{"stream": "bounces", "data": {"status": "", "reason": "Invalid Domain", "email": "vadym.hevlich@zazmicinvalid", "created": 1621439221}, "emitted_at": 1708535996116} +{"stream": "bounces", "data": {"status": "", "reason": "Invalid Domain", "email": "vadym.hevlich@zazmicio", "created": 1621439211}, "emitted_at": 1708535996116} +{"stream": "bounces", "data": {"status": "", "reason": "Invalid Domain", "email": "vadym.hevlich@zazmiccom2", "created": 1621437507}, "emitted_at": 1708535996117} +{"stream": "bounces", "data": {"status": "", "reason": "Invalid Domain", "email": "vadym.hevlich@zazmiccom1", "created": 1621437504}, "emitted_at": 1708535996117} +{"stream": "bounces", "data": {"status": "", "reason": "Invalid Domain", "email": "vadym.hevlich@zazmiccom", "created": 1621426437}, "emitted_at": 1708535996117} {"stream": "campaigns", "data": {"created_at": "2021-09-08T09:07:48Z", "id": "3c5a9fa6-1084-11ec-ac32-4228d699bad5", "name": "Untitled Single Send", "status": "triggered", "updated_at": "2021-09-08T09:11:08Z", "is_abtest": false, "channels": ["email"]}, "emitted_at": 1678791750589} {"stream": "campaigns", "data": {"created_at": 
"2021-09-08T09:04:36Z", "id": "c9f286fb-1083-11ec-ae03-ca0fc7f28419", "name": "Copy of Untitled Single Send", "status": "triggered", "updated_at": "2021-09-08T09:09:08Z", "is_abtest": false, "channels": ["email"]}, "emitted_at": 1678791750589} {"stream": "campaigns", "data": {"created_at": "2021-09-08T08:53:59Z", "id": "4e5be6a3-1082-11ec-8512-9afd40c324e6", "name": "Untitled Single Send", "status": "triggered", "updated_at": "2021-09-08T08:57:08Z", "is_abtest": false, "channels": ["email"]}, "emitted_at": 1678791750590} diff --git a/airbyte-integrations/connectors/source-sendgrid/metadata.yaml b/airbyte-integrations/connectors/source-sendgrid/metadata.yaml index 607b8fc44d05..9955875b363d 100644 --- a/airbyte-integrations/connectors/source-sendgrid/metadata.yaml +++ b/airbyte-integrations/connectors/source-sendgrid/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87 - dockerImageTag: 0.4.2 + dockerImageTag: 0.4.3 dockerRepository: airbyte/source-sendgrid documentationUrl: https://docs.airbyte.com/integrations/sources/sendgrid githubIssueLabel: source-sendgrid diff --git a/airbyte-integrations/connectors/source-sendgrid/pyproject.toml b/airbyte-integrations/connectors/source-sendgrid/pyproject.toml index 53ac7a8df890..35ad4315bcc9 100644 --- a/airbyte-integrations/connectors/source-sendgrid/pyproject.toml +++ b/airbyte-integrations/connectors/source-sendgrid/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "0.4.2" +version = "0.4.3" name = "source-sendgrid" description = "Source implementation for Sendgrid." 
authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py index 9275048d660d..68f1932665cd 100644 --- a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py +++ b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py @@ -299,7 +299,13 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]: tmp_file, "wb" ) as data_file: for chunk in response.iter_content(chunk_size=chunk_size): - data_file.write(decompressor.decompress(chunk)) + try: + # see if it's compressed. we are seeing some that are not all of a sudden. + # but let's also guard against the case where sendgrid changes it back. + data_file.write(decompressor.decompress(chunk)) + except zlib.error as e: + # it's not actually compressed! + data_file.write(chunk) # check the file exists if os.path.isfile(tmp_file): return tmp_file, self.encoding diff --git a/airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py index 14fab1cb6bee..750a1db4c8b3 100644 --- a/airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py @@ -153,7 +153,7 @@ def test_should_retry_on_permission_error(requests_mock, stream_class, status, e def test_compressed_contact_response(requests_mock): stream = Contacts() - with open(os.path.dirname(__file__) + "/compressed_response", "rb") as compressed_response: + with open(os.path.dirname(__file__) + "/compressed_response", "rb") as file_response: url = "https://api.sendgrid.com/v3/marketing/contacts/exports" requests_mock.register_uri("POST", url, [{"json": {"id": "random_id"}, "status_code": 202}]) url = "https://api.sendgrid.com/v3/marketing/contacts/exports/random_id" @@ -162,7 +162,28 @@ def 
test_compressed_contact_response(requests_mock): {"json": {"status": "ready", "urls": ["https://sample_url/sample_csv.csv.gzip"]}, "status_code": 202}, ] requests_mock.register_uri("GET", url, resp_bodies) - requests_mock.register_uri("GET", "https://sample_url/sample_csv.csv.gzip", [{"body": compressed_response, "status_code": 202}]) + requests_mock.register_uri("GET", "https://sample_url/sample_csv.csv.gzip", [{"body": file_response, "status_code": 202}]) + recs = list(stream.read_records(sync_mode=SyncMode.full_refresh)) + decompressed_response = pd.read_csv(os.path.dirname(__file__) + "/decompressed_response.csv", dtype=str) + expected_records = [ + {k.lower(): v for k, v in x.items()} for x in decompressed_response.replace({nan: None}).to_dict(orient="records") + ] + + assert recs == expected_records + + +def test_uncompressed_contact_response(requests_mock): + stream = Contacts() + with open(os.path.dirname(__file__) + "/decompressed_response.csv", "rb") as file_response: + url = "https://api.sendgrid.com/v3/marketing/contacts/exports" + requests_mock.register_uri("POST", url, [{"json": {"id": "random_id"}, "status_code": 202}]) + url = "https://api.sendgrid.com/v3/marketing/contacts/exports/random_id" + resp_bodies = [ + {"json": {"status": "pending", "id": "random_id", "urls": []}, "status_code": 202}, + {"json": {"status": "ready", "urls": ["https://sample_url/sample_csv.csv.gzip"]}, "status_code": 202}, + ] + requests_mock.register_uri("GET", url, resp_bodies) + requests_mock.register_uri("GET", "https://sample_url/sample_csv.csv.gzip", [{"body": file_response, "status_code": 202}]) recs = list(stream.read_records(sync_mode=SyncMode.full_refresh)) decompressed_response = pd.read_csv(os.path.dirname(__file__) + "/decompressed_response.csv", dtype=str) expected_records = [ diff --git a/docs/integrations/sources/sendgrid.md b/docs/integrations/sources/sendgrid.md index a569b28ce89d..b3851579bbf2 100644 --- a/docs/integrations/sources/sendgrid.md +++ 
b/docs/integrations/sources/sendgrid.md @@ -84,6 +84,7 @@ The connector is restricted by normal Sendgrid [requests limitation](https://doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.4.3 | 2024-02-21 | [35343](https://github.com/airbytehq/airbyte/pull/35343) | Handle uncompressed contacts downloads. | | 0.4.2 | 2024-02-12 | [35181](https://github.com/airbytehq/airbyte/pull/35181) | Manage dependencies with Poetry. | | 0.4.1 | 2023-10-18 | [31543](https://github.com/airbytehq/airbyte/pull/31543) | Base image migration: remove Dockerfile and use the python-connector-base image | | 0.4.0 | 2023-05-19 | [23959](https://github.com/airbytehq/airbyte/pull/23959) | Add `unsubscribe_groups`stream