async sender with kafka #107

Draft · wants to merge 1 commit into base: sum12-only-async
35 changes: 23 additions & 12 deletions journalpump/journalpump.py
@@ -6,15 +6,17 @@
from .daemon import ServiceDaemon
from .senders import (
AWSCloudWatchSender, ElasticsearchSender, FileSender, GoogleCloudLoggingSender, KafkaSender, LogplexSender,
RsyslogSender, WebsocketSender
RsyslogSender, WebsocketSender, AsyncKafkaSender
)
from .senders.base import MAX_KAFKA_MESSAGE_SIZE, SenderInitializationError, Tagged
from .senders.base import AsyncLogSender, MAX_KAFKA_MESSAGE_SIZE, SenderInitializationError, Tagged
from .types import GeoIPProtocol
from .util import atomic_replace_file, default_json_serialization
from functools import reduce
from systemd.journal import Reader
from threading import Thread
from typing import Type, Union

import asyncio
import copy
import datetime
import json
@@ -114,6 +116,7 @@ class JournalReader(Tagged):
"aws_cloudwatch": AWSCloudWatchSender,
"google_cloud_logging": GoogleCloudLoggingSender,
"websocket": WebsocketSender,
"kafka_async": AsyncKafkaSender,
}

def __init__(
@@ -237,17 +240,25 @@ def initialize_senders(self):
if not isinstance(extra_field_values, dict):
self.log.warning("extra_field_values: %r not a dictionary object, ignoring", extra_field_values)
extra_field_values = {}
kw = dict(
config=sender_config,
field_filter=field_filter,
extra_field_values=extra_field_values,
msg_buffer_max_length=self.msg_buffer_max_length,
name=sender_name,
reader=self,
stats=self.stats,
tags=self.make_tags(),
)
if issubclass(sender_class, AsyncLogSender):
    loop = asyncio.get_event_loop()
    kw["aio_loop"] = loop
    if not loop.is_running():
        # run the loop in its own thread so it does not block journal reading;
        # the same loop can be reused by other senders once they become async
        Thread(target=loop.run_forever).start()
try:
sender = sender_class(
config=sender_config,
field_filter=field_filter,
extra_field_values=extra_field_values,
msg_buffer_max_length=self.msg_buffer_max_length,
name=sender_name,
reader=self,
stats=self.stats,
tags=self.make_tags(),
)
sender = sender_class(**kw)
except SenderInitializationError:
# If sender init fails, log exception, don't start() the sender
# and don't add it to self.senders dict. A metric about senders that
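For reference, a sender entry in the journalpump configuration selects this code path through the "kafka_async" output type registered above. A minimal sketch, written as a Python dict mirroring journalpump.json; the reader name, broker address and topic are placeholders rather than values from this change, and the "readers"/"senders"/"output_type" layout is assumed from the existing config handling in initialize_senders():

# Hypothetical configuration fragment; key names follow the existing sender
# config handling, values are placeholders.
config = {
    "readers": {
        "system-journal": {
            "senders": {
                "kafka-async": {
                    "output_type": "kafka_async",  # resolved to AsyncKafkaSender
                    "kafka_address": "kafka.example.com:9092",
                    "kafka_topic": "journal",
                    "ssl": False,
                },
            },
        },
    },
}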
1 change: 1 addition & 0 deletions journalpump/senders/__init__.py
@@ -6,3 +6,4 @@
from .logplex import LogplexSender # noqa: F401
from .rsyslog import RsyslogSender # noqa: F401
from .websocket import WebsocketSender # noqa: F401
from .kafka_async import AsyncKafkaSender # noqa: F401
1 change: 0 additions & 1 deletion journalpump/senders/aws_cloudwatch.py
@@ -1,6 +1,5 @@
from .base import ThreadedLogSender, SenderInitializationError


import boto3
import botocore
import json
44 changes: 44 additions & 0 deletions journalpump/senders/base.py
@@ -1,5 +1,6 @@
from threading import Lock, Thread

import asyncio
import logging
import random
import time
@@ -265,3 +266,46 @@ def get_and_send_messages(self):
# there is already a broad except handler in send_messages, so why this ?
self.log.exception("Problem sending %r messages", msg_count)
self._backoff()

class AsyncLogSender(LogSender):
def __init__(self, **kw):
self.loop: asyncio.events.AbstractEventLoop = kw.pop('aio_loop')
LogSender.__init__(self, **kw)

async def _backoff(self, *, base=0.5, cap=1800.0):
t = self._get_backoff_secs(base=base, cap=cap)
self.log.info("Sleeping for %.0f seconds", t)
await asyncio.sleep(t)

def handle_maintenance_operations(self):
    async def noop():
        pass
    # return the task so run() can await it; maintenance is a no-op for now
    return self.loop.create_task(noop())

async def run(self):
while self.running:
await self.handle_maintenance_operations()
if self.should_try_sending_messages():
await self.get_and_send_messages()
else:
await asyncio.sleep(self._wait_for)
self.log.info("Stopping")

async def get_and_send_messages(self):
batches = self.get_message_bodies_and_cursor()
msg_count = sum(len(batch[0]) for batch in batches)
self.log.debug("Got %d items from msg_buffer", msg_count)
start_time = time.monotonic()
try:
# pop to free up memory as soon as the send is successful
while batches:
batch = batches.pop(0)
# keep retrying until shutdown; backoff is part of the sending mechanism
while self.running and not await self.send_messages(messages=batch[0], cursor=batch[1]):
pass
self.log.debug("Sending %d msgs, took %.4fs", msg_count, time.monotonic() - start_time)
self.last_send_time = time.monotonic()
except Exception: # pylint: disable=broad-except
# there is already a broad except handler in send_messages, so why this ?
self.log.exception("Problem sending %r messages", msg_count)
await self._backoff()
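As a side note on the threading model: the additions above assume a single asyncio loop running in a background thread while run() coroutines are scheduled onto it from another thread. A minimal standalone sketch of that interplay, independent of journalpump's classes (the tick() coroutine and timings are invented for the example):

import asyncio
import time

from threading import Thread


# Hypothetical stand-in for AsyncLogSender.run(): loop until told to stop.
async def tick(stop: asyncio.Event):
    while not stop.is_set():
        await asyncio.sleep(0.1)


loop = asyncio.new_event_loop()
# Same pattern as initialize_senders(): the loop lives in a background thread
# so journal reading is not blocked.
Thread(target=loop.run_forever, daemon=True).start()

stop = asyncio.Event()
# Scheduling work from another thread has to go through run_coroutine_threadsafe
# (loop.create_task() is not thread-safe), which is how a sender's start()
# can hand its run() coroutine to the loop.
future = asyncio.run_coroutine_threadsafe(tick(stop), loop)

time.sleep(0.3)
loop.call_soon_threadsafe(stop.set)
future.result(timeout=1)
loop.call_soon_threadsafe(loop.stop)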
150 changes: 150 additions & 0 deletions journalpump/senders/kafka_async.py
@@ -0,0 +1,150 @@
from .base import AsyncLogSender
from aiokafka import AIOKafkaProducer
from kafka import errors

import asyncio
import logging
import socket


try:
import snappy
except ImportError:
snappy = None

try:
import zstandard as zstd
except ImportError:
zstd = None

KAFKA_CONN_ERRORS = tuple(errors.RETRY_ERROR_TYPES) + (
errors.UnknownError,
socket.timeout,
)

logging.getLogger("kafka").setLevel(logging.CRITICAL) # remove client-internal tracebacks from logging output


class AsyncKafkaSender(AsyncLogSender):
def __init__(self, *, config, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
self.kafka_producer = None
self.kafka_msg_key = self.config.get("kafka_msg_key")
if self.kafka_msg_key:
self.kafka_msg_key = self.kafka_msg_key.encode("utf8")
self.topic = self.config.get("kafka_topic")

def _generate_client_config(self) -> dict:
config = {
"api_version": self.config.get("kafka_api_version"),
"bootstrap_servers": self.config.get("kafka_address"),
"reconnect_backoff_ms": 1000, # up from the default 50ms to reduce connection attempts
"reconnect_backoff_max_ms": 10000, # up the upper bound for backoff to 10 seconds
"loop": self.loop,
}

if self.config.get("ssl"):
config["security_protocol"] = "SSL"
config["ssl_cafile"] = self.config.get("ca")
config["ssl_certfile"] = self.config.get("certfile")
config["ssl_keyfile"] = self.config.get("keyfile")
else:
config["security_protocol"] = "PLAINTEXT"

return config

def _generate_producer_config(self) -> dict:
producer_config = self._generate_client_config()
producer_config["linger_ms"] = 500 # wait up 500 ms to see if we can send msgs in a group

# make sure the python client supports it as well
if zstd and "zstd" in AIOKafkaProducer._COMPRESSORS: # pylint: disable=protected-access
producer_config["compression_type"] = "zstd"
elif snappy:
producer_config["compression_type"] = "snappy"
else:
producer_config["compression_type"] = "gzip"

if self.config.get("socks5_proxy"):
# socks5_proxy is supported by the Aiven fork of kafka-python for the time being
producer_config["socks5_proxy"] = self.config.get("socks5_proxy")

return producer_config

def start(self):
    # the event loop runs in a separate thread, so schedule run() thread-safely
    asyncio.run_coroutine_threadsafe(self.run(), self.loop)

async def _init_kafka(self) -> None:
self.log.info("Initializing Kafka client, address: %r", self.config["kafka_address"])

if self.kafka_producer:
    # AIOKafkaProducer is shut down via its stop() coroutine
    await self.kafka_producer.stop()
    self.kafka_producer = None

self.mark_disconnected()

while self.running and not self._connected:
producer_config = self._generate_producer_config()

try:
kafka_producer = AIOKafkaProducer(**producer_config)
await kafka_producer.start()
except KAFKA_CONN_ERRORS as ex:
self.mark_disconnected(ex)
self.log.warning("Retriable error during Kafka initialization: %s: %s", ex.__class__.__name__, ex)
await self._backoff()
else:
self.log.info("Initialized Kafka Client, address: %r", self.config["kafka_address"])
self.kafka_producer = kafka_producer
self.mark_connected()

# Assume that when the topic configuration is provided we should
# manually create it. This is useful for kafka clusters configured with
# `auto.create.topics.enable = false`
# topic_config = self.config.get("kafka_topic_config", dict())
# num_partitions = topic_config.get("num_partitions")
# replication_factor = topic_config.get("replication_factor")
# if num_partitions is not None and replication_factor is not None:
# kafka_admin = KafkaAdminClient(**self._generate_client_config())
# try:
# kafka_admin.create_topics([NewTopic(self.topic, num_partitions, replication_factor)])
# except errors.TopicAlreadyExistsError:
# self.log.info("Kafka topic %r already exists", self.topic)
# else:
# self.log.info("Create Kafka topic, address: %r", self.topic)

# def send_messages(self, *, messages, cursor):
# task = self.loop.(self._send_messages(messages=messages, cursor=cursor))
# await task
# return task.result()

async def send_messages(self, *, messages, cursor):
if not self.kafka_producer:
await self._init_kafka()
try:
    # Collect the delivery futures returned by send(); each resolves to
    # RecordMetadata once the message has actually been sent (during flush)
    result_futures = [
        await self.kafka_producer.send(topic=self.topic, value=msg, key=self.kafka_msg_key)
        for msg in messages
    ]
    await self.kafka_producer.flush()
    # awaiting the futures raises any delivery error, caught below;
    # flush() above should already have pushed everything out
    await asyncio.gather(*result_futures)
    self.mark_sent(messages=messages, cursor=cursor)
    return True
except KAFKA_CONN_ERRORS as ex:
self.mark_disconnected(ex)
self.log.info("Kafka retriable error during send: %s: %s, waiting", ex.__class__.__name__, ex)
if self.running:
await self._backoff()
await self._init_kafka()
except Exception as ex: # pylint: disable=broad-except
self.mark_disconnected(ex)
self.log.exception("Unexpected exception during send to kafka")
self.stats.unexpected_exception(ex=ex, where="sender", tags=self.make_tags({"app": "journalpump"}))
if self.running:
await self._backoff()
await self._init_kafka()
return False
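The send path above leans on aiokafka's two-step delivery model: send() returns a delivery future per message, flush() pushes out anything still buffered, and awaiting the futures surfaces delivery errors. A standalone sketch of that pattern, with placeholder broker and topic names that are not taken from this change:

import asyncio

from aiokafka import AIOKafkaProducer


async def produce(messages):
    # Placeholder broker address; adjust for a real cluster.
    producer = AIOKafkaProducer(bootstrap_servers="kafka.example.com:9092")
    await producer.start()
    try:
        # One delivery future per message; each resolves to RecordMetadata.
        futures = [await producer.send("journal", value=msg) for msg in messages]
        await producer.flush()                       # push out anything still buffered
        metadata = await asyncio.gather(*futures)    # raises on delivery failure
        return metadata
    finally:
        await producer.stop()


asyncio.run(produce([b"line-1", b"line-2"]))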