Client sampling #8

Merged: 12 commits, Jun 30, 2016
4 changes: 4 additions & 0 deletions ddtrace/contrib/elasticsearch/transport.py
@@ -23,6 +23,10 @@ def perform_request(self, method, url, params=None, body=None):
This is ConnectionClass-agnostic.
"""
with self._datadog_tracer.trace("elasticsearch.query") as s:
# Don't instrument if the trace is sampled
if s.sampled:
@clutchski (Contributor) commented on Jun 29, 2016:

I don't think this should be mandatory. In fact, people might depend on it for logging, etc. If it were me, sampling would only apply to submission.
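
For context, the alternative this comment describes would look roughly like the sketch below (hypothetical names, not code from this PR): spans are always created, tagged, and finished, and the sampler is consulted only when spans are handed off for submission.

# Hypothetical sketch of "sampling only for submission": integrations never
# check span.sampled; the drop decision happens once, at report time.
class SamplingReporter(object):
    def __init__(self, reporter, sampler):
        self._reporter = reporter  # e.g. an AgentReporter instance
        self._sampler = sampler    # decides which traces to drop

    def report(self, spans, services):
        # With this commit's convention, should_sample(span) == True means
        # the trace is dropped, so keep the rest.
        kept = [span for span in spans if not self._sampler.should_sample(span)]
        if kept:
            self._reporter.report(kept, services)

This way logging and other in-process consumers still see every span, while the volume sent to the agent is reduced.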

return super(TracedTransport, self).perform_request(method, url, params=params, body=body)

s.service = self._datadog_service
s.span_type = SPAN_TYPE
s.set_tag(metadata.METHOD, method)
31 changes: 16 additions & 15 deletions ddtrace/contrib/flask/middleware.py
@@ -62,21 +62,22 @@ def _finish_span(self, response=None, exception=None):
""" Close and finsh the active span if it exists. """
span = getattr(g, 'flask_datadog_span', None)
if span:
error = 0
code = response.status_code if response else None

# if we didn't get a response, but we did get an exception, set
# codes accordingly.
if not response and exception:
error = 1
code = 500
span.set_tag(errors.ERROR_TYPE, type(exception))
span.set_tag(errors.ERROR_MSG, exception)

span.resource = str(request.endpoint or "").lower()
span.set_tag(http.URL, str(request.base_url or ""))
span.set_tag(http.STATUS_CODE, code)
span.error = error
if not span.sampled:
Member commented:

I find it confusing; I'd rather see the spans marked "sampled" be the ones we keep, as per the definition.

Contributor (Author) replied:

Absolutely right, switching them.
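
For reference, with the flipped meaning agreed on here (sampled == True is a trace we keep), the guard in each integration would read roughly like this illustrative sketch (helper and tag names are hypothetical):

# Illustrative only: after the flip, integrations skip their tagging work
# when the span is NOT sampled, i.e. when the trace is being dropped.
def traced_call(span, run):
    if not span.sampled:
        # dropped client-side: run the wrapped call with no extra metadata
        return run()
    span.set_tag("component", "db")  # hypothetical tag, for illustration
    return run()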

error = 0
code = response.status_code if response else None

# if we didn't get a response, but we did get an exception, set
# codes accordingly.
if not response and exception:
error = 1
code = 500
span.set_tag(errors.ERROR_TYPE, type(exception))
span.set_tag(errors.ERROR_MSG, exception)

span.resource = str(request.endpoint or "").lower()
span.set_tag(http.URL, str(request.base_url or ""))
span.set_tag(http.STATUS_CODE, code)
span.error = error
span.finish()
Member commented:

Shouldn't this be in the if block as well?

Contributor (Author) replied:

The span was already created. Even if it is not sampled (and we didn't set the various attributes), we still have to finish it to keep a valid tree.
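
Put differently, only the metadata work sits behind the sampling check; the span lifecycle does not. A minimal sketch of the pattern, using this commit's convention where sampled == True means the trace is dropped (helper and tag names are illustrative):

# Sketch of the pattern discussed above: tags are skipped for dropped
# traces, but finish() always runs so the trace tree stays valid.
def close_span(span, status_code=None):
    if not span.sampled:  # in this commit, sampled=True means "drop"
        if status_code is not None:
            span.set_tag("http.status_code", status_code)
    span.finish()  # always finish, even for dropped traces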

# Clear our span just in case.
g.flask_datadog_span = None
3 changes: 3 additions & 0 deletions ddtrace/contrib/psycopg/connection.py
@@ -45,6 +45,9 @@ def execute(self, query, vars=None):
return cursor.execute(self, query, vars)

with self._datadog_tracer.trace("postgres.query") as s:
if s.sampled:
return super(TracedCursor, self).execute(query, vars)

s.resource = query
s.service = self._datadog_service
s.span_type = sqlx.TYPE
27 changes: 7 additions & 20 deletions ddtrace/contrib/pylons/middleware.py
@@ -13,32 +13,22 @@ def __init__(self, app, tracer, service="pylons"):
self._tracer = tracer

def __call__(self, environ, start_response):
span = None
try:
span = self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE)
log.debug("Initialize new trace %d", span.trace_id)
with self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE) as span:

if span.sampled:
return self.app(environ, start_response)

def _start_response(status, *args, **kwargs):
""" a patched response callback which will pluck some metadata. """
span.span_type = http.TYPE
http_code = int(status.split()[0])
span.set_tag(http.STATUS_CODE, http_code)
if http_code >= 500:
span.error = 1
return start_response(status, *args, **kwargs)
except Exception:
log.exception("error starting span")

try:
return self.app(environ, _start_response)
except Exception:
if span:
span.set_traceback()
raise
finally:
if not span:
return

try:
return self.app(environ, _start_response)
finally:
controller = environ.get('pylons.routes_dict', {}).get('controller')
action = environ.get('pylons.routes_dict', {}).get('action')
span.resource = "%s.%s" % (controller, action)
@@ -50,6 +40,3 @@ def _start_response(status, *args, **kwargs):
"pylons.route.controller": controller,
"pylons.route.action": action,
})
span.finish()
except Exception:
log.exception("Error finishing trace")
4 changes: 4 additions & 0 deletions ddtrace/contrib/sqlite3/connection.py
@@ -30,6 +30,10 @@ def execute(self, sql, *args, **kwargs):
return Cursor.execute(self, sql, *args, **kwargs)

with self._datadog_tracer.trace("sqlite3.query", span_type=sqlx.TYPE) as s:
# Don't instrument if the trace is sampled
if s.sampled:
return Cursor.execute(self, sql, *args, **kwargs)

s.set_tag(sqlx.QUERY, sql)
s.service = self._datadog_service
s.resource = sql # will be normalized
184 changes: 4 additions & 180 deletions ddtrace/reporter.py
@@ -1,21 +1,12 @@
"""
Report spans to the Agent API.

The async HTTPReporter is taken from raven.transport.threaded.
"""

import atexit
from .compat import httplib
import logging
import threading
from time import sleep, time
import os
from time import time

# project
from .compat import Queue, json


DEFAULT_TIMEOUT = 10
from .compat import json
from .transport import ThreadedHTTPTransport


log = logging.getLogger(__name__)
@@ -24,17 +15,11 @@
class AgentReporter(object):
SERVICES_FLUSH_INTERVAL = 60

def __init__(self, disabled=False, config=None):
self.disabled = disabled
self.config = config
def __init__(self):
self.transport = ThreadedHTTPTransport()
self.last_services_flush = 0

def report(self, spans, services):
if self.disabled:
log.debug("Trace reporter disabled, skip flushing")
return

if spans:
self.send_spans(spans)
if services:
Expand All @@ -54,164 +39,3 @@ def send_services(self, services):
data = json.dumps(services)
headers = {}
self.transport.send("PUT", "/services", data, headers)


class ThreadedHTTPTransport(object):

# Async worker, to be defined at first run
_worker = None

def send(self, method, endpoint, data, headers):
return self.async_send(
method, endpoint, data, headers,
self.success_callback, self.failure_callback
)

def async_send(self, method, endpoint, data, headers, success_cb, failure_cb):
self.get_worker().queue(
self.send_sync, method, endpoint, data, headers, success_cb, failure_cb)

def send_sync(self, method, endpoint, data, headers, success_cb, failure_cb):
try:
conn = httplib.HTTPConnection('localhost', 7777)
conn.request(method, endpoint, data, headers)
except Exception as e:
failure_cb(e)
else:
success_cb()

def get_worker(self):
if self._worker is None or not self._worker.is_alive():
self._worker = AsyncWorker()
return self._worker

def failure_callback(self, error):
log.error("Failed to report a trace, %s", error)

def success_callback(self):
pass


class AsyncWorker(object):
_terminator = object()

def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT):
self._queue = Queue(-1)
self._lock = threading.Lock()
self._thread = None
self.options = {
'shutdown_timeout': shutdown_timeout,
}
self.start()

def is_alive(self):
return self._thread.is_alive()

def main_thread_terminated(self):
self._lock.acquire()
try:
if not self._thread:
# thread not started or already stopped - nothing to do
return

# wake the processing thread up
self._queue.put_nowait(self._terminator)

timeout = self.options['shutdown_timeout']

# wait briefly, initially
initial_timeout = 0.1
if timeout < initial_timeout:
initial_timeout = timeout

if not self._timed_queue_join(initial_timeout):
# if that didn't work, wait a bit longer
# NB that size is an approximation, because other threads may
# add or remove items
size = self._queue.qsize()

print("Sentry is attempting to send %i pending error messages"
% size)
print("Waiting up to %s seconds" % timeout)

if os.name == 'nt':
print("Press Ctrl-Break to quit")
else:
print("Press Ctrl-C to quit")

self._timed_queue_join(timeout - initial_timeout)

self._thread = None

finally:
self._lock.release()

def _timed_queue_join(self, timeout):
"""
implementation of Queue.join which takes a 'timeout' argument

returns true on success, false on timeout
"""
deadline = time() + timeout
queue = self._queue

queue.all_tasks_done.acquire()
try:
while queue.unfinished_tasks:
delay = deadline - time()
if delay <= 0:
# timed out
return False

queue.all_tasks_done.wait(timeout=delay)

return True

finally:
queue.all_tasks_done.release()

def start(self):
"""
Starts the task thread.
"""
self._lock.acquire()
try:
if not self._thread:
self._thread = threading.Thread(target=self._target)
self._thread.setDaemon(True)
self._thread.start()
finally:
self._lock.release()
atexit.register(self.main_thread_terminated)

def stop(self, timeout=None):
"""
Stops the task thread. Synchronous!
"""
self._lock.acquire()
try:
if self._thread:
self._queue.put_nowait(self._terminator)
self._thread.join(timeout=timeout)
self._thread = None
finally:
self._lock.release()

def queue(self, callback, *args, **kwargs):
self._queue.put_nowait((callback, args, kwargs))

def _target(self):
while True:
record = self._queue.get()
try:
if record is self._terminator:
break
callback, args, kwargs = record
try:
callback(*args, **kwargs)
except Exception:
log.error('Failed processing job', exc_info=True)
finally:
self._queue.task_done()

sleep(0)
16 changes: 16 additions & 0 deletions ddtrace/sampler.py
@@ -0,0 +1,16 @@
from .span import MAX_TRACE_ID

class Sampler(object):
"""Sampler manages the client-side trace sampling

Keep (100 * sample_rate)% of the traces.
Any sampled trace should be entirely ignored by the instrumentation and won't be written.
It samples randomly, its main purpose is to reduce the instrumentation footprint.
"""

def __init__(self, sample_rate):
self.sample_rate = sample_rate
self.sampling_id_threshold = sample_rate * MAX_TRACE_ID

def should_sample(self, span):
return span.trace_id >= self.sampling_id_threshold
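
A quick worked example of the threshold arithmetic, assuming trace IDs are drawn uniformly from [0, 2**63) as _new_id does: with sample_rate = 0.25 the threshold is 0.25 * 2**63, so about 75% of trace IDs land at or above it and are marked sampled (i.e. dropped under this commit's convention), leaving roughly 25% of traces kept. A small self-contained sanity check of that keep rate:

# Rough sanity check of the keep rate, under the stated assumptions.
import random

MAX_TRACE_ID = 2 ** 63

def keep_rate(sample_rate, n=100000):
    threshold = sample_rate * MAX_TRACE_ID
    dropped = sum(random.getrandbits(63) >= threshold for _ in range(n))
    return 1.0 - float(dropped) / n

# keep_rate(0.25) is approximately 0.25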
6 changes: 6 additions & 0 deletions ddtrace/span.py
@@ -53,6 +53,10 @@ def __init__(self,
self.span_id = span_id or _new_id()
self.parent_id = parent_id

# sampling
self.sampled = False
self.weight = 1

self._tracer = tracer
self._parent = None

Expand All @@ -74,6 +78,7 @@ def to_dict(self):
'resource' : self.resource,
'name' : self.name,
'error': self.error,
'weight': self.weight,
}

if self.start:
@@ -185,6 +190,7 @@ def __repr__(self):
self.name,
)

MAX_TRACE_ID = 2 ** 63
def _new_id():
"""Generate a random trace_id"""
return random.getrandbits(63)
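
The tracer-side wiring is not part of this diff, but using the new Sampler with these 63-bit IDs presumably looks something like the following sketch (ToySpan and new_trace are hypothetical stand-ins, not ddtrace code):

# Hypothetical usage sketch: the sampler is consulted once per new trace
# and the resulting flag travels with the span.
import random

from ddtrace.sampler import Sampler  # the class added in this PR

class ToySpan(object):
    """Stand-in for a ddtrace Span, just enough for the sampler."""
    def __init__(self, trace_id):
        self.trace_id = trace_id
        self.sampled = False

def new_trace(sampler):
    span = ToySpan(random.getrandbits(63))
    # In this commit's convention, should_sample() == True means "drop".
    span.sampled = sampler.should_sample(span)
    return span

sampler = Sampler(sample_rate=0.5)  # keep roughly 50% of traces
span = new_trace(sampler)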