diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 6a4345bfe2ed8..a0083be4305ce 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -20,7 +20,6 @@
 
 from __future__ import annotations
 
-import asyncio
 import json
 import logging
 import re
@@ -1975,46 +1974,11 @@ async def get_job_instance(
     async def _get_job(
         self, job_id: str | None, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None
     ) -> BigQueryJob | UnknownJob:
-        """
-        Get BigQuery job by its ID, project ID and location.
-
-        WARNING.
-        This is a temporary workaround for issues below, and it's not intended to be used elsewhere!
-        https://github.com/apache/airflow/issues/35833
-        https://github.com/talkiq/gcloud-aio/issues/584
-
-        This method was developed, because neither the `google-cloud-bigquery` nor the `gcloud-aio-bigquery`
-        provides asynchronous access to a BigQuery jobs with location parameter. That's why this method wraps
-        synchronous client call with the event loop's run_in_executor() method.
-
-        This workaround must be deleted along with the method _get_job_sync() and replaced by more robust and
-        cleaner solution in one of two cases:
-        1. The `google-cloud-bigquery` library provides async client with get_job method, that supports
-        optional parameter `location`
-        2. The `gcloud-aio-bigquery` library supports the `location` parameter in get_job() method.
-        """
-        loop = asyncio.get_event_loop()
-        job = await loop.run_in_executor(None, self._get_job_sync, job_id, project_id, location)
+        """Get BigQuery job by its ID, project ID and location."""
+        sync_hook = await self.get_sync_hook()
+        job = sync_hook.get_job(job_id=job_id, project_id=project_id, location=location)
         return job
 
-    def _get_job_sync(self, job_id, project_id, location):
-        """
-        Get BigQuery job by its ID, project ID and location synchronously.
-
-        WARNING
-        This is a temporary workaround for issues below, and it's not intended to be used elsewhere!
-        https://github.com/apache/airflow/issues/35833
-        https://github.com/talkiq/gcloud-aio/issues/584
-
-        This workaround must be deleted along with the method _get_job() and replaced by more robust and
-        cleaner solution in one of two cases:
-        1. The `google-cloud-bigquery` library provides async client with get_job method, that supports
-        optional parameter `location`
-        2. The `gcloud-aio-bigquery` library supports the `location` parameter in get_job() method.
-        """
-        hook = BigQueryHook(**self._hook_kwargs)
-        return hook.get_job(job_id=job_id, project_id=project_id, location=location)
-
     async def get_job_status(
         self, job_id: str | None, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None
     ) -> dict[str, str]:
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
index b0da1debc3667..c60d638a2d78a 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
@@ -441,9 +441,10 @@ def execute(self, context: Context):
                 self.service_name,
                 self.region,
             )
-            return hook.get_service(
+            service = hook.get_service(
                 service_name=self.service_name, region=self.region, project_id=self.project_id
             )
+            return Service.to_dict(service)
         except google.cloud.exceptions.GoogleCloudError as e:
             self.log.error("An error occurred. Exiting.")
             raise e
diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
index 915f1124b3e36..a38e4268041dc 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
@@ -167,6 +167,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                     job_id=self.job_id, project_id=self.project_id, location=self.location
                 )
                 if job_status["status"] == "success":
+                    self.log.info("BigQuery Job succeeded")
                     yield TriggerEvent(
                         {
                             "job_id": self.job_id,
@@ -176,7 +177,13 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                     )
                     return
                 elif job_status["status"] == "error":
-                    yield TriggerEvent(job_status)
+                    self.log.info("BigQuery Job failed: %s", job_status)
+                    yield TriggerEvent(
+                        {
+                            "status": job_status["status"],
+                            "message": job_status["message"],
+                        }
+                    )
                     return
                 else:
                     self.log.info(
@@ -334,7 +341,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                     )
                     return
                 elif job_status["status"] == "error":
-                    yield TriggerEvent(job_status)
+                    yield TriggerEvent(
+                        {
+                            "status": job_status["status"],
+                            "message": job_status["message"],
+                        }
+                    )
                     return
                 else:
                     self.log.info(
@@ -773,7 +785,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                        return
                    job_id = None
                elif job_status["status"] == "error":
-                   yield TriggerEvent(job_status)
+                   yield TriggerEvent({"status": job_status["status"]})
                    return
                self.log.info("Sleeping for %s seconds.", self.poll_interval)
                await asyncio.sleep(self.poll_interval)
diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
index d9adb0bb1ee94..cafa403d2903c 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
@@ -134,7 +134,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
             yield TriggerEvent(
                 {
-                    "status": RunJobStatus.TIMEOUT,
+                    "status": RunJobStatus.TIMEOUT.value,
                     "job_name": self.job_name,
                 }
             )
diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
index 25496119f4434..8ca00c48091ce 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
@@ -46,7 +46,6 @@
 PROJECT_ID = "testproject"
 REGION = "us-central1"
 JOB_NAME = "jobname"
-SERVICE = Service()
 SERVICE_NAME = "servicename"
 OVERRIDES = {
     "container_overrides": [{"args": ["python", "main.py"]}],
@@ -448,7 +447,7 @@ def test_execute_already_exists(self, hook_mock):
            service_name=SERVICE_NAME,
        )

-        result = operator.execute(context=mock.MagicMock())
+        operator.execute(context=mock.MagicMock())

        hook_mock.return_value.create_service.assert_called_once_with(
            service=SERVICE,
@@ -462,8 +461,6 @@ def test_execute_already_exists(self, hook_mock):
            project_id=PROJECT_ID,
        )

-        assert result == SERVICE
-
    @mock.patch(CLOUD_RUN_SERVICE_HOOK_PATH)
    def test_execute_when_other_error(self, hook_mock):
        error_message = "An error occurred. Exiting."
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
index d64c4cee106bf..7a526d590c265 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
@@ -152,7 +152,7 @@ async def _mock_operation(name):
        assert (
            TriggerEvent(
                {
-                    "status": RunJobStatus.TIMEOUT,
+                    "status": RunJobStatus.TIMEOUT.value,
                    "job_name": JOB_NAME,
                }
            )