Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve/fix glue job logs printing #30886

Merged
merged 6 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 82 additions & 68 deletions airflow/providers/amazon/aws/hooks/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
import time

import boto3
from botocore.exceptions import ClientError

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

DEFAULT_LOG_SUFFIX = "output"
FAILURE_LOG_SUFFIX = "error"
# A filter value of ' ' translates to "match all".
# see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
DEFAULT_LOG_FILTER = " "
FAILURE_LOG_FILTER = "?ERROR ?Exception"
ERROR_LOG_SUFFIX = "error"


class GlueJobHook(AwsBaseHook):
Expand Down Expand Up @@ -58,6 +55,13 @@ class GlueJobHook(AwsBaseHook):

JOB_POLL_INTERVAL = 6 # polls job status after every JOB_POLL_INTERVAL seconds

class LogContinuationTokens:
"""Used to hold the continuation tokens when reading logs from both streams Glue Jobs write to."""

def __init__(self):
self.output_stream_continuation: str | None = None
self.error_stream_continuation: str | None = None

def __init__(
self,
s3_bucket: str | None = None,
Expand Down Expand Up @@ -194,46 +198,61 @@ def print_job_logs(
self,
job_name: str,
run_id: str,
job_failed: bool = False,
next_token: str | None = None,
) -> str | None:
"""Prints the batch of logs to the Airflow task log and returns nextToken."""
log_client = boto3.client("logs")
response = {}
continuation_tokens: LogContinuationTokens,
):
"""
Prints the batch of logs to the Airflow task log and returns nextToken.
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved

filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
log_group_name = f"{log_group_prefix}/{log_group_suffix}"
:param continuation_tokens: the tokens where to resume from when reading logs.
The object gets updated with the new tokens by this method.
"""
log_client = boto3.client("logs")
paginator = log_client.get_paginator("filter_log_events")

try:
if next_token:
response = log_client.filter_log_events(
logGroupName=log_group_name,
def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
"""Internal method to mutualize iteration over the 2 different log streams glue jobs write to"""
fetched_logs = []
next_token = continuation_token
try:
for response in paginator.paginate(
logGroupName=log_group,
logStreamNames=[run_id],
filterPattern=filter_pattern,
nextToken=next_token,
)
PaginationConfig={"StartingToken": continuation_token},
):
fetched_logs.extend([event["message"] for event in response["events"]])
# if the response is empty there is no nextToken in it
next_token = response.get("nextToken") or next_token
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
# we land here when the log groups/streams don't exist yet
self.log.warning(
"No new Glue driver logs so far.\nIf this persists, check the CloudWatch dashboard "
f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
)
else:
raise

if len(fetched_logs):
# Add a tab to indent those logs and distinguish them from airflow logs.
# Log lines returned already contain a newline character at the end.
messages = "\t".join(fetched_logs)
self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
else:
response = log_client.filter_log_events(
logGroupName=log_group_name,
logStreamNames=[run_id],
filterPattern=filter_pattern,
)
if len(response["events"]):
messages = "\t".join([event["message"] for event in response["events"]])
self.log.info("Glue Job Run Logs:\n\t%s", messages)

except log_client.exceptions.ResourceNotFoundException:
self.log.warning(
"No new Glue driver logs found. This might be because there are no new logs, "
"or might be an error.\nIf the error persists, check the CloudWatch dashboard "
f"at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home"
)
self.log.info("No new log from the Glue Job in %s", log_group)
return next_token

# If no new log events are available, filter_log_events will return None.
# In that case, check the same token again next pass.
return response.get("nextToken") or next_token
log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"

# one would think that the error log group would contain only errors, but it actually contains
# a lot of interesting logs too, so it's valuable to have both
continuation_tokens.output_stream_continuation = display_logs_from(
log_group_default, continuation_tokens.output_stream_continuation
)
continuation_tokens.error_stream_continuation = display_logs_from(
log_group_error, continuation_tokens.error_stream_continuation
)
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved

def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
"""
Expand All @@ -247,35 +266,30 @@ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> d
"""
failed_states = ["FAILED", "TIMEOUT"]
finished_states = ["SUCCEEDED", "STOPPED"]
next_log_token = None
job_failed = False

next_log_tokens = self.LogContinuationTokens()
while True:
try:
job_run_state = self.get_job_state(job_name, run_id)
if job_run_state in finished_states:
self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
return {"JobRunState": job_run_state, "JobRunId": run_id}
if job_run_state in failed_states:
job_failed = True
job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
self.log.info(
"Polling for AWS Glue Job %s current run state with status %s",
job_name,
job_run_state,
)
time.sleep(self.JOB_POLL_INTERVAL)
finally:
if verbose:
next_log_token = self.print_job_logs(
job_name=job_name,
run_id=run_id,
job_failed=job_failed,
next_token=next_log_token,
)
if verbose:
self.print_job_logs(
job_name=job_name,
run_id=run_id,
continuation_tokens=next_log_tokens,
)

job_run_state = self.get_job_state(job_name, run_id)
if job_run_state in finished_states:
self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
return {"JobRunState": job_run_state, "JobRunId": run_id}
if job_run_state in failed_states:
job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
self.log.info(
"Polling for AWS Glue Job %s current run state with status %s",
job_name,
job_run_state,
)
time.sleep(self.JOB_POLL_INTERVAL)

