diff --git a/lib/ddtrace/context.rb b/lib/ddtrace/context.rb index b2f01728c57..d82d6998c6b 100644 --- a/lib/ddtrace/context.rb +++ b/lib/ddtrace/context.rb @@ -13,29 +13,18 @@ module Datadog # \Context, it will be related to the original trace. # # This data structure is thread-safe. - # rubocop:disable Metrics/ClassLength class Context - # DEFAULT_MAX_SPANS_PER_TRACE_SOFT is the amount of spans collected before - # the context starts to partially flush parts of traces. With a setting of 10k, - # the memory overhead is about 10Mb per thread/context (depends on spans metadata, - # this is just an order of magnitude). - DEFAULT_MAX_SPANS_PER_TRACE_SOFT = 10_000 - # DEFAULT_MAX_SPANS_PER_TRACE_HARD is the amount of spans above which, for a - # given trace, the context will simply drop and ignore spans, avoiding - # a high memory usage. - DEFAULT_MAX_SPANS_PER_TRACE_HARD = 100_000 - # DEFAULT_MIN_SPANS_FOR_FLUSH_TIMEOUT is the minimum number of spans required - # for a partial flush to happen on a timeout. This is to prevent partial flush - # of traces which last a very long time but yet have few spans. - DEFAULT_MIN_SPANS_FOR_FLUSH_TIMEOUT = 100 - # DEFAULT_PARTIAL_FLUSH_TIMEOUT is the limit (in seconds) above which the context - # considers flushing parts of the trace. Partial flushes should not be done too - # late else the agent rejects them with a "too far in the past" error. - DEFAULT_PARTIAL_FLUSH_TIMEOUT = 10 + # 100k spans is about a 100Mb footprint + DEFAULT_MAX_SPANS = 100_000 # Initialize a new thread-safe \Context. def initialize(options = {}) @mutex = Mutex.new + # max_spans is the amount of spans above which, for a + # given trace, the context will simply drop and ignore spans, avoiding + # a high memory usage. 
+ @max_spans = options.fetch(:max_spans, + DEFAULT_MAX_SPANS) reset end @@ -101,7 +90,7 @@ def add_span(span) # as it means despite the soft limit, the hard limit is reached, so the trace # by default has 10000 spans, all of which belong to unfinished parts of a # larger trace. This is a catch-all to reduce global memory usage. - if @max_spans_per_trace_hard > 0 && @trace.length >= (@max_spans_per_trace_hard - 1) + if @max_spans > 0 && @trace.length >= (@max_spans - 1) Datadog::Tracer.log.debug("context full, ignoring span #{span.name}") # This span is going to be finished at some point, but will never increase # the trace size, so we acknowledge this fact, to avoid to send it to early. @@ -111,11 +100,6 @@ def add_span(span) set_current_span(span) @trace << span span.context = self - # If hitting the soft limit, start flushing intermediate data. - if @max_spans_per_trace_soft > 0 && @trace.length >= @max_spans_per_trace_soft - Datadog::Tracer.log.debug("context full, span #{span.name} triggers partial flush") - partial_flush() - end end end @@ -162,66 +146,6 @@ def sampled? end end - # Returns ids of all spans which can be considered as local, partial roots - # from a partial flush perspective. Also returns the span IDs which have - # been marked as non flushable, and which should be kept. - def partial_roots - return nil unless @current_span - - marked_ids = ([@current_span.span_id] + @current_span.parent_ids).to_set - roots = [] - @trace.each do |span| - # Skip if span is one of the parents of the current span. - next if marked_ids.include? span.span_id - # Skip if the span is not one of the parents of the current span, - # and its parent is not either. It means it just can't be a local, partial root. - next unless marked_ids.include? span.parent_id - - roots << span.span_id - end - [roots, marked_ids] - end - - # Return a hash containting all sub traces which are candidates for - # a partial flush. 
- def partial_roots_spans - roots, marked_ids = partial_roots() - return nil unless roots - - roots_spans = Hash[roots.map { |id| [id, []] }] - unfinished = {} - @trace.each do |span| - ids = [span.span_id] + span.parent_ids() - ids.reject! { |id| marked_ids.include? id } - ids.each do |id| - if roots_spans.key?(id) - unfinished[id] = true unless span.finished? - roots_spans[id] << span - end - end - end - # Do not flush unfinished traces. - roots_spans.reject! { |id| unfinished.key? id } - return nil if roots_spans.empty? - roots_spans - end - - def partial_flush - roots_spans = partial_roots_spans() - return nil unless roots_spans - - flushed_ids = {} - roots_spans.each_value do |spans| - next if spans.empty? - spans.each { |span| flushed_ids[span.span_id] = true } - @partial_traces << spans - end - # We need to reject by span ID and not by value, because a span - # value may be altered (typical example: it's finished by some other thread) - # since we lock only the context, not all the spans which belong to it. - @trace.reject! { |span| flushed_ids.key? span.span_id } - end - # Returns both the trace list generated in the current context and # if the context is sampled or not. It returns nil, nil if the ``Context`` is # not finished. If a trace is returned, the \Context will be reset so that it @@ -233,23 +157,6 @@ def get trace = @trace sampled = @sampled - # There's a need to flush partial parts of traces when they are getting old: - # not doing this, partial bits could be flushed alone later, and trigger - # a "too far in the past" error on the agent. - # By doing this, we send partial information on the server and take the risk - # to split a trace which could have been totally in-memory. - # OTOH the backend will collect these and put them together. - # Traces which do not have enough spans will not be touched - # to avoid slicing small things too often. - unless trace.length <= @min_spans_for_flush_timeout || - trace[0].start_time.nil? 
|| - trace[0].start_time > Time.now.utc - @partial_flush_timeout - partial_flush() - end - - partial_trace = @partial_traces.shift - return partial_trace, sampled if partial_trace - return nil, nil unless check_finished_spans() reset @@ -265,6 +172,45 @@ def to_s end end + # Return the start time of the root span, or nil if there are no spans or this is undefined. + def start_time + @mutex.synchronize do + return nil if @trace.empty? + @trace[0].start_time + end + end + + # Return the length of the current trace held by this context. + def length + @mutex.synchronize do + @trace.length + end + end + + # Iterate on each span within the trace. This is thread safe. + def each_span + @mutex.synchronize do + @trace.each do |span| + yield span + end + end + end + + # Delete any span matching the condition. This is thread safe. + def delete_span_if + @mutex.synchronize do + @trace.delete_if do |span| + finished = span.finished? + delete_span = yield span + if delete_span + span.context = nil + @finished_spans -= 1 if finished + end + delete_span + end + end + end + private :reset private :check_finished_spans private :set_current_span diff --git a/lib/ddtrace/context_flush.rb b/lib/ddtrace/context_flush.rb new file mode 100644 index 00000000000..3bde47b5f52 --- /dev/null +++ b/lib/ddtrace/context_flush.rb @@ -0,0 +1,124 @@ +require 'set' + +require 'ddtrace/context' + +module Datadog + # \ContextFlush is used to cap context size and avoid it using too much memory. + # It performs memory flushes when required. 
+ class ContextFlush + # by default, soft and hard limits are the same + DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_SPANS + # by default, never do a partial flush + DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_SPANS + # timeout should be lower than the trace agent window + DEFAULT_PARTIAL_FLUSH_TIMEOUT = 10 + + private_constant :DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH + private_constant :DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH + private_constant :DEFAULT_PARTIAL_FLUSH_TIMEOUT + + def initialize(options = {}) + # max_spans_before_partial_flush is the amount of spans collected before + # the context starts to partially flush parts of traces. With a setting of 10k, + # the memory overhead is about 10Mb per thread/context (depends on spans metadata, + # this is just an order of magnitude). + @max_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush, + DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH) + # min_spans_before_partial_flush is the minimum number of spans required + # for a partial flush to happen on a timeout. This is to prevent partial flush + # of traces which last a very long time but yet have few spans. + @min_spans_before_partial_flush = options.fetch(:min_spans_before_partial_flush, + DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH) + # partial_flush_timeout is the limit (in seconds) above which the context + # considers flushing parts of the trace. Partial flushes should not be done too + # late else the agent rejects them with a "too far in the past" error. + @partial_flush_timeout = options.fetch(:partial_flush_timeout, + DEFAULT_PARTIAL_FLUSH_TIMEOUT) + @partial_traces = [] + end + + # Returns ids of all spans which can be considered as local, partial roots + # from a partial flush perspective. Also returns the span IDs which have + # been marked as non flushable, and which should be kept. 
+ def partial_roots(context) + # Here it's not totally atomic since current_span could change after it's queried. + # Worst case: it's held back and not flushed, but it's safe since in that case + # partial flushing is only "not efficient enough" but never flushes non legit spans. + current_span = context.current_span + return nil unless current_span + + marked_ids = ([current_span.span_id] + current_span.parent_ids).to_set + roots = [] + context.each_span do |span| + # Skip if span is one of the parents of the current span. + next if marked_ids.include? span.span_id + # Skip if the span is not one of the parents of the current span, + # and its parent is not either. It means it just can't be a local, partial root. + next unless marked_ids.include? span.parent_id + + roots << span.span_id + end + [roots, marked_ids] + end + + # Return a hash containing all sub traces which are candidates for + # a partial flush. + def partial_roots_spans(context) + roots, marked_ids = partial_roots(context) + return nil unless roots + + roots_spans = Hash[roots.map { |id| [id, []] }] + unfinished = Set.new + context.each_span do |span| + ids = [span.span_id] + span.parent_ids() + ids.delete_if { |id| marked_ids.include? id } + ids.each do |id| + if roots_spans.include?(id) + unfinished << id unless span.finished? + roots_spans[id] << span + end + end + end + # Do not flush unfinished traces. + roots_spans.delete_if { |id| unfinished.include? id } + return nil if roots_spans.empty? + roots_spans + end + + def partial_flush(context) + roots_spans = partial_roots_spans(context) + return nil unless roots_spans + + traces = [] + flushed_ids = {} + roots_spans.each_value do |spans| + next if spans.empty? 
+ spans.each { |span| flushed_ids[span.span_id] = true } + traces << spans + end + # We need to reject by span ID and not by value, because a span + # value may be altered (typical example: it's finished by some other thread) + # since we lock only the context, not all the spans which belong to it. + context.delete_span_if { |span| flushed_ids.include? span.span_id } + traces + end + + # Performs an operation with each partial trace it can get from the context. + def each_partial_trace(context) + start_time = context.start_time + length = context.length + return if length < @min_spans_before_partial_flush + return if length < @max_spans_before_partial_flush && + start_time && start_time > Time.now.utc - @partial_flush_timeout + traces = partial_flush(context) + return unless traces + traces.each do |trace| + yield trace + end + end + + private :partial_roots + private :partial_roots_spans + private :partial_flush + end +end diff --git a/lib/ddtrace/tracer.rb b/lib/ddtrace/tracer.rb index eb27e4d5487..7360d9825c4 100644 --- a/lib/ddtrace/tracer.rb +++ b/lib/ddtrace/tracer.rb @@ -5,6 +5,7 @@ require 'ddtrace/span' require 'ddtrace/context' +require 'ddtrace/context_flush' require 'ddtrace/provider' require 'ddtrace/logger' require 'ddtrace/writer' @@ -97,6 +98,8 @@ def initialize(options = {}) @provider = options.fetch(:context_provider, Datadog::DefaultContextProvider.new) @provider ||= Datadog::DefaultContextProvider.new # @provider should never be nil + @context_flush = Datadog::ContextFlush.new(options) + @mutex = Mutex.new @services = {} @tags = {} @@ -288,11 +291,15 @@ def trace(name, options = {}) def record(context) context = context.context if context.is_a?(Datadog::Span) return if context.nil? - loop do - trace, sampled = context.get - break unless trace - ready = !trace.empty? && sampled - write(trace) if ready + trace, sampled = context.get + if sampled + if trace.empty? 
+ @context_flush.each_partial_trace(context) do |t| + write(t) + end + else + write(trace) + end end end diff --git a/test/context_flush_test.rb b/test/context_flush_test.rb new file mode 100644 index 00000000000..b877f2be80a --- /dev/null +++ b/test/context_flush_test.rb @@ -0,0 +1,59 @@ +require 'helper' +require 'ddtrace/tracer' +require 'ddtrace/context_flush' + +module Datadog + class ContextFlush + attr_accessor :max_spans_per_trace_soft + attr_accessor :max_spans_per_trace_hard + attr_accessor :min_spans_for_flush_timeout + attr_accessor :partial_flush_timeout + + public :partial_roots + public :partial_roots_spans + public :partial_flush + end +end + +class ContextFlushTest < Minitest::Test + def test_partial_roots_typical + tracer = get_test_tracer + context_flush = Datadog::ContextFlush.new + context = tracer.call_context + + root_id = nil + child1_id = nil + child2_id = nil + child3_id = nil + tracer.trace('root') do |root| + root_id = root.span_id + tracer.trace('child1') do |child1| + child1_id = child1.span_id + tracer.trace('child2') do |child2| + child2_id = child2.span_id + end + end + tracer.trace('child3') do |child3| + child3_id = child3.span_id + + partial_roots, marked_ids = context_flush.partial_roots(context) + assert_equal([child1_id], partial_roots) + assert_equal([root_id, child3_id].to_set, marked_ids) + partial_roots_spans = context_flush.partial_roots_spans(context) + assert_includes(partial_roots_spans, child1_id) + end + end + end + + def test_partial_roots_empty + tracer = get_test_tracer + context_flush = Datadog::ContextFlush.new + context = tracer.call_context + + partial_roots, marked_ids = context_flush.partial_roots(context) + assert_nil(partial_roots) + assert_nil(marked_ids) + partial_roots_spans = context_flush.partial_roots_spans(context) + assert_nil(partial_roots_spans) + end +end diff --git a/test/context_test.rb b/test/context_test.rb index 8e0e47e8889..68fa21beb64 100644 --- a/test/context_test.rb +++ b/test/context_test.rb @@ 
-1,17 +1,6 @@ require 'helper' require 'ddtrace/tracer' -module Datadog - class Context - attr_accessor :max_spans_per_trace_soft - attr_accessor :max_spans_per_trace_hard - attr_accessor :partial_flush_timeout - - public :partial_roots - public :partial_roots_spans - end -end - # rubocop:disable Metrics/ClassLength class ContextTest < Minitest::Test def test_nil_tracer @@ -279,138 +268,73 @@ def test_thread_safe assert_equal(false, ctx.sampled) end - def test_partial_roots_typical + def test_length tracer = get_test_tracer - root_id = nil - child1_id = nil - child2_id = nil - child3_id = nil - tracer.trace('root') do |root| - root_id = root.span_id - tracer.trace('child1') do |child1| - child1_id = child1.span_id - tracer.trace('child2') do |child2| - child2_id = child2.span_id - end - end - tracer.trace('child3') do |child3| - child3_id = child3.span_id - - partial_roots, marked_ids = tracer.call_context.partial_roots - assert_equal([child1_id], partial_roots) - assert_equal([root_id, child3_id].to_set, marked_ids) - partial_roots_spans = tracer.call_context.partial_roots_spans - assert_includes(partial_roots_spans, child1_id) - end + ctx = Datadog::Context.new + + assert_equal(0, ctx.length) + 10.times do |i| + span = Datadog::Span.new(tracer, "test.op#{i}") + assert_equal(i, ctx.length) + ctx.add_span(span) + assert_equal(i + 1, ctx.length) + ctx.close_span(span) + assert_equal(i + 1, ctx.length) end + + ctx.get + + assert_equal(0, ctx.length) end - def test_partial_roots_empty + def test_start_time tracer = get_test_tracer - partial_roots, marked_ids = tracer.call_context.partial_roots - assert_nil(partial_roots) - assert_nil(marked_ids) - partial_roots_spans = tracer.call_context.partial_roots_spans - assert_nil(partial_roots_spans) + ctx = tracer.call_context + + assert_nil(ctx.start_time) + tracer.trace('test.op') do |span| + assert_equal(span.start_time, ctx.start_time) + end + assert_nil(ctx.start_time) end - # rubocop:disable Metrics/AbcSize - def 
test_partial_flush_soft - tracer = get_test_tracer - root_id = nil - child3_id = nil - n = 10 - roots_ids = [] - spans_ids = [] - tracer.trace('root') do |root| - ctx = tracer.call_context - ctx.get - - ctx.max_spans_per_trace_soft = n + 2 # +1 for root span, +1 for child3 - ctx.max_spans_per_trace_hard = ctx.max_spans_per_trace_soft + 1 # make sure hard is higher than soft - ctx.partial_flush_timeout = 3600 # disable timeout flushes - - root_id = root.span_id - (n / 2).times do - tracer.trace('child') do |child| - child1_id = child.span_id - tracer.trace('child2') do |child2| - child2_id = child2.span_id - roots_ids << child1_id - spans_ids << { child1: child1_id, chilld2: child2_id } - end - end - end - tracer.trace('child3') do |child3| - child3_id = child3.span_id - - partial_roots, marked_ids = ctx.partial_roots - assert_equal([], partial_roots) - assert_equal([root_id, child3_id].to_set, marked_ids) - partial_roots_spans = ctx.partial_roots_spans - assert_nil(partial_roots_spans) - - (n / 2).times do - trace, sampled = ctx.get - assert_equal(2, trace.length) - assert_equal(true, sampled) - assert_includes(roots_ids, trace[0].span_id) - assert_equal(root_id, trace[0].parent_id) - assert_equal(trace[0].span_id, trace[1].parent_id) - assert_equal(root.trace_id, trace[0].trace_id) - assert_equal(root.trace_id, trace[1].trace_id) - end - trace, sampled = ctx.get - assert_nil(trace) - assert_nil(sampled) - end + def test_each_span + span = Datadog::Span.new(nil, 'test.op') + ctx = Datadog::Context.new + ctx.add_span(span) + + action = MiniTest::Mock.new + action.expect(:nop, nil, ['test.op']) + ctx.each_span do |s| + action.nop(s.name) end + action.verify end - def test_partial_flush_hard + def test_delete_span_if tracer = get_test_tracer - root_id = nil - n = 10 - roots_ids = [] - spans_ids = [] - ctx = nil - ctx = tracer.call_context - ctx.get - - root = Datadog::Span.new(tracer, 'root') - ctx.add_span(root) - ctx.max_spans_per_trace_soft = n - 
ctx.max_spans_per_trace_hard = ctx.max_spans_per_trace_soft # hard limit shadows soft limit - ctx.partial_flush_timeout = 3600 # disable timeout flushes - - root_id = root.span_id - (n / 2).times do - tracer.trace('child') do |child| - child1_id = child.span_id - tracer.trace('child2') do |child2| - child2_id = child2.span_id - roots_ids << child1_id - spans_ids << { child1: child1_id, chilld2: child2_id } + ctx = tracer.call_context + action = MiniTest::Mock.new + action.expect(:nop, nil, ['test.op2']) + tracer.trace('test.op1') do + tracer.trace('test.op2') do + assert_equal(2, ctx.length) + ctx.delete_span_if { |span| span.name == 'test.op1' } + assert_equal(1, ctx.length) + ctx.each_span do |s| + action.nop(s.name) end + assert_equal(false, ctx.finished?, 'context is not finished as op2 is not finished') + tracer.trace('test.op3') do + end + assert_equal(2, ctx.length) + ctx.delete_span_if { |span| span.name == 'test.op3' } + assert_equal(1, ctx.length) end + assert_equal(0, ctx.length, 'op2 has been finished, so context has been finished too') end - tracer.trace('child3') do - # child3 should be dropped and never show up. - # Even more, the latest span of the 'child' serie should not show up, - # because starting at n-1, we should drop everything. - - partial_roots, marked_ids = ctx.partial_roots - assert_equal(roots_ids[0...(n / 2 - 1)], partial_roots, 'all children but one appear in partial roots') - assert_equal([root_id].to_set, marked_ids, 'only root is marked, everything else is flushable') - partial_roots_spans = ctx.partial_roots_spans - assert_equal((n / 2 - 1), partial_roots_spans.length) - end - ctx.close_span(root) - trace, sampled = ctx.get - assert_equal(n - 1, trace.length, 'trace should be completely sent, and its size n-1') - assert_equal(true, sampled) + action.verify end end