Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kafka_consumer] add timeout for zk/kafka connections #1592

Merged
merged 1 commit into from
May 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions checks.d/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
# stdlib
from collections import defaultdict
import random

# project
from checks import AgentCheck

# 3rd party
# 3p
from kafka.client import KafkaClient
from kafka.common import OffsetRequest
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

# project
from checks import AgentCheck

DEFAULT_KAFKA_TIMEOUT = 5
DEFAULT_ZK_TIMEOUT = 5


class KafkaCheck(AgentCheck):

SOURCE_TYPE_NAME = 'kafka'

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances=instances)
self.zk_timeout = int(
init_config.get('zk_timeout', DEFAULT_ZK_TIMEOUT))
self.kafka_timeout = int(
init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You forgot the AgentCheck.__init__(self, name, init_config, agentConfig, instances=instances).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow good catch! 👍 Pushed a fix.

def check(self, instance):
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)
Expand All @@ -26,7 +36,7 @@ def check(self, instance):
zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'

# Connect to Zookeeper
zk_conn = KazooClient(zk_connect_str)
zk_conn = KazooClient(zk_connect_str, timeout=self.zk_timeout)
zk_conn.start()

try:
Expand Down Expand Up @@ -56,7 +66,7 @@ def check(self, instance):
self.log.exception('Error cleaning up Zookeeper connection')

# Connect to Kafka
kafka_conn = KafkaClient(kafka_host_ports)
kafka_conn = KafkaClient(kafka_host_ports, timeout=self.kafka_timeout)

try:
# Query Kafka for the broker offsets
Expand Down
4 changes: 4 additions & 0 deletions conf.d/kafka_consumer.yaml.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
init_config:
# Customize the ZooKeeper connection timeout here
# zk_timeout: 5
# Customize the Kafka connection timeout here
# kafka_timeout: 5

instances:
# - kafka_connect_str: localhost:19092
Expand Down