
Add async task background worker #4591

Open

wants to merge 22 commits into base: srothh/worker-class-hierarchy

Commits (22)

63b1f24
ref(transport): Added shared sync/async transport superclass and crea…
srothh Jul 11, 2025
666ff3a
ref(transport) Removed Todo and reverted class name change
srothh Jul 11, 2025
748764e
test(transport): Add test for HTTP error status handling
srothh Jul 11, 2025
ee6dbee
test(transport): Restore accidentally removed comments
srothh Jul 11, 2025
19405fd
ref(transport) Refactor class names to reflect previous functionality
srothh Jul 14, 2025
3736c03
ref(transport): Add flush_async in the Transport abc
srothh Jul 17, 2025
3607d44
ref(transport): Move flush_async from ABC
srothh Jul 17, 2025
0ba5a83
ref(transport): add async type annotations to HTTPTransportCore
srothh Jul 23, 2025
9bb628e
ref(transport): Add abstract base class for worker implementation
srothh Jul 14, 2025
a81487e
ref(transport): Add _create_worker factory method to Transport
srothh Jul 14, 2025
8960e6f
ref(worker): Add flush_async method to Worker ABC
srothh Jul 17, 2025
0f7937b
ref(worker): Move worker flush_async from Worker ABC
srothh Jul 17, 2025
268ea1a
ref(worker): Amend function signature for coroutines
srothh Jul 17, 2025
b3c05cc
feat(transport): Add an async task-based worker for transport
srothh Jul 17, 2025
fb0ad18
ref(worker): Make worker work with new ABC interface
srothh Jul 17, 2025
7edbbaf
fix(worker): Check if callbacks from worker queue are coroutines or f…
srothh Jul 17, 2025
0f63d24
ref(worker): Amend return type of submit and flush to accomodate for …
srothh Jul 17, 2025
2430e2e
ref(worker): Add type parameters for AsyncWorker variables
srothh Jul 17, 2025
96fcd85
ref(worker): Remove loop upon killing worker
srothh Jul 17, 2025
331e40b
feat(worker): Enable concurrent callbacks on async task worker
srothh Jul 18, 2025
5f67485
fix(worker): Modify kill behaviour to mirror threaded worker
srothh Jul 18, 2025
97c5e3d
ref(worker): add proper type annotation to worker task list
srothh Jul 21, 2025
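
Taken together, the commits add an asyncio task-based worker next to the existing threaded BackgroundWorker: submitted callbacks may be plain functions or coroutine functions, coroutine callbacks run concurrently as tasks, and kill mirrors the threaded worker's behaviour. The worker module itself is not part of the excerpt below, so the following is only a rough sketch of that shape; the class name, signatures, and queue handling are assumptions rather than the PR's actual implementation.

    import asyncio
    from typing import Any, Callable, Coroutine, List, Optional, Union

    Callback = Union[Callable[[], Any], Callable[[], Coroutine[Any, Any, Any]]]


    class AsyncWorkerSketch:
        """Illustrative asyncio counterpart to the threaded BackgroundWorker."""

        def __init__(self, queue_size: int = 100) -> None:
            self._queue: "asyncio.Queue[Callback]" = asyncio.Queue(maxsize=queue_size)
            self._tasks: List["asyncio.Task[Any]"] = []  # in-flight coroutine callbacks
            self._consumer: Optional["asyncio.Task[None]"] = None

        def start(self) -> None:
            # Must be called from a running event loop.
            self._consumer = asyncio.get_running_loop().create_task(self._run())

        def submit(self, callback: Callback) -> bool:
            # Like the threaded worker, drop the callback when the queue is full.
            try:
                self._queue.put_nowait(callback)
                return True
            except asyncio.QueueFull:
                return False

        async def _run(self) -> None:
            while True:
                callback = await self._queue.get()
                if asyncio.iscoroutinefunction(callback):
                    # Coroutine callbacks run concurrently as their own tasks.
                    task = asyncio.create_task(callback())
                    self._tasks.append(task)
                    task.add_done_callback(self._tasks.remove)
                else:
                    callback()
                self._queue.task_done()

        async def flush_async(self, timeout: float) -> None:
            # Wait for queued and in-flight work, bounded by the timeout.
            await asyncio.wait_for(
                asyncio.gather(self._queue.join(), *self._tasks), timeout=timeout
            )

        def kill(self) -> None:
            # Mirror the threaded worker: stop consuming and cancel in-flight work.
            if self._consumer is not None:
                self._consumer.cancel()
            for task in self._tasks:
                task.cancel()

A transport would start such a worker from a running event loop and submit the same send_envelope_wrapper-style callbacks it already hands to the threaded worker.
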
4 changes: 2 additions & 2 deletions sentry_sdk/client.py
@@ -25,7 +25,7 @@
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
from sentry_sdk.transport import BaseHttpTransport, make_transport
from sentry_sdk.transport import HttpTransportCore, make_transport
from sentry_sdk.consts import (
SPANDATA,
DEFAULT_MAX_VALUE_LENGTH,
@@ -403,7 +403,7 @@ def _capture_envelope(envelope: Envelope) -> None:
self.monitor
or self.log_batcher
or has_profiling_enabled(self.options)
or isinstance(self.transport, BaseHttpTransport)
or isinstance(self.transport, HttpTransportCore)
):
# If we have anything on that could spawn a background thread, we
# need to check if it's safe to use them.
158 changes: 97 additions & 61 deletions sentry_sdk/transport.py
@@ -28,7 +28,7 @@

from sentry_sdk.consts import EndpointType
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
from sentry_sdk.worker import BackgroundWorker
from sentry_sdk.worker import BackgroundWorker, Worker
from sentry_sdk.envelope import Envelope, Item, PayloadRef

from typing import TYPE_CHECKING
@@ -162,7 +162,7 @@ def _parse_rate_limits(
continue


class BaseHttpTransport(Transport):
class HttpTransportCore(Transport):
"""The base HTTP transport."""

TIMEOUT = 30 # seconds
@@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
Transport.__init__(self, options)
assert self.parsed_dsn is not None
self.options: Dict[str, Any] = options
self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
self._worker = self._create_worker(options)
self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
self._disabled_until: Dict[Optional[str], datetime] = {}
# We only use this Retry() class for the `get_retry_after` method it exposes
@@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
elif self._compression_algo == "br":
self._compression_level = 4

def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
# For now, we only support the threaded sync background worker.
return BackgroundWorker(queue_size=options["transport_queue_size"])
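
Factoring worker construction into _create_worker means an async transport only has to override this one hook to swap in the task-based worker. A minimal hypothetical override follows; AsyncWorker and the subclass name are assumptions, not part of this diff, and the imports are those already used in transport.py:

    class AsyncHttpTransportSketch(HttpTransportCore):
        def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
            # Hypothetical: return the asyncio task-based worker instead of
            # the threaded BackgroundWorker.
            return AsyncWorker(queue_size=options["transport_queue_size"])
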

def record_lost_event(
self: Self,
reason: str,
@@ -286,12 +290,8 @@ def _update_rate_limits(
seconds=retry_after
)

def _send_request(
self: Self,
body: bytes,
headers: Dict[str, str],
endpoint_type: EndpointType = EndpointType.ENVELOPE,
envelope: Optional[Envelope] = None,
def _handle_request_error(
self: Self, envelope: Optional[Envelope], loss_reason: str = "network"
) -> None:
def record_loss(reason: str) -> None:
if envelope is None:
@@ -300,45 +300,45 @@ def record_loss(reason: str) -> None:
for item in envelope.items:
self.record_lost_event(reason, item=item)

self.on_dropped_event(loss_reason)
record_loss("network_error")

def _handle_response(
self: Self,
response: Union[urllib3.BaseHTTPResponse, httpcore.Response],
envelope: Optional[Envelope],
) -> None:
self._update_rate_limits(response)

if response.status == 429:
# if we hit a 429. Something was rate limited but we already
# acted on this in `self._update_rate_limits`. Note that we
# do not want to record event loss here as we will have recorded
# an outcome in relay already.
self.on_dropped_event("status_429")
pass

elif response.status >= 300 or response.status < 200:
logger.error(
"Unexpected status code: %s (body: %s)",
response.status,
getattr(response, "data", getattr(response, "content", None)),
)
self._handle_request_error(
envelope=envelope, loss_reason="status_{}".format(response.status)
)

def _update_headers(
self: Self,
headers: Dict[str, str],
) -> None:

headers.update(
{
"User-Agent": str(self._auth.client),
"X-Sentry-Auth": str(self._auth.to_header()),
}
)
try:
response = self._request(
"POST",
endpoint_type,
body,
headers,
)
except Exception:
self.on_dropped_event("network")
record_loss("network_error")
raise

try:
self._update_rate_limits(response)

if response.status == 429:
# if we hit a 429. Something was rate limited but we already
# acted on this in `self._update_rate_limits`. Note that we
# do not want to record event loss here as we will have recorded
# an outcome in relay already.
self.on_dropped_event("status_429")
pass

elif response.status >= 300 or response.status < 200:
logger.error(
"Unexpected status code: %s (body: %s)",
response.status,
getattr(response, "data", getattr(response, "content", None)),
)
self.on_dropped_event("status_{}".format(response.status))
record_loss("network_error")
finally:
response.close()

def on_dropped_event(self: Self, _reason: str) -> None:
return None
@@ -375,11 +375,6 @@ def _fetch_pending_client_report(
type="client_report",
)

def _flush_client_reports(self: Self, force: bool = False) -> None:
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def _check_disabled(self: Self, category: EventDataCategory) -> bool:
def _disabled(bucket: Optional[EventDataCategory]) -> bool:
ts = self._disabled_until.get(bucket)
@@ -398,9 +393,9 @@ def _is_worker_full(self: Self) -> bool:
def is_healthy(self: Self) -> bool:
return not (self._is_worker_full() or self._is_rate_limited())

def _send_envelope(self: Self, envelope: Envelope) -> None:

# remove all items from the envelope which are over quota
def _prepare_envelope(
self: Self, envelope: Envelope
) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]:
new_items = []
for item in envelope.items:
if self._check_disabled(item.data_category):
@@ -442,13 +437,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None:
if content_encoding:
headers["Content-Encoding"] = content_encoding

self._send_request(
body.getvalue(),
headers=headers,
endpoint_type=EndpointType.ENVELOPE,
envelope=envelope,
)
return None
return envelope, body, headers

def _serialize_envelope(
self: Self, envelope: Envelope
@@ -494,6 +483,9 @@ def _make_pool(
httpcore.SOCKSProxy,
httpcore.HTTPProxy,
httpcore.ConnectionPool,
httpcore.AsyncSOCKSProxy,
httpcore.AsyncHTTPProxy,
httpcore.AsyncConnectionPool,
]:
raise NotImplementedError()

@@ -506,6 +498,54 @@
) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]:
raise NotImplementedError()

def kill(self: Self) -> None:
logger.debug("Killing HTTP transport")
self._worker.kill()


class BaseHttpTransport(HttpTransportCore):

def _send_envelope(self: Self, envelope: Envelope) -> None:
_prepared_envelope = self._prepare_envelope(envelope)
if _prepared_envelope is None:
return None
envelope, body, headers = _prepared_envelope
self._send_request(
body.getvalue(),
headers=headers,
endpoint_type=EndpointType.ENVELOPE,
envelope=envelope,
)
return None

def _send_request(
self: Self,
body: bytes,
headers: Dict[str, str],
endpoint_type: EndpointType,
envelope: Optional[Envelope],
) -> None:
self._update_headers(headers)
try:
response = self._request(
"POST",
endpoint_type,
body,
headers,
)
except Exception:
self._handle_request_error(envelope=envelope, loss_reason="network")
raise
try:
self._handle_response(response=response, envelope=envelope)
finally:
response.close()

def _flush_client_reports(self: Self, force: bool = False) -> None:
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def capture_envelope(self: Self, envelope: Envelope) -> None:
def send_envelope_wrapper() -> None:
with capture_internal_exceptions():
@@ -528,10 +568,6 @@ def flush(
self._worker.submit(lambda: self._flush_client_reports(force=True))
self._worker.flush(timeout, callback)

def kill(self: Self) -> None:
logger.debug("Killing HTTP transport")
self._worker.kill()


class HttpTransport(BaseHttpTransport):
if TYPE_CHECKING:
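
With _prepare_envelope, _update_headers, _handle_response, and _handle_request_error split out of the sync-only send path above, an async transport can reuse them around an awaited request. Continuing the hypothetical AsyncHttpTransportSketch from the _create_worker note, and assuming an awaitable _request built on the httpcore.Async* pool types now allowed by _make_pool (the async methods and the aclose() call are assumptions, not shown in this diff):

    class AsyncHttpTransportSketch(HttpTransportCore):
        async def _send_envelope(self: Self, envelope: Envelope) -> None:
            prepared = self._prepare_envelope(envelope)
            if prepared is None:
                return None
            envelope, body, headers = prepared
            await self._send_request(
                body.getvalue(),
                headers=headers,
                endpoint_type=EndpointType.ENVELOPE,
                envelope=envelope,
            )
            return None

        async def _send_request(
            self: Self,
            body: bytes,
            headers: Dict[str, str],
            endpoint_type: EndpointType,
            envelope: Optional[Envelope],
        ) -> None:
            self._update_headers(headers)
            try:
                # Assumed awaitable counterpart of the sync _request.
                response = await self._request("POST", endpoint_type, body, headers)
            except Exception:
                self._handle_request_error(envelope=envelope, loss_reason="network")
                raise
            try:
                self._handle_response(response=response, envelope=envelope)
            finally:
                await response.aclose()

flush and kill on such a transport would then delegate to the async worker's flush_async and kill, mirroring the sync path above.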