diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index eb333181fa065..f162db3ea9ce8 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -1,9 +1,10 @@
+import datetime
 import logging
 import uuid
-from typing import Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
 import click
-from pydantic import Field
+from pydantic import validator
 
 from datahub.configuration.common import (
     ConfigModel,
@@ -31,10 +32,32 @@ class PipelineConfig(ConfigModel):
     # simplify this configuration and validation.
     # See https://github.com/samuelcolvin/pydantic/pull/2336.
 
-    run_id: str = Field(default_factory=lambda: str(uuid.uuid1()))
     source: SourceConfig
     sink: DynamicTypedConfig
     transformers: Optional[List[DynamicTypedConfig]]
+    run_id: str = "__DEFAULT_RUN_ID"
+
+    @validator("run_id", pre=True, always=True)
+    def run_id_should_be_semantic(
+        cls, v: Optional[str], values: Dict[str, Any], **kwargs: Any
+    ) -> str:
+        """Replace the placeholder run_id with a "<source type>-<timestamp>" id.
+
+        An explicitly configured run_id is returned unchanged. When the source
+        type cannot be determined, a uuid1 string is used instead.
+        """
+        if v == "__DEFAULT_RUN_ID":
+            # pydantic leaves fields that failed validation out of `values`, so
+            # use .get() to avoid masking the real error with a KeyError.
+            source = values.get("source")
+            if source is not None and source.type is not None:
+                current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
+                return f"{source.type}-{current_time}"
+
+            return str(uuid.uuid1())  # default run_id if we cannot infer a source type
+        else:
+            assert v is not None
+            return v
 
 
 class LoggingCallback(WriteCallback):