Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Event-Publisher Flush Queue on Shutdown (#767)" #789

Merged
merged 1 commit into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions baseplate/sidecars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ def age(self) -> float:
return 0
return time.time() - self.batch_start

@property
def is_ready(self) -> bool:
return self.age >= self.max_age

def add(self, item: Optional[bytes]) -> None:
if self.age >= self.max_age:
raise BatchFull
Expand Down
49 changes: 12 additions & 37 deletions baseplate/sidecars/event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import hashlib
import hmac
import logging
import signal
import sys

from types import FrameType
from typing import Any
from typing import List
from typing import Optional
Expand Down Expand Up @@ -167,16 +164,6 @@ def publish(self, payload: SerializedBatch) -> None:
SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch}


def serialize_and_publish_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None:
"""Serializes batch, publishes it using the publisher, and then resets the batch for more messages."""
serialized_batch = batcher.serialize()
try:
publisher.publish(serialized_batch)
except Exception:
logger.exception("Events publishing failed.")
batcher.reset()


def publish_events() -> None:
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
Expand Down Expand Up @@ -227,28 +214,6 @@ def publish_events() -> None:
batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
publisher = BatchPublisher(metrics_client, cfg)

def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
"""Signal handler for flushing messages from the queue and publishing them."""
message: Optional[bytes]
logger.info("Shutdown signal received. Flushing events...")

while True:
try:
message = event_queue.get(timeout=0.2)
except TimedOutError:
if len(batcher.serialize()) > 0:
serialize_and_publish_batch(publisher, batcher)
break

if batcher.is_ready:
serialize_and_publish_batch(publisher, batcher)
batcher.add(message)
sys.exit(0)

for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, flush_queue_signal_handler)
signal.siginterrupt(sig, False)

while True:
message: Optional[bytes]

Expand All @@ -257,8 +222,18 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
except TimedOutError:
message = None

if batcher.is_ready:
serialize_and_publish_batch(publisher, batcher)
try:
batcher.add(message)
continue
except BatchFull:
pass

serialized = batcher.serialize()
try:
publisher.publish(serialized)
except Exception:
logger.exception("Events publishing failed.")
batcher.reset()
batcher.add(message)


Expand Down