Skip to content

Commit 29b9252

Browse files
committed
Upload a chunk at a time
1 parent a2c012a commit 29b9252

File tree

3 files changed

+214
-160
lines changed

3 files changed

+214
-160
lines changed

requirements-dev.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ safety>=2.3.0
1818
bandit>=1.7.0
1919
pip-audit>=2.6.0
2020
debugpy>=1.8.0
21+
responses==0.25.8
2122

2223
# Testing web endpoints
2324
aiohttp-test-utils>=0.5.0
2425
sortedcontainers-stubs>=2.4.0
2526
kafka-python>=2.0.0
2627
sentry-protos==0.2.0
28+

src/launchpad/sentry_client.py

Lines changed: 121 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
import hashlib
66
import hmac
7+
import io
78
import json
89
import logging
910
import os
1011
import re
11-
import secrets
1212

1313
from pathlib import Path
1414
from typing import Any, Dict, NamedTuple, cast
@@ -21,6 +21,30 @@
2121
logger = logging.getLogger(__name__)
2222

2323

24+
def read_exactly(file: io.BufferedIOBase, n: int) -> bytes:
    """Read up to ``n`` bytes from ``file``, retrying after short reads.

    ``file.read(k)`` is called repeatedly with the number of bytes still
    outstanding until ``n`` bytes have been collected or a read returns
    nothing.

    Args:
        file: Binary stream to read from. Any object with a ``read(size)``
            method returning bytes works (``io.BytesIO``, a handle from
            ``open(path, "rb")``, ...).
        n: Number of bytes wanted. Values ``<= 0`` yield ``b""``.

    Returns:
        Exactly ``n`` bytes, or fewer (possibly ``b""``) when the stream
        ran out of data first.
    """
    remaining = n
    parts: list[bytes] = []
    while remaining > 0:
        part = file.read(remaining)
        if not part:
            # An empty read means no more data; return what was collected.
            break
        parts.append(part)
        remaining -= len(part)
    return b"".join(parts)
34+
35+
36+
class Rpc0Auth(requests.auth.AuthBase):
    """``requests`` auth hook that signs a request with the ``rpc0`` scheme.

    The signature is the hex HMAC-SHA256 of the request body, keyed with the
    shared secret, and is sent as ``Authorization: rpcsignature rpc0:<hex>``.
    """

    # Shared secret already encoded as UTF-8; used directly as the HMAC key.
    utf8_encoded_shared_secret: bytes

    def __init__(self, shared_secret: str):
        self.utf8_encoded_shared_secret = shared_secret.encode("utf-8")

    def __call__(self, r):
        """Add the ``Authorization`` header to prepared request ``r`` and return it."""
        body = r.body
        if isinstance(body, str):
            # hmac only accepts bytes-like input. Encode a text body once and
            # store it back so the bytes that are signed are the bytes sent.
            body = body.encode("utf-8")
            r.body = body
        # A missing body (``None``) is signed as an empty message.
        signature = hmac.new(self.utf8_encoded_shared_secret, body, hashlib.sha256).hexdigest()
        r.headers.update({"Authorization": f"rpcsignature rpc0:{signature}"})
        return r
46+
47+
2448
class DownloadResult(NamedTuple):
2549
"""Result of artifact download operation."""
2650

@@ -67,13 +91,17 @@ def create_retry_session(max_retries: int = 3) -> requests.Session:
6791
class SentryClient:
6892
"""Client for authenticated API calls to the Sentry monolith."""
6993

70-
def __init__(self, base_url: str) -> None:
94+
base_url: str
95+
auth: Rpc0Auth
96+
97+
def __init__(self, base_url: str, shared_secret: str | None = None) -> None:
7198
self.base_url = base_url.rstrip("/")
72-
self.shared_secret = os.getenv("LAUNCHPAD_RPC_SHARED_SECRET")
73-
if not self.shared_secret:
99+
if not shared_secret:
100+
shared_secret = os.getenv("LAUNCHPAD_RPC_SHARED_SECRET")
101+
if not shared_secret:
74102
raise RuntimeError("LAUNCHPAD_RPC_SHARED_SECRET must be provided or set as environment variable")
75-
76103
self.session = create_retry_session()
104+
self.auth = Rpc0Auth(shared_secret)
77105

