Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context

BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={project_id}:{location}:{job_id}"


class BigQueryUIColors(enum.Enum):
Expand Down Expand Up @@ -90,8 +90,17 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_id = XCom.get_value(key="job_id", ti_key=ti_key)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
job_id_params = XCom.get_value(key="job_id_params", ti_key=ti_key)

return (
BIGQUERY_JOB_DETAILS_LINK_FMT.format(
job_id=job_id_params["job_id"],
project_id=job_id_params["project_id"],
location=job_id_params["location"],
)
if job_id_params
else ""
)


@attr.s(auto_attribs=True)
Expand All @@ -110,13 +119,16 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
job_ids_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
job_ids = job_ids_params["job_id"]
if not job_ids:
return None
if len(job_ids) < self.index:
return None
job_id = job_ids[self.index]
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(
job_id=job_id, project_id=job_ids_params["project_id"], location=job_ids_params["location"]
)


class _BigQueryDbHookMixin:
Expand Down Expand Up @@ -1184,7 +1196,13 @@ def execute(self, context: Context):
]
else:
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
context["task_instance"].xcom_push(key="job_id", value=job_id)
job_id_params = {
"job_id": job_id,
"project_id": self.hook.project_id,
"location": self.location if self.location else "US",
}
context["task_instance"].xcom_push(key="job_id_params", value=job_id_params)
return job_id

def on_kill(self) -> None:
super().on_kill()
Expand Down Expand Up @@ -2727,9 +2745,13 @@ def execute(self, context: Any):
persist_kwargs["dataset_id"] = table["datasetId"]
persist_kwargs["project_id"] = table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)

self.job_id = job.job_id
context["ti"].xcom_push(key="job_id", value=self.job_id)
job_id_params = {
"job_id": job_id,
"project_id": self.project_id or self.hook.project_id,
"location": self.location if self.location else "US",
}
context["ti"].xcom_push(key="job_id_params", value=job_id_params)
# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
Expand Down
69 changes: 47 additions & 22 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
}
TEST_TABLE = "test-table"
GCP_CONN_ID = "google_cloud_default"
TEST_JOB_ID_1 = "test-job-id"
TEST_JOB_ID_2 = "test-123"
TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"


class TestBigQueryCreateEmptyTableOperator:
Expand Down Expand Up @@ -672,11 +676,15 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(

# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)

ti.xcom_push("job_id", 12345)
test_job_id_params = {
"job_id": TEST_JOB_ID_1,
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push("job_id_params", test_job_id_params)

url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
assert url == "https://console.cloud.google.com/bigquery?j=12345"
assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"

@pytest.mark.need_serialized_dag
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
Expand Down Expand Up @@ -711,17 +719,23 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)

job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)
test_job_id_params = {
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)

assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()

assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
ti, "BigQuery Console #1"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== simple_task.get_extra_links(ti, "BigQuery Console #1")
)

assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
ti, "BigQuery Console #2"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== simple_task.get_extra_links(ti, "BigQuery Console #2")
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -740,7 +754,9 @@ def test_bigquery_operator_extra_link_when_missing_job_id(

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_extra_link_when_single_query(
self, mock_hook, create_task_instance_of_operator
self,
mock_hook,
create_task_instance_of_operator,
):
ti = create_task_instance_of_operator(
BigQueryExecuteQueryOperator,
Expand All @@ -751,11 +767,15 @@ def test_bigquery_operator_extra_link_when_single_query(
)
bigquery_task = ti.task

job_id = "12345"
ti.xcom_push(key="job_id", value=job_id)

assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
ti, BigQueryConsoleLink.name
test_job_id_params = {
"job_id": TEST_JOB_ID_1,
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -771,17 +791,22 @@ def test_bigquery_operator_extra_link_when_multiple_query(
)
bigquery_task = ti.task

job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)

test_job_id_params = {
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()

assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
ti, "BigQuery Console #1"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #1")
)

assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
ti, "BigQuery Console #2"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #2")
)


Expand Down