Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ingestion_metadata field #36

Merged
merged 1 commit into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/flask_example/flaskapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def track_revenue(user_id):
@app.route("/flush")
def flush_event():
amp_client.flush()
return f"<p>All events flushed</p>"
return "<p>All events flushed</p>"


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/amplitude/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from amplitude.client import Amplitude
from amplitude.event import BaseEvent, EventOptions, Identify, Revenue, IdentifyEvent, \
GroupIdentifyEvent, RevenueEvent, Plan
GroupIdentifyEvent, RevenueEvent, Plan, IngestionMetadata
from amplitude.config import Config
from amplitude.constants import PluginType
from amplitude.plugin import EventPlugin, DestinationPlugin
7 changes: 5 additions & 2 deletions src/amplitude/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Optional, Callable

from amplitude import constants
from amplitude.event import BaseEvent, Plan
from amplitude.event import BaseEvent, Plan, IngestionMetadata
from amplitude.storage import InMemoryStorageProvider, StorageProvider, Storage


Expand All @@ -33,6 +33,7 @@ class Config:
storage_provider (amplitude.storage.StorageProvider, optional): Default to InMemoryStorageProvider.
Provide storage instance for events buffer.
plan (amplitude.event.Plan, optional): Tracking plan information. Default to None.
ingestion_metadata (amplitude.event.IngestionMetadata, optional): Ingestion metadata. Default to None.

