diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 367c8626707526..ff037a0176db48 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -245,7 +245,7 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase): query_combiner: SQLAlchemyQueryCombiner def _get_columns_to_profile(self) -> List[str]: - if self.config.profile_table_level_only: + if not self.config.any_field_level_metrics_enabled(): return [] # Compute columns to profile diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py index d1b96d6e503f06..d06bb9d5d21ef1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py @@ -7,6 +7,13 @@ from datahub.configuration.common import AllowDenyPattern, ConfigModel +_PROFILING_FLAGS_TO_REPORT = { + "turn_off_expensive_profiling_metrics", + "profile_table_level_only", + "query_combiner_enabled", + # all include_field_ flags are reported. +} + class GEProfilingConfig(ConfigModel): enabled: bool = Field( @@ -25,7 +32,6 @@ class GEProfilingConfig(ConfigModel): description="Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.", ) - # These settings will override the ones below. turn_off_expensive_profiling_metrics: bool = Field( default=False, description="Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.", @@ -136,41 +142,46 @@ def ensure_field_level_settings_are_normalized( cls: "GEProfilingConfig", values: Dict[str, Any] ) -> Dict[str, Any]: max_num_fields_to_profile_key = "max_number_of_fields_to_profile" - table_level_profiling_only_key = "profile_table_level_only" max_num_fields_to_profile = values.get(max_num_fields_to_profile_key) - if values.get(table_level_profiling_only_key): - all_field_level_metrics: List[str] = [ - "include_field_null_count", - "include_field_distinct_count", - "include_field_min_value", - "include_field_max_value", - "include_field_mean_value", - "include_field_median_value", - "include_field_stddev_value", + + # Disable all field-level metrics. + if values.get("profile_table_level_only"): + for field_level_metric in cls.__fields__: + if field_level_metric.startswith("include_field_"): + values.setdefault(field_level_metric, False) + + assert ( + max_num_fields_to_profile is None + ), f"{max_num_fields_to_profile_key} should be set to None" + + # Disable expensive queries. + if values.get("turn_off_expensive_profiling_metrics"): + expensive_field_level_metrics: List[str] = [ "include_field_quantiles", "include_field_distinct_value_frequencies", "include_field_histogram", "include_field_sample_values", ] - # Suppress all field-level metrics - for field_level_metric in all_field_level_metrics: - values[field_level_metric] = False - assert ( - max_num_fields_to_profile is None - ), f"{max_num_fields_to_profile_key} should be set to None" + for expensive_field_metric in expensive_field_level_metrics: + values.setdefault(expensive_field_metric, False) - if values.get("turn_off_expensive_profiling_metrics"): - if not values.get(table_level_profiling_only_key): - expensive_field_level_metrics: List[str] = [ - "include_field_quantiles", - "include_field_distinct_value_frequencies", - "include_field_histogram", - "include_field_sample_values", - ] - for expensive_field_metric in expensive_field_level_metrics: - values[expensive_field_metric] = False - if max_num_fields_to_profile is None: - # We currently profile up to 10 non-filtered columns in this mode by default. - values[max_num_fields_to_profile_key] = 10 + # By default, we profile at most 10 non-filtered columns in this mode. + values.setdefault(max_num_fields_to_profile_key, 10) return values + + def any_field_level_metrics_enabled(self) -> bool: + return any( + getattr(self, field_name) + for field_name in self.__fields__ + if field_name.startswith("include_field_") + ) + + def config_for_telemetry(self) -> Dict[str, Any]: + config_dict = self.dict() + + return { + flag: config_dict[flag] + for flag in config_dict + if flag in _PROFILING_FLAGS_TO_REPORT or flag.startswith("include_field_") + } diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py b/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py index 27b87733622fb1..9f534d77c1b3b6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py @@ -124,24 +124,14 @@ def ensure_field_level_settings_are_normalized( cls: "DataLakeProfilerConfig", values: Dict[str, Any] ) -> Dict[str, Any]: max_num_fields_to_profile_key = "max_number_of_fields_to_profile" - table_level_profiling_only_key = "profile_table_level_only" max_num_fields_to_profile = values.get(max_num_fields_to_profile_key) - if values.get(table_level_profiling_only_key): - all_field_level_metrics: List[str] = [ - "include_field_null_count", - "include_field_min_value", - "include_field_max_value", - "include_field_mean_value", - "include_field_median_value", - "include_field_stddev_value", - "include_field_quantiles", - "include_field_distinct_value_frequencies", - "include_field_histogram", - "include_field_sample_values", - ] - # Suppress all field-level metrics - for field_level_metric in all_field_level_metrics: - values[field_level_metric] = False + + # Disable all field-level metrics. + if values.get("profile_table_level_only"): + for field_level_metric in cls.__fields__: + if field_level_metric.startswith("include_field_"): + values.setdefault(field_level_metric, False) + assert ( max_num_fields_to_profile is None ), f"{max_num_fields_to_profile_key} should be set to None" diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 949daf075d3edf..c0f92503dfb253 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -455,23 +455,6 @@ def get_schema_metadata( "include_tables", ] -# flags to emit telemetry for -profiling_flags_to_report = [ - "turn_off_expensive_profiling_metrics", - "profile_table_level_only", - "include_field_null_count", - "include_field_min_value", - "include_field_max_value", - "include_field_mean_value", - "include_field_median_value", - "include_field_stddev_value", - "include_field_quantiles", - "include_field_distinct_value_frequencies", - "include_field_histogram", - "include_field_sample_values", - "query_combiner_enabled", -] - class SQLAlchemySource(StatefulIngestionSourceBase): """A Base class for all SQL Sources that use SQLAlchemy to extend""" @@ -508,13 +491,9 @@ def __init__(self, config: SQLAlchemyConfig, ctx: PipelineContext, platform: str ) if config.profiling.enabled: - telemetry.telemetry_instance.ping( "sql_profiling_config", - { - config_flag: config.profiling.dict().get(config_flag) - for config_flag in profiling_flags_to_report - }, + config.profiling.config_for_telemetry(), ) if self.config.domain: self.domain_registry = DomainRegistry(