Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bigquery retry #3238

Merged
merged 20 commits into from
May 3, 2023
Merged
6 changes: 3 additions & 3 deletions luigi/contrib/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def is_error_5xx(err):
wait=wait_exponential(multiplier=1, min=1, max=10),
stop=stop_after_attempt(3),
reraise=True,
after=lambda x: x.args[0].__initialise_client()
after=lambda x: x.args[0].initialise_client()
)


Expand Down Expand Up @@ -152,9 +152,9 @@ def __init__(self, oauth_credentials=None, descriptor='', http_=None):
self.descriptor = descriptor
self.http_ = http_

self.__initialise_client()
self.initialise_client()

def __initialise_client(self):
def initialise_client(self):
sonjaer marked this conversation as resolved.
Show resolved Hide resolved
authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_)

if self.descriptor:
Expand Down
29 changes: 29 additions & 0 deletions test/contrib/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@

import mock
import pytest
from mock.mock import MagicMock

from luigi.contrib import bigquery
try:
from googleapiclient import errors
except ImportError:
raise unittest.SkipTest('Unable to load googleapiclient module')
sonjaer marked this conversation as resolved.
Show resolved Hide resolved
from luigi.contrib.bigquery import BigQueryLoadTask, BigQueryTarget, BQDataset, \
BigQueryRunQueryTask, BigQueryExtractTask
from luigi.contrib.gcs import GCSTarget
Expand Down Expand Up @@ -147,3 +153,26 @@ def output(self):
}
}
run_job.assert_called_with('proj', expected_body, dataset=BQDataset('proj', 'ds', None))


class BigQueryClientTest(unittest.TestCase):

def test_retry_succeeds_on_second_attempt(self):
client = MagicMock(spec=bigquery.BigQueryClient)
sonjaer marked this conversation as resolved.
Show resolved Hide resolved
sonjaer marked this conversation as resolved.
Show resolved Hide resolved
attempts = [0]
sonjaer marked this conversation as resolved.
Show resolved Hide resolved

@bigquery.bq_retry
def fail_once(bq_client):
attempts[0] += 1
if attempts[0] == 1:
raise errors.HttpError(
resp=MagicMock(status=500),
content=b'{"error": {"message": "stub"}',
)
else:
return MagicMock(status=200)

response = fail_once(client)

self.assertEqual(attempts[0], 2)
self.assertEqual(response.status, 200)