Skip to content

Commit

Permalink
[tracer] rewrote partial flush code using separate class
Browse files Browse the repository at this point in the history
  • Loading branch information
ufoot committed Nov 15, 2017
1 parent ef1e55e commit 4a1812a
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 233 deletions.
148 changes: 47 additions & 101 deletions lib/ddtrace/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,18 @@ module Datadog
# \Context, it will be related to the original trace.
#
# This data structure is thread-safe.
# rubocop:disable Metrics/ClassLength
class Context
# DEFAULT_MAX_SPANS_PER_TRACE_SOFT is the amount of spans collected before
# the context starts to partially flush parts of traces. With a setting of 10k,
# the memory overhead is about 10Mb per thread/context (depends on spans metadata,
# this is just an order of magnitude).
DEFAULT_MAX_SPANS_PER_TRACE_SOFT = 10_000
# DEFAULT_MAX_SPANS_PER_TRACE_HARD is the amount of spans above which, for a
# given trace, the context will simply drop and ignore spans, avoiding
# a high memory usage.
DEFAULT_MAX_SPANS_PER_TRACE_HARD = 100_000
# DEFAULT_MIN_SPANS_FOR_FLUSH_TIMEOUT is the minimum number of spans required
# for a partial flush to happen on a timeout. This is to prevent partial flush
# of traces which last a very long time but yet have few spans.
DEFAULT_MIN_SPANS_FOR_FLUSH_TIMEOUT = 100
# DEFAULT_PARTIAL_FLUSH_TIMEOUT is the limit (in seconds) above which the context
# considers flushing parts of the trace. Partial flushes should not be done too
# late else the agent rejects them with a "too far in the past" error.
DEFAULT_PARTIAL_FLUSH_TIMEOUT = 10
# 100k spans is about a 100Mb footprint
DEFAULT_MAX_SPANS = 100_000

# Initialize a new thread-safe \Context.
def initialize(options = {})
@mutex = Mutex.new
# max_spans is the amount of spans above which, for a
# given trace, the context will simply drop and ignore spans, avoiding
# a high memory usage.
@max_spans = options.fetch(:max_spans,
DEFAULT_MAX_SPANS)
reset
end

Expand Down Expand Up @@ -101,7 +90,7 @@ def add_span(span)
# as it means despite the soft limit, the hard limit is reached, so the trace
# by default has 10000 spans, all of which belong to unfinished parts of a
# larger trace. This is a catch-all to reduce global memory usage.
if @max_spans_per_trace_hard > 0 && @trace.length >= (@max_spans_per_trace_hard - 1)
if @max_spans > 0 && @trace.length >= (@max_spans - 1)
Datadog::Tracer.log.debug("context full, ignoring span #{span.name}")
# This span is going to be finished at some point, but will never increase
# the trace size, so we acknowledge this fact, to avoid to send it to early.
Expand All @@ -111,11 +100,6 @@ def add_span(span)
set_current_span(span)
@trace << span
span.context = self
# If hitting the soft limit, start flushing intermediate data.
if @max_spans_per_trace_soft > 0 && @trace.length >= @max_spans_per_trace_soft
Datadog::Tracer.log.debug("context full, span #{span.name} triggers partial flush")
partial_flush()
end
end
end

Expand Down Expand Up @@ -162,66 +146,6 @@ def sampled?
end
end

# Returns ids of all spans which can be considered as local, partial roots
# from a partial flush perspective. Also returns the span IDs which have
# been marked as non flushable, and which should be kept.
def partial_roots
return nil unless @current_span

marked_ids = ([@current_span.span_id] + @current_span.parent_ids).to_set
roots = []
@trace.each do |span|
# Skip if span is one of the parents of the current span.
next if marked_ids.include? span.span_id
# Skip if the span is not one of the parents of the current span,
# and its parent is not either. It means it just can't be a local, partial root.
next unless marked_ids.include? span.parent_id

roots << span.span_id
end
[roots, marked_ids]
end

# Return a hash containting all sub traces which are candidates for
# a partial flush.
def partial_roots_spans
roots, marked_ids = partial_roots()
return nil unless roots

