Skip to content

Commit

Permalink
Merge pull request #247 from DataDog/christian/limittracesize
Browse files Browse the repository at this point in the history
[tracer] reduce memory usage on high-cardinality traces
  • Loading branch information
delner authored Apr 6, 2018
2 parents 9dfc349 + 603c99d commit b3a85cd
Show file tree
Hide file tree
Showing 7 changed files with 738 additions and 38 deletions.
1 change: 1 addition & 0 deletions docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ Available options are:
- ``env``: set the environment. Rails users may set it to ``Rails.env`` to use their application settings.
- ``tags``: set global tags that should be applied to all spans. Defaults to an empty hash
- ``log``: defines a custom logger.
- ``partial_flush``: set to ``true`` to enable partial trace flushing (for long running traces.) Disabled by default. *Experimental.*

#### Custom logging

Expand Down
130 changes: 96 additions & 34 deletions lib/ddtrace/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,22 @@ module Datadog
# \Context, it will be related to the original trace.
#
# This data structure is thread-safe.
# rubocop:disable Metrics/ClassLength
class Context
# 100k spans is about a 100Mb footprint
DEFAULT_MAX_LENGTH = 100_000

attr_reader :max_length

# Initialize a new thread-safe \Context.
def initialize(options = {})
@mutex = Mutex.new
# max_length is the amount of spans above which, for a given trace,
# the context will simply drop and ignore spans, avoiding high memory usage.
@max_length = options.fetch(:max_length, DEFAULT_MAX_LENGTH)
reset(options)
end

def reset(options = {})
@trace = []
@parent_trace_id = options.fetch(:trace_id, nil)
@parent_span_id = options.fetch(:span_id, nil)
@sampled = options.fetch(:sampled, false)
@sampling_priority = options.fetch(:sampling_priority, nil)
@finished_spans = 0
@current_span = nil
end

def trace_id
@mutex.synchronize do
@parent_trace_id
Expand Down Expand Up @@ -64,20 +63,19 @@ def current_span
end
end

def set_current_span(span)
@current_span = span
if span
@parent_trace_id = span.trace_id
@parent_span_id = span.span_id
@sampled = span.sampled
else
@parent_span_id = nil
end
end

# Add a span to the context trace list, keeping it as the last active span.
def add_span(span)
@mutex.synchronize do
# If hitting the hard limit, just drop spans. This is really a rare case
# as it means despite the soft limit, the hard limit is reached, so the trace
# by default has 10000 spans, all of which belong to unfinished parts of a
# larger trace. This is a catch-all to reduce global memory usage.
if @max_length > 0 && @trace.length >= @max_length
Datadog::Tracer.log.debug("context full, ignoring span #{span.name}")
# Detach the span from any context, it's being dropped and ignored.
span.context = nil
return
end
set_current_span(span)
@trace << span
span.context = self
Expand Down Expand Up @@ -105,12 +103,6 @@ def close_span(span)
end
end

# Returns if the trace for the current Context is finished or not.
# Low-level internal function, not thread-safe.
def check_finished_spans
@finished_spans > 0 && @trace.length == @finished_spans
end

# Returns if the trace for the current Context is finished or not. A \Context
# is considered finished if all spans in this context are finished.
def finished?
Expand All @@ -135,14 +127,16 @@ def sampled?
# This operation is thread-safe.
def get
@mutex.synchronize do
return nil, nil unless check_finished_spans

trace = @trace
sampled = @sampled

attach_sampling_priority if sampled && @sampling_priority

# still return sampled attribute, even if context is not finished
return nil, sampled unless check_finished_spans()

reset
return trace, sampled
[trace, sampled]
end
end

Expand All @@ -154,17 +148,85 @@ def to_s
end
end

private

def reset(options = {})
@trace = []
@parent_trace_id = options.fetch(:trace_id, nil)
@parent_span_id = options.fetch(:span_id, nil)
@sampled = options.fetch(:sampled, false)
@sampling_priority = options.fetch(:sampling_priority, nil)
@finished_spans = 0
@current_span = nil
end

