diff --git a/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
index 5c30b1ddef32f..7708a43f98bb5 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
@@ -59,6 +59,13 @@ def __init__(
 
     def get_conn_params(self, conn_id: str) -> Any:
         conn = self.get_connection(conn_id)
+        # The password is intentionally not logged; add it only temporarily for local testing.
+        self.log.debug(
+            "Connection attributes are: host - %s, url - %s, description - %s",
+            conn.host,
+            conn.schema,
+            conn.description,
+        )
         conn_params: dict = {}
         conn_params["host"] = conn.host
         conn_params["client_id"] = conn.login
@@ -78,6 +85,7 @@ def create_api_session(self) -> AirbyteAPI:
 
         client = None
         if self.conn["proxies"]:
+            self.log.debug("Creating client proxy...")
             client = Session()
             client.proxies = self.conn["proxies"]
 
@@ -111,6 +119,7 @@ def get_job_details(self, job_id: int) -> Any:
                     job_id=job_id,
                 )
             )
+            self.log.debug("Job details are: %s", get_job_res.job_response)
             return get_job_res.job_response
         except Exception as e:
             raise AirflowException(e)
@@ -138,12 +147,14 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa
         start = time.monotonic()
         while True:
             if timeout and start + timeout < time.monotonic():
+                self.log.debug("Canceling job...")
                 self.cancel_job(job_id=(int(job_id)))
                 raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
             time.sleep(wait_seconds)
             try:
                 job = self.get_job_details(job_id=(int(job_id)))
                 state = job.status
+                self.log.debug("Job State: %s. Job Details: %s", state, job)
 
             except AirflowException as err:
                 self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
@@ -161,12 +172,14 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa
 
     def submit_sync_connection(self, connection_id: str) -> Any:
         try:
+            self.log.debug("Creating job request..")
             res = self.airbyte_api.jobs.create_job(
                 request=JobCreateRequest(
                     connection_id=connection_id,
                     job_type=JobTypeEnum.SYNC,
                 )
             )
+            self.log.debug("Job request successful, response: %s", res.job_response)
             return res.job_response
         except Exception as e:
             raise AirflowException(e)
@@ -191,6 +204,7 @@ def test_connection(self):
         """Tests the Airbyte connection by hitting the health API."""
         try:
             health_check = self.airbyte_api.health.get_health_check()
+            self.log.debug("Health check details: %s", health_check)
             if health_check.status_code == 200:
                 return True, "Connection successfully tested"
             return False, str(health_check.raw_response)
diff --git a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
index c2091d61cb1a0..dc9dd346fdd9e 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
@@ -93,8 +93,10 @@ def execute(self, context: Context) -> None:
             return self.job_id
 
         if not self.deferrable:
+            self.log.debug("Running in non-deferrable mode...")
             hook.wait_for_job(job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout)
         else:
+            self.log.debug("Running in deferrable mode in job state %s...", state)
             if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
                 self.defer(
                     timeout=self.execution_timeout,
@@ -126,6 +128,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         successful.
         """
         if event["status"] == "error":
+            self.log.debug("Error occurred with context: %s", context)
             raise AirflowException(event["message"])
 
         self.log.info("%s completed successfully.", self.task_id)
@@ -134,6 +137,15 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
     def on_kill(self):
         """Cancel the job if task is cancelled."""
         hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
         if self.job_id:
+            try:
+                self.log.debug(
+                    "Job status for job_id %s prior to canceling is: %s",
+                    self.job_id,
+                    hook.get_job_status(self.job_id),
+                )
+            except AirflowException as err:
+                # The status lookup is diagnostic only; it must never block the cancel below.
+                self.log.debug("Could not fetch status of job_id %s before canceling: %s", self.job_id, err)
             self.log.info("on_kill: cancel the airbyte Job %s", self.job_id)
             hook.cancel_job(self.job_id)
diff --git a/providers/airbyte/src/airflow/providers/airbyte/sensors/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/sensors/airbyte.py
index e8a7d1e2e4bf8..8e802791271be 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/sensors/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/sensors/airbyte.py
@@ -79,6 +79,7 @@ def poke(self, context: Context) -> bool:
 
         if status == JobStatusEnum.FAILED:
             message = f"Job failed: \n{job}"
+            self.log.debug("Failed with context: %s", context)
             raise AirflowException(message)
         if status == JobStatusEnum.CANCELLED:
             message = f"Job was cancelled: \n{job}"
@@ -117,6 +118,7 @@ def execute(self, context: Context) -> Any:
                 self.log.info("%s completed successfully.", self.task_id)
                 return
             elif state == JobStatusEnum.FAILED:
+                self.log.debug("Failed with context: %s", context)
                 raise AirflowException(f"Job failed:\n{job}")
             elif state == JobStatusEnum.CANCELLED:
                 raise AirflowException(f"Job was cancelled:\n{job}")
@@ -133,6 +135,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         successful.
         """
         if event["status"] == "error":
+            self.log.debug("An error occurred with context: %s", context)
             raise AirflowException(event["message"])
 
         self.log.info("%s completed successfully.", self.task_id)
diff --git a/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
index 641b0062c7fc8..81e754a7f0370 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
@@ -117,5 +117,6 @@ async def is_still_running(self, hook: AirbyteHook) -> bool:
         """
         job_run_status = hook.get_job_status(self.job_id)
         if job_run_status in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
+            self.log.debug("Job run status is: %s", job_run_status)
             return True
         return False
diff --git a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
index 0e9fc54c6b883..951194a546a3a 100644
--- a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
@@ -67,8 +67,9 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection, create_co
             job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
         )
 
+    @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
     @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
-    def test_on_kill(self, mock_cancel_job, create_connection_without_db):
+    def test_on_kill(self, mock_cancel_job, mock_get_job_status, create_connection_without_db):
         conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
         create_connection_without_db(conn)
 
@@ -83,3 +84,4 @@ def test_on_kill(self, mock_cancel_job, create_connection_without_db):
         op.on_kill()
 
         mock_cancel_job.assert_called_once_with(self.job_id)
+        mock_get_job_status.assert_called_once_with(self.job_id)