feat(ingest/airflow): fix materialize_iolets bug #10613

Merged · 3 commits · Jun 12, 2024
@@ -129,10 +129,8 @@ def _get_dependencies(

     @staticmethod
     def generate_dataflow(
-        cluster: str,
+        config: DatahubLineageConfig,
         dag: "DAG",
-        capture_owner: bool = True,
-        capture_tags: bool = True,
     ) -> DataFlow:
         """
         Generates a Dataflow object from an Airflow DAG
@@ -146,7 +144,10 @@ def generate_dataflow(
         orchestrator = "airflow"
         description = "\n\n".join(filter(None, [dag.description, dag.doc_md])) or None
         data_flow = DataFlow(
-            env=cluster, id=id, orchestrator=orchestrator, description=description
+            env=config.cluster,
+            id=id,
+            orchestrator=orchestrator,
+            description=description,
         )

         flow_property_bag: Dict[str, str] = {}
@@ -173,10 +174,10 @@ def generate_dataflow(
         base_url = conf.get("webserver", "base_url")
         data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}"

-        if capture_owner and dag.owner:
+        if config.capture_ownership_info and dag.owner:
             data_flow.owners.update(owner.strip() for owner in dag.owner.split(","))

-        if capture_tags and dag.tags:
+        if config.capture_tags_info and dag.tags:
             data_flow.tags.update(dag.tags)

         return data_flow
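
With these hunks, generate_dataflow reads the environment and both capture flags off a single DatahubLineageConfig instead of taking them as separate parameters. A minimal sketch of the new call style follows; the field names come from this diff, but the construction itself is illustrative, since real deployments get this object from the plugin's config loading rather than building it by hand:

# Illustrative only: the field names are the ones referenced in this diff;
# defaults and any other required fields of DatahubLineageConfig are assumed.
config = DatahubLineageConfig(
    cluster="prod",
    capture_ownership_info=True,
    capture_tags_info=True,
    materialize_iolets=False,
)

# Old: generate_dataflow(cluster, dag, capture_owner=..., capture_tags=...)
# New: the config object carries the cluster and both capture flags.
dataflow = AirflowGenerator.generate_dataflow(config=config, dag=dag)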
@@ -311,14 +312,14 @@ def create_datajob_instance(
     @staticmethod
     def run_dataflow(
         emitter: Emitter,
-        cluster: str,
+        config: DatahubLineageConfig,
         dag_run: "DagRun",
         start_timestamp_millis: Optional[int] = None,
         dataflow: Optional[DataFlow] = None,
     ) -> None:
         if dataflow is None:
             assert dag_run.dag
-            dataflow = AirflowGenerator.generate_dataflow(cluster, dag_run.dag)
+            dataflow = AirflowGenerator.generate_dataflow(config, dag_run.dag)

         if start_timestamp_millis is None:
             assert dag_run.execution_date
@@ -357,13 +358,15 @@ def run_dataflow(
         dpi.properties.update(property_bag)

         dpi.emit_process_start(
-            emitter=emitter, start_timestamp_millis=start_timestamp_millis
+            emitter=emitter,
+            start_timestamp_millis=start_timestamp_millis,
+            materialize_iolets=config.materialize_iolets,
         )

     @staticmethod
     def complete_dataflow(
         emitter: Emitter,
-        cluster: str,
+        config: DatahubLineageConfig,
         dag_run: "DagRun",
         end_timestamp_millis: Optional[int] = None,
         dataflow: Optional[DataFlow] = None,
@@ -378,7 +381,7 @@ def complete_dataflow(
"""
if dataflow is None:
assert dag_run.dag
dataflow = AirflowGenerator.generate_dataflow(cluster, dag_run.dag)
dataflow = AirflowGenerator.generate_dataflow(config, dag_run.dag)

assert dag_run.run_id
dpi = DataProcessInstance.from_dataflow(dataflow=dataflow, id=dag_run.run_id)
@@ -409,28 +412,27 @@
     @staticmethod
     def run_datajob(
         emitter: Emitter,
-        cluster: str,
         ti: "TaskInstance",
         dag: "DAG",
         dag_run: "DagRun",
+        config: DatahubLineageConfig,
         start_timestamp_millis: Optional[int] = None,
         datajob: Optional[DataJob] = None,
         attempt: Optional[int] = None,
         emit_templates: bool = True,
-        config: Optional[DatahubLineageConfig] = None,
     ) -> DataProcessInstance:
         if datajob is None:
             assert ti.task is not None
             datajob = AirflowGenerator.generate_datajob(
-                cluster, ti.task, dag, config=config
+                config.cluster, ti.task, dag, config=config
             )

         assert dag_run.run_id
         dpi = DataProcessInstance.from_datajob(
             datajob=datajob,
             id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
-            clone_inlets=config is None or config.materialize_iolets,
-            clone_outlets=config is None or config.materialize_iolets,
+            clone_inlets=True,
+            clone_outlets=True,
         )
         job_property_bag: Dict[str, str] = {}
         job_property_bag["run_id"] = str(dag_run.run_id)
@@ -481,6 +483,7 @@ def run_datajob(
             start_timestamp_millis=start_timestamp_millis,
             attempt=attempt,
             emit_template=emit_templates,
+            materialize_iolets=config.materialize_iolets,
         )
         return dpi

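The run_datajob hunks above carry the actual materialize_iolets fix: inlets and outlets are now always cloned onto the DataProcessInstance, while config.materialize_iolets is forwarded to emit_process_start, where it only controls whether the referenced datasets get emitted as their own entities. A condensed sketch of the resulting behavior, assembled from the lines in this diff:

# Run-level lineage is always preserved: the job's inlets/outlets are
# unconditionally copied onto the run instance.
dpi = DataProcessInstance.from_datajob(
    datajob=datajob,
    id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
    clone_inlets=True,
    clone_outlets=True,
)

# materialize_iolets now only decides whether the referenced datasets are
# auto-created as entities when the run starts.
dpi.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=start_timestamp_millis,
    materialize_iolets=config.materialize_iolets,
)

Previously, setting materialize_iolets=False also disabled the cloning, which silently dropped the run's inlet/outlet lineage instead of merely skipping dataset materialization.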
(next file)
@@ -403,13 +403,12 @@ def on_task_instance_running(
         if self.config.capture_executions:
             dpi = AirflowGenerator.run_datajob(
                 emitter=self.emitter,
-                cluster=self.config.cluster,
+                config=self.config,
                 ti=task_instance,
                 dag=dag,
                 dag_run=dagrun,
                 datajob=datajob,
                 emit_templates=False,
-                config=self.config,
             )
             logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}")

@@ -525,10 +524,8 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
             return

         dataflow = AirflowGenerator.generate_dataflow(
-            cluster=self.config.cluster,
+            config=self.config,
             dag=dag,
-            capture_tags=self.config.capture_tags_info,
-            capture_owner=self.config.capture_ownership_info,
         )
         dataflow.emit(self.emitter, callback=self._make_emit_callback())

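The remaining files apply the same call-site migration as this listener: the separate cluster= argument and the trailing, previously optional config= collapse into one required config=. A sketch of the before/after pattern, with the argument lists abbreviated:

# Before: cluster threaded through separately, config optional
# (and therefore easy to omit).
dpi = AirflowGenerator.run_datajob(
    emitter=emitter,
    cluster=config.cluster,
    ti=ti,
    dag=dag,
    dag_run=dag_run,
    config=config,
)

# After: one required config carries the cluster, the capture flags,
# and materialize_iolets.
dpi = AirflowGenerator.run_datajob(
    emitter=emitter,
    config=config,
    ti=ti,
    dag=dag,
    dag_run=dag_run,
)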
(next file)
@@ -106,10 +106,8 @@ def datahub_task_status_callback(context, status):
     )

     dataflow = AirflowGenerator.generate_dataflow(
-        cluster=config.cluster,
+        config=config,
         dag=dag,
-        capture_tags=config.capture_tags_info,
-        capture_owner=config.capture_ownership_info,
     )
     task.log.info(f"Emitting Datahub Dataflow: {dataflow}")
     dataflow.emit(emitter, callback=_make_emit_callback(task.log))
@@ -139,13 +137,12 @@ def datahub_task_status_callback(context, status):
     if config.capture_executions:
         dpi = AirflowGenerator.run_datajob(
             emitter=emitter,
-            cluster=config.cluster,
+            config=config,
             ti=ti,
             dag=dag,
             dag_run=context["dag_run"],
             datajob=datajob,
             start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
-            config=config,
         )

         task.log.info(f"Emitted Start Datahub Dataprocess Instance: {dpi}")
@@ -207,13 +204,12 @@ def datahub_pre_execution(context):
     if config.capture_executions:
         dpi = AirflowGenerator.run_datajob(
             emitter=emitter,
-            cluster=config.cluster,
+            config=config,
             ti=ti,
             dag=dag,
             dag_run=context["dag_run"],
             datajob=datajob,
             start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
-            config=config,
         )

         task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
(next file)
@@ -37,10 +37,8 @@ def send_lineage_to_datahub(
     emitter = hook.make_emitter()

     dataflow = AirflowGenerator.generate_dataflow(
-        cluster=config.cluster,
+        config=config,
         dag=dag,
-        capture_tags=config.capture_tags_info,
-        capture_owner=config.capture_ownership_info,
     )
     dataflow.emit(emitter)
     operator.log.info(f"Emitted from Lineage: {dataflow}")
@@ -68,7 +66,7 @@ def send_lineage_to_datahub(

     dpi = AirflowGenerator.run_datajob(
         emitter=emitter,
-        cluster=config.cluster,
+        config=config,
         ti=ti,
         dag=dag,
         dag_run=dag_run,
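For end users, materialize_iolets is a plugin setting rather than an API argument. Assuming the plugin reads its options from a [datahub] section of airflow.cfg, as its other flags are (the section and key names here are an assumption to verify against your plugin version), the effective value can be inspected the same way the diff reads webserver settings:

# Hypothetical inspection of the plugin setting; the "datahub" section and
# "materialize_iolets" key are assumptions, not confirmed by this diff.
from airflow.configuration import conf

materialize_iolets = conf.getboolean("datahub", "materialize_iolets", fallback=True)
print(f"materialize_iolets={materialize_iolets}")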