roots_spans = Hash[roots.map { |id| [id, []] }]
unfinished = {}
@trace.each do |span|
ids = [span.span_id] + span.parent_ids()
ids.reject! { |id| marked_ids.include? id }
ids.each do |id|
if roots_spans.key?(id)
unfinished[id] = true unless span.finished?
roots_spans[id] << span
end
end
end
# Do not flush unfinished traces.
roots_spans.reject! { |id| unfinished.key? id }
return nil if roots_spans.empty?
roots_spans
end

def partial_flush
roots_spans = partial_roots_spans()
return nil unless roots_spans

flushed_ids = {}
roots_spans.each_value do |spans|
next if spans.empty?
spans.each { |span| flushed_ids[span.span_id] = true }
@partial_traces << spans
end
# We need to reject by span ID and not by value, because a span
# value may be altered (typical example: it's finished by some other thread)
# since we lock only the context, not all the spans which belong to it.
@trace.reject! { |span| flushed_ids.key? span.span_id }
end

# Returns both the trace list generated in the current context and
# if the context is sampled or not. It returns nil, nil if the ``Context`` is
# not finished. If a trace is returned, the \Context will be reset so that it
Expand All @@ -233,23 +157,6 @@ def get
trace = @trace
sampled = @sampled

# There's a need to flush partial parts of traces when they are getting old:
# not doing this, partial bits could be flushed alone later, and trigger
# a "too far in the past" error on the agent.
# By doing this, we send partial information on the server and take the risk
# to split a trace which could have been totally in-memory.
# OTOH the backend will collect these and put them together.
# Traces which do not have enough spans will not be touched
# to avoid slicing small things too often.
unless trace.length <= @min_spans_for_flush_timeout ||
trace[0].start_time.nil? ||
trace[0].start_time > Time.now.utc - @partial_flush_timeout
partial_flush()
end

partial_trace = @partial_traces.shift
return partial_trace, sampled if partial_trace

return nil, nil unless check_finished_spans()

reset
Expand All @@ -265,6 +172,45 @@ def to_s
end
end

# Return the start time of the root span, or nil if there are no spans or this is undefined.
def start_time
@mutex.synchronize do
return nil if @trace.empty?
@trace[0].start_time
end
end

# Return the length of the current trace held by this context.
def length
@mutex.synchronize do
@trace.length
end
end

# Iterate on each span within the trace. This is thread safe.
def each_span
@mutex.synchronize do
@trace.each do |span|
yield span
end
end
end

# Delete any span matching the condition. This is thread safe.
def delete_span_if
@mutex.synchronize do
@trace.delete_if do |span|
finished = span.finished?
delete_span = yield span
if delete_span
span.context = nil
@finished_spans -= 1 if finished
end
delete_span
end
end
end

private :reset
private :check_finished_spans
private :set_current_span
Expand Down
124 changes: 124 additions & 0 deletions lib/ddtrace/context_flush.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
require 'set'

require 'ddtrace/context'

module Datadog
# \ContextFlush is used to cap context size and avoid it using too much memory.
# It performs memory flushes when required.
class ContextFlush
# by default, soft and hard limits are the same
DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_SPANS
# by default, never do a partial flush
DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_SPANS
# timeout should be lower than the trace agent window
DEFAULT_PARTIAL_FLUSH_TIMEOUT = 10

private_constant :DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH
private_constant :DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH
private_constant :DEFAULT_PARTIAL_FLUSH_TIMEOUT

def initialize(options = {})
# max_spans_before_partial_flush is the amount of spans collected before
# the context starts to partially flush parts of traces. With a setting of 10k,
# the memory overhead is about 10Mb per thread/context (depends on spans metadata,
# this is just an order of magnitude).
@max_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush,
DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH)
# min_spans_before_partial_flush is the minimum number of spans required
# for a partial flush to happen on a timeout. This is to prevent partial flush
# of traces which last a very long time but yet have few spans.
@min_spans_before_partial_flush = options.fetch(:min_spans_before_partial_flush,
DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH)
# partial_flush_timeout is the limit (in seconds) above which the context
# considers flushing parts of the trace. Partial flushes should not be done too
# late else the agent rejects them with a "too far in the past" error.
@partial_flush_timeout = options.fetch(:partial_flush_timeout,
DEFAULT_PARTIAL_FLUSH_TIMEOUT)
@partial_traces = []
end

