diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py index 80c05feca92c3..d025652d0a848 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/glue.py @@ -236,6 +236,9 @@ def print_job_logs( """ log_client = self.logs_hook.get_conn() paginator = log_client.get_paginator("filter_log_events") + job_run = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"] + # filter_log_events takes startTime as an int: epoch time in milliseconds + start_time = int(job_run["StartedOn"].timestamp() * 1000) def display_logs_from(log_group: str, continuation_token: str | None) -> str | None: """Mutualize iteration over the 2 different log streams glue jobs write to.""" @@ -245,6 +248,7 @@ def display_logs_from(log_group: str, continuation_token: str | None) -> str | N for response in paginator.paginate( logGroupName=log_group, logStreamNames=[run_id], + startTime=start_time, PaginationConfig={"StartingToken": continuation_token}, ): fetched_logs.extend([event["message"] for event in response["events"]]) @@ -270,7 +274,7 @@ def display_logs_from(log_group: str, continuation_token: str | None) -> str | N self.log.info("No new log from the Glue Job in %s", log_group) return next_token - log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"] + log_group_prefix = job_run["LogGroupName"] log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}" log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}" # one would think that the error log group would contain only errors, but it actually contains