Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

writer: tag as private, merge worker #988

Merged
merged 2 commits into from
Sep 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 25 additions & 46 deletions ddtrace/writer.py → ddtrace/internal/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import os
import time

from . import api
from . import _worker
from .internal.logger import get_logger
from .. import api
from .. import _worker
from ..internal.logger import get_logger
from ddtrace.vendor.six.moves.queue import Queue, Full, Empty

log = get_logger(__name__)
Expand All @@ -17,59 +17,38 @@
LOG_ERR_INTERVAL = 60


class AgentWriter(object):
class AgentWriter(_worker.PeriodicWorkerThread):

def __init__(self, hostname='localhost', port=8126, uds_path=None, filters=None, priority_sampler=None):
self._pid = None
self._traces = None
self._worker = None
QUEUE_PROCESSING_INTERVAL = 1

def __init__(self, hostname='localhost', port=8126, uds_path=None,
shutdown_timeout=DEFAULT_TIMEOUT,
filters=None, priority_sampler=None):
super(AgentWriter, self).__init__(interval=self.QUEUE_PROCESSING_INTERVAL,
exit_timeout=shutdown_timeout,
name=self.__class__.__name__)
self._reset_queue()
self._filters = filters
self._priority_sampler = priority_sampler
priority_sampling = priority_sampler is not None
self.api = api.API(hostname, port, uds_path=uds_path, priority_sampling=priority_sampling)

def write(self, spans=None, services=None):
# if the worker needs to be reset, do it.
self._reset_worker()
self._last_error_ts = 0
self.api = api.API(hostname, port, uds_path=uds_path,
priority_sampling=priority_sampler is not None)
self.start()

if spans:
self._traces.put(spans)
def _reset_queue(self):
self._pid = os.getpid()
self._trace_queue = Q(maxsize=MAX_TRACES)

def _reset_worker(self):
def write(self, spans=None, services=None):
# if this queue was created in a different process (i.e. this was
# forked) reset everything so that we can safely work from it.
pid = os.getpid()
if self._pid != pid:
log.debug('resetting queues. pids(old:%s new:%s)', self._pid, pid)
self._traces = Q(maxsize=MAX_TRACES)
self._worker = None
self._pid = pid

# ensure we have an active thread working on this queue
if not self._worker or not self._worker.is_alive():
self._worker = AsyncWorker(
self.api,
self._traces,
filters=self._filters,
priority_sampler=self._priority_sampler,
)

self._reset_queue()

class AsyncWorker(_worker.PeriodicWorkerThread):

QUEUE_PROCESSING_INTERVAL = 1

def __init__(self, api, trace_queue, service_queue=None, shutdown_timeout=DEFAULT_TIMEOUT,
filters=None, priority_sampler=None):
super(AsyncWorker, self).__init__(interval=self.QUEUE_PROCESSING_INTERVAL,
exit_timeout=shutdown_timeout,
name=self.__class__.__name__)
self._trace_queue = trace_queue
self._filters = filters
self._priority_sampler = priority_sampler
self._last_error_ts = 0
self.api = api
self.start()
if spans:
self._trace_queue.put(spans)

def flush_queue(self):
try:
Expand Down Expand Up @@ -127,7 +106,7 @@ def _apply_filters(self, traces):
"""
Here we make each trace go through the filters configured in the
tracer. There is no need for a lock since the traces are owned by the
AsyncWorker at that point.
AgentWriter at that point.
"""
if self._filters is not None:
filtered_traces = []
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from .ext.priority import AUTO_REJECT, AUTO_KEEP
from .internal.logger import get_logger
from .internal.runtime import RuntimeTags, RuntimeWorker
from .internal.writer import AgentWriter
from .provider import DefaultContextProvider
from .context import Context
from .sampler import AllSampler, DatadogSampler, RateSampler, RateByServiceSampler
from .span import Span
from .utils.formats import get_env
from .utils.deprecation import deprecated
from .vendor.dogstatsd import DogStatsd
from .writer import AgentWriter
from . import compat


Expand Down
49 changes: 20 additions & 29 deletions tests/test_writer.py → tests/internal/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from ddtrace.span import Span
from ddtrace.writer import AsyncWorker, Q, Empty
from ddtrace.internal.writer import AgentWriter, Q, Empty


class RemoveAllFilter():
Expand Down Expand Up @@ -45,57 +45,48 @@ def send_traces(self, traces):
self.traces.append(trace)


N_TRACES = 11
class AgentWriterTests(TestCase):
N_TRACES = 11


