Skip to content

Commit

Permalink
fix(ingestion/airflow-plugin): warning log for non-materialized iolets (
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW authored and sleeperdeep committed Jun 25, 2024
1 parent f769962 commit 5477466
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ def run_datajob(
dpi = DataProcessInstance.from_datajob(
datajob=datajob,
id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
clone_inlets=True,
clone_outlets=True,
clone_inlets=config is None or config.materialize_iolets,
clone_outlets=config is None or config.materialize_iolets,
)
job_property_bag: Dict[str, str] = {}
job_property_bag["run_id"] = str(dag_run.run_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,14 @@ def on_task_instance_running(

self.emitter.emit(operation_mcp)
logger.debug(f"Emitted Dataset Operation: {outlet}")
else:
if self.graph:
for outlet in datajob.outlets:
if not self.graph.exists(str(outlet)):
logger.warning(f"Dataset {str(outlet)} not materialized")
for inlet in datajob.inlets:
if not self.graph.exists(str(inlet)):
logger.warning(f"Dataset {str(inlet)} not materialized")

def on_task_instance_finish(
self, task_instance: "TaskInstance", status: InstanceRunResult
Expand Down

0 comments on commit 5477466

Please sign in to comment.