Fix line_num in parsed_log_stream, dedupe logic
jason810496 committed Dec 21, 2024
1 parent c752e90 commit afa5b1b
Showing 1 changed file with 17 additions and 12 deletions: airflow/utils/log/file_task_handler.py
@@ -53,6 +53,7 @@

 CHUNK_SIZE = 1024 * 1024 * 5  # 5MB
 DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+SORT_KEY_OFFSET = 10000000
 HEAP_DUMP_SIZE = 500000
 HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2

@@ -126,21 +127,24 @@ def _parse_timestamp(line: str):

 def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
     with open(file_path) as f:
+        line_num = 0  # line number for each log line
         for file_chunk in iter(partial(f.read, CHUNK_SIZE), b""):
             if not file_chunk:
                 break
             # parse log lines
             lines = file_chunk.splitlines()
             timestamp = None
             next_timestamp = None
-            for idx, line in enumerate(lines):
+            for line in lines:
                 if line:
                     with suppress(Exception):
                         # next_timestamp unchanged if line can't be parsed
                         next_timestamp = _parse_timestamp(line)
                     if next_timestamp:
                         timestamp = next_timestamp
-                yield timestamp, idx, line
+                yield timestamp, line_num, line
+                line_num += 1
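With `enumerate`, the line index restarted at zero for every 5MB chunk, so lines from later chunks reused low indices and the sort key (which breaks timestamp ties by line number) could interleave them out of order. The file-global `line_num` counter keeps the tie-breaker strictly increasing across chunk boundaries. A minimal sketch of the difference, using a hypothetical two-chunk input:

    chunks = ["line a\nline b", "line c\nline d"]  # stand-in for two chunked reads

    # Old behaviour: enumerate restarts per chunk, so indices repeat.
    old = [(idx, line) for chunk in chunks for idx, line in enumerate(chunk.splitlines())]
    assert old == [(0, "line a"), (1, "line b"), (0, "line c"), (1, "line d")]

    # New behaviour: one counter spans all chunks, so indices stay unique.
    line_num = 0
    new = []
    for chunk in chunks:
        for line in chunk.splitlines():
            new.append((line_num, line))
            line_num += 1
    assert new == [(0, "line a"), (1, "line b"), (2, "line c"), (3, "line d")]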


 def _sort_key(timestamp: pendulum.DateTime | None, line_num: int) -> int:
@@ -151,7 +155,7 @@ def _sort_key(timestamp: pendulum.DateTime | None, line_num: int) -> int:
     :param line_num: line number of the log line
     :return: an integer as sort key to avoid overhead of memory usage
     """
-    return (timestamp or DEFAULT_SORT_DATETIME).int_timestamp * 10000000 + line_num
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num
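The old key used `int_timestamp` (whole seconds), so sub-second ordering was lost and same-second lines could only be ordered by line number; the new key keeps millisecond resolution and names the tie-break multiplier `SORT_KEY_OFFSET` instead of an inline magic number. A rough illustration of why the resolution matters, assuming pendulum is available:

    import pendulum

    SORT_KEY_OFFSET = 10000000

    ts_early = pendulum.datetime(2024, 12, 21, 12, 0, 0, 250000)  # 12:00:00.250
    ts_late = pendulum.datetime(2024, 12, 21, 12, 0, 0, 750000)   # 12:00:00.750

    # Old key: second resolution collapses both to the same bucket, so the
    # later line (line_num 0) wrongly sorts before the earlier one (line_num 1).
    assert ts_early.int_timestamp * 10000000 + 1 > ts_late.int_timestamp * 10000000 + 0

    # New key: millisecond resolution preserves the true order.
    assert (
        int(ts_early.timestamp() * 1000) * SORT_KEY_OFFSET + 1
        < int(ts_late.timestamp() * 1000) * SORT_KEY_OFFSET + 0
    )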


 def _add_log_from_parsed_log_streams_to_heap(
@@ -167,9 +171,6 @@
     :param parsed_log_streams: list of parsed log streams
     """
     for log_stream in parsed_log_streams:
-        if log_stream is None:
-            parsed_log_streams.remove(log_stream)
-            continue
         record: _ParsedLogRecordType | None = next(log_stream, None)
         if record is None:
             parsed_log_streams.remove(log_stream)
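The dropped `None` check was unreachable: entries in `parsed_log_streams` come from a parsed-log generator such as `_get_parsed_log_stream`, and a generator function never returns `None`. The only streams that need removal are exhausted ones, which `next(log_stream, None)` already detects by returning the sentinel instead of raising `StopIteration`. A tiny sketch of that behaviour:

    def gen():
        yield "only line"

    stream = gen()
    assert next(stream, None) == "only line"
    assert next(stream, None) is None  # exhausted: sentinel, no StopIteration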
@@ -218,15 +219,19 @@ def _interleave_logs(*parsed_log_streams: _ParsedLogStreamType) -> Generator[str, None, None]:
         if len(heap) >= HEAP_DUMP_SIZE:
             for _ in range(HALF_HEAP_DUMP_SIZE):
                 _, line = heapq.heappop(heap)
-                if line != last:  # dedupe
-                    yield line
+                if line == last:  # dedupe
+                    last = line
+                    continue
+                yield line
                 last = line
             continue

     # yield remaining records
-    for _, line in heap:
-        if line != last:  # dedupe
-            yield line
+    for _ in range(len(heap)):
+        _, line = heapq.heappop(heap)
+        if line == last:  # dedupe
+            last = line
+            continue
+        yield line
         last = line
     # free memory
     del heap
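The dedupe fix here is really an ordering fix: `for _, line in heap` walks the list in heap-array order, which is only partially sorted, so the old drain loop could emit the remaining records out of sequence and compare `last` against the wrong neighbour. Draining with `heapq.heappop` yields records smallest-key-first, which is what adjacent-line dedupe relies on. A small demonstration:

    import heapq

    heap = []
    for key in [5, 1, 4, 2, 3]:
        heapq.heappush(heap, key)

    # Plain iteration exposes the underlying array, not sorted order.
    assert list(heap) == [1, 2, 4, 5, 3]

    # Popping drains the heap in sorted order.
    assert [heapq.heappop(heap) for _ in range(len(heap))] == [1, 2, 3, 4, 5]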