def set_current_span(span)
@current_span = span
if span
@parent_trace_id = span.trace_id
@parent_span_id = span.span_id
@sampled = span.sampled
else
@parent_span_id = nil
end
end

# Returns if the trace for the current Context is finished or not.
# Low-level internal function, not thread-safe.
def check_finished_spans
@finished_spans > 0 && @trace.length == @finished_spans
end

def attach_sampling_priority
@trace.first.set_metric(
Ext::DistributedTracing::SAMPLING_PRIORITY_KEY,
@sampling_priority
)
end

private :reset
private :check_finished_spans
private :set_current_span
private :attach_sampling_priority
# Return the start time of the root span, or nil if there are no spans or this is undefined.
def start_time
@mutex.synchronize do
return nil if @trace.empty?
@trace[0].start_time
end
end

# Return the length of the current trace held by this context.
def length
@mutex.synchronize do
@trace.length
end
end

# Iterate on each span within the trace. This is thread safe.
def each_span
@mutex.synchronize do
@trace.each do |span|
yield span
end
end
end

# Delete any span matching the condition. This is thread safe.
def delete_span_if
@mutex.synchronize do
@trace.delete_if do |span|
finished = span.finished?
delete_span = yield span
if delete_span
# We need to detach the span from the context, else, some code
# finishing it afterwards would mess up with the number of
# finished_spans and possibly cause other side effects.
span.context = nil
# Acknowledge there's one span less to finish, if needed.
# It's very important to keep this balanced.
@finished_spans -= 1 if finished
end
delete_span
end
end
end
end

# ThreadLocalContext can be used as a tracer global reference to create
Expand Down
132 changes: 132 additions & 0 deletions lib/ddtrace/context_flush.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
require 'set'

require 'ddtrace/context'

module Datadog
# \ContextFlush is used to cap context size and avoid it using too much memory.
# It performs memory flushes when required.
class ContextFlush
# by default, soft and hard limits are the same
DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_LENGTH
# by default, never do a partial flush
DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_LENGTH
# timeout should be lower than the trace agent window
DEFAULT_PARTIAL_FLUSH_TIMEOUT = 10

private_constant :DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH
private_constant :DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH
private_constant :DEFAULT_PARTIAL_FLUSH_TIMEOUT

def initialize(options = {})
# max_spans_before_partial_flush is the amount of spans collected before
# the context starts to partially flush parts of traces. With a setting of 10k,
# the memory overhead is about 10Mb per thread/context (depends on spans metadata,
# this is just an order of magnitude).
@max_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush,
DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH)
# min_spans_before_partial_flush is the minimum number of spans required
# for a partial flush to happen on a timeout. This is to prevent partial flush
# of traces which last a very long time but yet have few spans.
@min_spans_before_partial_flush = options.fetch(:min_spans_before_partial_flush,
DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH)
# partial_flush_timeout is the limit (in seconds) above which the context
# considers flushing parts of the trace. Partial flushes should not be done too
# late else the agent rejects them with a "too far in the past" error.
@partial_flush_timeout = options.fetch(:partial_flush_timeout,
DEFAULT_PARTIAL_FLUSH_TIMEOUT)
@partial_traces = []
end

def add_children(m, spans, ids, leaf)
spans << leaf
ids.add(leaf.span_id)

if m[leaf.span_id]
m[leaf.span_id].each do |sub|
add_children(m, spans, ids, sub)
end
end
end

def partial_traces(context)
# 1st step, taint all parents of an unfinished span as unflushable
unflushable_ids = Set.new

context.send(:each_span) do |span|
next if span.finished? || unflushable_ids.include?(span.span_id)
unflushable_ids.add span.span_id
while span.parent
span = span.parent
unflushable_ids.add span.span_id
end
end

# 2nd step, find all spans which are at the border between flushable and unflushable
# Along the road, collect a reverse-tree which allows direct walking from parents to
# children but only for the ones we're interested it.
roots = []
children_map = {}
context.send(:each_span) do |span|
# There's no point in trying to put the real root in those partial roots, if
# it's flushable, the default algorithm would figure way more quickly.
if span.parent && !unflushable_ids.include?(span.span_id)
if unflushable_ids.include?(span.parent.span_id)
# span is flushable but is parent is not
roots << span
else
# span is flushable and its parent is too, build the reverse
# parent to child map for this one, it will be useful
children_map[span.parent.span_id] ||= []
children_map[span.parent.span_id] << span
end
end
end

