Skip to content

Commit

Permalink
simplify checkpoint_state_util
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Dec 14, 2022
1 parent 1235e09 commit 0e10e20
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ def _validate_field_rename(cls: Type, values: dict) -> dict:

value = values.pop(old_field)
if mapped_type == "dataset":
values["urns"] += CheckpointStateUtil.get_dataset_urns_not_in(value, [])
values["urns"] += [
CheckpointStateUtil.get_urn_from_encoded_dataset(encoded_urn)
for encoded_urn in value
]
elif mapped_type == "topic":
values["urns"] += [
CheckpointStateUtil.get_urn_from_encoded_topic(encoded_urn)
Expand Down
29 changes: 6 additions & 23 deletions metadata-ingestion/src/datahub/utilities/checkpoint_state_util.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from typing import Iterable, List, Set
from typing import List, Set

from datahub.emitter.mce_builder import (
dataset_key_to_urn,
dataset_urn_to_key,
make_dataset_urn,
)
from datahub.emitter.mce_builder import dataset_key_to_urn, make_dataset_urn
from datahub.metadata.schema_classes import DatasetKeyClass


Expand All @@ -25,24 +21,11 @@ def get_encoded_urns_not_in(
return set(encoded_urns_1) - set(encoded_urns_2)

@staticmethod
def get_dataset_lightweight_repr(dataset_urn: str) -> str:
SEP = CheckpointStateUtil.get_separator()
key = dataset_urn_to_key(dataset_urn)
assert key is not None
return f"{key.platform}{SEP}{key.name}{SEP}{key.origin}"

@staticmethod
def get_dataset_urns_not_in(
encoded_urns_1: List[str], encoded_urns_2: List[str]
) -> Iterable[str]:
difference = CheckpointStateUtil.get_encoded_urns_not_in(
encoded_urns_1, encoded_urns_2
def get_urn_from_encoded_dataset(encoded_urn: str) -> str:
platform, name, env = encoded_urn.split(CheckpointStateUtil.get_separator())
return dataset_key_to_urn(
DatasetKeyClass(platform=platform, name=name, origin=env)
)
for encoded_urn in difference:
platform, name, env = encoded_urn.split(CheckpointStateUtil.get_separator())
yield dataset_key_to_urn(
DatasetKeyClass(platform=platform, name=name, origin=env)
)

@staticmethod
def get_urn_from_encoded_topic(encoded_urn: str) -> str:
Expand Down

0 comments on commit 0e10e20

Please sign in to comment.