diff --git a/ddtrace/contrib/elasticsearch/transport.py b/ddtrace/contrib/elasticsearch/transport.py index cd7b7c3dd13..45a1973d75d 100644 --- a/ddtrace/contrib/elasticsearch/transport.py +++ b/ddtrace/contrib/elasticsearch/transport.py @@ -23,6 +23,10 @@ def perform_request(self, method, url, params=None, body=None): This is ConnectionClass-agnostic. """ with self._datadog_tracer.trace("elasticsearch.query") as s: + # Don't instrument if the trace is not sampled + if not s.sampled: + return super(TracedTransport, self).perform_request(method, url, params=params, body=body) + s.service = self._datadog_service s.span_type = SPAN_TYPE s.set_tag(metadata.METHOD, method) diff --git a/ddtrace/contrib/flask/middleware.py b/ddtrace/contrib/flask/middleware.py index c4f0d41d575..7a439fd39c0 100644 --- a/ddtrace/contrib/flask/middleware.py +++ b/ddtrace/contrib/flask/middleware.py @@ -59,24 +59,25 @@ def _start_span(self): self.app.logger.exception("error tracing request") def _finish_span(self, response=None, exception=None): - """ Close and finsh the active span if it exists. """ + """ Close and finish the active span if it exists. """ span = getattr(g, 'flask_datadog_span', None) if span: - error = 0 - code = response.status_code if response else None - - # if we didn't get a response, but we did get an exception, set - # codes accordingly. - if not response and exception: - error = 1 - code = 500 - span.set_tag(errors.ERROR_TYPE, type(exception)) - span.set_tag(errors.ERROR_MSG, exception) - - span.resource = str(request.endpoint or "").lower() - span.set_tag(http.URL, str(request.base_url or "")) - span.set_tag(http.STATUS_CODE, code) - span.error = error + if span.sampled: + error = 0 + code = response.status_code if response else None + + # if we didn't get a response, but we did get an exception, set + # codes accordingly. 
+ if not response and exception: + error = 1 + code = 500 + span.set_tag(errors.ERROR_TYPE, type(exception)) + span.set_tag(errors.ERROR_MSG, exception) + + span.resource = str(request.endpoint or "").lower() + span.set_tag(http.URL, str(request.base_url or "")) + span.set_tag(http.STATUS_CODE, code) + span.error = error span.finish() # Clear our span just in case. g.flask_datadog_span = None diff --git a/ddtrace/contrib/psycopg/connection.py b/ddtrace/contrib/psycopg/connection.py index 18f35c6b38b..c108fe59b6c 100644 --- a/ddtrace/contrib/psycopg/connection.py +++ b/ddtrace/contrib/psycopg/connection.py @@ -45,6 +45,9 @@ def execute(self, query, vars=None): return cursor.execute(self, query, vars) with self._datadog_tracer.trace("postgres.query") as s: + if not s.sampled: + return super(TracedCursor, self).execute(query, vars) + s.resource = query s.service = self._datadog_service s.span_type = sqlx.TYPE diff --git a/ddtrace/contrib/pylons/middleware.py b/ddtrace/contrib/pylons/middleware.py index 8ba9ec51a13..cf5efba60a8 100644 --- a/ddtrace/contrib/pylons/middleware.py +++ b/ddtrace/contrib/pylons/middleware.py @@ -13,32 +13,22 @@ def __init__(self, app, tracer, service="pylons"): self._tracer = tracer def __call__(self, environ, start_response): - span = None - try: - span = self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE) - log.debug("Initialize new trace %d", span.trace_id) + with self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE) as span: + + if not span.sampled: + return self.app(environ, start_response) def _start_response(status, *args, **kwargs): """ a patched response callback which will pluck some metadata. 
""" - span.span_type = http.TYPE http_code = int(status.split()[0]) span.set_tag(http.STATUS_CODE, http_code) if http_code >= 500: span.error = 1 return start_response(status, *args, **kwargs) - except Exception: - log.exception("error starting span") - - try: - return self.app(environ, _start_response) - except Exception: - if span: - span.set_traceback() - raise - finally: - if not span: - return + try: + return self.app(environ, _start_response) + finally: controller = environ.get('pylons.routes_dict', {}).get('controller') action = environ.get('pylons.routes_dict', {}).get('action') span.resource = "%s.%s" % (controller, action) @@ -50,6 +40,3 @@ def _start_response(status, *args, **kwargs): "pylons.route.controller": controller, "pylons.route.action": action, }) - span.finish() - except Exception: - log.exception("Error finishing trace") diff --git a/ddtrace/contrib/sqlite3/connection.py b/ddtrace/contrib/sqlite3/connection.py index 11626ca31d0..8628da45029 100644 --- a/ddtrace/contrib/sqlite3/connection.py +++ b/ddtrace/contrib/sqlite3/connection.py @@ -30,6 +30,10 @@ def execute(self, sql, *args, **kwargs): return Cursor.execute(self, sql, *args, **kwargs) with self._datadog_tracer.trace("sqlite3.query", span_type=sqlx.TYPE) as s: + # Don't instrument if the trace is not sampled + if not s.sampled: + return Cursor.execute(self, sql, *args, **kwargs) + s.set_tag(sqlx.QUERY, sql) s.service = self._datadog_service s.resource = sql # will be normalized diff --git a/ddtrace/reporter.py b/ddtrace/reporter.py index 004d6545759..716c14660cb 100644 --- a/ddtrace/reporter.py +++ b/ddtrace/reporter.py @@ -1,21 +1,12 @@ """ Report spans to the Agent API. - -The asnyc HTTPReporter is taken from raven.transport.threaded. 
""" - -import atexit -from .compat import httplib import logging -import threading -from time import sleep, time -import os +from time import time # project -from .compat import Queue, json - - -DEFAULT_TIMEOUT = 10 +from .compat import json +from .transport import ThreadedHTTPTransport log = logging.getLogger(__name__) @@ -24,17 +15,11 @@ class AgentReporter(object): SERVICES_FLUSH_INTERVAL = 60 - def __init__(self, disabled=False, config=None): - self.disabled = disabled - self.config = config + def __init__(self): self.transport = ThreadedHTTPTransport() self.last_services_flush = 0 def report(self, spans, services): - if self.disabled: - log.debug("Trace reporter disabled, skip flushing") - return - if spans: self.send_spans(spans) if services: @@ -54,164 +39,3 @@ def send_services(self, services): data = json.dumps(services) headers = {} self.transport.send("PUT", "/services", data, headers) - - -class ThreadedHTTPTransport(object): - - # Async worker, to be defined at first run - _worker = None - - def send(self, method, endpoint, data, headers): - return self.async_send( - method, endpoint, data, headers, - self.success_callback, self.failure_callback - ) - - def async_send(self, method, endpoint, data, headers, success_cb, failure_cb): - self.get_worker().queue( - self.send_sync, method, endpoint, data, headers, success_cb, failure_cb) - - def send_sync(self, method, endpoint, data, headers, success_cb, failure_cb): - try: - conn = httplib.HTTPConnection('localhost', 7777) - conn.request(method, endpoint, data, headers) - except Exception as e: - failure_cb(e) - else: - success_cb() - - def get_worker(self): - if self._worker is None or not self._worker.is_alive(): - self._worker = AsyncWorker() - return self._worker - - def failure_callback(self, error): - log.error("Failed to report a trace, %s", error) - - def success_callback(self): - pass - - -class AsyncWorker(object): - _terminator = object() - - def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT): 
- self._queue = Queue(-1) - self._lock = threading.Lock() - self._thread = None - self.options = { - 'shutdown_timeout': shutdown_timeout, - } - self.start() - - def is_alive(self): - return self._thread.is_alive() - - def main_thread_terminated(self): - self._lock.acquire() - try: - if not self._thread: - # thread not started or already stopped - nothing to do - return - - # wake the processing thread up - self._queue.put_nowait(self._terminator) - - timeout = self.options['shutdown_timeout'] - - # wait briefly, initially - initial_timeout = 0.1 - if timeout < initial_timeout: - initial_timeout = timeout - - if not self._timed_queue_join(initial_timeout): - # if that didn't work, wait a bit longer - # NB that size is an approximation, because other threads may - # add or remove items - size = self._queue.qsize() - - print("Sentry is attempting to send %i pending error messages" - % size) - print("Waiting up to %s seconds" % timeout) - - if os.name == 'nt': - print("Press Ctrl-Break to quit") - else: - print("Press Ctrl-C to quit") - - self._timed_queue_join(timeout - initial_timeout) - - self._thread = None - - finally: - self._lock.release() - - def _timed_queue_join(self, timeout): - """ - implementation of Queue.join which takes a 'timeout' argument - - returns true on success, false on timeout - """ - deadline = time() + timeout - queue = self._queue - - queue.all_tasks_done.acquire() - try: - while queue.unfinished_tasks: - delay = deadline - time() - if delay <= 0: - # timed out - return False - - queue.all_tasks_done.wait(timeout=delay) - - return True - - finally: - queue.all_tasks_done.release() - - def start(self): - """ - Starts the task thread. - """ - self._lock.acquire() - try: - if not self._thread: - self._thread = threading.Thread(target=self._target) - self._thread.setDaemon(True) - self._thread.start() - finally: - self._lock.release() - atexit.register(self.main_thread_terminated) - - def stop(self, timeout=None): - """ - Stops the task thread. 
Synchronous! - """ - self._lock.acquire() - try: - if self._thread: - self._queue.put_nowait(self._terminator) - self._thread.join(timeout=timeout) - self._thread = None - finally: - self._lock.release() - - def queue(self, callback, *args, **kwargs): - self._queue.put_nowait((callback, args, kwargs)) - - def _target(self): - while True: - record = self._queue.get() - try: - if record is self._terminator: - break - callback, args, kwargs = record - try: - callback(*args, **kwargs) - except Exception: - log.error('Failed processing job', exc_info=True) - finally: - self._queue.task_done() - - sleep(0) diff --git a/ddtrace/sampler.py b/ddtrace/sampler.py new file mode 100644 index 00000000000..d9fa0dd71ce --- /dev/null +++ b/ddtrace/sampler.py @@ -0,0 +1,29 @@ +import logging + +from .span import MAX_TRACE_ID + +log = logging.getLogger(__name__) + + +class RateSampler(object): + """RateSampler manages the client-side trace sampling based on a rate + + Keep (100 * sample_rate)% of the traces. + Any `sampled = False` trace won't be written, and can be ignored by the instrumentation. + It samples randomly, its main purpose is to reduce the instrumentation footprint. 
+ """ + + def __init__(self, sample_rate): + if sample_rate <= 0: + log.error("sample_rate is negative or null, disable the Sampler") + sample_rate = 1 + elif sample_rate > 1: + sample_rate = 1 + + self.sample_rate = sample_rate + self.sampling_id_threshold = sample_rate * MAX_TRACE_ID + + def sample(self, span): + span.sampled = span.trace_id <= self.sampling_id_threshold + # `weight` is an attribute applied to all spans to help scaling related statistics + span.weight = 1 / (self.sample_rate or 1) diff --git a/ddtrace/span.py b/ddtrace/span.py index 0da43e25341..2470defa714 100644 --- a/ddtrace/span.py +++ b/ddtrace/span.py @@ -53,6 +53,10 @@ def __init__(self, self.span_id = span_id or _new_id() self.parent_id = parent_id + # sampling + self.sampled = True + self.weight = 1 + self._tracer = tracer self._parent = None @@ -74,6 +78,7 @@ def to_dict(self): 'resource' : self.resource, 'name' : self.name, 'error': self.error, + 'weight': self.weight, } if self.start: @@ -185,6 +190,7 @@ def __repr__(self): self.name, ) +MAX_TRACE_ID = 2 ** 63 def _new_id(): """Generate a random trace_id""" return random.getrandbits(63) diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index b1ca15224fd..adda9045de3 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -1,8 +1,8 @@ - import logging import threading from .buffer import ThreadLocalSpanBuffer +from .sampler import RateSampler from .span import Span from .writer import AgentWriter @@ -12,16 +12,18 @@ class Tracer(object): - def __init__(self, enabled=True, writer=None, span_buffer=None): + def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1): """ Create a new tracer object. - enabled: if False, no spans will be submitted to the writer. - + enabled: if False, no spans will be submitted to the writer writer: an instance of Writer span_buffer: a span buffer instance. used to store inflight traces. by - default, will use thread local storage. 
+ default, will use thread local storage + sample_rate: Pre-sampling rate. """ + self.enabled = enabled + self._writer = writer or AgentWriter() self._span_buffer = span_buffer or ThreadLocalSpanBuffer() @@ -29,7 +31,7 @@ def __init__(self, enabled=True, writer=None, span_buffer=None): self._spans_lock = threading.Lock() self._spans = [] - self.enabled = enabled + self.sampler = RateSampler(sample_rate) # A hook for local debugging. shouldn't be needed or used # in production. @@ -49,26 +51,31 @@ def trace(self, name, service=None, resource=None, span_type=None): >>> parent.finish() >>> parent2 = tracer.trace("parent2") # has no parent span """ - # if we have a current span link the parent + child nodes. + span = None parent = self._span_buffer.get() - trace_id, parent_id = None, None - if parent: - trace_id, parent_id = parent.trace_id, parent.span_id - - # Create the trace. - span = Span(self, - name, - service=service, - resource=resource, - trace_id=trace_id, - parent_id=parent_id, - span_type=span_type, - ) - - # if there's a parent, link them and inherit the service. + if parent: + # if we have a current span link the parent + child nodes. + span = Span( + self, + name, + trace_id=parent.trace_id, + parent_id=parent.span_id, + service=(service or parent.service), + resource=resource, + span_type=span_type, + ) span._parent = parent - span.service = span.service or parent.service + span.sampled = parent.sampled + else: + span = Span( + self, + name, + service=service, + resource=resource, + span_type=span_type, + ) + self.sampler.sample(span) # Note the current trace. 
self._span_buffer.set(span) @@ -84,18 +91,17 @@ def record(self, span): if not self.enabled: return - if self._writer: - spans = None - with self._spans_lock: - self._spans.append(span) - parent = span._parent - self._span_buffer.set(parent) - if not parent: - spans = self._spans - self._spans = [] + spans = [] + with self._spans_lock: + self._spans.append(span) + parent = span._parent + self._span_buffer.set(parent) + if not parent: + spans = self._spans + self._spans = [] - if spans: - self.write(spans) + if self._writer and span.sampled: + self.write(spans) def write(self, spans): """ Submit the given spans to the agent. """ @@ -103,9 +109,6 @@ if self.debug_logging: log.debug("submitting %s spans", len(spans)) for span in spans: - log.debug("\n%s" % span.pprint()) + log.debug("\n%s", span.pprint()) self._writer.write(spans) - - - diff --git a/ddtrace/transport.py b/ddtrace/transport.py new file mode 100644 index 00000000000..5131ab942dc --- /dev/null +++ b/ddtrace/transport.py @@ -0,0 +1,177 @@ +""" +The async HTTPReporter is taken from raven.transport.threaded. 
+""" + +import atexit +import logging +import threading +from time import sleep, time +import os + +# project +from .compat import httplib, Queue + +log = logging.getLogger(__name__) + + +DEFAULT_TIMEOUT = 10 + +class ThreadedHTTPTransport(object): + + # Async worker, to be defined at first run + _worker = None + + def send(self, method, endpoint, data, headers): + return self.async_send( + method, endpoint, data, headers, + self.success_callback, self.failure_callback + ) + + def async_send(self, method, endpoint, data, headers, success_cb, failure_cb): + self.get_worker().queue( + self.send_sync, method, endpoint, data, headers, success_cb, failure_cb) + + def send_sync(self, method, endpoint, data, headers, success_cb, failure_cb): + try: + conn = httplib.HTTPConnection('localhost', 7777) + conn.request(method, endpoint, data, headers) + except Exception as e: + failure_cb(e) + else: + success_cb() + + def get_worker(self): + if self._worker is None or not self._worker.is_alive(): + self._worker = AsyncWorker() + return self._worker + + def failure_callback(self, error): + log.error("Failed to report a trace, %s", error) + + def success_callback(self): + pass + + +class AsyncWorker(object): + _terminator = object() + + def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT): + self._queue = Queue(-1) + self._lock = threading.Lock() + self._thread = None + self.options = { + 'shutdown_timeout': shutdown_timeout, + } + self.start() + + def is_alive(self): + return self._thread.is_alive() + + def main_thread_terminated(self): + self._lock.acquire() + try: + if not self._thread: + # thread not started or already stopped - nothing to do + return + + # wake the processing thread up + self._queue.put_nowait(self._terminator) + + timeout = self.options['shutdown_timeout'] + + # wait briefly, initially + initial_timeout = 0.1 + if timeout < initial_timeout: + initial_timeout = timeout + + if not self._timed_queue_join(initial_timeout): + # if that didn't work, wait a bit 
longer + # NB that size is an approximation, because other threads may + # add or remove items + size = self._queue.qsize() + + print("Sentry is attempting to send %i pending error messages" + % size) + print("Waiting up to %s seconds" % timeout) + + if os.name == 'nt': + print("Press Ctrl-Break to quit") + else: + print("Press Ctrl-C to quit") + + self._timed_queue_join(timeout - initial_timeout) + + self._thread = None + + finally: + self._lock.release() + + def _timed_queue_join(self, timeout): + """ + implementation of Queue.join which takes a 'timeout' argument + + returns true on success, false on timeout + """ + deadline = time() + timeout + queue = self._queue + + queue.all_tasks_done.acquire() + try: + while queue.unfinished_tasks: + delay = deadline - time() + if delay <= 0: + # timed out + return False + + queue.all_tasks_done.wait(timeout=delay) + + return True + + finally: + queue.all_tasks_done.release() + + def start(self): + """ + Starts the task thread. + """ + self._lock.acquire() + try: + if not self._thread: + self._thread = threading.Thread(target=self._target) + self._thread.setDaemon(True) + self._thread.start() + finally: + self._lock.release() + atexit.register(self.main_thread_terminated) + + def stop(self, timeout=None): + """ + Stops the task thread. Synchronous! 
+ """ + self._lock.acquire() + try: + if self._thread: + self._queue.put_nowait(self._terminator) + self._thread.join(timeout=timeout) + self._thread = None + finally: + self._lock.release() + + def queue(self, callback, *args, **kwargs): + self._queue.put_nowait((callback, args, kwargs)) + + def _target(self): + while True: + record = self._queue.get() + try: + if record is self._terminator: + break + callback, args, kwargs = record + try: + callback(*args, **kwargs) + except Exception: + log.error('Failed processing job', exc_info=True) + finally: + self._queue.task_done() + + sleep(0) diff --git a/ddtrace/writer.py b/ddtrace/writer.py index 8a80f8dfefb..4b4822878be 100644 --- a/ddtrace/writer.py +++ b/ddtrace/writer.py @@ -1,25 +1,10 @@ - from .reporter import AgentReporter -class Writer(object): - - def write(self, spans): - raise NotImplementedError() - - -class NullWriter(Writer): - - def write(self, spans): - pass - - -class AgentWriter(Writer): +class AgentWriter(object): def __init__(self): self._reporter = AgentReporter() - self.enabled = True # flip this to disable on the fly def write(self, spans): - if self.enabled: - self._reporter.report(spans, []) + self._reporter.report(spans, []) diff --git a/tests/test_tracer.py b/tests/test_tracer.py index b43b95fdee3..382698fbd23 100644 --- a/tests/test_tracer.py +++ b/tests/test_tracer.py @@ -3,6 +3,8 @@ """ import time +import random + from nose.tools import eq_ from ddtrace.tracer import Tracer @@ -88,6 +90,37 @@ def test_tracer_disabled(): s.set_tag("a", "b") assert not writer.pop() +def test_sampling(): + writer = DummyWriter() + tracer = Tracer(writer=writer, sample_rate=0.5) + + # Set the seed so that the choice of sampled traces is deterministic, then write tests accordingly + random.seed(4012) + + # First trace, sampled + with tracer.trace("foo") as s: + assert s.sampled + assert s.weight == 2 + assert writer.pop() + + # Second trace, not sampled + with tracer.trace("figh") as s: + assert not s.sampled 
+ s2 = tracer.trace("what") + assert not s2.sampled + s2.finish() + with tracer.trace("ever") as s3: + assert not s3.sampled + s4 = tracer.trace("!") + assert not s4.sampled + s4.finish() + spans = writer.pop() + assert not spans, spans + + # Third trace, not sampled + with tracer.trace("ters") as s: + assert s.sampled + assert writer.pop() class DummyWriter(object):