Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix runtime workers not flushing to Dogstatsd #939

Merged
merged 10 commits into from
Jun 3, 2019
5 changes: 3 additions & 2 deletions ddtrace/internal/runtime/runtime_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class RuntimeWorker(_worker.PeriodicWorkerThread):

FLUSH_INTERVAL = 10

def __init__(self, statsd_client, flush_interval=FLUSH_INTERVAL):
def __init__(self, statsd_client, flush_interval=None):
flush_interval = self.FLUSH_INTERVAL if flush_interval is None else flush_interval
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We did this because we had trouble finding a way to override the flush interval in a test case.

This isn't ideal and should likely be refactored into something else.

Maybe:

class Tracer:
    def configure(self, ... _runtime_flush_interval=10):
        pass

This is what we are doing that required us to make this change:

default_flush_interval = RuntimeWorker.FLUSH_INTERVAL
try:
# lower flush interval
RuntimeWorker.FLUSH_INTERVAL = 1./4
# configure tracer for runtime metrics
self.tracer.configure(collect_metrics=True)
finally:
# reset flush interval
RuntimeWorker.FLUSH_INTERVAL = default_flush_interval

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

Yeah configure should allow to customize that ultimately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think the main thing we wanted to test, end-to-end, was that letting the tracer configure the runtime worker did the right thing

super(RuntimeWorker, self).__init__(interval=flush_interval,
name=self.__class__.__name__)
self._statsd_client = statsd_client
Expand All @@ -78,7 +79,7 @@ def flush(self):
for key, value in self._runtime_metrics:
self._write_metric(key, value)

on_periodic = flush
run_periodic = flush
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂

I guess the tests passed because we call flush manually?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we call flush on stop()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or join or w/e

on_shutdown = flush

def reset(self):
Expand Down
34 changes: 21 additions & 13 deletions ddtrace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def __init__(self):
self._runtime_id = generate_runtime_id()
self._runtime_worker = None
self._dogstatsd_client = None
self._dogstatsd_host = self.DEFAULT_HOSTNAME
self._dogstatsd_port = self.DEFAULT_DOGSTATSD_PORT

def get_call_context(self, *args, **kwargs):
"""
Expand Down Expand Up @@ -154,12 +156,11 @@ def configure(self, enabled=None, hostname=None, port=None, dogstatsd_host=None,
self._wrap_executor = wrap_executor

if collect_metrics and self._runtime_worker is None:
self._dogstatsd_host = dogstatsd_host or self._dogstatsd_host
self._dogstatsd_port = dogstatsd_port or self._dogstatsd_port
# start dogstatsd client if not already running
if not self._dogstatsd_client:
self._start_dogstatsd_client(
dogstatsd_host or self.DEFAULT_HOSTNAME,
dogstatsd_port or self.DEFAULT_DOGSTATSD_PORT,
)
self._start_dogstatsd_client()

self._start_runtime_worker()

Expand Down Expand Up @@ -271,18 +272,18 @@ def start_span(self, name, child_of=None, service=None, resource=None, span_type
# add it to the current context
context.add_span(span)

# check for new process if runtime metrics worker has already been started
if self._runtime_worker:
self._check_new_process()

# update set of services handled by tracer
if service:
if service and service not in self._services:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed by looking at logs that we were updating self._services and constant tags with each span, which is definitely overkill. So added this membership test before adding a service.

self._services.add(service)

# The constant tags for the dogstatsd client needs to updated with any new
# service(s) that may have been added.
self._update_dogstatsd_constant_tags()

# check for new process if runtime metrics worker has already been started
if self._runtime_worker:
self._check_new_process()

return span

def _update_dogstatsd_constant_tags(self):
Expand All @@ -299,12 +300,15 @@ def _update_dogstatsd_constant_tags(self):
log.debug('Updating constant tags {}'.format(tags))
self._dogstatsd_client.constant_tags = tags

def _start_dogstatsd_client(self, host, port):
def _start_dogstatsd_client(self):
# start dogstatsd as client with constant tags
log.debug('Starting DogStatsd on {}:{}'.format(host, port))
log.debug('Connecting to DogStatsd on {}:{}'.format(
self._dogstatsd_host,
self._dogstatsd_port
))
self._dogstatsd_client = DogStatsd(
host=host,
port=port,
host=self._dogstatsd_host,
port=self._dogstatsd_port,
)

def _start_runtime_worker(self):
Expand All @@ -330,6 +334,10 @@ def _check_new_process(self):

self._start_runtime_worker()

# force an immediate update of constant tags since we have reset services
# and generated a new runtime id
self._update_dogstatsd_constant_tags()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will run for every single trace being finished — is that what we want?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major change for this PR right? Everything else looks like some organization work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be run whenever a new child process is forked. And, right, the other changes could be removed to keep this PR focused.


def trace(self, name, service=None, resource=None, span_type=None):
"""
Return a span that will trace an operation called `name`. The context that created
Expand Down
35 changes: 24 additions & 11 deletions tests/internal/runtime/test_runtime_metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

from ddtrace.internal.runtime.runtime_metrics import (
RuntimeTags,
RuntimeMetrics,
Expand Down Expand Up @@ -43,21 +45,30 @@ def test_one_metric(self):


class TestRuntimeWorker(BaseTracerTestCase):
def test_worker_metrics(self):
self.tracer.configure(collect_metrics=True)
def test_tracer_metrics(self):
# mock dogstatsd client before configuring tracer for runtime metrics
self.tracer._dogstatsd_client = DogStatsd()
self.tracer._dogstatsd_client.socket = FakeSocket()

default_flush_interval = RuntimeWorker.FLUSH_INTERVAL
try:
# lower flush interval
RuntimeWorker.FLUSH_INTERVAL = 1./4

# configure tracer for runtime metrics
self.tracer.configure(collect_metrics=True)
finally:
# reset flush interval
RuntimeWorker.FLUSH_INTERVAL = default_flush_interval

with self.override_global_tracer(self.tracer):
self.tracer._dogstatsd_client = DogStatsd()
self.tracer._dogstatsd_client.socket = FakeSocket()

root = self.start_span('parent', service='parent')
context = root.context
self.start_span('child', service='child', child_of=context)

self.worker = RuntimeWorker(self.tracer._dogstatsd_client, 0)
self.worker.start()
self.worker.stop()
self.worker.join()
time.sleep(self.tracer._runtime_worker.interval * 2)
self.tracer._runtime_worker.stop()
self.tracer._runtime_worker.join()

# get all received metrics
received = []
Expand All @@ -69,7 +80,8 @@ def test_worker_metrics(self):
received.append(new)

# expect received all default metrics
self.assertEqual(len(received), len(DEFAULT_RUNTIME_METRICS))
# we expect more than one flush since it is also called on shutdown
assert len(received) / len(DEFAULT_RUNTIME_METRICS) > 1

# expect all metrics in default set are received
# DEV: dogstatsd gauges in form "{metric_name}:{metric_value}|g#t{tag_name}:{tag_value},..."
Expand All @@ -78,7 +90,8 @@ def test_worker_metrics(self):
DEFAULT_RUNTIME_METRICS
)

for gauge in received:
# check the last set of metrics returned to confirm tags were set
for gauge in received[-len(DEFAULT_RUNTIME_METRICS):]:
self.assertRegexpMatches(gauge, 'runtime-id:')
self.assertRegexpMatches(gauge, 'service:parent')
self.assertRegexpMatches(gauge, 'service:child')