Skip to content

Commit

Permalink
MINOR: Mongodb column profile (#15252)
Browse files Browse the repository at this point in the history
* feat(nosql-profiler): row count

1. Implemented the NoSQLProfilerInterface as an entrypoint for the nosql profiler.
2. Added the NoSQLMetric as an abstract class.
3. Implemented the interface for the MongoDB database source.
4. Implemented an e2e test using testcontainers.
  • Loading branch information
sushi30 authored Feb 26, 2024
1 parent 22d0b08 commit 50b2709
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 52 deletions.
98 changes: 90 additions & 8 deletions ingestion/src/metadata/profiler/adaptors/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,43 @@
MongoDB adaptor for the NoSQL profiler.
"""
import json
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Dict, List, Optional
from enum import Enum
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from pydantic import BaseModel, Field
from pymongo.command_cursor import CommandCursor
from pymongo.cursor import Cursor

from metadata.generated.schema.entity.data.table import Column, Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.utils.sqa_like_column import SQALikeColumn

if TYPE_CHECKING:
from pymongo import MongoClient
else:
MongoClient = None # pylint: disable=invalid-name


@dataclass
class Query:
class AggregationFunction(Enum):
SUM = "$sum"
MEAN = "$avg"
COUNT = "$count"
MAX = "$max"
MIN = "$min"


class Executable(BaseModel):
def to_executable(self, client: MongoClient) -> Union[CommandCursor, Cursor]:
raise NotImplementedError


class Query(Executable):
database: str
collection: str
filter: dict = field(default_factory=dict)
filter: dict = Field(default_factory=dict)
limit: Optional[int] = None

def to_executable(self, client: MongoClient):
def to_executable(self, client: MongoClient) -> Cursor:
db = client[self.database]
collection = db[self.collection]
query = collection.find(self.filter)
Expand All @@ -40,6 +57,30 @@ def to_executable(self, client: MongoClient):
return query


class Aggregation(Executable):
database: str
collection: str
column: str
aggregations: List[AggregationFunction]

def to_executable(self, client: MongoClient) -> CommandCursor:
db = client[self.database]
collection = db[self.collection]
return collection.aggregate(
[
{
"$group": {
"_id": None,
**{
a.name.lower(): {a.value: f"${self.column}"}
for a in self.aggregations
},
}
}
]
)


class MongoDB(NoSQLAdaptor):
"""A MongoDB client that serves as an adaptor for profiling data assets on MongoDB"""

Expand Down Expand Up @@ -77,7 +118,48 @@ def query(
)
)

def execute(self, query: Query) -> List[Dict[str, any]]:
def get_aggregates(
self,
table: Table,
column: SQALikeColumn,
aggregate_functions: List[AggregationFunction],
) -> Dict[str, Union[int, float]]:
"""
Get the aggregate functions for a column in a table
Returns:
Dict[str, Union[int, float]]: A dictionary of the aggregate functions
Example:
{
"sum": 100,
"avg": 50,
"count": 2,
"max": 75,
"min": 25
}
"""
row = self.execute(
Aggregation(
database=table.databaseSchema.name,
collection=table.name.__root__,
column=column.name,
aggregations=aggregate_functions,
)
)[0]
return {k: v for k, v in row.items() if k != "_id"}

def sum(self, table: Table, column: SQALikeColumn) -> AggregationFunction:
return AggregationFunction.SUM

def mean(self, table: Table, column: SQALikeColumn) -> AggregationFunction:
return AggregationFunction.MEAN

def max(self, table: Table, column: SQALikeColumn) -> AggregationFunction:
return AggregationFunction.MAX

def min(self, table: Table, column: SQALikeColumn) -> AggregationFunction:
return AggregationFunction.MIN

def execute(self, query: Executable) -> List[Dict[str, any]]:
records = list(query.to_executable(self.client))
result = []
for r in records:
Expand All @@ -89,5 +171,5 @@ def _json_safe(data: any):
try:
json.dumps(data)
return data
except Exception:
except Exception: # noqa
return str(data)
33 changes: 32 additions & 1 deletion ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@
NoSQL adaptor for the NoSQL profiler.
"""
from abc import ABC, abstractmethod
from typing import Dict, List
from typing import Dict, List, Union

from metadata.generated.schema.entity.data.table import Column, Table
from metadata.utils.sqa_like_column import SQALikeColumn


class NoSQLAdaptor(ABC):
"""
NoSQL adaptor for the NoSQL profiler. This class implememts the required methods for retreiving data from a NoSQL
database.
"""

@abstractmethod
def item_count(self, table: Table) -> int:
raise NotImplementedError
Expand All @@ -32,3 +38,28 @@ def query(
self, table: Table, columns: List[Column], query: any, limit: int
) -> List[Dict[str, any]]:
raise NotImplementedError

def get_aggregates(
self, table: Table, column: SQALikeColumn, aggregate_functions: List[any]
) -> Dict[str, Union[int, float]]:
raise NotImplementedError

def sum(
self, table: Table, column: Column # pylint: disable=unused-argument
) -> any:
return None

def mean(
self, table: Table, column: Column # pylint: disable=unused-argument
) -> any:
return None

def max(
self, table: Table, column: Column # pylint: disable=unused-argument
) -> any:
return None

def min(
self, table: Table, column: Column # pylint: disable=unused-argument
) -> any:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
supporting sqlalchemy abstraction layer
"""
import traceback
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Optional, Type

from sqlalchemy import Column
Expand Down Expand Up @@ -71,12 +73,26 @@ def _compute_table_metrics(
def _compute_static_metrics(
self,
metrics: List[Metrics],
runner: List,
column,
runner: NoSQLAdaptor,
column: SQALikeColumn,
*args,
**kwargs,
):
return None
) -> Dict[str, any]:
try:
aggs = [metric(column).nosql_fn(runner)(self.table) for metric in metrics]
filtered = [agg for agg in aggs if agg is not None]
if not filtered:
return {}
row = runner.get_aggregates(self.table, column, filtered)
return dict(row)
except Exception as exc:
logger.debug(
f"{traceback.format_exc()}\n"
f"Error trying to compute metrics for {self.table.fullyQualifiedName}: {exc}"
)
raise RuntimeError(
f"Error trying to compute metris for {self.table.fullyQualifiedName}: {exc}"
)

def _compute_query_metrics(
self,
Expand Down Expand Up @@ -172,7 +188,7 @@ def get_all_metrics(
metric_funcs: List[ThreadPoolMetrics],
):
"""get all profiler metrics"""
profile_results = {"table": {}, "columns": {}}
profile_results = {"table": {}, "columns": defaultdict(dict)}
runner = factory.create(
self.service_connection_config.__class__.__name__, self.connection
)
Expand All @@ -189,7 +205,15 @@ def get_all_metrics(
elif metric_type == MetricTypes.Custom.value and column is None:
profile_results["table"].update(profile)
else:
pass
profile_results["columns"][column].update(
{
"name": column,
"timestamp": int(
datetime.now(tz=timezone.utc).timestamp() * 1000
),
**profile,
}
)
return profile_results

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,12 @@ def _compute_static_metrics(
runner,
*args,
**kwargs,
):
"""Get metrics"""
) -> Dict[str, Any]:
"""Get metrics
Return:
Dict[str, Any]: dict of metrics tio be merged into the final column profile. Keys need to be compatible with
the `metadata.generated.schema.entity.data.table.ColumnProfile` schema.
"""
raise NotImplementedError

@abstractmethod
Expand Down
16 changes: 13 additions & 3 deletions ingestion/src/metadata/profiler/metrics/static/max.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
"""
Max Metric definition
"""
# pylint: disable=duplicate-code

from functools import partial
from typing import Callable, Optional

from sqlalchemy import TIME, column
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import GenericFunction

from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.generated.schema.entity.data.table import Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.profiler.metrics.core import CACHE, StaticMetric, T, _label
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import (
FLOAT_SET,
Expand All @@ -29,6 +31,8 @@
is_quantifiable,
)

# pylint: disable=duplicate-code


class MaxFn(GenericFunction):
name = __qualname__
Expand Down Expand Up @@ -96,3 +100,9 @@ def df_fn(self, dfs=None):
max_ = max((df[self.col.name].max() for df in dfs))
return int(max_.timestamp() * 1000)
return 0

def nosql_fn(self, adaptor: NoSQLAdaptor) -> Callable[[Table], Optional[T]]:
"""nosql function"""
if is_quantifiable(self.col.type):
return partial(adaptor.max, column=self.col)
return lambda table: None
19 changes: 14 additions & 5 deletions ingestion/src/metadata/profiler/metrics/static/mean.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
"""
AVG Metric definition
"""
# pylint: disable=duplicate-code


from typing import List, cast
from functools import partial
from typing import Callable, List, Optional, cast

from sqlalchemy import column, func
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import GenericFunction

from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.generated.schema.entity.data.table import Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.profiler.metrics.core import CACHE, StaticMetric, T, _label
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import (
FLOAT_SET,
Expand All @@ -32,6 +32,9 @@
)
from metadata.utils.logger import profiler_logger

# pylint: disable=duplicate-code


logger = profiler_logger()


Expand Down Expand Up @@ -142,3 +145,9 @@ def df_fn(self, dfs=None):
f"Don't know how to process type {self.col.type} when computing MEAN"
)
return None

def nosql_fn(self, adaptor: NoSQLAdaptor) -> Callable[[Table], Optional[T]]:
"""nosql function"""
if is_quantifiable(self.col.type):
return partial(adaptor.mean, column=self.col)
return lambda table: None
15 changes: 13 additions & 2 deletions ingestion/src/metadata/profiler/metrics/static/min.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
"""
Min Metric definition
"""
# pylint: disable=duplicate-code
from functools import partial
from typing import Callable, Optional

from sqlalchemy import TIME, column
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import GenericFunction

from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.generated.schema.entity.data.table import Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.profiler.metrics.core import CACHE, StaticMetric, T, _label
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import (
FLOAT_SET,
Expand All @@ -28,6 +31,8 @@
is_quantifiable,
)

# pylint: disable=duplicate-code


class MinFn(GenericFunction):
name = __qualname__
Expand Down Expand Up @@ -96,3 +101,9 @@ def df_fn(self, dfs=None):
min_ = min((df[self.col.name].min() for df in dfs))
return int(min_.timestamp() * 1000)
return 0

def nosql_fn(self, adaptor: NoSQLAdaptor) -> Callable[[Table], Optional[T]]:
"""nosql function"""
if is_quantifiable(self.col.type):
return partial(adaptor.min, column=self.col)
return lambda table: None
Loading

0 comments on commit 50b2709

Please sign in to comment.