Split when 413 happens
schani committed Feb 17, 2025
1 parent 48251d9 commit a355d87
Showing 2 changed files with 64 additions and 17 deletions.
@@ -123,25 +123,39 @@ def __init__(self):
 
     def _flush_buffer(self):
         rows = self.buffer
-        if len(rows) == 0:
+        if not rows:
             return
         self.buffer = []
 
-        path = f"stashes/{self.stash_id}/{self.stash_serial}"
-        logger.debug(f"Flushing {len(rows)} rows to {path} ...")
-        r = requests.post(
-            self.url(path),
-            headers=self.headers(),
-            json=rows
-        )
-        try:
-            r.raise_for_status()
-        except Exception as e:
-            # FIXME: if this is a 413, make the batch size smaller and retry
-            raise Exception(f"Failed to post rows batch to {path} : {r.text}") from e  # nopep8
-
-        logger.info(f"Successfully posted {len(rows)} rows to {path}")  # nopep8
-        self.stash_serial += 1
+        start_idx = 0
+        chunk_size = len(rows)
+        stash_serial = self.stash_serial
+        while start_idx < len(rows):
+            chunk = rows[start_idx : start_idx + chunk_size]
+            path = f"stashes/{self.stash_id}/{stash_serial}"
+            logger.debug(f"Flushing {len(chunk)} rows to {path} ...")
+
+            r = requests.post(
+                self.url(path),
+                headers=self.headers(),
+                json=chunk
+            )
+            try:
+                r.raise_for_status()
+            except requests.HTTPError as e:
+                if r.status_code == 413 and chunk_size > 1:
+                    chunk_size = max(1, chunk_size // 2)
+                    logger.info(f"413 Payload Too Large. Reducing chunk size to {chunk_size} and retrying.")
+                    continue
+                raise Exception(f"Failed to post rows batch to {path} : {r.text}") from e
+
+            logger.info(f"Successfully posted {len(chunk)} rows to {path}")
+            stash_serial += 1
+            start_idx += chunk_size
+
+        # We only set the stash serial if the flush of all rows was
+        # successful, otherwise we could end up with duplicate rows.
+        self.stash_serial = stash_serial
 
     def add_row(self, row: BigTableRow) -> None:
         self.buffer.append(row)
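
The change replaces the single POST per flush with a loop that halves the chunk on HTTP 413 and retries from the same offset, committing the stash serial only once every chunk has landed. For reference, here is a minimal standalone sketch of that halving strategy; post_chunk(serial, chunk) and PayloadTooLarge are hypothetical stand-ins for illustration, not part of this commit's API:

    class PayloadTooLarge(Exception):
        pass

    def flush_in_chunks(rows, post_chunk, start_serial):
        start_idx = 0
        chunk_size = len(rows)
        serial = start_serial
        while start_idx < len(rows):
            chunk = rows[start_idx:start_idx + chunk_size]
            try:
                post_chunk(serial, chunk)
            except PayloadTooLarge:
                if chunk_size == 1:
                    raise  # a single row is still too large; splitting can't help
                chunk_size //= 2  # halve and retry the same offset
                continue
            serial += 1
            start_idx += chunk_size
        # Returned only after a fully successful flush, mirroring how the
        # commit defers updating self.stash_serial.
        return serial

Because a 413 only ever shrinks the chunk and never grows it back, a flush incurs at most about log2(len(rows)) failed attempts before settling on a size the server accepts.
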
@@ -5,7 +5,7 @@
 from unittest.mock import patch
 import uuid
 from requests.exceptions import HTTPError
-
+import json
 
 class TestGlideBigTableRestStrategy(unittest.TestCase):
     api_host = "https://test-api-host.com"
@@ -69,6 +69,39 @@ def test_add_rows_413(self, mock_post):
 
         self.assertIn("Failed to post rows batch", str(context.exception))
 
+    @patch.object(requests, "post")
+    def test_split_batches_on_413(self, mock_post):
+        threshold_bytes = 200
+        did_fail = False
+
+        def side_effect(*args, **kwargs):
+            payload = kwargs.get("json", [])
+            payload_size = len(json.dumps(payload).encode("utf-8"))
+            mock_response = requests.Response()
+            if payload_size > threshold_bytes:
+                mock_response.status_code = 413
+                mock_response._content = b"Payload Too Large"
+                nonlocal did_fail
+                did_fail = True
+            else:
+                mock_response.status_code = 200
+                mock_response._content = b"OK"
+            return mock_response
+
+        mock_post.side_effect = side_effect
+
+        # Force small flushes to test smaller batches
+        self.gbt.batch_size = 2
+
+        # Attempt to add rows that will exceed threshold if posted all at once
+        large_number_of_rows = [
+            {"test-str": "foo " * 30, "test-num": i}  # repeated strings for size
+            for i in range(10)
+        ]
+        self.gbt.add_rows(large_number_of_rows)
+
+        self.assertTrue(did_fail)
+
     def test_commit_with_pre_existing_table(self):
         with patch.object(requests, "post") as mock_post:
             TEST_ROW_COUNT = self.batch_size
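
The new test exercises the splitting path without any network: side_effect fabricates requests.Response objects by setting the private _content field, which is what backs r.text and lets raise_for_status() raise. A minimal sketch of that fabrication technique on its own, using only behavior the test already relies on:

    import requests

    resp = requests.Response()
    resp.status_code = 413
    resp._content = b"Payload Too Large"  # private field; backs resp.text

    assert resp.text == "Payload Too Large"
    try:
        resp.raise_for_status()
    except requests.HTTPError as err:
        print(err)  # still raises even though reason/url were never set
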
