Skip to content

Commit

Permalink
🐛 Source Orb: Fix bug to enrich multiple events with the same `event_…
Browse files Browse the repository at this point in the history
…id` (airbytehq#17761)

* Update source Orb to handle multiple events with the same event_id

* Documentation

* Update documentation
  • Loading branch information
anushree-agrawal authored and jhammarstedt committed Oct 31, 2022
1 parent 3176ed7 commit ba398ce
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@
- name: Orb
sourceDefinitionId: 7f0455fb-4518-4ec0-b7a3-d808bf8081cc
dockerRepository: airbyte/source-orb
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/orb
icon: orb.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7994,7 +7994,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-orb:0.1.3"
- dockerImage: "airbyte/source-orb:0.1.4"
spec:
documentationUrl: "https://docs.withorb.com/"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-orb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_orb ./source_orb
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-orb
23 changes: 13 additions & 10 deletions airbyte-integrations/connectors/source-orb/source_orb/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,15 @@ def enrich_ledger_entries_with_event_data(self, ledger_entries):
"""
# Build up a list of the subset of ledger entries we are expected
# to enrich with event metadata.
event_id_to_ledger_entry = {}
event_id_to_ledger_entries = {}
for entry in ledger_entries:
maybe_event_id: Optional[str] = entry.get("event_id")
if maybe_event_id:
event_id_to_ledger_entry[maybe_event_id] = entry
# There can be multiple entries with the same event ID
event_id_to_ledger_entries[maybe_event_id] = event_id_to_ledger_entries.get(maybe_event_id, []) + [entry]

# Nothing to enrich; short-circuit
if len(event_id_to_ledger_entry) == 0:
if len(event_id_to_ledger_entries) == 0:
return ledger_entries

def modify_ledger_entry_schema(ledger_entry):
Expand All @@ -321,8 +322,9 @@ def modify_ledger_entry_schema(ledger_entry):
ledger_entry["event"] = {}
ledger_entry["event"]["id"] = event_id

for ledger_entry in event_id_to_ledger_entry.values():
modify_ledger_entry_schema(ledger_entry=ledger_entry)
for ledger_entries_in_map in event_id_to_ledger_entries.values():
for ledger_entry in ledger_entries_in_map:
modify_ledger_entry_schema(ledger_entry=ledger_entry)

# Nothing to extract for each ledger entry
merged_properties_keys = (self.string_event_properties_keys or []) + (self.numeric_event_properties_keys or [])
Expand All @@ -331,7 +333,7 @@ def modify_ledger_entry_schema(ledger_entry):

# The events endpoint is a `POST` endpoint which expects a list of
# event_ids to filter on
request_filter_json = {"event_ids": list(event_id_to_ledger_entry)}
request_filter_json = {"event_ids": list(event_id_to_ledger_entries)}

# Prepare request with self._session, which should
# automatically deal with the authentication header.
Expand All @@ -354,16 +356,17 @@ def modify_ledger_entry_schema(ledger_entry):

# This would imply that the endpoint returned an event that wasn't part of the filter
# parameters, so log an error but ignore it.
if event_id not in event_id_to_ledger_entry:
if event_id not in event_id_to_ledger_entries:
self.logger.error(f"Unrecognized event received with ID {event_id} when trying to enrich ledger entries")
continue

# Replace ledger_entry.event_id with ledger_entry.event
event_id_to_ledger_entry[event_id]["event"]["properties"] = desired_properties_subset
num_events_enriched += 1
for ledger_entry in event_id_to_ledger_entries[event_id]:
ledger_entry["event"]["properties"] = desired_properties_subset
num_events_enriched += 1

# Log an error if we did not enrich all the entries we asked for.
if num_events_enriched != len(event_id_to_ledger_entry):
if num_events_enriched != sum(len(le) for le in event_id_to_ledger_entries.values()):
self.logger.error("Unable to enrich all eligible credit ledger entries with event metadata.")

# Mutating entries within `event_id_to_ledger_entry` should have modified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,32 @@ def test_credits_ledger_entries_enriches_selected_property_keys(
# Does not enrich, but still passes back, irrelevant (for enrichment purposes) ledger entry
assert enriched_entries[1] == original_entry_1

@responses.activate
def test_credits_ledger_entries_enriches_with_multiple_entries_per_event(mocker):
stream = CreditsLedgerEntries(string_event_properties_keys=["ping"])
ledger_entries = [{"event_id": "foo-event-id", "entry_type": "decrement"}, {"event_id": "foo-event-id", "entry_type": "decrement"}]
mock_response = {
"data": [
{
"customer_id": "foo-customer-id",
"event_name": "foo-name",
"id": "foo-event-id",
"properties": {"ping": "pong"},
"timestamp": "2022-02-21T07:00:00+00:00",
}
],
"pagination_metadata": {"has_more": False, "next_cursor": None},
}
responses.add(responses.POST, f"{stream.url_base}events", json=mock_response, status=200)
enriched_entries = stream.enrich_ledger_entries_with_event_data(ledger_entries)

# We expect both events are enriched correctly
assert enriched_entries == [
{"event": {"id": "foo-event-id", "properties": {"ping": "pong"}}, "entry_type": "decrement"},
{"event": {"id": "foo-event-id", "properties": {"ping": "pong"}}, "entry_type": "decrement"},
]



def test_supports_incremental(patch_incremental_base_class, mocker):
mocker.patch.object(IncrementalOrbStream, "cursor_field", "dummy_field")
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/orb.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ an Orb Account and API Key.

| Version | Date | Pull Request | Subject |
| --- | --- | --- | --- |
| 0.1.4 | 2022-10-07 | [17761](https://github.com/airbytehq/airbyte/pull/17761) | Fix bug with enriching ledger entries
| 0.1.3 | 2022-08-26 | [16017](https://github.com/airbytehq/airbyte/pull/16017) | Add credit block id to ledger entries
| 0.1.2 | 2022-04-20 | [11528](https://github.com/airbytehq/airbyte/pull/11528) | Add cost basis to ledger entries, update expiration date, sync only committed entries
| 0.1.1 | 2022-03-03 | [10839](https://github.com/airbytehq/airbyte/pull/10839) | Support ledger entries with numeric properties + schema fixes
Expand Down

0 comments on commit ba398ce

Please sign in to comment.