Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into issue-12959
Browse files: browse the repository at this point in the history.
Abhishek332 authored Jan 19, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents be24f93 + 3f78e07 commit 996fbab
Showing 5 changed files with 434 additions and 13 deletions.
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
"binary": types.BINARY,
"char": types.CHAR,
"varchar": types.VARCHAR,
"decimal": types.DECIMAL,
}
)

@@ -61,6 +62,8 @@ def get_columns(
if attype == "decimal":
prec, scale = charlen.split(",")
args = (int(prec), int(scale))
elif attype.startswith("struct"):
args = []
else:
args = (int(charlen),)
coltype = coltype(*args)
23 changes: 13 additions & 10 deletions ingestion/src/metadata/profiler/processor/core.py
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@
from metadata.profiler.orm.registry import NOT_COMPUTE
from metadata.profiler.processor.sample_data_handler import upload_sample_data
from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
from metadata.utils.helpers import calculate_execution_time
from metadata.utils.logger import profiler_logger

logger = profiler_logger()
@@ -91,6 +92,7 @@ def __init__(
"""

self.profiler_interface = profiler_interface
self.source_config = self.profiler_interface.source_config
self.include_columns = include_columns
self.exclude_columns = exclude_columns
self._metrics = metrics
@@ -518,27 +520,27 @@ def compute_metrics(self) -> Profiler:

return self

def process(
self,
generate_sample_data: Optional[bool],
) -> ProfilerResponse:
def process(self) -> ProfilerResponse:
"""
Given a table, we will prepare the profiler for
all its columns and return all the run profilers
in a Dict in the shape {col_name: Profiler}
"""
logger.debug(
f"Computing profile metrics for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
)

self.compute_metrics()
if generate_sample_data:
if self.source_config.computeMetrics:
logger.debug(
f"Computing profile metrics for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
)
self.compute_metrics()

if self.source_config.generateSampleData:
sample_data = self.generate_sample_data()
else:
sample_data = None

profile = self.get_profile()
self._check_profile_and_handle(profile)
if self.source_config.computeMetrics:
self._check_profile_and_handle(profile)

table_profile = ProfilerResponse(
table=self.profiler_interface.table_entity,
@@ -548,6 +550,7 @@ def process(

return table_profile

@calculate_execution_time
def generate_sample_data(self) -> Optional[TableData]:
"""Fetch and ingest sample data
4 changes: 1 addition & 3 deletions ingestion/src/metadata/profiler/processor/processor.py
Original file line number | Diff line number | Diff line change
@@ -58,9 +58,7 @@ def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]:
)

try:
profile: ProfilerResponse = profiler_runner.process(
self.source_config.generateSampleData,
)
profile: ProfilerResponse = profiler_runner.process()
except Exception as exc:
self.status.failed(
StackTraceError(
Loading

0 comments on commit 996fbab

Please sign in to comment.