Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tracer] reduce memory usage on high-cardinality traces #247

Merged
merged 4 commits into from
Apr 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ Available options are:
- ``env``: set the environment. Rails users may set it to ``Rails.env`` to use their application settings.
- ``tags``: set global tags that should be applied to all spans. Defaults to an empty hash
- ``log``: defines a custom logger.
- ``partial_flush``: set to ``true`` to enable partial trace flushing (for long-running traces). Disabled by default. *Experimental.*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep this experimental for some releases. Then we may improve it, make it fully supported, or replace with a new behavior.


#### Custom logging

Expand Down
130 changes: 96 additions & 34 deletions lib/ddtrace/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,22 @@ module Datadog
# \Context, it will be related to the original trace.
#
# This data structure is thread-safe.
# rubocop:disable Metrics/ClassLength
class Context
# 100k spans is about a 100Mb footprint
DEFAULT_MAX_LENGTH = 100_000

attr_reader :max_length

# Initialize a new thread-safe \Context.
def initialize(options = {})
@mutex = Mutex.new
# max_length is the amount of spans above which, for a given trace,
# the context will simply drop and ignore spans, avoiding high memory usage.
@max_length = options.fetch(:max_length, DEFAULT_MAX_LENGTH)
reset(options)
end

def reset(options = {})
@trace = []
@parent_trace_id = options.fetch(:trace_id, nil)
@parent_span_id = options.fetch(:span_id, nil)
@sampled = options.fetch(:sampled, false)
@sampling_priority = options.fetch(:sampling_priority, nil)
@finished_spans = 0
@current_span = nil
end

def trace_id
@mutex.synchronize do
@parent_trace_id
Expand Down Expand Up @@ -64,20 +63,19 @@ def current_span
end
end

def set_current_span(span)
@current_span = span
if span
@parent_trace_id = span.trace_id
@parent_span_id = span.span_id
@sampled = span.sampled
else
@parent_span_id = nil
end
end

# Add a span to the context trace list, keeping it as the last active span.
def add_span(span)
@mutex.synchronize do
# If hitting the hard limit, just drop spans. This is really a rare case
# as it means despite the soft limit, the hard limit is reached, so the trace
# by default has 100000 spans, all of which belong to unfinished parts of a
# larger trace. This is a catch-all to reduce global memory usage.
if @max_length > 0 && @trace.length >= @max_length
Datadog::Tracer.log.debug("context full, ignoring span #{span.name}")
# Detach the span from any context, it's being dropped and ignored.
span.context = nil
return
end
set_current_span(span)
@trace << span
span.context = self
Expand Down Expand Up @@ -105,12 +103,6 @@ def close_span(span)
end
end

# Returns if the trace for the current Context is finished or not.
# Low-level internal function, not thread-safe.
def check_finished_spans
@finished_spans > 0 && @trace.length == @finished_spans
end

# Returns if the trace for the current Context is finished or not. A \Context
# is considered finished if all spans in this context are finished.
def finished?
Expand All @@ -135,14 +127,16 @@ def sampled?
# This operation is thread-safe.
def get
@mutex.synchronize do
return nil, nil unless check_finished_spans

trace = @trace
sampled = @sampled

attach_sampling_priority if sampled && @sampling_priority

# still return sampled attribute, even if context is not finished
return nil, sampled unless check_finished_spans()

reset
return trace, sampled
[trace, sampled]
end
end

Expand All @@ -154,17 +148,85 @@ def to_s
end
end

private

def reset(options = {})
@trace = []
@parent_trace_id = options.fetch(:trace_id, nil)
@parent_span_id = options.fetch(:span_id, nil)
@sampled = options.fetch(:sampled, false)
@sampling_priority = options.fetch(:sampling_priority, nil)
@finished_spans = 0
@current_span = nil
end

def set_current_span(span)
@current_span = span
if span
@parent_trace_id = span.trace_id
@parent_span_id = span.span_id
@sampled = span.sampled
else
@parent_span_id = nil
end
end

# Returns if the trace for the current Context is finished or not.
# Low-level internal function, not thread-safe.
def check_finished_spans
@finished_spans > 0 && @trace.length == @finished_spans
end

