Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new Auto Classification Workflow #18610

Merged
merged 36 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
26a9425
skeleton
pmbrull Nov 10, 2024
402b64e
separate sampler
pmbrull Nov 11, 2024
9ad37c4
separate sampler
pmbrull Nov 12, 2024
16c2a58
separate sampler
pmbrull Nov 12, 2024
0a79f76
Merge remote-tracking branch 'upstream/main' into separate-workflow
pmbrull Nov 12, 2024
f02043d
merge
pmbrull Nov 12, 2024
85ab4f5
workflow
pmbrull Nov 12, 2024
bd98a7b
schemas
pmbrull Nov 12, 2024
f5519b4
prep
pmbrull Nov 14, 2024
ebbfc00
Merge remote-tracking branch 'upstream/main' into separate-workflow
pmbrull Nov 14, 2024
bd619f1
fix
pmbrull Nov 14, 2024
da82e91
fix
pmbrull Nov 14, 2024
c8956cb
copy db
pmbrull Nov 14, 2024
3a56ec8
fix tests and remove unused method
pmbrull Nov 15, 2024
76db1b1
fix tests
pmbrull Nov 15, 2024
9dd20ee
Merge remote-tracking branch 'upstream/main' into separate-workflow
pmbrull Nov 15, 2024
f6d6e83
fix tests
pmbrull Nov 15, 2024
56f6643
linting and tests
pmbrull Nov 16, 2024
48b8b64
rename workflow
pmbrull Nov 16, 2024
d58e916
rename
pmbrull Nov 16, 2024
11f8156
prepare ingestion pipeline
pmbrull Nov 16, 2024
fc01ac6
migrate profiler pipeline
pmbrull Nov 16, 2024
460d1d7
Merge branch 'main' into separate-workflow
pmbrull Nov 16, 2024
7117f1d
fix
pmbrull Nov 16, 2024
ed193e1
Merge remote-tracking branch 'origin/separate-workflow' into separate…
pmbrull Nov 16, 2024
dfcbb78
fix
pmbrull Nov 16, 2024
4091660
compat
pmbrull Nov 17, 2024
1c5309f
compat
pmbrull Nov 17, 2024
79958b8
fix tests and comments
pmbrull Nov 18, 2024
621efef
Merge remote-tracking branch 'upstream/main' into separate-workflow
pmbrull Nov 18, 2024
9c0f78b
tests
pmbrull Nov 18, 2024
669d119
Merge remote-tracking branch 'upstream/main' into separate-workflow
pmbrull Nov 18, 2024
d5a5bf4
tests
pmbrull Nov 18, 2024
d4042a1
SampleData
pmbrull Nov 18, 2024
1983af6
SampleData
pmbrull Nov 18, 2024
74ae007
clean test
pmbrull Nov 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1743,3 +1743,7 @@ UPDATE ingestion_pipeline_entity
SET json = JSON_REMOVE(json, '$.sourceConfig.config.overrideViewLineage')
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';

-- Remove the classification and sampling configs from the profiler pipelines
UPDATE ingestion_pipeline_entity
SET json = JSON_REMOVE(json, '$.sourceConfig.config.processPiiSensitive', '$.sourceConfig.config.confidence', '$.sourceConfig.config.generateSampleData')
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'profiler';
Original file line number Diff line number Diff line change
Expand Up @@ -1729,3 +1729,8 @@ CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_s
UPDATE ingestion_pipeline_entity
SET json = json::jsonb #- '{sourceConfig,config,overrideViewLineage}'
WHERE json #>> '{pipelineType}' = 'metadata';

