Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Event-Publisher Flush Queue on Shutdown (#767)" #793

Open
wants to merge 2 commits into
base: v2.5-unsupported-gcp
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions baseplate/sidecars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ def age(self) -> float:
return 0
return time.time() - self.batch_start

@property
def is_ready(self) -> bool:
return self.is_full or self.age >= self.max_age

def add(self, item: Optional[bytes]) -> None:
if self.age >= self.max_age:
raise BatchFull
Expand Down
47 changes: 9 additions & 38 deletions baseplate/sidecars/event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import hashlib
import hmac
import logging
import signal
import sys

from types import FrameType
from typing import Any
from typing import List
from typing import Optional
Expand Down Expand Up @@ -192,24 +189,20 @@ def build_and_publish_batch(
"""Continuously polls for messages, then batches and publishes them."""
while True:
message: Optional[bytes]

try:
message = event_queue.get(timeout)
batcher.add(message)
continue # Start loop again - we will publish on the next loop if batch full/queue empty
message = event_queue.get(timeout=timeout)
except TimedOutError:
message = None
# Keep going - we may want to publish if we have other messages in the batch and time is up
except BatchFull:
batcher.is_full = True
# Keep going - we want to publish bc batch is full

if batcher.is_ready: # Time is up or batch is full
serialize_and_publish_batch(publisher, batcher)
try:
batcher.add(message)
continue
except BatchFull:
pass

if (
message
): # If we published because batch was full, we need to add the straggler we popped
batcher.add(message)
serialize_and_publish_batch(publisher, batcher)
batcher.add(message)


def publish_events() -> None:
Expand Down Expand Up @@ -265,25 +258,6 @@ def publish_events() -> None:
batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
publisher = BatchPublisher(metrics_client, cfg)

def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
"""Signal handler for flushing messages from the queue and publishing them."""
message: Optional[bytes]
logger.info("Shutdown signal received. Flushing events...")

while True:
try:
message = event_queue.get(timeout=0.2)
except TimedOutError:
# Once the queue drains, publish anything remaining and then exit
if len(batcher.serialize()) > 0:
serialize_and_publish_batch(publisher, batcher)
break

if batcher.is_ready:
serialize_and_publish_batch(publisher, batcher)
batcher.add(message)
sys.exit(0)

if cfg.queue_type == QueueType.IN_MEMORY.value and isinstance(
event_queue, InMemoryMessageQueue
):
Expand All @@ -297,9 +271,6 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
build_and_publish_batch(event_queue, batcher, publisher, QUEUE_TIMEOUT)

else:
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, flush_queue_signal_handler)
signal.siginterrupt(sig, False)
build_and_publish_batch(event_queue, batcher, publisher, QUEUE_TIMEOUT)


Expand Down