Skip to content

Commit

Permalink
fix BigQueryInsertJobOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelauv committed Jun 3, 2022
1 parent e13b159 commit 881e2da
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
19 changes: 11 additions & 8 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2167,14 +2167,17 @@ def execute(self, context: Any):
f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
)

table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=table["datasetId"],
project_id=table["projectId"],
table_id=table["tableId"],
)
if "query" in job.to_api_repr()["configuration"]:
if "destinationTable" in job.to_api_repr()["configuration"]["query"]:
table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=table["datasetId"],
project_id=table["projectId"],
table_id=table["tableId"],
)

self.job_id = job.job_id
return job.job_id

Expand Down
44 changes: 43 additions & 1 deletion tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ def test_execute(self, mock_hook):
class TestBigQueryInsertJobOperator:
@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_execute_success(self, mock_hook, mock_md5):
def test_execute_query_success(self, mock_hook, mock_md5):
job_id = "123456"
hash_ = "hash"
real_job_id = f"{job_id}_{hash_}"
Expand Down Expand Up @@ -822,6 +822,48 @@ def test_execute_success(self, mock_hook, mock_md5):

assert result == real_job_id

@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_execute_copy_success(self, mock_hook, mock_md5):
job_id = "123456"
hash_ = "hash"
real_job_id = f"{job_id}_{hash_}"
mock_md5.return_value.hexdigest.return_value = hash_

configuration = {
"copy": {
"sourceTable": "aaa",
"destinationTable": "bbb",
}
}
mock_configuration = {
"configuration": configuration,
"jobReference": "a",
}
mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)

mock_hook.return_value.insert_job.return_value.to_api_repr.return_value = mock_configuration

op = BigQueryInsertJobOperator(
task_id="copy_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
)
result = op.execute(context=MagicMock())

mock_hook.return_value.insert_job.assert_called_once_with(
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=real_job_id,
project_id=TEST_GCP_PROJECT_ID,
retry=DEFAULT_RETRY,
timeout=None,
)

assert result == real_job_id

@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_on_kill(self, mock_hook, mock_md5):
Expand Down

0 comments on commit 881e2da

Please sign in to comment.