Merged
2 changes: 2 additions & 0 deletions requirements-dev.txt
@@ -18,9 +18,11 @@ safety>=2.3.0
 bandit>=1.7.0
 pip-audit>=2.6.0
 debugpy>=1.8.0
+responses==0.25.8
 
 # Testing web endpoints
 aiohttp-test-utils>=0.5.0
 sortedcontainers-stubs>=2.4.0
 kafka-python>=2.0.0
+sentry-protos==0.2.0

252 changes: 122 additions & 130 deletions src/launchpad/sentry_client.py
@@ -4,23 +4,48 @@

 import hashlib
 import hmac
+import io
 import json
 import logging
 import os
 import re
 import secrets
 
 from pathlib import Path
 from typing import Any, Dict, NamedTuple, cast
 
 import requests
 
 from requests.adapters import HTTPAdapter
+from requests.auth import AuthBase
 from urllib3.util.retry import Retry
 
 logger = logging.getLogger(__name__)


+def read_exactly(file: io.BytesIO, n: int) -> bytes:
+    read = 0
+    parts = []
+    while read < n:
+        part = file.read(n - read)
+        if not part:
+            break
+        read += len(part)
+        parts.append(part)
+    return b"".join(parts)


+class Rpc0Auth(AuthBase):
+    utf8_encoded_shared_secret: bytes
+
+    def __init__(self, shared_secret: str):
+        self.utf8_encoded_shared_secret = shared_secret.encode("utf-8")
+
+    def __call__(self, r):
+        signature = hmac.new(self.utf8_encoded_shared_secret, r.body, hashlib.sha256).hexdigest()
+        r.headers.update({"Authorization": f"rpcsignature rpc0:{signature}"})
+        return r
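
For context, this is the standard requests auth-hook pattern: PreparedRequest.prepare() encodes the body first and only then invokes the auth callable, so the HMAC covers the exact bytes that go on the wire (for body-less requests r.body is None, which hmac.new treats as an empty message). A minimal sketch with hypothetical values, not part of this PR:

import requests

auth = Rpc0Auth("example-shared-secret")  # hypothetical secret
req = requests.Request(
    "POST",
    "https://sentry.example.com/api/0/internal/example/",  # hypothetical URL
    data=b'{"ok": true}',
    auth=auth,
)
prepared = req.prepare()  # body is encoded, then auth.__call__ signs it
assert prepared.headers["Authorization"].startswith("rpcsignature rpc0:")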


 class DownloadResult(NamedTuple):
     """Result of artifact download operation."""

@@ -67,13 +92,17 @@ def create_retry_session(max_retries: int = 3) -> requests.Session:
 class SentryClient:
     """Client for authenticated API calls to the Sentry monolith."""
 
-    def __init__(self, base_url: str) -> None:
+    base_url: str
+    auth: Rpc0Auth
+
+    def __init__(self, base_url: str, shared_secret: str | None = None) -> None:
         self.base_url = base_url.rstrip("/")
-        self.shared_secret = os.getenv("LAUNCHPAD_RPC_SHARED_SECRET")
-        if not self.shared_secret:
+        if not shared_secret:
+            shared_secret = os.getenv("LAUNCHPAD_RPC_SHARED_SECRET")
+        if not shared_secret:
             raise RuntimeError("LAUNCHPAD_RPC_SHARED_SECRET must be provided or set as environment variable")
 
         self.session = create_retry_session()
+        self.auth = Rpc0Auth(shared_secret)

     def download_artifact_to_file(self, org: str, project: str, artifact_id: str, out) -> int:
         """Download preprod artifact directly to a file-like object.
@@ -95,7 +124,7 @@ def download_artifact_to_file(self, org: str, project: str, artifact_id: str, out) -> int:

         logger.debug(f"GET {url}")
 
-        response = self.session.get(url, headers=self._get_auth_headers(), timeout=120, stream=True)
+        response = self.session.get(url, auth=self.auth, timeout=120, stream=True)
 
         if response.status_code != 200:
             error_result = self._handle_error_response(response, "Download artifact")
@@ -134,7 +163,7 @@ def upload_size_analysis_file(
         max_retries: int = 3,
     ) -> Dict[str, Any] | ErrorResult:
         """Upload size analysis file with chunking following Rust sentry-cli pattern."""
-        return self._upload_file_with_assembly(
+        return self._upload_path_with_assembly(
             org=org,
             project=project,
             artifact_id=artifact_id,
@@ -148,40 +177,66 @@ def upload_installable_app(
         org: str,
         project: str,
         artifact_id: str,
-        file_path: str,
+        file: str | io.BytesIO,
Contributor:
Is this intentional? I don't see any changes to service.py.

Contributor (author):
Right now only the tests use this, but ideally I think it would be better to pass around file objects rather than paths. That way it's much harder to accidentally either fail to delete something (leaking the disk space until the process dies) or delete something too soon (causing crashes).

I want to update the calling code to pass the arguments as files; then we can delete the file_path handling (here plus all of _upload_path_with_assembly), but not in this PR.
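
As a rough illustration of the file-object pattern described above (the caller-side names are hypothetical, not part of this PR): the caller owns the file's lifetime via a context manager and hands the client an open handle, so the handle is closed even if the upload raises, and the caller alone decides when to delete the file.

# Hypothetical caller: `client` is a SentryClient, `path` an artifact on disk.
with open(path, "rb") as f:
    result = client.upload_installable_app(
        org="acme",
        project="mobile-app",
        artifact_id="1234",
        file=f,
    )
# The file is closed here; the caller can now delete `path` safely.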

         max_retries: int = 3,
     ) -> Dict[str, Any] | ErrorResult:
         """Upload installable app file with chunking following Rust sentry-cli pattern."""
-        return self._upload_file_with_assembly(
-            org=org,
-            project=project,
-            artifact_id=artifact_id,
-            file_path=file_path,
-            max_retries=max_retries,
-            assemble_type="installable_app",
-        )
-
-    def _upload_file_with_assembly(
+        if isinstance(file, io.IOBase):
+            return self._upload_file_with_assembly(
+                org=org,
+                project=project,
+                artifact_id=artifact_id,
+                file=file,
+                max_retries=max_retries,
+                assemble_type="installable_app",
+            )
+        else:
+            # We should try to remove this branch and only take file-like
+            # objects for this API.
+            return self._upload_path_with_assembly(
+                org=org,
+                project=project,
+                artifact_id=artifact_id,
+                file_path=file,
+                max_retries=max_retries,
+                assemble_type="installable_app",
+            )

+    def _upload_path_with_assembly(
         self,
         org: str,
         project: str,
         artifact_id: str,
-        file_path: str,
+        file_path: str | Path,
         max_retries: int,
         assemble_type: str,
     ) -> Dict[str, Any] | ErrorResult:
"""Upload file with chunking and assembly following Rust sentry-cli pattern."""
# Basic path validation
logger.info(f"Uploading {file_path}")

path = Path(file_path).resolve()
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
if ".." in file_path:
# TODO(EME-217): It looks like this is trying to prevent
# directory escapes, but it's both too strong (files could be
# called "foo..apk") and not sufficent (doesn't handle strange
# unicode, absolute paths, etc).
if ".." in str(file_path):
raise ValueError(f"Invalid file path: {file_path}")

with open(path, "rb") as f:
content = f.read()
return self._upload_file_with_assembly(org, project, artifact_id, f, max_retries, assemble_type)

logger.info(f"Uploading {file_path} ({len(content)} bytes, {len(content) / 1024 / 1024:.2f} MB)")
+    def _upload_file_with_assembly(
+        self,
+        org: str,
+        project: str,
+        artifact_id: str,
+        file: io.BytesIO,
+        max_retries: int,
+        assemble_type: str,
+    ) -> Dict[str, Any] | ErrorResult:
+        """Upload file with chunking and assembly following Rust sentry-cli pattern."""
 
         # Step 1: Get chunk upload options from server
         logger.debug("Getting chunk upload options...")
Expand All @@ -194,20 +249,33 @@ def _upload_file_with_assembly(

         chunk_options = options_result.get("chunking", {})
         chunk_size = chunk_options.get("chunk_size", 8 * 1024 * 1024)  # fallback to 8MB
 
+        # TODO(EME-216): max_chunks is unused? Should we be using it?
         max_chunks = chunk_options.get("max_chunks", 64)
 
         logger.debug(f"Server chunk config: size={chunk_size}, max_chunks={max_chunks}")
 
-        # Step 2: Create chunks and calculate checksums
-        total_checksum = hashlib.sha1(content).hexdigest()
-        chunks = self._create_chunks(content, chunk_size)
-        chunk_checksums = [c["checksum"] for c in chunks]
-
-        logger.info(f"File prepared: SHA1={total_checksum}, chunks={len(chunks)}")
-
-        # Step 3: Upload ALL chunks first (following Rust pattern)
-        logger.info(f"Uploading all {len(chunks)} chunks...")
-        self._upload_chunks(org, chunks, chunk_checksums)
+        # Step 2: Calculate checksums
+        total_checksum = hashlib.file_digest(file, "sha1").hexdigest()
+        size = file.tell()
+        file.seek(0)
+
+        checksums = []
+        while True:
+            chunk = read_exactly(file, chunk_size)
+            if not chunk:
+                break
+            checksums.append(hashlib.sha1(chunk).hexdigest())
+        file.seek(0)
+        logger.info(f"File prepared: SHA1={total_checksum} size={size} chunks={len(checksums)}")
+
+        # Step 3: Upload chunks
+        logger.info(f"Uploading all {len(checksums)} chunks...")
+        while True:
+            chunk = read_exactly(file, chunk_size)
+            if not chunk:
+                break
+            self.upload_chunk(org, chunk)

         # Step 4: Assemble with retry loop
         for attempt in range(max_retries):
@@ -218,7 +286,7 @@ def _upload_file_with_assembly(
                 project=project,
                 artifact_id=artifact_id,
                 checksum=total_checksum,
-                chunks=chunk_checksums,
+                chunks=checksums,
                 assemble_type=assemble_type,
             )

@@ -235,29 +303,18 @@ def _upload_file_with_assembly(
                 return result
             elif state == "not_found":
                 missing = result.get("missingChunks", [])
-                if not missing:
+                if missing:
+                    logger.warning(f"{len(missing)} chunks failed to upload")
+                else:
                     logger.warning("Assembly failed but no missing chunks reported")
-                    return result
-
-                logger.info(f"Re-uploading {len(missing)} missing chunks")
-                if not self._upload_chunks(org, chunks, missing):
-                    logger.warning(f"Some chunks failed to re-upload on attempt {attempt + 1}")
+                return result
Comment on lines +306 to +310

Contributor (Diamond):
The assembly process no longer attempts to re-upload missing chunks when they're reported. This removes an important recovery mechanism from the original implementation.

In the previous code, when chunks were missing, it would:

    logger.info(f"Re-uploading {len(missing)} missing chunks")
    if not self._upload_chunks(org, chunks, missing):
        logger.warning(f"Some chunks failed to re-upload on attempt {attempt + 1}")

The new implementation simply logs a warning and returns the error result immediately. This could lead to failed uploads that might have succeeded with the retry mechanism. Consider restoring the chunk re-upload functionality to maintain the same resilience as the original implementation.

Suggested change:
-                if missing:
-                    logger.warning(f"{len(missing)} chunks failed to upload")
-                else:
-                    logger.warning("Assembly failed but no missing chunks reported")
-                return result
+                if missing:
+                    logger.info(f"Re-uploading {len(missing)} missing chunks")
+                    if not self._upload_chunks(org, chunks, missing):
+                        logger.warning(f"{len(missing)} chunks failed to re-upload")
+                else:
+                    logger.warning("Assembly failed but no missing chunks reported")
+                return result

Spotted by Diamond

Contributor:
+1

Contributor (author):
I want to put this in a loop so it starts with an assemble and then only uploads the missing chunks. I'll do it in a follow-up if that works for you.
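
A rough sketch of that follow-up (hypothetical, not part of this PR), reusing the PR's own helpers (read_exactly, upload_chunk, _assemble_file): assemble first, and on "not_found" re-scan the file and upload only the chunks the server reports missing before assembling again.

        # Hypothetical assemble-first retry loop.
        for attempt in range(max_retries):
            result = self._assemble_file(
                org=org,
                project=project,
                artifact_id=artifact_id,
                checksum=total_checksum,
                chunks=checksums,
                assemble_type=assemble_type,
            )
            if isinstance(result, ErrorResult) or result.get("state") != "not_found":
                return result
            missing = set(result.get("missingChunks", []))
            if not missing:
                logger.warning("Assembly failed but no missing chunks reported")
                return result
            logger.info(f"Uploading {len(missing)} missing chunks (attempt {attempt + 1})")
            file.seek(0)
            while True:
                chunk = read_exactly(file, chunk_size)
                if not chunk:
                    break
                if hashlib.sha1(chunk).hexdigest() in missing:
                    self.upload_chunk(org, chunk)
        return ErrorResult(error=f"Failed after {max_retries} attempts", status_code=500)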

             else:
                 logger.warning(f"Assembly attempt {attempt + 1} failed: {result}")
                 if attempt == max_retries - 1:  # Last attempt
                     return result
 
         return ErrorResult(error=f"Failed after {max_retries} attempts", status_code=500)

-    def _get_auth_headers(self, body: bytes | None = None) -> Dict[str, str]:
-        """Get authentication headers for a request."""
-        body = body or b""
-        signature = hmac.new(self.shared_secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
-        return {
-            "Authorization": f"rpcsignature rpc0:{signature}",
-            "Content-Type": "application/json",
-        }

     def _build_url(self, endpoint: str) -> str:
         """Build full URL from endpoint."""
         return f"{self.base_url}{endpoint}"
@@ -290,7 +347,7 @@ def _make_json_request(
             method=method,
             url=url,
             data=body or None,
-            headers=self._get_auth_headers(body),
+            auth=self.auth,
             timeout=timeout,
         )

@@ -304,73 +361,26 @@ def _get_chunk_upload_options(self, org: str) -> Dict[str, Any] | ErrorResult:
         endpoint = f"/api/0/organizations/{org}/chunk-upload/"
         return self._make_json_request("GET", endpoint, operation="Get chunk options")

-    def _create_chunks(self, content: bytes, chunk_size: int) -> list[Dict[str, Any]]:
-        """Split content into chunks with checksums."""
-        chunks = []
-        for i in range(0, len(content), chunk_size):
-            data = content[i : i + chunk_size]
-            chunks.append(
-                {
-                    "checksum": hashlib.sha1(data).hexdigest(),
-                    "data": data,
-                    "size": len(data),
-                }
-            )
 
-        logger.debug(f"Created {len(chunks)} chunks")
 
-        # Show individual chunk details (limit for large files, similar to Rust version)
-        max_chunks_to_show = 5
-        for i, chunk in enumerate(chunks[:max_chunks_to_show]):
-            logger.debug(f"  Chunk {i + 1}: {chunk['size']} bytes (SHA1: {chunk['checksum']})")
-        if len(chunks) > max_chunks_to_show:
-            logger.debug(f"  ... and {len(chunks) - max_chunks_to_show} more chunks")
 
-        return chunks
 
-    def _upload_chunks(self, org: str, chunks: list[Dict[str, Any]], target_checksums: list[str]) -> bool:
-        """Upload chunks by checksum list."""
-        chunk_map = {c["checksum"]: c for c in chunks}
-        success = 0
 
-        for checksum in target_checksums:
-            if checksum not in chunk_map:
-                logger.error(f"Chunk not found in map: {checksum}")
-                continue
 
-            if self._upload_chunk(org, chunk_map[checksum]):
-                success += 1
-                logger.debug(f"Uploaded chunk {success}/{len(target_checksums)}: {checksum}")
 
-        logger.debug(f"Uploaded {success}/{len(target_checksums)} chunks successfully")
-        return success == len(target_checksums)
 
-    def _upload_chunk(self, org: str, chunk: Dict[str, Any]) -> bool:
+    def upload_chunk(self, org: str, chunk: bytes) -> bool:
"""Upload single chunk."""
url = f"{self.base_url}/api/0/organizations/{org}/chunk-upload/"
boundary = f"----FormBoundary{secrets.token_hex(16)}"

# Create multipart body
body = self._create_multipart_body(boundary, chunk["checksum"], chunk["data"])
checksum = hashlib.sha1(chunk).hexdigest()

# For multipart, we need custom headers
signature = hmac.new(self.shared_secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
headers = {
"Authorization": f"rpcsignature rpc0:{signature}",
"Content-Type": f"multipart/form-data; boundary={boundary}",
}

try:
response = self.session.post(url, data=body, headers=headers, timeout=60)
r = requests.Request(
"POST",
f"{self.base_url}/api/0/organizations/{org}/chunk-upload/",
files={
"file": (checksum, chunk, "application/octet-stream"),
},
auth=self.auth,
)

success = response.status_code in [200, 201, 409] # 409 = already exists
if not success:
logger.warning(f"Chunk upload failed: {response.status_code}")
return success
response = self.session.send(r.prepare(), timeout=60)

except Exception as e:
logger.error(f"Chunk upload error: {e}")
return False
success = response.status_code in [200, 201, 409] # 409 = already exists
if not success:
logger.warning(f"Chunk upload failed: {response.status_code}")
return success
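
The hand-rolled multipart body is gone because requests builds it from files= during prepare(), before the auth hook runs, so the HMAC signature covers the final encoded multipart bytes. A minimal sketch (hypothetical org and values) of what that encoding produces:

import requests

req = requests.Request(
    "POST",
    "https://sentry.example.com/api/0/organizations/acme/chunk-upload/",  # hypothetical
    files={"file": ("sha1-hex-checksum", b"chunk-bytes", "application/octet-stream")},
)
p = req.prepare()
assert p.headers["Content-Type"].startswith("multipart/form-data; boundary=")
assert b"chunk-bytes" in p.body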

     def _assemble_file(
         self,
@@ -398,24 +408,6 @@ def _assemble_file(
         endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/assemble-generic/"
         return self._make_json_request("POST", endpoint, data, operation="Assemble request")

-    def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> bytes:
-        """Create multipart/form-data body."""
-        lines = [
-            f"--{boundary}",
-            f'Content-Disposition: form-data; name="file"; filename="{filename}"',
-            "Content-Type: application/octet-stream",
-            "",
-        ]
 
-        parts = [
-            "\r\n".join(lines).encode("utf-8"),
-            b"\r\n",
-            data,
-            f"\r\n--{boundary}--\r\n".encode("utf-8"),
-        ]
 
-        return b"".join(parts)


 def categorize_http_error(error_result: ErrorResult | Dict[str, Any]) -> tuple[str, str]:
     """