feat(ingestion/airflow): Added support of datajob entity as input lineage #16

Closed

shubhamjagtap639
Owner

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@hsheth2 hsheth2 left a comment

one suggestion, mostly looking good

from airflow.models import DagRun, TaskInstance

from airflow import DAG
why'd this change?

datajob.outlets.extend(
    DatasetUrn.create_from_string(urn) for urn in output_urns
)
datajob.inlets.extend(entities_to_dataset_urn_list(input_urns))
how about an entities_to_urn_list(list, allowed_entities={"dataset", "datajob"})?

Owner Author

If we use a common method to extend both the iolets and upstream_jobs, we get a lint error:
src/datahub_airflow_plugin/datahub_listener.py:279: error: Argument 1 to "extend" of "list" has incompatible type "list[DatasetUrn | DataJobUrn]"; expected "Iterable[DatasetUrn]" [arg-type]
with the method definition below:
def entities_to_urn_list(
    iolets: List[str], allowed_entities: Set[str]
) -> List[Union[DatasetUrn, DataJobUrn]]:
    urn_list: List[Union[DatasetUrn, DataJobUrn]] = []
    for let in iolets:
        entity = guess_entity_type(let)
        if entity in allowed_entities:
            if entity == "dataset":
                urn_list.append(DatasetUrn.create_from_string(let))
            elif entity == "dataJob":
                urn_list.append(DataJobUrn.create_from_string(let))
    return urn_list
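
For reference, the error arises because a list whose elements may be either URN type cannot be passed where only DatasetUrn elements are expected. One possible way to keep a single pass over the iolets while giving each target list a precisely typed input is to return two lists, one per entity type. This is only a sketch: `split_urns_by_type` is a hypothetical name, and the `DatasetUrn`, `DataJobUrn`, and `guess_entity_type` definitions below are simplified stand-ins so the example runs on its own, not DataHub's real implementations.

```python
from dataclasses import dataclass
from typing import List, Set, Tuple


@dataclass(frozen=True)
class DatasetUrn:
    """Stand-in for DataHub's DatasetUrn (illustration only)."""
    urn: str


@dataclass(frozen=True)
class DataJobUrn:
    """Stand-in for DataHub's DataJobUrn (illustration only)."""
    urn: str


def guess_entity_type(urn: str) -> str:
    # Simplified stand-in: "urn:li:<entityType>:<id>" -> "<entityType>".
    parts = urn.split(":")
    if len(parts) < 4 or parts[0] != "urn" or parts[1] != "li":
        raise ValueError(f"not a valid urn: {urn}")
    return parts[2]


def split_urns_by_type(
    iolets: List[str], allowed_entities: Set[str]
) -> Tuple[List[DatasetUrn], List[DataJobUrn]]:
    # Hypothetical helper: each returned list has a single element type,
    # so it can be passed to extend() on a List[DatasetUrn] or a
    # List[DataJobUrn] without an arg-type error.
    dataset_urns: List[DatasetUrn] = []
    datajob_urns: List[DataJobUrn] = []
    for let in iolets:
        entity = guess_entity_type(let)
        if entity not in allowed_entities:
            continue  # silently skip entity types the caller did not allow
        if entity == "dataset":
            dataset_urns.append(DatasetUrn(let))
        elif entity == "dataJob":
            datajob_urns.append(DataJobUrn(let))
    return dataset_urns, datajob_urns


# Example input: one dataset, one data job, and one entity that is not allowed.
inputs = [
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.table,PROD)",
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_dag,prod),my_task)",
    "urn:li:chart:(looker,1)",
]
datasets, jobs = split_urns_by_type(inputs, {"dataset", "dataJob"})
```

With this shape, the dataset list could feed the inlets and the job list could feed the upstream jobs, each with a matching element type. Whether that is preferable to two separate single-type helpers is a judgment call for the PR.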


hsheth2 commented Nov 9, 2023

@shubhamjagtap639 can you move this PR to open-source?

shubhamjagtap639 force-pushed the Airflow-Input-lineage-support-airflow-task branch from e4fb11e to d57d86e on November 9, 2023 09:52
shubhamjagtap639 deleted the Airflow-Input-lineage-support-airflow-task branch on March 14, 2024 11:48