From 9709adf41e9e961c62878ba420cb321c27a096f8 Mon Sep 17 00:00:00 2001 From: Parker Mossman Date: Tue, 7 Feb 2023 10:29:59 -0800 Subject: [PATCH] Add SubscriptionUsage stream to Orb Source (#21951) * wip * mvp * big rework to make incremental possibly work * incremental might be working * add external_customer_id to subscriptions stream * keep state indexed by subscription_id * add a source defined primary key * move super call after assignments * start work on integration tests * format and more test setup * tests and cleanup * fix yield from * format fixes * fix type of new subscriptions property * add subscription with actual usage for integration test config * fix start date parsing and use correct keys in integration test * update docs * bump version from 0.1.4 to 0.1.5 and update changelog * make start_date a required field and set version 0.2.0 instead of 0.1.5 * major version bump from 0.1.4 to 1.0.0 to reflect backwards-incompatible change making start_date a required field * pass start_date in unit test * add sample start_date * start_date already converted * add start_date to sample_config * remove accidental char * auto-bump connector version --------- Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 15 +- .../connectors/source-orb/Dockerfile | 2 +- .../source-orb/acceptance-test-config.yml | 6 + .../integration_tests/abnormal_state.json | 5 + .../integration_tests/configured_catalog.json | 16 + .../integration_tests/sample_config.json | 3 +- .../integration_tests/sample_state.json | 5 + .../schemas/subscription_usage.json | 34 +++ .../source_orb/schemas/subscriptions.json | 3 + .../source-orb/source_orb/source.py | 279 ++++++++++++++++- .../source-orb/source_orb/spec.json | 14 +- .../unit_tests/test_incremental_streams.py | 284 +++++++++++++++++- .../source-orb/unit_tests/test_source.py | 4 +- .../source-orb/unit_tests/test_streams.py | 15 +- 
docs/integrations/sources/orb.md | 7 +- 16 files changed, 678 insertions(+), 16 deletions(-) create mode 100644 airbyte-integrations/connectors/source-orb/source_orb/schemas/subscription_usage.json diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 016af160f736..556800417963 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1216,7 +1216,7 @@ - name: Orb sourceDefinitionId: 7f0455fb-4518-4ec0-b7a3-d808bf8081cc dockerRepository: airbyte/source-orb - dockerImageTag: 0.1.4 + dockerImageTag: 1.0.0 documentationUrl: https://docs.airbyte.com/integrations/sources/orb icon: orb.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 4e171d1b95fd..ee0cc4d57965 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -10600,7 +10600,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-orb:0.1.4" +- dockerImage: "airbyte/source-orb:1.0.0" spec: documentationUrl: "https://docs.withorb.com/" connectionSpecification: @@ -10609,6 +10609,7 @@ type: "object" required: - "api_key" + - "start_date" additionalProperties: true properties: api_key: @@ -10622,7 +10623,8 @@ title: "Start Date" pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" description: "UTC date and time in the format 2022-03-01T00:00:00Z. Any\ - \ data with created_at before this data will not be synced." + \ data with created_at before this data will not be synced. For Subscription\ + \ Usage, this becomes the `timeframe_start` API parameter." 
examples: - "2022-03-01T00:00:00Z" order: 2 @@ -10651,6 +10653,15 @@ description: "Property key names to extract from all events, in order to\ \ enrich ledger entries corresponding to an event deduction." order: 5 + subscription_usage_grouping_key: + type: "string" + title: "Subscription usage grouping key (string value)" + description: "Property key name to group subscription usage by." + plan_id: + type: "string" + title: "Orb Plan ID for Subscription Usage (string value)" + description: "Orb Plan ID to filter subscriptions that should have usage\ + \ fetched." supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] diff --git a/airbyte-integrations/connectors/source-orb/Dockerfile b/airbyte-integrations/connectors/source-orb/Dockerfile index 0e60a42a61b0..ce95e5deed21 100644 --- a/airbyte-integrations/connectors/source-orb/Dockerfile +++ b/airbyte-integrations/connectors/source-orb/Dockerfile @@ -34,5 +34,5 @@ COPY source_orb ./source_orb ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=1.0.0 LABEL io.airbyte.name=airbyte/source-orb diff --git a/airbyte-integrations/connectors/source-orb/acceptance-test-config.yml b/airbyte-integrations/connectors/source-orb/acceptance-test-config.yml index 66464ba6b434..71a4457cf2d6 100644 --- a/airbyte-integrations/connectors/source-orb/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-orb/acceptance-test-config.yml @@ -4,6 +4,8 @@ connector_image: airbyte/source-orb:dev tests: spec: - spec_path: "source_orb/spec.json" + backward_compatibility_tests_config: + disable_for_version: "0.1.4" connection: - config_path: "secrets/config.json" status: "succeed" @@ -23,6 +25,10 @@ tests: # This points to a specific customer's credit ledger entries in the state, # and this customer is in the integration test account. 
credits_ledger_entries: ["hHQF5BT5jtyj9r7V", "created_at"] + + # This points to a specific subscription's usage entries in the state, + # and this subscription is in the integration test account. + subscription_usage: ["FDWRvxuBUiFfZech", "timeframe_start"] full_refresh: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-orb/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-orb/integration_tests/abnormal_state.json index 30b7f86dfa47..713104475acd 100644 --- a/airbyte-integrations/connectors/source-orb/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-orb/integration_tests/abnormal_state.json @@ -12,5 +12,10 @@ "hHQF5BT5jtyj9r7V": { "created_at": "2122-01-01T00:00:00Z" } + }, + "subscription_usage": { + "FDWRvxuBUiFfZech": { + "timeframe_start": "2122-01-01T00:00:00Z" + } } } diff --git a/airbyte-integrations/connectors/source-orb/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-orb/integration_tests/configured_catalog.json index 2aa869727544..1bbf8cb7db46 100644 --- a/airbyte-integrations/connectors/source-orb/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-orb/integration_tests/configured_catalog.json @@ -47,6 +47,22 @@ }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "subscription_usage", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["timeframe_start"], + "source_defined_primary_key": [ + ["subscription_id"], + ["billable_metric_id"], + ["timeframe_start"] + ] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" } ] } diff --git a/airbyte-integrations/connectors/source-orb/integration_tests/sample_config.json 
b/airbyte-integrations/connectors/source-orb/integration_tests/sample_config.json index 0523701684a9..08ebf5bb7d0a 100644 --- a/airbyte-integrations/connectors/source-orb/integration_tests/sample_config.json +++ b/airbyte-integrations/connectors/source-orb/integration_tests/sample_config.json @@ -1,3 +1,4 @@ { - "api_key": "" + "api_key": "", + "start_date": "2023-01-25T00:00:00Z" } diff --git a/airbyte-integrations/connectors/source-orb/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-orb/integration_tests/sample_state.json index 94323506a933..f053686d1fc6 100644 --- a/airbyte-integrations/connectors/source-orb/integration_tests/sample_state.json +++ b/airbyte-integrations/connectors/source-orb/integration_tests/sample_state.json @@ -12,5 +12,10 @@ "7c507794-7413-4467-8f1d-d3785a6c65ca": { "created_at": "2022-01-01T00:00:00Z" } + }, + "subscription_usage": { + "someId": { + "timeframe_start": "2022-01-01T00:00:00Z" + } } } diff --git a/airbyte-integrations/connectors/source-orb/source_orb/schemas/subscription_usage.json b/airbyte-integrations/connectors/source-orb/source_orb/schemas/subscription_usage.json new file mode 100644 index 000000000000..139349b1c9bb --- /dev/null +++ b/airbyte-integrations/connectors/source-orb/source_orb/schemas/subscription_usage.json @@ -0,0 +1,34 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": ["null", "object"], + "properties": { + "quantity": { + "type": "number" + }, + "timeframe_start": { + "type": "string", + "format": "date-time" + }, + "timeframe_end": { + "type": "string", + "format": "date-time" + }, + "billable_metric_name": { + "type": "string" + }, + "billable_metric_id": { + "type": "string" + }, + "subscription_id": { + "type": "string" + } + }, + "required": [ + "quantity", + "timeframe_start", + "timeframe_end", + "billable_metric_name", + "billable_metric_id", + "subscription_id" + ] +} diff --git 
# helpers for working with pendulum dates and strings
def to_datetime(time) -> Optional[pendulum.DateTime]:
    """
    Coerce ``time`` into a ``pendulum.DateTime``.

    ``None`` is passed through unchanged, an existing ``pendulum.DateTime`` is
    returned as-is, and a string is handed to ``pendulum.parse``.

    :param time: ``None``, a ``pendulum.DateTime`` or a date-time string.
    :raises TypeError: for any other input type.
    """
    if time is None:
        return None
    if isinstance(time, pendulum.DateTime):
        return time
    if isinstance(time, str):
        return pendulum.parse(time)
    raise TypeError(f"Cannot convert input of type {type(time)} to DateTime")


