Skip to content

Commit

Permalink
Improve performance of datapoint ingestion
Browse files Browse the repository at this point in the history
Several users reported timeouts logged in Druid Historical/MiddleManager
daemons when their traffic is really high (like 1500 msg/s). The exporter
was built with a more conservative use case in mind (Wikimedia's), so the
default sync single process of make_server() has been enough up to now.

This patch separates datapoints ingestion (namely traffic that comes from
Druid daemons towards the exporter) and datapoints processing, using
a (thread-safe) queue. It also uses the gevent's WSGIServer implementation,
that uses coroutines/greenlets and it is able to sustain a lot more (concurrent)
traffic.

GH issue: #11

Change-Id: I4b335a1f663957277fe0c443492c4000bbbcac89
  • Loading branch information
elukey committed May 27, 2020
1 parent c8b6d1f commit f22c6d9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
30 changes: 24 additions & 6 deletions druid_exporter/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.

import logging
import queue
import threading

from collections import defaultdict
from prometheus_client.core import (CounterMetricFamily, GaugeMetricFamily,
Expand All @@ -28,6 +30,17 @@ class DruidCollector(object):
'druid_scrape_duration_seconds', 'Druid scrape duration')

def __init__(self, metrics_config):

# The ingestion of the datapoints is separated from their processing,
# to separate concerns and avoid unnecessary slowdowns for Druid
# daemons sending data.
# Only one thread de-queues and process datapoints, in this way we
# don't really need any special locking to guarantee consistency.
# Since this thread is not I/O bound it doesn't seem the case to
# use a gevent's greenlet, but more tests might prove the contrary.
self.datapoints_queue = queue.Queue()
threading.Thread(target=self.process_queued_datapoints).start()

# Datapoints successfully registered
self.datapoints_registered = 0

Expand Down Expand Up @@ -230,10 +243,15 @@ def register_datapoint(self, datapoint):
.format(datapoint))
return

metric_name = str(datapoint['metric'])
if self.metrics_config[daemon][metric_name]['type'] == 'histogram':
self.store_histogram(datapoint)
else:
self.store_counter(datapoint)
self.datapoints_queue.put((daemon, datapoint))

def process_queued_datapoints(self):
while True:
(daemon, datapoint) = self.datapoints_queue.get()
metric_name = str(datapoint['metric'])
if self.metrics_config[daemon][metric_name]['type'] == 'histogram':
self.store_histogram(datapoint)
else:
self.store_counter(datapoint)

self.datapoints_registered += 1
self.datapoints_registered += 1
4 changes: 2 additions & 2 deletions druid_exporter/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from druid_exporter import collector
from prometheus_client import generate_latest, make_wsgi_app, REGISTRY
from wsgiref.simple_server import make_server
from gevent.pywsgi import WSGIServer

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -149,7 +149,7 @@ def main():
druid_wsgi_app = DruidWSGIApp(args.uri, druid_collector,
prometheus_app, args.encoding)

httpd = make_server(address, int(port), druid_wsgi_app)
httpd = WSGIServer((address, int(port)), druid_wsgi_app)
httpd.serve_forever()


Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup

setup(name='druid_exporter',
version='0.8',
version='0.9',
description='Prometheus exporter for Druid',
url='https://github.com/wikimedia/operations-software-druid_exporter',
author='Luca Toscano',
Expand All @@ -10,6 +10,7 @@
packages=['druid_exporter'],
install_requires=[
'prometheus-client>=0.5.0',
'gevent',
],
entry_points={
'console_scripts': [
Expand Down

0 comments on commit f22c6d9

Please sign in to comment.