Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ def __init__(
def get_conn_params(self, conn_id: str) -> Any:
conn = self.get_connection(conn_id)

# Intentionally left the password out, you can modify the log to print it out if you are doing testing.
self.log.debug(
"Connection attributes are: host - %s, url - %s, description - %s",
conn.host,
conn.schema,
conn.description,
)
conn_params: dict = {}
conn_params["host"] = conn.host
conn_params["client_id"] = conn.login
Expand All @@ -78,6 +85,7 @@ def create_api_session(self) -> AirbyteAPI:

client = None
if self.conn["proxies"]:
self.log.debug("Creating client proxy...")
client = Session()
client.proxies = self.conn["proxies"]

Expand Down Expand Up @@ -111,6 +119,7 @@ def get_job_details(self, job_id: int) -> Any:
job_id=job_id,
)
)
self.log.debug("Job details are: %s", get_job_res.job_response)
return get_job_res.job_response
except Exception as e:
raise AirflowException(e)
Expand Down Expand Up @@ -138,12 +147,14 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa
start = time.monotonic()
while True:
if timeout and start + timeout < time.monotonic():
self.log.debug("Canceling job...")
self.cancel_job(job_id=(int(job_id)))
raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
time.sleep(wait_seconds)
try:
job = self.get_job_details(job_id=(int(job_id)))
state = job.status
self.log.debug("Job State: %s. Job Details: %s", state, job)

except AirflowException as err:
self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
Expand All @@ -161,12 +172,14 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa

def submit_sync_connection(self, connection_id: str) -> Any:
try:
self.log.debug("Creating job request..")
res = self.airbyte_api.jobs.create_job(
request=JobCreateRequest(
connection_id=connection_id,
job_type=JobTypeEnum.SYNC,
)
)
self.log.debug("Job request successful, response: %s", res.job_response)
return res.job_response
except Exception as e:
raise AirflowException(e)
Expand All @@ -191,6 +204,7 @@ def test_connection(self):
"""Tests the Airbyte connection by hitting the health API."""
try:
health_check = self.airbyte_api.health.get_health_check()
self.log.debug("Health check details: %s", health_check)
if health_check.status_code == 200:
return True, "Connection successfully tested"
return False, str(health_check.raw_response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ def execute(self, context: Context) -> None:
return self.job_id

if not self.deferrable:
self.log.debug("Running in non-deferrable mode...")
hook.wait_for_job(job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout)
else:
self.log.debug("Running in defferable mode in job state %s...", state)
if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
self.defer(
timeout=self.execution_timeout,
Expand Down Expand Up @@ -126,6 +128,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
successful.
"""
if event["status"] == "error":
self.log.debug("Error occurred with context: %s", context)
raise AirflowException(event["message"])

self.log.info("%s completed successfully.", self.task_id)
Expand All @@ -134,6 +137,11 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
def on_kill(self):
"""Cancel the job if task is cancelled."""
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
self.log.debug(
"Job status for job_id %s prior to canceling is: %s",
self.job_id,
hook.get_job_status(self.job_id),
)
if self.job_id:
self.log.info("on_kill: cancel the airbyte Job %s", self.job_id)
hook.cancel_job(self.job_id)
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def poke(self, context: Context) -> bool:

if status == JobStatusEnum.FAILED:
message = f"Job failed: \n{job}"
self.log.debug("Failed with context: %s", context)
raise AirflowException(message)
if status == JobStatusEnum.CANCELLED:
message = f"Job was cancelled: \n{job}"
Expand Down Expand Up @@ -117,6 +118,7 @@ def execute(self, context: Context) -> Any:
self.log.info("%s completed successfully.", self.task_id)
return
elif state == JobStatusEnum.FAILED:
self.log.debug("Failed with context: %s", context)
raise AirflowException(f"Job failed:\n{job}")
elif state == JobStatusEnum.CANCELLED:
raise AirflowException(f"Job was cancelled:\n{job}")
Expand All @@ -133,6 +135,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
successful.
"""
if event["status"] == "error":
self.log.debug("An error occurred with context: %s", context)
raise AirflowException(event["message"])

self.log.info("%s completed successfully.", self.task_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,8 @@ async def is_still_running(self, hook: AirbyteHook) -> bool:
"""
job_run_status = hook.get_job_status(self.job_id)
if job_run_status in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
self.log.debug(
"Job run status is: %s with context: %s", job_run_status, hook.get_job_details(self.job_id)
)
return True
return False
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection, create_co
job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
)

@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
def test_on_kill(self, mock_cancel_job, create_connection_without_db):
def test_on_kill(self, mock_cancel_job, mock_get_job_status, create_connection_without_db):
conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
create_connection_without_db(conn)

Expand All @@ -83,3 +84,4 @@ def test_on_kill(self, mock_cancel_job, create_connection_without_db):
op.on_kill()

mock_cancel_job.assert_called_once_with(self.job_id)
mock_get_job_status.assert_called_once_with(self.job_id)
Loading