From a74a943caab46e51a63c2933ce680aa9a345e7d9 Mon Sep 17 00:00:00 2001 From: Marvin Liu Date: Wed, 7 Sep 2022 10:36:00 -0700 Subject: [PATCH] feat: add ingestion_metadata field (#36) --- examples/flask_example/flaskapp.py | 2 +- src/amplitude/__init__.py | 2 +- src/amplitude/config.py | 7 ++- src/amplitude/event.py | 71 ++++++++++++++++++++++++++++-- src/amplitude/plugin.py | 13 ++++-- src/test/test_event.py | 45 ++++++++++++++++++- 6 files changed, 128 insertions(+), 12 deletions(-) diff --git a/examples/flask_example/flaskapp.py b/examples/flask_example/flaskapp.py index 8abb3c5..62eab29 100644 --- a/examples/flask_example/flaskapp.py +++ b/examples/flask_example/flaskapp.py @@ -48,7 +48,7 @@ def track_revenue(user_id): @app.route("/flush") def flush_event(): amp_client.flush() - return f"

All events flushed

" + return "

All events flushed

" if __name__ == "__main__": diff --git a/src/amplitude/__init__.py b/src/amplitude/__init__.py index 71790fa..dbb5a80 100644 --- a/src/amplitude/__init__.py +++ b/src/amplitude/__init__.py @@ -3,7 +3,7 @@ from amplitude.client import Amplitude from amplitude.event import BaseEvent, EventOptions, Identify, Revenue, IdentifyEvent, \ - GroupIdentifyEvent, RevenueEvent, Plan + GroupIdentifyEvent, RevenueEvent, Plan, IngestionMetadata from amplitude.config import Config from amplitude.constants import PluginType from amplitude.plugin import EventPlugin, DestinationPlugin diff --git a/src/amplitude/config.py b/src/amplitude/config.py index ac70308..1674bca 100644 --- a/src/amplitude/config.py +++ b/src/amplitude/config.py @@ -8,7 +8,7 @@ from typing import Optional, Callable from amplitude import constants -from amplitude.event import BaseEvent, Plan +from amplitude.event import BaseEvent, Plan, IngestionMetadata from amplitude.storage import InMemoryStorageProvider, StorageProvider, Storage @@ -33,6 +33,7 @@ class Config: storage_provider (amplitude.storage.StorageProvider, optional): Default to InMemoryStorageProvider. Provide storage instance for events buffer. plan (amplitude.event.Plan, optional): Tracking plan information. Default to None. + ingestion_metadata (amplitude.event.IngestionMetadata, optional): Ingestion metadata. Default to None. Properties: options: A dictionary contains minimum id length information. None if min_id_length not set. 
@@ -55,7 +56,8 @@ def __init__(self, api_key: str = None, use_batch: bool = False, server_url: Optional[str] = None, storage_provider: StorageProvider = InMemoryStorageProvider(), - plan: Plan = None): + plan: Plan = None, + ingestion_metadata: IngestionMetadata = None): """The constructor of Config class""" self.api_key: str = api_key self._flush_queue_size: int = flush_queue_size @@ -71,6 +73,7 @@ def __init__(self, api_key: str = None, self.storage_provider: StorageProvider = storage_provider self.opt_out: bool = False self.plan: Plan = plan + self.ingestion_metadata: IngestionMetadata = ingestion_metadata def get_storage(self) -> Storage: """Use configured StorageProvider to create a Storage instance then return. diff --git a/src/amplitude/event.py b/src/amplitude/event.py index 3d25bb3..92e557b 100644 --- a/src/amplitude/event.py +++ b/src/amplitude/event.py @@ -5,10 +5,11 @@ BaseEvent: Basic event class. Subclass of EventOptions. Identify: A class used to create identify and group identify event. IdentifyEvent: A special event class. Used to update user properties without an actual event. - GroupIdentifyEvent: A special event class. Used to update group properties without an actual event + GroupIdentifyEvent: A special event class. Used to update group properties without an actual event. Revenue: A class used to create revenue event. RevenueEvent: A special event class. Used to record revenue information. Plan: Tracking plan info includes branch, source, version, version_id. + IngestionMetadata: Ingestion metadata includes source name, source version. 
""" import copy @@ -71,7 +72,52 @@ def get_plan_body(self): result[PLAN_KEY_MAPPING[key][0]] = self.__dict__[key] else: logger.error( - f"Plan.{key} expected {PLAN_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.") + f"{type(self).__name__}.{key} expected {PLAN_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.") + return result + + +INGESTION_METADATA_KEY_MAPPING = { + "source_name": ["source_name", str], + "source_version": ["source_version", str], +} + + +class IngestionMetadata: + """IngestionMetadata holds metadata information. Instance of IngestionMetadata class can be value of event's `ingestion_metadata` attribute. + + Args: + source_name (str, optional): Name of the ingestion source in metadata. + source_version (str, optional): Version of the ingestion source in metadata. + + Methods: + get_body(): return a dict object that contains ingestion metadata information. + """ + + def __init__(self, source_name: Optional[str] = None, source_version: Optional[str] = None): + """The constructor for the IngestionMetadata class + + Args: + source_name (str, optional): Name of the ingestion source in metadata. + source_version (str, optional): Version of the ingestion source in metadata. 
+ """ + self.source_name: Optional[str] = source_name + self.source_version: Optional[str] = source_version + + def get_body(self): + """Convert this object instance to dict instance + + Returns: + A dictionary with data of this object instance + """ + result = {} + for key in INGESTION_METADATA_KEY_MAPPING: + if not self.__dict__[key]: + continue + if isinstance(self.__dict__[key], INGESTION_METADATA_KEY_MAPPING[key][1]): + result[INGESTION_METADATA_KEY_MAPPING[key][0]] = self.__dict__[key] + else: + logger.error( + f"{type(self).__name__}.{key} expected {INGESTION_METADATA_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.") return result @@ -113,6 +159,7 @@ def get_plan_body(self): "insert_id": ["insert_id", str], "library": ["library", str], "plan": ["plan", Plan], + "ingestion_metadata": ["ingestion_metadata", IngestionMetadata], "group_properties": ["group_properties", dict], "partner_id": ["partner_id", str], "version_name": ["version_name", str] @@ -158,6 +205,7 @@ class EventOptions: insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id we have already seen before within the past 7 days will be deduplicated. plan (Plan, optional): Tracking plan properties. + ingestion_metadata (IngestionMetadata, optional): Ingestion metadata. partner_id (str, optional): The partner id. version_name (str, optional): The version name. callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three @@ -167,7 +215,7 @@ class EventOptions: retry (int): The retry attempt of the event instance. Methods: - get_event_body(): Retrun a dictionary with data of the event instance + get_event_body(): Return a dictionary with data of the event instance callback(code, message): Trigger callback method of the event instance. 
""" @@ -203,6 +251,7 @@ def __init__(self, user_id: Optional[str] = None, session_id: Optional[int] = None, insert_id: Optional[str] = None, plan: Optional[Plan] = None, + ingestion_metadata: Optional[IngestionMetadata] = None, partner_id: Optional[str] = None, version_name: Optional[str] = None, callback=None): @@ -240,6 +289,7 @@ def __init__(self, user_id: Optional[str] = None, self.insert_id: Optional[str] = None self.library: Optional[str] = None self.plan: Optional[Plan] = None + self.ingestion_metadata: Optional[IngestionMetadata] = None self.partner_id: Optional[str] = None self.version_name: Optional[str] = None self["user_id"] = user_id @@ -274,6 +324,7 @@ def __init__(self, user_id: Optional[str] = None, self["session_id"] = session_id self["insert_id"] = insert_id self["plan"] = plan + self["ingestion_metadata"] = ingestion_metadata self["partner_id"] = partner_id self["version_name"] = version_name self.event_callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = callback @@ -309,6 +360,8 @@ def get_event_body(self) -> dict: event_body[value[0]] = self[key] if "plan" in event_body: event_body["plan"] = event_body["plan"].get_plan_body() + if "ingestion_metadata" in event_body: + event_body["ingestion_metadata"] = event_body["ingestion_metadata"].get_body() for properties in ["user_properties", "event_properties", "group_properties"]: if properties in event_body: for key, value in event_body[properties].items(): @@ -394,6 +447,7 @@ class BaseEvent(EventOptions): insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id we have already seen before within the past 7 days will be deduplicated. plan (Plan, optional): Tracking plan properties. + ingestion_metadata (IngestionMetadata, optional): Ingestion metadata. partner_id (str, optional): The partner id. callback (callable, optional): Event level callback method. Triggered when event is sent or failed. 
Take three parameters: an event instance, an integer code of response status, an optional string message. @@ -439,6 +493,7 @@ def __init__(self, event_type: str, session_id: Optional[int] = None, insert_id: Optional[str] = None, plan: Optional[Plan] = None, + ingestion_metadata: Optional[IngestionMetadata] = None, partner_id: Optional[str] = None, callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None): """The constructor of the BaseEvent class""" @@ -474,6 +529,7 @@ def __init__(self, event_type: str, session_id=session_id, insert_id=insert_id, plan=plan, + ingestion_metadata=ingestion_metadata, partner_id=partner_id, callback=callback) self.event_type: str = event_type @@ -730,6 +786,7 @@ class GroupIdentifyEvent(BaseEvent): insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id we have already seen before within the past 7 days will be deduplicated. plan (Plan, optional): Tracking plan properties. + ingestion_metadata (IngestionMetadata, optional): Ingestion metadata. partner_id (str, optional): The partner id. callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three parameters: an event instance, an integer code of response status, an optional string message. 
@@ -772,6 +829,7 @@ def __init__(self, user_id: Optional[str] = None, session_id: Optional[int] = None, insert_id: Optional[str] = None, plan: Optional[Plan] = None, + ingestion_metadata: Optional[IngestionMetadata] = None, partner_id: Optional[str] = None, callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None, identify_obj: Optional[Identify] = None): @@ -812,6 +870,7 @@ def __init__(self, user_id: Optional[str] = None, session_id=session_id, insert_id=insert_id, plan=plan, + ingestion_metadata=ingestion_metadata, partner_id=partner_id, callback=callback) if identify_obj: @@ -862,6 +921,7 @@ class IdentifyEvent(BaseEvent): insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id we have already seen before within the past 7 days will be deduplicated. plan (Plan, optional): Tracking plan properties. + ingestion_metadata (IngestionMetadata, optional): Ingestion metadata. partner_id (str, optional): The partner id. callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three parameters: an event instance, an integer code of response status, an optional string message. @@ -904,6 +964,7 @@ def __init__(self, user_id: Optional[str] = None, session_id: Optional[int] = None, insert_id: Optional[str] = None, plan: Optional[Plan] = None, + ingestion_metadata: Optional[IngestionMetadata] = None, partner_id: Optional[str] = None, callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None, identify_obj: Optional[Identify] = None): @@ -943,6 +1004,7 @@ def __init__(self, user_id: Optional[str] = None, session_id=session_id, insert_id=insert_id, plan=plan, + ingestion_metadata=ingestion_metadata, partner_id=partner_id, callback=callback) if identify_obj: @@ -1082,6 +1144,7 @@ class RevenueEvent(BaseEvent): insert_id (str, optional): A unique identifier for the event. 
Events sent with the same insert_id and device_id we have already seen before within the past 7 days will be deduplicated. plan (Plan, optional): Tracking plan properties. + ingestion_metadata (IngestionMetadata, optional): Ingestion metadata. partner_id (str, optional): The partner id. callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three parameters: an event instance, an integer code of response status, an optional string message. @@ -1124,6 +1187,7 @@ def __init__(self, user_id: Optional[str] = None, session_id: Optional[int] = None, insert_id: Optional[str] = None, plan: Optional[Plan] = None, + ingestion_metadata: Optional[IngestionMetadata] = None, partner_id: Optional[str] = None, callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None, revenue_obj: Optional[Revenue] = None): @@ -1164,6 +1228,7 @@ def __init__(self, user_id: Optional[str] = None, session_id=session_id, insert_id=insert_id, plan=plan, + ingestion_metadata=ingestion_metadata, partner_id=partner_id, callback=callback) if revenue_obj: diff --git a/src/amplitude/plugin.py b/src/amplitude/plugin.py index 469521a..8f8e021 100644 --- a/src/amplitude/plugin.py +++ b/src/amplitude/plugin.py @@ -179,8 +179,10 @@ class ContextPlugin(Plugin): Methods: apply_context_data(event): Add SDK name and version to event.library. - execute(event): Set event default timestamp and insert_id if not set elsewhere. - Add SDK name and version to event.library. + execute(event): + - Set event default timestamp and insert_id if not set elsewhere. + - Add SDK name and version to event.library. + - Mount plan, ingestion_metadata if not yet. """ def __init__(self): @@ -201,7 +203,10 @@ def apply_context_data(self, event: BaseEvent): event.library = self.context_string def execute(self, event: BaseEvent) -> BaseEvent: - """Set event default timestamp and insert_id if not set elsewhere. Add SDK name and version to event.library. 
+ """ + - Set event default timestamp and insert_id if not set elsewhere. + - Add SDK name and version to event.library. + - Mount plan, ingestion_metadata if not yet. Args: event (BaseEvent): The event to be processed. @@ -212,6 +217,8 @@ def execute(self, event: BaseEvent) -> BaseEvent: event["insert_id"] = str(uuid.uuid4()) if self.configuration.plan and (not event.plan): event["plan"] = self.configuration.plan + if self.configuration.ingestion_metadata and (not event.ingestion_metadata): + event["ingestion_metadata"] = self.configuration.ingestion_metadata self.apply_context_data(event) return event diff --git a/src/test/test_event.py b/src/test/test_event.py index a54a835..91c8c78 100644 --- a/src/test/test_event.py +++ b/src/test/test_event.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock from amplitude import EventOptions, BaseEvent, Identify, IdentifyEvent, GroupIdentifyEvent, Revenue, RevenueEvent, \ - Plan, constants + Plan, IngestionMetadata, constants class AmplitudeEventTestCase(unittest.TestCase): @@ -51,13 +51,26 @@ def test_base_event_set_plan_attribute_success(self): "event_type": "test_event", "plan": {"branch": "test_branch", "versionId": "v1.1"}}, event.get_event_body()) + def test_base_event_set_ingestion_metadata_attribute_success(self): + event = BaseEvent("test_event", user_id="test_user") + event["ingestion_metadata"] = IngestionMetadata(source_name="test_source", source_version="test_version") + self.assertEqual({"user_id": "test_user", + "event_type": "test_event", + "ingestion_metadata": {"source_name": "test_source", "source_version": "test_version"}}, event.get_event_body()) + def test_base_event_load_event_options_update_attributes_value(self): event = BaseEvent(event_type="test_event", event_properties={"properties1": "test"}, time=0) - event_options = EventOptions(user_id="test_user", device_id="test_device", time=10) + event_options = EventOptions( + user_id="test_user", + device_id="test_device", + time=10, + 
ingestion_metadata=IngestionMetadata(source_name="test_source", source_version="test_version") + ) event.load_event_options(event_options) expect_event_body = {"user_id": "test_user", "device_id": "test_device", "time": 10, + "ingestion_metadata": {"source_name": "test_source", "source_version": "test_version"}, "event_type": "test_event", "event_properties": {"properties1": "test"}} self.assertEqual(expect_event_body, event.get_event_body()) @@ -357,6 +370,34 @@ def test_event_with_plan_equal_to_expect_event_body(self): self.assertTrue("plan" in event) self.assertEqual({"event_type": "test_event", "plan": expect_plan}, event.get_event_body()) + def test_ingestion_metadata_initialize_with_wrong_type_error_log_with_get_body(self): + ingestion_metadata = IngestionMetadata(source_name="test_source", source_version=1) + expected = {"source_name": "test_source", "source_version": "1"} + with self.assertLogs(None, "ERROR") as cm: + ingestion_metadata_dict = ingestion_metadata.get_body() + self.assertTrue(isinstance(ingestion_metadata_dict, dict)) + self.assertFalse("source_version" in ingestion_metadata_dict) + self.assertNotEqual(expected, ingestion_metadata_dict) + self.assertEqual(["ERROR:amplitude:IngestionMetadata.source_version expected <class 'str'> but received <class 'int'>."], + cm.output) + + def test_ingestion_metadata_initialize_with_none_value_equal_to_expect_body(self): + ingestion_metadata = IngestionMetadata(source_name="test_source", source_version=None) + expected = {"source_name": "test_source"} + self.assertEqual(expected, ingestion_metadata.get_body()) + + def test_ingestion_metadata_initialize_with_all_value_equal_to_expect_body(self): + ingestion_metadata = IngestionMetadata(source_name="test_source", source_version="test_version") + expected = {"source_name": "test_source", "source_version": "test_version"} + self.assertEqual(expected, ingestion_metadata.get_body()) + + def test_event_with_ingestion_metadata_equal_to_expect_event_body(self): + ingestion_metadata = 
IngestionMetadata(source_name="test_source", source_version="test_version") + expected = {"source_name": "test_source", "source_version": "test_version"} + event = BaseEvent("test_event", ingestion_metadata=ingestion_metadata) + self.assertTrue("ingestion_metadata" in event) + self.assertEqual({"event_type": "test_event", "ingestion_metadata": expected}, event.get_event_body()) + if __name__ == '__main__': unittest.main()