diff --git a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py index 757de75352876..dd494e855aef2 100644 --- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py +++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py @@ -133,7 +133,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None: def on_kill(self): """Cancel the job if task is cancelled.""" - hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_type=self.api_type) + hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) if self.job_id: self.log.info("on_kill: cancel the airbyte Job %s", self.job_id) hook.cancel_job(self.job_id) diff --git a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py index ffbddcd7e5bf8..f13cf2fa30c68 100644 --- a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py +++ b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py @@ -69,3 +69,20 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection): mock_wait_for_job.assert_called_once_with( job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout ) + + @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job") + def test_on_kill(self, mock_cancel_job): + conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com") + db.merge_conn(conn) + + op = AirbyteTriggerSyncOperator( + task_id="test_Airbyte_op", + airbyte_conn_id=self.airbyte_conn_id, + connection_id=self.connection_id, + wait_seconds=self.wait_seconds, + timeout=self.timeout, + ) + op.job_id = self.job_id + op.on_kill() + + mock_cancel_job.assert_called_once_with(self.job_id)