Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from __future__ import annotations

import asyncio
import json
import logging
import re
Expand Down Expand Up @@ -1975,46 +1974,11 @@ async def get_job_instance(
async def _get_job(
self, job_id: str | None, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None
) -> BigQueryJob | UnknownJob:
"""
Get BigQuery job by its ID, project ID and location.

WARNING.
This is a temporary workaround for issues below, and it's not intended to be used elsewhere!
https://github.com/apache/airflow/issues/35833
https://github.com/talkiq/gcloud-aio/issues/584

This method was developed, because neither the `google-cloud-bigquery` nor the `gcloud-aio-bigquery`
provides asynchronous access to a BigQuery jobs with location parameter. That's why this method wraps
synchronous client call with the event loop's run_in_executor() method.

This workaround must be deleted along with the method _get_job_sync() and replaced by more robust and
cleaner solution in one of two cases:
1. The `google-cloud-bigquery` library provides async client with get_job method, that supports
optional parameter `location`
2. The `gcloud-aio-bigquery` library supports the `location` parameter in get_job() method.
"""
loop = asyncio.get_event_loop()
job = await loop.run_in_executor(None, self._get_job_sync, job_id, project_id, location)
"""Get BigQuery job by its ID, project ID and location."""
sync_hook = await self.get_sync_hook()
job = sync_hook.get_job(job_id=job_id, project_id=project_id, location=location)
return job

def _get_job_sync(self, job_id, project_id, location):
"""
Get BigQuery job by its ID, project ID and location synchronously.

WARNING
This is a temporary workaround for issues below, and it's not intended to be used elsewhere!
https://github.com/apache/airflow/issues/35833
https://github.com/talkiq/gcloud-aio/issues/584

This workaround must be deleted along with the method _get_job() and replaced by more robust and
cleaner solution in one of two cases:
1. The `google-cloud-bigquery` library provides async client with get_job method, that supports
optional parameter `location`
2. The `gcloud-aio-bigquery` library supports the `location` parameter in get_job() method.
"""
hook = BigQueryHook(**self._hook_kwargs)
return hook.get_job(job_id=job_id, project_id=project_id, location=location)

async def get_job_status(
self, job_id: str | None, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None
) -> dict[str, str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,10 @@ def execute(self, context: Context):
self.service_name,
self.region,
)
return hook.get_service(
service = hook.get_service(
service_name=self.service_name, region=self.region, project_id=self.project_id
)
return Service.to_dict(service)
except google.cloud.exceptions.GoogleCloudError as e:
self.log.error("An error occurred. Exiting.")
raise e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
job_id=self.job_id, project_id=self.project_id, location=self.location
)
if job_status["status"] == "success":
self.log.info("BigQuery Job succeeded")
yield TriggerEvent(
{
"job_id": self.job_id,
Expand All @@ -176,7 +177,13 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)
return
elif job_status["status"] == "error":
yield TriggerEvent(job_status)
self.log.info("BigQuery Job failed: %s", job_status)
yield TriggerEvent(
{
"status": job_status["status"],
"message": job_status["message"],
}
)
return
else:
self.log.info(
Expand Down Expand Up @@ -334,7 +341,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)
return
elif job_status["status"] == "error":
yield TriggerEvent(job_status)
yield TriggerEvent(
{
"status": job_status["status"],
"message": job_status["message"],
}
)
return
else:
self.log.info(
Expand Down Expand Up @@ -773,7 +785,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
return
job_id = None
elif job_status["status"] == "error":
yield TriggerEvent(job_status)
yield TriggerEvent({"status": job_status["status"]})
return
self.log.info("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:

yield TriggerEvent(
{
"status": RunJobStatus.TIMEOUT,
"status": RunJobStatus.TIMEOUT.value,
"job_name": self.job_name,
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
PROJECT_ID = "testproject"
REGION = "us-central1"
JOB_NAME = "jobname"
SERVICE = Service()
SERVICE_NAME = "servicename"
OVERRIDES = {
"container_overrides": [{"args": ["python", "main.py"]}],
Expand Down Expand Up @@ -448,7 +447,7 @@ def test_execute_already_exists(self, hook_mock):
service_name=SERVICE_NAME,
)

result = operator.execute(context=mock.MagicMock())
operator.execute(context=mock.MagicMock())

hook_mock.return_value.create_service.assert_called_once_with(
service=SERVICE,
Expand All @@ -462,8 +461,6 @@ def test_execute_already_exists(self, hook_mock):
project_id=PROJECT_ID,
)

assert result == SERVICE

@mock.patch(CLOUD_RUN_SERVICE_HOOK_PATH)
def test_execute_when_other_error(self, hook_mock):
error_message = "An error occurred. Exiting."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async def _mock_operation(name):
assert (
TriggerEvent(
{
"status": RunJobStatus.TIMEOUT,
"status": RunJobStatus.TIMEOUT.value,
"job_name": JOB_NAME,
}
)
Expand Down
Loading