class AsyncWorkerTests(TestCase):
def setUp(self):
def create_worker(self, filters):
worker = AgentWriter(filters=filters)
self.api = DummmyAPI()
self.traces = Q()
self.services = Q()
for i in range(N_TRACES):
self.traces.put([
worker.api = self.api
for i in range(self.N_TRACES):
worker.write([
Span(tracer=None, name='name', trace_id=i, span_id=j, parent_id=j - 1 or None)
for j in range(7)
])
worker.stop()
worker.join()
return worker

def test_filters_keep_all(self):
filtr = KeepAllFilter()
filters = [filtr]
worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
worker.stop()
worker.join()
self.assertEqual(len(self.api.traces), N_TRACES)
self.assertEqual(filtr.filtered_traces, N_TRACES)
self.create_worker([filtr])
self.assertEqual(len(self.api.traces), self.N_TRACES)
self.assertEqual(filtr.filtered_traces, self.N_TRACES)

def test_filters_remove_all(self):
filtr = RemoveAllFilter()
filters = [filtr]
worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
worker.stop()
worker.join()
self.create_worker([filtr])
self.assertEqual(len(self.api.traces), 0)
self.assertEqual(filtr.filtered_traces, N_TRACES)
self.assertEqual(filtr.filtered_traces, self.N_TRACES)

def test_filters_add_tag(self):
tag_name = 'Tag'
filtr = AddTagFilter(tag_name)
filters = [filtr]
worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
worker.stop()
worker.join()
self.assertEqual(len(self.api.traces), N_TRACES)
self.assertEqual(filtr.filtered_traces, N_TRACES)
self.create_worker([filtr])
self.assertEqual(len(self.api.traces), self.N_TRACES)
self.assertEqual(filtr.filtered_traces, self.N_TRACES)
for trace in self.api.traces:
for span in trace:
self.assertIsNotNone(span.get_tag(tag_name))

def test_filters_short_circuit(self):
filtr = KeepAllFilter()
filters = [RemoveAllFilter(), filtr]
worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
worker.stop()
worker.join()
self.create_worker(filters)
self.assertEqual(len(self.api.traces), 0)
self.assertEqual(filtr.filtered_traces, 0)

Expand Down
14 changes: 7 additions & 7 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ def tearDown(self):
"""
Stop running worker
"""
self.tracer.writer._worker.stop()
self._wait_thread_flush()

def _wait_thread_flush(self):
"""
Helper that waits for the thread flush
"""
self.tracer.writer._worker.stop()
self.tracer.writer._worker.join(None)
self.tracer.writer.stop()
self.tracer.writer.join(None)

def _get_endpoint_payload(self, calls, endpoint):
"""
Expand All @@ -105,7 +105,7 @@ def test_worker_single_trace_uds(self):
self.tracer.configure(uds_path='/tmp/ddagent/trace.sock')
# Write a first trace so we get a _worker
self.tracer.trace('client.testing').finish()
worker = self.tracer.writer._worker
worker = self.tracer.writer
worker._log_error_status = mock.Mock(
worker._log_error_status, wraps=worker._log_error_status,
)
Expand All @@ -120,7 +120,7 @@ def test_worker_single_trace_uds_wrong_socket_path(self):
self.tracer.configure(uds_path='/tmp/ddagent/nosockethere')
# Write a first trace so the writer (which is itself the worker thread) has a trace to flush
self.tracer.trace('client.testing').finish()
worker = self.tracer.writer._worker
worker = self.tracer.writer
worker._log_error_status = mock.Mock(
worker._log_error_status, wraps=worker._log_error_status,
)
Expand Down Expand Up @@ -190,12 +190,12 @@ def test_worker_http_error_logging(self):
self.tracer.writer.api = FlawedAPI(Tracer.DEFAULT_HOSTNAME, Tracer.DEFAULT_PORT)
tracer.trace('client.testing').finish()

log = logging.getLogger('ddtrace.writer')
log = logging.getLogger('ddtrace.internal.writer')
log_handler = MockedLogHandler(level='DEBUG')
log.addHandler(log_handler)

self._wait_thread_flush()
assert tracer.writer._worker._last_error_ts < time.time()
assert tracer.writer._last_error_ts < time.time()

logged_errors = log_handler.messages['error']
assert len(logged_errors) == 1
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/tracer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import deque
from ddtrace.encoding import JSONEncoder, MsgpackEncoder
from ddtrace.internal.writer import AgentWriter
from ddtrace.tracer import Tracer
from ddtrace.writer import AgentWriter
from ddtrace.compat import PY3


Expand Down