feat(ingest/airflow): fix materialize_iolets bug
Previously, the materialize_iolets flag was not respected for data process instances.
Also fixes a bug from #10421.
hsheth2 committed May 29, 2024
1 parent 013425a commit 491640e
Showing 5 changed files with 41 additions and 38 deletions.
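For context, the failure mode is easiest to see in miniature. The sketch below uses hypothetical names, not the actual DataHub classes: a materialize_iolets value read from config is never forwarded to the emit call, so the downstream default of True silently wins for data process instances.

    class _ProcessInstanceSketch:
        def generate_mcp(self, created_ts_millis=None, materialize_iolets=True):
            # Before the fix, callers fell through to this default, so the
            # configured value never reached this point.
            return [("dpi-mcp", materialize_iolets)]

        def emit_process_start(self, start_timestamp_millis, materialize_iolets=True):
            # After the fix, the flag is threaded through explicitly.
            return self.generate_mcp(start_timestamp_millis, materialize_iolets)

    dpi = _ProcessInstanceSketch()
    assert dpi.emit_process_start(0) == [("dpi-mcp", True)]          # old path: default wins
    assert dpi.emit_process_start(0, False) == [("dpi-mcp", False)]  # fixed path: flag respected

The diff threads the flag through explicitly at each hop, as shown below.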
@@ -129,10 +129,8 @@ def _get_dependencies(

     @staticmethod
     def generate_dataflow(
-        cluster: str,
+        config: DatahubLineageConfig,
         dag: "DAG",
-        capture_owner: bool = True,
-        capture_tags: bool = True,
     ) -> DataFlow:
         """
         Generates a Dataflow object from an Airflow DAG
@@ -146,7 +144,10 @@ def generate_dataflow(
         orchestrator = "airflow"
         description = "\n\n".join(filter(None, [dag.description, dag.doc_md])) or None
         data_flow = DataFlow(
-            env=cluster, id=id, orchestrator=orchestrator, description=description
+            env=config.cluster,
+            id=id,
+            orchestrator=orchestrator,
+            description=description,
         )

         flow_property_bag: Dict[str, str] = {}
@@ -173,10 +174,10 @@
         base_url = conf.get("webserver", "base_url")
         data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}"

-        if capture_owner and dag.owner:
+        if config.capture_ownership_info and dag.owner:
             data_flow.owners.update(owner.strip() for owner in dag.owner.split(","))

-        if capture_tags and dag.tags:
+        if config.capture_tags_info and dag.tags:
             data_flow.tags.update(dag.tags)

         return data_flow
@@ -311,14 +312,14 @@ def create_datajob_instance(
     @staticmethod
     def run_dataflow(
         emitter: Emitter,
-        cluster: str,
+        config: DatahubLineageConfig,
         dag_run: "DagRun",
         start_timestamp_millis: Optional[int] = None,
         dataflow: Optional[DataFlow] = None,
     ) -> None:
         if dataflow is None:
             assert dag_run.dag
-            dataflow = AirflowGenerator.generate_dataflow(cluster, dag_run.dag)
+            dataflow = AirflowGenerator.generate_dataflow(config, dag_run.dag)

         if start_timestamp_millis is None:
             assert dag_run.execution_date
@@ -357,13 +358,15 @@ def run_dataflow(
         dpi.properties.update(property_bag)

         dpi.emit_process_start(
-            emitter=emitter, start_timestamp_millis=start_timestamp_millis
+            emitter=emitter,
+            start_timestamp_millis=start_timestamp_millis,
+            materialize_iolets=config.materialize_iolets,
         )

     @staticmethod
     def complete_dataflow(
         emitter: Emitter,
-        cluster: str,
+        config: DatahubLineageConfig,
         dag_run: "DagRun",
         end_timestamp_millis: Optional[int] = None,
         dataflow: Optional[DataFlow] = None,
@@ -378,7 +381,7 @@ def complete_dataflow(
         """
         if dataflow is None:
             assert dag_run.dag
-            dataflow = AirflowGenerator.generate_dataflow(cluster, dag_run.dag)
+            dataflow = AirflowGenerator.generate_dataflow(config, dag_run.dag)

         assert dag_run.run_id
         dpi = DataProcessInstance.from_dataflow(dataflow=dataflow, id=dag_run.run_id)
@@ -409,28 +412,27 @@ def complete_dataflow(
     @staticmethod
     def run_datajob(
         emitter: Emitter,
-        cluster: str,
         ti: "TaskInstance",
         dag: "DAG",
         dag_run: "DagRun",
+        config: DatahubLineageConfig,
         start_timestamp_millis: Optional[int] = None,
         datajob: Optional[DataJob] = None,
         attempt: Optional[int] = None,
         emit_templates: bool = True,
-        config: Optional[DatahubLineageConfig] = None,
     ) -> DataProcessInstance:
         if datajob is None:
             assert ti.task is not None
             datajob = AirflowGenerator.generate_datajob(
-                cluster, ti.task, dag, config=config
+                config.cluster, ti.task, dag, config=config
             )

         assert dag_run.run_id
         dpi = DataProcessInstance.from_datajob(
             datajob=datajob,
             id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
-            clone_inlets=config is None or config.materialize_iolets,
-            clone_outlets=config is None or config.materialize_iolets,
+            clone_inlets=True,
+            clone_outlets=True,
         )
         job_property_bag: Dict[str, str] = {}
         job_property_bag["run_id"] = str(dag_run.run_id)
@@ -481,6 +483,7 @@ def run_datajob(
             start_timestamp_millis=start_timestamp_millis,
             attempt=attempt,
             emit_template=emit_templates,
+            materialize_iolets=config.materialize_iolets,
         )
         return dpi
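With the generator changes above, run_datajob and generate_dataflow now take the full DatahubLineageConfig up front instead of a bare cluster string plus an optional trailing config. A sketch of an updated callsite follows; the import path and the surrounding emitter/ti/dag/dag_run/config objects are assumptions, not shown in this diff.

    # Hypothetical callsite after this commit; import path and surrounding
    # objects are assumed rather than taken from the diff.
    from datahub_airflow_plugin.client.airflow_generator import AirflowGenerator

    dpi = AirflowGenerator.run_datajob(
        emitter=emitter,
        config=config,        # replaces cluster=...; the cluster is read from config.cluster
        ti=ti,
        dag=dag,
        dag_run=dag_run,
        emit_templates=False,
    )

Note that clone_inlets/clone_outlets on the process instance are now unconditionally True: attaching iolets to the DPI is separated from whether they are materialized as full entities, which the diff now routes through config.materialize_iolets at emit time. The remaining changed files update their run_datajob and generate_dataflow callsites to match.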
@@ -403,13 +403,12 @@ def on_task_instance_running(
         if self.config.capture_executions:
             dpi = AirflowGenerator.run_datajob(
                 emitter=self.emitter,
-                cluster=self.config.cluster,
+                config=self.config,
                 ti=task_instance,
                 dag=dag,
                 dag_run=dagrun,
                 datajob=datajob,
                 emit_templates=False,
-                config=self.config,
             )
             logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}")
@@ -525,10 +524,8 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
             return

         dataflow = AirflowGenerator.generate_dataflow(
-            cluster=self.config.cluster,
+            config=self.config,
             dag=dag,
-            capture_tags=self.config.capture_tags_info,
-            capture_owner=self.config.capture_ownership_info,
         )
         dataflow.emit(self.emitter, callback=self._make_emit_callback())
@@ -106,10 +106,8 @@ def datahub_task_status_callback(context, status):
     )

     dataflow = AirflowGenerator.generate_dataflow(
-        cluster=config.cluster,
+        config=config,
         dag=dag,
-        capture_tags=config.capture_tags_info,
-        capture_owner=config.capture_ownership_info,
     )
     task.log.info(f"Emitting Datahub Dataflow: {dataflow}")
     dataflow.emit(emitter, callback=_make_emit_callback(task.log))
@@ -139,13 +137,12 @@ def datahub_task_status_callback(context, status):
     if config.capture_executions:
         dpi = AirflowGenerator.run_datajob(
             emitter=emitter,
-            cluster=config.cluster,
+            config=config,
             ti=ti,
             dag=dag,
             dag_run=context["dag_run"],
             datajob=datajob,
             start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
-            config=config,
         )

         task.log.info(f"Emitted Start Datahub Dataprocess Instance: {dpi}")
@@ -207,13 +204,12 @@ def datahub_pre_execution(context):
     if config.capture_executions:
         dpi = AirflowGenerator.run_datajob(
             emitter=emitter,
-            cluster=config.cluster,
+            config=config,
             ti=ti,
             dag=dag,
             dag_run=context["dag_run"],
             datajob=datajob,
             start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
-            config=config,
         )

         task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
@@ -37,10 +37,8 @@ def send_lineage_to_datahub(
     emitter = hook.make_emitter()

     dataflow = AirflowGenerator.generate_dataflow(
-        cluster=config.cluster,
+        config=config,
         dag=dag,
-        capture_tags=config.capture_tags_info,
-        capture_owner=config.capture_ownership_info,
     )
     dataflow.emit(emitter)
     operator.log.info(f"Emitted from Lineage: {dataflow}")
@@ -68,7 +66,7 @@ def send_lineage_to_datahub(

     dpi = AirflowGenerator.run_datajob(
         emitter=emitter,
-        cluster=config.cluster,
+        config=config,
         ti=ti,
         dag=dag,
         dag_run=dag_run,
@@ -107,16 +107,18 @@ def emit_process_start(
         start_timestamp_millis: int,
         attempt: Optional[int] = None,
         emit_template: bool = True,
+        materialize_iolets: bool = True,
         callback: Optional[Callable[[Exception, str], None]] = None,
     ) -> None:
         """
         :rtype: None
         :param emitter: Datahub Emitter to emit the process event
-        :param start_timestamp_millis: (int) the execution start time in milliseconds
+        :param start_timestamp_millis: the execution start time in milliseconds
         :param attempt: the number of attempt of the execution with the same execution id
-        :param emit_template: (bool) If it is set the template of the execution (datajob, dataflow) will be emitted as well.
-        :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
+        :param emit_template: If it is set the template of the execution (datajob, dataflow) will be emitted as well.
+        :param materialize_iolets: If it is set the iolets will be materialized
+        :param callback: the callback method for KafkaEmitter if it is used
         """
         if emit_template and self.template_urn is not None:
             template_object: Union[DataJob, DataFlow]
@@ -157,7 +159,10 @@ def emit_process_start(
             for mcp in template_object.generate_mcp():
                 self._emit_mcp(mcp, emitter, callback)

-        for mcp in self.generate_mcp(created_ts_millis=start_timestamp_millis):
+        for mcp in self.generate_mcp(
+            created_ts_millis=start_timestamp_millis,
+            materialize_iolets=materialize_iolets,
+        ):
             self._emit_mcp(mcp, emitter, callback)
         for mcp in self.start_event_mcp(start_timestamp_millis, attempt):
             self._emit_mcp(mcp, emitter, callback)
@@ -230,7 +235,7 @@ def emit_process_end(
             self._emit_mcp(mcp, emitter, callback)

     def generate_mcp(
-        self, created_ts_millis: Optional[int] = None, materialize_iolets: bool = True
+        self, created_ts_millis: Optional[int], materialize_iolets: bool
     ) -> Iterable[MetadataChangeProposalWrapper]:
         """Generates mcps from the object"""
@@ -280,13 +285,17 @@ def emit(
         self,
         emitter: Emitter,
         callback: Optional[Callable[[Exception, str], None]] = None,
+        created_ts_millis: Optional[int] = None,
     ) -> None:
         """
         :param emitter: (Emitter) the datahub emitter to emit generated mcps
         :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
         """
-        for mcp in self.generate_mcp():
+        for mcp in self.generate_mcp(
+            created_ts_millis=created_ts_millis,
+            materialize_iolets=True,
+        ):
             self._emit_mcp(mcp, emitter, callback)

     @staticmethod
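Dropping the defaults on generate_mcp makes both arguments required, so a caller that previously relied on the implicit materialize_iolets=True now fails loudly instead of silently materializing entities. A sketch of the resulting call pattern, with the dpi and emitter objects assumed to be constructed elsewhere:

    # Hypothetical usage after this change; dpi is a DataProcessInstance and
    # emitter is a DataHub emitter, both assumed to exist already.
    # dpi.generate_mcp() with no arguments would now raise a TypeError.
    for mcp in dpi.generate_mcp(
        created_ts_millis=None,      # no creation timestamp to record
        materialize_iolets=False,    # reference iolet URNs without emitting entities for them
    ):
        emitter.emit(mcp)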