# 3rd step, find all children, as this can be costly, only perform it for partial roots
partial_traces = []
all_ids = Set.new
roots.each do |root|
spans = []
add_children(children_map, spans, all_ids, root)
partial_traces << spans
end

return [nil, nil] if partial_traces.empty?
[partial_traces, all_ids]
end

def partial_flush(context)
traces, flushed_ids = partial_traces(context)
return nil unless traces && flushed_ids

# We need to reject by span ID and not by value, because a span
# value may be altered (typical example: it's finished by some other thread)
# since we lock only the context, not all the spans which belong to it.
context.send(:delete_span_if) { |span| flushed_ids.include? span.span_id }
traces
end

# Performs an operation which each partial trace it can get from the context.
def each_partial_trace(context)
start_time = context.send(:start_time)
length = context.send(:length)
# Stop and do not flush anything if there are not enough spans.
return if length <= @min_spans_before_partial_flush
# If there are enough spans, but not too many, check for start time.
# If timeout is not given or 0, then wait
return if length <= @max_spans_before_partial_flush &&
(@partial_flush_timeout.nil? || @partial_flush_timeout <= 0 ||
(start_time && start_time > Time.now.utc - @partial_flush_timeout))
# Here, either the trace is old or we have too many spans, flush it.
traces = partial_flush(context)
return unless traces
traces.each do |trace|
yield trace
end
end

private :add_children
private :partial_traces
private :partial_flush
end
end
31 changes: 29 additions & 2 deletions lib/ddtrace/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

require 'ddtrace/span'
require 'ddtrace/context'
require 'ddtrace/context_flush'
require 'ddtrace/provider'
require 'ddtrace/logger'
require 'ddtrace/writer'
Expand Down Expand Up @@ -97,6 +98,8 @@ def initialize(options = {})
@provider = options.fetch(:context_provider, Datadog::DefaultContextProvider.new)
@provider ||= Datadog::DefaultContextProvider.new # @provider should never be nil

@context_flush = Datadog::ContextFlush.new(options) if options[:partial_flush]

@mutex = Mutex.new
@services = {}
@tags = {}
Expand All @@ -117,8 +120,13 @@ def configure(options = {})
enabled = options.fetch(:enabled, nil)
hostname = options.fetch(:hostname, nil)
port = options.fetch(:port, nil)

# Those are rare "power-user" options.
sampler = options.fetch(:sampler, nil)
priority_sampling = options[:priority_sampling]
max_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush, nil)
min_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush, nil)
partial_flush_timeout = options.fetch(:partial_flush_timeout, nil)

@enabled = enabled unless enabled.nil?
@sampler = sampler unless sampler.nil?
Expand All @@ -130,6 +138,10 @@ def configure(options = {})

@writer.transport.hostname = hostname unless hostname.nil?
@writer.transport.port = port unless port.nil?

@context_flush = Datadog::ContextFlush.new(options) unless min_spans_before_partial_flush.nil? &&
max_spans_before_partial_flush.nil? &&
partial_flush_timeout.nil?
end

# Set the information about the given service. A valid example is:
Expand Down Expand Up @@ -295,8 +307,23 @@ def record(context)
context = context.context if context.is_a?(Datadog::Span)
return if context.nil?
trace, sampled = context.get
ready = !trace.nil? && !trace.empty? && sampled
write(trace) if ready

# If context flushing is configured...
if @context_flush
if sampled
if trace.nil? || trace.empty?
@context_flush.each_partial_trace(context) do |t|
write(t)
end
else
write(trace)
end
end
# Default behavior
else
ready = !trace.nil? && !trace.empty? && sampled
write(trace) if ready
end
end

# Return the current active span or +nil+.
Expand Down
Loading

0 comments on commit b3a85cd

Please sign in to comment.