Skip to content

Commit

Permalink
Fix behavior for reattach_state parameter in BigQueryInsertJobOperator (
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova authored Jul 10, 2024
1 parent 165b910 commit c29eeb1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 8 deletions.
19 changes: 13 additions & 6 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2955,6 +2955,7 @@ def execute(self, context: Any):

try:
self.log.info("Executing: %s'", self.configuration)
# Create a job
job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id)
except Conflict:
# If the job already exists retrieve it
Expand All @@ -2963,18 +2964,24 @@ def execute(self, context: Any):
location=self.location,
job_id=self.job_id,
)
if job.state in self.reattach_states:
# We are reattaching to a job
job._begin()
self._handle_job_error(job)
else:
# Same job configuration so we need force_rerun

if job.state not in self.reattach_states:
# Same job configuration, so we need force_rerun
raise AirflowException(
f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
f"want to force rerun it consider setting `force_rerun=True`."
f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
)

else:
# Job already reached state DONE
if job.state == "DONE":
raise AirflowException("Job is already in state DONE. Can not reattach to this job.")

# We are reattaching to a job
self.log.info("Reattaching to existing Job in state %s", job.state)
self._handle_job_error(job)

job_types = {
LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
Expand Down
39 changes: 37 additions & 2 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ def test_execute_reattach(self, mock_hook):
job = MagicMock(
job_id=real_job_id,
error_result=False,
state="PENDING",
state="RUNNING",
done=lambda: False,
)
mock_hook.return_value.get_job.return_value = job
Expand All @@ -1407,7 +1407,7 @@ def test_execute_reattach(self, mock_hook):
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
reattach_states={"PENDING"},
reattach_states={"PENDING", "RUNNING"},
)
result = op.execute(context=MagicMock())

Expand All @@ -1424,6 +1424,41 @@ def test_execute_reattach(self, mock_hook):

assert result == real_job_id

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_reattach_to_done_state(self, mock_hook):
job_id = "123456"
hash_ = "hash"
real_job_id = f"{job_id}_{hash_}"

configuration = {
"query": {
"query": "SELECT * FROM any",
"useLegacySql": False,
}
}

mock_hook.return_value.insert_job.side_effect = Conflict("any")
job = MagicMock(
job_id=real_job_id,
error_result=False,
state="DONE",
done=lambda: False,
)
mock_hook.return_value.get_job.return_value = job
mock_hook.return_value.generate_job_id.return_value = real_job_id

op = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
reattach_states={"PENDING"},
)
with pytest.raises(AirflowException):
# Not possible to reattach to any state if job is already DONE
op.execute(context=MagicMock())

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_force_rerun(self, mock_hook):
job_id = "123456"
Expand Down

0 comments on commit c29eeb1

Please sign in to comment.