78106
def download_artifact_to_file(self, org: str, project: str, artifact_id: str, out) -> int:
79107
"""Download preprod artifact directly to a file-like object.
@@ -95,7 +123,7 @@ def download_artifact_to_file(self, org: str, project: str, artifact_id: str, ou
95123

96124
logger.debug(f"GET {url}")
97125

98-
response = self.session.get(url, headers=self._get_auth_headers(), timeout=120, stream=True)
126+
response = self.session.get(url, auth=self.auth, timeout=120, stream=True)
99127

100128
if response.status_code != 200:
101129
error_result = self._handle_error_response(response, "Download artifact")
@@ -134,7 +162,7 @@ def upload_size_analysis_file(
134162
max_retries: int = 3,
135163
) -> Dict[str, Any] | ErrorResult:
136164
"""Upload size analysis file with chunking following Rust sentry-cli pattern."""
137-
return self._upload_file_with_assembly(
165+
return self._upload_path_with_assembly(
138166
org=org,
139167
project=project,
140168
artifact_id=artifact_id,
@@ -148,40 +176,66 @@ def upload_installable_app(
148176
org: str,
149177
project: str,
150178
artifact_id: str,
151-
file_path: str,
179+
file: str | io.BytesIO,
152180
max_retries: int = 3,
153181
) -> Dict[str, Any] | ErrorResult:
154182
"""Upload installable app file with chunking following Rust sentry-cli pattern."""
155-
return self._upload_file_with_assembly(
156-
org=org,
157-
project=project,
158-
artifact_id=artifact_id,
159-
file_path=file_path,
160-
max_retries=max_retries,
161-
assemble_type="installable_app",
162-
)
163183

164-
def _upload_file_with_assembly(
184+
if isinstance(file, io.IOBase):
185+
return self._upload_file_with_assembly(
186+
org=org,
187+
project=project,
188+
artifact_id=artifact_id,
189+
file=file,
190+
max_retries=max_retries,
191+
assemble_type="installable_app",
192+
)
193+
else:
194+
# We should try to remove this branch and only take file-like
195+
# objects for this API.
196+
return self._upload_path_with_assembly(
197+
org=org,
198+
project=project,
199+
artifact_id=artifact_id,
200+
file_path=file,
201+
max_retries=max_retries,
202+
assemble_type="installable_app",
203+
)
204+
205+
def _upload_path_with_assembly(
165206
self,
166207
org: str,
167208
project: str,
168209
artifact_id: str,
169-
file_path: str,
210+
file_path: str | Path,
170211
max_retries: int,
171212
assemble_type: str,
172213
) -> Dict[str, Any] | ErrorResult:
173-
"""Upload file with chunking and assembly following Rust sentry-cli pattern."""
174-
# Basic path validation
214+
logger.info(f"Uploading {file_path}")
215+
175216
path = Path(file_path).resolve()
176217
if not path.exists():
177218
raise FileNotFoundError(f"File not found: {file_path}")
178-
if ".." in file_path:
219+
# TODO(EME-217): It looks like this is trying to prevent
220+
# directory escapes, but it's both too strong (files could be
221+
# called "foo..apk") and not sufficient (doesn't handle strange
222+
# unicode, absolute paths, etc).
223+
if ".." in str(file_path):
179224
raise ValueError(f"Invalid file path: {file_path}")
180225

181226
with open(path, "rb") as f:
182-
content = f.read()
227+
return self._upload_file_with_assembly(org, project, artifact_id, f, max_retries, assemble_type)
183228

184-
logger.info(f"Uploading {file_path} ({len(content)} bytes, {len(content) / 1024 / 1024:.2f} MB)")
229+
def _upload_file_with_assembly(
230+
self,
231+
org: str,
232+
project: str,
233+
artifact_id: str,
234+
file: io.BytesIO,
235+
max_retries: int,
236+
assemble_type: str,
237+
) -> Dict[str, Any] | ErrorResult:
238+
"""Upload file with chunking and assembly following Rust sentry-cli pattern."""
185239

186240
# Step 1: Get chunk upload options from server
187241
logger.debug("Getting chunk upload options...")
@@ -194,20 +248,33 @@ def _upload_file_with_assembly(
194248

195249
chunk_options = options_result.get("chunking", {})
196250
chunk_size = chunk_options.get("chunk_size", 8 * 1024 * 1024) # fallback to 8MB
251+
252+
# TODO(EME-216): max_chunks is unused? Should we be using it?
197253
max_chunks = chunk_options.get("max_chunks", 64)
198254

199255
logger.debug(f"Server chunk config: size={chunk_size}, max_chunks={max_chunks}")
200256

201-
# Step 2: Create chunks and calculate checksums
202-
total_checksum = hashlib.sha1(content).hexdigest()
203-
chunks = self._create_chunks(content, chunk_size)
204-
chunk_checksums = [c["checksum"] for c in chunks]
205-
206-
logger.info(f"File prepared: SHA1={total_checksum}, chunks={len(chunks)}")
207-
208-
# Step 3: Upload ALL chunks first (following Rust pattern)
209-
logger.info(f"Uploading all {len(chunks)} chunks...")
210-
self._upload_chunks(org, chunks, chunk_checksums)
257+
# Step 2: Calculate checksums
258+
total_checksum = hashlib.file_digest(file, "sha1").hexdigest()
259+
size = file.tell()
260+
file.seek(0)
261+
262+
checksums = []
263+
while True:
264+
chunk = read_exactly(file, chunk_size)
265+
if not chunk:
266+
break
267+
checksums.append(hashlib.sha1(chunk).hexdigest())
268+
file.seek(0)
269+
logger.info(f"File prepared: SHA1={total_checksum} size={size} chunks={len(checksums)}")
270+
271+
# Step 3: Upload chunks
272+
logger.info(f"Uploading all {len(checksums)} chunks...")
273+
while True:
274+
chunk = read_exactly(file, chunk_size)
275+
if not chunk:
276+
break
277+
self.upload_chunk(org, chunk)
211278

212279
# Step 4: Assemble with retry loop
213280
for attempt in range(max_retries):
@@ -218,7 +285,7 @@ def _upload_file_with_assembly(
218285
project=project,
219286
artifact_id=artifact_id,
220287
checksum=total_checksum,
221-
chunks=chunk_checksums,
288+
chunks=checksums,
222289
assemble_type=assemble_type,
223290
)
224291

@@ -235,29 +302,18 @@ def _upload_file_with_assembly(
235302
return result
236303
elif state == "not_found":
237304
missing = result.get("missingChunks", [])
238-
if not missing:
305+
if missing:
306+
logger.warning(f"{len(missing)} chunks failed to upload")
307+
else:
239308
logger.warning("Assembly failed but no missing chunks reported")
240-
return result
241-
242-
logger.info(f"Re-uploading {len(missing)} missing chunks")
243-
if not self._upload_chunks(org, chunks, missing):
244-
logger.warning(f"Some chunks failed to re-upload on attempt {attempt + 1}")
309+
return result
245310
else:
246311
logger.warning(f"Assembly attempt {attempt + 1} failed: {result}")
247312
if attempt == max_retries - 1: # Last attempt
248313
return result
249314

250315
return ErrorResult(error=f"Failed after {max_retries} attempts", status_code=500)
251316

252-
def _get_auth_headers(self, body: bytes | None = None) -> Dict[str, str]:
253-
"""Get authentication headers for a request."""
254-
body = body or b""
255-
signature = hmac.new(self.shared_secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
256-
return {
257-
"Authorization": f"rpcsignature rpc0:{signature}",
258-
"Content-Type": "application/json",
259-
}
260-
261317
def _build_url(self, endpoint: str) -> str:
262318
"""Build full URL from endpoint."""
263319
return f"{self.base_url}{endpoint}"
@@ -290,7 +346,7 @@ def _make_json_request(
290346
method=method,
291347
url=url,
292348
data=body or None,
293-
headers=self._get_auth_headers(body),
349+
auth=self.auth,
294350
timeout=timeout,
295351
)
296352

@@ -304,73 +360,26 @@ def _get_chunk_upload_options(self, org: str) -> Dict[str, Any] | ErrorResult:
304360
endpoint = f"/api/0/organizations/{org}/chunk-upload/"
305361
return self._make_json_request("GET", endpoint, operation="Get chunk options")
306362

307-
def _create_chunks(self, content: bytes, chunk_size: int) -> list[Dict[str, Any]]:
308-
"""Split content into chunks with checksums."""
309-
chunks = []
310-
for i in range(0, len(content), chunk_size):
311-
data = content[i : i + chunk_size]
312-
chunks.append(
313-
{
314-
"checksum": hashlib.sha1(data).hexdigest(),
315-
"data": data,
316-
"size": len(data),
317-
}
318-
)
319-
320-
logger.debug(f"Created {len(chunks)} chunks")
321-
322-
# Show individual chunk details (limit for large files, similar to Rust version)
323-
max_chunks_to_show = 5
324-
for i, chunk in enumerate(chunks[:max_chunks_to_show]):
325-
logger.debug(f" Chunk {i + 1}: {chunk['size']} bytes (SHA1: {chunk['checksum']})")
326-
if len(chunks) > max_chunks_to_show:
327-
logger.debug(f" ... and {len(chunks) - max_chunks_to_show} more chunks")
328-
329-
return chunks
330-
331-
def _upload_chunks(self, org: str, chunks: list[Dict[str, Any]], target_checksums: list[str]) -> bool:
332-
"""Upload chunks by checksum list."""
333-
chunk_map = {c["checksum"]: c for c in chunks}
334-
success = 0
335-
336-
for checksum in target_checksums:
337-
if checksum not in chunk_map:
338-
logger.error(f"Chunk not found in map: {checksum}")
339-
continue
340-
341-
if self._upload_chunk(org, chunk_map[checksum]):
342-
success += 1
343-
logger.debug(f"Uploaded chunk {success}/{len(target_checksums)}: {checksum}")
344-
345-
logger.debug(f"Uploaded {success}/{len(target_checksums)} chunks successfully")
346-
return success == len(target_checksums)
347-
348-
def _upload_chunk(self, org: str, chunk: Dict[str, Any]) -> bool:
363+
def upload_chunk(self, org: str, chunk: bytes) -> bool:
349364
"""Upload single chunk."""
350-
url = f"{self.base_url}/api/0/organizations/{org}/chunk-upload/"
351-
boundary = f"----FormBoundary{secrets.token_hex(16)}"
352365

353-
# Create multipart body
354-
body = self._create_multipart_body(boundary, chunk["checksum"], chunk["data"])
366+
checksum = hashlib.sha1(chunk).hexdigest()
355367

356-
# For multipart, we need custom headers
357-
signature = hmac.new(self.shared_secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
358-
headers = {
359-
"Authorization": f"rpcsignature rpc0:{signature}",
360-
"Content-Type": f"multipart/form-data; boundary={boundary}",
361-
}
362-
363-
try:
364-
response = self.session.post(url, data=body, headers=headers, timeout=60)
368+
r = requests.Request(
369+
"POST",
370+
f"{self.base_url}/api/0/organizations/{org}/chunk-upload/",
371+
files={
372+
"file": (checksum, chunk, "application/octet-stream"),
373+
},
374+
auth=self.auth,
375+
)
365376

366-
success = response.status_code in [200, 201, 409] # 409 = already exists
367-
if not success:
368-
logger.warning(f"Chunk upload failed: {response.status_code}")
369-
return success
377+
response = self.session.send(r.prepare(), timeout=60)
370378

371-
except Exception as e:
372-
logger.error(f"Chunk upload error: {e}")
373-
return False
379+
success = response.status_code in [200, 201, 409] # 409 = already exists
380+
if not success:
381+
logger.warning(f"Chunk upload failed: {response.status_code}")
382+
return success
374383

375384
def _assemble_file(
376385
self,
@@ -398,24 +407,6 @@ def _assemble_file(
398407
endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/assemble-generic/"
399408
return self._make_json_request("POST", endpoint, data, operation="Assemble request")
400409

401-
def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> bytes:
402-
"""Create multipart/form-data body."""
403-
lines = [
404-
f"--{boundary}",
405-
f'Content-Disposition: form-data; name="file"; filename="{filename}"',
406-
"Content-Type: application/octet-stream",
407-
"",
408-
]
409-
410-
parts = [
411-
"\r\n".join(lines).encode("utf-8"),
412-
b"\r\n",
413-
data,
414-
f"\r\n--{boundary}--\r\n".encode("utf-8"),
415-
]
416-
417-
return b"".join(parts)
418-
419410

420411
def categorize_http_error(error_result: ErrorResult | Dict[str, Any]) -> tuple[str, str]:
421412
"""

0 commit comments

Comments
 (0)