def to_utc_isoformat(time) -> Optional[str]:
    """
    Render ``time`` as an ISO-8601 string expressed in UTC.

    ``None`` is passed through unchanged; strings are parsed with
    ``pendulum.parse`` before being converted to UTC.

    :param time: ``None``, a ``pendulum.DateTime`` or a date-time string.
    :raises TypeError: for any other input type.
    """
    if time is None:
        return None
    if isinstance(time, pendulum.DateTime):
        return time.in_timezone("UTC").isoformat()
    if isinstance(time, str):
        return pendulum.parse(time).in_timezone("UTC").isoformat()
    raise TypeError(f"Cannot convert input of type {type(time)} to isoformat")


def chunk_date_range(start_date: pendulum.DateTime, end_date: Optional[pendulum.DateTime] = None) -> Iterable[pendulum.Period]:
    """
    Yield consecutive ``pendulum.Period`` objects that cover ``start_date`` up to ``end_date``.

    Every period is one day long, except the last one, which is truncated so that
    it never runs past ``end_date``. When ``end_date`` is omitted, ``pendulum.now()``
    is used. Nothing is yielded when ``start_date >= end_date``.
    """
    one_day = pendulum.duration(days=1)
    end_date = end_date or pendulum.now()

    # Each period contains the beginning and ending timestamp of (at most) a 24 hour window
    chunk_start_date = start_date
    while chunk_start_date < end_date:
        chunk_end_date = min(chunk_start_date + one_day, end_date)
        yield pendulum.period(chunk_start_date, chunk_end_date)
        chunk_start_date = chunk_end_date


