diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 024950e3a6fd5..ac0c05558e1f4 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -645,6 +645,7 @@ def get_long_description():
     ],
     "datahub.ingestion.checkpointing_provider.plugins": [
         "datahub = datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:DatahubIngestionCheckpointingProvider",
+        "file = datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider:FileIngestionCheckpointingProvider",
     ],
     "datahub.ingestion.reporting_provider.plugins": [
         "datahub = datahub.ingestion.reporting.datahub_ingestion_run_summary_provider:DatahubIngestionRunSummaryProvider",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/state_provider/file_ingestion_checkpointing_provider.py b/metadata-ingestion/src/datahub/ingestion/source/state_provider/file_ingestion_checkpointing_provider.py
new file mode 100644
index 0000000000000..1bfbc7e8c0a4b
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/state_provider/file_ingestion_checkpointing_provider.py
@@ -0,0 +1,133 @@
+import logging
+import pathlib
+from datetime import datetime
+from typing import Any, Dict, Optional, cast
+
+from datahub.configuration.common import ConfigurationError
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
+    IngestionCheckpointingProviderBase,
+    IngestionCheckpointingProviderConfig,
+    JobId,
+)
+from datahub.ingestion.sink.file import write_metadata_file
+from datahub.ingestion.source.file import read_metadata_file
+from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass
+
+logger = logging.getLogger(__name__)
+
+
+class FileIngestionStateProviderConfig(IngestionCheckpointingProviderConfig):
+    filename: Optional[str] = None
+
+
+class FileIngestionCheckpointingProvider(IngestionCheckpointingProviderBase):
+    orchestrator_name: str = "file"
+
+    def __init__(self, filename: str, name: str):
+        super().__init__(name)
+        self.filename = filename
+        self.committed = False
+
+    @classmethod
+    def create(
+        cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
+    ) -> "FileIngestionCheckpointingProvider":
+        if config_dict is None:
+            raise ConfigurationError("Missing provider configuration.")
+        else:
+            provider_config = FileIngestionStateProviderConfig.parse_obj_allow_extras(
+                config_dict
+            )
+            if provider_config.filename:
+                return cls(provider_config.filename, name)
+            else:
+                raise ConfigurationError(
+                    "Missing filename. Provide filename under the state_provider configuration."
+                )
+
+    def get_latest_checkpoint(
+        self,
+        pipeline_name: str,
+        job_name: JobId,
+    ) -> Optional[DatahubIngestionCheckpointClass]:
+        logger.debug(
+            f"Querying for the latest ingestion checkpoint for pipelineName:'{pipeline_name}',"
+            f" job_name:'{job_name}'"
+        )
+
+        data_job_urn = self.get_data_job_urn(
+            self.orchestrator_name, pipeline_name, job_name
+        )
+        latest_checkpoint: Optional[DatahubIngestionCheckpointClass] = None
+        for obj in read_metadata_file(pathlib.Path(self.filename)):
+            if isinstance(obj, MetadataChangeProposalWrapper) and obj.aspect:
+                if (
+                    obj.entityUrn == data_job_urn
+                    and obj.aspectName == "datahubIngestionCheckpoint"
+                    and obj.aspect.get("pipelineName", "") == pipeline_name
+                ):
+                    latest_checkpoint = cast(
+                        Optional[DatahubIngestionCheckpointClass], obj.aspect
+                    )
+                    break
+
+        if latest_checkpoint:
+            logger.debug(
+                f"The last committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
+                f" job_name:'{job_name}' found with start_time:"
+                f" {datetime.utcfromtimestamp(latest_checkpoint.timestampMillis/1000)}"
+            )
+            return latest_checkpoint
+        else:
+            logger.debug(
+                f"No committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
+                f" job_name:'{job_name}' found"
+            )
+
+        return None
+
+    def commit(self) -> None:
+        if not self.state_to_commit:
+            logger.warning(f"No state available to commit for {self.name}")
+            return None
+
+        for job_name, checkpoint in self.state_to_commit.items():
+            # Emit the ingestion state for each job
+            logger.debug(
+                f"Committing ingestion checkpoint for pipeline:'{checkpoint.pipelineName}', "
+                f"job:'{job_name}'"
+            )
+
+            datajob_urn = self.get_data_job_urn(
+                self.orchestrator_name,
+                checkpoint.pipelineName,
+                job_name,
+            )
+
+            if not self.committed:
+                checkpoint_workunit = MetadataChangeProposalWrapper(
+                    entityUrn=datajob_urn,
+                    aspect=checkpoint,
+                )
+                write_metadata_file(pathlib.Path(self.filename), [checkpoint_workunit])
+                self.committed = True
+            else:
+                existing_checkpoint_workunits = [
+                    obj for obj in read_metadata_file(pathlib.Path(self.filename))
+                ]
+                existing_checkpoint_workunits.append(
+                    MetadataChangeProposalWrapper(
+                        entityUrn=datajob_urn,
+                        aspect=checkpoint,
+                    )
+                )
+                write_metadata_file(
+                    pathlib.Path(self.filename), existing_checkpoint_workunits
+                )
+
+            logger.debug(
+                f"Committed ingestion checkpoint for pipeline:'{checkpoint.pipelineName}', "
+                f"job:'{job_name}'"
+            )
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_file_ingestion_checkpointing_provider.py b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_file_ingestion_checkpointing_provider.py
new file mode 100644
index 0000000000000..44b4acebedab6
--- /dev/null
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_file_ingestion_checkpointing_provider.py
@@ -0,0 +1,86 @@
+from typing import List
+
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
+from datahub.ingestion.source.state.checkpoint import Checkpoint
+from datahub.ingestion.source.state.sql_common_state import (
+    BaseSQLAlchemyCheckpointState,
+)
+from datahub.ingestion.source.state.usage_common_state import (
+    BaseTimeWindowCheckpointState,
+)
+from datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider import (
+    FileIngestionCheckpointingProvider,
+)
+from tests.test_helpers.type_helpers import assert_not_null
+
+pipeline_name: str = "test_pipeline"
+job_names: List[JobId] = [JobId("job1"), JobId("job2")]
+run_id: str = "test_run"
+
+
+def test_file_ingestion_checkpointing_provider(tmp_path):
+    ctx: PipelineContext = PipelineContext(run_id=run_id, pipeline_name=pipeline_name)
+    provider = FileIngestionCheckpointingProvider.create(
+        {"filename": str(tmp_path / "checkpoint_mces.json")},
+        ctx,
+        name=FileIngestionCheckpointingProvider.__name__,
+    )
+    # 1. Create the individual job checkpoints with appropriate states.
+    # Job1 - Checkpoint with a BaseSQLAlchemyCheckpointState state
+    job1_state_obj = BaseSQLAlchemyCheckpointState()
+    job1_checkpoint = Checkpoint(
+        job_name=job_names[0],
+        pipeline_name=pipeline_name,
+        run_id=run_id,
+        state=job1_state_obj,
+    )
+    # Job2 - Checkpoint with a BaseTimeWindowCheckpointState state
+    job2_state_obj = BaseTimeWindowCheckpointState(
+        begin_timestamp_millis=10, end_timestamp_millis=100
+    )
+    job2_checkpoint = Checkpoint(
+        job_name=job_names[1],
+        pipeline_name=pipeline_name,
+        run_id=run_id,
+        state=job2_state_obj,
+    )
+
+    # 2. Set the provider's state_to_commit.
+    provider.state_to_commit = {
+        # NOTE: state_to_commit accepts only the aspect version of the checkpoint.
+        job_names[0]: assert_not_null(
+            job1_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
+        ),
+        job_names[1]: assert_not_null(
+            job2_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
+        ),
+    }
+
+    # 3. Perform the commit.
+    # NOTE: This writes the checkpoint MCPs to the checkpoint file under tmp_path.
+    provider.commit()
+    assert provider.committed
+
+    # 4. Get last committed state. This must match what has been committed earlier.
+    # NOTE: This reads the checkpoint MCPs back from the checkpoint file.
+    job1_last_state = provider.get_latest_checkpoint(pipeline_name, job_names[0])
+    job2_last_state = provider.get_latest_checkpoint(pipeline_name, job_names[1])
+
+    # 5. Validate individual job checkpoint state values that have been committed and retrieved
+    # against the original values.
+    assert job1_last_state is not None
+    job1_last_checkpoint = Checkpoint.create_from_checkpoint_aspect(
+        job_name=job_names[0],
+        checkpoint_aspect=job1_last_state,
+        state_class=type(job1_state_obj),
+    )
+    assert job1_last_checkpoint == job1_checkpoint
+
+    assert job2_last_state is not None
+    job2_last_checkpoint = Checkpoint.create_from_checkpoint_aspect(
+        job_name=job_names[1],
+        checkpoint_aspect=job2_last_state,
+        state_class=type(job2_state_obj),
+    )
+    assert job2_last_checkpoint == job2_checkpoint
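
For a quick end-to-end sanity check outside pytest, the new provider can also be driven directly. The sketch below mirrors the unit test above; the "demo_*" identifiers and the tempfile location are illustrative, not part of the patch:

import pathlib
import tempfile

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider import (
    FileIngestionCheckpointingProvider,
)

# Illustrative names; any pipeline/job/run identifiers work.
state_file = pathlib.Path(tempfile.mkdtemp()) / "checkpoint_mces.json"
provider = FileIngestionCheckpointingProvider.create(
    {"filename": str(state_file)},
    PipelineContext(run_id="demo_run", pipeline_name="demo_pipeline"),
    name=FileIngestionCheckpointingProvider.__name__,
)

# state_to_commit takes the aspect form of a checkpoint, so build a
# Checkpoint and convert it before assigning.
checkpoint = Checkpoint(
    job_name=JobId("demo_job"),
    pipeline_name="demo_pipeline",
    run_id="demo_run",
    state=BaseSQLAlchemyCheckpointState(),
)
aspect = checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
assert aspect is not None
provider.state_to_commit = {JobId("demo_job"): aspect}

# The first commit writes the checkpoint MCP to the state file;
# subsequent commits append to it.
provider.commit()

# Round-trip: read the latest committed checkpoint back and rebuild it.
latest = provider.get_latest_checkpoint("demo_pipeline", JobId("demo_job"))
assert latest is not None
restored = Checkpoint.create_from_checkpoint_aspect(
    job_name=JobId("demo_job"),
    checkpoint_aspect=latest,
    state_class=BaseSQLAlchemyCheckpointState,
)
assert restored == checkpoint

Given the entry point registered in setup.py, a recipe should likewise be able to select this provider by its plugin name "file", supplying the filename under the state_provider configuration, which is what the error message in create() points to.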