Skip to content

Commit

Permalink
feat(ingest): simplify event IDs for function invocations (#4398)
Browse files Browse the repository at this point in the history
* Simplify function call events
Co-authored-by: Ravindra Lanka <rslanka@gmail.com>

Co-authored-by: Ravindra Lanka <rslanka@gmail.com>
  • Loading branch information
kevinhu and rslanka authored Mar 23, 2022
1 parent 9fbb521 commit a7b0275
Showing 1 changed file with 46 additions and 25 deletions.
71 changes: 46 additions & 25 deletions metadata-ingestion/src/datahub/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
import os
import platform
import sys
import uuid
from functools import wraps
from pathlib import Path
Expand Down Expand Up @@ -138,26 +137,24 @@ def init_tracking(self) -> None:

def ping(
self,
action: str,
event_name: str,
properties: Optional[Dict[str, Any]] = None,
) -> None:
"""
Send a single telemetry event.
Args:
category (str): category for the event
action (str): action taken
label (Optional[str], optional): label for the event
value (Optional[int], optional): value for the event
event_name (str): name of the event to send.
properties (Optional[Dict[str, Any]]): metadata for the event
"""

if not self.enabled or self.mp is None:
return

# send event
try:
logger.debug("Sending Telemetry")
self.mp.track(self.client_id, action, properties)
logger.info("Sending Telemetry")
self.mp.track(self.client_id, event_name, properties)

except Exception as e:
logger.debug(f"Error reporting telemetry: {e}")
Expand Down Expand Up @@ -186,37 +183,61 @@ def with_telemetry(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:

action = f"function:{func.__module__}.{func.__name__}"
function = f"{func.__module__}.{func.__name__}"

telemetry_instance.init_tracking()
telemetry_instance.ping(action)
telemetry_instance.ping(
"function-call", {"function": function, "status": "start"}
)
try:
res = func(*args, **kwargs)
telemetry_instance.ping(f"{action}:result", {"status": "completed"})
return res
# Catch general exceptions
except Exception as e:
telemetry_instance.ping(
f"{action}:result", {"status": "error", "error": get_full_class_name(e)}
"function-call",
{"function": function, "status": "completed"},
)
raise e
return res
# System exits (used in ingestion and Docker commands) are not caught by the exception handler,
# so we need to catch them here.
except SystemExit as e:
# Forward successful exits
if e.code == 0:
telemetry_instance.ping(f"{action}:result", {"status": "completed"})
sys.exit(0)
# 0 or None imply success
if not e.code:
telemetry_instance.ping(
"function-call",
{
"function": function,
"status": "completed",
},
)
# Report failed exits
else:
telemetry_instance.ping(
f"{action}:result",
{"status": "error", "error": get_full_class_name(e)},
"function-call",
{
"function": function,
"status": "error",
"error": get_full_class_name(e),
},
)
sys.exit(e.code)
raise e
# Catch SIGINTs
except KeyboardInterrupt:
telemetry_instance.ping(f"{action}:result", {"status": "cancelled"})
sys.exit(0)
except KeyboardInterrupt as e:
telemetry_instance.ping(
"function-call",
{"function": function, "status": "cancelled"},
)
raise e

# Catch general exceptions
except Exception as e:
telemetry_instance.ping(
"function-call",
{
"function": function,
"status": "error",
"error": get_full_class_name(e),
},
)
raise e

return wrapper

0 comments on commit a7b0275

Please sign in to comment.