Commit 81cd311: add state inspect command
hsheth2 committed Dec 14, 2022
1 parent 0e10e20 commit 81cd311
Showing 15 changed files with 90 additions and 102 deletions.
19 changes: 11 additions & 8 deletions metadata-ingestion/src/datahub/cli/get_cli.py
@@ -3,6 +3,7 @@
from typing import Any, List, Optional

import click
from click_default_group import DefaultGroup

from datahub.cli.cli_utils import get_aspects_for_entity
from datahub.telemetry import telemetry
@@ -11,23 +12,25 @@
logger = logging.getLogger(__name__)


@click.command(
name="get",
context_settings=dict(
ignore_unknown_options=False,
allow_extra_args=True,
),
)
@click.group(cls=DefaultGroup, default="urn")
def get() -> None:
"""A group of commands to get metadata from DataHub."""
pass


@get.command()
@click.option("--urn", required=False, type=str)
@click.option("-a", "--aspect", required=False, multiple=True, type=str)
@click.pass_context
@upgrade.check_upgrade
@telemetry.with_telemetry
def get(ctx: Any, urn: Optional[str], aspect: List[str]) -> None:
def urn(ctx: Any, urn: Optional[str], aspect: List[str]) -> None:
"""
Get metadata for an entity with an optional list of aspects to project.
This works for both versioned aspects and timeseries aspects. For timeseries aspects, it fetches the latest value.
"""
# We're using ctx.args here so that we can support `datahub get urn:li:...`
# in addition to the `--urn` variant.

if urn is None:
if not ctx.args:
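With the change above, `get` becomes a click group whose default subcommand is `urn`, so existing `datahub get --urn ...` and `datahub get urn:li:...` invocations keep working while new subcommands can be added later. A minimal sketch of the `click_default_group.DefaultGroup` pattern, with illustrative command names rather than DataHub's real CLI:

from typing import Optional

import click
from click_default_group import DefaultGroup


@click.group(cls=DefaultGroup, default="show")
def cli() -> None:
    """A group whose unmatched first argument falls through to a default subcommand."""


@cli.command()
@click.argument("target", required=False)
def show(target: Optional[str]) -> None:
    # Reached either explicitly (`cli show foo`) or implicitly (`cli foo`).
    click.echo(f"showing {target or '<nothing>'}")


if __name__ == "__main__":
    cli()

Running `python example.py foo` behaves like `python example.py show foo`, which is how `datahub get <urn>` can keep its old shape after `get` becomes a group.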
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/entrypoints.py
@@ -20,6 +20,7 @@
from datahub.cli.ingest_cli import ingest
from datahub.cli.migrate import migrate
from datahub.cli.put_cli import put
from datahub.cli.state_cli import state
from datahub.cli.telemetry import telemetry as telemetry_cli
from datahub.cli.timeline_cli import timeline
from datahub.telemetry import telemetry
@@ -149,6 +150,7 @@ def init() -> None:
datahub.add_command(delete)
datahub.add_command(get)
datahub.add_command(put)
datahub.add_command(state)
datahub.add_command(telemetry_cli)
datahub.add_command(migrate)
datahub.add_command(timeline)
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -274,7 +274,6 @@ def list_all_entity_urns(
def get_latest_timeseries_value(
self,
entity_urn: str,
aspect_name: str,
aspect_type: Type[Aspect],
filter_criteria_map: Dict[str, str],
) -> Optional[Aspect]:
@@ -285,7 +284,7 @@ def get_latest_timeseries_value(
query_body = {
"urn": entity_urn,
"entity": guess_entity_type(entity_urn),
"aspect": aspect_name,
"aspect": aspect_type.ASPECT_NAME,
"latestValue": True,
"filter": {"or": [{"and": filter_criteria}]},
}
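After this change the aspect name is taken from `aspect_type.ASPECT_NAME`, so callers no longer pass a separate `aspect_name`. A hedged sketch of a call site under the new signature (server URL, urn, and filter values are illustrative):

from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# The aspect name ("datahubIngestionCheckpoint") now comes from the class itself.
checkpoint = graph.get_latest_timeseries_value(
    entity_urn="urn:li:dataJob:(urn:li:dataFlow:(datahub,my_pipeline,prod),my_job)",
    aspect_type=DatahubIngestionCheckpointClass,
    filter_criteria_map={
        "pipelineName": "my_pipeline",
        "platformInstanceId": "my_instance",
    },
)
if checkpoint is not None:
    print(checkpoint.pipelineName)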
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -27,7 +27,7 @@
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.state.kafka_state import KafkaCheckpointState
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
@@ -102,7 +102,7 @@ def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext):
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=KafkaCheckpointState,
state_type_class=GenericCheckpointState,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)
(another changed file; path not shown)
@@ -107,7 +107,7 @@ def create_from_checkpoint_aspect(
job_name: str,
checkpoint_aspect: Optional[DatahubIngestionCheckpointClass],
state_class: Type[StateType],
) -> Optional["Checkpoint"]:
) -> Optional["Checkpoint[StateType]"]:
if checkpoint_aspect is None:
return None
else:
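Returning `Checkpoint[StateType]` instead of a bare `Checkpoint` lets type checkers see the concrete state class, which is why the kafka test further down can drop its `cast(...)` calls. A small illustrative sketch, assuming the import paths below are correct:

from typing import Optional

from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState


def urn_count(cp: Optional[Checkpoint[GenericCheckpointState]]) -> int:
    # Type checkers now know cp.state is a GenericCheckpointState, so .urns is visible.
    return len(cp.state.urns) if cp is not None else 0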
(another changed file; path not shown)
@@ -1,18 +1,8 @@
from datahub.ingestion.source.state.entity_removal_state import (
GenericCheckpointState,
pydantic_state_migrator,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState


class DbtCheckpointState(GenericCheckpointState):
"""
Class for representing the checkpoint state for DBT sources.
Stores all nodes and assertions being ingested and is used to remove any stale entities.
"""

_migration = pydantic_state_migrator(
{
"encoded_node_urns": "dataset",
"encoded_assertion_urns": "assertion",
}
)
(another changed file; path not shown)
@@ -11,50 +11,6 @@
from datahub.utilities.urns.urn import guess_entity_type


class GenericCheckpointState(StaleEntityCheckpointStateBase["GenericCheckpointState"]):
urns: List[str] = pydantic.Field(default_factory=list)

# We store a bit of extra internal-only state so that we can keep the urns list deduplicated.
# However, we still want `urns` to be a list so that it maintains its order.
# We can't used OrderedSet here because pydantic doesn't recognize it and
# it isn't JSON serializable.
_urns_set: set = pydantic.PrivateAttr(default_factory=set)

def __init__(self, **data: Any): # type: ignore
super().__init__(**data)
self.urns = deduplicate_list(self.urns)
self._urns_set = set(self.urns)

@classmethod
def get_supported_types(cls) -> List[str]:
return ["*"]

def add_checkpoint_urn(self, type: str, urn: str) -> None:
if urn not in self._urns_set:
self.urns.append(urn)
self._urns_set.add(urn)

def get_urns_not_in(
self, type: str, other_checkpoint_state: "GenericCheckpointState"
) -> Iterable[str]:
diff = set(self.urns) - set(other_checkpoint_state.urns)

# To maintain backwards compatibility, we provide this filtering mechanism.
if type == "*":
yield from diff
elif type == "topic":
yield from (urn for urn in diff if guess_entity_type(urn) == "dataset")
else:
yield from (urn for urn in diff if guess_entity_type(urn) == type)

def get_percent_entities_changed(
self, old_checkpoint_state: "GenericCheckpointState"
) -> float:
return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
[(self.urns, old_checkpoint_state.urns)]
)


def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod:
# mapping would be something like:
# {
@@ -98,3 +54,62 @@ def _validate_field_rename(cls: Type, values: dict) -> dict:
return values

return pydantic.root_validator(pre=True, allow_reuse=True)(_validate_field_rename)


class GenericCheckpointState(StaleEntityCheckpointStateBase["GenericCheckpointState"]):
urns: List[str] = pydantic.Field(default_factory=list)

# We store a bit of extra internal-only state so that we can keep the urns list deduplicated.
# However, we still want `urns` to be a list so that it maintains its order.
# We can't used OrderedSet here because pydantic doesn't recognize it and
# it isn't JSON serializable.
_urns_set: set = pydantic.PrivateAttr(default_factory=set)

_migration = pydantic_state_migrator(
{
# From SQL:
"encoded_table_urns": "dataset",
"encoded_view_urns": "dataset",
"encoded_container_urns": "container",
"encoded_assertion_urns": "assertion",
# From kafka:
"encoded_topic_urns": "topic",
# From dbt:
"encoded_node_urns": "dataset",
# "encoded_assertion_urns": "assertion", # already handled from SQL
}
)

def __init__(self, **data: Any): # type: ignore
super().__init__(**data)
self.urns = deduplicate_list(self.urns)
self._urns_set = set(self.urns)

@classmethod
def get_supported_types(cls) -> List[str]:
return ["*"]

def add_checkpoint_urn(self, type: str, urn: str) -> None:
if urn not in self._urns_set:
self.urns.append(urn)
self._urns_set.add(urn)

def get_urns_not_in(
self, type: str, other_checkpoint_state: "GenericCheckpointState"
) -> Iterable[str]:
diff = set(self.urns) - set(other_checkpoint_state.urns)

# To maintain backwards compatibility, we provide this filtering mechanism.
if type == "*":
yield from diff
elif type == "topic":
yield from (urn for urn in diff if guess_entity_type(urn) == "dataset")
else:
yield from (urn for urn in diff if guess_entity_type(urn) == type)

def get_percent_entities_changed(
self, old_checkpoint_state: "GenericCheckpointState"
) -> float:
return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
[(self.urns, old_checkpoint_state.urns)]
)
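The per-source checkpoint classes now collapse into this single `GenericCheckpointState`, which keeps a deduplicated, ordered urn list and carries the consolidated legacy-field migration. A hedged sketch of how the stale-entity logic consumes it (urns are illustrative):

from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState

old = GenericCheckpointState(
    urns=[
        "urn:li:dataset:(urn:li:dataPlatform:kafka,topic_a,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:kafka,topic_b,PROD)",
    ]
)
new = GenericCheckpointState(
    urns=["urn:li:dataset:(urn:li:dataPlatform:kafka,topic_a,PROD)"]
)

# Entities seen in the old run but not the new one are considered stale.
stale = list(old.get_urns_not_in(type="*", other_checkpoint_state=new))
assert stale == ["urn:li:dataset:(urn:li:dataPlatform:kafka,topic_b,PROD)"]

# A change percentage that stateful ingestion can compare against its fail-safe threshold.
pct = new.get_percent_entities_changed(old)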
(another changed file; path not shown)
@@ -1,17 +1,8 @@
from datahub.ingestion.source.state.entity_removal_state import (
GenericCheckpointState,
pydantic_state_migrator,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState


class KafkaCheckpointState(GenericCheckpointState):
"""
This class represents the checkpoint state for Kafka based sources.
Stores all the topics being ingested and it is used to remove any stale entities.
"""

_migration = pydantic_state_migrator(
{
"encoded_topic_urns": "topic",
}
)
(another changed file; path not shown)
@@ -1,7 +1,4 @@
from datahub.ingestion.source.state.entity_removal_state import (
GenericCheckpointState,
pydantic_state_migrator,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState


class BaseSQLAlchemyCheckpointState(GenericCheckpointState):
@@ -10,12 +7,3 @@ class BaseSQLAlchemyCheckpointState(GenericCheckpointState):
Stores all tables and views being ingested and is used to remove any stale entities.
Subclasses can define additional state as appropriate.
"""

_migration = pydantic_state_migrator(
{
"encoded_table_urns": "dataset",
"encoded_view_urns": "dataset",
"encoded_container_urns": "container",
"encoded_assertion_urns": "assertion",
}
)
(another changed file; path not shown)
@@ -174,7 +174,8 @@ def __init__(
self._job_id = self._init_job_id()
self.source.register_stateful_ingestion_usecase_handler(self)

def _init_job_id(self) -> JobId:
@classmethod
def compute_job_id(cls, platform: Optional[str]) -> JobId:
# Handle backward-compatibility for existing sources.
backward_comp_platform_to_job_name: Dict[str, str] = {
"bigquery": "ingest_from_bigquery_source",
@@ -184,14 +185,17 @@ def _init_job_id(self) -> JobId:
"pulsar": "ingest_from_pulsar_source",
"snowflake": "common_ingest_from_sql_source",
}
platform: Optional[str] = getattr(self.source, "platform")
if platform in backward_comp_platform_to_job_name:
return JobId(backward_comp_platform_to_job_name[platform])

# Default name for everything else
job_name_suffix = "stale_entity_removal"
return JobId(f"{platform}_{job_name_suffix}" if platform else job_name_suffix)

def _init_job_id(self) -> JobId:
platform: Optional[str] = getattr(self.source, "platform")
return self.compute_job_id(platform)

def _ignore_old_state(self) -> bool:
if (
self.stateful_ingestion_config is not None
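Pulling the job-name logic into a `compute_job_id` classmethod means it can be called without instantiating a handler or a source, which is presumably what the new `state` CLI needs when locating checkpoints. A quick sketch based on the mapping shown above (import path assumed):

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
)

# Legacy platforms keep their historical job names for backward compatibility...
assert StaleEntityRemovalHandler.compute_job_id("snowflake") == "common_ingest_from_sql_source"
# ...everything else gets the "<platform>_stale_entity_removal" pattern.
assert StaleEntityRemovalHandler.compute_job_id("postgres") == "postgres_stale_entity_removal"
# No platform falls back to the bare suffix.
assert StaleEntityRemovalHandler.compute_job_id(None) == "stale_entity_removal"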
(another changed file; path not shown)
@@ -82,12 +82,11 @@ def get_latest_checkpoint(
DatahubIngestionCheckpointClass
] = self.graph.get_latest_timeseries_value(
entity_urn=data_job_urn,
aspect_name="datahubIngestionCheckpoint",
aspect_type=DatahubIngestionCheckpointClass,
filter_criteria_map={
"pipelineName": pipeline_name,
"platformInstanceId": platform_instance_id,
},
aspect_type=DatahubIngestionCheckpointClass,
)
if latest_checkpoint:
logger.info(
(another changed file; path not shown)
@@ -83,7 +83,7 @@ def __exit__(self, exc_type, exc, traceback):

def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint]:
) -> Optional[Checkpoint[KafkaCheckpointState]]:
kafka_source = cast(KafkaSource, pipeline.source)
return kafka_source.get_current_checkpoint(
kafka_source.stale_entity_removal_handler.job_id
@@ -172,8 +172,8 @@ def test_kafka_ingest_with_stateful(

# 3. Perform all assertions on the states. The deleted topic should not be
# part of the second state
state1 = cast(KafkaCheckpointState, checkpoint1.state)
state2 = cast(KafkaCheckpointState, checkpoint2.state)
state1 = checkpoint1.state
state2 = checkpoint2.state
difference_urns = list(
state1.get_urns_not_in(type="topic", other_checkpoint_state=state2)
)
1 change: 0 additions & 1 deletion metadata-ingestion/tests/test_helpers/state_helpers.py
@@ -74,7 +74,6 @@ def monkey_patch_get_latest_timeseries_value(
self,
graph_ref: MagicMock,
entity_urn: str,
aspect_name: str,
aspect_type: Type[DictWrapper],
filter_criteria_map: Dict[str, str],
) -> Optional[DictWrapper]:
(another changed file; path not shown)
@@ -89,7 +89,6 @@ def monkey_patch_get_latest_timeseries_value(
self,
graph_ref: MagicMock,
entity_urn: str,
aspect_name: str,
aspect_type: Type[DictWrapper],
filter_criteria_map: Dict[str, str],
) -> Optional[DictWrapper]:
@@ -98,7 +97,6 @@ def monkey_patch_get_latest_timeseries_value(
for a given entity urn.
"""
self.assertIsNotNone(graph_ref)
self.assertEqual(aspect_name, "datahubIngestionCheckpoint")
self.assertEqual(aspect_type, CheckpointJobStateType)
self.assertEqual(
filter_criteria_map,
(another changed file; path not shown)
@@ -35,7 +35,7 @@ def test_sql_common_state() -> None:
)


def test_backward_compat() -> None:
def test_state_backward_compat() -> None:
state = BaseSQLAlchemyCheckpointState.parse_obj(
dict(
encoded_table_urns=["mysql||db1.t1||PROD"],