class SubscriptionUsage(IncrementalOrbStream):
    """
    API Docs: https://docs.withorb.com/docs/orb-docs/api-reference/operations/get-a-subscription-usage

    Emits one record per non-zero daily usage entry of a subscription. State is
    kept per subscription: ``{subscription_id: {"timeframe_start": <iso string>}}``.
    """

    cursor_field = "timeframe_start"

    def __init__(
        self,
        start_date: pendulum.DateTime,
        subscription_usage_grouping_key: Optional[str] = None,
        plan_id: Optional[str] = None,
        end_date: Optional[pendulum.DateTime] = None,
        **kwargs,
    ):
        """
        :param start_date: lower bound used for subscriptions that have no saved state yet.
        :param subscription_usage_grouping_key: optional property key sent as the ``group_by`` request param.
        :param plan_id: optional plan ID; subscriptions (and plans) with a different ID are skipped.
        :param end_date: upper bound of the sliced time range; defaults to the current time.
        :param kwargs: forwarded untouched to ``IncrementalOrbStream.__init__``.
        """
        super().__init__(**kwargs)
        self.subscription_usage_grouping_key = subscription_usage_grouping_key
        self.plan_id = plan_id
        self.start_date = start_date
        # default to current time if end_date is unspecified
        self.end_date = end_date if end_date else pendulum.now()

    @property
    def primary_key(self) -> Iterable[str]:
        """Return the primary key fields, extended with the grouping key when one is configured."""
        key = ["subscription_id", "billable_metric_id", "timeframe_start"]

        # If a grouping key is present, it should be included in the primary key
        if self.subscription_usage_grouping_key:
            key.append(self.subscription_usage_grouping_key)

        return key

    def parse_response(
        self,
        response: requests.Response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        """
        Flatten the response into one output record per usage sub-record.

        The subscription ID is taken from ``stream_slice`` and attached to every record.
        """
        subscription_id = stream_slice["subscription_id"]

        # Records are in a container array called data
        for record in response.json().get("data", []):
            yield from self.yield_transformed_subrecords(record, subscription_id)

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Always return ``None``."""
        # This API endpoint is not paginated, so there will never be a next page
        return None

    def yield_transformed_subrecords(self, record, subscription_id):
        """
        Yield one flattened record for every entry of ``record["usage"]`` with a positive quantity.

        For each top level response record there can be multiple sub-records, depending
        on granularity and other input params. Each output merges the parent fields with
        the sub-record, adds ``subscription_id``, un-nests ``billable_metric`` into
        ``billable_metric_name`` / ``billable_metric_id`` and, when a grouping key is
        configured, un-nests ``metric_group`` into ``{property_key: property_value}``.

        ``record`` itself is left unmodified.

        :raises KeyError: if ``billable_metric`` (or ``metric_group`` when grouping) is missing.
        """
        # Copy the parent fields rather than deleting "usage" in place: a record without
        # a "usage" key must yield nothing instead of raising, and the caller's dict
        # must stay intact.
        parent_fields = {key: value for key, value in record.items() if key != "usage"}

        for subrecord in record.get("usage") or []:
            # skip records that don't contain any actual usage (a null quantity counts as none)
            if (subrecord.get("quantity") or 0) <= 0:
                continue

            # Merge the parent record with the sub record and add the subscription ID
            output = {**parent_fields, **subrecord}
            output["subscription_id"] = subscription_id

            # Un-nest billable_metric -> name,id into billable_metric_name and billable_metric_id
            billable_metric = output.pop("billable_metric")
            output["billable_metric_name"] = billable_metric["name"]
            output["billable_metric_id"] = billable_metric["id"]

            # If a group_by key is specified, un-nest it
            if self.subscription_usage_grouping_key:
                metric_group = output.pop("metric_group")
                output[metric_group["property_key"]] = metric_group["property_value"]

            yield output

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
        """
        This is a non-paginated API that operates on specified timeframe_start/timeframe_end
        windows.

        Request params are taken from the stream slice, which carries the window
        (and, when grouping, the billable metric) being requested.

        Force granularity to 'day' so that this stream can be used incrementally,
        with a day-based "cursor" based on timeframe_start and timeframe_end

        If a subscription_usage_grouping_key is present, adds a `group_by` param
        and `billable_metric_id` param from the stream slice. This is because
        the API requires a specific `billable_metric_id` to be set when using a
        `group_by` key.
        """
        params = {
            "granularity": "day",
            "timeframe_start": to_utc_isoformat(stream_slice["timeframe_start"]),
            "timeframe_end": to_utc_isoformat(stream_slice["timeframe_end"]),
        }

        if self.subscription_usage_grouping_key:
            params["group_by"] = self.subscription_usage_grouping_key

            # if a group_by key is specified, assume the stream slice contains a billable_metric_id
            params["billable_metric_id"] = stream_slice["billable_metric_id"]

        return params

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return the stream state with the cursor of ``latest_record``'s subscription advanced.

        The new cursor is the later of the record's ``timeframe_start`` and the cursor
        already stored for that subscription (falling back to ``start_date``). State of
        other subscriptions is carried over unchanged; the input mapping is not mutated.
        """
        current_stream_state = current_stream_state or {}
        subscription_id = latest_record["subscription_id"]
        subscription_state = current_stream_state.get(subscription_id, {})

        record_cursor_value = to_datetime(latest_record[self.cursor_field])
        state_cursor_value = to_datetime(subscription_state.get(self.cursor_field, self.start_date))
        # without any stored cursor or start_date there is nothing to compare against
        if state_cursor_value is None:
            max_cursor_value = record_cursor_value
        else:
            max_cursor_value = max(record_cursor_value, state_cursor_value)

        updated_subscription_state = {**subscription_state, self.cursor_field: to_utc_isoformat(max_cursor_value)}
        return {**current_stream_state, subscription_id: updated_subscription_state}

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs):
        """
        Orb does not support querying for all subscription usage in an unscoped
        way, so the path here is dependent on the stream_slice, which determines
        the `subscription_id`.
        """
        subscription_id = stream_slice["subscription_id"]
        return f"subscriptions/{subscription_id}/usage"

    def get_billable_metric_ids_by_plan_id(self) -> Mapping[str, Any]:
        """
        Map each plan ID to the billable metric IDs referenced by that plan's prices.

        Plans are read through the ``Plans`` stream. When ``plan_id`` is configured,
        only that plan is included. Prices without a billable metric are ignored.
        """
        metric_ids_by_plan_id = {}

        for plan in Plans(authenticator=self._session.auth).read_records(sync_mode=SyncMode.full_refresh):
            # if a plan_id filter is specified, skip any plan that doesn't match
            if self.plan_id and plan["id"] != self.plan_id:
                continue

            metric_ids = ((price.get("billable_metric") or {}).get("id") for price in plan.get("prices") or [])
            # a price without a billable metric has no ID to request usage for
            metric_ids_by_plan_id[plan["id"]] = [metric_id for metric_id in metric_ids if metric_id is not None]

        return metric_ids_by_plan_id

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        This schema differs from `subscription_usage.json` based on the configuration
        of the Stream. If a group_by key is specified, the stream will output
        records that contain the group_key name and value.
        """
        schema = super().get_json_schema()
        if self.subscription_usage_grouping_key:
            schema["properties"][self.subscription_usage_grouping_key] = {"type": "string"}

        return schema

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This stream is sliced per `subscription_id` and day, as well as `billable_metric_id`
        if a grouping key is provided. This is because the API only supports a
        single billable_metric_id per API call when using a group_by param.

        Each subscription starts at its own saved cursor (or `start_date` when it has
        none) and runs up to `end_date`.
        """
        stream_state = stream_state or {}
        slice_yielded = False
        subscriptions_stream = Subscriptions(authenticator=self._session.auth)

        # if using a group_by key, populate metric_ids_by_plan_id so that each
        # billable metric will get its own slice
        metric_ids_by_plan_id = self.get_billable_metric_ids_by_plan_id() if self.subscription_usage_grouping_key else {}
        end_date = to_datetime(self.end_date)

        for subscription in subscriptions_stream.read_records(sync_mode=SyncMode.full_refresh):
            subscription_id = subscription["id"]
            subscription_plan_id = subscription["plan_id"]

            # if filtering subscription usage by plan ID, skip any subscription that doesn't match the plan_id
            if self.plan_id and subscription_plan_id != self.plan_id:
                continue

            subscription_state = stream_state.get(subscription_id, {})
            start_date = to_datetime(subscription_state.get(self.cursor_field, self.start_date))

            # create one slice for each day of usage between the start and end date
            for period in chunk_date_range(start_date=start_date, end_date=end_date):
                base_slice = {
                    "subscription_id": subscription_id,
                    "timeframe_start": to_utc_isoformat(period.start),
                    "timeframe_end": to_utc_isoformat(period.end),
                }

                # if using a group_by key, yield one slice per billable_metric_id.
                # otherwise, yield slices without a billable_metric_id because
                # each API call will return usage broken down by billable metric
                # when grouping isn't used.
                if self.subscription_usage_grouping_key:
                    for metric_id in metric_ids_by_plan_id.get(subscription_plan_id) or []:
                        yield {**base_slice, "billable_metric_id": metric_id}
                        slice_yielded = True
                else:
                    yield base_slice
                    slice_yielded = True

        if not slice_yielded:
            # yield an empty slice to checkpoint state later
            # NOTE(review): path() and request_params() index stream_slice["subscription_id"],
            # so this empty slice only works if the base class skips empty slices before
            # building a request - confirm against IncrementalOrbStream.read_records.
            yield {}
