improve/fix glue job logs printing (#30886)
vandonr-amz authored Apr 28, 2023
1 parent 093174d commit 1f01749
Showing 6 changed files with 136 additions and 83 deletions.
150 changes: 82 additions & 68 deletions airflow/providers/amazon/aws/hooks/glue.py
@@ -20,16 +20,13 @@
 import time
 
 import boto3
+from botocore.exceptions import ClientError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
 DEFAULT_LOG_SUFFIX = "output"
-FAILURE_LOG_SUFFIX = "error"
-# A filter value of ' ' translates to "match all".
-# see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
-DEFAULT_LOG_FILTER = " "
-FAILURE_LOG_FILTER = "?ERROR ?Exception"
+ERROR_LOG_SUFFIX = "error"
 
 
 class GlueJobHook(AwsBaseHook):
@@ -58,6 +55,13 @@ class GlueJobHook(AwsBaseHook):

     JOB_POLL_INTERVAL = 6  # polls job status after every JOB_POLL_INTERVAL seconds
 
+    class LogContinuationTokens:
+        """Used to hold the continuation tokens when reading logs from both streams Glue Jobs write to."""
+
+        def __init__(self):
+            self.output_stream_continuation: str | None = None
+            self.error_stream_continuation: str | None = None
+
     def __init__(
         self,
         s3_bucket: str | None = None,
@@ -194,46 +198,61 @@ def print_job_logs(
         self,
         job_name: str,
         run_id: str,
-        job_failed: bool = False,
-        next_token: str | None = None,
-    ) -> str | None:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client("logs")
-        response = {}
+        continuation_tokens: LogContinuationTokens,
+    ):
+        """
+        Prints the latest job logs to the Airflow task log and updates the continuation tokens.
 
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
-        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
-        log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
-        log_group_name = f"{log_group_prefix}/{log_group_suffix}"
+        :param continuation_tokens: the tokens where to resume from when reading logs.
+            The object gets updated with the new tokens by this method.
+        """
+        log_client = boto3.client("logs")
+        paginator = log_client.get_paginator("filter_log_events")
 
-        try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
+        def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
+            """Internal method to mutualize iteration over the 2 different log streams glue jobs write to"""
+            fetched_logs = []
+            next_token = continuation_token
+            try:
+                for response in paginator.paginate(
+                    logGroupName=log_group,
                     logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
+                    PaginationConfig={"StartingToken": continuation_token},
+                ):
+                    fetched_logs.extend([event["message"] for event in response["events"]])
+                    # if the response is empty there is no nextToken in it
+                    next_token = response.get("nextToken") or next_token
+            except ClientError as e:
+                if e.response["Error"]["Code"] == "ResourceNotFoundException":
+                    # we land here when the log groups/streams don't exist yet
+                    self.log.warning(
+                        "No new Glue driver logs so far.\nIf this persists, check the CloudWatch dashboard "
+                        f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
+                    )
+                else:
+                    raise
+
+            if len(fetched_logs):
+                # Add a tab to indent those logs and distinguish them from airflow logs.
+                # Log lines returned already contain a newline character at the end.
+                messages = "\t".join(fetched_logs)
+                self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
             else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response["events"]):
-                messages = "\t".join([event["message"] for event in response["events"]])
-                self.log.info("Glue Job Run Logs:\n\t%s", messages)
-
-        except log_client.exceptions.ResourceNotFoundException:
-            self.log.warning(
-                "No new Glue driver logs found. This might be because there are no new logs, "
-                "or might be an error.\nIf the error persists, check the CloudWatch dashboard "
-                f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
-            )
+                self.log.info("No new log from the Glue Job in %s", log_group)
+            return next_token
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get("nextToken") or next_token
+        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
+        log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
+        log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
+
+        # one would think that the error log group would contain only errors, but it actually contains
+        # a lot of interesting logs too, so it's valuable to have both
+        continuation_tokens.output_stream_continuation = display_logs_from(
+            log_group_default, continuation_tokens.output_stream_continuation
+        )
+        continuation_tokens.error_stream_continuation = display_logs_from(
+            log_group_error, continuation_tokens.error_stream_continuation
+        )
 
     def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
         """
@@ -247,35 +266,30 @@ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
"""
failed_states = ["FAILED", "TIMEOUT"]
finished_states = ["SUCCEEDED", "STOPPED"]
next_log_token = None
job_failed = False

next_log_tokens = self.LogContinuationTokens()
while True:
try:
job_run_state = self.get_job_state(job_name, run_id)
if job_run_state in finished_states:
self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
return {"JobRunState": job_run_state, "JobRunId": run_id}
if job_run_state in failed_states:
job_failed = True
job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
self.log.info(
"Polling for AWS Glue Job %s current run state with status %s",
job_name,
job_run_state,
)
time.sleep(self.JOB_POLL_INTERVAL)
finally:
if verbose:
next_log_token = self.print_job_logs(
job_name=job_name,
run_id=run_id,
job_failed=job_failed,
next_token=next_log_token,
)
if verbose:
self.print_job_logs(
job_name=job_name,
run_id=run_id,
continuation_tokens=next_log_tokens,
)

job_run_state = self.get_job_state(job_name, run_id)
if job_run_state in finished_states:
self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
return {"JobRunState": job_run_state, "JobRunId": run_id}
if job_run_state in failed_states:
job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
self.log.info(
"Polling for AWS Glue Job %s current run state with status %s",
job_name,
job_run_state,
)
time.sleep(self.JOB_POLL_INTERVAL)

def has_job(self, job_name) -> bool:
"""
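The core of the new display_logs_from helper is CloudWatch Logs pagination: the nextToken of the last non-empty page is kept and fed back as the StartingToken of the next poll, so each poll prints only events that arrived since the previous one. Below is a minimal standalone sketch of that mechanism (the run id is illustrative, and "/aws-glue/jobs/output" is assumed to be Glue's default output log group):

    import boto3

    logs = boto3.client("logs")
    paginator = logs.get_paginator("filter_log_events")

    token = None  # continuation token, persisted between polls
    for page in paginator.paginate(
        logGroupName="/aws-glue/jobs/output",  # assumed default Glue output log group
        logStreamNames=["jr_abc123"],  # illustrative JobRun id
        PaginationConfig={"StartingToken": token},
    ):
        for event in page["events"]:
            print(event["message"], end="")  # messages already end with a newline
        # an empty page carries no nextToken, so keep the previous token in that case
        token = page.get("nextToken") or token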
9 changes: 3 additions & 6 deletions airflow/providers/amazon/aws/sensors/glue.py
@@ -60,7 +60,7 @@ def __init__(
         self.aws_conn_id = aws_conn_id
         self.success_states: list[str] = ["SUCCEEDED"]
         self.errored_states: list[str] = ["FAILED", "STOPPED", "TIMEOUT"]
-        self.next_log_token: str | None = None
+        self.next_log_tokens = GlueJobHook.LogContinuationTokens()
 
     @cached_property
     def hook(self):
@@ -69,24 +69,21 @@ def hook(self):
     def poke(self, context: Context):
         self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id)
         job_state = self.hook.get_job_state(job_name=self.job_name, run_id=self.run_id)
-        job_failed = False
 
         try:
             if job_state in self.success_states:
                 self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state)
                 return True
             elif job_state in self.errored_states:
-                job_failed = True
                 job_error_message = "Exiting Job %s Run State: %s", self.run_id, job_state
                 self.log.info(job_error_message)
                 raise AirflowException(job_error_message)
             else:
                 return False
         finally:
             if self.verbose:
-                self.next_log_token = self.hook.print_job_logs(
+                self.hook.print_job_logs(
                     job_name=self.job_name,
                     run_id=self.run_id,
-                    job_failed=job_failed,
-                    next_token=self.next_log_token,
+                    continuation_tokens=self.next_log_tokens,
                 )
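Since the sensor now keeps a single LogContinuationTokens instance across pokes, each poke prints only the lines that appeared since the last one. A short sketch of that calling pattern against the new hook API, with an illustrative job name and run id:

    from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

    hook = GlueJobHook()
    tokens = GlueJobHook.LogContinuationTokens()

    # First call prints whatever both streams already contain and records
    # the continuation tokens on the tokens object.
    hook.print_job_logs(job_name="my_job", run_id="jr_abc123", continuation_tokens=tokens)

    # A later call with the same object resumes from those tokens, so only
    # log lines that arrived in between are printed.
    hook.print_job_logs(job_name="my_job", run_id="jr_abc123", continuation_tokens=tokens)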
46 changes: 46 additions & 0 deletions tests/providers/amazon/aws/hooks/test_glue.py
@@ -19,9 +19,11 @@

 import json
 from unittest import mock
+from unittest.mock import MagicMock
 
 import boto3
 import pytest
+from botocore.exceptions import ClientError
 from moto import mock_glue, mock_iam
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -303,3 +305,47 @@ def test_initialize_job(self, mock_conn, mock_get_job_state):
         glue_job_run = glue_job_hook.initialize_job(some_script_arguments, some_run_kwargs)
         glue_job_run_state = glue_job_hook.get_job_state(glue_job_run["JobName"], glue_job_run["JobRunId"])
         assert glue_job_run_state == mock_job_run_state, "Mocks but be equal"
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.glue.boto3.client")
+    @mock.patch.object(GlueJobHook, "conn")
+    def test_print_job_logs_returns_token(self, conn_mock: MagicMock, client_mock: MagicMock, caplog):
+        hook = GlueJobHook()
+        conn_mock().get_job_run.return_value = {"JobRun": {"LogGroupName": "my_log_group"}}
+        client_mock().get_paginator().paginate.return_value = [
+            # first response : 2 log lines
+            {
+                "events": [
+                    {"logStreamName": "stream", "timestamp": 123, "message": "hello\n"},
+                    {"logStreamName": "stream", "timestamp": 123, "message": "world\n"},
+                ],
+                "searchedLogStreams": [],
+                "nextToken": "my_continuation_token",
+                "ResponseMetadata": {"HTTPStatusCode": 200},
+            },
+            # second response, reached end of stream
+            {"events": [], "searchedLogStreams": [], "ResponseMetadata": {"HTTPStatusCode": 200}},
+        ]
+
+        tokens = GlueJobHook.LogContinuationTokens()
+        with caplog.at_level("INFO"):
+            hook.print_job_logs("name", "run", tokens)
+
+        assert "\thello\n\tworld\n" in caplog.text
+        assert tokens.output_stream_continuation == "my_continuation_token"
+        assert tokens.error_stream_continuation == "my_continuation_token"
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.glue.boto3.client")
+    @mock.patch.object(GlueJobHook, "conn")
+    def test_print_job_logs_no_stream_yet(self, conn_mock: MagicMock, client_mock: MagicMock):
+        hook = GlueJobHook()
+        conn_mock().get_job_run.return_value = {"JobRun": {"LogGroupName": "my_log_group"}}
+        client_mock().get_paginator().paginate.side_effect = ClientError(
+            {"Error": {"Code": "ResourceNotFoundException"}}, "op"
+        )
+
+        tokens = GlueJobHook.LogContinuationTokens()
+        hook.print_job_logs("name", "run", tokens)  # should not error
+
+        assert tokens.output_stream_continuation is None
+        assert tokens.error_stream_continuation is None
+        assert client_mock().get_paginator().paginate.call_count == 2
5 changes: 1 addition & 4 deletions tests/providers/amazon/aws/operators/test_glue.py
@@ -121,10 +121,7 @@ def test_execute_with_verbose_logging(

         mock_initialize_job.assert_called_once_with({}, {})
         mock_print_job_logs.assert_called_once_with(
-            job_name=JOB_NAME,
-            run_id=JOB_RUN_ID,
-            job_failed=False,
-            next_token=None,
+            job_name=JOB_NAME, run_id=JOB_RUN_ID, continuation_tokens=mock.ANY
         )
         assert glue.job_name == JOB_NAME

6 changes: 2 additions & 4 deletions tests/providers/amazon/aws/sensors/test_glue.py
@@ -69,8 +69,7 @@ def test_poke_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs):
         mock_print_job_logs.assert_called_once_with(
             job_name=job_name,
             run_id=job_run_id,
-            job_failed=False,
-            next_token=ANY,
+            continuation_tokens=ANY,
         )
 
     @mock.patch.object(GlueJobHook, "print_job_logs")
@@ -111,8 +110,7 @@ def test_poke_false_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs):
         mock_print_job_logs.assert_called_once_with(
             job_name=job_name,
             run_id=job_run_id,
-            job_failed=False,
-            next_token=ANY,
+            continuation_tokens=ANY,
        )
 
     @mock.patch.object(GlueJobHook, "print_job_logs")
3 changes: 2 additions & 1 deletion tests/system/providers/amazon/aws/example_glue.py
@@ -162,9 +162,10 @@ def glue_cleanup(crawler_name: str, job_name: str, db_name: str) -> None:
         job_name=glue_job_name,
         # Job ID extracted from previous Glue Job Operator task
         run_id=submit_glue_job.output,
+        verbose=True,  # prints glue job logs in airflow logs
     )
     # [END howto_sensor_glue]
-    wait_for_job.poke_interval = 10
+    wait_for_job.poke_interval = 5
 
     delete_bucket = S3DeleteBucketOperator(
         task_id="delete_bucket",