def attach_sampling_priority
@trace.first.set_metric(
Ext::DistributedTracing::SAMPLING_PRIORITY_KEY,
@sampling_priority
)
end

private :reset
private :check_finished_spans
private :set_current_span
private :attach_sampling_priority
# Return the start time of the root span, or nil if there are no spans or this is undefined.
def start_time
@mutex.synchronize do
return nil if @trace.empty?
@trace[0].start_time
end
end

# Return the length of the current trace held by this context.
def length
@mutex.synchronize do
@trace.length
end
end

# Iterate on each span within the trace. This is thread safe.
def each_span
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@mutex.synchronize do
@trace.each do |span|
yield span
end
end
end

# Delete any span matching the condition. This is thread safe.
def delete_span_if
@mutex.synchronize do
@trace.delete_if do |span|
finished = span.finished?
delete_span = yield span
if delete_span
# We need to detach the span from the context, else, some code
# finishing it afterwards would mess up with the number of
# finished_spans and possibly cause other side effects.
span.context = nil
# Acknowledge there's one span less to finish, if needed.
# It's very important to keep this balanced.
@finished_spans -= 1 if finished
end
delete_span
end
end
end
end

# ThreadLocalContext can be used as a tracer global reference to create
Expand Down
132 changes: 132 additions & 0 deletions lib/ddtrace/context_flush.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
require 'set'

require 'ddtrace/context'

module Datadog
# \ContextFlush is used to cap context size and avoid it using too much memory.
# It performs memory flushes when required.
class ContextFlush
# by default, soft and hard limits are the same
DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_LENGTH
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set this to 1000 to have a partial flush triggered for any trace bigger than 1000 spans.

# by default, never do a partial flush
DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH = Datadog::Context::DEFAULT_MAX_LENGTH
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set this to 10 to prevent any trace smaller than 10 spans from being flushed, even partially.

# timeout should be lower than the trace agent window
DEFAULT_PARTIAL_FLUSH_TIMEOUT = 10

private_constant :DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH
private_constant :DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH
private_constant :DEFAULT_PARTIAL_FLUSH_TIMEOUT

def initialize(options = {})
# max_spans_before_partial_flush is the amount of spans collected before
# the context starts to partially flush parts of traces. With a setting of 10k,
# the memory overhead is about 10Mb per thread/context (depends on spans metadata,
# this is just an order of magnitude).
@max_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush,
DEFAULT_MAX_SPANS_BEFORE_PARTIAL_FLUSH)
# min_spans_before_partial_flush is the minimum number of spans required
# for a partial flush to happen on a timeout. This is to prevent partial flush
# of traces which last a very long time but yet have few spans.
@min_spans_before_partial_flush = options.fetch(:min_spans_before_partial_flush,
DEFAULT_MIN_SPANS_BEFORE_PARTIAL_FLUSH)
# partial_flush_timeout is the limit (in seconds) above which the context
# considers flushing parts of the trace. Partial flushes should not be done too
# late else the agent rejects them with a "too far in the past" error.
@partial_flush_timeout = options.fetch(:partial_flush_timeout,
DEFAULT_PARTIAL_FLUSH_TIMEOUT)
@partial_traces = []
end

def add_children(m, spans, ids, leaf)
spans << leaf
ids.add(leaf.span_id)

if m[leaf.span_id]
m[leaf.span_id].each do |sub|
add_children(m, spans, ids, sub)
end
end
end

def partial_traces(context)
# 1st step, taint all parents of an unfinished span as unflushable
unflushable_ids = Set.new

context.send(:each_span) do |span|
next if span.finished? || unflushable_ids.include?(span.span_id)
unflushable_ids.add span.span_id
while span.parent
span = span.parent
unflushable_ids.add span.span_id
end
end