SubscriptionUsage( + authenticator=authenticator, + lookback_window_days=lookback_window, + start_date=start_date, + end_date=end_date, + plan_id=plan_id, + subscription_usage_grouping_key=subscription_usage_grouping_key, + ), ] diff --git a/airbyte-integrations/connectors/source-orb/source_orb/spec.json b/airbyte-integrations/connectors/source-orb/source_orb/spec.json index 37f9d771edb5..423d85cf6484 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/spec.json +++ b/airbyte-integrations/connectors/source-orb/source_orb/spec.json @@ -4,7 +4,7 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Orb Spec", "type": "object", - "required": ["api_key"], + "required": ["api_key", "start_date"], "additionalProperties": true, "properties": { "api_key": { @@ -18,7 +18,7 @@ "type": "string", "title": "Start Date", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", - "description": "UTC date and time in the format 2022-03-01T00:00:00Z. Any data with created_at before this data will not be synced.", + "description": "UTC date and time in the format 2022-03-01T00:00:00Z. Any data with created_at before this data will not be synced. For Subscription Usage, this becomes the `timeframe_start` API parameter.", "examples": ["2022-03-01T00:00:00Z"], "order": 2 }, @@ -47,6 +47,16 @@ "title": "Event properties keys (numeric values)", "description": "Property key names to extract from all events, in order to enrich ledger entries corresponding to an event deduction.", "order": 5 + }, + "subscription_usage_grouping_key": { + "type": "string", + "title": "Subscription usage grouping key (string value)", + "description": "Property key name to group subscription usage by." + }, + "plan_id": { + "type": "string", + "title": "Orb Plan ID for Subscription Usage (string value)", + "description": "Orb Plan ID to filter subscriptions that should have usage fetched." 
# We have specific unit tests for SubscriptionUsage incremental stream
# because its logic differs from other IncrementalOrbStreams

# Timestamps shared by the SubscriptionUsage tests below (three consecutive days).
_USAGE_DAY_1 = "2022-01-25T12:00:00+00:00"
_USAGE_DAY_2 = "2022-01-26T12:00:00+00:00"
_USAGE_DAY_3 = "2022-01-27T12:00:00+00:00"


@pytest.mark.parametrize(
    ("current_stream_state", "latest_record", "expected_state"),
    [
        # The record's subscription is already in state: its cursor moves forward
        (
            {"subscription_id_foo": {"timeframe_start": _USAGE_DAY_1}},
            {"timeframe_start": _USAGE_DAY_2, "subscription_id": "subscription_id_foo"},
            {"subscription_id_foo": {"timeframe_start": _USAGE_DAY_2}},
        ),
        # Empty state: an entry is created for the record's subscription
        (
            {},
            {"timeframe_start": _USAGE_DAY_2, "subscription_id": "subscription_id_foo"},
            {"subscription_id_foo": {"timeframe_start": _USAGE_DAY_2}},
        ),
        # State holds a different subscription: it is kept and the new one is added
        (
            {"subscription_id_foo": {"timeframe_start": _USAGE_DAY_1}},
            {"timeframe_start": _USAGE_DAY_2, "subscription_id": "subscription_id_bar"},
            {
                "subscription_id_foo": {"timeframe_start": _USAGE_DAY_1},
                "subscription_id_bar": {"timeframe_start": _USAGE_DAY_2},
            },
        ),
    ],
)
def test_subscription_usage_get_updated_state(mocker, current_stream_state, latest_record, expected_state):
    stream = SubscriptionUsage(start_date=_USAGE_DAY_1, end_date=_USAGE_DAY_2)
    updated_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record)
    assert updated_state == expected_state