def has_job(self, job_name) -> bool:
"""
Expand Down
9 changes: 3 additions & 6 deletions airflow/providers/amazon/aws/sensors/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(
self.aws_conn_id = aws_conn_id
self.success_states: list[str] = ["SUCCEEDED"]
self.errored_states: list[str] = ["FAILED", "STOPPED", "TIMEOUT"]
self.next_log_token: str | None = None
self.next_log_tokens = GlueJobHook.LogContinuationTokens()

@cached_property
def hook(self):
Expand All @@ -69,24 +69,21 @@ def hook(self):
def poke(self, context: Context):
self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id)
job_state = self.hook.get_job_state(job_name=self.job_name, run_id=self.run_id)
job_failed = False

try:
if job_state in self.success_states:
self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state)
return True
elif job_state in self.errored_states:
job_failed = True
job_error_message = "Exiting Job %s Run State: %s", self.run_id, job_state
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
return False
finally:
if self.verbose:
self.next_log_token = self.hook.print_job_logs(
self.hook.print_job_logs(
job_name=self.job_name,
run_id=self.run_id,
job_failed=job_failed,
next_token=self.next_log_token,
continuation_tokens=self.next_log_tokens,
)
46 changes: 46 additions & 0 deletions tests/providers/amazon/aws/hooks/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import json
from unittest import mock
from unittest.mock import MagicMock

import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_glue, mock_iam

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
Expand Down Expand Up @@ -303,3 +305,47 @@ def test_initialize_job(self, mock_conn, mock_get_job_state):
glue_job_run = glue_job_hook.initialize_job(some_script_arguments, some_run_kwargs)
glue_job_run_state = glue_job_hook.get_job_state(glue_job_run["JobName"], glue_job_run["JobRunId"])
assert glue_job_run_state == mock_job_run_state, "Mocks but be equal"

@mock.patch("airflow.providers.amazon.aws.hooks.glue.boto3.client")
@mock.patch.object(GlueJobHook, "conn")
def test_print_job_logs_returns_token(self, conn_mock: MagicMock, client_mock: MagicMock, caplog):
hook = GlueJobHook()
conn_mock().get_job_run.return_value = {"JobRun": {"LogGroupName": "my_log_group"}}
client_mock().get_paginator().paginate.return_value = [
# first response : 2 log lines
{
"events": [
{"logStreamName": "stream", "timestamp": 123, "message": "hello\n"},
{"logStreamName": "stream", "timestamp": 123, "message": "world\n"},
],
"searchedLogStreams": [],
"nextToken": "my_continuation_token",
"ResponseMetadata": {"HTTPStatusCode": 200},
},
# second response, reached end of stream
{"events": [], "searchedLogStreams": [], "ResponseMetadata": {"HTTPStatusCode": 200}},
]

tokens = GlueJobHook.LogContinuationTokens()
with caplog.at_level("INFO"):
hook.print_job_logs("name", "run", tokens)

assert "\thello\n\tworld\n" in caplog.text
assert tokens.output_stream_continuation == "my_continuation_token"
assert tokens.error_stream_continuation == "my_continuation_token"

@mock.patch("airflow.providers.amazon.aws.hooks.glue.boto3.client")
@mock.patch.object(GlueJobHook, "conn")
def test_print_job_logs_no_stream_yet(self, conn_mock: MagicMock, client_mock: MagicMock):
hook = GlueJobHook()
conn_mock().get_job_run.return_value = {"JobRun": {"LogGroupName": "my_log_group"}}
client_mock().get_paginator().paginate.side_effect = ClientError(
{"Error": {"Code": "ResourceNotFoundException"}}, "op"
)

tokens = GlueJobHook.LogContinuationTokens()
hook.print_job_logs("name", "run", tokens) # should not error

assert tokens.output_stream_continuation is None
assert tokens.error_stream_continuation is None
assert client_mock().get_paginator().paginate.call_count == 2
5 changes: 1 addition & 4 deletions tests/providers/amazon/aws/operators/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ def test_execute_with_verbose_logging(

mock_initialize_job.assert_called_once_with({}, {})
mock_print_job_logs.assert_called_once_with(
job_name=JOB_NAME,
run_id=JOB_RUN_ID,
job_failed=False,
next_token=None,
job_name=JOB_NAME, run_id=JOB_RUN_ID, continuation_tokens=mock.ANY
)
assert glue.job_name == JOB_NAME

Expand Down
6 changes: 2 additions & 4 deletions tests/providers/amazon/aws/sensors/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ def test_poke_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_pri
mock_print_job_logs.assert_called_once_with(
job_name=job_name,
run_id=job_run_id,
job_failed=False,
next_token=ANY,
continuation_tokens=ANY,
)

@mock.patch.object(GlueJobHook, "print_job_logs")
Expand Down Expand Up @@ -111,8 +110,7 @@ def test_poke_false_with_verbose_logging(self, mock_get_job_state, mock_conn, mo
mock_print_job_logs.assert_called_once_with(
job_name=job_name,
run_id=job_run_id,
job_failed=False,
next_token=ANY,
continuation_tokens=ANY,
)

@mock.patch.object(GlueJobHook, "print_job_logs")
Expand Down
3 changes: 2 additions & 1 deletion tests/system/providers/amazon/aws/example_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ def glue_cleanup(crawler_name: str, job_name: str, db_name: str) -> None:
job_name=glue_job_name,
# Job ID extracted from previous Glue Job Operator task
run_id=submit_glue_job.output,
verbose=True, # prints glue job logs in airflow logs
)
# [END howto_sensor_glue]
wait_for_job.poke_interval = 10
wait_for_job.poke_interval = 5

delete_bucket = S3DeleteBucketOperator(
task_id="delete_bucket",
Expand Down