Skip to content

Commit

Permalink
feat(ingest): add nice semantic run-ids that use source type and time…
Browse files Browse the repository at this point in the history
… of ingestion (datahub-project#3279)
  • Loading branch information
swaroopjagadish authored Sep 21, 2021
1 parent 245b1bf commit 3574294
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
import logging
import uuid
from typing import Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional

import click
from pydantic import Field
from pydantic import validator

from datahub.configuration.common import (
ConfigModel,
Expand Down Expand Up @@ -31,10 +32,26 @@ class PipelineConfig(ConfigModel):
# simplify this configuration and validation.
# See https://github.com/samuelcolvin/pydantic/pull/2336.

run_id: str = Field(default_factory=lambda: str(uuid.uuid1()))
source: SourceConfig
sink: DynamicTypedConfig
transformers: Optional[List[DynamicTypedConfig]]
run_id: str = "__DEFAULT_RUN_ID"

@validator("run_id", pre=True, always=True)
def run_id_should_be_semantic(
    cls, v: Optional[str], values: Dict[str, Any], **kwargs: Any
) -> str:
    """Replace the placeholder run_id with a readable, source-based identifier.

    When ``run_id`` is left at its ``"__DEFAULT_RUN_ID"`` placeholder, the
    result is ``"<source type>-<YYYY_MM_DD-HH_MM_SS>"`` built from the
    already-validated ``source`` field and the current local time. If no
    source type can be determined, a ``uuid1`` string is returned instead.
    Any other explicitly supplied value is returned unchanged.

    Args:
        v: The raw ``run_id`` value (the validator runs with ``pre=True``).
        values: Previously validated fields; ``source`` is read from here,
            so it must be declared before ``run_id`` on the model.
        **kwargs: Extra validator arguments; unused.

    Returns:
        The run_id to store on the config.

    Raises:
        ValueError: If ``run_id`` was explicitly given as ``None``.
    """
    if v != "__DEFAULT_RUN_ID":
        # Raise instead of `assert`: asserts are stripped under `python -O`.
        if v is None:
            raise ValueError("run_id must not be None")
        return v

    # A field that failed its own validation is missing from `values`, so
    # look it up defensively; a KeyError here would bypass the normal
    # validation-error reporting for the broken `source` field.
    source = values.get("source")
    source_type = getattr(source, "type", None)
    if source_type is not None:
        current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
        return f"{source_type}-{current_time}"

    return str(uuid.uuid1())  # default run_id if we cannot infer a source type


class LoggingCallback(WriteCallback):
Expand Down

0 comments on commit 3574294

Please sign in to comment.