-- Remove the classification and sampling configs from the profiler pipelines
UPDATE ingestion_pipeline_entity
SET json = json::jsonb #- '{sourceConfig,config,processPiiSensitive}' #- '{sourceConfig,config,confidence}' #- '{sourceConfig,config,generateSampleData}'
WHERE json #>> '{pipelineType}' = 'profiler';
5 changes: 3 additions & 2 deletions ingestion/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,10 @@ ignore-paths = [
"ingestion/src/metadata/utils/datalake/datalake_utils.py",
"ingestion/src/metadata/great_expectations/action.py",
"ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py",
"ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py",
".*/src/metadata/ingestion/source/.*/service_spec.py",
"ingestion/src/metadata/profiler/metrics",
"ingestion/src/metadata/profiler/source/databricks",

# metadata ingestion sources
"ingestion/src/metadata/ingestion/source/api/rest/connection.py",
"ingestion/src/metadata/ingestion/source/api/rest/metadata.py",
Expand Down Expand Up @@ -262,6 +261,7 @@ ignore = [
"src/metadata/parsers/*",
"src/metadata/pii/*",
"src/metadata/profiler/*",
"src/metadata/sampler/*",
"src/metadata/readers/*",
"src/metadata/timer/*",
"src/metadata/utils/*",
Expand All @@ -273,6 +273,7 @@ ignore = [
"src/metadata/workflow/metadata.py",
"src/metadata/workflow/profiler.py",
"src/metadata/workflow/usage.py",
"src/metadata/workflow/classification.py",
"src/metadata/workflow/workflow_status_mixin.py",
]

Expand Down
52 changes: 52 additions & 0 deletions ingestion/src/metadata/cli/classify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Auto Classification utility for the metadata CLI
"""
import sys
import traceback
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger, redacted_config
from metadata.workflow.classification import AutoClassificationWorkflow
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

logger = cli_logger()


def run_classification(config_path: Path) -> None:
    """
    Run the Auto Classification workflow from a config path
    pointing to a JSON or YAML file.

    :param config_path: Path to load JSON config
    """
    workflow_config = None
    try:
        workflow_config = load_config_file(config_path)
        # Log the effective configuration with secrets redacted
        logger.debug("Using workflow config:\n%s", redacted_config(workflow_config))
        workflow = AutoClassificationWorkflow.create(workflow_config)
    except Exception as exc:
        # Any failure loading the config or building the workflow is
        # reported through the shared init error handler, then we exit
        # with a non-zero status for the CLI caller.
        logger.debug(traceback.format_exc())
        # NOTE(review): passing PipelineType.metadata mirrors other CLI entry
        # points, but this is the auto classification workflow — confirm
        # whether a classification-specific PipelineType member is intended.
        WorkflowInitErrorHandler.print_init_error(
            exc, workflow_config, PipelineType.metadata
        )
        sys.exit(1)

    # Run to completion, emit the summary, and raise if any step failed
    workflow.execute()
    workflow.stop()
    workflow.print_status()
    workflow.raise_from_status()
9 changes: 9 additions & 0 deletions ingestion/src/metadata/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from metadata.__version__ import get_metadata_version
from metadata.cli.app import run_app
from metadata.cli.classify import run_classification
from metadata.cli.dataquality import run_test
from metadata.cli.ingest import run_ingest
from metadata.cli.lineage import run_lineage
Expand All @@ -40,6 +41,7 @@ class MetadataCommands(Enum):
WEBHOOK = "webhook"
LINEAGE = "lineage"
APP = "app"
AUTO_CLASSIFICATION = "classify"


RUN_PATH_METHODS = {
Expand All @@ -49,6 +51,7 @@ class MetadataCommands(Enum):
MetadataCommands.PROFILE.value: run_profiler,
MetadataCommands.TEST.value: run_test,
MetadataCommands.APP.value: run_app,
MetadataCommands.AUTO_CLASSIFICATION.value: run_classification,
}


Expand Down Expand Up @@ -124,6 +127,12 @@ def get_parser(args: Optional[List[str]] = None):
help="Workflow for running external applications",
)
)
create_common_config_parser_args(
sub_parser.add_parser(
MetadataCommands.AUTO_CLASSIFICATION.value,
help="Workflow for running auto classification",
)
)
webhook_args(
sub_parser.add_parser(
MetadataCommands.WEBHOOK.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""

from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.builders.pandas_validator_builder import (
PandasValidatorBuilder,
Expand All @@ -25,8 +24,8 @@
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
from metadata.sampler.sampler_interface import SamplerInterface
from metadata.utils.logger import test_suite_logger

logger = test_suite_logger()
Expand All @@ -43,14 +42,16 @@ def __init__(
self,
service_connection_config: DatalakeConnection,
ometa_client: OpenMetadata,
table_entity: Table = None,
**kwargs, # pylint: disable=unused-argument
sampler: SamplerInterface,
table_entity: Table,
**__,
):
super().__init__()
self.table_entity = table_entity

self.ometa_client = ometa_client
self.service_connection_config = service_connection_config
super().__init__(
service_connection_config,
ometa_client,
sampler,
table_entity,
)

(
self.table_sample_query,
Expand All @@ -59,12 +60,7 @@ def __init__(
) = self._get_table_config()

# add partition logic to test suite
self.dfs = self.return_ometa_dataframes_sampled(
service_connection_config=self.service_connection_config,
client=get_connection(self.service_connection_config).client._client,
table=self.table_entity,
profile_sample_config=self.table_sample_config,
)
self.dfs = self.sampler.table
if self.dfs and self.table_partition_config:
self.dfs = self.get_partitioned_df(self.dfs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler
from metadata.sampler.sampler_interface import SamplerInterface
from metadata.utils.constants import TEN_MIN
from metadata.utils.logger import test_suite_logger
from metadata.utils.ssl_manager import get_ssl_connection
Expand All @@ -49,23 +49,25 @@ def __init__(
self,
service_connection_config: DatabaseConnection,
ometa_client: OpenMetadata,
sampler: SamplerInterface,
table_entity: Table = None,
sqa_metadata=None,
orm_table=None,
):
super().__init__()
self.ometa_client = ometa_client
self.table_entity = table_entity
self.service_connection_config = service_connection_config
super().__init__(
service_connection_config,
ometa_client,
sampler,
table_entity,
)
self.create_session()
self._table = self._convert_table_to_orm_object(sqa_metadata)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we kept the logic to pass the Metadata object when creating orm object. We need it to prevent naming conflict when creating the orm object if I well remember

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the ORM table is now an input argument https://github.com/open-metadata/OpenMetadata/pull/18610/files#diff-e8a8a48b444e6246572ad762618b52eed69b4b7a75ffba896cf8314f56c15990R132

Before we sent the sqa_metadata object and let the interfaces (profiler/test suite) create the ORM. Now, since the sampler needs the ORM table too, we create it at the source and pass it as an arg to the 3 interfaces

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently for the profiler each database should have its own instance of the Metadata. So that if you are profiling service abc with 2 databases where each database has a table name FOO then we won't have conflict. #6009.

self._table = orm_table

(
self.table_sample_query,
self.table_sample_config,
self.table_partition_config,
) = self._get_table_config()

self._sampler = self._create_sampler()
self._runner = self._create_runner()

def create_session(self):
Expand Down Expand Up @@ -96,15 +98,6 @@ def runner(self) -> QueryRunner:
"""
return self._runner

@property
def sampler(self) -> SQASampler:
"""getter method for the Runner object

Returns:
Sampler: sampler object
"""
return self._sampler

@property
def table(self):
"""getter method for the table object
Expand All @@ -114,21 +107,6 @@ def table(self):
"""
return self._table

def _create_sampler(self) -> SQASampler:
"""Create sampler instance"""
from metadata.profiler.processor.sampler.sampler_factory import ( # pylint: disable=import-outside-toplevel
sampler_factory_,
)

return sampler_factory_.create(
self.service_connection_config.__class__.__name__,
client=self.session,
table=self.table,
profile_sample_config=self.table_sample_config,
partition_details=self.table_partition_config,
profile_sample_query=self.table_sample_query,
)

def _create_runner(self) -> None:
"""Create a QueryRunner Instance"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.sampler.sampler_interface import SamplerInterface
from metadata.utils.logger import test_suite_logger
from metadata.utils.partition import get_partition_details

logger = test_suite_logger()

Expand All @@ -43,22 +42,37 @@ class TestSuiteInterface(ABC):

runtime_params_setter_fact = RuntimeParameterSetterFactory

def __init__(self):
"""This constructor exists to staisfy the linter. Its fields should be set in the implementation class"""
self.ometa_client: DatabaseConnection = None
self.ometa_client: OpenMetadata = None
self.table_entity: Table = None
self.service_connection_config: DatabaseConnection = None

@property
def sampler(self):
"""Get the sampler object
def __init__(
self,
service_connection_config: DatabaseConnection,
ometa_client: OpenMetadata,
sampler: SamplerInterface,
table_entity: Table,
):
"""Required attribute for the interface"""
self.ometa_client = ometa_client
self.service_connection_config = service_connection_config
self.table_entity = table_entity
self.sampler = sampler

Note: Overridden in the implementation class. This should be removed from the interface. It has been
implemented as the RuntimeParameterSetter takes the sampler as an argument, though we may want to
remove that dependency.
"""
return None
@classmethod
def create(
cls,
service_connection_config: DatabaseConnection,
ometa_client: OpenMetadata,
sampler: SamplerInterface,
table_entity: Table,
*args,
**kwargs,
):
return cls(
service_connection_config,
ometa_client,
sampler,
table_entity,
*args,
**kwargs,
)

@abstractmethod
def _get_validator_builder(
Expand Down Expand Up @@ -123,37 +137,10 @@ def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResult]:
)
raise RuntimeError(err)

def _get_sample_query(self) -> Optional[str]:
"""Get the sampling query for the data quality tests

Args:
entity (Table): _description_
"""
if self.table_entity.tableProfilerConfig:
return self.table_entity.tableProfilerConfig.profileQuery

return None

def _get_profile_sample(self) -> Optional[ProfileSampleConfig]:
try:
if self.table_entity.tableProfilerConfig.profileSample:
return ProfileSampleConfig(
profile_sample=self.table_entity.tableProfilerConfig.profileSample,
profile_sample_type=self.table_entity.tableProfilerConfig.profileSampleType,
)
except AttributeError:
# if tableProfilerConfig is None it will indicate that the table has not profiler config
# hence we can return None
return None
return None

def _get_table_config(self):
"""Get the sampling configuration for the data quality tests"""
sample_query = self._get_sample_query()
sample_config = None
partition_config = None
if not sample_query:
sample_config = self._get_profile_sample()
partition_config = get_partition_details(self.table_entity)

return sample_query, sample_config, partition_config
return (
self.sampler.sample_query,
self.sampler.sample_config,
self.sampler.partition_details,
)
Loading
Loading