Merged. Changes from all commits.
@@ -236,6 +236,9 @@ def print_job_logs(
         """
         log_client = self.logs_hook.get_conn()
         paginator = log_client.get_paginator("filter_log_events")
+        job_run = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
+        # StartTime needs to be an int and is Epoch time in milliseconds
+        start_time = int(job_run["StartedOn"].timestamp() * 1000)
 
         def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
             """Mutualize iteration over the 2 different log streams glue jobs write to."""
@@ -245,6 +248,7 @@ def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
                 for response in paginator.paginate(
                     logGroupName=log_group,
                     logStreamNames=[run_id],
+                    startTime=start_time,
                     PaginationConfig={"StartingToken": continuation_token},
                 ):
                     fetched_logs.extend([event["message"] for event in response["events"]])
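For context, this is roughly what the paginated CloudWatch Logs call looks like when lifted out of the hook. The client setup, log group name, run id, and start time below are placeholder assumptions, not values from this diff; in the hook they come from the Glue job run:

```python
import boto3

logs = boto3.client("logs")
paginator = logs.get_paginator("filter_log_events")

# Placeholder values; the hook derives these from the Glue job run.
log_group = "/aws-glue/jobs/output"
run_id = "jr_0123456789abcdef"
start_time = 1705321800000  # epoch milliseconds

for page in paginator.paginate(
    logGroupName=log_group,
    logStreamNames=[run_id],
    startTime=start_time,  # skip events logged before this job run started
):
    for event in page["events"]:
        print(event["message"])
```

Passing `startTime` keeps the pagination from scanning events written to the stream before the current run started.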
@@ -270,7 +274,7 @@ def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
                 self.log.info("No new log from the Glue Job in %s", log_group)
             return next_token
 
-        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
+        log_group_prefix = job_run["LogGroupName"]
         log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
         log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
         # one would think that the error log group would contain only errors, but it actually contains
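A small side effect of fetching `job_run` once at the top of `print_job_logs`: the `LogGroupName` lookup above now reuses that response instead of issuing a second GetJobRun call on every invocation.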