Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.google.cloud.utils.bigquery import bq_cast
from airflow.providers.google.cloud.utils.credentials_provider import _get_scopes
from airflow.providers.google.cloud.utils.lineage import send_hook_lineage_for_bq_job
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.deprecated import deprecated
from airflow.providers.google.common.hooks.base_google import (
Expand All @@ -88,6 +89,7 @@
from google.api_core.retry import Retry
from requests import Session

from airflow.providers.openlineage.sqlparser import DatabaseInfo
from airflow.sdk import Context

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -1330,19 +1332,10 @@ def insert_job(
# Start the job and wait for it to complete and get the result.
job_api_repr.result(timeout=timeout, retry=retry)

self._send_hook_level_lineage_for_bq_job(job=job_api_repr)
send_hook_lineage_for_bq_job(context=self, job=job_api_repr)

return job_api_repr

def _send_hook_level_lineage_for_bq_job(self, job):
    """Emit hook-level lineage for *job* if it is a BigQuery query job.

    Non-query job types are ignored for now.
    """
    # TODO(kacpermuda) Add support for other job types and more params to sql job
    if job.job_type != QueryJob.job_type:
        return
    send_sql_hook_lineage(context=self, sql=job.query, job_id=job.job_id)

def generate_job_id(
self,
job_id: str | None,
Expand Down Expand Up @@ -1503,6 +1496,31 @@ def scopes(self) -> Sequence[str]:
scope_value = self._get_field("scope", None)
return _get_scopes(scope_value)

def get_openlineage_database_info(self, connection) -> DatabaseInfo:
    """Return BigQuery specific information for OpenLineage."""
    from airflow.providers.openlineage.sqlparser import DatabaseInfo

    # Columns fetched from INFORMATION_SCHEMA when OpenLineage resolves table schemas.
    schema_columns = [
        "table_schema",
        "table_name",
        "column_name",
        "ordinal_position",
        "data_type",
        "table_catalog",
    ]
    return DatabaseInfo(
        scheme=self.get_openlineage_database_dialect(None),
        authority=None,
        # BigQuery maps the "database" concept onto the GCP project.
        database=self.project_id,
        information_schema_columns=schema_columns,
        information_schema_table_name="INFORMATION_SCHEMA.COLUMNS",
    )

def get_openlineage_database_dialect(self, _) -> str:
    """Return the SQL dialect name OpenLineage should use for BigQuery."""
    dialect_name = "bigquery"
    return dialect_name

def get_openlineage_default_schema(self) -> str | None:
    """Return the default schema for OpenLineage; BigQuery exposes none at the hook level."""
    # None tells the OpenLineage SQL parser not to assume any schema prefix.
    return None


class BigQueryConnection:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging

from google.cloud.bigquery import CopyJob, ExtractJob, LoadJob, QueryJob

from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector
from airflow.providers.common.sql.hooks.lineage import send_sql_hook_lineage

log = logging.getLogger(__name__)


def _add_bq_table_to_lineage(collector, context, table_ref, *, is_input: bool):
method = collector.add_input_asset if is_input else collector.add_output_asset
method(
context=context,
scheme="bigquery",
asset_kwargs={
"project_id": table_ref.project,
"dataset_id": table_ref.dataset_id,
"table_id": table_ref.table_id,
},
)


def _add_gcs_uris_to_lineage(collector, context, uris, *, is_input: bool):
method = collector.add_input_asset if is_input else collector.add_output_asset
for uri in uris or []:
method(context=context, uri=uri)


def send_hook_lineage_for_bq_job(context, job):
    """
    Send hook-level lineage for a BigQuery job to the lineage collector.

    Handles all four BigQuery job types:
    - QUERY: delegates to send_sql_hook_lineage for SQL parsing
    - LOAD: source URIs (GCS) as inputs, destination table as output
    - COPY: source tables as inputs, destination table as output
    - EXTRACT: source table as input, destination URIs (GCS) as outputs

    Lineage emission is best-effort: any failure is logged and swallowed so
    that it can never fail the job that produced it. (Previously only the
    non-query branches were protected; a failure while emitting query-job
    lineage would propagate to the caller.)

    :param context: The hook instance used as lineage context.
    :param job: A BigQuery job object (QueryJob, LoadJob, CopyJob, or ExtractJob).
    """
    collector = get_hook_lineage_collector()

    try:
        if isinstance(job, QueryJob):
            log.debug("Sending Hook Level Lineage for Query job.")
            # BigQuery maps db -> project and schema -> dataset for SQL parsing.
            default_dataset = job.default_dataset
            send_sql_hook_lineage(
                context=context,
                sql=job.query,
                job_id=job.job_id,
                default_db=default_dataset.project if default_dataset else None,
                default_schema=default_dataset.dataset_id if default_dataset else None,
            )
        elif isinstance(job, LoadJob):
            log.debug("Sending Hook Level Lineage for Load job.")
            _add_gcs_uris_to_lineage(collector, context, job.source_uris, is_input=True)
            if job.destination:
                _add_bq_table_to_lineage(collector, context, job.destination, is_input=False)
        elif isinstance(job, CopyJob):
            log.debug("Sending Hook Level Lineage for Copy job.")
            for source_table in job.sources or []:
                _add_bq_table_to_lineage(collector, context, source_table, is_input=True)
            if job.destination:
                _add_bq_table_to_lineage(collector, context, job.destination, is_input=False)
        elif isinstance(job, ExtractJob):
            log.debug("Sending Hook Level Lineage for Extract job.")
            if job.source:
                _add_bq_table_to_lineage(collector, context, job.source, is_input=True)
            _add_gcs_uris_to_lineage(collector, context, job.destination_uris, is_input=False)
    except Exception as e:
        log.warning("Sending BQ job hook level lineage failed: %s", f"{e.__class__.__name__}: {str(e)}")
        log.debug("Exception details:", exc_info=True)
22 changes: 4 additions & 18 deletions providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2068,23 +2068,13 @@ def test_get_df_by_chunks_hook_lineage(self, mock_get_pandas_df_by_chunks, mock_
assert call_kw["sql"] == sql
assert call_kw["sql_parameters"] == parameters

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.send_sql_hook_lineage")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.send_hook_lineage_for_bq_job")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.QueryJob")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
def test_insert_job_hook_lineage(self, mock_client, mock_query_job, mock_send_lineage):
query_job_type = "query"
job_conf = {
query_job_type: {
query_job_type: "SELECT * FROM test",
"useLegacySql": "False",
}
}
mock_query_job._JOB_TYPE = query_job_type
mock_query_job.job_type = query_job_type
job_conf = {"query": {"query": "SELECT * FROM test", "useLegacySql": "False"}}
mock_query_job._JOB_TYPE = "query"
mock_job_instance = mock.MagicMock()
mock_job_instance.job_id = JOB_ID
mock_job_instance.query = "SELECT * FROM test"
mock_job_instance.job_type = query_job_type
mock_query_job.from_api_repr.return_value = mock_job_instance

self.hook.insert_job(
Expand All @@ -2095,8 +2085,4 @@ def test_insert_job_hook_lineage(self, mock_client, mock_query_job, mock_send_li
nowait=True,
)

mock_send_lineage.assert_called_once()
call_kw = mock_send_lineage.call_args.kwargs
assert call_kw["context"] is self.hook
assert call_kw["sql"] == "SELECT * FROM test"
assert call_kw["job_id"] == JOB_ID
mock_send_lineage.assert_called_once_with(context=self.hook, job=mock_job_instance)
Loading