fix: send chunk_size instead of manually handling it (#459)
* fix: send chunk_size instead of manually handling it

* test: update file tests
Ian2012 authored Oct 15, 2024
Parent: b7e0c4b · Commit: 1be477a
Showing 2 changed files with 18 additions and 26 deletions.
Test file for the transform_tracking_logs command:

@@ -179,14 +179,13 @@ def _get_raw_log_size():
     return os.path.getsize(tracking_log_path)
 
 
-def _get_raw_log_stream(_, start_bytes, end_bytes):
+def _get_raw_log_stream(_, start_bytes, chunk_size):
     """
     Return raw event json parsed from current fixtures
     """
     tracking_log_path = _get_tracking_log_file_path()
     with open(tracking_log_path, "rb") as current:
         current.seek(start_bytes)
-        yield current.read(end_bytes - start_bytes)
+        yield current.read()
 
 
 @pytest.mark.parametrize("command_opts", command_options())
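The updated fixture ignores the chunk_size it receives and yields the rest of the file in a single piece, which is enough for these tests. A stricter stand-in that honors chunk_size the way libcloud's download_object_range_as_stream does might look like this (chunked_fixture_stream is a hypothetical name, not part of this commit):

```python
def chunked_fixture_stream(path, start_bytes=0, chunk_size=1024 * 1024 * 2):
    """Yield the file at `path` in `chunk_size` pieces, starting at `start_bytes`.

    Hypothetical stand-in for the fixture above; it mimics the chunked
    iteration of libcloud's download_object_range_as_stream more closely.
    """
    with open(path, "rb") as current:
        current.seek(start_bytes)
        while True:
            chunk = current.read(chunk_size)
            if not chunk:
                break
            yield chunk
```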
@@ -421,7 +420,7 @@ def test_get_chunks():
     fake_source.download_object_range_as_stream.return_value = "abc"
 
     # Check that we got the expected return value
-    assert _get_chunks(fake_source, "", 0, 1) == "abc"
+    assert _get_chunks(fake_source, "") == "abc"
     # Check that we broke out of the retry loop as expected
     assert fake_source.download_object_range_as_stream.call_count == 1
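With the two-argument signature, the test only checks the return value and the call count. A follow-up assertion could also pin down the keyword arguments now being forwarded (not part of this commit; it assumes CHUNK_SIZE is imported alongside _get_chunks):

```python
# Hypothetical extra assertion: verify the new kwargs reach the driver.
fake_source.download_object_range_as_stream.assert_called_once_with(
    "", start_bytes=0, chunk_size=CHUNK_SIZE
)
```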

@@ -431,7 +430,7 @@ def test_get_chunks():
     # Speed up our test, don't wait for the sleep
     with patch("event_routing_backends.management.commands.transform_tracking_logs.sleep"):
         with pytest.raises(Exception) as e:
-            _get_chunks(fake_source_err, "", 0, 1)
+            _get_chunks(fake_source_err, "")
 
     # Make sure we're getting the error we expect
     assert "boom" in str(e)
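For reference, a self-contained sketch of this failure path; the final retry-count assertion is an assumption based on the docstring's "retry 3 times if necessary":

```python
from unittest.mock import MagicMock, patch

import pytest

from event_routing_backends.management.commands.transform_tracking_logs import _get_chunks


def test_get_chunks_failure_sketch():
    fake_source_err = MagicMock()
    fake_source_err.download_object_range_as_stream.side_effect = Exception("boom")

    # Patch out the backoff sleep so the test runs instantly.
    with patch("event_routing_backends.management.commands.transform_tracking_logs.sleep"):
        with pytest.raises(Exception) as e:
            _get_chunks(fake_source_err, "")

    assert "boom" in str(e)
    # Assumed retry limit of 3, per the docstring; adjust if the real loop differs.
    assert fake_source_err.download_object_range_as_stream.call_count == 3
```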
The command module (event_routing_backends.management.commands.transform_tracking_logs):

@@ -18,7 +18,7 @@
 CHUNK_SIZE = 1024 * 1024 * 2
 
 
-def _get_chunks(source, file, start_byte, end_byte):
+def _get_chunks(source, file):
     """
     Fetch a chunk from the upstream source, retry 3 times if necessary.
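The old signature made every caller compute byte ranges by hand. Apache libcloud's StorageDriver.download_object_range_as_stream also accepts a chunk_size keyword, so the new signature defers that bookkeeping to the driver. A rough sketch of the underlying call (provider, credentials, and object names are placeholders):

```python
from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider

CHUNK_SIZE = 1024 * 1024 * 2  # 2 MiB, matching the constant above

# Placeholder provider and credentials; any libcloud storage driver works.
driver = get_driver(Provider.S3)("access-key", "secret-key")
container = driver.get_container("tracking-logs")
obj = container.get_object("tracking.log")

# Stream the whole object from byte 0 in CHUNK_SIZE pieces; libcloud
# handles the range arithmetic the old code did manually.
for chunk in driver.download_object_range_as_stream(
    obj, start_bytes=0, chunk_size=CHUNK_SIZE
):
    print(len(chunk))
```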
@@ -35,8 +35,8 @@ def _get_chunks(source, file, start_byte, end_byte):
         try:
             chunks = source.download_object_range_as_stream(
                 file,
-                start_bytes=start_byte,
-                end_bytes=end_byte
+                start_bytes=0,
+                chunk_size=CHUNK_SIZE
             )
             break
         # Catching all exceptions here because there's no telling what all
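Only the try block is visible in this hunk; the retry loop around it falls outside the displayed range. A hypothetical reconstruction of that shape, consistent with the "retry 3 times" docstring and the sleep the tests patch out:

```python
from time import sleep

CHUNK_SIZE = 1024 * 1024 * 2


def _get_chunks_sketch(source, file, tries=3):
    """Hypothetical shape of the retry scaffolding around the download call;
    the real loop, retry limit, and backoff live outside this hunk."""
    chunks = None
    for attempt in range(tries):
        try:
            chunks = source.download_object_range_as_stream(
                file,
                start_bytes=0,
                chunk_size=CHUNK_SIZE,
            )
            break
        # Catching everything, as the real code does, because the upstream
        # driver can raise all sorts of exceptions.
        except Exception:
            if attempt == tries - 1:
                raise
            sleep(1)  # backoff length is an assumption
    return chunks
```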
@@ -72,29 +72,22 @@ def transform_tracking_logs(
         # Download the file as a stream of characters to save on memory
         print(f"Streaming file {file}...")
 
-        last_successful_byte = 0
         line = ""
 
-        while last_successful_byte < int(file.size):
-            end_byte = last_successful_byte + CHUNK_SIZE
-
-            end_byte = min(end_byte, file.size)
-
-            chunks = _get_chunks(source, file, last_successful_byte, end_byte)
-
-            for chunk in chunks:
-                chunk = chunk.decode('utf-8')
-
-                # Loop through this chunk, if we find a newline it's time to process
-                # otherwise just keep appending.
-                for char in chunk:
-                    if char == "\n" and line:
-                        sender.transform_and_queue(line)
-                        line = ""
-                    else:
-                        line += char
-
-            last_successful_byte = end_byte
+        chunks = _get_chunks(source, file)
+
+        for chunk in chunks:
+            chunk = chunk.decode('utf-8')
+
+            # Loop through this chunk, if we find a newline it's time to process
+            # otherwise just keep appending.
+            for char in chunk:
+                if char == "\n" and line:
+                    sender.transform_and_queue(line)
+                    line = ""
+                else:
+                    line += char
+
         # Sometimes the file doesn't end with a newline, we try to use
         # any remaining bytes as a final line.
         if line:
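The rewritten loop drops the manual byte-range tracking and keeps only the line reassembly. The same logic, pulled out into a hypothetical generator for illustration:

```python
def iter_log_lines(chunks):
    """Hypothetical helper mirroring the loop above: decode each chunk and
    yield complete lines, carrying a partial line across chunk boundaries."""
    line = ""
    for chunk in chunks:
        chunk = chunk.decode("utf-8")
        for char in chunk:
            if char == "\n" and line:
                completed, line = line, ""
                yield completed
            else:
                line += char
    # Files don't always end with a newline; flush any remainder.
    if line:
        yield line
```

Used as `for line in iter_log_lines(_get_chunks(source, file)): sender.transform_and_queue(line)`, this would behave the same as the inlined version.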