Properties:
options: A dictionary contains minimum id length information. None if min_id_length not set.
Expand All @@ -55,7 +56,8 @@ def __init__(self, api_key: str = None,
use_batch: bool = False,
server_url: Optional[str] = None,
storage_provider: StorageProvider = InMemoryStorageProvider(),
plan: Plan = None):
plan: Plan = None,
ingestion_metadata: IngestionMetadata = None):
"""The constructor of Config class"""
self.api_key: str = api_key
self._flush_queue_size: int = flush_queue_size
Expand All @@ -71,6 +73,7 @@ def __init__(self, api_key: str = None,
self.storage_provider: StorageProvider = storage_provider
self.opt_out: bool = False
self.plan: Plan = plan
self.ingestion_metadata: IngestionMetadata = ingestion_metadata

def get_storage(self) -> Storage:
"""Use configured StorageProvider to create a Storage instance then return.
Expand Down
71 changes: 68 additions & 3 deletions src/amplitude/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
BaseEvent: Basic event class. Subclass of EventOptions.
Identify: A class used to create identify and group identify event.
IdentifyEvent: A special event class. Used to update user properties without an actual event.
GroupIdentifyEvent: A special event class. Used to update group properties without an actual event
GroupIdentifyEvent: A special event class. Used to update group properties without an actual event.
Revenue: A class used to create revenue event.
RevenueEvent: A special event class. Used to record revenue information.
Plan: Tracking plan info includes branch, source, version, version_id.
IngestionMetadata: Ingestion metadata includes source name, source version.
"""

import copy
Expand Down Expand Up @@ -71,7 +72,52 @@ def get_plan_body(self):
result[PLAN_KEY_MAPPING[key][0]] = self.__dict__[key]
else:
logger.error(
f"Plan.{key} expected {PLAN_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
f"{type(self).__name__}.{key} expected {PLAN_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
return result


INGESTION_METADATA_KEY_MAPPING = {
"source_name": ["source_name", str],
"source_version": ["source_version", str],
}


class IngestionMetadata:
"""IngestionMetadata holds metadata information. Instance of IngestionMetadata class can be value of event's `ingestion_metadata` attribute.

Args:
source_name (str, optional): Name of the ingestion source in metadata.
source_version (str, optional): Version of the ingestion source in metadata.

Methods:
get_body(): return a dict object that contains ingestion metadata information.
"""

def __init__(self, source_name: Optional[str] = None, source_version: Optional[str] = None):
"""The constructor for the IngestionMetadata class

Args:
source_name (str, optional): Name of the ingestion source in metadata.
source_version (str, optional): Version of the ingestion source in metadata.
"""
self.source_name: Optional[str] = source_name
self.source_version: Optional[str] = source_version

def get_body(self):
"""Convert this object instance to dict instance

Returns:
A dictionary with data of this object instance
"""
result = {}
for key in INGESTION_METADATA_KEY_MAPPING:
if not self.__dict__[key]:
continue
if isinstance(self.__dict__[key], INGESTION_METADATA_KEY_MAPPING[key][1]):
result[INGESTION_METADATA_KEY_MAPPING[key][0]] = self.__dict__[key]
else:
logger.error(
f"{type(self).__name__}.{key} expected {INGESTION_METADATA_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
return result


Expand Down Expand Up @@ -113,6 +159,7 @@ def get_plan_body(self):
"insert_id": ["insert_id", str],
"library": ["library", str],
"plan": ["plan", Plan],
"ingestion_metadata": ["ingestion_metadata", IngestionMetadata],
"group_properties": ["group_properties", dict],
"partner_id": ["partner_id", str],
"version_name": ["version_name", str]
Expand Down Expand Up @@ -158,6 +205,7 @@ class EventOptions:
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
version_name (str, optional): The version name.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
Expand All @@ -167,7 +215,7 @@ class EventOptions:
retry (int): The retry attempt of the event instance.

Methods:
get_event_body(): Retrun a dictionary with data of the event instance
get_event_body(): Return a dictionary with data of the event instance
callback(code, message): Trigger callback method of the event instance.
"""

Expand Down Expand Up @@ -203,6 +251,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
version_name: Optional[str] = None,
callback=None):
Expand Down Expand Up @@ -240,6 +289,7 @@ def __init__(self, user_id: Optional[str] = None,
self.insert_id: Optional[str] = None
self.library: Optional[str] = None
self.plan: Optional[Plan] = None
self.ingestion_metadata: Optional[IngestionMetadata] = None
self.partner_id: Optional[str] = None
self.version_name: Optional[str] = None
self["user_id"] = user_id
Expand Down Expand Up @@ -274,6 +324,7 @@ def __init__(self, user_id: Optional[str] = None,
self["session_id"] = session_id
self["insert_id"] = insert_id
self["plan"] = plan
self["ingestion_metadata"] = ingestion_metadata
self["partner_id"] = partner_id
self["version_name"] = version_name
self.event_callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = callback
Expand Down Expand Up @@ -309,6 +360,8 @@ def get_event_body(self) -> dict:
event_body[value[0]] = self[key]
if "plan" in event_body:
event_body["plan"] = event_body["plan"].get_plan_body()
if "ingestion_metadata" in event_body:
event_body["ingestion_metadata"] = event_body["ingestion_metadata"].get_body()
for properties in ["user_properties", "event_properties", "group_properties"]:
if properties in event_body:
for key, value in event_body[properties].items():
Expand Down Expand Up @@ -394,6 +447,7 @@ class BaseEvent(EventOptions):
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
parameters: an event instance, an integer code of response status, an optional string message.
Expand Down Expand Up @@ -439,6 +493,7 @@ def __init__(self, event_type: str,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None):
"""The constructor of the BaseEvent class"""
Expand Down Expand Up @@ -474,6 +529,7 @@ def __init__(self, event_type: str,
session_id=session_id,
insert_id=insert_id,
plan=plan,
ingestion_metadata=ingestion_metadata,
partner_id=partner_id,
callback=callback)
self.event_type: str = event_type
Expand Down Expand Up @@ -730,6 +786,7 @@ class GroupIdentifyEvent(BaseEvent):
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
parameters: an event instance, an integer code of response status, an optional string message.
Expand Down Expand Up @@ -772,6 +829,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
identify_obj: Optional[Identify] = None):
Expand Down Expand Up @@ -812,6 +870,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id=session_id,
insert_id=insert_id,
plan=plan,
ingestion_metadata=ingestion_metadata,
partner_id=partner_id,
callback=callback)
if identify_obj:
Expand Down Expand Up @@ -862,6 +921,7 @@ class IdentifyEvent(BaseEvent):
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
parameters: an event instance, an integer code of response status, an optional string message.
Expand Down Expand Up @@ -904,6 +964,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
identify_obj: Optional[Identify] = None):
Expand Down Expand Up @@ -943,6 +1004,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id=session_id,
insert_id=insert_id,
plan=plan,
ingestion_metadata=ingestion_metadata,
partner_id=partner_id,
callback=callback)
if identify_obj:
Expand Down Expand Up @@ -1082,6 +1144,7 @@ class RevenueEvent(BaseEvent):
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
parameters: an event instance, an integer code of response status, an optional string message.
Expand Down Expand Up @@ -1124,6 +1187,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
revenue_obj: Optional[Revenue] = None):
Expand Down Expand Up @@ -1164,6 +1228,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id=session_id,
insert_id=insert_id,
plan=plan,
ingestion_metadata=ingestion_metadata,
partner_id=partner_id,
callback=callback)
if revenue_obj:
Expand Down
13 changes: 10 additions & 3 deletions src/amplitude/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,10 @@ class ContextPlugin(Plugin):

Methods:
apply_context_data(event): Add SDK name and version to event.library.
execute(event): Set event default timestamp and insert_id if not set elsewhere.
Add SDK name and version to event.library.
execute(event):
- Set event default timestamp and insert_id if not set elsewhere.
- Add SDK name and version to event.library.
- Mount plan, ingestion_metadata if not yet.
"""

def __init__(self):
Expand All @@ -201,7 +203,10 @@ def apply_context_data(self, event: BaseEvent):
event.library = self.context_string

def execute(self, event: BaseEvent) -> BaseEvent:
"""Set event default timestamp and insert_id if not set elsewhere. Add SDK name and version to event.library.
"""
- Set event default timestamp and insert_id if not set elsewhere.
- Add SDK name and version to event.library.
- Mount plan, ingestion_metadata if not yet.

Args:
event (BaseEvent): The event to be processed.
Expand All @@ -212,6 +217,8 @@ def execute(self, event: BaseEvent) -> BaseEvent:
event["insert_id"] = str(uuid.uuid4())
if self.configuration.plan and (not event.plan):
event["plan"] = self.configuration.plan
if self.configuration.ingestion_metadata and (not event.ingestion_metadata):
event["ingestion_metadata"] = self.configuration.ingestion_metadata
self.apply_context_data(event)
return event

Expand Down
Loading