From 3db289f1af971740b02b31d3a23579bee71ddaa1 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Wed, 12 Jun 2024 16:42:36 +0200 Subject: [PATCH 01/21] rename Telemetry::Heartbeat to Telemetry::Worker --- lib/datadog/core/telemetry/client.rb | 4 ++-- .../core/telemetry/{heartbeat.rb => worker.rb} | 2 +- sig/datadog/core/telemetry/client.rbs | 2 +- sig/datadog/core/telemetry/http/ext.rbs | 2 +- .../telemetry/{heartbeat.rbs => worker.rbs} | 2 +- spec/datadog/core/telemetry/client_spec.rb | 4 ++-- .../{heartbeat_spec.rb => worker_spec.rb} | 18 +++++++++--------- 7 files changed, 17 insertions(+), 17 deletions(-) rename lib/datadog/core/telemetry/{heartbeat.rb => worker.rb} (95%) rename sig/datadog/core/telemetry/{heartbeat.rbs => worker.rbs} (92%) rename spec/datadog/core/telemetry/{heartbeat_spec.rb => worker_spec.rb} (63%) diff --git a/lib/datadog/core/telemetry/client.rb b/lib/datadog/core/telemetry/client.rb index 172145c9342..db5c01673ee 100644 --- a/lib/datadog/core/telemetry/client.rb +++ b/lib/datadog/core/telemetry/client.rb @@ -2,7 +2,7 @@ require_relative 'emitter' require_relative 'event' -require_relative 'heartbeat' +require_relative 'worker' require_relative '../utils/forking' module Datadog @@ -27,7 +27,7 @@ def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: tru @started = false @dependency_collection = dependency_collection - @worker = Telemetry::Heartbeat.new(enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds) do + @worker = Telemetry::Worker.new(enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds) do next unless @started # `started!` should be the first event, thus ensure that `heartbeat!` is not sent first. heartbeat! diff --git a/lib/datadog/core/telemetry/heartbeat.rb b/lib/datadog/core/telemetry/worker.rb similarity index 95% rename from lib/datadog/core/telemetry/heartbeat.rb rename to lib/datadog/core/telemetry/worker.rb index b2129504e68..8e0f106d0bd 100644 --- a/lib/datadog/core/telemetry/heartbeat.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -7,7 +7,7 @@ module Datadog module Core module Telemetry # Periodically sends a heartbeat event to the telemetry API. - class Heartbeat < Core::Worker + class Worker < Core::Worker include Core::Workers::Polling def initialize(heartbeat_interval_seconds:, enabled: true, &block) diff --git a/sig/datadog/core/telemetry/client.rbs b/sig/datadog/core/telemetry/client.rbs index 9bd2f4a97cc..79aa3850b1c 100644 --- a/sig/datadog/core/telemetry/client.rbs +++ b/sig/datadog/core/telemetry/client.rbs @@ -8,7 +8,7 @@ module Datadog @stopped: bool @emitter: Datadog::Core::Telemetry::Emitter @unsupported: bool - @worker: Datadog::Core::Telemetry::Heartbeat + @worker: Datadog::Core::Telemetry::Worker attr_reader enabled: bool diff --git a/sig/datadog/core/telemetry/http/ext.rbs b/sig/datadog/core/telemetry/http/ext.rbs index 22cea7d1fd0..11271822da3 100644 --- a/sig/datadog/core/telemetry/http/ext.rbs +++ b/sig/datadog/core/telemetry/http/ext.rbs @@ -21,7 +21,7 @@ module Datadog CONTENT_TYPE_APPLICATION_JSON: "application/json" - API_VERSION: "v1" + API_VERSION: "v2" AGENT_ENDPOINT: "/telemetry/proxy/api/v2/apmtelemetry" end diff --git a/sig/datadog/core/telemetry/heartbeat.rbs b/sig/datadog/core/telemetry/worker.rbs similarity index 92% rename from sig/datadog/core/telemetry/heartbeat.rbs rename to sig/datadog/core/telemetry/worker.rbs index b89aeedca8b..779cbd7aa6c 100644 --- a/sig/datadog/core/telemetry/heartbeat.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -1,7 +1,7 @@ module Datadog module Core module Telemetry - class Heartbeat < Core::Worker + class Worker < Core::Worker include Core::Workers::Polling include Core::Workers::Async::Thread include Core::Workers::Async::Thread::PrependedMethods diff --git a/spec/datadog/core/telemetry/client_spec.rb b/spec/datadog/core/telemetry/client_spec.rb index 5618d4ea65e..ebd59516a42 100644 --- a/spec/datadog/core/telemetry/client_spec.rb +++ b/spec/datadog/core/telemetry/client_spec.rb @@ -213,10 +213,10 @@ describe '#stop!' do subject(:stop!) { client.stop! } - let(:worker) { instance_double(Datadog::Core::Telemetry::Heartbeat) } + let(:worker) { instance_double(Datadog::Core::Telemetry::Worker) } before do - allow(Datadog::Core::Telemetry::Heartbeat).to receive(:new) + allow(Datadog::Core::Telemetry::Worker).to receive(:new) .with(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds).and_return(worker) allow(worker).to receive(:start) allow(worker).to receive(:stop) diff --git a/spec/datadog/core/telemetry/heartbeat_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb similarity index 63% rename from spec/datadog/core/telemetry/heartbeat_spec.rb rename to spec/datadog/core/telemetry/worker_spec.rb index 645120267e0..532e2873375 100644 --- a/spec/datadog/core/telemetry/heartbeat_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -1,9 +1,9 @@ require 'spec_helper' -require 'datadog/core/telemetry/heartbeat' +require 'datadog/core/telemetry/worker' -RSpec.describe Datadog::Core::Telemetry::Heartbeat do - subject(:heartbeat) do +RSpec.describe Datadog::Core::Telemetry::Worker do + subject(:worker) do described_class.new(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, &block) end @@ -12,13 +12,13 @@ let(:block) { proc {} } after do - heartbeat.stop(true, 0) - heartbeat.join + worker.stop(true, 0) + worker.join end describe '.new' do context 'when using default settings' do - subject(:heartbeat) { described_class.new(heartbeat_interval_seconds: heartbeat_interval_seconds, &block) } + subject(:worker) { described_class.new(heartbeat_interval_seconds: heartbeat_interval_seconds, &block) } it do is_expected.to have_attributes( enabled?: true, @@ -32,10 +32,10 @@ let(:enabled) { true } it do - heartbeat + worker - try_wait_until { heartbeat.running? } - expect(heartbeat).to have_attributes( + try_wait_until { worker.running? } + expect(worker).to have_attributes( run_async?: true, running?: true, started?: true From b1061efa78d28be2f799c25224033232fbbb55bf Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Thu, 13 Jun 2024 13:33:40 +0200 Subject: [PATCH 02/21] Add queue to telemetry worker. Move sending heartbeat logic to the telemetry worker. --- lib/datadog/core/telemetry/client.rb | 20 ++++----- lib/datadog/core/telemetry/worker.rb | 31 +++++++++----- sig/datadog/core/telemetry/worker.rbs | 11 +++-- spec/datadog/core/telemetry/client_spec.rb | 3 +- spec/datadog/core/telemetry/worker_spec.rb | 47 +++++++++++++++------- 5 files changed, 70 insertions(+), 42 deletions(-) diff --git a/lib/datadog/core/telemetry/client.rb b/lib/datadog/core/telemetry/client.rb index db5c01673ee..eeeae58556e 100644 --- a/lib/datadog/core/telemetry/client.rb +++ b/lib/datadog/core/telemetry/client.rb @@ -27,11 +27,11 @@ def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: tru @started = false @dependency_collection = dependency_collection - @worker = Telemetry::Worker.new(enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds) do - next unless @started # `started!` should be the first event, thus ensure that `heartbeat!` is not sent first. - - heartbeat! - end + @worker = Telemetry::Worker.new( + enabled: @enabled, + heartbeat_interval_seconds: heartbeat_interval_seconds, + emitter: @emitter + ) end def disable! @@ -51,6 +51,8 @@ def started! return res end + @worker.start + @emitter.request(Event::AppDependenciesLoaded.new) if @dependency_collection @started = true @@ -81,14 +83,6 @@ def client_configuration_change!(changes) @emitter.request(Event::AppClientConfigurationChange.new(changes, 'remote_config')) end - - private - - def heartbeat! - return if !@enabled || forked? - - @emitter.request(Event::AppHeartbeat.new) - end end end end diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 8e0f106d0bd..5683f2028b7 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -1,31 +1,44 @@ # frozen_string_literal: true -require_relative '../worker' +require_relative 'event' + require_relative '../workers/polling' +require_relative '../workers/queue' module Datadog module Core module Telemetry - # Periodically sends a heartbeat event to the telemetry API. - class Worker < Core::Worker + # Accumulates events and sends them to the API at a regular interval, including heartbeat event. + class Worker + include Core::Workers::Queue include Core::Workers::Polling - def initialize(heartbeat_interval_seconds:, enabled: true, &block) + def initialize(heartbeat_interval_seconds:, emitter:, enabled: true) + @emitter = emitter + # Workers::Polling settings self.enabled = enabled # Workers::IntervalLoop settings self.loop_base_interval = heartbeat_interval_seconds self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP - super(&block) - start end - def loop_wait_before_first_iteration?; end + def start + return if !enabled? || forked? + + perform + end private - def start - perform + def perform(*_events) + return if !enabled? || forked? + + heartbeat! + end + + def heartbeat! + @emitter.request(Event::AppHeartbeat.new) end end end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 779cbd7aa6c..0fa008ebca9 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -1,19 +1,22 @@ module Datadog module Core module Telemetry - class Worker < Core::Worker + class Worker include Core::Workers::Polling include Core::Workers::Async::Thread include Core::Workers::Async::Thread::PrependedMethods include Core::Workers::IntervalLoop + include Core::Workers::Queue - def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric) ?{ () -> void } -> void + @emitter: Emitter - def loop_wait_before_first_iteration?: () -> bool? + def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter) -> void + + def start: () -> void private - def start: () -> void + def heartbeat!: () -> void end end end diff --git a/spec/datadog/core/telemetry/client_spec.rb b/spec/datadog/core/telemetry/client_spec.rb index ebd59516a42..c8a89f6264f 100644 --- a/spec/datadog/core/telemetry/client_spec.rb +++ b/spec/datadog/core/telemetry/client_spec.rb @@ -217,7 +217,8 @@ before do allow(Datadog::Core::Telemetry::Worker).to receive(:new) - .with(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds).and_return(worker) + .with(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, emitter: emitter) + .and_return(worker) allow(worker).to receive(:start) allow(worker).to receive(:stop) end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 532e2873375..0a874112993 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -4,12 +4,16 @@ RSpec.describe Datadog::Core::Telemetry::Worker do subject(:worker) do - described_class.new(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, &block) + described_class.new(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, emitter: emitter) end let(:enabled) { true } let(:heartbeat_interval_seconds) { 1.2 } - let(:block) { proc {} } + let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } + + before do + allow(emitter).to receive(:request) + end after do worker.stop(true, 0) @@ -17,29 +21,42 @@ end describe '.new' do - context 'when using default settings' do - subject(:worker) { described_class.new(heartbeat_interval_seconds: heartbeat_interval_seconds, &block) } - it do - is_expected.to have_attributes( - enabled?: true, - loop_base_interval: 1.2, # seconds - task: block - ) - end + it 'creates a new worker in stopped state' do + expect(worker).to have_attributes( + enabled?: true, + loop_base_interval: 1.2, # seconds + run_async?: false, + running?: false, + started?: false + ) end + end + describe '#start' do context 'when enabled' do - let(:enabled) { true } - - it do - worker + it 'starts the worker and sends heartbeat event' do + worker.start try_wait_until { worker.running? } + expect(worker).to have_attributes( + enabled?: true, + loop_base_interval: 1.2, # seconds run_async?: true, running?: true, started?: true ) + expect(emitter).to have_received(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) + end + end + + context 'when disabled' do + let(:enabled) { false } + + it 'does not start the worker' do + expect(worker).not_to receive(:perform) + + worker.start end end end From d4350cab60dbe80d0959bb741b12d2d0e97b83b3 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Thu, 13 Jun 2024 15:07:16 +0200 Subject: [PATCH 03/21] move AppStarted telemetry event out of the critical path --- lib/datadog/core/telemetry/client.rb | 14 +--- lib/datadog/core/telemetry/worker.rb | 21 ++++++ lib/datadog/core/workers/polling.rb | 1 + sig/datadog/core/telemetry/client.rbs | 7 -- sig/datadog/core/telemetry/worker.rbs | 5 ++ spec/datadog/core/telemetry/client_spec.rb | 55 +------------- spec/datadog/core/telemetry/worker_spec.rb | 86 ++++++++++++++++++---- 7 files changed, 103 insertions(+), 86 deletions(-) diff --git a/lib/datadog/core/telemetry/client.rb b/lib/datadog/core/telemetry/client.rb index eeeae58556e..07028e96ad5 100644 --- a/lib/datadog/core/telemetry/client.rb +++ b/lib/datadog/core/telemetry/client.rb @@ -10,9 +10,7 @@ module Core module Telemetry # Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle. class Client - attr_reader \ - :enabled, - :unsupported + attr_reader :enabled include Core::Utils::Forking @@ -23,7 +21,6 @@ def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: tru @enabled = enabled @emitter = Emitter.new @stopped = false - @unsupported = false @started = false @dependency_collection = dependency_collection @@ -42,15 +39,6 @@ def disable! def started! return if !@enabled || forked? - res = @emitter.request(Event::AppStarted.new) - - if res.not_found? # Telemetry is only supported by agent versions 7.34 and up - Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') - disable! - @unsupported = true # Prevent telemetry from getting re-enabled - return res - end - @worker.start @emitter.request(Event::AppDependenciesLoaded.new) if @dependency_collection diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 5683f2028b7..9ff7702e747 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -16,6 +16,8 @@ class Worker def initialize(heartbeat_interval_seconds:, emitter:, enabled: true) @emitter = emitter + @sent_started_event = false + # Workers::Polling settings self.enabled = enabled # Workers::IntervalLoop settings @@ -26,20 +28,39 @@ def initialize(heartbeat_interval_seconds:, emitter:, enabled: true) def start return if !enabled? || forked? + # starts async worker perform end + def sent_started_event? + @sent_started_event + end + private def perform(*_events) return if !enabled? || forked? + unless @sent_started_event + started! + @sent_started_event = true + end + heartbeat! end def heartbeat! @emitter.request(Event::AppHeartbeat.new) end + + def started! + res = @emitter.request(Event::AppStarted.new) + + if res.not_found? # Telemetry is only supported by agent versions 7.34 and up + Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') + self.enabled = false + end + end end end end diff --git a/lib/datadog/core/workers/polling.rb b/lib/datadog/core/workers/polling.rb index cc4318f9afa..1d563cdbf4a 100644 --- a/lib/datadog/core/workers/polling.rb +++ b/lib/datadog/core/workers/polling.rb @@ -24,6 +24,7 @@ def perform(*args) end def stop(force_stop = false, timeout = DEFAULT_SHUTDOWN_TIMEOUT) + # p "in stop" if running? # Attempt graceful stop and wait stop_loop diff --git a/sig/datadog/core/telemetry/client.rbs b/sig/datadog/core/telemetry/client.rbs index 79aa3850b1c..e1646cb1eb8 100644 --- a/sig/datadog/core/telemetry/client.rbs +++ b/sig/datadog/core/telemetry/client.rbs @@ -7,13 +7,10 @@ module Datadog @started: bool @stopped: bool @emitter: Datadog::Core::Telemetry::Emitter - @unsupported: bool @worker: Datadog::Core::Telemetry::Worker attr_reader enabled: bool - attr_reader unsupported: bool - include Core::Utils::Forking def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, enabled: bool) -> void @@ -29,10 +26,6 @@ module Datadog def stop!: () -> void def integrations_change!: () -> void - - private - - def heartbeat!: () -> void end end end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 0fa008ebca9..e1c5c09cf70 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -9,14 +9,19 @@ module Datadog include Core::Workers::Queue @emitter: Emitter + @sent_started_event: bool def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter) -> void def start: () -> void + def sent_started_event?: () -> bool + private def heartbeat!: () -> void + + def started!: () -> void end end end diff --git a/spec/datadog/core/telemetry/client_spec.rb b/spec/datadog/core/telemetry/client_spec.rb index c8a89f6264f..8532ad80185 100644 --- a/spec/datadog/core/telemetry/client_spec.rb +++ b/spec/datadog/core/telemetry/client_spec.rb @@ -12,7 +12,7 @@ end let(:enabled) { true } - let(:heartbeat_interval_seconds) { 1.3 } + let(:heartbeat_interval_seconds) { 0 } let(:dependency_collection) { true } let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } let(:response) { double(Datadog::Core::Telemetry::Http::Adapters::Net::Response) } @@ -83,15 +83,12 @@ context 'when dependency_collection is true' do it do - app_started = double - allow(Datadog::Core::Telemetry::Event::AppStarted).to receive(:new).with(no_args).and_return(app_started) - dependencies = double allow(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) .to receive(:new).with(no_args).and_return(dependencies) started! - expect(emitter).to have_received(:request).with(app_started) + expect(emitter).to have_received(:request).with(dependencies) end end @@ -100,63 +97,17 @@ let(:dependency_collection) { false } it do - app_started = double - allow(Datadog::Core::Telemetry::Event::AppStarted).to receive(:new).with(no_args).and_return(app_started) - dependencies = double allow(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) .to receive(:new).with(no_args).and_return(dependencies) started! - expect(emitter).to have_received(:request).with(app_started) - expect(emitter).to_not have_received(:request).with(dependencies) - end - - context 'with heartbeat' do - let(:heartbeat_interval_seconds) { 0 } - it 'sends a heartbeat strictly after app-started' do - @sent_hearbeat = false - allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do - # Ensure app-started was already sent by now - expect(emitter).to have_received(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppStarted)) - @sent_hearbeat = true - response - end - - client.started! - - try_wait_until { @sent_hearbeat } - end + expect(emitter).to_not have_received(:request).with(dependencies) end end end - context 'when internal error returned by emitter' do - let(:response) { Datadog::Core::Telemetry::Http::InternalErrorResponse.new('error') } - - it { expect { started! }.to_not raise_error } - end - - context 'when response returns 404' do - let(:not_found) { true } - - before do - logger = double(Datadog::Core::Logger) - allow(logger).to receive(:debug).with(any_args) - allow(Datadog).to receive(:logger).and_return(logger) - end - - it do - started! - expect(client.enabled).to be(false) - expect(client.unsupported).to be(true) - expect(Datadog.logger).to have_received(:debug).with( - 'Agent does not support telemetry; disabling future telemetry events.' - ) - end - end - context 'when in fork' do before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 0a874112993..b2000f502a3 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -8,11 +8,13 @@ end let(:enabled) { true } - let(:heartbeat_interval_seconds) { 1.2 } + let(:heartbeat_interval_seconds) { 0.5 } let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } before do - allow(emitter).to receive(:request) + logger = double(Datadog::Core::Logger) + allow(logger).to receive(:debug).with(any_args) + allow(Datadog).to receive(:logger).and_return(logger) end after do @@ -24,7 +26,7 @@ it 'creates a new worker in stopped state' do expect(worker).to have_attributes( enabled?: true, - loop_base_interval: 1.2, # seconds + loop_base_interval: heartbeat_interval_seconds, run_async?: false, running?: false, started?: false @@ -34,19 +36,75 @@ describe '#start' do context 'when enabled' do - it 'starts the worker and sends heartbeat event' do - worker.start + let(:response) do + double(Datadog::Core::Telemetry::Http::Adapters::Net::Response, not_found?: !backend_supports_telemetry?) + end + + before do + expect(emitter).to receive(:request) + .with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) + .and_return(response) + end + + context "when backend doesn't support telemetry" do + let(:backend_supports_telemetry?) { false } + + it 'disables the worker' do + worker.start + + try_wait_until { worker.sent_started_event? } + + expect(worker).to have_attributes( + enabled?: false, + loop_base_interval: heartbeat_interval_seconds, + ) + expect(Datadog.logger).to have_received(:debug).with( + 'Agent does not support telemetry; disabling future telemetry events.' + ) + end + end + + context 'when backend supports telemetry' do + let(:backend_supports_telemetry?) { true } + + it 'starts the worker and sends heartbeat event' do + expect(emitter).to receive(:request) + .with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) + + worker.start + + try_wait_until { worker.sent_started_event? } + + expect(worker).to have_attributes( + enabled?: true, + loop_base_interval: heartbeat_interval_seconds, + run_async?: true, + running?: true, + started?: true + ) + end + + it 'always sends heartbeat event after started event' do + @sent_hearbeat = false + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + # app-started was already sent by now + expect(worker.sent_started_event?).to be(true) + + @sent_hearbeat = true + + response + end + + worker.start + + try_wait_until { @sent_hearbeat } + end + end - try_wait_until { worker.running? } + context 'when internal error returned by emitter' do + let(:response) { Datadog::Core::Telemetry::Http::InternalErrorResponse.new('error') } - expect(worker).to have_attributes( - enabled?: true, - loop_base_interval: 1.2, # seconds - run_async?: true, - running?: true, - started?: true - ) - expect(emitter).to have_received(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) + it { expect { worker.start }.to_not raise_error } end end From bb9f888de3c93ba52f8ca5ce2e32944697277947 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 14 Jun 2024 10:38:59 +0200 Subject: [PATCH 04/21] fix failing tests by waiting for worker startup --- lib/datadog/core/workers/polling.rb | 1 - spec/datadog/core/telemetry/worker_spec.rb | 16 +++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/lib/datadog/core/workers/polling.rb b/lib/datadog/core/workers/polling.rb index 1d563cdbf4a..cc4318f9afa 100644 --- a/lib/datadog/core/workers/polling.rb +++ b/lib/datadog/core/workers/polling.rb @@ -24,7 +24,6 @@ def perform(*args) end def stop(force_stop = false, timeout = DEFAULT_SHUTDOWN_TIMEOUT) - # p "in stop" if running? # Attempt graceful stop and wait stop_loop diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index b2000f502a3..66dd3ebb416 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -104,7 +104,21 @@ context 'when internal error returned by emitter' do let(:response) { Datadog::Core::Telemetry::Http::InternalErrorResponse.new('error') } - it { expect { worker.start }.to_not raise_error } + it do + expect do + worker.start + + try_wait_until { worker.sent_started_event? } + + expect(worker).to have_attributes( + enabled?: true, + loop_base_interval: heartbeat_interval_seconds, + run_async?: true, + running?: true, + started?: true + ) + end.to_not raise_error + end end end From a11e48d566c810cb9a2fd4f4db2291240002a3dc Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 14 Jun 2024 10:59:36 +0200 Subject: [PATCH 05/21] debug logging, flushing events, attempt at fixing failing test for worker --- lib/datadog/core/telemetry/worker.rb | 28 +++++++++++++++++++--- sig/datadog/core/telemetry/worker.rbs | 4 ++++ spec/datadog/core/telemetry/worker_spec.rb | 3 +++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 9ff7702e747..4e7ca78936e 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -38,7 +38,7 @@ def sent_started_event? private - def perform(*_events) + def perform(*events) return if !enabled? || forked? unless @sent_started_event @@ -47,20 +47,42 @@ def perform(*_events) end heartbeat! + + send_events(events) + end + + def send_events(events) + return unless enabled? + + Datadog.logger.debug { "Sending #{events&.count} telemetry events" } + (events || []).each do |event| + send_event(event) + end end def heartbeat! - @emitter.request(Event::AppHeartbeat.new) + return unless enabled? + + send_event(Event::AppHeartbeat.new) end def started! - res = @emitter.request(Event::AppStarted.new) + return unless enabled? + + res = send_event(Event::AppStarted.new) if res.not_found? # Telemetry is only supported by agent versions 7.34 and up Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') self.enabled = false end end + + def send_event(event) + Datadog.logger.debug { "Sending telemetry event: #{event}" } + response = @emitter.request(event) + Datadog.logger.debug { "Received response: #{response}" } + response + end end end end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index e1c5c09cf70..e8855724a6d 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -22,6 +22,10 @@ module Datadog def heartbeat!: () -> void def started!: () -> void + + def send_events: (Array[Event::Base] events) -> void + + def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response end end end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 66dd3ebb416..0f4d9919e9a 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -105,6 +105,9 @@ let(:response) { Datadog::Core::Telemetry::Http::InternalErrorResponse.new('error') } it do + expect(emitter).to receive(:request) + .with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) + expect do worker.start From 6b9204ee9b137b880c8b18f1652fbbe7c13fc635 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 14 Jun 2024 11:34:25 +0200 Subject: [PATCH 06/21] don't send heartbeat event if started event wasn't successfully sent --- lib/datadog/core/telemetry/worker.rb | 18 ++++---- sig/datadog/core/telemetry/worker.rbs | 2 +- spec/datadog/core/telemetry/worker_spec.rb | 50 +++++++++++----------- 3 files changed, 37 insertions(+), 33 deletions(-) diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 4e7ca78936e..a837259c3f9 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -41,18 +41,15 @@ def sent_started_event? def perform(*events) return if !enabled? || forked? - unless @sent_started_event - started! - @sent_started_event = true - end + started! unless sent_started_event? heartbeat! - send_events(events) + flush_events(events) end - def send_events(events) - return unless enabled? + def flush_events(events) + return if !enabled? || !sent_started_event? Datadog.logger.debug { "Sending #{events&.count} telemetry events" } (events || []).each do |event| @@ -61,7 +58,7 @@ def send_events(events) end def heartbeat! - return unless enabled? + return if !enabled? || !sent_started_event? send_event(Event::AppHeartbeat.new) end @@ -75,6 +72,11 @@ def started! Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') self.enabled = false end + + if res.ok? + Datadog.logger.debug('Telemetry app-started event is successfully sent') + @sent_started_event = true + end end def send_event(event) diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index e8855724a6d..64b3de884d3 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -23,7 +23,7 @@ module Datadog def started!: () -> void - def send_events: (Array[Event::Base] events) -> void + def flush_events: (Array[Event::Base] events) -> void def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 0f4d9919e9a..8b8aba31784 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -37,13 +37,28 @@ describe '#start' do context 'when enabled' do let(:response) do - double(Datadog::Core::Telemetry::Http::Adapters::Net::Response, not_found?: !backend_supports_telemetry?) + double( + Datadog::Core::Telemetry::Http::Adapters::Net::Response, + not_found?: !backend_supports_telemetry?, + ok?: backend_supports_telemetry? + ) end before do - expect(emitter).to receive(:request) - .with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) - .and_return(response) + @received_started = false + @received_heartbeat = false + + allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) do + @received_started = true + + response + end + + allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + @received_heartbeat = true + + response + end end context "when backend doesn't support telemetry" do @@ -52,7 +67,7 @@ it 'disables the worker' do worker.start - try_wait_until { worker.sent_started_event? } + try_wait_until { @received_started } expect(worker).to have_attributes( enabled?: false, @@ -61,6 +76,7 @@ expect(Datadog.logger).to have_received(:debug).with( 'Agent does not support telemetry; disabling future telemetry events.' ) + expect(@received_heartbeat).to be(false) end end @@ -68,12 +84,9 @@ let(:backend_supports_telemetry?) { true } it 'starts the worker and sends heartbeat event' do - expect(emitter).to receive(:request) - .with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) - worker.start - try_wait_until { worker.sent_started_event? } + try_wait_until { @received_heartbeat } expect(worker).to have_attributes( enabled?: true, @@ -104,23 +117,12 @@ context 'when internal error returned by emitter' do let(:response) { Datadog::Core::Telemetry::Http::InternalErrorResponse.new('error') } - it do - expect(emitter).to receive(:request) - .with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) - - expect do - worker.start + it 'does not send heartbeat event' do + worker.start - try_wait_until { worker.sent_started_event? } + try_wait_until { @received_started } - expect(worker).to have_attributes( - enabled?: true, - loop_base_interval: heartbeat_interval_seconds, - run_async?: true, - running?: true, - started?: true - ) - end.to_not raise_error + expect(@received_heartbeat).to be(false) end end end From dcedde62cb73432d6d0421229b37fee977883315 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 14 Jun 2024 11:42:22 +0200 Subject: [PATCH 07/21] fix client_spec --- spec/datadog/core/telemetry/client_spec.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/datadog/core/telemetry/client_spec.rb b/spec/datadog/core/telemetry/client_spec.rb index 8532ad80185..ed5d991feb7 100644 --- a/spec/datadog/core/telemetry/client_spec.rb +++ b/spec/datadog/core/telemetry/client_spec.rb @@ -22,6 +22,7 @@ allow(Datadog::Core::Telemetry::Emitter).to receive(:new).and_return(emitter) allow(emitter).to receive(:request).and_return(response) allow(response).to receive(:not_found?).and_return(not_found) + allow(response).to receive(:ok?).and_return(!not_found) end describe '#initialize' do From 3a0256fc40933aaea99638cde6ee4ae49bf1b6f5 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 14 Jun 2024 13:41:01 +0200 Subject: [PATCH 08/21] enqueue events to be sent later by worker instead of sending them synchronously --- lib/datadog/core/configuration/components.rb | 3 +- lib/datadog/core/telemetry/client.rb | 14 +-- lib/datadog/core/telemetry/worker.rb | 37 +++++- sig/datadog/core/telemetry/client.rbs | 3 +- sig/datadog/core/telemetry/worker.rbs | 10 +- sig/datadog/core/workers/polling.rbs | 2 +- spec/datadog/core/telemetry/client_spec.rb | 117 ++++++++----------- spec/datadog/core/telemetry/worker_spec.rb | 69 +++++++---- 8 files changed, 148 insertions(+), 107 deletions(-) diff --git a/lib/datadog/core/configuration/components.rb b/lib/datadog/core/configuration/components.rb index f7e00d7a981..46106accc12 100644 --- a/lib/datadog/core/configuration/components.rb +++ b/lib/datadog/core/configuration/components.rb @@ -169,8 +169,9 @@ def shutdown!(replacement = nil) unused_statsd = (old_statsd - (old_statsd & new_statsd)) unused_statsd.each(&:close) - telemetry.stop! + # enqueue closing event before stopping telemetry so it will be send out on shutdown telemetry.emit_closing! unless replacement + telemetry.stop! end end end diff --git a/lib/datadog/core/telemetry/client.rb b/lib/datadog/core/telemetry/client.rb index 07028e96ad5..05a96e39239 100644 --- a/lib/datadog/core/telemetry/client.rb +++ b/lib/datadog/core/telemetry/client.rb @@ -19,7 +19,6 @@ class Client # @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true) @enabled = enabled - @emitter = Emitter.new @stopped = false @started = false @dependency_collection = dependency_collection @@ -27,7 +26,7 @@ def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: tru @worker = Telemetry::Worker.new( enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, - emitter: @emitter + emitter: Emitter.new ) end @@ -41,7 +40,7 @@ def started! @worker.start - @emitter.request(Event::AppDependenciesLoaded.new) if @dependency_collection + @worker.enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection @started = true end @@ -49,27 +48,28 @@ def started! def emit_closing! return if !@enabled || forked? - @emitter.request(Event::AppClosing.new) + @worker.enqueue(Event::AppClosing.new) end def stop! return if @stopped - @worker.stop(true, 0) + # gracefully stop the worker and send leftover events + @worker.stop @stopped = true end def integrations_change! return if !@enabled || forked? - @emitter.request(Event::AppIntegrationsChange.new) + @worker.enqueue(Event::AppIntegrationsChange.new) end # Report configuration changes caused by Remote Configuration. def client_configuration_change!(changes) return if !@enabled || forked? - @emitter.request(Event::AppClientConfigurationChange.new(changes, 'remote_config')) + @worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config')) end end end diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index a837259c3f9..93fccab9a6c 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -13,7 +13,15 @@ class Worker include Core::Workers::Queue include Core::Workers::Polling - def initialize(heartbeat_interval_seconds:, emitter:, enabled: true) + DEFAULT_BUFFER_MAX_SIZE = 1000 + + def initialize( + heartbeat_interval_seconds:, + emitter:, + enabled: true, + shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, + buffer_size: DEFAULT_BUFFER_MAX_SIZE + ) @emitter = emitter @sent_started_event = false @@ -23,6 +31,11 @@ def initialize(heartbeat_interval_seconds:, emitter:, enabled: true) # Workers::IntervalLoop settings self.loop_base_interval = heartbeat_interval_seconds self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP + + @shutdown_timeout = shutdown_timeout + @buffer_size = buffer_size + + self.buffer = buffer_klass.new(@buffer_size) end def start @@ -32,6 +45,16 @@ def start perform end + def stop(force_stop = false, timeout = @shutdown_timeout) + buffer.close if running? + + super + end + + def enqueue(event) + buffer.push(event) + end + def sent_started_event? @sent_started_event end @@ -85,6 +108,18 @@ def send_event(event) Datadog.logger.debug { "Received response: #{response}" } response end + + def dequeue + buffer.pop + end + + def buffer_klass + if Core::Environment::Ext::RUBY_ENGINE == 'ruby' + Core::Buffer::CRuby + else + Core::Buffer::ThreadSafe + end + end end end end diff --git a/sig/datadog/core/telemetry/client.rbs b/sig/datadog/core/telemetry/client.rbs index e1646cb1eb8..007050163a7 100644 --- a/sig/datadog/core/telemetry/client.rbs +++ b/sig/datadog/core/telemetry/client.rbs @@ -6,14 +6,13 @@ module Datadog @dependency_collection: bool @started: bool @stopped: bool - @emitter: Datadog::Core::Telemetry::Emitter @worker: Datadog::Core::Telemetry::Worker attr_reader enabled: bool include Core::Utils::Forking - def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, enabled: bool) -> void + def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void def disable!: () -> void diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 64b3de884d3..9220dfeea09 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -8,15 +8,21 @@ module Datadog include Core::Workers::IntervalLoop include Core::Workers::Queue + DEFAULT_BUFFER_MAX_SIZE: 1000 + @emitter: Emitter @sent_started_event: bool + @shutdown_timeout: Integer + @buffer_size: Integer - def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter) -> void + def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer) -> void def start: () -> void def sent_started_event?: () -> bool + def enqueue: (Event::Base event) -> void + private def heartbeat!: () -> void @@ -26,6 +32,8 @@ module Datadog def flush_events: (Array[Event::Base] events) -> void def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response + + def buffer_klass: () -> untyped end end end diff --git a/sig/datadog/core/workers/polling.rbs b/sig/datadog/core/workers/polling.rbs index 7f4d8f9c55b..43c1360a92c 100644 --- a/sig/datadog/core/workers/polling.rbs +++ b/sig/datadog/core/workers/polling.rbs @@ -2,7 +2,7 @@ module Datadog module Core module Workers module Polling - SHUTDOWN_TIMEOUT: 1 + DEFAULT_SHUTDOWN_TIMEOUT: 1 def self.included: (Class | Module base) -> void diff --git a/spec/datadog/core/telemetry/client_spec.rb b/spec/datadog/core/telemetry/client_spec.rb index ed5d991feb7..2e1a0a34bcf 100644 --- a/spec/datadog/core/telemetry/client_spec.rb +++ b/spec/datadog/core/telemetry/client_spec.rb @@ -14,15 +14,15 @@ let(:enabled) { true } let(:heartbeat_interval_seconds) { 0 } let(:dependency_collection) { true } - let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } - let(:response) { double(Datadog::Core::Telemetry::Http::Adapters::Net::Response) } + let(:worker) { double(Datadog::Core::Telemetry::Worker) } let(:not_found) { false } before do - allow(Datadog::Core::Telemetry::Emitter).to receive(:new).and_return(emitter) - allow(emitter).to receive(:request).and_return(response) - allow(response).to receive(:not_found?).and_return(not_found) - allow(response).to receive(:ok?).and_return(!not_found) + allow(Datadog::Core::Telemetry::Worker).to receive(:new).and_return(worker) + allow(worker).to receive(:start) + allow(worker).to receive(:enqueue) + allow(worker).to receive(:stop) + allow(worker).to receive(:"enabled=") end describe '#initialize' do @@ -62,6 +62,12 @@ end it { expect { client.disable! }.to change { client.enabled }.from(true).to(false) } + + it 'disables worker' do + client.disable! + + expect(worker).to have_received(:"enabled=").with(false) + end end describe '#started!' do @@ -75,7 +81,8 @@ let(:enabled) { false } it do started! - expect(emitter).to_not have_received(:request) + + expect(worker).to_not have_received(:start) end end @@ -84,13 +91,11 @@ context 'when dependency_collection is true' do it do - dependencies = double - allow(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) - .to receive(:new).with(no_args).and_return(dependencies) - started! - expect(emitter).to have_received(:request).with(dependencies) + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) + ) end end @@ -98,13 +103,9 @@ let(:dependency_collection) { false } it do - dependencies = double - allow(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) - .to receive(:new).with(no_args).and_return(dependencies) - started! - expect(emitter).to_not have_received(:request).with(dependencies) + expect(worker).not_to have_received(:enqueue) end end end @@ -115,8 +116,9 @@ it do client expect_in_fork do - expect(emitter).to_not receive(:request) client.started! + + expect(worker).to_not have_received(:start) end end end @@ -133,21 +135,20 @@ let(:enabled) { false } it do emit_closing! - expect(emitter).to_not have_received(:request) + + expect(worker).not_to have_received(:enqueue) end end context 'when enabled' do let(:enabled) { true } it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppClosing).to receive(:new).with(no_args).and_return(double) - emit_closing! - expect(emitter).to have_received(:request).with(double) - end - it { is_expected.to be(response) } + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppClosing) + ) + end end context 'when in fork' do @@ -156,8 +157,9 @@ it do client expect_in_fork do - expect(emitter).to_not receive(:request) client.started! + + expect(worker).not_to have_received(:enqueue) end end end @@ -165,32 +167,12 @@ describe '#stop!' do subject(:stop!) { client.stop! } - let(:worker) { instance_double(Datadog::Core::Telemetry::Worker) } - - before do - allow(Datadog::Core::Telemetry::Worker).to receive(:new) - .with(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, emitter: emitter) - .and_return(worker) - allow(worker).to receive(:start) - allow(worker).to receive(:stop) - end - context 'when disabled' do - let(:enabled) { false } - it 'does not raise error' do - stop! - end - end + it 'stops worker once' do + stop! + stop! - context 'when enabled' do - let(:enabled) { true } - - context 'when stop! has been called already' do - it 'does not raise error' do - stop! - stop! - end - end + expect(worker).to have_received(:stop).once end end @@ -205,21 +187,20 @@ let(:enabled) { false } it do integrations_change! - expect(emitter).to_not have_received(:request) + + expect(worker).not_to have_received(:enqueue) end end context 'when enabled' do let(:enabled) { true } it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppIntegrationsChange).to receive(:new).with(no_args).and_return(double) - integrations_change! - expect(emitter).to have_received(:request).with(double) - end - it { is_expected.to be(response) } + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + ) + end end context 'when in fork' do @@ -228,8 +209,9 @@ it do client expect_in_fork do - expect(emitter).to_not receive(:request) client.started! + + expect(worker).not_to have_received(:enqueue) end end end @@ -247,24 +229,20 @@ let(:enabled) { false } it do client_configuration_change! - expect(emitter).to_not have_received(:request) + + expect(worker).not_to have_received(:enqueue) end end context 'when enabled' do let(:enabled) { true } it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppClientConfigurationChange).to receive(:new).with( - changes, - 'remote_config' - ).and_return(double) - client_configuration_change! - expect(emitter).to have_received(:request).with(double) - end - it { is_expected.to be(response) } + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppClientConfigurationChange) + ) + end end context 'when in fork' do @@ -273,8 +251,9 @@ it do client expect_in_fork do - expect(emitter).to_not receive(:request) client.started! + + expect(worker).not_to have_received(:enqueue) end end end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 8b8aba31784..ba389c23801 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -11,10 +11,34 @@ let(:heartbeat_interval_seconds) { 0.5 } let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } + let(:backend_supports_telemetry?) { true } + let(:response) do + double( + Datadog::Core::Telemetry::Http::Adapters::Net::Response, + not_found?: !backend_supports_telemetry?, + ok?: backend_supports_telemetry? + ) + end + before do logger = double(Datadog::Core::Logger) allow(logger).to receive(:debug).with(any_args) allow(Datadog).to receive(:logger).and_return(logger) + + @received_started = false + @received_heartbeat = false + + allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) do + @received_started = true + + response + end + + allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + @received_heartbeat = true + + response + end end after do @@ -36,31 +60,6 @@ describe '#start' do context 'when enabled' do - let(:response) do - double( - Datadog::Core::Telemetry::Http::Adapters::Net::Response, - not_found?: !backend_supports_telemetry?, - ok?: backend_supports_telemetry? - ) - end - - before do - @received_started = false - @received_heartbeat = false - - allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) do - @received_started = true - - response - end - - allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do - @received_heartbeat = true - - response - end - end - context "when backend doesn't support telemetry" do let(:backend_supports_telemetry?) { false } @@ -137,4 +136,24 @@ end end end + + describe '#enqueue' do + it 'adds events to the buffer and flushes them later' do + events_received = 0 + allow(emitter).to receive(:request).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + ) do + events_received += 1 + end + + worker.start + + events_sent = 3 + events_sent.times do + worker.enqueue(Datadog::Core::Telemetry::Event::AppIntegrationsChange.new) + end + + try_wait_until { events_received == events_sent } + end + end end From 06293a4b0c39b2ff5fba48b70675425f0e9baf62 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 14 Jun 2024 14:14:30 +0200 Subject: [PATCH 09/21] rename Telemetry::Client to Telemetry::Component to better reflect its purpose --- lib/datadog/core/configuration/components.rb | 4 +- .../telemetry/{client.rb => component.rb} | 3 +- lib/datadog/core/telemetry/worker.rb | 6 +- .../telemetry/{client.rbs => component.rbs} | 2 +- .../core/configuration/components_spec.rb | 18 +++--- spec/datadog/core/configuration_spec.rb | 20 +++---- .../{client_spec.rb => component_spec.rb} | 56 +++++++++---------- 7 files changed, 54 insertions(+), 55 deletions(-) rename lib/datadog/core/telemetry/{client.rb => component.rb} (98%) rename sig/datadog/core/telemetry/{client.rbs => component.rbs} (97%) rename spec/datadog/core/telemetry/{client_spec.rb => component_spec.rb} (82%) diff --git a/lib/datadog/core/configuration/components.rb b/lib/datadog/core/configuration/components.rb index 46106accc12..665485c6f0c 100644 --- a/lib/datadog/core/configuration/components.rb +++ b/lib/datadog/core/configuration/components.rb @@ -6,7 +6,7 @@ require_relative '../diagnostics/health' require_relative '../logger' require_relative '../runtime/metrics' -require_relative '../telemetry/client' +require_relative '../telemetry/component' require_relative '../workers/runtime_metrics' require_relative '../remote/component' @@ -62,7 +62,7 @@ def build_telemetry(settings, agent_settings, logger) logger.debug { "Telemetry disabled. Agent network adapter not supported: #{agent_settings.adapter}" } end - Telemetry::Client.new( + Telemetry::Component.new( enabled: enabled, heartbeat_interval_seconds: settings.telemetry.heartbeat_interval_seconds, dependency_collection: settings.telemetry.dependency_collection diff --git a/lib/datadog/core/telemetry/client.rb b/lib/datadog/core/telemetry/component.rb similarity index 98% rename from lib/datadog/core/telemetry/client.rb rename to lib/datadog/core/telemetry/component.rb index 05a96e39239..6324f8ac795 100644 --- a/lib/datadog/core/telemetry/client.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -9,7 +9,7 @@ module Datadog module Core module Telemetry # Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle. - class Client + class Component attr_reader :enabled include Core::Utils::Forking @@ -39,7 +39,6 @@ def started! return if !@enabled || forked? @worker.start - @worker.enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection @started = true diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 93fccab9a6c..da0bc7dd455 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -94,11 +94,11 @@ def started! if res.not_found? # Telemetry is only supported by agent versions 7.34 and up Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') self.enabled = false - end - - if res.ok? + elsif res.ok? Datadog.logger.debug('Telemetry app-started event is successfully sent') @sent_started_event = true + else + Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...') end end diff --git a/sig/datadog/core/telemetry/client.rbs b/sig/datadog/core/telemetry/component.rbs similarity index 97% rename from sig/datadog/core/telemetry/client.rbs rename to sig/datadog/core/telemetry/component.rbs index 007050163a7..614ac20d691 100644 --- a/sig/datadog/core/telemetry/client.rbs +++ b/sig/datadog/core/telemetry/component.rbs @@ -1,7 +1,7 @@ module Datadog module Core module Telemetry - class Client + class Component @enabled: bool @dependency_collection: bool @started: bool diff --git a/spec/datadog/core/configuration/components_spec.rb b/spec/datadog/core/configuration/components_spec.rb index e04fad93fcf..79c65637f91 100644 --- a/spec/datadog/core/configuration/components_spec.rb +++ b/spec/datadog/core/configuration/components_spec.rb @@ -7,7 +7,7 @@ require 'datadog/core/diagnostics/environment_logger' require 'datadog/core/diagnostics/health' require 'datadog/core/logger' -require 'datadog/core/telemetry/client' +require 'datadog/core/telemetry/component' require 'datadog/core/runtime/metrics' require 'datadog/core/workers/runtime_metrics' require 'datadog/statsd' @@ -33,7 +33,7 @@ let(:profiler_setup_task) { Datadog::Profiling.supported? ? instance_double(Datadog::Profiling::Tasks::Setup) : nil } let(:remote) { instance_double(Datadog::Core::Remote::Component, start: nil, shutdown!: nil) } - let(:telemetry) { instance_double(Datadog::Core::Telemetry::Client) } + let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } let(:environment_logger_extra) { { hello: 123, world: '456' } } @@ -46,7 +46,7 @@ end allow(Datadog::Statsd).to receive(:new) { instance_double(Datadog::Statsd) } allow(Datadog::Core::Remote::Component).to receive(:new).and_return(remote) - allow(Datadog::Core::Telemetry::Client).to receive(:new).and_return(telemetry) + allow(Datadog::Core::Telemetry::Component).to receive(:new).and_return(telemetry) end around do |example| @@ -223,7 +223,7 @@ let(:logger) { instance_double(Logger) } context 'given settings' do - let(:telemetry_client) { instance_double(Datadog::Core::Telemetry::Client) } + let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } let(:expected_options) do { enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, dependency_collection: dependency_collection } @@ -233,16 +233,16 @@ let(:dependency_collection) { true } before do - expect(Datadog::Core::Telemetry::Client).to receive(:new).with(expected_options).and_return(telemetry_client) + expect(Datadog::Core::Telemetry::Component).to receive(:new).with(expected_options).and_return(telemetry) allow(settings.telemetry).to receive(:enabled).and_return(enabled) end - it { is_expected.to be(telemetry_client) } + it { is_expected.to be(telemetry) } context 'with :enabled true' do let(:enabled) { double('enabled') } - it { is_expected.to be(telemetry_client) } + it { is_expected.to be(telemetry) } context 'and :unix agent adapter' do let(:expected_options) do @@ -255,7 +255,7 @@ it 'does not enable telemetry for unsupported non-http transport' do expect(logger).to receive(:debug) - is_expected.to be(telemetry_client) + is_expected.to be(telemetry) end end end @@ -1108,7 +1108,7 @@ let(:runtime_metrics) { instance_double(Datadog::Core::Runtime::Metrics, statsd: statsd) } let(:health_metrics) { instance_double(Datadog::Core::Diagnostics::Health::Metrics, statsd: statsd) } let(:statsd) { instance_double(::Datadog::Statsd) } - let(:telemetry) { instance_double(Datadog::Core::Telemetry::Client) } + let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } before do allow(replacement).to receive(:tracer).and_return(tracer) diff --git a/spec/datadog/core/configuration_spec.rb b/spec/datadog/core/configuration_spec.rb index 7873d262f70..ace1b5066b9 100644 --- a/spec/datadog/core/configuration_spec.rb +++ b/spec/datadog/core/configuration_spec.rb @@ -8,13 +8,13 @@ RSpec.describe Datadog::Core::Configuration do let(:default_log_level) { ::Logger::INFO } - let(:telemetry_client) { instance_double(Datadog::Core::Telemetry::Client) } + let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } before do - allow(telemetry_client).to receive(:started!) - allow(telemetry_client).to receive(:stop!) - allow(telemetry_client).to receive(:emit_closing!) - allow(Datadog::Core::Telemetry::Client).to receive(:new).and_return(telemetry_client) + allow(telemetry).to receive(:started!) + allow(telemetry).to receive(:stop!) + allow(telemetry).to receive(:emit_closing!) + allow(Datadog::Core::Telemetry::Component).to receive(:new).and_return(telemetry) allow(Datadog::Core::Remote::Component).to receive(:build) end @@ -43,7 +43,7 @@ it do # We cannot mix `expect().to_not` with `expect().to(...).ordered`. # One way around that is to force the method to raise an error if it's ever called. - allow(telemetry_client).to receive(:started!).and_raise('Should not be called') + allow(telemetry).to receive(:started!).and_raise('Should not be called') # Components should have changed expect { configure } @@ -84,7 +84,7 @@ .with(test_class.configuration) expect(new_components).to_not have_received(:shutdown!) - expect(telemetry_client).to have_received(:started!) + expect(telemetry).to have_received(:started!) end end end @@ -501,7 +501,7 @@ describe '#components' do context 'when components are not initialized' do it 'initializes the components' do - expect(telemetry_client).to receive(:started!) + expect(telemetry).to receive(:started!) test_class.send(:components) @@ -510,7 +510,7 @@ context 'when allow_initialization is false' do it 'does not initialize the components' do - expect(telemetry_client).to_not receive(:started!) + expect(telemetry).to_not receive(:started!) test_class.send(:components, allow_initialization: false) @@ -527,7 +527,7 @@ it 'returns the components without touching the COMPONENTS_WRITE_LOCK' do described_class.const_get(:COMPONENTS_WRITE_LOCK).lock - expect(telemetry_client).to_not receive(:started!) + expect(telemetry).to_not receive(:started!) expect(test_class.send(:components)).to_not be_nil end end diff --git a/spec/datadog/core/telemetry/client_spec.rb b/spec/datadog/core/telemetry/component_spec.rb similarity index 82% rename from spec/datadog/core/telemetry/client_spec.rb rename to spec/datadog/core/telemetry/component_spec.rb index 2e1a0a34bcf..5d1f7f013f7 100644 --- a/spec/datadog/core/telemetry/client_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -1,9 +1,9 @@ require 'spec_helper' -require 'datadog/core/telemetry/client' +require 'datadog/core/telemetry/component' -RSpec.describe Datadog::Core::Telemetry::Client do - subject(:client) do +RSpec.describe Datadog::Core::Telemetry::Component do + subject(:telemetry) do described_class.new( enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, @@ -27,11 +27,11 @@ describe '#initialize' do after do - client.stop! + telemetry.stop! end context 'with default parameters' do - subject(:client) do + subject(:telemetry) do described_class.new( heartbeat_interval_seconds: heartbeat_interval_seconds, dependency_collection: dependency_collection @@ -39,42 +39,42 @@ end it { is_expected.to be_a_kind_of(described_class) } - it { expect(client.enabled).to be(true) } + it { expect(telemetry.enabled).to be(true) } end context 'when :enabled is false' do let(:enabled) { false } it { is_expected.to be_a_kind_of(described_class) } - it { expect(client.enabled).to be(false) } + it { expect(telemetry.enabled).to be(false) } end context 'when enabled' do let(:enabled) { true } it { is_expected.to be_a_kind_of(described_class) } - it { expect(client.enabled).to be(true) } + it { expect(telemetry.enabled).to be(true) } end end describe '#disable!' do after do - client.stop! + telemetry.stop! end - it { expect { client.disable! }.to change { client.enabled }.from(true).to(false) } + it { expect { telemetry.disable! }.to change { telemetry.enabled }.from(true).to(false) } it 'disables worker' do - client.disable! + telemetry.disable! expect(worker).to have_received(:"enabled=").with(false) end end describe '#started!' do - subject(:started!) { client.started! } + subject(:started!) { telemetry.started! } after do - client.stop! + telemetry.stop! end context 'when disabled' do @@ -114,9 +114,9 @@ before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } it do - client + telemetry expect_in_fork do - client.started! + telemetry.started! expect(worker).to_not have_received(:start) end @@ -125,10 +125,10 @@ end describe '#emit_closing!' do - subject(:emit_closing!) { client.emit_closing! } + subject(:emit_closing!) { telemetry.emit_closing! } after do - client.stop! + telemetry.stop! end context 'when disabled' do @@ -155,9 +155,9 @@ before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } it do - client + telemetry expect_in_fork do - client.started! + telemetry.started! expect(worker).not_to have_received(:enqueue) end @@ -166,7 +166,7 @@ end describe '#stop!' do - subject(:stop!) { client.stop! } + subject(:stop!) { telemetry.stop! } it 'stops worker once' do stop! @@ -177,10 +177,10 @@ end describe '#integrations_change!' do - subject(:integrations_change!) { client.integrations_change! } + subject(:integrations_change!) { telemetry.integrations_change! } after do - client.stop! + telemetry.stop! end context 'when disabled' do @@ -207,9 +207,9 @@ before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } it do - client + telemetry expect_in_fork do - client.started! + telemetry.started! expect(worker).not_to have_received(:enqueue) end @@ -218,11 +218,11 @@ end describe '#client_configuration_change!' do - subject(:client_configuration_change!) { client.client_configuration_change!(changes) } + subject(:client_configuration_change!) { telemetry.client_configuration_change!(changes) } let(:changes) { double('changes') } after do - client.stop! + telemetry.stop! end context 'when disabled' do @@ -249,9 +249,9 @@ before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } it do - client + telemetry expect_in_fork do - client.started! + telemetry.started! expect(worker).not_to have_received(:enqueue) end From d31a0883668505b78b92a5077e19730677f5b42c Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 14 Jun 2024 14:24:44 +0200 Subject: [PATCH 10/21] leftover of telemetry component rename --- spec/datadog/tracing/contrib/extensions_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/datadog/tracing/contrib/extensions_spec.rb b/spec/datadog/tracing/contrib/extensions_spec.rb index d53dd66c0e8..461fd09852f 100644 --- a/spec/datadog/tracing/contrib/extensions_spec.rb +++ b/spec/datadog/tracing/contrib/extensions_spec.rb @@ -46,7 +46,7 @@ end it 'sends a telemetry integrations change event' do - expect_any_instance_of(Datadog::Core::Telemetry::Client).to receive(:integrations_change!) + expect_any_instance_of(Datadog::Core::Telemetry::Component).to receive(:integrations_change!) configure end end From 4f2aab8dbd215c061fb25d96a1d9a35f96783662 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Mon, 17 Jun 2024 12:37:58 +0200 Subject: [PATCH 11/21] add Core::Utils::OnlyOnceSuccessful to execute code with only one success --- .../core/utils/only_once_successful.rb | 34 +++++++ sig/datadog/core/utils/only_once.rbs | 3 + .../core/utils/only_once_successful.rbs | 8 ++ .../core/utils/only_once_successful_spec.rb | 95 +++++++++++++++++++ 4 files changed, 140 insertions(+) create mode 100644 lib/datadog/core/utils/only_once_successful.rb create mode 100644 sig/datadog/core/utils/only_once_successful.rbs create mode 100644 spec/datadog/core/utils/only_once_successful_spec.rb diff --git a/lib/datadog/core/utils/only_once_successful.rb b/lib/datadog/core/utils/only_once_successful.rb new file mode 100644 index 00000000000..e5ac05d304b --- /dev/null +++ b/lib/datadog/core/utils/only_once_successful.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require_relative 'only_once' + +module Datadog + module Core + module Utils + # Helper class to execute something with only one success. + # + # This is useful for cases where we want to ensure that a block of code is only executed once, and only if it + # succeeds. One such example is sending app-started telemetry event. + # + # Successful execution is determined by the return value of the block: any truthy value is considered success. + # + # Thread-safe when used correctly (e.g. be careful of races when lazily initializing instances of this class). + # + # Note: In its current state, this class is not Ractor-safe. + # In https://github.com/DataDog/dd-trace-rb/pull/1398#issuecomment-797378810 we have a discussion of alternatives, + # including an alternative implementation that is Ractor-safe once spent. + class OnlyOnceSuccessful < OnlyOnce + def run + @mutex.synchronize do + return if @ran_once + + result = yield + @ran_once = !!result + + result + end + end + end + end + end +end diff --git a/sig/datadog/core/utils/only_once.rbs b/sig/datadog/core/utils/only_once.rbs index 324b1ba72e7..354334c71e1 100644 --- a/sig/datadog/core/utils/only_once.rbs +++ b/sig/datadog/core/utils/only_once.rbs @@ -2,6 +2,9 @@ module Datadog module Core module Utils class OnlyOnce + @ran_once: bool + @mutex: Thread::Mutex + def initialize: () -> untyped def run: () { () -> untyped } -> untyped diff --git a/sig/datadog/core/utils/only_once_successful.rbs b/sig/datadog/core/utils/only_once_successful.rbs new file mode 100644 index 00000000000..81d24fc9094 --- /dev/null +++ b/sig/datadog/core/utils/only_once_successful.rbs @@ -0,0 +1,8 @@ +module Datadog + module Core + module Utils + class OnlyOnceSuccessful < Datadog::Core::Utils::OnlyOnce + end + end + end +end diff --git a/spec/datadog/core/utils/only_once_successful_spec.rb b/spec/datadog/core/utils/only_once_successful_spec.rb new file mode 100644 index 00000000000..bf06ff5d60c --- /dev/null +++ b/spec/datadog/core/utils/only_once_successful_spec.rb @@ -0,0 +1,95 @@ +require 'datadog/core/utils/only_once_successful' + +RSpec.describe Datadog::Core::Utils::OnlyOnceSuccessful do + subject(:only_once_successful) { described_class.new } + + describe '#run' do + context 'before running once' do + it do + expect { |block| only_once_successful.run(&block) }.to yield_control + end + + it 'returns the result of the block ran' do + expect(only_once_successful.run { :result }).to be :result + end + end + + context 'after running once' do + let(:result) { nil } + + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect { |block| only_once_successful.run(&block) }.to_not yield_control + end + + it do + expect(only_once_successful.run { :result }).to be nil + end + end + + context 'when block returns falsey value' do + let(:result) { false } + + it do + expect { |block| only_once_successful.run(&block) }.to yield_control + end + + it 'runs again until block returns truthy value' do + expect(only_once_successful.run { :result }).to be :result + + expect(only_once_successful.run { :result }).to be nil + end + end + end + + context 'when run throws an exception' do + it 'propagates the exception out' do + exception = RuntimeError.new('boom') + + expect { only_once_successful.run { raise exception } }.to raise_exception(exception) + end + + it 'runs again' do + only_once_successful.run { raise 'boom' } rescue nil + + expect { |block| only_once_successful.run(&block) }.to yield_control + end + end + end + + describe '#ran?' do + context 'before running once' do + it do + expect(only_once_successful.ran?).to be false + end + end + + context 'after running once' do + let(:result) { nil } + + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect(only_once_successful.ran?).to be true + end + end + + context 'when block returns falsey value' do + it do + expect(only_once_successful.ran?).to be false + end + end + end + end +end From 8b7a50e7c3203dfd65da9c54fc7856a850857997 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Mon, 17 Jun 2024 14:09:57 +0200 Subject: [PATCH 12/21] ensure that app-started event is sent at most once, flush events before stopping worker --- lib/datadog/core/telemetry/component.rb | 17 ++-- lib/datadog/core/telemetry/worker.rb | 43 ++++++---- sig/datadog/core/telemetry/worker.rbs | 5 ++ spec/datadog/core/telemetry/component_spec.rb | 4 +- spec/datadog/core/telemetry/worker_spec.rb | 78 ++++++++++++++++++- 5 files changed, 116 insertions(+), 31 deletions(-) diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb index 6324f8ac795..f457a434ce4 100644 --- a/lib/datadog/core/telemetry/component.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -28,6 +28,7 @@ def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: tru heartbeat_interval_seconds: heartbeat_interval_seconds, emitter: Emitter.new ) + @worker.start end def disable! @@ -38,26 +39,24 @@ def disable! def started! return if !@enabled || forked? - @worker.start @worker.enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection @started = true end - def emit_closing! - return if !@enabled || forked? - - @worker.enqueue(Event::AppClosing.new) - end - def stop! return if @stopped - # gracefully stop the worker and send leftover events - @worker.stop + @worker.stop(true) @stopped = true end + def emit_closing! + return if !@enabled || forked? + + @worker.enqueue(Event::AppClosing.new) + end + def integrations_change! return if !@enabled || forked? diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index da0bc7dd455..8ba8e8723b1 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -2,6 +2,7 @@ require_relative 'event' +require_relative '../utils/only_once_successful' require_relative '../workers/polling' require_relative '../workers/queue' @@ -15,6 +16,8 @@ class Worker DEFAULT_BUFFER_MAX_SIZE = 1000 + TELEMETRY_STARTED_ONCE = Utils::OnlyOnceSuccessful.new + def initialize( heartbeat_interval_seconds:, emitter:, @@ -24,8 +27,6 @@ def initialize( ) @emitter = emitter - @sent_started_event = false - # Workers::Polling settings self.enabled = enabled # Workers::IntervalLoop settings @@ -48,6 +49,8 @@ def start def stop(force_stop = false, timeout = @shutdown_timeout) buffer.close if running? + flush_events(dequeue) if work_pending? + super end @@ -56,7 +59,7 @@ def enqueue(event) end def sent_started_event? - @sent_started_event + TELEMETRY_STARTED_ONCE.ran? end private @@ -89,24 +92,25 @@ def heartbeat! def started! return unless enabled? - res = send_event(Event::AppStarted.new) + TELEMETRY_STARTED_ONCE.run do + res = send_event(Event::AppStarted.new) - if res.not_found? # Telemetry is only supported by agent versions 7.34 and up - Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') - self.enabled = false - elsif res.ok? - Datadog.logger.debug('Telemetry app-started event is successfully sent') - @sent_started_event = true - else - Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...') + if res.ok? + Datadog.logger.debug('Telemetry app-started event is successfully sent') + true + else + Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...') + false + end end end def send_event(event) - Datadog.logger.debug { "Sending telemetry event: #{event}" } - response = @emitter.request(event) - Datadog.logger.debug { "Received response: #{response}" } - response + res = @emitter.request(event) + + disable_on_not_found!(res) + + res end def dequeue @@ -120,6 +124,13 @@ def buffer_klass Core::Buffer::ThreadSafe end end + + def disable_on_not_found!(response) + return unless response.not_found? + + Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') + self.enabled = false + end end end end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 9220dfeea09..b804a19664d 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -8,6 +8,7 @@ module Datadog include Core::Workers::IntervalLoop include Core::Workers::Queue + TELEMETRY_STARTED_ONCE: Datadog::Core::Utils::OnlyOnceSuccessful DEFAULT_BUFFER_MAX_SIZE: 1000 @emitter: Emitter @@ -23,6 +24,8 @@ module Datadog def enqueue: (Event::Base event) -> void + def dequeue: () -> Array[Event::Base] + private def heartbeat!: () -> void @@ -33,6 +36,8 @@ module Datadog def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response + def disable_on_not_found!: (Datadog::Core::Telemetry::Http::Adapters::Net::Response response) -> void + def buffer_klass: () -> untyped end end diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb index 5d1f7f013f7..33f4544ec6e 100644 --- a/spec/datadog/core/telemetry/component_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -82,7 +82,7 @@ it do started! - expect(worker).to_not have_received(:start) + expect(worker).to_not have_received(:enqueue) end end @@ -118,7 +118,7 @@ expect_in_fork do telemetry.started! - expect(worker).to_not have_received(:start) + expect(worker).to_not have_received(:enqueue) end end end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index ba389c23801..6008cefd892 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -42,8 +42,10 @@ end after do - worker.stop(true, 0) + worker.stop(true) worker.join + + Datadog::Core::Telemetry::Worker::TELEMETRY_STARTED_ONCE.send(:reset_ran_once_state_for_tests) end describe '.new' do @@ -97,19 +99,19 @@ end it 'always sends heartbeat event after started event' do - @sent_hearbeat = false + sent_hearbeat = false allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do # app-started was already sent by now expect(worker.sent_started_event?).to be(true) - @sent_hearbeat = true + sent_hearbeat = true response end worker.start - try_wait_until { @sent_hearbeat } + try_wait_until { sent_hearbeat } end end @@ -124,6 +126,42 @@ expect(@received_heartbeat).to be(false) end end + + context 'several workers running' do + it 'sends single started event' do + started_events = 0 + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppStarted)) do + started_events += 1 + + response + end + + heartbeat_events = 0 + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + heartbeat_events += 1 + + response + end + + workers = Array.new(3) do + described_class.new( + enabled: enabled, + heartbeat_interval_seconds: heartbeat_interval_seconds, + emitter: emitter + ) + end + workers.each(&:start) + + try_wait_until { heartbeat_events >= 3 } + + expect(started_events).to be(1) + + workers.each do |w| + w.stop(true, 0) + w.join + end + end + end end context 'when disabled' do @@ -137,6 +175,36 @@ end end + describe '#stop' do + let(:heartbeat_interval_seconds) { 3 } + + it 'flushes events and stops the worker' do + events_received = 0 + allow(emitter).to receive(:request).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + ) do + events_received += 1 + + response + end + + worker.start + + worker.enqueue(Datadog::Core::Telemetry::Event::AppIntegrationsChange.new) + worker.stop(true) + + try_wait_until { !worker.running? } + + expect(worker).to have_attributes( + enabled?: true, + loop_base_interval: heartbeat_interval_seconds, + run_async?: false, + running?: false, + started?: true + ) + end + end + describe '#enqueue' do it 'adds events to the buffer and flushes them later' do events_received = 0 @@ -144,6 +212,8 @@ an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) ) do events_received += 1 + + response end worker.start From 69adca1242c97f25a502feb2b144974d2dabe8b8 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Mon, 17 Jun 2024 16:04:27 +0200 Subject: [PATCH 13/21] remove Telemetry::Component.started! as right now it just contains dependency collection event logic, move it to the Worker --- lib/datadog/core/configuration.rb | 20 +----- lib/datadog/core/telemetry/component.rb | 13 +--- lib/datadog/core/telemetry/worker.rb | 5 ++ sig/datadog/core/telemetry/component.rbs | 4 -- sig/datadog/core/telemetry/worker.rbs | 3 +- spec/datadog/core/configuration_spec.rb | 11 --- spec/datadog/core/telemetry/component_spec.rb | 68 ++----------------- spec/datadog/core/telemetry/worker_spec.rb | 31 ++++++++- 8 files changed, 48 insertions(+), 107 deletions(-) diff --git a/lib/datadog/core/configuration.rb b/lib/datadog/core/configuration.rb index bcaa6432564..19d3ff27110 100644 --- a/lib/datadog/core/configuration.rb +++ b/lib/datadog/core/configuration.rb @@ -84,23 +84,16 @@ def configure configuration = self.configuration yield(configuration) - built_components = false - - components = safely_synchronize do |write_components| + safely_synchronize do |write_components| write_components.call( if components? replace_components!(configuration, @components) else - components = build_components(configuration) - built_components = true - components + build_components(configuration) end ) end - # Should only be called the first time components are built - components.telemetry.started! if built_components - configuration end @@ -200,20 +193,13 @@ def components(allow_initialization: true) current_components = COMPONENTS_READ_LOCK.synchronize { defined?(@components) && @components } return current_components if current_components || !allow_initialization - built_components = false - - components = safely_synchronize do |write_components| + safely_synchronize do |write_components| if defined?(@components) && @components @components else - built_components = true write_components.call(build_components(configuration)) end end - - # Should only be called the first time components are built - components&.telemetry&.started! if built_components - components end private diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb index f457a434ce4..0d5046e4391 100644 --- a/lib/datadog/core/telemetry/component.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -20,13 +20,12 @@ class Component def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true) @enabled = enabled @stopped = false - @started = false - @dependency_collection = dependency_collection @worker = Telemetry::Worker.new( enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, - emitter: Emitter.new + emitter: Emitter.new, + dependency_collection: dependency_collection ) @worker.start end @@ -36,14 +35,6 @@ def disable! @worker.enabled = false end - def started! - return if !@enabled || forked? - - @worker.enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection - - @started = true - end - def stop! return if @stopped diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 8ba8e8723b1..f2bd7821b4c 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -21,11 +21,13 @@ class Worker def initialize( heartbeat_interval_seconds:, emitter:, + dependency_collection:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE ) @emitter = emitter + @dependency_collection = dependency_collection # Workers::Polling settings self.enabled = enabled @@ -97,6 +99,9 @@ def started! if res.ok? Datadog.logger.debug('Telemetry app-started event is successfully sent') + + enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection + true else Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...') diff --git a/sig/datadog/core/telemetry/component.rbs b/sig/datadog/core/telemetry/component.rbs index 614ac20d691..4d411d32578 100644 --- a/sig/datadog/core/telemetry/component.rbs +++ b/sig/datadog/core/telemetry/component.rbs @@ -3,8 +3,6 @@ module Datadog module Telemetry class Component @enabled: bool - @dependency_collection: bool - @started: bool @stopped: bool @worker: Datadog::Core::Telemetry::Worker @@ -18,8 +16,6 @@ module Datadog def client_configuration_change!: (Enumerable[[String, Numeric | bool | String]] changes) -> void - def started!: () -> void - def emit_closing!: () -> void def stop!: () -> void diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index b804a19664d..01c5107e990 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -15,8 +15,9 @@ module Datadog @sent_started_event: bool @shutdown_timeout: Integer @buffer_size: Integer + @dependency_collection: bool - def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer) -> void + def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void def start: () -> void diff --git a/spec/datadog/core/configuration_spec.rb b/spec/datadog/core/configuration_spec.rb index ace1b5066b9..51a2b44cb01 100644 --- a/spec/datadog/core/configuration_spec.rb +++ b/spec/datadog/core/configuration_spec.rb @@ -11,7 +11,6 @@ let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } before do - allow(telemetry).to receive(:started!) allow(telemetry).to receive(:stop!) allow(telemetry).to receive(:emit_closing!) allow(Datadog::Core::Telemetry::Component).to receive(:new).and_return(telemetry) @@ -41,10 +40,6 @@ end it do - # We cannot mix `expect().to_not` with `expect().to(...).ordered`. - # One way around that is to force the method to raise an error if it's ever called. - allow(telemetry).to receive(:started!).and_raise('Should not be called') - # Components should have changed expect { configure } .to change { test_class.send(:components) } @@ -84,7 +79,6 @@ .with(test_class.configuration) expect(new_components).to_not have_received(:shutdown!) - expect(telemetry).to have_received(:started!) end end end @@ -501,8 +495,6 @@ describe '#components' do context 'when components are not initialized' do it 'initializes the components' do - expect(telemetry).to receive(:started!) - test_class.send(:components) expect(test_class.send(:components?)).to be true @@ -510,8 +502,6 @@ context 'when allow_initialization is false' do it 'does not initialize the components' do - expect(telemetry).to_not receive(:started!) - test_class.send(:components, allow_initialization: false) expect(test_class.send(:components?)).to be false @@ -527,7 +517,6 @@ it 'returns the components without touching the COMPONENTS_WRITE_LOCK' do described_class.const_get(:COMPONENTS_WRITE_LOCK).lock - expect(telemetry).to_not receive(:started!) expect(test_class.send(:components)).to_not be_nil end end diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb index 33f4544ec6e..ba17d37c2f4 100644 --- a/spec/datadog/core/telemetry/component_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -18,7 +18,13 @@ let(:not_found) { false } before do - allow(Datadog::Core::Telemetry::Worker).to receive(:new).and_return(worker) + allow(Datadog::Core::Telemetry::Worker).to receive(:new).with( + heartbeat_interval_seconds: heartbeat_interval_seconds, + dependency_collection: dependency_collection, + enabled: enabled, + emitter: an_instance_of(Datadog::Core::Telemetry::Emitter) + ).and_return(worker) + allow(worker).to receive(:start) allow(worker).to receive(:enqueue) allow(worker).to receive(:stop) @@ -70,60 +76,6 @@ end end - describe '#started!' do - subject(:started!) { telemetry.started! } - - after do - telemetry.stop! - end - - context 'when disabled' do - let(:enabled) { false } - it do - started! - - expect(worker).to_not have_received(:enqueue) - end - end - - context 'when enabled' do - let(:enabled) { true } - - context 'when dependency_collection is true' do - it do - started! - - expect(worker).to have_received(:enqueue).with( - an_instance_of(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) - ) - end - end - - context 'when dependency_collection is false' do - let(:dependency_collection) { false } - - it do - started! - - expect(worker).not_to have_received(:enqueue) - end - end - end - - context 'when in fork' do - before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } - - it do - telemetry - expect_in_fork do - telemetry.started! - - expect(worker).to_not have_received(:enqueue) - end - end - end - end - describe '#emit_closing!' do subject(:emit_closing!) { telemetry.emit_closing! } @@ -157,8 +109,6 @@ it do telemetry expect_in_fork do - telemetry.started! - expect(worker).not_to have_received(:enqueue) end end @@ -209,8 +159,6 @@ it do telemetry expect_in_fork do - telemetry.started! - expect(worker).not_to have_received(:enqueue) end end @@ -251,8 +199,6 @@ it do telemetry expect_in_fork do - telemetry.started! - expect(worker).not_to have_received(:enqueue) end end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 6008cefd892..6a1d5ca055f 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -4,12 +4,18 @@ RSpec.describe Datadog::Core::Telemetry::Worker do subject(:worker) do - described_class.new(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, emitter: emitter) + described_class.new( + enabled: enabled, + heartbeat_interval_seconds: heartbeat_interval_seconds, + emitter: emitter, + dependency_collection: dependency_collection + ) end let(:enabled) { true } let(:heartbeat_interval_seconds) { 0.5 } let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } + let(:dependency_collection) { false } let(:backend_supports_telemetry?) { true } let(:response) do @@ -113,6 +119,26 @@ try_wait_until { sent_hearbeat } end + + context 'when dependencies collection enabled' do + let(:dependency_collection) { true } + + it 'sends dependencies loaded event after started event' do + sent_dependencies = false + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppDependenciesLoaded)) do + # app-started was already sent by now + expect(worker.sent_started_event?).to be(true) + + sent_dependencies = true + + response + end + + worker.start + + try_wait_until { sent_dependencies } + end + end end context 'when internal error returned by emitter' do @@ -147,7 +173,8 @@ described_class.new( enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, - emitter: emitter + emitter: emitter, + dependency_collection: dependency_collection ) end workers.each(&:start) From 440746abadf8404595a12f33b586c3003c7558ab Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Mon, 17 Jun 2024 16:12:35 +0200 Subject: [PATCH 14/21] change the wrong expectation in workers spec --- spec/datadog/core/telemetry/worker_spec.rb | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 6a1d5ca055f..ad5885abca8 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -222,13 +222,7 @@ try_wait_until { !worker.running? } - expect(worker).to have_attributes( - enabled?: true, - loop_base_interval: heartbeat_interval_seconds, - run_async?: false, - running?: false, - started?: true - ) + expect(events_received).to eq(1) end end From 76bfd9fc5bd1cd162909629bc3f48b351acaa3e6 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Mon, 17 Jun 2024 16:48:53 +0200 Subject: [PATCH 15/21] send app-dependencies-loaded event right after app-started event in the same thread --- lib/datadog/core/telemetry/worker.rb | 2 +- spec/datadog/core/telemetry/worker_spec.rb | 18 +++--------------- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index f2bd7821b4c..2021e139b0a 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -100,7 +100,7 @@ def started! if res.ok? Datadog.logger.debug('Telemetry app-started event is successfully sent') - enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection + send_event(Event::AppDependenciesLoaded.new) if @dependency_collection true else diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index ad5885abca8..35a01e46834 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -127,7 +127,8 @@ sent_dependencies = false allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppDependenciesLoaded)) do # app-started was already sent by now - expect(worker.sent_started_event?).to be(true) + # don't use worker.sent_started_event? because it uses the same lock + expect(@received_started).to be(true) sent_dependencies = true @@ -206,23 +207,10 @@ let(:heartbeat_interval_seconds) { 3 } it 'flushes events and stops the worker' do - events_received = 0 - allow(emitter).to receive(:request).with( - an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) - ) do - events_received += 1 - - response - end - worker.start - worker.enqueue(Datadog::Core::Telemetry::Event::AppIntegrationsChange.new) + expect(worker).to receive(:flush_events).at_least(:once) worker.stop(true) - - try_wait_until { !worker.running? } - - expect(events_received).to eq(1) end end From cd239cb88dd642866c2e93f1eb1c8f10681cc4eb Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 21 Jun 2024 09:48:03 +0200 Subject: [PATCH 16/21] add limit option to OnlyOnceSuccessful util --- .../core/utils/only_once_successful.rb | 34 ++++ .../core/utils/only_once_successful.rbs | 15 ++ .../core/utils/only_once_successful_spec.rb | 176 ++++++++++++++++-- 3 files changed, 206 insertions(+), 19 deletions(-) diff --git a/lib/datadog/core/utils/only_once_successful.rb b/lib/datadog/core/utils/only_once_successful.rb index e5ac05d304b..4209cafc1ef 100644 --- a/lib/datadog/core/utils/only_once_successful.rb +++ b/lib/datadog/core/utils/only_once_successful.rb @@ -18,6 +18,14 @@ module Utils # In https://github.com/DataDog/dd-trace-rb/pull/1398#issuecomment-797378810 we have a discussion of alternatives, # including an alternative implementation that is Ractor-safe once spent. class OnlyOnceSuccessful < OnlyOnce + def initialize(limit = 0) + super() + + @limit = limit + @failed = false + @retries = 0 + end + def run @mutex.synchronize do return if @ran_once @@ -25,9 +33,35 @@ def run result = yield @ran_once = !!result + if !@ran_once && limited? + @retries += 1 + check_limit! + end + result end end + + def success? + @mutex.synchronize { @ran_once && !@failed } + end + + def failed? + @mutex.synchronize { @ran_once && @failed } + end + + private + + def check_limit! + if @retries >= @limit + @failed = true + @ran_once = true + end + end + + def limited? + !@limit.nil? && @limit.positive? + end end end end diff --git a/sig/datadog/core/utils/only_once_successful.rbs b/sig/datadog/core/utils/only_once_successful.rbs index 81d24fc9094..2236b5a66b5 100644 --- a/sig/datadog/core/utils/only_once_successful.rbs +++ b/sig/datadog/core/utils/only_once_successful.rbs @@ -2,6 +2,21 @@ module Datadog module Core module Utils class OnlyOnceSuccessful < Datadog::Core::Utils::OnlyOnce + @limit: Integer + @retries: Integer + @failed: bool + + def initialize: (?Integer limit) -> void + + def success?: () -> bool + + def failed?: () -> bool + + private + + def check_limit!: () -> void + + def limited?: () -> bool end end end diff --git a/spec/datadog/core/utils/only_once_successful_spec.rb b/spec/datadog/core/utils/only_once_successful_spec.rb index bf06ff5d60c..e0adb31b41a 100644 --- a/spec/datadog/core/utils/only_once_successful_spec.rb +++ b/spec/datadog/core/utils/only_once_successful_spec.rb @@ -1,28 +1,62 @@ require 'datadog/core/utils/only_once_successful' RSpec.describe Datadog::Core::Utils::OnlyOnceSuccessful do - subject(:only_once_successful) { described_class.new } + subject(:only_once_successful) { described_class.new(limit) } + + let(:limit) { 0 } describe '#run' do - context 'before running once' do - it do - expect { |block| only_once_successful.run(&block) }.to yield_control - end + context 'when limitless' do + context 'before running once' do + it do + expect { |block| only_once_successful.run(&block) }.to yield_control + end - it 'returns the result of the block ran' do - expect(only_once_successful.run { :result }).to be :result + it 'returns the result of the block ran' do + expect(only_once_successful.run { :result }).to be :result + end end - end - context 'after running once' do - let(:result) { nil } + context 'after running once' do + let(:result) { nil } - before do - only_once_successful.run { result } + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect { |block| only_once_successful.run(&block) }.to_not yield_control + end + + it do + expect(only_once_successful.run { :result }).to be nil + end + end + + context 'when block returns falsey value' do + let(:result) { false } + + it do + expect { |block| only_once_successful.run(&block) }.to yield_control + end + + it 'runs again until block returns truthy value' do + expect(only_once_successful.run { :result }).to be :result + + expect(only_once_successful.run { :result }).to be nil + end + end end + end + + context 'when limited' do + let(:limit) { 2 } context 'when block returns truthy value' do - let(:result) { true } + before { only_once_successful.run { true } } it do expect { |block| only_once_successful.run(&block) }.to_not yield_control @@ -33,16 +67,18 @@ end end - context 'when block returns falsey value' do - let(:result) { false } + context 'when block returns falsey value "limit" times' do + before do + limit.times do + only_once_successful.run { false } + end + end it do - expect { |block| only_once_successful.run(&block) }.to yield_control + expect { |block| only_once_successful.run(&block) }.to_not yield_control end - it 'runs again until block returns truthy value' do - expect(only_once_successful.run { :result }).to be :result - + it do expect(only_once_successful.run { :result }).to be nil end end @@ -91,5 +127,107 @@ end end end + + context 'when limited and ran "limit" times' do + let(:limit) { 2 } + + before do + limit.times do + only_once_successful.run { false } + end + end + + it do + expect(only_once_successful.ran?).to be true + end + end + end + + describe '#success?' do + context 'before running once' do + it do + expect(only_once_successful.success?).to be false + end + end + + context 'after running once' do + let(:result) { nil } + + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect(only_once_successful.success?).to be true + end + end + + context 'when block returns falsey value' do + it do + expect(only_once_successful.success?).to be false + end + end + end + + context 'when limited and ran "limit" times' do + let(:limit) { 2 } + + before do + limit.times do + only_once_successful.run { false } + end + end + + it do + expect(only_once_successful.success?).to be false + end + end + end + + describe '#failed?' do + context 'before running once' do + it do + expect(only_once_successful.failed?).to be false + end + end + + context 'after running once' do + let(:result) { nil } + + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect(only_once_successful.failed?).to be false + end + end + + context 'when block returns falsey value' do + it do + expect(only_once_successful.failed?).to be false + end + end + end + + context 'when limited and ran "limit" times' do + let(:limit) { 2 } + + before do + limit.times do + only_once_successful.run { false } + end + end + + it do + expect(only_once_successful.failed?).to be true + end + end end end From 71c9e5cfc6ab2ea052c67919d8193d55a249f7c4 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 21 Jun 2024 10:37:14 +0200 Subject: [PATCH 17/21] limit telemetry app-started event retries --- lib/datadog/core/telemetry/worker.rb | 15 ++++- .../core/utils/only_once_successful.rb | 8 +++ sig/datadog/core/telemetry/worker.rbs | 3 + spec/datadog/core/telemetry/worker_spec.rb | 65 +++++++++++++++++++ 4 files changed, 89 insertions(+), 2 deletions(-) diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 2021e139b0a..46d7721ee40 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -15,8 +15,9 @@ class Worker include Core::Workers::Polling DEFAULT_BUFFER_MAX_SIZE = 1000 + APP_STARTED_EVENT_RETRIES = 10 - TELEMETRY_STARTED_ONCE = Utils::OnlyOnceSuccessful.new + TELEMETRY_STARTED_ONCE = Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES) def initialize( heartbeat_interval_seconds:, @@ -61,7 +62,11 @@ def enqueue(event) end def sent_started_event? - TELEMETRY_STARTED_ONCE.ran? + TELEMETRY_STARTED_ONCE.success? + end + + def failed_to_start? + TELEMETRY_STARTED_ONCE.failed? end private @@ -94,6 +99,12 @@ def heartbeat! def started! return unless enabled? + if failed_to_start? + Datadog.logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker') + self.enabled = false + return + end + TELEMETRY_STARTED_ONCE.run do res = send_event(Event::AppStarted.new) diff --git a/lib/datadog/core/utils/only_once_successful.rb b/lib/datadog/core/utils/only_once_successful.rb index 4209cafc1ef..ed8b4141963 100644 --- a/lib/datadog/core/utils/only_once_successful.rb +++ b/lib/datadog/core/utils/only_once_successful.rb @@ -62,6 +62,14 @@ def check_limit! def limited? !@limit.nil? && @limit.positive? end + + def reset_ran_once_state_for_tests + @mutex.synchronize do + @ran_once = false + @failed = false + @retries = 0 + end + end end end end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 01c5107e990..822b9fece95 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -9,6 +9,7 @@ module Datadog include Core::Workers::Queue TELEMETRY_STARTED_ONCE: Datadog::Core::Utils::OnlyOnceSuccessful + APP_STARTED_EVENT_RETRIES: 10 DEFAULT_BUFFER_MAX_SIZE: 1000 @emitter: Emitter @@ -23,6 +24,8 @@ module Datadog def sent_started_event?: () -> bool + def failed_to_start?: () -> bool + def enqueue: (Event::Base event) -> void def dequeue: () -> Array[Event::Base] diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 35a01e46834..e73fec0888e 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -120,6 +120,71 @@ try_wait_until { sent_hearbeat } end + context 'when app-started event fails' do + it 'retries' do + expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) + .and_return( + double( + Datadog::Core::Telemetry::Http::Adapters::Net::Response, + not_found?: false, + ok?: false + ) + ).once + + expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) do + @received_started = true + + response + end + + sent_hearbeat = false + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + # app-started was already sent by now + expect(@received_started).to be(true) + + sent_hearbeat = true + + response + end + + worker.start + + try_wait_until { sent_hearbeat } + end + end + + context 'when app-started event exhausted retries' do + let(:heartbeat_interval_seconds) { 0.1 } + + it 'stops retrying, never sends heartbeat, and disables worker' do + expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) + .and_return( + double( + Datadog::Core::Telemetry::Http::Adapters::Net::Response, + not_found?: false, + ok?: false + ) + ).exactly(described_class::APP_STARTED_EVENT_RETRIES).times + + sent_hearbeat = false + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + # app-started was already sent by now + expect(@received_started).to be(true) + + sent_hearbeat = true + + response + end + + worker.start + + try_wait_until { !worker.enabled? } + + expect(sent_hearbeat).to be(false) + expect(worker.failed_to_start?).to be(true) + end + end + context 'when dependencies collection enabled' do let(:dependency_collection) { true } From f4d349a6d5b9e224618bb5faabe9a2400854eead Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 21 Jun 2024 10:56:40 +0200 Subject: [PATCH 18/21] do not instantiate empty array every time when sending events --- lib/datadog/core/telemetry/worker.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 46d7721ee40..ed1ce11f0d8 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -82,10 +82,11 @@ def perform(*events) end def flush_events(events) + return if events.nil? return if !enabled? || !sent_started_event? Datadog.logger.debug { "Sending #{events&.count} telemetry events" } - (events || []).each do |event| + events.each do |event| send_event(event) end end From 3ccf58ba4dd64b4d716a9c5faeb433ccf3a8214b Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Tue, 25 Jun 2024 12:19:11 +0200 Subject: [PATCH 19/21] lower HTTP timeout for telemetry worker --- lib/datadog/core/telemetry/http/adapters/net.rb | 2 +- lib/datadog/core/telemetry/worker.rb | 2 ++ sig/datadog/core/telemetry/http/adapters/net.rbs | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/datadog/core/telemetry/http/adapters/net.rb b/lib/datadog/core/telemetry/http/adapters/net.rb index e49b321c436..3aa65e6d49d 100644 --- a/lib/datadog/core/telemetry/http/adapters/net.rb +++ b/lib/datadog/core/telemetry/http/adapters/net.rb @@ -15,7 +15,7 @@ class Net :timeout, :ssl - DEFAULT_TIMEOUT = 30 + DEFAULT_TIMEOUT = 2 def initialize(hostname:, port: nil, timeout: DEFAULT_TIMEOUT, ssl: true) @hostname = hostname diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index ed1ce11f0d8..57633251ac7 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -58,6 +58,8 @@ def stop(force_stop = false, timeout = @shutdown_timeout) end def enqueue(event) + return if !enabled? || forked? + buffer.push(event) end diff --git a/sig/datadog/core/telemetry/http/adapters/net.rbs b/sig/datadog/core/telemetry/http/adapters/net.rbs index 5cf50e53adf..311c5989f95 100644 --- a/sig/datadog/core/telemetry/http/adapters/net.rbs +++ b/sig/datadog/core/telemetry/http/adapters/net.rbs @@ -14,7 +14,7 @@ module Datadog attr_reader ssl: bool - DEFAULT_TIMEOUT: 30 + DEFAULT_TIMEOUT: 2 def initialize: (hostname: String, ?port: Integer?, ?timeout: Float | Integer, ?ssl: bool?) -> void From 5729921839d6ff09f724fc4ff022ad21884fcc4e Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Tue, 25 Jun 2024 13:57:54 +0200 Subject: [PATCH 20/21] do not run httprb spec for jruby --- Rakefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Rakefile b/Rakefile index ee43950a83a..919a85f6403 100644 --- a/Rakefile +++ b/Rakefile @@ -101,7 +101,7 @@ TEST_METADATA = { 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' }, 'httprb' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'kafka' => { 'activesupport' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' From 3b4e4d4007fe603daef835698ac9df4a93438be4 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Tue, 25 Jun 2024 14:12:26 +0200 Subject: [PATCH 21/21] skip more http specs for jruby --- Rakefile | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Rakefile b/Rakefile index 919a85f6403..f4ac22207b2 100644 --- a/Rakefile +++ b/Rakefile @@ -67,13 +67,13 @@ TEST_METADATA = { 'elasticsearch-8' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' }, 'ethon' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'excon' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'faraday' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby', + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby', 'contrib-old' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ❌ 3.0 / ❌ 3.1 / ❌ 3.2 / ❌ 3.3 / ❌ 3.4 / ✅ jruby' }, 'grape' => { @@ -95,10 +95,10 @@ TEST_METADATA = { 'contrib' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'http' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'httpclient' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'httprb' => { 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' @@ -146,7 +146,7 @@ TEST_METADATA = { 'resque2-redis4' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' }, 'rest_client' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'roda' => { 'contrib' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' @@ -167,7 +167,7 @@ TEST_METADATA = { 'contrib' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' }, 'stripe' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'sucker_punch' => { 'contrib' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby'