Skip to content

Commit

Permalink
Revert "MINOR: add MongoDB sample data (open-metadata#15237)"
Browse files Browse the repository at this point in the history
This reverts commit ff2ecc5.
  • Loading branch information
TeddyCr committed Feb 28, 2024
1 parent f63ed3b commit 8ac2cef
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 310 deletions.
65 changes: 2 additions & 63 deletions ingestion/src/metadata/profiler/adaptors/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,80 +11,19 @@
"""
MongoDB adaptor for the NoSQL profiler.
"""
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional

from pymongo import MongoClient

from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.data.table import Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor


@dataclass
class Query:
    """Description of a single ``find`` call against one MongoDB collection.

    ``database`` and ``collection`` select the target, ``filter`` is handed to
    ``find`` unchanged, and ``limit`` is applied only when it is truthy.
    """

    database: str
    collection: str
    filter: dict = field(default_factory=dict)
    limit: Optional[int] = None

    def to_executable(self, client: MongoClient):
        """Return the result of running this query's ``find`` through ``client``."""
        found = client[self.database][self.collection].find(self.filter)
        if not self.limit:
            # None and 0 both leave the result uncapped.
            return found
        return found.limit(self.limit)


class MongoDB(NoSQLAdaptor):
"""A MongoDB client that serves as an adaptor for profiling data assets on MongoDB"""

def __init__(self, client: MongoClient):
self.client = client

def item_count(self, table: Table) -> int:
def get_row_count(self, table: Table) -> int:
    """Count every document in the collection that backs ``table``.

    The database is selected by ``table.databaseSchema.name`` and the
    collection by ``table.name.__root__``.
    """
    target = self.client[table.databaseSchema.name][table.name.__root__]
    # An empty filter matches all documents.
    return target.count_documents({})

def scan(
    self, table: Table, columns: List[Column], limit: int
) -> List[Dict[str, any]]:
    """Read documents from the table's collection without any filter.

    ``limit`` is forwarded to the ``Query``; ``columns`` is accepted but not
    used, so whole documents are returned.
    """
    unfiltered = Query(
        database=table.databaseSchema.name,
        collection=table.name.__root__,
        limit=limit,
    )
    return self.execute(unfiltered)

def query(
    self, table: Table, columns: List[Column], query: any, limit: int
) -> List[Dict[str, any]]:
    """Run a JSON-encoded filter against the table's collection.

    Args:
        table: entity whose ``databaseSchema.name`` and ``name.__root__``
            select the database and collection.
        columns: accepted for parity with ``scan``; not used.
        query: JSON text that decodes to the filter passed to ``find``.
        limit: cap on the number of documents, forwarded to the ``Query``.

    Returns:
        The documents produced by ``execute`` for the built ``Query``.

    Raises:
        ValueError: if ``query`` is not valid JSON.
    """
    try:
        json_filter = json.loads(query)
    except json.JSONDecodeError as err:
        # Chain the decoder error so its position details are not lost.
        raise ValueError("Invalid JSON query") from err
    return self.execute(
        Query(
            database=table.databaseSchema.name,
            collection=table.name.__root__,
            filter=json_filter,
            # Was previously dropped, so the requested cap was ignored here
            # even though scan() honours it.
            limit=limit,
        )
    )

def execute(self, query: Query) -> List[Dict[str, any]]:
    """Run ``query`` against the stored client and return its documents.

    Every value of every document is passed through ``_json_safe``.
    """
    documents = list(query.to_executable(self.client))
    return [
        {key: self._json_safe(document.get(key)) for key in document}
        for document in documents
    ]

