diff --git a/Rakefile b/Rakefile index ee43950a83a..f4ac22207b2 100644 --- a/Rakefile +++ b/Rakefile @@ -67,13 +67,13 @@ TEST_METADATA = { 'elasticsearch-8' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' }, 'ethon' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'excon' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'faraday' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby', + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby', 'contrib-old' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ❌ 3.0 / ❌ 3.1 / ❌ 3.2 / ❌ 3.3 / ❌ 3.4 / ✅ jruby' }, 'grape' => { @@ -95,13 +95,13 @@ TEST_METADATA = { 'contrib' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'http' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'httpclient' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'httprb' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'kafka' => { 'activesupport' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' @@ -146,7 +146,7 @@ TEST_METADATA = { 'resque2-redis4' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' }, 'rest_client' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'roda' => { 'contrib' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' @@ -167,7 +167,7 @@ TEST_METADATA = { 'contrib' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' }, 'stripe' => { - 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' + 'http' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ❌ jruby' }, 'sucker_punch' => { 'contrib' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby' diff --git a/lib/datadog/core/configuration.rb b/lib/datadog/core/configuration.rb index bcaa6432564..19d3ff27110 100644 --- a/lib/datadog/core/configuration.rb +++ b/lib/datadog/core/configuration.rb @@ -84,23 +84,16 @@ def configure configuration = self.configuration yield(configuration) - built_components = false - - components = safely_synchronize do |write_components| + safely_synchronize do |write_components| write_components.call( if components? replace_components!(configuration, @components) else - components = build_components(configuration) - built_components = true - components + build_components(configuration) end ) end - # Should only be called the first time components are built - components.telemetry.started! if built_components - configuration end @@ -200,20 +193,13 @@ def components(allow_initialization: true) current_components = COMPONENTS_READ_LOCK.synchronize { defined?(@components) && @components } return current_components if current_components || !allow_initialization - built_components = false - - components = safely_synchronize do |write_components| + safely_synchronize do |write_components| if defined?(@components) && @components @components else - built_components = true write_components.call(build_components(configuration)) end end - - # Should only be called the first time components are built - components&.telemetry&.started! if built_components - components end private diff --git a/lib/datadog/core/configuration/components.rb b/lib/datadog/core/configuration/components.rb index f7e00d7a981..665485c6f0c 100644 --- a/lib/datadog/core/configuration/components.rb +++ b/lib/datadog/core/configuration/components.rb @@ -6,7 +6,7 @@ require_relative '../diagnostics/health' require_relative '../logger' require_relative '../runtime/metrics' -require_relative '../telemetry/client' +require_relative '../telemetry/component' require_relative '../workers/runtime_metrics' require_relative '../remote/component' @@ -62,7 +62,7 @@ def build_telemetry(settings, agent_settings, logger) logger.debug { "Telemetry disabled. Agent network adapter not supported: #{agent_settings.adapter}" } end - Telemetry::Client.new( + Telemetry::Component.new( enabled: enabled, heartbeat_interval_seconds: settings.telemetry.heartbeat_interval_seconds, dependency_collection: settings.telemetry.dependency_collection @@ -169,8 +169,9 @@ def shutdown!(replacement = nil) unused_statsd = (old_statsd - (old_statsd & new_statsd)) unused_statsd.each(&:close) - telemetry.stop! + # enqueue closing event before stopping telemetry so it will be send out on shutdown telemetry.emit_closing! unless replacement + telemetry.stop! end end end diff --git a/lib/datadog/core/telemetry/client.rb b/lib/datadog/core/telemetry/client.rb deleted file mode 100644 index 172145c9342..00000000000 --- a/lib/datadog/core/telemetry/client.rb +++ /dev/null @@ -1,95 +0,0 @@ -# frozen_string_literal: true - -require_relative 'emitter' -require_relative 'event' -require_relative 'heartbeat' -require_relative '../utils/forking' - -module Datadog - module Core - module Telemetry - # Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle. - class Client - attr_reader \ - :enabled, - :unsupported - - include Core::Utils::Forking - - # @param enabled [Boolean] Determines whether telemetry events should be sent to the API - # @param heartbeat_interval_seconds [Float] How frequently heartbeats will be reported, in seconds. - # @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event - def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true) - @enabled = enabled - @emitter = Emitter.new - @stopped = false - @unsupported = false - @started = false - @dependency_collection = dependency_collection - - @worker = Telemetry::Heartbeat.new(enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds) do - next unless @started # `started!` should be the first event, thus ensure that `heartbeat!` is not sent first. - - heartbeat! - end - end - - def disable! - @enabled = false - @worker.enabled = false - end - - def started! - return if !@enabled || forked? - - res = @emitter.request(Event::AppStarted.new) - - if res.not_found? # Telemetry is only supported by agent versions 7.34 and up - Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') - disable! - @unsupported = true # Prevent telemetry from getting re-enabled - return res - end - - @emitter.request(Event::AppDependenciesLoaded.new) if @dependency_collection - - @started = true - end - - def emit_closing! - return if !@enabled || forked? - - @emitter.request(Event::AppClosing.new) - end - - def stop! - return if @stopped - - @worker.stop(true, 0) - @stopped = true - end - - def integrations_change! - return if !@enabled || forked? - - @emitter.request(Event::AppIntegrationsChange.new) - end - - # Report configuration changes caused by Remote Configuration. - def client_configuration_change!(changes) - return if !@enabled || forked? - - @emitter.request(Event::AppClientConfigurationChange.new(changes, 'remote_config')) - end - - private - - def heartbeat! - return if !@enabled || forked? - - @emitter.request(Event::AppHeartbeat.new) - end - end - end - end -end diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb new file mode 100644 index 00000000000..0d5046e4391 --- /dev/null +++ b/lib/datadog/core/telemetry/component.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +require_relative 'emitter' +require_relative 'event' +require_relative 'worker' +require_relative '../utils/forking' + +module Datadog + module Core + module Telemetry + # Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle. + class Component + attr_reader :enabled + + include Core::Utils::Forking + + # @param enabled [Boolean] Determines whether telemetry events should be sent to the API + # @param heartbeat_interval_seconds [Float] How frequently heartbeats will be reported, in seconds. + # @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event + def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true) + @enabled = enabled + @stopped = false + + @worker = Telemetry::Worker.new( + enabled: @enabled, + heartbeat_interval_seconds: heartbeat_interval_seconds, + emitter: Emitter.new, + dependency_collection: dependency_collection + ) + @worker.start + end + + def disable! + @enabled = false + @worker.enabled = false + end + + def stop! + return if @stopped + + @worker.stop(true) + @stopped = true + end + + def emit_closing! + return if !@enabled || forked? + + @worker.enqueue(Event::AppClosing.new) + end + + def integrations_change! + return if !@enabled || forked? + + @worker.enqueue(Event::AppIntegrationsChange.new) + end + + # Report configuration changes caused by Remote Configuration. + def client_configuration_change!(changes) + return if !@enabled || forked? + + @worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config')) + end + end + end + end +end diff --git a/lib/datadog/core/telemetry/heartbeat.rb b/lib/datadog/core/telemetry/heartbeat.rb deleted file mode 100644 index b2129504e68..00000000000 --- a/lib/datadog/core/telemetry/heartbeat.rb +++ /dev/null @@ -1,33 +0,0 @@ -# frozen_string_literal: true - -require_relative '../worker' -require_relative '../workers/polling' - -module Datadog - module Core - module Telemetry - # Periodically sends a heartbeat event to the telemetry API. - class Heartbeat < Core::Worker - include Core::Workers::Polling - - def initialize(heartbeat_interval_seconds:, enabled: true, &block) - # Workers::Polling settings - self.enabled = enabled - # Workers::IntervalLoop settings - self.loop_base_interval = heartbeat_interval_seconds - self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP - super(&block) - start - end - - def loop_wait_before_first_iteration?; end - - private - - def start - perform - end - end - end - end -end diff --git a/lib/datadog/core/telemetry/http/adapters/net.rb b/lib/datadog/core/telemetry/http/adapters/net.rb index e49b321c436..3aa65e6d49d 100644 --- a/lib/datadog/core/telemetry/http/adapters/net.rb +++ b/lib/datadog/core/telemetry/http/adapters/net.rb @@ -15,7 +15,7 @@ class Net :timeout, :ssl - DEFAULT_TIMEOUT = 30 + DEFAULT_TIMEOUT = 2 def initialize(hostname:, port: nil, timeout: DEFAULT_TIMEOUT, ssl: true) @hostname = hostname diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb new file mode 100644 index 00000000000..57633251ac7 --- /dev/null +++ b/lib/datadog/core/telemetry/worker.rb @@ -0,0 +1,156 @@ +# frozen_string_literal: true + +require_relative 'event' + +require_relative '../utils/only_once_successful' +require_relative '../workers/polling' +require_relative '../workers/queue' + +module Datadog + module Core + module Telemetry + # Accumulates events and sends them to the API at a regular interval, including heartbeat event. + class Worker + include Core::Workers::Queue + include Core::Workers::Polling + + DEFAULT_BUFFER_MAX_SIZE = 1000 + APP_STARTED_EVENT_RETRIES = 10 + + TELEMETRY_STARTED_ONCE = Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES) + + def initialize( + heartbeat_interval_seconds:, + emitter:, + dependency_collection:, + enabled: true, + shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, + buffer_size: DEFAULT_BUFFER_MAX_SIZE + ) + @emitter = emitter + @dependency_collection = dependency_collection + + # Workers::Polling settings + self.enabled = enabled + # Workers::IntervalLoop settings + self.loop_base_interval = heartbeat_interval_seconds + self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP + + @shutdown_timeout = shutdown_timeout + @buffer_size = buffer_size + + self.buffer = buffer_klass.new(@buffer_size) + end + + def start + return if !enabled? || forked? + + # starts async worker + perform + end + + def stop(force_stop = false, timeout = @shutdown_timeout) + buffer.close if running? + + flush_events(dequeue) if work_pending? + + super + end + + def enqueue(event) + return if !enabled? || forked? + + buffer.push(event) + end + + def sent_started_event? + TELEMETRY_STARTED_ONCE.success? + end + + def failed_to_start? + TELEMETRY_STARTED_ONCE.failed? + end + + private + + def perform(*events) + return if !enabled? || forked? + + started! unless sent_started_event? + + heartbeat! + + flush_events(events) + end + + def flush_events(events) + return if events.nil? + return if !enabled? || !sent_started_event? + + Datadog.logger.debug { "Sending #{events&.count} telemetry events" } + events.each do |event| + send_event(event) + end + end + + def heartbeat! + return if !enabled? || !sent_started_event? + + send_event(Event::AppHeartbeat.new) + end + + def started! + return unless enabled? + + if failed_to_start? + Datadog.logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker') + self.enabled = false + return + end + + TELEMETRY_STARTED_ONCE.run do + res = send_event(Event::AppStarted.new) + + if res.ok? + Datadog.logger.debug('Telemetry app-started event is successfully sent') + + send_event(Event::AppDependenciesLoaded.new) if @dependency_collection + + true + else + Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...') + false + end + end + end + + def send_event(event) + res = @emitter.request(event) + + disable_on_not_found!(res) + + res + end + + def dequeue + buffer.pop + end + + def buffer_klass + if Core::Environment::Ext::RUBY_ENGINE == 'ruby' + Core::Buffer::CRuby + else + Core::Buffer::ThreadSafe + end + end + + def disable_on_not_found!(response) + return unless response.not_found? + + Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') + self.enabled = false + end + end + end + end +end diff --git a/lib/datadog/core/utils/only_once_successful.rb b/lib/datadog/core/utils/only_once_successful.rb new file mode 100644 index 00000000000..ed8b4141963 --- /dev/null +++ b/lib/datadog/core/utils/only_once_successful.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true + +require_relative 'only_once' + +module Datadog + module Core + module Utils + # Helper class to execute something with only one success. + # + # This is useful for cases where we want to ensure that a block of code is only executed once, and only if it + # succeeds. One such example is sending app-started telemetry event. + # + # Successful execution is determined by the return value of the block: any truthy value is considered success. + # + # Thread-safe when used correctly (e.g. be careful of races when lazily initializing instances of this class). + # + # Note: In its current state, this class is not Ractor-safe. + # In https://github.com/DataDog/dd-trace-rb/pull/1398#issuecomment-797378810 we have a discussion of alternatives, + # including an alternative implementation that is Ractor-safe once spent. + class OnlyOnceSuccessful < OnlyOnce + def initialize(limit = 0) + super() + + @limit = limit + @failed = false + @retries = 0 + end + + def run + @mutex.synchronize do + return if @ran_once + + result = yield + @ran_once = !!result + + if !@ran_once && limited? + @retries += 1 + check_limit! + end + + result + end + end + + def success? + @mutex.synchronize { @ran_once && !@failed } + end + + def failed? + @mutex.synchronize { @ran_once && @failed } + end + + private + + def check_limit! + if @retries >= @limit + @failed = true + @ran_once = true + end + end + + def limited? + !@limit.nil? && @limit.positive? + end + + def reset_ran_once_state_for_tests + @mutex.synchronize do + @ran_once = false + @failed = false + @retries = 0 + end + end + end + end + end +end diff --git a/sig/datadog/core/telemetry/client.rbs b/sig/datadog/core/telemetry/component.rbs similarity index 57% rename from sig/datadog/core/telemetry/client.rbs rename to sig/datadog/core/telemetry/component.rbs index 9bd2f4a97cc..4d411d32578 100644 --- a/sig/datadog/core/telemetry/client.rbs +++ b/sig/datadog/core/telemetry/component.rbs @@ -1,38 +1,26 @@ module Datadog module Core module Telemetry - class Client + class Component @enabled: bool - @dependency_collection: bool - @started: bool @stopped: bool - @emitter: Datadog::Core::Telemetry::Emitter - @unsupported: bool - @worker: Datadog::Core::Telemetry::Heartbeat + @worker: Datadog::Core::Telemetry::Worker attr_reader enabled: bool - attr_reader unsupported: bool - include Core::Utils::Forking - def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, enabled: bool) -> void + def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void def disable!: () -> void def client_configuration_change!: (Enumerable[[String, Numeric | bool | String]] changes) -> void - def started!: () -> void - def emit_closing!: () -> void def stop!: () -> void def integrations_change!: () -> void - - private - - def heartbeat!: () -> void end end end diff --git a/sig/datadog/core/telemetry/heartbeat.rbs b/sig/datadog/core/telemetry/heartbeat.rbs deleted file mode 100644 index b89aeedca8b..00000000000 --- a/sig/datadog/core/telemetry/heartbeat.rbs +++ /dev/null @@ -1,20 +0,0 @@ -module Datadog - module Core - module Telemetry - class Heartbeat < Core::Worker - include Core::Workers::Polling - include Core::Workers::Async::Thread - include Core::Workers::Async::Thread::PrependedMethods - include Core::Workers::IntervalLoop - - def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric) ?{ () -> void } -> void - - def loop_wait_before_first_iteration?: () -> bool? - - private - - def start: () -> void - end - end - end -end diff --git a/sig/datadog/core/telemetry/http/adapters/net.rbs b/sig/datadog/core/telemetry/http/adapters/net.rbs index 5cf50e53adf..311c5989f95 100644 --- a/sig/datadog/core/telemetry/http/adapters/net.rbs +++ b/sig/datadog/core/telemetry/http/adapters/net.rbs @@ -14,7 +14,7 @@ module Datadog attr_reader ssl: bool - DEFAULT_TIMEOUT: 30 + DEFAULT_TIMEOUT: 2 def initialize: (hostname: String, ?port: Integer?, ?timeout: Float | Integer, ?ssl: bool?) -> void diff --git a/sig/datadog/core/telemetry/http/ext.rbs b/sig/datadog/core/telemetry/http/ext.rbs index 22cea7d1fd0..11271822da3 100644 --- a/sig/datadog/core/telemetry/http/ext.rbs +++ b/sig/datadog/core/telemetry/http/ext.rbs @@ -21,7 +21,7 @@ module Datadog CONTENT_TYPE_APPLICATION_JSON: "application/json" - API_VERSION: "v1" + API_VERSION: "v2" AGENT_ENDPOINT: "/telemetry/proxy/api/v2/apmtelemetry" end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs new file mode 100644 index 00000000000..822b9fece95 --- /dev/null +++ b/sig/datadog/core/telemetry/worker.rbs @@ -0,0 +1,49 @@ +module Datadog + module Core + module Telemetry + class Worker + include Core::Workers::Polling + include Core::Workers::Async::Thread + include Core::Workers::Async::Thread::PrependedMethods + include Core::Workers::IntervalLoop + include Core::Workers::Queue + + TELEMETRY_STARTED_ONCE: Datadog::Core::Utils::OnlyOnceSuccessful + APP_STARTED_EVENT_RETRIES: 10 + DEFAULT_BUFFER_MAX_SIZE: 1000 + + @emitter: Emitter + @sent_started_event: bool + @shutdown_timeout: Integer + @buffer_size: Integer + @dependency_collection: bool + + def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void + + def start: () -> void + + def sent_started_event?: () -> bool + + def failed_to_start?: () -> bool + + def enqueue: (Event::Base event) -> void + + def dequeue: () -> Array[Event::Base] + + private + + def heartbeat!: () -> void + + def started!: () -> void + + def flush_events: (Array[Event::Base] events) -> void + + def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response + + def disable_on_not_found!: (Datadog::Core::Telemetry::Http::Adapters::Net::Response response) -> void + + def buffer_klass: () -> untyped + end + end + end +end diff --git a/sig/datadog/core/utils/only_once.rbs b/sig/datadog/core/utils/only_once.rbs index 324b1ba72e7..354334c71e1 100644 --- a/sig/datadog/core/utils/only_once.rbs +++ b/sig/datadog/core/utils/only_once.rbs @@ -2,6 +2,9 @@ module Datadog module Core module Utils class OnlyOnce + @ran_once: bool + @mutex: Thread::Mutex + def initialize: () -> untyped def run: () { () -> untyped } -> untyped diff --git a/sig/datadog/core/utils/only_once_successful.rbs b/sig/datadog/core/utils/only_once_successful.rbs new file mode 100644 index 00000000000..2236b5a66b5 --- /dev/null +++ b/sig/datadog/core/utils/only_once_successful.rbs @@ -0,0 +1,23 @@ +module Datadog + module Core + module Utils + class OnlyOnceSuccessful < Datadog::Core::Utils::OnlyOnce + @limit: Integer + @retries: Integer + @failed: bool + + def initialize: (?Integer limit) -> void + + def success?: () -> bool + + def failed?: () -> bool + + private + + def check_limit!: () -> void + + def limited?: () -> bool + end + end + end +end diff --git a/sig/datadog/core/workers/polling.rbs b/sig/datadog/core/workers/polling.rbs index 7f4d8f9c55b..43c1360a92c 100644 --- a/sig/datadog/core/workers/polling.rbs +++ b/sig/datadog/core/workers/polling.rbs @@ -2,7 +2,7 @@ module Datadog module Core module Workers module Polling - SHUTDOWN_TIMEOUT: 1 + DEFAULT_SHUTDOWN_TIMEOUT: 1 def self.included: (Class | Module base) -> void diff --git a/spec/datadog/core/configuration/components_spec.rb b/spec/datadog/core/configuration/components_spec.rb index e04fad93fcf..79c65637f91 100644 --- a/spec/datadog/core/configuration/components_spec.rb +++ b/spec/datadog/core/configuration/components_spec.rb @@ -7,7 +7,7 @@ require 'datadog/core/diagnostics/environment_logger' require 'datadog/core/diagnostics/health' require 'datadog/core/logger' -require 'datadog/core/telemetry/client' +require 'datadog/core/telemetry/component' require 'datadog/core/runtime/metrics' require 'datadog/core/workers/runtime_metrics' require 'datadog/statsd' @@ -33,7 +33,7 @@ let(:profiler_setup_task) { Datadog::Profiling.supported? ? instance_double(Datadog::Profiling::Tasks::Setup) : nil } let(:remote) { instance_double(Datadog::Core::Remote::Component, start: nil, shutdown!: nil) } - let(:telemetry) { instance_double(Datadog::Core::Telemetry::Client) } + let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } let(:environment_logger_extra) { { hello: 123, world: '456' } } @@ -46,7 +46,7 @@ end allow(Datadog::Statsd).to receive(:new) { instance_double(Datadog::Statsd) } allow(Datadog::Core::Remote::Component).to receive(:new).and_return(remote) - allow(Datadog::Core::Telemetry::Client).to receive(:new).and_return(telemetry) + allow(Datadog::Core::Telemetry::Component).to receive(:new).and_return(telemetry) end around do |example| @@ -223,7 +223,7 @@ let(:logger) { instance_double(Logger) } context 'given settings' do - let(:telemetry_client) { instance_double(Datadog::Core::Telemetry::Client) } + let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } let(:expected_options) do { enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, dependency_collection: dependency_collection } @@ -233,16 +233,16 @@ let(:dependency_collection) { true } before do - expect(Datadog::Core::Telemetry::Client).to receive(:new).with(expected_options).and_return(telemetry_client) + expect(Datadog::Core::Telemetry::Component).to receive(:new).with(expected_options).and_return(telemetry) allow(settings.telemetry).to receive(:enabled).and_return(enabled) end - it { is_expected.to be(telemetry_client) } + it { is_expected.to be(telemetry) } context 'with :enabled true' do let(:enabled) { double('enabled') } - it { is_expected.to be(telemetry_client) } + it { is_expected.to be(telemetry) } context 'and :unix agent adapter' do let(:expected_options) do @@ -255,7 +255,7 @@ it 'does not enable telemetry for unsupported non-http transport' do expect(logger).to receive(:debug) - is_expected.to be(telemetry_client) + is_expected.to be(telemetry) end end end @@ -1108,7 +1108,7 @@ let(:runtime_metrics) { instance_double(Datadog::Core::Runtime::Metrics, statsd: statsd) } let(:health_metrics) { instance_double(Datadog::Core::Diagnostics::Health::Metrics, statsd: statsd) } let(:statsd) { instance_double(::Datadog::Statsd) } - let(:telemetry) { instance_double(Datadog::Core::Telemetry::Client) } + let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } before do allow(replacement).to receive(:tracer).and_return(tracer) diff --git a/spec/datadog/core/configuration_spec.rb b/spec/datadog/core/configuration_spec.rb index 7873d262f70..51a2b44cb01 100644 --- a/spec/datadog/core/configuration_spec.rb +++ b/spec/datadog/core/configuration_spec.rb @@ -8,13 +8,12 @@ RSpec.describe Datadog::Core::Configuration do let(:default_log_level) { ::Logger::INFO } - let(:telemetry_client) { instance_double(Datadog::Core::Telemetry::Client) } + let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } before do - allow(telemetry_client).to receive(:started!) - allow(telemetry_client).to receive(:stop!) - allow(telemetry_client).to receive(:emit_closing!) - allow(Datadog::Core::Telemetry::Client).to receive(:new).and_return(telemetry_client) + allow(telemetry).to receive(:stop!) + allow(telemetry).to receive(:emit_closing!) + allow(Datadog::Core::Telemetry::Component).to receive(:new).and_return(telemetry) allow(Datadog::Core::Remote::Component).to receive(:build) end @@ -41,10 +40,6 @@ end it do - # We cannot mix `expect().to_not` with `expect().to(...).ordered`. - # One way around that is to force the method to raise an error if it's ever called. - allow(telemetry_client).to receive(:started!).and_raise('Should not be called') - # Components should have changed expect { configure } .to change { test_class.send(:components) } @@ -84,7 +79,6 @@ .with(test_class.configuration) expect(new_components).to_not have_received(:shutdown!) - expect(telemetry_client).to have_received(:started!) end end end @@ -501,8 +495,6 @@ describe '#components' do context 'when components are not initialized' do it 'initializes the components' do - expect(telemetry_client).to receive(:started!) - test_class.send(:components) expect(test_class.send(:components?)).to be true @@ -510,8 +502,6 @@ context 'when allow_initialization is false' do it 'does not initialize the components' do - expect(telemetry_client).to_not receive(:started!) - test_class.send(:components, allow_initialization: false) expect(test_class.send(:components?)).to be false @@ -527,7 +517,6 @@ it 'returns the components without touching the COMPONENTS_WRITE_LOCK' do described_class.const_get(:COMPONENTS_WRITE_LOCK).lock - expect(telemetry_client).to_not receive(:started!) expect(test_class.send(:components)).to_not be_nil end end diff --git a/spec/datadog/core/telemetry/client_spec.rb b/spec/datadog/core/telemetry/client_spec.rb deleted file mode 100644 index 5618d4ea65e..00000000000 --- a/spec/datadog/core/telemetry/client_spec.rb +++ /dev/null @@ -1,329 +0,0 @@ -require 'spec_helper' - -require 'datadog/core/telemetry/client' - -RSpec.describe Datadog::Core::Telemetry::Client do - subject(:client) do - described_class.new( - enabled: enabled, - heartbeat_interval_seconds: heartbeat_interval_seconds, - dependency_collection: dependency_collection - ) - end - - let(:enabled) { true } - let(:heartbeat_interval_seconds) { 1.3 } - let(:dependency_collection) { true } - let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } - let(:response) { double(Datadog::Core::Telemetry::Http::Adapters::Net::Response) } - let(:not_found) { false } - - before do - allow(Datadog::Core::Telemetry::Emitter).to receive(:new).and_return(emitter) - allow(emitter).to receive(:request).and_return(response) - allow(response).to receive(:not_found?).and_return(not_found) - end - - describe '#initialize' do - after do - client.stop! - end - - context 'with default parameters' do - subject(:client) do - described_class.new( - heartbeat_interval_seconds: heartbeat_interval_seconds, - dependency_collection: dependency_collection - ) - end - - it { is_expected.to be_a_kind_of(described_class) } - it { expect(client.enabled).to be(true) } - end - - context 'when :enabled is false' do - let(:enabled) { false } - it { is_expected.to be_a_kind_of(described_class) } - it { expect(client.enabled).to be(false) } - end - - context 'when enabled' do - let(:enabled) { true } - - it { is_expected.to be_a_kind_of(described_class) } - it { expect(client.enabled).to be(true) } - end - end - - describe '#disable!' do - after do - client.stop! - end - - it { expect { client.disable! }.to change { client.enabled }.from(true).to(false) } - end - - describe '#started!' do - subject(:started!) { client.started! } - - after do - client.stop! - end - - context 'when disabled' do - let(:enabled) { false } - it do - started! - expect(emitter).to_not have_received(:request) - end - end - - context 'when enabled' do - let(:enabled) { true } - - context 'when dependency_collection is true' do - it do - app_started = double - allow(Datadog::Core::Telemetry::Event::AppStarted).to receive(:new).with(no_args).and_return(app_started) - - dependencies = double - allow(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) - .to receive(:new).with(no_args).and_return(dependencies) - - started! - expect(emitter).to have_received(:request).with(app_started) - expect(emitter).to have_received(:request).with(dependencies) - end - end - - context 'when dependency_collection is false' do - let(:dependency_collection) { false } - - it do - app_started = double - allow(Datadog::Core::Telemetry::Event::AppStarted).to receive(:new).with(no_args).and_return(app_started) - - dependencies = double - allow(Datadog::Core::Telemetry::Event::AppDependenciesLoaded) - .to receive(:new).with(no_args).and_return(dependencies) - - started! - expect(emitter).to have_received(:request).with(app_started) - expect(emitter).to_not have_received(:request).with(dependencies) - end - - context 'with heartbeat' do - let(:heartbeat_interval_seconds) { 0 } - - it 'sends a heartbeat strictly after app-started' do - @sent_hearbeat = false - allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do - # Ensure app-started was already sent by now - expect(emitter).to have_received(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppStarted)) - @sent_hearbeat = true - response - end - - client.started! - - try_wait_until { @sent_hearbeat } - end - end - end - end - - context 'when internal error returned by emitter' do - let(:response) { Datadog::Core::Telemetry::Http::InternalErrorResponse.new('error') } - - it { expect { started! }.to_not raise_error } - end - - context 'when response returns 404' do - let(:not_found) { true } - - before do - logger = double(Datadog::Core::Logger) - allow(logger).to receive(:debug).with(any_args) - allow(Datadog).to receive(:logger).and_return(logger) - end - - it do - started! - expect(client.enabled).to be(false) - expect(client.unsupported).to be(true) - expect(Datadog.logger).to have_received(:debug).with( - 'Agent does not support telemetry; disabling future telemetry events.' - ) - end - end - - context 'when in fork' do - before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } - - it do - client - expect_in_fork do - expect(emitter).to_not receive(:request) - client.started! - end - end - end - end - - describe '#emit_closing!' do - subject(:emit_closing!) { client.emit_closing! } - - after do - client.stop! - end - - context 'when disabled' do - let(:enabled) { false } - it do - emit_closing! - expect(emitter).to_not have_received(:request) - end - end - - context 'when enabled' do - let(:enabled) { true } - it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppClosing).to receive(:new).with(no_args).and_return(double) - - emit_closing! - expect(emitter).to have_received(:request).with(double) - end - - it { is_expected.to be(response) } - end - - context 'when in fork' do - before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } - - it do - client - expect_in_fork do - expect(emitter).to_not receive(:request) - client.started! - end - end - end - end - - describe '#stop!' do - subject(:stop!) { client.stop! } - let(:worker) { instance_double(Datadog::Core::Telemetry::Heartbeat) } - - before do - allow(Datadog::Core::Telemetry::Heartbeat).to receive(:new) - .with(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds).and_return(worker) - allow(worker).to receive(:start) - allow(worker).to receive(:stop) - end - - context 'when disabled' do - let(:enabled) { false } - it 'does not raise error' do - stop! - end - end - - context 'when enabled' do - let(:enabled) { true } - - context 'when stop! has been called already' do - it 'does not raise error' do - stop! - stop! - end - end - end - end - - describe '#integrations_change!' do - subject(:integrations_change!) { client.integrations_change! } - - after do - client.stop! - end - - context 'when disabled' do - let(:enabled) { false } - it do - integrations_change! - expect(emitter).to_not have_received(:request) - end - end - - context 'when enabled' do - let(:enabled) { true } - it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppIntegrationsChange).to receive(:new).with(no_args).and_return(double) - - integrations_change! - expect(emitter).to have_received(:request).with(double) - end - - it { is_expected.to be(response) } - end - - context 'when in fork' do - before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } - - it do - client - expect_in_fork do - expect(emitter).to_not receive(:request) - client.started! - end - end - end - end - - describe '#client_configuration_change!' do - subject(:client_configuration_change!) { client.client_configuration_change!(changes) } - let(:changes) { double('changes') } - - after do - client.stop! - end - - context 'when disabled' do - let(:enabled) { false } - it do - client_configuration_change! - expect(emitter).to_not have_received(:request) - end - end - - context 'when enabled' do - let(:enabled) { true } - it do - double = double() - allow(Datadog::Core::Telemetry::Event::AppClientConfigurationChange).to receive(:new).with( - changes, - 'remote_config' - ).and_return(double) - - client_configuration_change! - expect(emitter).to have_received(:request).with(double) - end - - it { is_expected.to be(response) } - end - - context 'when in fork' do - before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } - - it do - client - expect_in_fork do - expect(emitter).to_not receive(:request) - client.started! - end - end - end - end -end diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb new file mode 100644 index 00000000000..ba17d37c2f4 --- /dev/null +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -0,0 +1,207 @@ +require 'spec_helper' + +require 'datadog/core/telemetry/component' + +RSpec.describe Datadog::Core::Telemetry::Component do + subject(:telemetry) do + described_class.new( + enabled: enabled, + heartbeat_interval_seconds: heartbeat_interval_seconds, + dependency_collection: dependency_collection + ) + end + + let(:enabled) { true } + let(:heartbeat_interval_seconds) { 0 } + let(:dependency_collection) { true } + let(:worker) { double(Datadog::Core::Telemetry::Worker) } + let(:not_found) { false } + + before do + allow(Datadog::Core::Telemetry::Worker).to receive(:new).with( + heartbeat_interval_seconds: heartbeat_interval_seconds, + dependency_collection: dependency_collection, + enabled: enabled, + emitter: an_instance_of(Datadog::Core::Telemetry::Emitter) + ).and_return(worker) + + allow(worker).to receive(:start) + allow(worker).to receive(:enqueue) + allow(worker).to receive(:stop) + allow(worker).to receive(:"enabled=") + end + + describe '#initialize' do + after do + telemetry.stop! + end + + context 'with default parameters' do + subject(:telemetry) do + described_class.new( + heartbeat_interval_seconds: heartbeat_interval_seconds, + dependency_collection: dependency_collection + ) + end + + it { is_expected.to be_a_kind_of(described_class) } + it { expect(telemetry.enabled).to be(true) } + end + + context 'when :enabled is false' do + let(:enabled) { false } + it { is_expected.to be_a_kind_of(described_class) } + it { expect(telemetry.enabled).to be(false) } + end + + context 'when enabled' do + let(:enabled) { true } + + it { is_expected.to be_a_kind_of(described_class) } + it { expect(telemetry.enabled).to be(true) } + end + end + + describe '#disable!' do + after do + telemetry.stop! + end + + it { expect { telemetry.disable! }.to change { telemetry.enabled }.from(true).to(false) } + + it 'disables worker' do + telemetry.disable! + + expect(worker).to have_received(:"enabled=").with(false) + end + end + + describe '#emit_closing!' do + subject(:emit_closing!) { telemetry.emit_closing! } + + after do + telemetry.stop! + end + + context 'when disabled' do + let(:enabled) { false } + it do + emit_closing! + + expect(worker).not_to have_received(:enqueue) + end + end + + context 'when enabled' do + let(:enabled) { true } + it do + emit_closing! + + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppClosing) + ) + end + end + + context 'when in fork' do + before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } + + it do + telemetry + expect_in_fork do + expect(worker).not_to have_received(:enqueue) + end + end + end + end + + describe '#stop!' do + subject(:stop!) { telemetry.stop! } + + it 'stops worker once' do + stop! + stop! + + expect(worker).to have_received(:stop).once + end + end + + describe '#integrations_change!' do + subject(:integrations_change!) { telemetry.integrations_change! } + + after do + telemetry.stop! + end + + context 'when disabled' do + let(:enabled) { false } + it do + integrations_change! + + expect(worker).not_to have_received(:enqueue) + end + end + + context 'when enabled' do + let(:enabled) { true } + it do + integrations_change! + + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + ) + end + end + + context 'when in fork' do + before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } + + it do + telemetry + expect_in_fork do + expect(worker).not_to have_received(:enqueue) + end + end + end + end + + describe '#client_configuration_change!' do + subject(:client_configuration_change!) { telemetry.client_configuration_change!(changes) } + let(:changes) { double('changes') } + + after do + telemetry.stop! + end + + context 'when disabled' do + let(:enabled) { false } + it do + client_configuration_change! + + expect(worker).not_to have_received(:enqueue) + end + end + + context 'when enabled' do + let(:enabled) { true } + it do + client_configuration_change! + + expect(worker).to have_received(:enqueue).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppClientConfigurationChange) + ) + end + end + + context 'when in fork' do + before { skip 'Fork not supported on current platform' unless Process.respond_to?(:fork) } + + it do + telemetry + expect_in_fork do + expect(worker).not_to have_received(:enqueue) + end + end + end + end +end diff --git a/spec/datadog/core/telemetry/heartbeat_spec.rb b/spec/datadog/core/telemetry/heartbeat_spec.rb deleted file mode 100644 index 645120267e0..00000000000 --- a/spec/datadog/core/telemetry/heartbeat_spec.rb +++ /dev/null @@ -1,46 +0,0 @@ -require 'spec_helper' - -require 'datadog/core/telemetry/heartbeat' - -RSpec.describe Datadog::Core::Telemetry::Heartbeat do - subject(:heartbeat) do - described_class.new(enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, &block) - end - - let(:enabled) { true } - let(:heartbeat_interval_seconds) { 1.2 } - let(:block) { proc {} } - - after do - heartbeat.stop(true, 0) - heartbeat.join - end - - describe '.new' do - context 'when using default settings' do - subject(:heartbeat) { described_class.new(heartbeat_interval_seconds: heartbeat_interval_seconds, &block) } - it do - is_expected.to have_attributes( - enabled?: true, - loop_base_interval: 1.2, # seconds - task: block - ) - end - end - - context 'when enabled' do - let(:enabled) { true } - - it do - heartbeat - - try_wait_until { heartbeat.running? } - expect(heartbeat).to have_attributes( - run_async?: true, - running?: true, - started?: true - ) - end - end - end -end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb new file mode 100644 index 00000000000..e73fec0888e --- /dev/null +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -0,0 +1,303 @@ +require 'spec_helper' + +require 'datadog/core/telemetry/worker' + +RSpec.describe Datadog::Core::Telemetry::Worker do + subject(:worker) do + described_class.new( + enabled: enabled, + heartbeat_interval_seconds: heartbeat_interval_seconds, + emitter: emitter, + dependency_collection: dependency_collection + ) + end + + let(:enabled) { true } + let(:heartbeat_interval_seconds) { 0.5 } + let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } + let(:dependency_collection) { false } + + let(:backend_supports_telemetry?) { true } + let(:response) do + double( + Datadog::Core::Telemetry::Http::Adapters::Net::Response, + not_found?: !backend_supports_telemetry?, + ok?: backend_supports_telemetry? + ) + end + + before do + logger = double(Datadog::Core::Logger) + allow(logger).to receive(:debug).with(any_args) + allow(Datadog).to receive(:logger).and_return(logger) + + @received_started = false + @received_heartbeat = false + + allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) do + @received_started = true + + response + end + + allow(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + @received_heartbeat = true + + response + end + end + + after do + worker.stop(true) + worker.join + + Datadog::Core::Telemetry::Worker::TELEMETRY_STARTED_ONCE.send(:reset_ran_once_state_for_tests) + end + + describe '.new' do + it 'creates a new worker in stopped state' do + expect(worker).to have_attributes( + enabled?: true, + loop_base_interval: heartbeat_interval_seconds, + run_async?: false, + running?: false, + started?: false + ) + end + end + + describe '#start' do + context 'when enabled' do + context "when backend doesn't support telemetry" do + let(:backend_supports_telemetry?) { false } + + it 'disables the worker' do + worker.start + + try_wait_until { @received_started } + + expect(worker).to have_attributes( + enabled?: false, + loop_base_interval: heartbeat_interval_seconds, + ) + expect(Datadog.logger).to have_received(:debug).with( + 'Agent does not support telemetry; disabling future telemetry events.' + ) + expect(@received_heartbeat).to be(false) + end + end + + context 'when backend supports telemetry' do + let(:backend_supports_telemetry?) { true } + + it 'starts the worker and sends heartbeat event' do + worker.start + + try_wait_until { @received_heartbeat } + + expect(worker).to have_attributes( + enabled?: true, + loop_base_interval: heartbeat_interval_seconds, + run_async?: true, + running?: true, + started?: true + ) + end + + it 'always sends heartbeat event after started event' do + sent_hearbeat = false + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + # app-started was already sent by now + expect(worker.sent_started_event?).to be(true) + + sent_hearbeat = true + + response + end + + worker.start + + try_wait_until { sent_hearbeat } + end + + context 'when app-started event fails' do + it 'retries' do + expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) + .and_return( + double( + Datadog::Core::Telemetry::Http::Adapters::Net::Response, + not_found?: false, + ok?: false + ) + ).once + + expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) do + @received_started = true + + response + end + + sent_hearbeat = false + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + # app-started was already sent by now + expect(@received_started).to be(true) + + sent_hearbeat = true + + response + end + + worker.start + + try_wait_until { sent_hearbeat } + end + end + + context 'when app-started event exhausted retries' do + let(:heartbeat_interval_seconds) { 0.1 } + + it 'stops retrying, never sends heartbeat, and disables worker' do + expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) + .and_return( + double( + Datadog::Core::Telemetry::Http::Adapters::Net::Response, + not_found?: false, + ok?: false + ) + ).exactly(described_class::APP_STARTED_EVENT_RETRIES).times + + sent_hearbeat = false + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + # app-started was already sent by now + expect(@received_started).to be(true) + + sent_hearbeat = true + + response + end + + worker.start + + try_wait_until { !worker.enabled? } + + expect(sent_hearbeat).to be(false) + expect(worker.failed_to_start?).to be(true) + end + end + + context 'when dependencies collection enabled' do + let(:dependency_collection) { true } + + it 'sends dependencies loaded event after started event' do + sent_dependencies = false + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppDependenciesLoaded)) do + # app-started was already sent by now + # don't use worker.sent_started_event? because it uses the same lock + expect(@received_started).to be(true) + + sent_dependencies = true + + response + end + + worker.start + + try_wait_until { sent_dependencies } + end + end + end + + context 'when internal error returned by emitter' do + let(:response) { Datadog::Core::Telemetry::Http::InternalErrorResponse.new('error') } + + it 'does not send heartbeat event' do + worker.start + + try_wait_until { @received_started } + + expect(@received_heartbeat).to be(false) + end + end + + context 'several workers running' do + it 'sends single started event' do + started_events = 0 + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppStarted)) do + started_events += 1 + + response + end + + heartbeat_events = 0 + allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do + heartbeat_events += 1 + + response + end + + workers = Array.new(3) do + described_class.new( + enabled: enabled, + heartbeat_interval_seconds: heartbeat_interval_seconds, + emitter: emitter, + dependency_collection: dependency_collection + ) + end + workers.each(&:start) + + try_wait_until { heartbeat_events >= 3 } + + expect(started_events).to be(1) + + workers.each do |w| + w.stop(true, 0) + w.join + end + end + end + end + + context 'when disabled' do + let(:enabled) { false } + + it 'does not start the worker' do + expect(worker).not_to receive(:perform) + + worker.start + end + end + end + + describe '#stop' do + let(:heartbeat_interval_seconds) { 3 } + + it 'flushes events and stops the worker' do + worker.start + + expect(worker).to receive(:flush_events).at_least(:once) + worker.stop(true) + end + end + + describe '#enqueue' do + it 'adds events to the buffer and flushes them later' do + events_received = 0 + allow(emitter).to receive(:request).with( + an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + ) do + events_received += 1 + + response + end + + worker.start + + events_sent = 3 + events_sent.times do + worker.enqueue(Datadog::Core::Telemetry::Event::AppIntegrationsChange.new) + end + + try_wait_until { events_received == events_sent } + end + end +end diff --git a/spec/datadog/core/utils/only_once_successful_spec.rb b/spec/datadog/core/utils/only_once_successful_spec.rb new file mode 100644 index 00000000000..e0adb31b41a --- /dev/null +++ b/spec/datadog/core/utils/only_once_successful_spec.rb @@ -0,0 +1,233 @@ +require 'datadog/core/utils/only_once_successful' + +RSpec.describe Datadog::Core::Utils::OnlyOnceSuccessful do + subject(:only_once_successful) { described_class.new(limit) } + + let(:limit) { 0 } + + describe '#run' do + context 'when limitless' do + context 'before running once' do + it do + expect { |block| only_once_successful.run(&block) }.to yield_control + end + + it 'returns the result of the block ran' do + expect(only_once_successful.run { :result }).to be :result + end + end + + context 'after running once' do + let(:result) { nil } + + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect { |block| only_once_successful.run(&block) }.to_not yield_control + end + + it do + expect(only_once_successful.run { :result }).to be nil + end + end + + context 'when block returns falsey value' do + let(:result) { false } + + it do + expect { |block| only_once_successful.run(&block) }.to yield_control + end + + it 'runs again until block returns truthy value' do + expect(only_once_successful.run { :result }).to be :result + + expect(only_once_successful.run { :result }).to be nil + end + end + end + end + + context 'when limited' do + let(:limit) { 2 } + + context 'when block returns truthy value' do + before { only_once_successful.run { true } } + + it do + expect { |block| only_once_successful.run(&block) }.to_not yield_control + end + + it do + expect(only_once_successful.run { :result }).to be nil + end + end + + context 'when block returns falsey value "limit" times' do + before do + limit.times do + only_once_successful.run { false } + end + end + + it do + expect { |block| only_once_successful.run(&block) }.to_not yield_control + end + + it do + expect(only_once_successful.run { :result }).to be nil + end + end + end + + context 'when run throws an exception' do + it 'propagates the exception out' do + exception = RuntimeError.new('boom') + + expect { only_once_successful.run { raise exception } }.to raise_exception(exception) + end + + it 'runs again' do + only_once_successful.run { raise 'boom' } rescue nil + + expect { |block| only_once_successful.run(&block) }.to yield_control + end + end + end + + describe '#ran?' do + context 'before running once' do + it do + expect(only_once_successful.ran?).to be false + end + end + + context 'after running once' do + let(:result) { nil } + + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect(only_once_successful.ran?).to be true + end + end + + context 'when block returns falsey value' do + it do + expect(only_once_successful.ran?).to be false + end + end + end + + context 'when limited and ran "limit" times' do + let(:limit) { 2 } + + before do + limit.times do + only_once_successful.run { false } + end + end + + it do + expect(only_once_successful.ran?).to be true + end + end + end + + describe '#success?' do + context 'before running once' do + it do + expect(only_once_successful.success?).to be false + end + end + + context 'after running once' do + let(:result) { nil } + + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect(only_once_successful.success?).to be true + end + end + + context 'when block returns falsey value' do + it do + expect(only_once_successful.success?).to be false + end + end + end + + context 'when limited and ran "limit" times' do + let(:limit) { 2 } + + before do + limit.times do + only_once_successful.run { false } + end + end + + it do + expect(only_once_successful.success?).to be false + end + end + end + + describe '#failed?' do + context 'before running once' do + it do + expect(only_once_successful.failed?).to be false + end + end + + context 'after running once' do + let(:result) { nil } + + before do + only_once_successful.run { result } + end + + context 'when block returns truthy value' do + let(:result) { true } + + it do + expect(only_once_successful.failed?).to be false + end + end + + context 'when block returns falsey value' do + it do + expect(only_once_successful.failed?).to be false + end + end + end + + context 'when limited and ran "limit" times' do + let(:limit) { 2 } + + before do + limit.times do + only_once_successful.run { false } + end + end + + it do + expect(only_once_successful.failed?).to be true + end + end + end +end diff --git a/spec/datadog/tracing/contrib/extensions_spec.rb b/spec/datadog/tracing/contrib/extensions_spec.rb index d53dd66c0e8..461fd09852f 100644 --- a/spec/datadog/tracing/contrib/extensions_spec.rb +++ b/spec/datadog/tracing/contrib/extensions_spec.rb @@ -46,7 +46,7 @@ end it 'sends a telemetry integrations change event' do - expect_any_instance_of(Datadog::Core::Telemetry::Client).to receive(:integrations_change!) + expect_any_instance_of(Datadog::Core::Telemetry::Component).to receive(:integrations_change!) configure end end