From a22d48ff924e8cca1de568148a03bf0d5404a95a Mon Sep 17 00:00:00 2001 From: "Christian Mauduit (DataDog)" Date: Wed, 8 Nov 2017 16:26:52 -0500 Subject: [PATCH 1/4] [tracer] reduce memory usage on high-cardinality traces For big traces (typically, long-running traces with one enclosing span and many sub-spans, possibly several thousands) the library could keep everything in memory waiting for an hypothetical flush. This patch partially flushes consistent parts of traces, so that they don't fill up the RAM. --- lib/ddtrace/context.rb | 71 ++++++- lib/ddtrace/context_flush.rb | 132 ++++++++++++ lib/ddtrace/tracer.rb | 23 ++- test/context_flush_test.rb | 376 +++++++++++++++++++++++++++++++++++ test/context_test.rb | 98 ++++++++- test/span_test.rb | 1 - 6 files changed, 694 insertions(+), 7 deletions(-) create mode 100644 lib/ddtrace/context_flush.rb create mode 100644 test/context_flush_test.rb diff --git a/lib/ddtrace/context.rb b/lib/ddtrace/context.rb index bd6ec141292..8b852a6baf3 100644 --- a/lib/ddtrace/context.rb +++ b/lib/ddtrace/context.rb @@ -13,10 +13,19 @@ module Datadog # \Context, it will be related to the original trace. # # This data structure is thread-safe. + # rubocop:disable Metrics/ClassLength class Context + # 100k spans is about a 100Mb footprint + DEFAULT_MAX_LENGTH = 100_000 + + attr_reader :max_length + # Initialize a new thread-safe \Context. def initialize(options = {}) @mutex = Mutex.new + # max_length is the amount of spans above which, for a given trace, + # the context will simply drop and ignore spans, avoiding high memory usage. + @max_length = options.fetch(:max_length, DEFAULT_MAX_LENGTH) reset(options) end @@ -78,6 +87,16 @@ def set_current_span(span) # Add a span to the context trace list, keeping it as the last active span. def add_span(span) @mutex.synchronize do + # If hitting the hard limit, just drop spans. This is really a rare case + # as it means despite the soft limit, the hard limit is reached, so the trace + # by default has 10000 spans, all of which belong to unfinished parts of a + # larger trace. This is a catch-all to reduce global memory usage. + if @max_length > 0 && @trace.length >= @max_length + Datadog::Tracer.log.debug("context full, ignoring span #{span.name}") + # Detach the span from any context, it's being dropped and ignored. + span.context = nil + return + end set_current_span(span) @trace << span span.context = self @@ -135,14 +154,16 @@ def sampled? # This operation is thread-safe. def get @mutex.synchronize do - return nil, nil unless check_finished_spans - trace = @trace sampled = @sampled + attach_sampling_priority if sampled && @sampling_priority + # still return sampled attribute, even if context is not finished + return nil, sampled unless check_finished_spans() + reset - return trace, sampled + [trace, sampled] end end @@ -161,6 +182,50 @@ def attach_sampling_priority ) end + # Return the start time of the root span, or nil if there are no spans or this is undefined. + def start_time + @mutex.synchronize do + return nil if @trace.empty? + @trace[0].start_time + end + end + + # Return the length of the current trace held by this context. + def length + @mutex.synchronize do + @trace.length + end + end + + # Iterate on each span within the trace. This is thread safe. + def each_span + @mutex.synchronize do + @trace.each do |span| + yield span + end + end + end + + # Delete any span matching the condition. This is thread safe. + def delete_span_if + @mutex.synchronize do + @trace.delete_if do |span| + finished = span.finished? + delete_span = yield span + if delete_span + # We need to detach the span from the context, else, some code + # finishing it afterwards would mess up with the number of + # finished_spans and possibly cause other side effects. + span.context = nil + # Acknowledge there's one span less to finish, if needed. + # It's very important to keep this balanced. + @finished_spans -= 1 if finished + end + delete_span + end + end + end + private :reset private :check_finished_spans private :set_current_span diff --git a/lib/ddtrace/context_flush.rb b/lib/ddtrace/context_flush.rb new file mode 100644 index 00000000000..b44792d1e3d --- /dev/null +++ b/lib/ddtrace/context_flush.rb @@ -0,0 +1,132 @@ +require 'set' + +require 'ddtrace/context' + +module Datadog + # \ContextFlush is used to cap context size and avoid it using too much memory. + # It performs memory flushes when required. + class ContextFlush + # by default, soft and hard limits are the same + DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_LENGTH + # by default, never do a partial flush + DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_LENGTH + # timeout should be lower than the trace agent window + DEFAULT_PARTIAL_FLUSH_TIMEOUT = 10 + + private_constant :DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH + private_constant :DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH + private_constant :DEFAULT_PARTIAL_FLUSH_TIMEOUT + + def initialize(options = {}) + # max_spans_before_partial_flush is the amount of spans collected before + # the context starts to partially flush parts of traces. With a setting of 10k, + # the memory overhead is about 10Mb per thread/context (depends on spans metadata, + # this is just an order of magnitude). + @max_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush, + DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH) + # min_spans_before_partial_flush is the minimum number of spans required + # for a partial flush to happen on a timeout. This is to prevent partial flush + # of traces which last a very long time but yet have few spans. + @min_spans_before_partial_flush = options.fetch(:min_spans_before_partial_flush, + DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH) + # partial_flush_timeout is the limit (in seconds) above which the context + # considers flushing parts of the trace. Partial flushes should not be done too + # late else the agent rejects them with a "too far in the past" error. + @partial_flush_timeout = options.fetch(:partial_flush_timeout, + DEFAULT_PARTIAL_FLUSH_TIMEOUT) + @partial_traces = [] + end + + def add_children(m, spans, ids, leaf) + spans << leaf + ids.add(leaf.span_id) + + if m[leaf.span_id] + m[leaf.span_id].each do |sub| + add_children(m, spans, ids, sub) + end + end + end + + def partial_traces(context) + # 1st step, taint all parents of an unfinished span as unflushable + unflushable_ids = Set.new + + context.each_span do |span| + next if span.finished? || unflushable_ids.include?(span.span_id) + unflushable_ids.add span.span_id + while span.parent + span = span.parent + unflushable_ids.add span.span_id + end + end + + # 2nd step, find all spans which are at the border between flushable and unflushable + # Along the road, collect a reverse-tree which allows direct walking from parents to + # children but only for the ones we're interested it. + roots = [] + children_map = {} + context.each_span do |span| + # There's no point in trying to put the real root in those partial roots, if + # it's flushable, the default algorithm would figure way more quickly. + if span.parent && !unflushable_ids.include?(span.span_id) + if unflushable_ids.include?(span.parent.span_id) + # span is flushable but is parent is not + roots << span + else + # span is flushable and its parent is too, build the reverse + # parent to child map for this one, it will be useful + children_map[span.parent.span_id] ||= [] + children_map[span.parent.span_id] << span + end + end + end + + # 3rd step, find all children, as this can be costly, only perform it for partial roots + partial_traces = [] + all_ids = Set.new + roots.each do |root| + spans = [] + add_children(children_map, spans, all_ids, root) + partial_traces << spans + end + + return [nil, nil] if partial_traces.empty? + [partial_traces, all_ids] + end + + def partial_flush(context) + traces, flushed_ids = partial_traces(context) + return nil unless traces && flushed_ids + + # We need to reject by span ID and not by value, because a span + # value may be altered (typical example: it's finished by some other thread) + # since we lock only the context, not all the spans which belong to it. + context.delete_span_if { |span| flushed_ids.include? span.span_id } + traces + end + + # Performs an operation which each partial trace it can get from the context. + def each_partial_trace(context) + start_time = context.start_time + length = context.length + # Stop and do not flush anything if there are not enough spans. + return if length <= @min_spans_before_partial_flush + # If there are enough spans, but not too many, check for start time. + # If timeout is not given or 0, then wait + return if length <= @max_spans_before_partial_flush && + (@partial_flush_timeout.nil? || @partial_flush_timeout <= 0 || + (start_time && start_time > Time.now.utc - @partial_flush_timeout)) + # Here, either the trace is old or we have too many spans, flush it. + traces = partial_flush(context) + return unless traces + traces.each do |trace| + yield trace + end + end + + private :add_children + private :partial_traces + private :partial_flush + end +end diff --git a/lib/ddtrace/tracer.rb b/lib/ddtrace/tracer.rb index fb27584cee8..eab03865157 100644 --- a/lib/ddtrace/tracer.rb +++ b/lib/ddtrace/tracer.rb @@ -5,6 +5,7 @@ require 'ddtrace/span' require 'ddtrace/context' +require 'ddtrace/context_flush' require 'ddtrace/provider' require 'ddtrace/logger' require 'ddtrace/writer' @@ -97,6 +98,8 @@ def initialize(options = {}) @provider = options.fetch(:context_provider, Datadog::DefaultContextProvider.new) @provider ||= Datadog::DefaultContextProvider.new # @provider should never be nil + @context_flush = Datadog::ContextFlush.new(options) + @mutex = Mutex.new @services = {} @tags = {} @@ -117,8 +120,13 @@ def configure(options = {}) enabled = options.fetch(:enabled, nil) hostname = options.fetch(:hostname, nil) port = options.fetch(:port, nil) + + # Those are rare "power-user" options. sampler = options.fetch(:sampler, nil) priority_sampling = options[:priority_sampling] + max_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush, nil) + min_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush, nil) + partial_flush_timeout = options.fetch(:partial_flush_timeout, nil) @enabled = enabled unless enabled.nil? @sampler = sampler unless sampler.nil? @@ -130,6 +138,10 @@ def configure(options = {}) @writer.transport.hostname = hostname unless hostname.nil? @writer.transport.port = port unless port.nil? + + @context_flush = Datadog::ContextFlush.new(options) unless min_spans_before_partial_flush.nil? && + max_spans_before_partial_flush.nil? && + partial_flush_timeout.nil? end # Set the information about the given service. A valid example is: @@ -295,8 +307,15 @@ def record(context) context = context.context if context.is_a?(Datadog::Span) return if context.nil? trace, sampled = context.get - ready = !trace.nil? && !trace.empty? && sampled - write(trace) if ready + if sampled + if trace.nil? || trace.empty? + @context_flush.each_partial_trace(context) do |t| + write(t) + end + else + write(trace) + end + end end # Return the current active span or +nil+. diff --git a/test/context_flush_test.rb b/test/context_flush_test.rb new file mode 100644 index 00000000000..947ac531796 --- /dev/null +++ b/test/context_flush_test.rb @@ -0,0 +1,376 @@ +require 'helper' +require 'ddtrace/tracer' +require 'ddtrace/context_flush' + +class ContextFlushEachTest < Minitest::Test + def test_each_partial_trace_typical_not_enough_traces + tracer = get_test_tracer + context_flush = Datadog::ContextFlush.new + context = tracer.call_context + + context_flush.each_partial_trace(context) do |_t| + flunk('nothing should be partially flushed, no spans') + end + + # the plan: + # + # root-------------. + # | \______ \ + # | \ \ + # child1 child3 child4 + # | | \_____ + # | | \ + # child2 child5 child6 + + tracer.trace('root') do + tracer.trace('child1') do + tracer.trace('child2') do + end + end + tracer.trace('child3') do + # finished spans are CAPITALIZED + # + # root + # | \______ + # | \ + # CHILD1 child3 + # | + # | + # CHILD2 + context_flush.each_partial_trace(context) do |t| + flunk("nothing should be partially flushed, got: #{t}") + end + end + tracer.trace('child4') do + tracer.trace('child5') do + end + tracer.trace('child6') do + end + end + # finished spans are CAPITALIZED + # + # root-------------. + # | \______ \ + # | \ \ + # CHILD1 CHILD3 CHILD4 + # | | \_____ + # | | \ + # CHILD2 CHILD5 CHILD6 + context_flush.each_partial_trace(context) do |t| + flunk("nothing should be partially flushed, got: #{t}") + end + end + + context_flush.each_partial_trace(context) do |t| + flunk("nothing should be partially flushed, got: #{t}") + end + + assert_equal(0, context.length, 'everything should be written by now') + end + + def test_each_partial_trace_typical + tracer = get_test_tracer + context_flush = Datadog::ContextFlush.new(min_spans_before_partial_flush: 1, + max_spans_before_partial_flush: 1) + context = tracer.call_context + + # the plan: + # + # root-------------. + # | \______ \ + # | \ \ + # child1 child3 child4 + # | | \_____ + # | | \ + # child2 child5 child6 + + action12 = Minitest::Mock.new + action12.expect(:call_with_names, nil, [%w[child1 child2].to_set]) + action3456 = Minitest::Mock.new + action3456.expect(:call_with_names, nil, [['child3'].to_set]) + action3456.expect(:call_with_names, nil, [%w[child4 child5 child6].to_set]) + + tracer.trace('root') do + tracer.trace('child1') do + tracer.trace('child2') do + end + end + tracer.trace('child3') do + # finished spans are CAPITALIZED + # + # root + # | \______ + # | \ + # CHILD1 child3 + # | + # | + # CHILD2 + context_flush.each_partial_trace(context) do |t| + action12.call_with_names(t.map(&:name).to_set) + end + end + tracer.trace('child4') do + tracer.trace('child5') do + end + tracer.trace('child6') do + end + end + # finished spans are CAPITALIZED + # + # root-------------. + # \______ \ + # \ \ + # CHILD3 CHILD4 + # | \_____ + # | \ + # CHILD5 CHILD6 + context_flush.each_partial_trace(context) do |t| + action3456.call_with_names(t.map(&:name).to_set) + end + end + + action12.verify + action3456.verify + + assert_equal(0, context.length, 'everything should be written by now') + end + + # rubocop:disable Metrics/MethodLength + def test_each_partial_trace_mixed + tracer = get_test_tracer + context_flush = Datadog::ContextFlush.new(min_spans_before_partial_flush: 1, + max_spans_before_partial_flush: 1) + context = tracer.call_context + + # the plan: + # + # root + # | \______ + # | \ + # child1 child5 + # | + # | + # child2 + # | \______ + # | \ + # child3 child6 + # | | + # | | + # child4 child7 + + action345 = Minitest::Mock.new + action345.expect(:call_with_names, nil, [%w[child3 child4].to_set]) + action345.expect(:call_with_names, nil, [%w[child5].to_set]) + + root = tracer.start_span('root', child_of: context) + child1 = tracer.start_span('child1', child_of: root) + child2 = tracer.start_span('child2', child_of: child1) + child3 = tracer.start_span('child3', child_of: child2) + child4 = tracer.start_span('child4', child_of: child3) + child5 = tracer.start_span('child5', child_of: root) + child6 = tracer.start_span('child6', child_of: child2) + child7 = tracer.start_span('child7', child_of: child6) + + context_flush.each_partial_trace(context) do |_t| + context_flush.each_partial_trace(context) do |_t| + flunk('nothing should be partially flushed, no span is finished') + end + end + + assert_equal(8, context.length) + + [root, child1, child3, child6].each do |span| + span.finish + context_flush.each_partial_trace(context) do |t| + flunk("nothing should be partially flushed, got: #{t}") + end + end + + # finished spans are CAPITALIZED + # + # ROOT + # | \______ + # | \ + # CHILD1 child5 + # | + # | + # child2 + # | \______ + # | \ + # CHILD3 CHILD6 + # | | + # | | + # child4 child7 + + child2.finish + + context_flush.each_partial_trace(context) do |t| + flunk("nothing should be partially flushed, got: #{t}") + end + + # finished spans are CAPITALIZED + # + # ROOT + # | \______ + # | \ + # CHILD1 child5 + # | + # | + # CHILD2 + # | \______ + # | \ + # CHILD3 CHILD6 + # | | + # | | + # child4 child7 + + child4.finish + child5.finish + + # finished spans are CAPITALIZED + # + # ROOT + # | \______ + # | \ + # CHILD1 CHILD5 + # | + # | + # CHILD2 + # | \______ + # | \ + # CHILD3 CHILD6 + # | | + # | | + # CHILD4 child7 + + context_flush.each_partial_trace(context) do |t| + action345.call_with_names(t.map(&:name).to_set) + end + + child7.finish + + context_flush.each_partial_trace(context) do |t| + flunk("nothing should be partially flushed, got: #{t}") + end + + assert_equal(0, context.length, 'everything should be written by now') + end +end + +module Datadog + class Tracer + attr_accessor :context_flush + end +end + +class ContextFlushPartialTest < Minitest::Test + MIN_SPANS = 10 + MAX_SPANS = 100 + TIMEOUT = 60 # make this very high to reduce test flakiness (1 minute here) + + def get_context_flush + Datadog::ContextFlush.new(min_spans_before_partial_flush: MIN_SPANS, + max_spans_before_partial_flush: MAX_SPANS, + partial_flush_timeout: TIMEOUT) + end + + # rubocop:disable Metrics/AbcSize + def test_partial_caterpillar + tracer = get_test_tracer + context_flush = get_context_flush + tracer.context_flush = context_flush + + write1 = Minitest::Mock.new + expected = [] + MIN_SPANS.times do |i| + expected << "a.#{i}" + end + (MAX_SPANS - MIN_SPANS).times do |i| + expected << "b.#{i}" + end + # We need to sort the values the same way the values will be output by the test transport + expected.sort! + expected.each do |e| + write1.expect(:call_with_name, nil, [e]) + end + + write2 = Minitest::Mock.new + expected = ['root'] + MIN_SPANS.times do |i| + expected << "b.#{i + MAX_SPANS - MIN_SPANS}" + end + # We need to sort the values the same way the values will be output by the test transport + expected.sort! + expected.each do |e| + write2.expect(:call_with_name, nil, [e]) + end + + tracer.trace('root') do + MIN_SPANS.times do |i| + tracer.trace("a.#{i}") do + end + end + spans = tracer.writer.spans() + assert_equal(0, spans.length, 'nothing should be flushed, as max limit is not reached') + MAX_SPANS.times do |i| + tracer.trace("b.#{i}") do + end + end + spans = tracer.writer.spans() + # Let's explain the extra span here, what should happen is: + # - root span is started + # - then 99 spans (10 from 1st batch, 89 from second batch) are put in context + # - then the 101th comes (the 90th from the second batch) and triggers a flush of everything but root span + # - then the last 10 spans from second batch are thrown in, so that's 10 left + the root span + assert_equal(1 + MIN_SPANS, tracer.call_context.length, 'some spans should have been sent') + assert_equal(MAX_SPANS, spans.length) + spans.each do |span| + write1.call_with_name(span.name) + end + write1.verify + end + + spans = tracer.writer.spans() + assert_equal(MIN_SPANS + 1, spans.length) + spans.each do |span| + write2.call_with_name(span.name) + end + write2.verify + end + + # Test the tracer configure args which are forwarded to context flush only. + def test_tracer_configure + tracer = get_test_tracer + + old_context_flush = tracer.context_flush + tracer.configure() + assert_equal(old_context_flush, tracer.context_flush, 'the same context_flush should be reused') + + tracer.configure(min_spans_before_partial_flush: 3, + max_spans_before_partial_flush: 3) + + refute_equal(old_context_flush, tracer.context_flush, 'another context_flush should be have been created') + end + + def test_tracer_hard_limit_overrides_soft_limit + tracer = get_test_tracer + + context = tracer.call_context + tracer.configure(min_spans_before_partial_flush: context.max_length, + max_spans_before_partial_flush: context.max_length, + partial_flush_timeout: 3600) + + n = 1_000_000 + assert_operator(n, :>, context.max_length, 'need to send enough spans') + tracer.trace('root') do + n.times do |_i| + tracer.trace('span.${i}') do + end + spans = tracer.writer.spans() + assert_equal(0, spans.length, 'nothing should be written, soft limit is inhibited') + end + end + spans = tracer.writer.spans() + assert_equal(context.max_length, spans.length, 'size should be capped to hard limit') + end +end diff --git a/test/context_test.rb b/test/context_test.rb index 018e246af51..0d9aebcde7f 100644 --- a/test/context_test.rb +++ b/test/context_test.rb @@ -179,6 +179,7 @@ def test_log_unfinished_spans tracer = get_test_tracer default_log = Datadog::Tracer.log + default_level = Datadog::Tracer.log.level buf = StringIO.new @@ -204,7 +205,7 @@ def test_log_unfinished_spans root.finish() lines = buf.string.lines - assert_equal(3, lines.length, 'there should be 2 log messages') if lines.respond_to? :length + assert_operator(3, :<=, lines.length, 'there should be at least 3 log messages') if lines.respond_to? :length # Test below iterates on lines, this is required for Ruby 1.9 backward compatibility. i = 0 @@ -230,6 +231,7 @@ def test_log_unfinished_spans end Datadog::Tracer.log = default_log + Datadog::Tracer.log.level = default_level end def test_thread_safe @@ -271,6 +273,100 @@ def test_thread_safe assert_nil(ctx.current_span) assert_equal(false, ctx.sampled) end + + def test_length + tracer = get_test_tracer + ctx = Datadog::Context.new + + assert_equal(0, ctx.length) + 10.times do |i| + span = Datadog::Span.new(tracer, "test.op#{i}") + assert_equal(i, ctx.length) + ctx.add_span(span) + assert_equal(i + 1, ctx.length) + ctx.close_span(span) + assert_equal(i + 1, ctx.length) + end + + ctx.get + + assert_equal(0, ctx.length) + end + + def test_start_time + tracer = get_test_tracer + ctx = tracer.call_context + + assert_nil(ctx.start_time) + tracer.trace('test.op') do |span| + assert_equal(span.start_time, ctx.start_time) + end + assert_nil(ctx.start_time) + end + + def test_each_span + span = Datadog::Span.new(nil, 'test.op') + ctx = Datadog::Context.new + ctx.add_span(span) + + action = MiniTest::Mock.new + action.expect(:call_with_name, nil, ['test.op']) + ctx.each_span do |s| + action.call_with_name(s.name) + end + action.verify + end + + def test_delete_span_if + tracer = get_test_tracer + ctx = tracer.call_context + + action = MiniTest::Mock.new + action.expect(:call_with_name, nil, ['test.op2']) + tracer.trace('test.op1') do + tracer.trace('test.op2') do + assert_equal(2, ctx.length) + ctx.delete_span_if { |span| span.name == 'test.op1' } + assert_equal(1, ctx.length) + ctx.each_span do |s| + action.call_with_name(s.name) + end + assert_equal(false, ctx.finished?, 'context is not finished as op2 is not finished') + tracer.trace('test.op3') do + end + assert_equal(2, ctx.length) + ctx.delete_span_if { |span| span.name == 'test.op3' } + assert_equal(1, ctx.length) + end + assert_equal(0, ctx.length, 'op2 has been finished, so context has been finished too') + end + action.verify + end + + def test_max_length + tracer = get_test_tracer + + ctx = Datadog::Context.new + assert_equal(Datadog::Context::DEFAULT_MAX_LENGTH, ctx.max_length) + + max_length = 3 + ctx = Datadog::Context.new(max_length: max_length) + assert_equal(max_length, ctx.max_length) + + spans = [] + (max_length * 2).times do |i| + span = tracer.start_span("test.op#{i}", child_of: ctx) + spans << span + end + + assert_equal(max_length, ctx.length) + trace, = ctx.get + assert_nil(trace) + + spans.each(&:finish) + + assert_equal(0, ctx.length, "context #{ctx}") + end end class ThreadLocalContextTest < Minitest::Test diff --git a/test/span_test.rb b/test/span_test.rb index c99a2aa0470..d9626957339 100644 --- a/test/span_test.rb +++ b/test/span_test.rb @@ -1,6 +1,5 @@ require 'helper' require 'ddtrace/span' - class SpanTest < Minitest::Test def test_span_finish tracer = nil From d58cba9d4240cdf7fd2ec7c416406ac88abb3377 Mon Sep 17 00:00:00 2001 From: David Elner Date: Fri, 30 Mar 2018 13:52:24 -0400 Subject: [PATCH 2/4] Changed: Context flushing to be inactive by default. Provide :partial_flush or configuration options to enable. --- lib/ddtrace/tracer.rb | 22 +++++++++++++++------- test/context_flush_test.rb | 17 ++++++++++++----- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/lib/ddtrace/tracer.rb b/lib/ddtrace/tracer.rb index eab03865157..a571b2cd5b3 100644 --- a/lib/ddtrace/tracer.rb +++ b/lib/ddtrace/tracer.rb @@ -98,7 +98,7 @@ def initialize(options = {}) @provider = options.fetch(:context_provider, Datadog::DefaultContextProvider.new) @provider ||= Datadog::DefaultContextProvider.new # @provider should never be nil - @context_flush = Datadog::ContextFlush.new(options) + @context_flush = Datadog::ContextFlush.new(options) if options[:partial_flush] @mutex = Mutex.new @services = {} @@ -307,14 +307,22 @@ def record(context) context = context.context if context.is_a?(Datadog::Span) return if context.nil? trace, sampled = context.get - if sampled - if trace.nil? || trace.empty? - @context_flush.each_partial_trace(context) do |t| - write(t) + + # If context flushing is configured... + if @context_flush + if sampled + if trace.nil? || trace.empty? + @context_flush.each_partial_trace(context) do |t| + write(t) + end + else + write(trace) end - else - write(trace) end + # Default behavior + else + ready = !trace.nil? && !trace.empty? && sampled + write(trace) if ready end end diff --git a/test/context_flush_test.rb b/test/context_flush_test.rb index 947ac531796..f6ff9ae8015 100644 --- a/test/context_flush_test.rb +++ b/test/context_flush_test.rb @@ -342,14 +342,21 @@ def test_partial_caterpillar def test_tracer_configure tracer = get_test_tracer - old_context_flush = tracer.context_flush - tracer.configure() - assert_equal(old_context_flush, tracer.context_flush, 'the same context_flush should be reused') + # By default, context flush doesn't exist. + assert_nil(tracer.context_flush) + # If given a partial_flush option, then uses default context flush. + flush_tracer = Datadog::Tracer.new(writer: FauxWriter.new, partial_flush: true) + refute_nil(flush_tracer.context_flush) + + # If not configured with any flush options, context flush still doesn't exist. + tracer.configure + assert_nil(tracer.context_flush) + + # If configured with flush options, context flush gets set. tracer.configure(min_spans_before_partial_flush: 3, max_spans_before_partial_flush: 3) - - refute_equal(old_context_flush, tracer.context_flush, 'another context_flush should be have been created') + refute_nil(tracer.context_flush) end def test_tracer_hard_limit_overrides_soft_limit From d1c61e1394b854b715e2cfa5b47b0cfffd402119 Mon Sep 17 00:00:00 2001 From: David Elner Date: Fri, 30 Mar 2018 14:02:48 -0400 Subject: [PATCH 3/4] Added: `partial_flush` tracer option to documentation. --- docs/GettingStarted.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md index a997c256f41..ebec956d338 100644 --- a/docs/GettingStarted.md +++ b/docs/GettingStarted.md @@ -798,6 +798,7 @@ Available options are: - ``env``: set the environment. Rails users may set it to ``Rails.env`` to use their application settings. - ``tags``: set global tags that should be applied to all spans. Defaults to an empty hash - ``log``: defines a custom logger. + - ``partial_flush``: set to ``true`` to enable partial trace flushing (for long running traces.) Disabled by default. *Experimental.* #### Custom logging From 603c99d5208ac62ac417c82fa9dd7ed50dc0a148 Mon Sep 17 00:00:00 2001 From: David Elner Date: Fri, 6 Apr 2018 12:13:14 -0400 Subject: [PATCH 4/4] Changed: Make context flush methods on context private. --- lib/ddtrace/context.rb | 61 +++++++++++++++++------------------- lib/ddtrace/context_flush.rb | 10 +++--- test/context_flush_test.rb | 10 +++--- test/context_test.rb | 38 +++++++++++----------- 4 files changed, 58 insertions(+), 61 deletions(-) diff --git a/lib/ddtrace/context.rb b/lib/ddtrace/context.rb index 8b852a6baf3..03778be3239 100644 --- a/lib/ddtrace/context.rb +++ b/lib/ddtrace/context.rb @@ -29,16 +29,6 @@ def initialize(options = {}) reset(options) end - def reset(options = {}) - @trace = [] - @parent_trace_id = options.fetch(:trace_id, nil) - @parent_span_id = options.fetch(:span_id, nil) - @sampled = options.fetch(:sampled, false) - @sampling_priority = options.fetch(:sampling_priority, nil) - @finished_spans = 0 - @current_span = nil - end - def trace_id @mutex.synchronize do @parent_trace_id @@ -73,17 +63,6 @@ def current_span end end - def set_current_span(span) - @current_span = span - if span - @parent_trace_id = span.trace_id - @parent_span_id = span.span_id - @sampled = span.sampled - else - @parent_span_id = nil - end - end - # Add a span to the context trace list, keeping it as the last active span. def add_span(span) @mutex.synchronize do @@ -124,12 +103,6 @@ def close_span(span) end end - # Returns if the trace for the current Context is finished or not. - # Low-level internal function, not thread-safe. - def check_finished_spans - @finished_spans > 0 && @trace.length == @finished_spans - end - # Returns if the trace for the current Context is finished or not. A \Context # is considered finished if all spans in this context are finished. def finished? @@ -175,6 +148,35 @@ def to_s end end + private + + def reset(options = {}) + @trace = [] + @parent_trace_id = options.fetch(:trace_id, nil) + @parent_span_id = options.fetch(:span_id, nil) + @sampled = options.fetch(:sampled, false) + @sampling_priority = options.fetch(:sampling_priority, nil) + @finished_spans = 0 + @current_span = nil + end + + def set_current_span(span) + @current_span = span + if span + @parent_trace_id = span.trace_id + @parent_span_id = span.span_id + @sampled = span.sampled + else + @parent_span_id = nil + end + end + + # Returns if the trace for the current Context is finished or not. + # Low-level internal function, not thread-safe. + def check_finished_spans + @finished_spans > 0 && @trace.length == @finished_spans + end + def attach_sampling_priority @trace.first.set_metric( Ext::DistributedTracing::SAMPLING_PRIORITY_KEY, @@ -225,11 +227,6 @@ def delete_span_if end end end - - private :reset - private :check_finished_spans - private :set_current_span - private :attach_sampling_priority end # ThreadLocalContext can be used as a tracer global reference to create diff --git a/lib/ddtrace/context_flush.rb b/lib/ddtrace/context_flush.rb index b44792d1e3d..b74d5e0d349 100644 --- a/lib/ddtrace/context_flush.rb +++ b/lib/ddtrace/context_flush.rb @@ -52,7 +52,7 @@ def partial_traces(context) # 1st step, taint all parents of an unfinished span as unflushable unflushable_ids = Set.new - context.each_span do |span| + context.send(:each_span) do |span| next if span.finished? || unflushable_ids.include?(span.span_id) unflushable_ids.add span.span_id while span.parent @@ -66,7 +66,7 @@ def partial_traces(context) # children but only for the ones we're interested it. roots = [] children_map = {} - context.each_span do |span| + context.send(:each_span) do |span| # There's no point in trying to put the real root in those partial roots, if # it's flushable, the default algorithm would figure way more quickly. if span.parent && !unflushable_ids.include?(span.span_id) @@ -102,14 +102,14 @@ def partial_flush(context) # We need to reject by span ID and not by value, because a span # value may be altered (typical example: it's finished by some other thread) # since we lock only the context, not all the spans which belong to it. - context.delete_span_if { |span| flushed_ids.include? span.span_id } + context.send(:delete_span_if) { |span| flushed_ids.include? span.span_id } traces end # Performs an operation which each partial trace it can get from the context. def each_partial_trace(context) - start_time = context.start_time - length = context.length + start_time = context.send(:start_time) + length = context.send(:length) # Stop and do not flush anything if there are not enough spans. return if length <= @min_spans_before_partial_flush # If there are enough spans, but not too many, check for start time. diff --git a/test/context_flush_test.rb b/test/context_flush_test.rb index f6ff9ae8015..c238ba0b411 100644 --- a/test/context_flush_test.rb +++ b/test/context_flush_test.rb @@ -65,7 +65,7 @@ def test_each_partial_trace_typical_not_enough_traces flunk("nothing should be partially flushed, got: #{t}") end - assert_equal(0, context.length, 'everything should be written by now') + assert_equal(0, context.send(:length), 'everything should be written by now') end def test_each_partial_trace_typical @@ -132,7 +132,7 @@ def test_each_partial_trace_typical action12.verify action3456.verify - assert_equal(0, context.length, 'everything should be written by now') + assert_equal(0, context.send(:length), 'everything should be written by now') end # rubocop:disable Metrics/MethodLength @@ -177,7 +177,7 @@ def test_each_partial_trace_mixed end end - assert_equal(8, context.length) + assert_equal(8, context.send(:length)) [root, child1, child3, child6].each do |span| span.finish @@ -253,7 +253,7 @@ def test_each_partial_trace_mixed flunk("nothing should be partially flushed, got: #{t}") end - assert_equal(0, context.length, 'everything should be written by now') + assert_equal(0, context.send(:length), 'everything should be written by now') end end @@ -322,7 +322,7 @@ def test_partial_caterpillar # - then 99 spans (10 from 1st batch, 89 from second batch) are put in context # - then the 101th comes (the 90th from the second batch) and triggers a flush of everything but root span # - then the last 10 spans from second batch are thrown in, so that's 10 left + the root span - assert_equal(1 + MIN_SPANS, tracer.call_context.length, 'some spans should have been sent') + assert_equal(1 + MIN_SPANS, tracer.call_context.send(:length), 'some spans should have been sent') assert_equal(MAX_SPANS, spans.length) spans.each do |span| write1.call_with_name(span.name) diff --git a/test/context_test.rb b/test/context_test.rb index 0d9aebcde7f..97a8f3bf2d3 100644 --- a/test/context_test.rb +++ b/test/context_test.rb @@ -278,30 +278,30 @@ def test_length tracer = get_test_tracer ctx = Datadog::Context.new - assert_equal(0, ctx.length) + assert_equal(0, ctx.send(:length)) 10.times do |i| span = Datadog::Span.new(tracer, "test.op#{i}") - assert_equal(i, ctx.length) + assert_equal(i, ctx.send(:length)) ctx.add_span(span) - assert_equal(i + 1, ctx.length) + assert_equal(i + 1, ctx.send(:length)) ctx.close_span(span) - assert_equal(i + 1, ctx.length) + assert_equal(i + 1, ctx.send(:length)) end ctx.get - assert_equal(0, ctx.length) + assert_equal(0, ctx.send(:length)) end def test_start_time tracer = get_test_tracer ctx = tracer.call_context - assert_nil(ctx.start_time) + assert_nil(ctx.send(:start_time)) tracer.trace('test.op') do |span| - assert_equal(span.start_time, ctx.start_time) + assert_equal(span.start_time, ctx.send(:start_time)) end - assert_nil(ctx.start_time) + assert_nil(ctx.send(:start_time)) end def test_each_span @@ -311,7 +311,7 @@ def test_each_span action = MiniTest::Mock.new action.expect(:call_with_name, nil, ['test.op']) - ctx.each_span do |s| + ctx.send(:each_span) do |s| action.call_with_name(s.name) end action.verify @@ -325,20 +325,20 @@ def test_delete_span_if action.expect(:call_with_name, nil, ['test.op2']) tracer.trace('test.op1') do tracer.trace('test.op2') do - assert_equal(2, ctx.length) - ctx.delete_span_if { |span| span.name == 'test.op1' } - assert_equal(1, ctx.length) - ctx.each_span do |s| + assert_equal(2, ctx.send(:length)) + ctx.send(:delete_span_if) { |span| span.name == 'test.op1' } + assert_equal(1, ctx.send(:length)) + ctx.send(:each_span) do |s| action.call_with_name(s.name) end assert_equal(false, ctx.finished?, 'context is not finished as op2 is not finished') tracer.trace('test.op3') do end - assert_equal(2, ctx.length) - ctx.delete_span_if { |span| span.name == 'test.op3' } - assert_equal(1, ctx.length) + assert_equal(2, ctx.send(:length)) + ctx.send(:delete_span_if) { |span| span.name == 'test.op3' } + assert_equal(1, ctx.send(:length)) end - assert_equal(0, ctx.length, 'op2 has been finished, so context has been finished too') + assert_equal(0, ctx.send(:length), 'op2 has been finished, so context has been finished too') end action.verify end @@ -359,13 +359,13 @@ def test_max_length spans << span end - assert_equal(max_length, ctx.length) + assert_equal(max_length, ctx.send(:length)) trace, = ctx.get assert_nil(trace) spans.each(&:finish) - assert_equal(0, ctx.length, "context #{ctx}") + assert_equal(0, ctx.send(:length), "context #{ctx}") end end