Skip to content

Commit

Permalink
Fix up issues
Browse files Browse the repository at this point in the history
  • Loading branch information
bnchrch committed Aug 1, 2023
1 parent ec3e98e commit 054f419
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorReg

@asset(required_resource_keys={"latest_cloud_registry_gcs_blob"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def latest_cloud_registry(latest_cloud_registry_dict: dict) -> ConnectorRegistryV0:
def latest_cloud_registry(_context: OpExecutionContext, latest_cloud_registry_dict: dict) -> ConnectorRegistryV0:
return ConnectorRegistryV0.parse_obj(latest_cloud_registry_dict)


@asset(required_resource_keys={"latest_oss_registry_gcs_blob"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def latest_oss_registry(latest_oss_registry_dict: dict) -> ConnectorRegistryV0:
def latest_oss_registry(_context: OpExecutionContext, latest_oss_registry_dict: dict) -> ConnectorRegistryV0:
return ConnectorRegistryV0.parse_obj(latest_oss_registry_dict)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,39 +133,39 @@ def augment_and_normalize_connector_dataframes(


@asset(group_name=GROUP_NAME)
@sentry.instrument_asset_op
@sentry_sdk.trace
def cloud_sources_dataframe(latest_cloud_registry: ConnectorRegistryV0) -> OutputDataFrame:
latest_cloud_registry_dict = to_json_sanitized_dict(latest_cloud_registry)
sources = latest_cloud_registry_dict["sources"]
return output_dataframe(pd.DataFrame(sources))


@asset(group_name=GROUP_NAME)
@sentry.instrument_asset_op
@sentry_sdk.trace
def oss_sources_dataframe(latest_oss_registry: ConnectorRegistryV0) -> OutputDataFrame:
latest_oss_registry_dict = to_json_sanitized_dict(latest_oss_registry)
sources = latest_oss_registry_dict["sources"]
return output_dataframe(pd.DataFrame(sources))


@asset(group_name=GROUP_NAME)
@sentry.instrument_asset_op
@sentry_sdk.trace
def cloud_destinations_dataframe(latest_cloud_registry: ConnectorRegistryV0) -> OutputDataFrame:
latest_cloud_registry_dict = to_json_sanitized_dict(latest_cloud_registry)
destinations = latest_cloud_registry_dict["destinations"]
return output_dataframe(pd.DataFrame(destinations))


@asset(group_name=GROUP_NAME)
@sentry.instrument_asset_op
@sentry_sdk.trace
def oss_destinations_dataframe(latest_oss_registry: ConnectorRegistryV0) -> OutputDataFrame:
latest_oss_registry_dict = to_json_sanitized_dict(latest_oss_registry)
destinations = latest_oss_registry_dict["destinations"]
return output_dataframe(pd.DataFrame(destinations))


@asset(group_name=GROUP_NAME)
@sentry.instrument_asset_op
@sentry_sdk.trace
def all_sources_dataframe(cloud_sources_dataframe, oss_sources_dataframe, github_connector_folders) -> pd.DataFrame:
"""
Merge the cloud and oss sources registries into a single dataframe.
Expand All @@ -181,7 +181,7 @@ def all_sources_dataframe(cloud_sources_dataframe, oss_sources_dataframe, github


@asset(group_name=GROUP_NAME)
@sentry.instrument_asset_op
@sentry_sdk.trace
def all_destinations_dataframe(cloud_destinations_dataframe, oss_destinations_dataframe, github_connector_folders) -> pd.DataFrame:
"""
Merge the cloud and oss destinations registries into a single dataframe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import sentry_sdk
import functools

from dagster import OpExecutionContext, SensorEvaluationContext, get_dagster_logger
from dagster import OpExecutionContext, SensorEvaluationContext, AssetExecutionContext, get_dagster_logger

sentry_logger = get_dagster_logger("sentry")


def setup_dagster_sentry():
"""
Setup the sentry SDK for Dagster if SENTRY_DSN is defined for the environment.
Expand Down Expand Up @@ -51,6 +52,35 @@ def setup_dagster_sentry():
)


def _is_context(context):
    """
    Tell whether the given object is one of the Dagster context types handled here.

    Returns True for instances of OpExecutionContext, SensorEvaluationContext or
    AssetExecutionContext, and False for anything else.
    """
    # isinstance accepts a tuple of types and matches if any of them match.
    recognised_context_types = (
        OpExecutionContext,
        SensorEvaluationContext,
        AssetExecutionContext,
    )
    return isinstance(context, recognised_context_types)


def _get_context_from_args_kwargs(args, kwargs):
    """
    Locate the context object among the arguments of an intercepted call.

    Lookup order:
      1. the first positional argument, if there is one;
      2. the keyword argument named "context", if present.

    The first candidate accepted by _is_context is returned. If neither
    candidate qualifies, an Exception is raised.
    """
    # Gather the candidates in priority order without touching missing entries.
    candidates = []
    if args:
        candidates.append(args[0])
    if "context" in kwargs:
        candidates.append(kwargs["context"])

    for candidate in candidates:
        if _is_context(candidate):
            return candidate

    # Nothing usable was passed: surface the misuse of the decorator loudly.
    raise Exception(
        "No context provided to Sentry Transaction. When using @instrument, ensure that the asset/op has a context as the first argument."
    )


def _log_asset_or_op_context(context: OpExecutionContext):
"""
Capture Dagster OP context for Sentry Error handling
Expand Down Expand Up @@ -90,6 +120,7 @@ def _log_sensor_context(context: SensorEvaluationContext):
sentry_sdk.set_tag("sensor_name", context._sensor_name)
sentry_sdk.set_tag("run_id", context.cursor)


def _with_sentry_op_asset_transaction(context: OpExecutionContext):
"""
Start or continue a Sentry transaction for the Dagster Op/Asset
Expand All @@ -110,27 +141,33 @@ def _with_sentry_op_asset_transaction(context: OpExecutionContext):
name=job_name,
)


# DECORATORS


def capture_asset_op_context(func):
"""
Capture Dagster OP context for Sentry Error handling
"""

@functools.wraps(func)
def wrapped_fn(*args, **kwargs):
context = kwargs["context"]
context = _get_context_from_args_kwargs(args, kwargs)
_log_asset_or_op_context(context)
return func(*args, **kwargs)

return wrapped_fn


def capture_sensor_context(func):
"""
Capture Dagster Sensor context for Sentry Error handling
"""

@functools.wraps(func)
def wrapped_fn(*args, **kwargs):
_log_sensor_context(kwargs["context"])
context = _get_context_from_args_kwargs(args, kwargs)
_log_sensor_context(context)
return func(*args, **kwargs)

return wrapped_fn
Expand Down Expand Up @@ -168,28 +205,13 @@ def start_sentry_transaction(func):
"""

def wrapped_fn(*args, **kwargs):
context = kwargs["context"]
context = _get_context_from_args_kwargs(args, kwargs)
with _with_sentry_op_asset_transaction(context):
return func(*args, **kwargs)

return wrapped_fn


def ensure_context_arg(func):
    """
    Decorator that refuses to call a Dagster Op/Asset without positional arguments.

    The check is purely positional: the wrapped function is invoked only when at
    least one positional argument was supplied (the context is expected to be the
    first one). Otherwise an Exception naming the wrapped function is raised.
    """

    @functools.wraps(func)
    def guarded(*args, **kwargs):
        # Happy path first: at least one positional argument is present.
        if args:
            return func(*args, **kwargs)

        raise Exception(
            f"No context provided to Sentry Transaction for {func.__name__}. When using @instrument, ensure that the asset/op has a context as the first argument."
        )

    return guarded


def instrument_asset_op(func):
"""
Instrument a Dagster Op/Asset with Sentry.
Expand All @@ -205,7 +227,6 @@ def instrument_asset_op(func):
"""

@functools.wraps(func)
@ensure_context_arg
@start_sentry_transaction
@capture_asset_op_context
@capture_exceptions
Expand All @@ -214,6 +235,7 @@ def wrapped_fn(*args, **kwargs):

return wrapped_fn


def instrument_sensor(func):
"""
Instrument a Dagster Sensor with Sentry.
Expand All @@ -227,7 +249,6 @@ def instrument_sensor(func):
"""


@functools.wraps(func)
@capture_sensor_context
@capture_exceptions
Expand Down

0 comments on commit 054f419

Please sign in to comment.