From 0ac6a2411c5048520cc9250a0764997b9695c8d5 Mon Sep 17 00:00:00 2001 From: Anushree Agrawal Date: Fri, 7 Oct 2022 13:16:04 -0700 Subject: [PATCH 1/3] Update source Orb to handle multiple events with the same event_id --- .../source-orb/source_orb/source.py | 23 +++++++++------- .../unit_tests/test_incremental_streams.py | 26 +++++++++++++++++++ 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/source-orb/source_orb/source.py b/airbyte-integrations/connectors/source-orb/source_orb/source.py index 724625445105..8aeba8151272 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/source.py +++ b/airbyte-integrations/connectors/source-orb/source_orb/source.py @@ -301,14 +301,15 @@ def enrich_ledger_entries_with_event_data(self, ledger_entries): """ # Build up a list of the subset of ledger entries we are expected # to enrich with event metadata. - event_id_to_ledger_entry = {} + event_id_to_ledger_entries = {} for entry in ledger_entries: maybe_event_id: Optional[str] = entry.get("event_id") if maybe_event_id: - event_id_to_ledger_entry[maybe_event_id] = entry + # There can be multiple entries with the same event ID + event_id_to_ledger_entries[maybe_event_id] = event_id_to_ledger_entries.get(maybe_event_id, []) + [entry] # Nothing to enrich; short-circuit - if len(event_id_to_ledger_entry) == 0: + if len(event_id_to_ledger_entries) == 0: return ledger_entries def modify_ledger_entry_schema(ledger_entry): @@ -321,8 +322,9 @@ def modify_ledger_entry_schema(ledger_entry): ledger_entry["event"] = {} ledger_entry["event"]["id"] = event_id - for ledger_entry in event_id_to_ledger_entry.values(): - modify_ledger_entry_schema(ledger_entry=ledger_entry) + for ledger_entries_in_map in event_id_to_ledger_entries.values(): + for ledger_entry in ledger_entries_in_map: + modify_ledger_entry_schema(ledger_entry=ledger_entry) # Nothing to extract for each ledger entry merged_properties_keys = (self.string_event_properties_keys or []) + (self.numeric_event_properties_keys or []) @@ -331,7 +333,7 @@ def modify_ledger_entry_schema(ledger_entry): # The events endpoint is a `POST` endpoint which expects a list of # event_ids to filter on - request_filter_json = {"event_ids": list(event_id_to_ledger_entry)} + request_filter_json = {"event_ids": list(event_id_to_ledger_entries)} # Prepare request with self._session, which should # automatically deal with the authentication header. @@ -354,16 +356,17 @@ def modify_ledger_entry_schema(ledger_entry): # This would imply that the endpoint returned an event that wasn't part of the filter # parameters, so log an error but ignore it. - if event_id not in event_id_to_ledger_entry: + if event_id not in event_id_to_ledger_entries: self.logger.error(f"Unrecognized event received with ID {event_id} when trying to enrich ledger entries") continue # Replace ledger_entry.event_id with ledger_entry.event - event_id_to_ledger_entry[event_id]["event"]["properties"] = desired_properties_subset - num_events_enriched += 1 + for ledger_entry in event_id_to_ledger_entries[event_id]: + ledger_entry["event"]["properties"] = desired_properties_subset + num_events_enriched += 1 # Log an error if we did not enrich all the entries we asked for. - if num_events_enriched != len(event_id_to_ledger_entry): + if num_events_enriched != sum(len(le) for le in event_id_to_ledger_entries.values()): self.logger.error("Unable to enrich all eligible credit ledger entries with event metadata.") # Mutating entries within `event_id_to_ledger_entry` should have modified diff --git a/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py index 326968721a6f..f471f1cfd205 100644 --- a/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py +++ b/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py @@ -259,6 +259,32 @@ def test_credits_ledger_entries_enriches_selected_property_keys( # Does not enrich, but still passes back, irrelevant (for enrichment purposes) ledger entry assert enriched_entries[1] == original_entry_1 +@responses.activate +def test_credits_ledger_entries_enriches_with_multiple_entries_per_event(mocker): + stream = CreditsLedgerEntries(string_event_properties_keys=["ping"]) + ledger_entries = [{"event_id": "foo-event-id", "entry_type": "decrement"}, {"event_id": "foo-event-id", "entry_type": "decrement"}] + mock_response = { + "data": [ + { + "customer_id": "foo-customer-id", + "event_name": "foo-name", + "id": "foo-event-id", + "properties": {"ping": "pong"}, + "timestamp": "2022-02-21T07:00:00+00:00", + } + ], + "pagination_metadata": {"has_more": False, "next_cursor": None}, + } + responses.add(responses.POST, f"{stream.url_base}events", json=mock_response, status=200) + enriched_entries = stream.enrich_ledger_entries_with_event_data(ledger_entries) + + # We expect both events are enriched correctly + assert enriched_entries == [ + {"event": {"id": "foo-event-id", "properties": {"ping": "pong"}}, "entry_type": "decrement"}, + {"event": {"id": "foo-event-id", "properties": {"ping": "pong"}}, "entry_type": "decrement"}, + ] + + def test_supports_incremental(patch_incremental_base_class, mocker): mocker.patch.object(IncrementalOrbStream, "cursor_field", "dummy_field") From e14de80847f1d0ba17cd412d9767da58c48e9ddb Mon Sep 17 00:00:00 2001 From: Anushree Agrawal Date: Fri, 7 Oct 2022 13:24:26 -0700 Subject: [PATCH 2/3] Documentation --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- airbyte-integrations/connectors/source-orb/Dockerfile | 2 +- docs/integrations/sources/orb.md | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 6443c29c51c8..20833964f5bc 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -733,7 +733,7 @@ - name: Orb sourceDefinitionId: 7f0455fb-4518-4ec0-b7a3-d808bf8081cc dockerRepository: airbyte/source-orb - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.io/integrations/sources/orb icon: orb.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 3150c8db90f3..cbc8c531eeb5 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7994,7 +7994,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-orb:0.1.3" +- dockerImage: "airbyte/source-orb:0.1.4" spec: documentationUrl: "https://docs.withorb.com/" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-orb/Dockerfile b/airbyte-integrations/connectors/source-orb/Dockerfile index 7aac1d472d9d..0e60a42a61b0 100644 --- a/airbyte-integrations/connectors/source-orb/Dockerfile +++ b/airbyte-integrations/connectors/source-orb/Dockerfile @@ -34,5 +34,5 @@ COPY source_orb ./source_orb ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-orb diff --git a/docs/integrations/sources/orb.md b/docs/integrations/sources/orb.md index cd2fbb946655..6dc4c2596659 100644 --- a/docs/integrations/sources/orb.md +++ b/docs/integrations/sources/orb.md @@ -52,6 +52,7 @@ an Orb Account and API Key. | Version | Date | Pull Request | Subject | | --- | --- | --- | --- | +| 0.1.4 | 2022-10-07 | []() | Fix bug with enriching ledger entries | 0.1.3 | 2022-08-26 | [16017](https://github.com/airbytehq/airbyte/pull/16017) | Add credit block id to ledger entries | 0.1.2 | 2022-04-20 | [11528](https://github.com/airbytehq/airbyte/pull/11528) | Add cost basis to ledger entries, update expiration date, sync only committed entries | 0.1.1 | 2022-03-03 | [10839](https://github.com/airbytehq/airbyte/pull/10839) | Support ledger entries with numeric properties + schema fixes From 65c8474c6750a72354181aeece338d918ac86eeb Mon Sep 17 00:00:00 2001 From: Anushree Agrawal Date: Fri, 7 Oct 2022 13:37:46 -0700 Subject: [PATCH 3/3] Update documentation --- docs/integrations/sources/orb.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/orb.md b/docs/integrations/sources/orb.md index 6dc4c2596659..e9422b5f40e0 100644 --- a/docs/integrations/sources/orb.md +++ b/docs/integrations/sources/orb.md @@ -52,7 +52,7 @@ an Orb Account and API Key. | Version | Date | Pull Request | Subject | | --- | --- | --- | --- | -| 0.1.4 | 2022-10-07 | []() | Fix bug with enriching ledger entries +| 0.1.4 | 2022-10-07 | [17761](https://github.com/airbytehq/airbyte/pull/17761) | Fix bug with enriching ledger entries | 0.1.3 | 2022-08-26 | [16017](https://github.com/airbytehq/airbyte/pull/16017) | Add credit block id to ledger entries | 0.1.2 | 2022-04-20 | [11528](https://github.com/airbytehq/airbyte/pull/11528) | Add cost basis to ledger entries, update expiration date, sync only committed entries | 0.1.1 | 2022-03-03 | [10839](https://github.com/airbytehq/airbyte/pull/10839) | Support ledger entries with numeric properties + schema fixes