# Returns ids of all spans which can be considered as local, partial roots
# from a partial flush perspective. Also returns the span IDs which have
# been marked as non flushable, and which should be kept.
def partial_roots(context)
# Here it's not totally atomic since current_span could change after it's queried.
# Worse case: it's held back and not flushed, but it's safe since in that case
# partial flushing is only "not efficient enought" but never flushes non legit spans.
current_span = context.current_span
return nil unless current_span

marked_ids = ([current_span.span_id] + current_span.parent_ids).to_set
roots = []
context.each_span do |span|
# Skip if span is one of the parents of the current span.
next if marked_ids.include? span.span_id
# Skip if the span is not one of the parents of the current span,
# and its parent is not either. It means it just can't be a local, partial root.
next unless marked_ids.include? span.parent_id

roots << span.span_id
end
[roots, marked_ids]
end

# Return a hash containting all sub traces which are candidates for
# a partial flush.
def partial_roots_spans(context)
roots, marked_ids = partial_roots(context)
return nil unless roots

roots_spans = Hash[roots.map { |id| [id, []] }]
unfinished = Set.new
context.each_span do |span|
ids = [span.span_id] + span.parent_ids()
ids.delete_if { |id| marked_ids.include? id }
ids.each do |id|
if roots_spans.include?(id)
unfinished[id] = true unless span.finished?
roots_spans[id] << span
end
end
end
# Do not flush unfinished traces.
roots_spans.delete_if { |id| unfinished.include? id }
return nil if roots_spans.empty?
roots_spans
end

def partial_flush(context)
roots_spans = partial_roots_spans(context)
return nil unless roots_spans

traces = []
flushed_ids = {}
roots_spans.each_value do |spans|
next if spans.empty?
spans.each { |span| flushed_ids[span.span_id] = true }
traces << spans
end
# We need to reject by span ID and not by value, because a span
# value may be altered (typical example: it's finished by some other thread)
# since we lock only the context, not all the spans which belong to it.
context.delete_span_if { |span| flushed_ids.include? span.span_id }
traces
end

# Performs an operation which each partial trace it can get from the context.
def each_partial_trace(context)
start_time = context.start_time
length = context.length
return if length < @min_spans_before_partial_flush
return if length < @max_spans_before_partial_flush &&
start_time && start_time > Time.now.utc - @partial_flush_timeout
traces = partial_flush(context)
return unless traces
traces.each do |trace|
yield trace
end
end

private :partial_roots
private :partial_roots_spans
private :partial_flush
end
end
17 changes: 12 additions & 5 deletions lib/ddtrace/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

require 'ddtrace/span'
require 'ddtrace/context'
require 'ddtrace/context_flush'
require 'ddtrace/provider'
require 'ddtrace/logger'
require 'ddtrace/writer'
Expand Down Expand Up @@ -97,6 +98,8 @@ def initialize(options = {})
@provider = options.fetch(:context_provider, Datadog::DefaultContextProvider.new)
@provider ||= Datadog::DefaultContextProvider.new # @provider should never be nil

@context_flush = Datadog::ContextFlush.new(options)

@mutex = Mutex.new
@services = {}
@tags = {}
Expand Down Expand Up @@ -288,11 +291,15 @@ def trace(name, options = {})
def record(context)
context = context.context if context.is_a?(Datadog::Span)
return if context.nil?
loop do
trace, sampled = context.get
break unless trace
ready = !trace.empty? && sampled
write(trace) if ready
trace, sampled = context.get
if sampled
if trace.empty?
@context_flush.each_partial_trace do |t|
write(t)
end
else
write(trace)
end
end
end

Expand Down
Loading

0 comments on commit 4a1812a

Please sign in to comment.