diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 2fd6aacf5c618..6fb94f21d7496 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -58,7 +58,7 @@
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.utils.context import Context
 
-BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
+BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={project_id}:{location}:{job_id}"
 
 
 class BigQueryUIColors(enum.Enum):
@@ -90,8 +90,17 @@ def get_link(
         *,
         ti_key: TaskInstanceKey,
     ):
-        job_id = XCom.get_value(key="job_id", ti_key=ti_key)
-        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
+        job_id_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
+
+        return (
+            BIGQUERY_JOB_DETAILS_LINK_FMT.format(
+                job_id=job_id_params["job_id"],
+                project_id=job_id_params["project_id"],
+                location=job_id_params["location"],
+            )
+            if job_id_params
+            else ""
+        )
 
 
 @attr.s(auto_attribs=True)
@@ -110,13 +119,16 @@ def get_link(
         *,
         ti_key: TaskInstanceKey,
     ):
-        job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
+        job_ids_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
+        job_ids = job_ids_params["job_id"] if job_ids_params else None
         if not job_ids:
             return None
         if len(job_ids) < self.index:
             return None
         job_id = job_ids[self.index]
-        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
+        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(
+            job_id=job_id, project_id=job_ids_params["project_id"], location=job_ids_params["location"]
+        )
 
 
 class _BigQueryDbHookMixin:
@@ -1184,7 +1196,13 @@ def execute(self, context: Context):
             ]
         else:
             raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
-        context["task_instance"].xcom_push(key="job_id", value=job_id)
+        job_id_params = {
+            "job_id": job_id,
+            "project_id": self.hook.project_id,
+            "location": self.location if self.location else "US",
+        }
+        context["task_instance"].xcom_push(key="job_id_params", value=job_id_params)
+        return job_id
 
     def on_kill(self) -> None:
         super().on_kill()
@@ -2727,9 +2745,14 @@ def execute(self, context: Any):
             persist_kwargs["dataset_id"] = table["datasetId"]
             persist_kwargs["project_id"] = table["projectId"]
             BigQueryTableLink.persist(**persist_kwargs)
-        self.job_id = job.job_id
-        context["ti"].xcom_push(key="job_id", value=self.job_id)
+        self.job_id = job.job_id
+        job_id_params = {
+            "job_id": self.job_id,
+            "project_id": self.project_id or self.hook.project_id,
+            "location": self.location if self.location else "US",
+        }
+        context["ti"].xcom_push(key="job_id_params", value=job_id_params)
 
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 25b341a4c38a5..367ef99cbbe1b 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -83,6 +83,10 @@
 }
 TEST_TABLE = "test-table"
 GCP_CONN_ID = "google_cloud_default"
+TEST_JOB_ID_1 = "test-job-id"
+TEST_JOB_ID_2 = "test-123"
+TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
+TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
 
 
 class TestBigQueryCreateEmptyTableOperator:
@@ -672,11 +676,15 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(
 
         # Check DeSerialized version of operator link
         assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)
-
-        ti.xcom_push("job_id", 12345)
+        test_job_id_params = {
+            "job_id": TEST_JOB_ID_1,
+            "project_id": TEST_GCP_PROJECT_ID,
+            "location": TEST_DATASET_LOCATION,
+        }
+        ti.xcom_push("job_id_params", test_job_id_params)
 
         url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
-        assert url == "https://console.cloud.google.com/bigquery?j=12345"
+        assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
 
     @pytest.mark.need_serialized_dag
     def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
@@ -711,17 +719,23 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
 
         # Check DeSerialized version of operator link
         assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)
-        job_id = ["123", "45"]
-        ti.xcom_push(key="job_id", value=job_id)
+        test_job_id_params = {
+            "job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
+            "project_id": TEST_GCP_PROJECT_ID,
+            "location": TEST_DATASET_LOCATION,
+        }
+        ti.xcom_push(key="job_id_params", value=test_job_id_params)
 
         assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()
 
-        assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
-            ti, "BigQuery Console #1"
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+            == simple_task.get_extra_links(ti, "BigQuery Console #1")
         )
 
-        assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
-            ti, "BigQuery Console #2"
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
+            == simple_task.get_extra_links(ti, "BigQuery Console #2")
         )
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -740,7 +754,9 @@ def test_bigquery_operator_extra_link_when_missing_job_id(
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_operator_extra_link_when_single_query(
-        self, mock_hook, create_task_instance_of_operator
+        self,
+        mock_hook,
+        create_task_instance_of_operator,
     ):
         ti = create_task_instance_of_operator(
             BigQueryExecuteQueryOperator,
@@ -751,11 +767,15 @@ def test_bigquery_operator_extra_link_when_single_query(
         )
         bigquery_task = ti.task
 
-        job_id = "12345"
-        ti.xcom_push(key="job_id", value=job_id)
-
-        assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
-            ti, BigQueryConsoleLink.name
+        test_job_id_params = {
+            "job_id": TEST_JOB_ID_1,
+            "project_id": TEST_GCP_PROJECT_ID,
+            "location": TEST_DATASET_LOCATION,
+        }
+        ti.xcom_push(key="job_id_params", value=test_job_id_params)
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+            == bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
         )
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -771,17 +791,22 @@ def test_bigquery_operator_extra_link_when_multiple_query(
             dag_id="test_bigquery_operator_extra_link_when_multiple_query_dag",
             task_id=TASK_ID,
             sql=["SELECT * FROM test_table", "SELECT * FROM test_table"],
         )
         bigquery_task = ti.task
 
-        job_id = ["123", "45"]
-        ti.xcom_push(key="job_id", value=job_id)
-
+        test_job_id_params = {
+            "job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
+            "project_id": TEST_GCP_PROJECT_ID,
+            "location": TEST_DATASET_LOCATION,
+        }
+        ti.xcom_push(key="job_id_params", value=test_job_id_params)
         assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()
 
-        assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
-            ti, "BigQuery Console #1"
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+            == bigquery_task.get_extra_links(ti, "BigQuery Console #1")
         )
-        assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
-            ti, "BigQuery Console #2"
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
+            == bigquery_task.get_extra_links(ti, "BigQuery Console #2")
         )