Skip to content

Commit

Permalink
Aggregation: send sampling rate for timings when it is given (#387)
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
  • Loading branch information
pedro-stanaka authored Oct 30, 2024
1 parent 5579139 commit 959f0b7
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ section below.

## Unreleased changes

## Version 3.9.5

- [#387](https://github.com/Shopify/statsd-instrument/pull/387) - Fixed a bug where sampled histogram
  metrics sent through the aggregator were not scaled properly, because the sampling rate was missing
  from the final flushed sample.

## Version 3.9.4

- [#384](https://github.com/Shopify/statsd-instrument/pull/384) - Aggregation: fixed a bug when sending metrics synchronously
Expand Down
26 changes: 18 additions & 8 deletions lib/statsd/instrument/aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
module StatsD
module Instrument
class AggregationKey
attr_reader :name, :tags, :no_prefix, :type, :hash
attr_reader :name, :tags, :no_prefix, :type, :hash, :sample_rate

# Identity of one aggregated metric series, used as the key in the
# aggregator's state hash.
#
# @param name [String] metric name (normalized by the caller)
# @param tags [String, Array, nil] normalized tag encoding
# @param no_prefix [Boolean] true when the global metric prefix is skipped
# @param type [Symbol] metric type (e.g. COUNT, GAUGE, a timing type)
# @param sample_rate [Float] client-side sampling rate; carried on the key so
#   the flushed datagram includes it and the server can scale sampled
#   timings back up (#387)
def initialize(name, tags, no_prefix, type, sample_rate: 1.0)
  @name = name
  @tags = tags
  @no_prefix = no_prefix
  @type = type
  @sample_rate = sample_rate
  # NOTE(review): the cached hash omits @sample_rate, so two keys differing
  # only in sample rate hash identically. Whether they also compare equal
  # depends on eql?/== (defined outside this view) — confirm that mixing
  # sample rates for the same metric aggregates as intended.
  @hash = [@name, @tags, @no_prefix, @type].hash
end

Expand Down Expand Up @@ -56,7 +57,7 @@ def finalize(aggregation_state, sink, datagram_builders, datagram_builder_class,
key.name,
key.type.to_s,
agg_value,
CONST_SAMPLE_RATE,
key.sample_rate,
key.tags,
)
when GAUGE
Expand Down Expand Up @@ -134,7 +135,7 @@ def increment(name, value = 1, tags: [], no_prefix: false)
end
end

def aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION)
def aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION, sample_rate: CONST_SAMPLE_RATE)
unless thread_healthcheck
@sink << datagram_builder(no_prefix: no_prefix).timing_value_packed(
name, type.to_s, [value], CONST_SAMPLE_RATE, tags
Expand All @@ -143,7 +144,7 @@ def aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION
end

tags = tags_sorted(tags)
key = packet_key(name, tags, no_prefix, type)
key = packet_key(name, tags, no_prefix, type, sample_rate: sample_rate)

@mutex.synchronize do
values = @aggregation_state[key] ||= []
Expand Down Expand Up @@ -176,6 +177,9 @@ def flush

EMPTY_ARRAY = [].freeze

# Flushes the aggregated metrics to the sink.
# Iterates over the aggregation state and sends each metric to the sink.
# If you change this function, you need to update the logic in the finalizer as well.
def do_flush
@aggregation_state.each do |key, value|
case key.type
Expand All @@ -191,7 +195,7 @@ def do_flush
key.name,
key.type.to_s,
value,
CONST_SAMPLE_RATE,
key.sample_rate,
key.tags,
)
when GAUGE
Expand Down Expand Up @@ -219,8 +223,14 @@ def tags_sorted(tags)
datagram_builder(no_prefix: false).normalize_tags(tags)
end

# Builds a frozen AggregationKey used to index the aggregation state.
#
# @param name [String] metric name; normalized before being stored
# @param tags [String] normalized tag encoding (binary empty string default)
# @param no_prefix [Boolean] whether the global metric prefix is skipped
# @param type [Symbol] metric type, COUNT by default
# @param sample_rate [Float] sampling rate to carry through to the flushed
#   datagram; defaults to CONST_SAMPLE_RATE (i.e. unsampled)
# @return [AggregationKey] frozen key
def packet_key(name, tags = "".b, no_prefix = false, type = COUNT, sample_rate: CONST_SAMPLE_RATE)
  AggregationKey.new(
    DatagramBuilder.normalize_string(name),
    tags,
    no_prefix,
    type,
    sample_rate: sample_rate,
  ).freeze
end

def datagram_builder(no_prefix:)
Expand Down
2 changes: 1 addition & 1 deletion lib/statsd/instrument/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module StatsD
  module Instrument
    # Gem version. 3.9.5 ships the aggregation sampling-rate fix (#387).
    VERSION = "3.9.5"
  end
end
24 changes: 23 additions & 1 deletion test/aggregator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,23 @@ def test_distribution_simple
assert_equal([1.0, 100.0], datagram.value)
end

# Timings recorded with a sample rate must be flushed with that rate attached,
# so the server can scale the sampled values back up.
def test_timing_sampling_scaling
  @subject.aggregate_timing("timing.sampled", 60.0, sample_rate: 0.01)
  @subject.aggregate_timing("timing.sampled", 80.0, sample_rate: 0.01)
  @subject.aggregate_timing("timing.unsampled", 60.0, sample_rate: 1.0)

  @subject.flush

  datagrams = @sink.datagrams
  assert_equal(2, datagrams.size)

  sampled, unsampled = ["timing.sampled", "timing.unsampled"].map do |metric_name|
    datagrams.find { |datagram| datagram.name == metric_name }
  end

  assert_equal([60.0, 80.0], sampled.value)
  assert_equal(0.01, sampled.sample_rate)
  assert_equal(60.0, unsampled.value)
end

def test_mixed_type_timings
@subject.aggregate_timing("foo_ms", 1, tags: { foo: "bar" }, type: :ms)
@subject.aggregate_timing("foo_ms", 100, tags: { foo: "bar" }, type: :ms)
Expand Down Expand Up @@ -304,6 +321,7 @@ def test_finalizer_flushes_pending_metrics
@subject.increment("foo", 1, tags: { foo: "bar" })
@subject.aggregate_timing("bar", 100, tags: { foo: "bar" })
@subject.gauge("baz", 100, tags: { foo: "bar" })
@subject.aggregate_timing("sampled_timing", 100, tags: { foo: "bar" }, sample_rate: 0.01)

# Manually trigger the finalizer
finalizer = StatsD::Instrument::Aggregator.finalize(
Expand All @@ -316,7 +334,7 @@ def test_finalizer_flushes_pending_metrics
finalizer.call

# Verify that all pending metrics are sent
assert_equal(3, @sink.datagrams.size)
assert_equal(4, @sink.datagrams.size)

counter_datagram = @sink.datagrams.find { |d| d.name == "foo" }
assert_equal(1, counter_datagram.value)
Expand All @@ -329,5 +347,9 @@ def test_finalizer_flushes_pending_metrics
gauge_datagram = @sink.datagrams.find { |d| d.name == "baz" }
assert_equal(100, gauge_datagram.value)
assert_equal(["foo:bar"], gauge_datagram.tags)

sampled_timing_datagram = @sink.datagrams.find { |d| d.name == "sampled_timing" }
assert_equal(100.0, sampled_timing_datagram.value)
assert_equal(0.01, sampled_timing_datagram.sample_rate)
end
end

0 comments on commit 959f0b7

Please sign in to comment.