
feat(ingest/airflow): fix materialize_iolets bug (#10613)
hsheth2 committed Jun 12, 2024
1 parent: 3ac9b72 · commit: 5da77df
Showing 17 changed files with 631 additions and 1,029 deletions.
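At a glance, based on the hunks below: the AirflowGenerator helpers now receive the full DatahubLineageConfig instead of a bare cluster string plus separate capture_owner/capture_tags flags, and materialize_iolets is forwarded to emit_process_start() rather than deciding whether task inlets/outlets are cloned onto the DataProcessInstance. A minimal sketch of the new calling convention, assuming the import paths and constructor defaults (the field and parameter names themselves appear in the diff):

# Hypothetical usage sketch; not part of this commit.
from airflow.models import DAG

from datahub_airflow_plugin._config import DatahubLineageConfig  # import path assumed
from datahub_airflow_plugin.client.airflow_generator import AirflowGenerator  # import path assumed

config = DatahubLineageConfig(
    cluster="prod",               # replaces the old standalone `cluster` argument
    capture_ownership_info=True,  # replaces the old `capture_owner` flag
    capture_tags_info=True,       # replaces the old `capture_tags` flag
    materialize_iolets=False,     # now only affects emit_process_start(); see the run_datajob hunks
)

dag = DAG(dag_id="example_dag")  # illustrative DAG
dataflow = AirflowGenerator.generate_dataflow(config=config, dag=dag)
print(dataflow)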
[first changed file]
@@ -129,10 +129,8 @@ def _get_dependencies(

     @staticmethod
     def generate_dataflow(
-        cluster: str,
+        config: DatahubLineageConfig,
         dag: "DAG",
-        capture_owner: bool = True,
-        capture_tags: bool = True,
     ) -> DataFlow:
         """
         Generates a Dataflow object from an Airflow DAG
@@ -146,7 +144,10 @@ def generate_dataflow(
         orchestrator = "airflow"
         description = "\n\n".join(filter(None, [dag.description, dag.doc_md])) or None
         data_flow = DataFlow(
-            env=cluster, id=id, orchestrator=orchestrator, description=description
+            env=config.cluster,
+            id=id,
+            orchestrator=orchestrator,
+            description=description,
         )

         flow_property_bag: Dict[str, str] = {}
@@ -173,10 +174,10 @@ def generate_dataflow(
         base_url = conf.get("webserver", "base_url")
         data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}"

-        if capture_owner and dag.owner:
+        if config.capture_ownership_info and dag.owner:
             data_flow.owners.update(owner.strip() for owner in dag.owner.split(","))

-        if capture_tags and dag.tags:
+        if config.capture_tags_info and dag.tags:
             data_flow.tags.update(dag.tags)

         return data_flow
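The owner and tag handling itself is unchanged; only the gating flags moved onto the config. In particular, a comma-separated dag.owner string is still split and stripped before being added to the DataFlow's owners, as this small self-contained snippet mirrors (the example owner string is made up):

# Mirrors the owner parsing in the hunk above.
owner_string = "data-team, hsheth2"  # e.g. what dag.owner might return
owners = {owner.strip() for owner in owner_string.split(",")}
assert owners == {"data-team", "hsheth2"}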
@@ -311,14 +312,14 @@ def create_datajob_instance(
     @staticmethod
     def run_dataflow(
         emitter: Emitter,
-        cluster: str,
+        config: DatahubLineageConfig,
         dag_run: "DagRun",
         start_timestamp_millis: Optional[int] = None,
         dataflow: Optional[DataFlow] = None,
     ) -> None:
         if dataflow is None:
             assert dag_run.dag
-            dataflow = AirflowGenerator.generate_dataflow(cluster, dag_run.dag)
+            dataflow = AirflowGenerator.generate_dataflow(config, dag_run.dag)

         if start_timestamp_millis is None:
             assert dag_run.execution_date
@@ -357,13 +358,15 @@ def run_dataflow(
         dpi.properties.update(property_bag)

         dpi.emit_process_start(
-            emitter=emitter, start_timestamp_millis=start_timestamp_millis
+            emitter=emitter,
+            start_timestamp_millis=start_timestamp_millis,
+            materialize_iolets=config.materialize_iolets,
         )

     @staticmethod
     def complete_dataflow(
         emitter: Emitter,
-        cluster: str,
+        config: DatahubLineageConfig,
         dag_run: "DagRun",
         end_timestamp_millis: Optional[int] = None,
         dataflow: Optional[DataFlow] = None,
@@ -378,7 +381,7 @@ def complete_dataflow(
         """
         if dataflow is None:
             assert dag_run.dag
-            dataflow = AirflowGenerator.generate_dataflow(cluster, dag_run.dag)
+            dataflow = AirflowGenerator.generate_dataflow(config, dag_run.dag)

         assert dag_run.run_id
         dpi = DataProcessInstance.from_dataflow(dataflow=dataflow, id=dag_run.run_id)
@@ -409,28 +412,27 @@ def complete_dataflow(
     @staticmethod
     def run_datajob(
         emitter: Emitter,
-        cluster: str,
         ti: "TaskInstance",
         dag: "DAG",
         dag_run: "DagRun",
+        config: DatahubLineageConfig,
         start_timestamp_millis: Optional[int] = None,
         datajob: Optional[DataJob] = None,
         attempt: Optional[int] = None,
         emit_templates: bool = True,
-        config: Optional[DatahubLineageConfig] = None,
     ) -> DataProcessInstance:
         if datajob is None:
             assert ti.task is not None
             datajob = AirflowGenerator.generate_datajob(
-                cluster, ti.task, dag, config=config
+                config.cluster, ti.task, dag, config=config
             )

         assert dag_run.run_id
         dpi = DataProcessInstance.from_datajob(
             datajob=datajob,
             id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
-            clone_inlets=config is None or config.materialize_iolets,
-            clone_outlets=config is None or config.materialize_iolets,
+            clone_inlets=True,
+            clone_outlets=True,
         )
         job_property_bag: Dict[str, str] = {}
         job_property_bag["run_id"] = str(dag_run.run_id)
@@ -481,6 +483,7 @@ def run_datajob(
             start_timestamp_millis=start_timestamp_millis,
             attempt=attempt,
             emit_template=emit_templates,
+            materialize_iolets=config.materialize_iolets,
         )
         return dpi

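Together with the clone_inlets/clone_outlets change above, this is the substance of the fix: previously, materialize_iolets=False also prevented the task's inlets and outlets from being cloned onto the DataProcessInstance, so run-level lineage was dropped; now the iolets are always cloned and the flag is only passed through to emit_process_start(), where it presumably just controls whether the referenced datasets are emitted as entities. A minimal sketch of the resulting call pattern; the parameter names come from this diff, while the import paths, server URL, and example identifiers are assumptions:

# Illustrative sketch; simplified object construction, not the plugin's actual wiring.
import time

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://localhost:8080")  # assumed local DataHub GMS

flow = DataFlow(orchestrator="airflow", id="example_dag", env="PROD")
job = DataJob(id="example_task", flow_urn=flow.urn)

dpi = DataProcessInstance.from_datajob(
    datajob=job,
    id="example_dag_example_task_example_run",
    clone_inlets=True,   # always cloned now (was gated on materialize_iolets)
    clone_outlets=True,
)
dpi.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=int(time.time() * 1000),
    materialize_iolets=False,  # only skips materializing the inlet/outlet datasets
)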
[next changed file]
@@ -408,13 +408,12 @@ def on_task_instance_running(
         if self.config.capture_executions:
             dpi = AirflowGenerator.run_datajob(
                 emitter=self.emitter,
-                cluster=self.config.cluster,
+                config=self.config,
                 ti=task_instance,
                 dag=dag,
                 dag_run=dagrun,
                 datajob=datajob,
                 emit_templates=False,
-                config=self.config,
             )
             logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}")

@@ -530,10 +529,8 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
             return

         dataflow = AirflowGenerator.generate_dataflow(
-            cluster=self.config.cluster,
+            config=self.config,
             dag=dag,
-            capture_tags=self.config.capture_tags_info,
-            capture_owner=self.config.capture_ownership_info,
         )
         dataflow.emit(self.emitter, callback=self._make_emit_callback())

[next changed file]
@@ -106,10 +106,8 @@ def datahub_task_status_callback(context, status):
     )

     dataflow = AirflowGenerator.generate_dataflow(
-        cluster=config.cluster,
+        config=config,
         dag=dag,
-        capture_tags=config.capture_tags_info,
-        capture_owner=config.capture_ownership_info,
     )
     task.log.info(f"Emitting Datahub Dataflow: {dataflow}")
     dataflow.emit(emitter, callback=_make_emit_callback(task.log))
@@ -139,13 +137,12 @@ def datahub_task_status_callback(context, status):
     if config.capture_executions:
         dpi = AirflowGenerator.run_datajob(
             emitter=emitter,
-            cluster=config.cluster,
+            config=config,
             ti=ti,
             dag=dag,
             dag_run=context["dag_run"],
             datajob=datajob,
             start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
-            config=config,
         )

         task.log.info(f"Emitted Start Datahub Dataprocess Instance: {dpi}")
@@ -207,13 +204,12 @@ def datahub_pre_execution(context):
     if config.capture_executions:
         dpi = AirflowGenerator.run_datajob(
             emitter=emitter,
-            cluster=config.cluster,
+            config=config,
             ti=ti,
             dag=dag,
             dag_run=context["dag_run"],
             datajob=datajob,
             start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
-            config=config,
         )

         task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
[next changed file]
@@ -37,10 +37,8 @@ def send_lineage_to_datahub(
     emitter = hook.make_emitter()

     dataflow = AirflowGenerator.generate_dataflow(
-        cluster=config.cluster,
+        config=config,
         dag=dag,
-        capture_tags=config.capture_tags_info,
-        capture_owner=config.capture_ownership_info,
     )
     dataflow.emit(emitter)
     operator.log.info(f"Emitted from Lineage: {dataflow}")
@@ -68,7 +66,7 @@ def send_lineage_to_datahub(

     dpi = AirflowGenerator.run_datajob(
         emitter=emitter,
-        cluster=config.cluster,
+        config=config,
         ti=ti,
         dag=dag,
         dag_run=dag_run,
[remaining changed files not shown]