def test_subscription_usage_stream_slices(mocker):
    subscriptions = [
        {"id": "1", "plan_id": "2"},
        {"id": "11", "plan_id": "2"},
        {"id": "111", "plan_id": "3"},  # should be ignored because plan_id set to 2
    ]
    mocker.patch.object(Subscriptions, "read_records", return_value=iter(subscriptions))

    stream = SubscriptionUsage(plan_id="2", start_date=_USAGE_DAY_1, end_date=_USAGE_DAY_2)
    actual_slices = list(stream.stream_slices(sync_mode=SyncMode.incremental, cursor_field=[], stream_state={}))

    # one single-day slice for each subscription on the configured plan
    assert actual_slices == [
        {"subscription_id": "1", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
        {"subscription_id": "11", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
    ]


def test_subscription_usage_stream_slices_with_grouping_key(mocker):
    subscriptions = [
        {"id": "1", "plan_id": "2"},
        {"id": "11", "plan_id": "2"},
        {"id": "111", "plan_id": "3"},  # should be ignored because plan_id set to 2
    ]
    plans = [
        {
            "id": "2",
            "prices": [
                {"billable_metric": {"id": "billableMetricIdA"}},
                {"billable_metric": {"id": "billableMetricIdB"}},
            ],
        },
        # should be ignored because plan_id is set to 2
        {"id": "3", "prices": [{"billable_metric": {"id": "billableMetricIdC"}}]},
    ]
    mocker.patch.object(Subscriptions, "read_records", return_value=iter(subscriptions))
    mocker.patch.object(Plans, "read_records", return_value=iter(plans))

    # when a grouping_key is present, one slice per billable_metric is created because the Orb API
    # requires one API call per billable metric if the group_by param is in use.
    stream = SubscriptionUsage(
        plan_id="2",
        subscription_usage_grouping_key="groupKey",
        start_date=_USAGE_DAY_1,
        end_date=_USAGE_DAY_2,
    )
    actual_slices = list(stream.stream_slices(sync_mode=SyncMode.incremental, cursor_field=[], stream_state={}))

    # one slice per billable metric per subscription that matches the input plan
    assert actual_slices == [
        {"subscription_id": "1", "billable_metric_id": "billableMetricIdA", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
        {"subscription_id": "1", "billable_metric_id": "billableMetricIdB", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
        {"subscription_id": "11", "billable_metric_id": "billableMetricIdA", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
        {"subscription_id": "11", "billable_metric_id": "billableMetricIdB", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
    ]


@pytest.mark.parametrize(
    ("current_stream_state", "current_stream_slice", "grouping_key"),
    [
        # Slice matches subscription in state, no grouping
        (
            {"subscription_id_foo": {"timeframe_start": _USAGE_DAY_1}},
            {"subscription_id": "subscription_id_foo", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
            None,
        ),
        # Slice does not match subscription in state, no grouping
        (
            {"subscription_id_foo": {"timeframe_start": _USAGE_DAY_1}},
            {"subscription_id": "subscription_id_bar", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
            None,
        ),
        # No existing state, no grouping
        (
            {},
            {"subscription_id": "subscription_id_baz", "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
            None,
        ),
        # Slice matches subscription in state, with grouping
        (
            {"subscription_id_foo": {"timeframe_start": _USAGE_DAY_1}},
            {
                "subscription_id": "subscription_id_foo",
                "billable_metric_id": "billableMetricA",
                "timeframe_start": _USAGE_DAY_1,
                "timeframe_end": _USAGE_DAY_2,
            },
            "group_key_foo",
        ),
        # Slice does not match subscription in state, with grouping
        (
            {"subscription_id_foo": {"timeframe_start": _USAGE_DAY_1}},
            {
                "subscription_id": "subscription_id_bar",
                "billable_metric_id": "billableMetricA",
                "timeframe_start": _USAGE_DAY_1,
                "timeframe_end": _USAGE_DAY_2,
            },
            "group_key_foo",
        ),
        # No existing state, with grouping
        (
            {},
            {
                "subscription_id": "subscription_id_baz",
                "billable_metric_id": "billableMetricA",
                "timeframe_start": _USAGE_DAY_1,
                "timeframe_end": _USAGE_DAY_2,
            },
            "group_key_foo",
        ),
    ],
)
def test_subscription_usage_request_params(mocker, current_stream_state, current_stream_slice, grouping_key):
    # only pass the grouping key to the stream when the test case defines one
    stream_kwargs = {"start_date": _USAGE_DAY_1, "end_date": _USAGE_DAY_2}
    if grouping_key:
        stream_kwargs["subscription_usage_grouping_key"] = grouping_key
    stream = SubscriptionUsage(**stream_kwargs)

    # always pull the timeframe_start and timeframe_end from the stream slice
    expected_params = {
        "granularity": "day",
        "timeframe_start": current_stream_slice["timeframe_start"],
        "timeframe_end": current_stream_slice["timeframe_end"],
    }

    # if a grouping_key is present, add the group_by and billable_metric_id to params
    if grouping_key:
        expected_params.update(group_by=grouping_key, billable_metric_id=current_stream_slice["billable_metric_id"])

    actual_params = stream.request_params(stream_state=current_stream_state, stream_slice=current_stream_slice)
    assert actual_params == expected_params


def test_subscription_usage_yield_transformed_subrecords(mocker):
    stream = SubscriptionUsage(start_date=_USAGE_DAY_1, end_date=_USAGE_DAY_2)
    subscription_id = "subscriptionIdA"

    subscription_usage_response = {
        "billable_metric": {"name": "Metric A", "id": "billableMetricA"},
        "usage": [
            {"quantity": 0, "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
            {"quantity": 1, "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
            {"quantity": 2, "timeframe_start": _USAGE_DAY_2, "timeframe_end": _USAGE_DAY_3},
        ],
        "otherTopLevelField": {"shouldBeIncluded": "true"},
    }

    # Validate that one record is yielded per non-zero usage subrecord
    expected = [
        {
            "quantity": 1,
            "timeframe_start": _USAGE_DAY_1,
            "timeframe_end": _USAGE_DAY_2,
            "billable_metric_name": "Metric A",
            "billable_metric_id": "billableMetricA",
            "otherTopLevelField": {"shouldBeIncluded": "true"},
            "subscription_id": subscription_id,
        },
        {
            "quantity": 2,
            "timeframe_start": _USAGE_DAY_2,
            "timeframe_end": _USAGE_DAY_3,
            "billable_metric_name": "Metric A",
            "billable_metric_id": "billableMetricA",
            "otherTopLevelField": {"shouldBeIncluded": "true"},
            "subscription_id": subscription_id,
        },
    ]

    actual_output = list(stream.yield_transformed_subrecords(subscription_usage_response, subscription_id))

    assert actual_output == expected


def test_subscription_usage_yield_transformed_subrecords_with_grouping(mocker):
    stream = SubscriptionUsage(start_date=_USAGE_DAY_1, end_date=_USAGE_DAY_2, subscription_usage_grouping_key="grouping_key")
    subscription_id = "subscriptionIdA"

    subscription_usage_response = {
        "billable_metric": {"name": "Metric A", "id": "billableMetricA"},
        "metric_group": {"property_key": "grouping_key", "property_value": "grouping_value"},
        "usage": [
            {"quantity": 0, "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
            {"quantity": 1, "timeframe_start": _USAGE_DAY_1, "timeframe_end": _USAGE_DAY_2},
            {"quantity": 2, "timeframe_start": _USAGE_DAY_2, "timeframe_end": _USAGE_DAY_3},
        ],
        "otherTopLevelField": {"shouldBeIncluded": "true"},
    }

    # Validate that one record is yielded per non-zero usage subrecord,
    # each carrying the un-nested grouping key and value
    expected = [
        {
            "quantity": 1,
            "timeframe_start": _USAGE_DAY_1,
            "timeframe_end": _USAGE_DAY_2,
            "billable_metric_name": "Metric A",
            "billable_metric_id": "billableMetricA",
            "otherTopLevelField": {"shouldBeIncluded": "true"},
            "subscription_id": subscription_id,
            "grouping_key": "grouping_value",
        },
        {
            "quantity": 2,
            "timeframe_start": _USAGE_DAY_2,
            "timeframe_end": _USAGE_DAY_3,
            "billable_metric_name": "Metric A",
            "billable_metric_id": "billableMetricA",
            "otherTopLevelField": {"shouldBeIncluded": "true"},
            "subscription_id": subscription_id,
            "grouping_key": "grouping_value",
        },
    ]

    actual_output = list(stream.yield_transformed_subrecords(subscription_usage_response, subscription_id))

    assert actual_output == expected
a/airbyte-integrations/connectors/source-orb/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-orb/unit_tests/test_source.py @@ -32,8 +32,8 @@ def test_check_connection_fail(mocker): def test_streams(mocker): source = SourceOrb() config_mock = MagicMock() - sample_config = {"api_key": "test-token"} + sample_config = {"api_key": "test-token", "start_date": "2023-01-25T00:00:00Z"} config_mock.get.side_effect = sample_config.get streams = source.streams(config_mock) - expected_streams_number = 4 + expected_streams_number = 5 assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-orb/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-orb/unit_tests/test_streams.py index e79c208dee0d..ce4b03dfbdcd 100644 --- a/airbyte-integrations/connectors/source-orb/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-orb/unit_tests/test_streams.py @@ -3,7 +3,7 @@ # import pytest -from source_orb.source import CreditsLedgerEntries, OrbStream +from source_orb.source import CreditsLedgerEntries, OrbStream, SubscriptionUsage @pytest.fixture @@ -70,3 +70,16 @@ def test_credit_ledger_entries_schema(patch_base_class, mocker, event_properties else: for property_key in event_properties_keys: assert property_key in json_schema["properties"]["event"]["properties"]["properties"]["properties"] + + +@pytest.mark.parametrize("group_by_key", ["foo-key", None]) +def test_subscription_usage_schema(patch_base_class, mocker, group_by_key): + stream = SubscriptionUsage(start_date="2022-01-01T00:00:00Z", subscription_usage_grouping_key=group_by_key) + json_schema = stream.get_json_schema() + + if group_by_key is None: + # no additional key added to schema + assert len(json_schema["properties"]) == 6 + else: + assert len(json_schema["properties"]) == 7 + assert group_by_key in json_schema["properties"] diff --git a/docs/integrations/sources/orb.md b/docs/integrations/sources/orb.md index 
c0c5cd099ded..1fcf95e0ba5b 100644 --- a/docs/integrations/sources/orb.md +++ b/docs/integrations/sources/orb.md @@ -13,13 +13,15 @@ This Source is capable of syncing the following core resources, each of which ha * [Plans](https://docs.withorb.com/docs/orb-docs/api-reference/operations/list-plans) * [Customers](https://docs.withorb.com/docs/orb-docs/api-reference/operations/list-customers) * [Credits Ledger Entries](https://docs.withorb.com/docs/orb-docs/api-reference/operations/get-a-customer-credit-ledger) - +* [Subscription Usage](https://docs.withorb.com/docs/orb-docs/api-reference/operations/get-a-subscription-usage) As a caveat, the Credits Ledger Entries must read all Customers for an incremental sync, but will only incrementally return new ledger entries for each customers. +Similarly, the Subscription Usage stream must read all Subscriptions for an incremental sync (and all Plans if using the optional `subscription_usage_grouping_key`), but will only incrementally return new usage entries for each subscription. + ### Note on Incremental Syncs -The Orb API does not allow querying objects based on an `updated_at` time. Therefore, this connector uses the `created_at` field to query for new data since the last sync. +The Orb API does not allow querying objects based on an `updated_at` time. Therefore, this connector uses the `created_at` field (or the `timeframe_start` field in the Subscription Usage stream) to query for new data since the last sync. In order to capture data that has been updated after creation, please run a periodic Full Refresh. @@ -52,6 +54,7 @@ an Orb Account and API Key. 
| Version | Date | Pull Request | Subject | | --- | --- | --- | --- | +| 1.0.0 | 2023-02-02 | [21951](https://github.com/airbytehq/airbyte/pull/21951) | Add SubscriptionUsage stream and make `start_date` a required field | 0.1.4 | 2022-10-07 | [17761](https://github.com/airbytehq/airbyte/pull/17761) | Fix bug with enriching ledger entries with multiple credit blocks | 0.1.3 | 2022-08-26 | [16017](https://github.com/airbytehq/airbyte/pull/16017) | Add credit block id to ledger entries | 0.1.2 | 2022-04-20 | [11528](https://github.com/airbytehq/airbyte/pull/11528) | Add cost basis to ledger entries, update expiration date, sync only committed entries