@staticmethod
def _json_safe(data: any):
try:
json.dumps(data)
return data
except Exception:
return str(data)
16 changes: 2 additions & 14 deletions ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,11 @@
NoSQL adaptor for the NoSQL profiler.
"""
from abc import ABC, abstractmethod
from typing import Dict, List

from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.data.table import Table


class NoSQLAdaptor(ABC):
@abstractmethod
def item_count(self, table: Table) -> int:
    """Return the number of items in ``table``; not implemented at this level."""
    raise NotImplementedError

@abstractmethod
def scan(
    self, table: Table, columns: List[Column], limit: int
) -> List[Dict[str, any]]:
    """Read rows from ``table``; this base body does nothing and returns None."""
    return None

def query(
self, table: Table, columns: List[Column], query: any, limit: int
) -> List[Dict[str, any]]:
def get_row_count(self, table: Table) -> int:
    """Return the row count of ``table``; not implemented at this level."""
    raise NotImplementedError
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import Metric, MetricTypes
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_like_column import SQALikeColumn

Expand All @@ -42,9 +41,8 @@ class NoSQLProfilerInterface(ProfilerInterface):

# pylint: disable=too-many-arguments

def __init__(self, *args, **kwargs):
    """Initialise the parent with the given arguments, then build the sampler."""
    super().__init__(*args, **kwargs)
    # Done after the parent initialiser has run, as in the original order.
    self.sampler = self._get_sampler()
def _get_sampler(self):
    """Return None: this version provides no sampler."""
    return None

def _compute_table_metrics(
self,
Expand Down Expand Up @@ -121,7 +119,6 @@ def compute_metrics(
row = self._get_metric_fn[metric_func.metric_type.value](
metric_func.metrics,
client,
column=metric_func.column,
)
except Exception as exc:
name = f"{metric_func.column if metric_func.column is not None else metric_func.table}"
Expand All @@ -137,23 +134,8 @@ def compute_metrics(
column = None
return row, column, metric_func.metric_type.value

def fetch_sample_data(self, table, columns: List[SQALikeColumn]) -> TableData:
    """Delegate to the sampler's ``fetch_sample_data``; ``table`` is not used."""
    sample = self.sampler.fetch_sample_data(columns)
    return sample

def _get_sampler(self) -> NoSQLSampler:
    """Create the sampler registered under the service connection's class name."""
    from metadata.profiler.processor.sampler.sampler_factory import (  # pylint: disable=import-outside-toplevel
        sampler_factory_,
    )

    connection_type = self.service_connection_config.__class__.__name__
    # Dict literal keeps the original left-to-right evaluation of the arguments.
    sampler_kwargs = {
        "table": self.table,
        "client": factory.construct(self.connection),
        "profile_sample_config": self.profile_sample_config,
        "partition_details": self.partition_details,
        "profile_sample_query": self.profile_query,
    }
    return sampler_factory_.create(connection_type, **sampler_kwargs)
def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData:
    """Return None: this version fetches no sample data; both arguments are ignored."""
    return None

def get_composed_metrics(
self, column: Column, metric: Metrics, column_results: Dict
Expand Down Expand Up @@ -194,10 +176,7 @@ def table(self):
return self.table_entity

def get_columns(self) -> List[Optional[SQALikeColumn]]:
return [
SQALikeColumn(name=c.name.__root__, type=c.dataType)
for c in self.table.columns
]
return []

def close(self):
    """Close the underlying connection."""
    connection = self.connection
    connection.close()
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ def df_fn(self, dfs=None):

@classmethod
def nosql_fn(cls, client: NoSQLAdaptor) -> Callable[[Table], int]:
return client.item_count
return client.get_row_count
72 changes: 0 additions & 72 deletions ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,10 @@
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import (
MongoDBConnection,
)
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
from metadata.profiler.processor.sampler.pandas.sampler import DatalakeSampler
from metadata.profiler.processor.sampler.sqlalchemy.bigquery.sampler import (
BigQuerySampler,
Expand Down Expand Up @@ -63,4 +59,3 @@ def create(
sampler_factory_.register(BigQueryConnection.__name__, BigQuerySampler)
sampler_factory_.register(DatalakeConnection.__name__, DatalakeSampler)
sampler_factory_.register(TrinoConnection.__name__, TrinoSampler)
sampler_factory_.register(MongoDBConnection.__name__, NoSQLSampler)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from sqlalchemy import Column

from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.generated.schema.entity.data.table import TableData
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
from metadata.utils.sqa_like_column import SQALikeColumn
Expand All @@ -29,7 +29,7 @@ class SamplerInterface(ABC):
def __init__(
self,
client,
table: Table,
table,
profile_sample_config: Optional[ProfileSampleConfig] = None,
partition_details: Optional[Dict] = None,
profile_sample_query: Optional[str] = None,
Expand Down
Loading

0 comments on commit 8ac2cef

Please sign in to comment.