diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index 0102f93ccccf..6f4ed8342e07 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -20,16 +20,13 @@
 import time
 
 import boto3
+from botocore.exceptions import ClientError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
 DEFAULT_LOG_SUFFIX = "output"
-FAILURE_LOG_SUFFIX = "error"
-# A filter value of ' ' translates to "match all".
-# see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
-DEFAULT_LOG_FILTER = " "
-FAILURE_LOG_FILTER = "?ERROR ?Exception"
+ERROR_LOG_SUFFIX = "error"
 
 
 class GlueJobHook(AwsBaseHook):
@@ -58,6 +55,13 @@ class GlueJobHook(AwsBaseHook):
 
     JOB_POLL_INTERVAL = 6  # polls job status after every JOB_POLL_INTERVAL seconds
 
+    class LogContinuationTokens:
+        """Used to hold the continuation tokens when reading logs from both streams Glue jobs write to."""
+
+        def __init__(self):
+            self.output_stream_continuation: str | None = None
+            self.error_stream_continuation: str | None = None
+
     def __init__(
         self,
         s3_bucket: str | None = None,
@@ -194,46 +198,61 @@ def print_job_logs(
         self,
         job_name: str,
         run_id: str,
-        job_failed: bool = False,
-        next_token: str | None = None,
-    ) -> str | None:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client("logs")
-        response = {}
+        continuation_tokens: LogContinuationTokens,
+    ):
+        """
+        Prints the latest job logs to the Airflow task log and updates the continuation tokens.
 
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
-        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
-        log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
-        log_group_name = f"{log_group_prefix}/{log_group_suffix}"
+        :param continuation_tokens: the tokens from which to resume reading logs.
+            The object is updated in place with the new tokens by this method.
+        """
+        log_client = boto3.client("logs")
+        paginator = log_client.get_paginator("filter_log_events")
 
-        try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
+        def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
+            """Internal helper to share the iteration logic over the two log streams Glue jobs write to."""
+            fetched_logs = []
+            next_token = continuation_token
+            try:
+                for response in paginator.paginate(
+                    logGroupName=log_group,
                     logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
+                    PaginationConfig={"StartingToken": continuation_token},
+                ):
+                    fetched_logs.extend([event["message"] for event in response["events"]])
+                    # if the response is empty there is no nextToken in it
+                    next_token = response.get("nextToken") or next_token
+            except ClientError as e:
+                if e.response["Error"]["Code"] == "ResourceNotFoundException":
+                    # we land here when the log groups/streams don't exist yet
+                    self.log.warning(
+                        "No new Glue driver logs so far.\nIf this persists, check the CloudWatch dashboard "
+                        f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
+                    )
+                else:
+                    raise
+
+            if len(fetched_logs):
+                # Add a tab to indent those logs and distinguish them from airflow logs.
+                # Log lines returned already contain a newline character at the end.
+                messages = "\t".join(fetched_logs)
+                self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
             else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response["events"]):
-                messages = "\t".join([event["message"] for event in response["events"]])
-                self.log.info("Glue Job Run Logs:\n\t%s", messages)
-
-        except log_client.exceptions.ResourceNotFoundException:
-            self.log.warning(
-                "No new Glue driver logs found. This might be because there are no new logs, "
-                "or might be an error.\nIf the error persists, check the CloudWatch dashboard "
-                f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
-            )
+                self.log.info("No new log from the Glue Job in %s", log_group)
+            return next_token
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get("nextToken") or next_token
+        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
+        log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+        log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+        # one would think that the error log group would contain only errors, but it actually contains
+        # a lot of interesting logs too, so it's valuable to have both
+        continuation_tokens.output_stream_continuation = display_logs_from(
+            log_group_default, continuation_tokens.output_stream_continuation
+        )
+        continuation_tokens.error_stream_continuation = display_logs_from(
+            log_group_error, continuation_tokens.error_stream_continuation
+        )
 
     def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
         """
@@ -247,35 +266,30 @@ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> d
         """
         failed_states = ["FAILED", "TIMEOUT"]
         finished_states = ["SUCCEEDED", "STOPPED"]
-        next_log_token = None
-        job_failed = False
-
+        next_log_tokens = self.LogContinuationTokens()
         while True:
-            try:
-                job_run_state = self.get_job_state(job_name, run_id)
-                if job_run_state in finished_states:
-                    self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
-                    return {"JobRunState": job_run_state, "JobRunId": run_id}
-                if job_run_state in failed_states:
-                    job_failed = True
-                    job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
-                    self.log.info(job_error_message)
-                    raise AirflowException(job_error_message)
-                else:
-                    self.log.info(
-                        "Polling for AWS Glue Job %s current run state with status %s",
-                        job_name,
-                        job_run_state,
-                    )
-                    time.sleep(self.JOB_POLL_INTERVAL)
-            finally:
-                if verbose:
-                    next_log_token = self.print_job_logs(
-                        job_name=job_name,
-                        run_id=run_id,
-                        job_failed=job_failed,
-                        next_token=next_log_token,
-                    )
+            if verbose:
+                self.print_job_logs(
+                    job_name=job_name,
+                    run_id=run_id,
+                    continuation_tokens=next_log_tokens,
+                )
+
+            job_run_state = self.get_job_state(job_name, run_id)
+            if job_run_state in finished_states:
+                self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
+                return {"JobRunState": job_run_state, "JobRunId": run_id}
+            if job_run_state in failed_states:
+                job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
+                self.log.info(job_error_message)
+                raise AirflowException(job_error_message)
+            else:
+                self.log.info(
+                    "Polling for AWS Glue Job %s current run state with status %s",
+                    job_name,
+                    job_run_state,
+                )
+                time.sleep(self.JOB_POLL_INTERVAL)
 
     def has_job(self, job_name) -> bool:
         """
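Note on the continuation-token mechanics above: a filter_log_events paginator can be resumed across separate calls by feeding the last nextToken back in via PaginationConfig["StartingToken"], which is exactly what the per-stream tokens enable. A minimal standalone sketch of that pattern, assuming the log group and stream already exist; the names are illustrative placeholders, not values from this change:

    import boto3

    client = boto3.client("logs")
    paginator = client.get_paginator("filter_log_events")

    token = None  # plays the role of one LogContinuationTokens attribute
    for _ in range(2):  # two polling rounds, as job_completion would perform
        for page in paginator.paginate(
            logGroupName="my_log_group",  # illustrative placeholder
            logStreamNames=["my_stream"],  # illustrative placeholder
            PaginationConfig={"StartingToken": token},
        ):
            for event in page["events"]:
                print(event["message"], end="")
            # empty pages carry no nextToken, so keep the previous token
            token = page.get("nextToken") or token
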
diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py
index 85db6944d0a6..761a51609bed 100644
--- a/airflow/providers/amazon/aws/sensors/glue.py
+++ b/airflow/providers/amazon/aws/sensors/glue.py
@@ -60,7 +60,7 @@ def __init__(
         self.aws_conn_id = aws_conn_id
         self.success_states: list[str] = ["SUCCEEDED"]
         self.errored_states: list[str] = ["FAILED", "STOPPED", "TIMEOUT"]
-        self.next_log_token: str | None = None
+        self.next_log_tokens = GlueJobHook.LogContinuationTokens()
 
     @cached_property
     def hook(self):
@@ -69,14 +69,12 @@ def hook(self):
     def poke(self, context: Context):
         self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id)
         job_state = self.hook.get_job_state(job_name=self.job_name, run_id=self.run_id)
-        job_failed = False
 
         try:
             if job_state in self.success_states:
                 self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state)
                 return True
             elif job_state in self.errored_states:
-                job_failed = True
                 job_error_message = "Exiting Job %s Run State: %s", self.run_id, job_state
                 self.log.info(job_error_message)
                 raise AirflowException(job_error_message)
@@ -84,9 +82,8 @@ def poke(self, context: Context):
             return False
         finally:
             if self.verbose:
-                self.next_log_token = self.hook.print_job_logs(
+                self.hook.print_job_logs(
                     job_name=self.job_name,
                     run_id=self.run_id,
-                    job_failed=job_failed,
-                    next_token=self.next_log_token,
+                    continuation_tokens=self.next_log_tokens,
                 )
diff --git a/tests/providers/amazon/aws/hooks/test_glue.py b/tests/providers/amazon/aws/hooks/test_glue.py
index c63b37916182..9a46abb34fdb 100644
--- a/tests/providers/amazon/aws/hooks/test_glue.py
+++ b/tests/providers/amazon/aws/hooks/test_glue.py
@@ -19,9 +19,11 @@
 import json
 from unittest import mock
+from unittest.mock import MagicMock
 
 import boto3
 import pytest
+from botocore.exceptions import ClientError
 from moto import mock_glue, mock_iam
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -303,3 +305,47 @@ def test_initialize_job(self, mock_conn, mock_get_job_state):
         glue_job_run = glue_job_hook.initialize_job(some_script_arguments, some_run_kwargs)
         glue_job_run_state = glue_job_hook.get_job_state(glue_job_run["JobName"], glue_job_run["JobRunId"])
         assert glue_job_run_state == mock_job_run_state, "Mocks but be equal"
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.glue.boto3.client")
+    @mock.patch.object(GlueJobHook, "conn")
+    def test_print_job_logs_updates_tokens(self, conn_mock: MagicMock, client_mock: MagicMock, caplog):
+        hook = GlueJobHook()
+        conn_mock().get_job_run.return_value = {"JobRun": {"LogGroupName": "my_log_group"}}
+        client_mock().get_paginator().paginate.return_value = [
+            # first response: 2 log lines
+            {
+                "events": [
+                    {"logStreamName": "stream", "timestamp": 123, "message": "hello\n"},
+                    {"logStreamName": "stream", "timestamp": 123, "message": "world\n"},
+                ],
+                "searchedLogStreams": [],
+                "nextToken": "my_continuation_token",
+                "ResponseMetadata": {"HTTPStatusCode": 200},
+            },
+            # second response: reached end of stream
+            {"events": [], "searchedLogStreams": [], "ResponseMetadata": {"HTTPStatusCode": 200}},
+        ]
+
+        tokens = GlueJobHook.LogContinuationTokens()
+        with caplog.at_level("INFO"):
+            hook.print_job_logs("name", "run", tokens)
+
+        assert "\thello\n\tworld\n" in caplog.text
+        assert tokens.output_stream_continuation == "my_continuation_token"
+        assert tokens.error_stream_continuation == "my_continuation_token"
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.glue.boto3.client")
+    @mock.patch.object(GlueJobHook, "conn")
+    def test_print_job_logs_no_stream_yet(self, conn_mock: MagicMock, client_mock: MagicMock):
+        hook = GlueJobHook()
+        conn_mock().get_job_run.return_value = {"JobRun": {"LogGroupName": "my_log_group"}}
+        client_mock().get_paginator().paginate.side_effect = ClientError(
+            {"Error": {"Code": "ResourceNotFoundException"}}, "op"
+        )
+
+        tokens = GlueJobHook.LogContinuationTokens()
+        hook.print_job_logs("name", "run", tokens)  # should not error
+
+        assert tokens.output_stream_continuation is None
+        assert tokens.error_stream_continuation is None
+        assert client_mock().get_paginator().paginate.call_count == 2
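The tests above exercise the stateful token object: a second call to print_job_logs with the same LogContinuationTokens instance resumes from where the first call stopped instead of re-printing everything. A hedged usage sketch outside of tests; the job and run identifiers are hypothetical:

    from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

    hook = GlueJobHook()  # default AWS connection
    tokens = GlueJobHook.LogContinuationTokens()

    # hypothetical identifiers, for illustration only
    hook.print_job_logs(job_name="my_glue_job", run_id="jr_0123", continuation_tokens=tokens)
    # a later call picks up from the stored tokens
    hook.print_job_logs(job_name="my_glue_job", run_id="jr_0123", continuation_tokens=tokens)
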
@mock.patch("airflow.providers.amazon.aws.hooks.glue.boto3.client") + @mock.patch.object(GlueJobHook, "conn") + def test_print_job_logs_no_stream_yet(self, conn_mock: MagicMock, client_mock: MagicMock): + hook = GlueJobHook() + conn_mock().get_job_run.return_value = {"JobRun": {"LogGroupName": "my_log_group"}} + client_mock().get_paginator().paginate.side_effect = ClientError( + {"Error": {"Code": "ResourceNotFoundException"}}, "op" + ) + + tokens = GlueJobHook.LogContinuationTokens() + hook.print_job_logs("name", "run", tokens) # should not error + + assert tokens.output_stream_continuation is None + assert tokens.error_stream_continuation is None + assert client_mock().get_paginator().paginate.call_count == 2 diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py index d7712da4d7cb..db5ff1e6c232 100644 --- a/tests/providers/amazon/aws/operators/test_glue.py +++ b/tests/providers/amazon/aws/operators/test_glue.py @@ -121,10 +121,7 @@ def test_execute_with_verbose_logging( mock_initialize_job.assert_called_once_with({}, {}) mock_print_job_logs.assert_called_once_with( - job_name=JOB_NAME, - run_id=JOB_RUN_ID, - job_failed=False, - next_token=None, + job_name=JOB_NAME, run_id=JOB_RUN_ID, continuation_tokens=mock.ANY ) assert glue.job_name == JOB_NAME diff --git a/tests/providers/amazon/aws/sensors/test_glue.py b/tests/providers/amazon/aws/sensors/test_glue.py index c8d593eed415..bbb77e68263a 100644 --- a/tests/providers/amazon/aws/sensors/test_glue.py +++ b/tests/providers/amazon/aws/sensors/test_glue.py @@ -69,8 +69,7 @@ def test_poke_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_pri mock_print_job_logs.assert_called_once_with( job_name=job_name, run_id=job_run_id, - job_failed=False, - next_token=ANY, + continuation_tokens=ANY, ) @mock.patch.object(GlueJobHook, "print_job_logs") @@ -111,8 +110,7 @@ def test_poke_false_with_verbose_logging(self, mock_get_job_state, mock_conn, mo mock_print_job_logs.assert_called_once_with( job_name=job_name, run_id=job_run_id, - job_failed=False, - next_token=ANY, + continuation_tokens=ANY, ) @mock.patch.object(GlueJobHook, "print_job_logs") diff --git a/tests/system/providers/amazon/aws/example_glue.py b/tests/system/providers/amazon/aws/example_glue.py index f010b2dfd9fe..e0ce565cdf02 100644 --- a/tests/system/providers/amazon/aws/example_glue.py +++ b/tests/system/providers/amazon/aws/example_glue.py @@ -162,9 +162,10 @@ def glue_cleanup(crawler_name: str, job_name: str, db_name: str) -> None: job_name=glue_job_name, # Job ID extracted from previous Glue Job Operator task run_id=submit_glue_job.output, + verbose=True, # prints glue job logs in airflow logs ) # [END howto_sensor_glue] - wait_for_job.poke_interval = 10 + wait_for_job.poke_interval = 5 delete_bucket = S3DeleteBucketOperator( task_id="delete_bucket",