From 959f0b7b05c651d483248ca77175d14c383f5f9f Mon Sep 17 00:00:00 2001
From: Pedro Tanaka
Date: Wed, 30 Oct 2024 12:36:53 +0100
Subject: [PATCH] Aggregation: send sampling rate for timings when it is given
 (#387)

Signed-off-by: Pedro Tanaka
---
 CHANGELOG.md                        |  5 +++++
 lib/statsd/instrument/aggregator.rb | 26 ++++++++++++++++++--------
 lib/statsd/instrument/version.rb    |  2 +-
 test/aggregator_test.rb             | 24 +++++++++++++++++++++++-
 4 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1ee8c29e..a3d39c9f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,11 @@ section below.
 
 ## Unreleased changes
 
+## Version 3.9.5
+
+- [#387](https://github.com/Shopify/statsd-instrument/pull/387) - Fix a bug where sampled histogram metrics sent through
+the aggregator were not scaled properly, because the sampling rate was missing from the final sent sample.
+
 ## Version 3.9.4
 
 - [#384](https://github.com/Shopify/statsd-instrument/pull/384) - Aggregation: fixing bug when sending metrics synchronously
diff --git a/lib/statsd/instrument/aggregator.rb b/lib/statsd/instrument/aggregator.rb
index 56b74b31..b1c4bfd0 100644
--- a/lib/statsd/instrument/aggregator.rb
+++ b/lib/statsd/instrument/aggregator.rb
@@ -3,13 +3,14 @@
 module StatsD
   module Instrument
     class AggregationKey
-      attr_reader :name, :tags, :no_prefix, :type, :hash
+      attr_reader :name, :tags, :no_prefix, :type, :hash, :sample_rate
 
-      def initialize(name, tags, no_prefix, type)
+      def initialize(name, tags, no_prefix, type, sample_rate: 1.0)
         @name = name
         @tags = tags
         @no_prefix = no_prefix
         @type = type
+        @sample_rate = sample_rate
         @hash = [@name, @tags, @no_prefix, @type].hash
       end
 
@@ -56,7 +57,7 @@ def finalize(aggregation_state, sink, datagram_builders, datagram_builder_class,
                   key.name,
                   key.type.to_s,
                   agg_value,
-                  CONST_SAMPLE_RATE,
+                  key.sample_rate,
                   key.tags,
                 )
               when GAUGE
@@ -134,7 +135,7 @@ def increment(name, value = 1, tags: [], no_prefix: false)
         end
       end
 
-      def aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION)
+      def aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION, sample_rate: CONST_SAMPLE_RATE)
         unless thread_healthcheck
           @sink << datagram_builder(no_prefix: no_prefix).timing_value_packed(
             name, type.to_s, [value], CONST_SAMPLE_RATE, tags
@@ -143,7 +144,7 @@ def aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION
         end
 
         tags = tags_sorted(tags)
-        key = packet_key(name, tags, no_prefix, type)
+        key = packet_key(name, tags, no_prefix, type, sample_rate: sample_rate)
 
         @mutex.synchronize do
           values = @aggregation_state[key] ||= []
@@ -176,6 +177,9 @@ def flush
 
       EMPTY_ARRAY = [].freeze
 
+      # Flushes the aggregated metrics to the sink.
+      # Iterates over the aggregation state and sends each metric to the sink.
+      # If you change this function, you need to update the logic in the finalizer as well.
       def do_flush
         @aggregation_state.each do |key, value|
           case key.type
@@ -191,7 +195,7 @@ def do_flush
               key.name,
               key.type.to_s,
               value,
-              CONST_SAMPLE_RATE,
+              key.sample_rate,
               key.tags,
             )
           when GAUGE
@@ -219,8 +223,14 @@ def tags_sorted(tags)
         datagram_builder(no_prefix: false).normalize_tags(tags)
       end
 
-      def packet_key(name, tags = "".b, no_prefix = false, type = COUNT)
-        AggregationKey.new(DatagramBuilder.normalize_string(name), tags, no_prefix, type).freeze
+      def packet_key(name, tags = "".b, no_prefix = false, type = COUNT, sample_rate: CONST_SAMPLE_RATE)
+        AggregationKey.new(
+          DatagramBuilder.normalize_string(name),
+          tags,
+          no_prefix,
+          type,
+          sample_rate: sample_rate,
+        ).freeze
       end
 
       def datagram_builder(no_prefix:)
diff --git a/lib/statsd/instrument/version.rb b/lib/statsd/instrument/version.rb
index ddcf1154..ab1269ee 100644
--- a/lib/statsd/instrument/version.rb
+++ b/lib/statsd/instrument/version.rb
@@ -2,6 +2,6 @@
 
 module StatsD
   module Instrument
-    VERSION = "3.9.4"
+    VERSION = "3.9.5"
   end
 end
diff --git a/test/aggregator_test.rb b/test/aggregator_test.rb
index b59752f6..eeb071ec 100644
--- a/test/aggregator_test.rb
+++ b/test/aggregator_test.rb
@@ -58,6 +58,23 @@ def test_distribution_simple
     assert_equal([1.0, 100.0], datagram.value)
   end
 
+  def test_timing_sampling_scaling
+    @subject.aggregate_timing("timing.sampled", 60.0, sample_rate: 0.01)
+    @subject.aggregate_timing("timing.sampled", 80.0, sample_rate: 0.01)
+    @subject.aggregate_timing("timing.unsampled", 60.0, sample_rate: 1.0)
+
+    @subject.flush
+
+    assert_equal(2, @sink.datagrams.size)
+
+    sampled_datagram = @sink.datagrams.find { |d| d.name == "timing.sampled" }
+    assert_equal([60.0, 80.0], sampled_datagram.value)
+    assert_equal(0.01, sampled_datagram.sample_rate)
+
+    unsampled_datagram = @sink.datagrams.find { |d| d.name == "timing.unsampled" }
+    assert_equal(60.0, unsampled_datagram.value)
+  end
+
   def test_mixed_type_timings
     @subject.aggregate_timing("foo_ms", 1, tags: { foo: "bar" }, type: :ms)
     @subject.aggregate_timing("foo_ms", 100, tags: { foo: "bar" }, type: :ms)
@@ -304,6 +321,7 @@ def test_finalizer_flushes_pending_metrics
     @subject.increment("foo", 1, tags: { foo: "bar" })
     @subject.aggregate_timing("bar", 100, tags: { foo: "bar" })
     @subject.gauge("baz", 100, tags: { foo: "bar" })
+    @subject.aggregate_timing("sampled_timing", 100, tags: { foo: "bar" }, sample_rate: 0.01)
 
     # Manually trigger the finalizer
     finalizer = StatsD::Instrument::Aggregator.finalize(
@@ -316,7 +334,7 @@ def test_finalizer_flushes_pending_metrics
     finalizer.call
 
     # Verify that all pending metrics are sent
-    assert_equal(3, @sink.datagrams.size)
+    assert_equal(4, @sink.datagrams.size)
 
     counter_datagram = @sink.datagrams.find { |d| d.name == "foo" }
     assert_equal(1, counter_datagram.value)
@@ -329,5 +347,9 @@ def test_finalizer_flushes_pending_metrics
     gauge_datagram = @sink.datagrams.find { |d| d.name == "baz" }
     assert_equal(100, gauge_datagram.value)
     assert_equal(["foo:bar"], gauge_datagram.tags)
+
+    sampled_timing_datagram = @sink.datagrams.find { |d| d.name == "sampled_timing" }
+    assert_equal(100.0, sampled_timing_datagram.value)
+    assert_equal(0.01, sampled_timing_datagram.sample_rate)
   end
 end
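
Why the sampling rate has to survive aggregation: a StatsD backend that honors the |@<rate> field weights each received value by 1 / rate, so flushing sampled timings with the constant rate of 1.0 made them look up to 1 / rate times rarer than they really were. The Ruby sketch below is illustrative only (it is not part of the patch); the values mirror test_timing_sampling_scaling, and it assumes the usual 1 / rate weighting on the backend:

    # Each value recorded at sample_rate 0.01 stands in for roughly 1 / 0.01 = 100 real observations.
    recorded_values = [60.0, 80.0]
    sample_rate = 0.01

    # Estimated number of real observations when the rate survives aggregation (the fix):
    with_rate = recorded_values.size / sample_rate   # => 200.0
    # Estimate when the datagram is flushed with CONST_SAMPLE_RATE (1.0), as before the fix:
    without_rate = recorded_values.size / 1.0        # => 2.0

    puts "with rate:    #{with_rate}"
    puts "without rate: #{without_rate}"

With aggregation enabled, aggregate_timing now stores the given sample_rate on the AggregationKey, and both do_flush and the finalizer pass key.sample_rate instead of CONST_SAMPLE_RATE to timing_value_packed, so the flushed datagram carries the |@0.01 field and the backend can scale the values back correctly.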