File checkpointing provider code and test case added
shubhamjagtap639 committed Oct 13, 2023
1 parent d33a853 commit 1bd06b9
Showing 3 changed files with 220 additions and 0 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -645,6 +645,7 @@ def get_long_description():
],
"datahub.ingestion.checkpointing_provider.plugins": [
"datahub = datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:DatahubIngestionCheckpointingProvider",
"file = datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider:FileIngestionCheckpointingProvider",
],
"datahub.ingestion.reporting_provider.plugins": [
"datahub = datahub.ingestion.reporting.datahub_ingestion_run_summary_provider:DatahubIngestionRunSummaryProvider",
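Registering the new provider under the "datahub.ingestion.checkpointing_provider.plugins" entry-point group is what lets a pipeline refer to it by the short name "file". A minimal sketch of the mechanism, assuming Python 3.10+ importlib.metadata (DataHub's plugin registry performs an equivalent lookup internally):

import importlib.metadata

# Resolve the plugin class by its entry-point name, as declared in setup.py above.
for ep in importlib.metadata.entry_points(
    group="datahub.ingestion.checkpointing_provider.plugins"
):
    if ep.name == "file":
        provider_cls = ep.load()  # -> FileIngestionCheckpointingProvider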
133 changes: 133 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/state_provider/file_ingestion_checkpointing_provider.py
@@ -0,0 +1,133 @@
import contextlib
import logging
import pathlib
from datetime import datetime
from typing import Any, Dict, Optional, cast

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
IngestionCheckpointingProviderBase,
IngestionCheckpointingProviderConfig,
JobId,
)
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.file import read_metadata_file
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass

logger = logging.getLogger(__name__)


class FileIngestionStateProviderConfig(IngestionCheckpointingProviderConfig):
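    # Path of the local file that checkpoint MCPs are written to and read from.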
filename: Optional[str] = None


class FileIngestionCheckpointingProvider(IngestionCheckpointingProviderBase):
orchestrator_name: str = "file"

def __init__(self, filename: str, name: str):
super().__init__(name)
self.filename = filename
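        # Set once commit() has written the file; switches overwrite to append.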
self.committed = False

@classmethod
def create(
cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
) -> "FileIngestionCheckpointingProvider":
if config_dict is None:
raise ConfigurationError("Missing provider configuration.")
else:
provider_config = FileIngestionStateProviderConfig.parse_obj_allow_extras(
config_dict
)
if provider_config.filename:
return cls(provider_config.filename, name)
else:
raise ConfigurationError(
"Missing filename. Provide filename under the state_provider configuration."
)

def get_latest_checkpoint(
self,
pipeline_name: str,
job_name: JobId,
) -> Optional[DatahubIngestionCheckpointClass]:
logger.debug(
f"Querying for the latest ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}'"
)

data_job_urn = self.get_data_job_urn(
self.orchestrator_name, pipeline_name, job_name
)
latest_checkpoint: Optional[DatahubIngestionCheckpointClass] = None
        # Tolerate a missing checkpoint file: nothing has been committed yet on
        # the very first run.
        with contextlib.suppress(FileNotFoundError):
            for obj in read_metadata_file(pathlib.Path(self.filename)):
                if isinstance(obj, MetadataChangeProposalWrapper) and obj.aspect:
                    if (
                        obj.entityUrn == data_job_urn
                        and obj.aspectName == "datahubIngestionCheckpoint"
                        and obj.aspect.get("pipelineName", "") == pipeline_name
                    ):
                        # Keep scanning instead of breaking: commit() appends, so
                        # the last matching entry is the most recent checkpoint.
                        latest_checkpoint = cast(
                            Optional[DatahubIngestionCheckpointClass], obj.aspect
                        )

if latest_checkpoint:
logger.debug(
f"The last committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}' found with start_time:"
f" {datetime.utcfromtimestamp(latest_checkpoint.timestampMillis/1000)}"
)
return latest_checkpoint
else:
logger.debug(
f"No committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}' found"
)

return None

def commit(self) -> None:
if not self.state_to_commit:
logger.warning(f"No state available to commit for {self.name}")
            return

for job_name, checkpoint in self.state_to_commit.items():
# Emit the ingestion state for each job
logger.debug(
f"Committing ingestion checkpoint for pipeline:'{checkpoint.pipelineName}', "
f"job:'{job_name}'"
)

datajob_urn = self.get_data_job_urn(
self.orchestrator_name,
checkpoint.pipelineName,
job_name,
)

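            # The first commit of this provider instance starts a fresh file;
            # subsequent commits append to the MCPs already written.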
if not self.committed:
checkpoint_workunit = MetadataChangeProposalWrapper(
entityUrn=datajob_urn,
aspect=checkpoint,
)
write_metadata_file(pathlib.Path(self.filename), [checkpoint_workunit])
self.committed = True
else:
existing_checkpoint_workunits = [
obj for obj in read_metadata_file(pathlib.Path(self.filename))
]
existing_checkpoint_workunits.append(
MetadataChangeProposalWrapper(
entityUrn=datajob_urn,
aspect=checkpoint,
)
)
write_metadata_file(
pathlib.Path(self.filename), existing_checkpoint_workunits
)

logger.debug(
f"Committed ingestion checkpoint for pipeline:'{checkpoint.pipelineName}', "
f"job:'{job_name}'"
)
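End to end, create() validates the state_provider config, commit() writes each job's checkpoint aspect to the file as an MCP, and get_latest_checkpoint() reads it back on the next run. A minimal usage sketch, assuming the module path registered in setup.py above (the run, pipeline, and job names and the file path are hypothetical):

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider import (
    FileIngestionCheckpointingProvider,
)

ctx = PipelineContext(run_id="demo_run", pipeline_name="demo_pipeline")
provider = FileIngestionCheckpointingProvider.create(
    {"filename": "/tmp/demo_checkpoints.json"}, ctx, name="file"
)
# A stateful source would populate provider.state_to_commit before this point.
provider.commit()
latest = provider.get_latest_checkpoint("demo_pipeline", JobId("demo_job"))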
@@ -0,0 +1,86 @@
from typing import List

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state.usage_common_state import (
BaseTimeWindowCheckpointState,
)
from datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider import (
FileIngestionCheckpointingProvider,
)
from tests.test_helpers.type_helpers import assert_not_null

pipeline_name: str = "test_pipeline"
job_names: List[JobId] = [JobId("job1"), JobId("job2")]
run_id: str = "test_run"


def test_file_ingestion_checkpointing_provider(tmp_path):
ctx: PipelineContext = PipelineContext(run_id=run_id, pipeline_name=pipeline_name)
provider = FileIngestionCheckpointingProvider.create(
{"filename": str(tmp_path / "checkpoint_mces.json")},
ctx,
name=FileIngestionCheckpointingProvider.__name__,
)
# 1. Create the individual job checkpoints with appropriate states.
# Job1 - Checkpoint with a BaseSQLAlchemyCheckpointState state
job1_state_obj = BaseSQLAlchemyCheckpointState()
job1_checkpoint = Checkpoint(
job_name=job_names[0],
pipeline_name=pipeline_name,
run_id=run_id,
state=job1_state_obj,
)
# Job2 - Checkpoint with a BaseTimeWindowCheckpointState state
job2_state_obj = BaseTimeWindowCheckpointState(
begin_timestamp_millis=10, end_timestamp_millis=100
)
job2_checkpoint = Checkpoint(
job_name=job_names[1],
pipeline_name=pipeline_name,
run_id=run_id,
state=job2_state_obj,
)

# 2. Set the provider's state_to_commit.
provider.state_to_commit = {
# NOTE: state_to_commit accepts only the aspect version of the checkpoint.
job_names[0]: assert_not_null(
job1_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
),
job_names[1]: assert_not_null(
job2_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
),
}

    # 3. Perform the commit.
    # NOTE: This writes the checkpoint state as MCPs to the file created above.
provider.commit()
assert provider.committed

    # 4. Get the last committed state. This must match what was committed earlier.
    # NOTE: This reads the checkpoint MCPs back from the file written by commit().
job1_last_state = provider.get_latest_checkpoint(pipeline_name, job_names[0])
job2_last_state = provider.get_latest_checkpoint(pipeline_name, job_names[1])

# 5. Validate individual job checkpoint state values that have been committed and retrieved
# against the original values.
assert job1_last_state is not None
job1_last_checkpoint = Checkpoint.create_from_checkpoint_aspect(
job_name=job_names[0],
checkpoint_aspect=job1_last_state,
state_class=type(job1_state_obj),
)
assert job1_last_checkpoint == job1_checkpoint

assert job2_last_state is not None
job2_last_checkpoint = Checkpoint.create_from_checkpoint_aspect(
job_name=job_names[1],
checkpoint_aspect=job2_last_state,
state_class=type(job2_state_obj),
)
assert job2_last_checkpoint == job2_checkpoint
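Because the provider persists plain metadata files, the committed checkpoints can also be inspected with the same helper the provider itself uses; a small sketch (the path is hypothetical — in the test above it is tmp_path / "checkpoint_mces.json"):

import pathlib

from datahub.ingestion.source.file import read_metadata_file

for obj in read_metadata_file(pathlib.Path("checkpoint_mces.json")):
    print(obj.entityUrn, obj.aspectName)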
