MINOR: add MongoDB sample data #15237

Merged: 14 commits, Feb 22, 2024
ingestion/src/metadata/profiler/adaptors/mongodb.py (63 additions & 2 deletions)

@@ -11,19 +11,80 @@
"""
MongoDB adaptor for the NoSQL profiler.
"""
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional

from pymongo import MongoClient

from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor


@dataclass
class Query:
database: str
collection: str
filter: dict = field(default_factory=dict)
limit: Optional[int] = None

def to_executable(self, client: MongoClient):
db = client[self.database]
collection = db[self.collection]
query = collection.find(self.filter)
if self.limit:
query = query.limit(self.limit)
return query


class MongoDB(NoSQLAdaptor):
"""A MongoDB client that serves as an adaptor for profiling data assets on MongoDB"""

def __init__(self, client: MongoClient):
self.client = client

def get_row_count(self, table: Table) -> int:
def item_count(self, table: Table) -> int:
db = self.client[table.databaseSchema.name]
collection = db[table.name.__root__]
return collection.count_documents({})

def scan(
self, table: Table, columns: List[Column], limit: int
) -> List[Dict[str, any]]:
return self.execute(
Query(
database=table.databaseSchema.name,
collection=table.name.__root__,
limit=limit,
)
)

def query(
self, table: Table, columns: List[Column], query: any, limit: int
) -> List[Dict[str, any]]:
try:
json_query = json.loads(query)
except json.JSONDecodeError:
raise ValueError("Invalid JSON query")
return self.execute(
Query(
database=table.databaseSchema.name,
collection=table.name.__root__,
filter=json_query,
)
)

def execute(self, query: Query) -> List[Dict[str, any]]:
records = list(query.to_executable(self.client))
result = []
for r in records:
result.append({c: self._json_safe(r.get(c)) for c in r})
return result

@staticmethod
def _json_safe(data: any):
try:
json.dumps(data)
return data
except Exception:
return str(data)
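For context, Query.to_executable only composes standard pymongo calls. A minimal usage sketch follows; the connection string, database, collection, and filter below are placeholders for illustration and are not part of this PR:

from pymongo import MongoClient

from metadata.profiler.adaptors.mongodb import MongoDB, Query

# Placeholder connection string; any reachable MongoDB instance works.
client = MongoClient("mongodb://localhost:27017")
adaptor = MongoDB(client)

# Equivalent to client["sample_db"]["orders"].find({"status": "active"}).limit(50)
query = Query(
    database="sample_db",
    collection="orders",
    filter={"status": "active"},
    limit=50,
)
rows = adaptor.execute(query)  # list of dicts, values coerced to JSON-safe types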
ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py (14 additions & 2 deletions)

@@ -12,11 +12,23 @@
NoSQL adaptor for the NoSQL profiler.
"""
from abc import ABC, abstractmethod
from typing import Dict, List

-from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.table import Column, Table


class NoSQLAdaptor(ABC):
    @abstractmethod
-    def get_row_count(self, table: Table) -> int:
+    def item_count(self, table: Table) -> int:
        raise NotImplementedError

    @abstractmethod
    def scan(
        self, table: Table, columns: List[Column], limit: int
    ) -> List[Dict[str, any]]:
        pass

    def query(
        self, table: Table, columns: List[Column], query: any, limit: int
    ) -> List[Dict[str, any]]:
        raise NotImplementedError
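For readers implementing new adaptors, a minimal hypothetical in-memory subclass (not part of this PR) shows what the contract requires: item_count and scan are abstract, while query is optional and raises NotImplementedError by default.

from typing import Dict, List

from metadata.generated.schema.entity.data.table import Column, Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor


class InMemoryAdaptor(NoSQLAdaptor):
    """Hypothetical adaptor backed by a dict of {database: {collection: [documents]}}."""

    def __init__(self, data: Dict[str, Dict[str, List[dict]]]):
        self.data = data

    def _collection(self, table: Table) -> List[dict]:
        # Mirrors how the MongoDB adaptor resolves database and collection names.
        return self.data[table.databaseSchema.name][table.name.__root__]

    def item_count(self, table: Table) -> int:
        return len(self._collection(table))

    def scan(
        self, table: Table, columns: List[Column], limit: int
    ) -> List[Dict[str, any]]:
        return self._collection(table)[:limit]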
@@ -27,6 +27,7 @@
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import Metric, MetricTypes
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_like_column import SQALikeColumn

@@ -41,8 +42,9 @@ class NoSQLProfilerInterface(ProfilerInterface):

    # pylint: disable=too-many-arguments

-    def _get_sampler(self):
-        return None
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.sampler = self._get_sampler()

    def _compute_table_metrics(
        self,
@@ -119,6 +121,7 @@ def compute_metrics(
            row = self._get_metric_fn[metric_func.metric_type.value](
                metric_func.metrics,
                client,
                column=metric_func.column,
            )
        except Exception as exc:
            name = f"{metric_func.column if metric_func.column is not None else metric_func.table}"
@@ -134,8 +137,23 @@ def compute_metrics(
            column = None
        return row, column, metric_func.metric_type.value

-    def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData:
-        return None
+    def fetch_sample_data(self, table, columns: List[SQALikeColumn]) -> TableData:
+        return self.sampler.fetch_sample_data(columns)

    def _get_sampler(self) -> NoSQLSampler:
        """Get NoSQL sampler from config"""
        from metadata.profiler.processor.sampler.sampler_factory import (  # pylint: disable=import-outside-toplevel
            sampler_factory_,
        )

        return sampler_factory_.create(
            self.service_connection_config.__class__.__name__,
            table=self.table,
            client=factory.construct(self.connection),
            profile_sample_config=self.profile_sample_config,
            partition_details=self.partition_details,
            profile_sample_query=self.profile_query,
        )

    def get_composed_metrics(
        self, column: Column, metric: Metrics, column_results: Dict
@@ -176,7 +194,10 @@ def table(self):
        return self.table_entity

    def get_columns(self) -> List[Optional[SQALikeColumn]]:
-        return []
+        return [
+            SQALikeColumn(name=c.name.__root__, type=c.dataType)
+            for c in self.table.columns
+        ]

    def close(self):
        self.connection.close()
@@ -54,4 +54,4 @@ def df_fn(self, dfs=None):

    @classmethod
    def nosql_fn(cls, client: NoSQLAdaptor) -> Callable[[Table], int]:
-        return client.get_row_count
+        return client.item_count
@@ -0,0 +1,72 @@
from typing import Dict, List, Optional, Tuple

from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface
from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
from metadata.utils.sqa_like_column import SQALikeColumn


class NoSQLSampler(SamplerInterface):
    client: NoSQLAdaptor

    def _rdn_sample_from_user_query(self) -> List[Dict[str, any]]:
        """
        Get random sample from user query
        """
        limit = self._get_limit()
        return self.client.query(
            self.table, self.table.columns, self._profile_sample_query, limit
        )
    def _fetch_sample_data_from_user_query(self) -> TableData:
        """
        Fetch sample data based on a user query, assuming the engine supports one (example: MongoDB).
        If the engine does not support a custom query, an error will be raised.
        """
        records = self._rdn_sample_from_user_query()
        columns = [
            SQALikeColumn(name=column.name.__root__, type=column.dataType)
            for column in self.table.columns
        ]
        rows, cols = self.transpose_records(records, columns)
        return TableData(rows=rows, columns=[c.name for c in cols])

    def random_sample(self):
        pass

    def fetch_sample_data(self, columns: List[SQALikeColumn]) -> TableData:
        if self._profile_sample_query:
            return self._fetch_sample_data_from_user_query()
        return self._fetch_sample_data(columns)

    def _fetch_sample_data(self, columns: List[SQALikeColumn]):
        """
        returns sampled ometa dataframes
        """
        limit = self._get_limit()
        records = self.client.scan(self.table, self.table.columns, limit)
        rows, cols = self.transpose_records(records, columns)
        return TableData(rows=rows, columns=[col.name for col in cols])

    def _get_limit(self) -> Optional[int]:
        num_rows = self.client.item_count(self.table)
        if self.profile_sample_type == ProfileSampleType.PERCENTAGE:
            limit = num_rows * (self.profile_sample / 100)
        elif self.profile_sample_type == ProfileSampleType.ROWS:
            limit = self.profile_sample
        else:
            limit = SAMPLE_DATA_DEFAULT_COUNT
        return limit

    @staticmethod
    def transpose_records(
        records: List[Dict[str, any]], columns: List[SQALikeColumn]
    ) -> Tuple[List[List[any]], List[SQALikeColumn]]:
        rows = []
        for record in records:
            row = []
            for column in columns:
                row.append(record.get(column.name))
            rows.append(row)
        return rows, columns
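As a reading aid, here is a small self-contained sketch of what transpose_records returns; the records, column names, and type values below are made up for illustration:

from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
from metadata.utils.sqa_like_column import SQALikeColumn

# Hypothetical scan output; documents that omit a field yield None in that cell.
records = [
    {"name": "Alice", "age": 34},
    {"name": "Bob"},
]
columns = [
    SQALikeColumn(name="name", type=None),  # type is a placeholder in this sketch
    SQALikeColumn(name="age", type=None),
]

rows, cols = NoSQLSampler.transpose_records(records, columns)
# rows == [["Alice", 34], ["Bob", None]]
# cols is the same column list, later used to label the TableData columns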
@@ -21,10 +21,14 @@
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import (
    MongoDBConnection,
)
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
    TrinoConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
from metadata.profiler.processor.sampler.pandas.sampler import DatalakeSampler
from metadata.profiler.processor.sampler.sqlalchemy.bigquery.sampler import (
    BigQuerySampler,
@@ -59,3 +63,4 @@ def create(
sampler_factory_.register(BigQueryConnection.__name__, BigQuerySampler)
sampler_factory_.register(DatalakeConnection.__name__, DatalakeSampler)
sampler_factory_.register(TrinoConnection.__name__, TrinoSampler)
sampler_factory_.register(MongoDBConnection.__name__, NoSQLSampler)
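The factory itself is a plain name-to-class registry; a minimal sketch of the pattern (an illustration, not the actual sampler_factory_ implementation) could look like this:

from typing import Dict, Type


class SamplerFactory:
    """Illustrative registry mapping a connection class name to a sampler class."""

    def __init__(self):
        self._samplers: Dict[str, Type] = {}

    def register(self, connection_name: str, sampler_class: Type) -> None:
        self._samplers[connection_name] = sampler_class

    def create(self, connection_name: str, *args, **kwargs):
        sampler_class = self._samplers.get(connection_name)
        if sampler_class is None:
            raise ValueError(f"No sampler registered for {connection_name}")
        return sampler_class(*args, **kwargs)


# With MongoDBConnection registered to NoSQLSampler, a MongoDB profile run
# resolves to NoSQLSampler via create(MongoDBConnection.__name__, ...).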
@@ -17,7 +17,7 @@

from sqlalchemy import Column

-from metadata.generated.schema.entity.data.table import TableData
+from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
from metadata.utils.sqa_like_column import SQALikeColumn
@@ -29,7 +29,7 @@ class SamplerInterface(ABC):
    def __init__(
        self,
        client,
-        table,
+        table: Table,
        profile_sample_config: Optional[ProfileSampleConfig] = None,
        partition_details: Optional[Dict] = None,
        profile_sample_query: Optional[str] = None,