Commit 947625b

Merge pull request #754 from dpkp/benchmarks
Producer metrics + consumer/producer benchmark scripts
2 parents: 3666b66 + 7a2ec33

File tree

11 files changed: +640 -27 lines

benchmarks/consumer_performance.py

+179
@@ -0,0 +1,179 @@
#!/usr/bin/env python
# Adapted from https://github.com/mrafayaleem/kafka-jython

from __future__ import absolute_import, print_function

import argparse
import logging
import pprint
import sys
import threading
import traceback

from kafka import KafkaConsumer, KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

logging.basicConfig(level=logging.ERROR)


def start_brokers(n):
    print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n))
    print('-> 1 Zookeeper')
    zk = ZookeeperFixture.instance()
    print('---> {0}:{1}'.format(zk.host, zk.port))
    print()

    partitions = min(n, 3)
    replicas = min(n, 3)
    print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas))
    brokers = [
        KafkaFixture.instance(i, zk.host, zk.port, zk_chroot='',
                              partitions=partitions, replicas=replicas)
        for i in range(n)
    ]
    for broker in brokers:
        print('---> {0}:{1}'.format(broker.host, broker.port))
    print()
    return brokers


class ConsumerPerformance(object):

    @staticmethod
    def run(args):
        try:
            # Parse key=value overrides; values are coerced to int where
            # possible, and the literal string 'None' becomes None.
            props = {}
            for prop in args.consumer_config:
                k, v = prop.split('=')
                try:
                    v = int(v)
                except ValueError:
                    pass
                if v == 'None':
                    v = None
                props[k] = v

            if args.brokers:
                brokers = start_brokers(args.brokers)
                props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port)
                                              for broker in brokers]
                print('---> bootstrap_servers={0}'.format(props['bootstrap_servers']))
                print()

            # Pre-fill the topic so there is something to consume
            print('-> Producing records')
            record = bytes(bytearray(args.record_size))
            producer = KafkaProducer(compression_type=args.fixture_compression,
                                     **props)
            for i in range(args.num_records):
                producer.send(topic=args.topic, value=record)
            producer.flush()
            producer.close()
            print('-> OK!')
            print()

            print('Initializing Consumer...')
            props['auto_offset_reset'] = 'earliest'
            if 'consumer_timeout_ms' not in props:
                props['consumer_timeout_ms'] = 10000
            props['metrics_sample_window_ms'] = args.stats_interval * 1000
            for k, v in props.items():
                print('---> {0}={1}'.format(k, v))
            consumer = KafkaConsumer(args.topic, **props)
            print('---> group_id={0}'.format(consumer.config['group_id']))
            print('---> report stats every {0} secs'.format(args.stats_interval))
            print('---> raw metrics? {0}'.format(args.raw_metrics))
            timer_stop = threading.Event()
            timer = StatsReporter(args.stats_interval, consumer,
                                  event=timer_stop,
                                  raw_metrics=args.raw_metrics)
            timer.start()
            print('-> OK!')
            print()

            records = 0
            for msg in consumer:
                records += 1
                if records >= args.num_records:
                    break
            print('Consumed {0} records'.format(records))

            timer_stop.set()

        except Exception:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            sys.exit(1)


class StatsReporter(threading.Thread):
    def __init__(self, interval, consumer, event=None, raw_metrics=False):
        super(StatsReporter, self).__init__()
        self.interval = interval
        self.consumer = consumer
        self.event = event
        self.raw_metrics = raw_metrics

    def print_stats(self):
        metrics = self.consumer.metrics()
        if self.raw_metrics:
            pprint.pprint(metrics)
        else:
            print('{records-consumed-rate} records/sec ({bytes-consumed-rate} B/sec),'
                  ' {fetch-latency-avg} latency,'
                  ' {fetch-rate} fetch/s,'
                  ' {fetch-size-avg} fetch size,'
                  ' {records-lag-max} max record lag,'
                  ' {records-per-request-avg} records/req'
                  .format(**metrics['consumer-fetch-manager-metrics']))

    def print_final(self):
        self.print_stats()

    def run(self):
        # while/else: once the event is set, wait() returns True, the loop
        # exits normally, and the else clause prints one final report.
        while self.event and not self.event.wait(self.interval):
            self.print_stats()
        else:
            self.print_final()


def get_args_parser():
    parser = argparse.ArgumentParser(
        description='This tool is used to verify the consumer performance.')

    parser.add_argument(
        '--topic', type=str,
        help='Topic for consumer test',
        default='kafka-python-benchmark-test')
    parser.add_argument(
        '--num-records', type=int,
        help='number of messages to consume',
        default=1000000)
    parser.add_argument(
        '--record-size', type=int,
        help='message size in bytes',
        default=100)
    parser.add_argument(
        '--consumer-config', type=str, nargs='+', default=(),
        help='kafka consumer related configuration properties like '
             'bootstrap_servers, client_id, etc.')
    parser.add_argument(
        '--fixture-compression', type=str,
        help='specify a compression type for use with broker fixtures / producer')
    parser.add_argument(
        '--brokers', type=int,
        help='Number of kafka brokers to start',
        default=0)
    parser.add_argument(
        '--stats-interval', type=int,
        help='Interval in seconds for stats reporting to console',
        default=5)
    parser.add_argument(
        '--raw-metrics', action='store_true',
        help='Enable this flag to print full metrics dict on each interval')
    return parser


if __name__ == '__main__':
    args = get_args_parser().parse_args()
    ConsumerPerformance.run(args)
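
For orientation (not part of this commit), a minimal sketch of driving the new consumer benchmark programmatically; it assumes the script is importable as consumer_performance (e.g. run from the benchmarks/ directory of a kafka-python checkout) and that the test.fixtures brokers can start locally:

# Sketch only: module path and fixture availability are assumptions.
from consumer_performance import ConsumerPerformance, get_args_parser

args = get_args_parser().parse_args([
    '--brokers', '1',            # start 1 Zookeeper + 1 broker fixture
    '--num-records', '10000',    # produce, then consume, 10k records
    '--record-size', '100',      # 100-byte payloads
    '--stats-interval', '1',     # report fetch-manager metrics every second
])
ConsumerPerformance.run(args)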
File renamed without changes.

benchmarks/producer_performance.py

+158
@@ -0,0 +1,158 @@
#!/usr/bin/env python
# Adapted from https://github.com/mrafayaleem/kafka-jython

from __future__ import absolute_import, print_function

import argparse
import pprint
import sys
import threading
import traceback

from kafka import KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture


def start_brokers(n):
    print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n))
    print('-> 1 Zookeeper')
    zk = ZookeeperFixture.instance()
    print('---> {0}:{1}'.format(zk.host, zk.port))
    print()

    partitions = min(n, 3)
    replicas = min(n, 3)
    print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas))
    brokers = [
        KafkaFixture.instance(i, zk.host, zk.port, zk_chroot='',
                              partitions=partitions, replicas=replicas)
        for i in range(n)
    ]
    for broker in brokers:
        print('---> {0}:{1}'.format(broker.host, broker.port))
    print()
    return brokers


class ProducerPerformance(object):

    @staticmethod
    def run(args):
        try:
            # Parse key=value overrides; values are coerced to int where
            # possible, and the literal string 'None' becomes None.
            props = {}
            for prop in args.producer_config:
                k, v = prop.split('=')
                try:
                    v = int(v)
                except ValueError:
                    pass
                if v == 'None':
                    v = None
                props[k] = v

            if args.brokers:
                brokers = start_brokers(args.brokers)
                props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port)
                                              for broker in brokers]
                print('---> bootstrap_servers={0}'.format(props['bootstrap_servers']))
                print()
                print('-> OK!')
                print()

            print('Initializing producer...')
            record = bytes(bytearray(args.record_size))
            props['metrics_sample_window_ms'] = args.stats_interval * 1000

            producer = KafkaProducer(**props)
            for k, v in props.items():
                print('---> {0}={1}'.format(k, v))
            print('---> send {0} byte records'.format(args.record_size))
            print('---> report stats every {0} secs'.format(args.stats_interval))
            print('---> raw metrics? {0}'.format(args.raw_metrics))
            timer_stop = threading.Event()
            timer = StatsReporter(args.stats_interval, producer,
                                  event=timer_stop,
                                  raw_metrics=args.raw_metrics)
            timer.start()
            print('-> OK!')
            print()

            for i in range(args.num_records):
                producer.send(topic=args.topic, value=record)
            producer.flush()

            timer_stop.set()

        except Exception:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            sys.exit(1)


class StatsReporter(threading.Thread):
    def __init__(self, interval, producer, event=None, raw_metrics=False):
        super(StatsReporter, self).__init__()
        self.interval = interval
        self.producer = producer
        self.event = event
        self.raw_metrics = raw_metrics

    def print_stats(self):
        metrics = self.producer.metrics()
        if self.raw_metrics:
            pprint.pprint(metrics)
        else:
            print('{record-send-rate} records/sec ({byte-rate} B/sec),'
                  ' {request-latency-avg} latency,'
                  ' {record-size-avg} record size,'
                  ' {batch-size-avg} batch size,'
                  ' {records-per-request-avg} records/req'
                  .format(**metrics['producer-metrics']))

    def print_final(self):
        self.print_stats()

    def run(self):
        # while/else: once the event is set, the loop exits normally and the
        # else clause prints one final report.
        while self.event and not self.event.wait(self.interval):
            self.print_stats()
        else:
            self.print_final()


def get_args_parser():
    parser = argparse.ArgumentParser(
        description='This tool is used to verify the producer performance.')

    parser.add_argument(
        '--topic', type=str,
        help='Topic name for test',
        default='kafka-python-benchmark-test')
    parser.add_argument(
        '--num-records', type=int,
        help='number of messages to produce',
        default=1000000)
    parser.add_argument(
        '--record-size', type=int,
        help='message size in bytes',
        default=100)
    parser.add_argument(
        '--producer-config', type=str, nargs='+', default=(),
        help='kafka producer related configuration properties like '
             'bootstrap_servers, client_id, etc.')
    parser.add_argument(
        '--brokers', type=int,
        help='Number of kafka brokers to start',
        default=0)
    parser.add_argument(
        '--stats-interval', type=int,
        help='Interval in seconds for stats reporting to console',
        default=5)
    parser.add_argument(
        '--raw-metrics', action='store_true',
        help='Enable this flag to print full metrics dict on each interval')
    return parser


if __name__ == '__main__':
    args = get_args_parser().parse_args()
    ProducerPerformance.run(args)
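
Similarly (not part of this commit), a sketch of pointing the producer benchmark at an already-running broker via --producer-config instead of starting fixtures; the module path and localhost:9092 address are placeholders:

# Sketch only: assumes producer_performance is importable and a broker
# is reachable at the placeholder address.
from producer_performance import ProducerPerformance, get_args_parser

args = get_args_parser().parse_args([
    '--num-records', '50000',
    '--producer-config', 'bootstrap_servers=localhost:9092',
    '--raw-metrics',             # pprint the full metrics dict each interval
])
ProducerPerformance.run(args)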

kafka/consumer/fetcher.py

+6 -2
@@ -729,6 +729,8 @@ def _handle_fetch_response(self, request, send_time, response):
             else:
                 raise error_type('Unexpected error while fetching data')

+        # Because we are currently decompressing messages lazily, the sensors here
+        # will get compressed bytes / message set stats when compression is enabled
         self._sensors.bytes_fetched.record(total_bytes)
         self._sensors.records_fetched.record(total_count)
         if response.API_VERSION >= 1:

@@ -774,12 +776,12 @@ def __init__(self, metrics, prefix):
             'The maximum throttle time in ms'), Max())

     def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
-        metric_tags = {'topic': topic.replace('.', '_')}
-
         # record bytes fetched
         name = '.'.join(['topic', topic, 'bytes-fetched'])
         bytes_fetched = self.metrics.get_sensor(name)
         if not bytes_fetched:
+            metric_tags = {'topic': topic.replace('.', '_')}
+
             bytes_fetched = self.metrics.sensor(name)
             bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
                                                        self.group_name,

@@ -799,6 +801,8 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
         name = '.'.join(['topic', topic, 'records-fetched'])
         records_fetched = self.metrics.get_sensor(name)
         if not records_fetched:
+            metric_tags = {'topic': topic.replace('.', '_')}
+
             records_fetched = self.metrics.sensor(name)
             records_fetched.add(self.metrics.metric_name('records-per-request-avg',
                                                          self.group_name,
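
The second and third hunks move the metric_tags construction inside the `if not <sensor>:` branch, so the tag dict (and its dot-to-underscore sanitizing) is only built when a per-topic sensor is first registered, not on every fetch. A standalone sketch of that pattern, using a toy registry rather than the real kafka.metrics Sensor API:

# Sketch only: Metrics here is a toy stand-in, not kafka.metrics.
class Metrics(object):
    def __init__(self):
        self._sensors = {}

    def get_sensor(self, name):
        return self._sensors.get(name)

    def sensor(self, name, tags=None):
        self._sensors[name] = {'tags': tags, 'values': []}
        return self._sensors[name]


def record_topic_bytes(metrics, topic, num_bytes):
    name = '.'.join(['topic', topic, 'bytes-fetched'])
    sensor = metrics.get_sensor(name)
    if not sensor:
        # Built only on the miss path; dots are replaced because '.' is
        # the separator in metric names.
        metric_tags = {'topic': topic.replace('.', '_')}
        sensor = metrics.sensor(name, tags=metric_tags)
    sensor['values'].append(num_bytes)


metrics = Metrics()
record_topic_bytes(metrics, 'my.topic', 512)    # first call registers the sensor
record_topic_bytes(metrics, 'my.topic', 1024)   # hot path: no tag dict built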
