Skip to content

Commit

Permalink
GCSToBigQueryOperator Resolve max_id_key job retrieval and xcom ret…
Browse files Browse the repository at this point in the history
…urn (#26285)

* Resolve `max_id_key` job retrieval and xcom return

* Fixing Flake8 Issue

* Fixing Unit Test failure

* Code fixes requested during review
  • Loading branch information
patricker authored Sep 18, 2022
1 parent b4f8a06 commit 07fe356
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,10 @@ def execute(self, context: Context):
warnings.simplefilter("ignore", DeprecationWarning)
job_id = bq_hook.run_query(
sql=select_command,
location=self.location,
use_legacy_sql=False,
)
row = list(bq_hook.get_job(job_id).result())
row = list(bq_hook.get_job(job_id=job_id, location=self.location).result())
if row:
max_id = row[0] if row[0] else 0
self.log.info(
Expand All @@ -327,5 +328,6 @@ def execute(self, context: Context):
self.max_id_key,
max_id,
)
return max_id
else:
raise RuntimeError(f"The {select_command} returned no rows!")
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ def test_execute_explicit_project(self, bq_hook):

bq_hook.return_value.get_job.return_value.result.return_value = ('1',)

operator.execute(None)
result = operator.execute(None)

assert result == '1'

bq_hook.return_value.run_query.assert_called_once_with(
sql="SELECT MAX(id) FROM `test-project.dataset.table`",
location=None,
use_legacy_sql=False,
)

Expand Down

0 comments on commit 07fe356

Please sign in to comment.