# 2nd step, find all spans which are at the border between flushable and unflushable
# Along the road, collect a reverse-tree which allows direct walking from parents to
# children but only for the ones we're interested it.
roots = []
children_map = {}
context.send(:each_span) do |span|
# There's no point in trying to put the real root in those partial roots, if
# it's flushable, the default algorithm would figure way more quickly.
if span.parent && !unflushable_ids.include?(span.span_id)
if unflushable_ids.include?(span.parent.span_id)
# span is flushable but is parent is not
roots << span
else
# span is flushable and its parent is too, build the reverse
# parent to child map for this one, it will be useful
children_map[span.parent.span_id] ||= []
children_map[span.parent.span_id] << span
end
end
end

# 3rd step, find all children, as this can be costly, only perform it for partial roots
partial_traces = []
all_ids = Set.new
roots.each do |root|
spans = []
add_children(children_map, spans, all_ids, root)
partial_traces << spans
end

return [nil, nil] if partial_traces.empty?
[partial_traces, all_ids]
end

def partial_flush(context)
traces, flushed_ids = partial_traces(context)
return nil unless traces && flushed_ids

# We need to reject by span ID and not by value, because a span
# value may be altered (typical example: it's finished by some other thread)
# since we lock only the context, not all the spans which belong to it.
context.send(:delete_span_if) { |span| flushed_ids.include? span.span_id }
traces
end

# Performs an operation which each partial trace it can get from the context.
def each_partial_trace(context)
start_time = context.send(:start_time)
length = context.send(:length)
# Stop and do not flush anything if there are not enough spans.
return if length <= @min_spans_before_partial_flush
# If there are enough spans, but not too many, check for start time.
# If timeout is not given or 0, then wait
return if length <= @max_spans_before_partial_flush &&
(@partial_flush_timeout.nil? || @partial_flush_timeout <= 0 ||
(start_time && start_time > Time.now.utc - @partial_flush_timeout))
# Here, either the trace is old or we have too many spans, flush it.
traces = partial_flush(context)
return unless traces
traces.each do |trace|
yield trace
end
end

private :add_children
private :partial_traces
private :partial_flush
end
end
31 changes: 29 additions & 2 deletions lib/ddtrace/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

require 'ddtrace/span'
require 'ddtrace/context'
require 'ddtrace/context_flush'
require 'ddtrace/provider'
require 'ddtrace/logger'
require 'ddtrace/writer'
Expand Down Expand Up @@ -97,6 +98,8 @@ def initialize(options = {})
@provider = options.fetch(:context_provider, Datadog::DefaultContextProvider.new)
@provider ||= Datadog::DefaultContextProvider.new # @provider should never be nil

@context_flush = Datadog::ContextFlush.new(options) if options[:partial_flush]

@mutex = Mutex.new
@services = {}
@tags = {}
Expand All @@ -117,8 +120,13 @@ def configure(options = {})
enabled = options.fetch(:enabled, nil)
hostname = options.fetch(:hostname, nil)
port = options.fetch(:port, nil)

# Those are rare "power-user" options.
sampler = options.fetch(:sampler, nil)
priority_sampling = options[:priority_sampling]
max_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush, nil)
min_spans_before_partial_flush = options.fetch(:max_spans_before_partial_flush, nil)
partial_flush_timeout = options.fetch(:partial_flush_timeout, nil)

@enabled = enabled unless enabled.nil?
@sampler = sampler unless sampler.nil?
Expand All @@ -130,6 +138,10 @@ def configure(options = {})

@writer.transport.hostname = hostname unless hostname.nil?
@writer.transport.port = port unless port.nil?

@context_flush = Datadog::ContextFlush.new(options) unless min_spans_before_partial_flush.nil? &&
max_spans_before_partial_flush.nil? &&
partial_flush_timeout.nil?
end

# Set the information about the given service. A valid example is:
Expand Down Expand Up @@ -295,8 +307,23 @@ def record(context)
context = context.context if context.is_a?(Datadog::Span)
return if context.nil?
trace, sampled = context.get
ready = !trace.nil? && !trace.empty? && sampled
write(trace) if ready

# If context flushing is configured...
if @context_flush
if sampled
if trace.nil? || trace.empty?
@context_flush.each_partial_trace(context) do |t|
write(t)
end
else
write(trace)
end
end
# Default behavior
else
ready = !trace.nil? && !trace.empty? && sampled
write(trace) if ready
end
end

# Return the current active span or +nil+.
Expand Down
Loading