feat(ingest/airflow): support BigQueryInsertJobOperator (datahub-proj…
hsheth2 authored and sleeperdeep committed Jun 25, 2024
1 parent 1e01389 commit e450f50
Showing 2 changed files with 62 additions and 9 deletions.
11 changes: 10 additions & 1 deletion docs/lineage/airflow.md
@@ -8,7 +8,7 @@ If you're looking to schedule DataHub ingestion using Airflow, see the guide on

The DataHub Airflow plugin supports:

- - Automatic column-level lineage extraction from various operators e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, and more), `S3FileTransformOperator`, and more.
+ - Automatic column-level lineage extraction from various operators, e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, `BigQueryInsertJobOperator`, and more) as well as `S3FileTransformOperator`.
- Airflow DAG and tasks, including properties, ownership, and tags.
- Task run information, including task successes and failures.
- Manual lineage annotations using `inlets` and `outlets` on Airflow operators (see the sketch below).
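
A minimal sketch of the manual annotation approach, assuming the plugin's `Dataset` helper from `datahub_airflow_plugin.entities` (task and table names are illustrative):

```python
from airflow.operators.bash import BashOperator
from datahub_airflow_plugin.entities import Dataset

transform = BashOperator(
    task_id="transform",
    bash_command="echo 'run transform'",
    # Manually declared lineage: datasets this task reads and writes.
    inlets=[Dataset(platform="snowflake", name="mydb.schema.table_a")],
    outlets=[Dataset(platform="snowflake", name="mydb.schema.table_b")],
)
```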
@@ -166,6 +166,7 @@ Supported operators:
- `SQLExecuteQueryOperator`, including any subclasses. Note that in newer versions of Airflow (generally Airflow 2.5+), most SQL operators inherit from this class.
- `AthenaOperator` and `AWSAthenaOperator`
- `BigQueryOperator` and `BigQueryExecuteQueryOperator`
+ - `BigQueryInsertJobOperator` (incubating; see the usage sketch below)
- `MySqlOperator`
- `PostgresOperator`
- `RedshiftSQLOperator`
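
For the incubating `BigQueryInsertJobOperator` support, a minimal usage sketch (project and table names are illustrative; the plugin reads the SQL out of the operator's `configuration`):

```python
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

load_job = BigQueryInsertJobOperator(
    task_id="load_job",
    project_id="my-gcp-project",  # used as the default database when parsing the SQL
    configuration={
        "query": {
            "query": "INSERT INTO mydataset.dest SELECT id, name FROM mydataset.src",
            "useLegacySql": False,
        }
    },
)
```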
@@ -224,6 +225,14 @@ class DbtOperator(BaseOperator):

If you override the `pre_execute` and `post_execute` functions, ensure they include the `@prepare_lineage` and `@apply_lineage` decorators, respectively. Reference the [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage) for more details.
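
A minimal sketch of an overridden hook pair, assuming the `prepare_lineage` and `apply_lineage` decorators exported by `airflow.lineage` (the `_get_lineage` helper is hypothetical):

```python
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.baseoperator import BaseOperator


class DbtOperator(BaseOperator):
    @prepare_lineage
    def pre_execute(self, context):
        # Populate inlets/outlets before execution so the plugin can read them.
        self.inlets, self.outlets = self._get_lineage()

    @apply_lineage
    def post_execute(self, context, result=None):
        # Lineage is applied after the task finishes.
        pass

    def execute(self, context):
        ...  # run dbt here

    def _get_lineage(self):
        # Hypothetical helper: derive inlet/outlet entity lists from dbt metadata.
        return [], []
```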

+ ### Custom Extractors

+ Note: custom extractors are only supported in the v2 plugin.

+ You can also create a custom extractor to extract lineage from any operator. This is useful if you're using a built-in Airflow operator for which we don't support automatic lineage extraction.

+ See this [example PR](https://github.com/datahub-project/datahub/pull/10452) which adds a custom extractor for the `BigQueryInsertJobOperator` operator.
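
A minimal custom-extractor sketch, assuming the OpenLineage extractor framework that the v2 plugin builds on (the operator name and its `sql` attribute are illustrative):

```python
from typing import List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata


class MyOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Operator class names (not full import paths) this extractor handles.
        return ["MyOperator"]

    def extract(self) -> Optional[TaskMetadata]:
        sql = getattr(self.operator, "sql", None)  # illustrative attribute
        if not sql:
            return None
        return TaskMetadata(name=f"{self.operator.dag_id}.{self.operator.task_id}")
```

Following the OpenLineage convention, an extractor like this would be registered via the `OPENLINEAGE_EXTRACTORS` environment variable, e.g. `OPENLINEAGE_EXTRACTORS=my_pkg.extractors.MyOperatorExtractor`.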

## Emit Lineage Directly

If you can't use the plugin or annotate inlets/outlets, you can also emit lineage using the `DatahubEmitterOperator`.
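
A minimal sketch using the emitter operator together with DataHub's URN builders (connection id and URNs are illustrative):

```python
import datahub.emitter.mce_builder as builder
from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator

emit_lineage = DatahubEmitterOperator(
    task_id="emit_lineage",
    datahub_conn_id="datahub_rest_default",
    mces=[
        builder.make_lineage_mce(
            upstream_urns=[
                builder.make_dataset_urn("snowflake", "mydb.schema.table_a"),
            ],
            downstream_urn=builder.make_dataset_urn("snowflake", "mydb.schema.table_b"),
        )
    ],
)
```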
60 changes: 52 additions & 8 deletions
@@ -59,6 +59,10 @@ def __init__(self):
        for operator in _sql_operator_overrides:
            self.task_to_extractor.extractors[operator] = GenericSqlExtractor

+         self.task_to_extractor.extractors[
+             "BigQueryInsertJobOperator"
+         ] = BigQueryInsertJobOperatorExtractor

        self._graph: Optional["DataHubGraph"] = None

    @contextlib.contextmanager
@@ -78,7 +82,7 @@ def _patch_extractors(self):
unittest.mock.patch.object(
SnowflakeExtractor,
"default_schema",
-                 property(snowflake_default_schema),
+                 property(_snowflake_default_schema),
)
)

@@ -166,12 +170,6 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
    task_name = f"{self.operator.dag_id}.{self.operator.task_id}"
    sql = self.operator.sql

-     run_facets = {}
-     job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}

-     # Prepare to run the SQL parser.
-     graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)

    default_database = getattr(self.operator, "database", None)
    if not default_database:
        default_database = self.database
@@ -185,6 +183,31 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
    # Run the SQL parser.
    scheme = self.scheme
    platform = OL_SCHEME_TWEAKS.get(scheme, scheme)

+     return _parse_sql_into_task_metadata(
+         self,
+         sql,
+         platform=platform,
+         default_database=default_database,
+         default_schema=default_schema,
+     )


+ def _parse_sql_into_task_metadata(
+     self: "BaseExtractor",
+     sql: str,
+     platform: str,
+     default_database: Optional[str],
+     default_schema: Optional[str],
+ ) -> TaskMetadata:
+     task_name = f"{self.operator.dag_id}.{self.operator.task_id}"

+     run_facets = {}
+     job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}

+     # Prepare to run the SQL parser.
+     graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)

    self.log.debug(
        "Running the SQL parser %s (platform=%s, default db=%s, schema=%s): %s",
        "with graph client" if graph else "in offline mode",
@@ -232,7 +255,28 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
    )


- def snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
+ class BigQueryInsertJobOperatorExtractor(BaseExtractor):
+     def extract(self) -> Optional[TaskMetadata]:
+         from airflow.providers.google.cloud.operators.bigquery import (
+             BigQueryInsertJobOperator,  # type: ignore
+         )

+         operator: "BigQueryInsertJobOperator" = self.operator
+         sql = operator.configuration.get("query")
+         if not sql:
+             self.log.warning("No query found in BigQueryInsertJobOperator")
+             return None

+         return _parse_sql_into_task_metadata(
+             self,
+             sql,
+             platform="bigquery",
+             default_database=operator.project_id,
+             default_schema=None,
+         )


+ def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
    if hasattr(self.operator, "schema") and self.operator.schema is not None:
        return self.operator.schema
    return (
