diff --git a/lib/optimizely.rb b/lib/optimizely.rb index 7eb634f7..1f1cad71 100644 --- a/lib/optimizely.rb +++ b/lib/optimizely.rb @@ -102,7 +102,7 @@ def initialize( @event_processor = if event_processor.respond_to?(:process) event_processor else - ForwardingEventProcessor.new(@event_dispatcher, @logger) + ForwardingEventProcessor.new(@event_dispatcher, @logger, @notification_center) end end @@ -258,11 +258,13 @@ def track(event_key, user_id, attributes = nil, event_tags = nil) @event_processor.process(user_event) @logger.log(Logger::INFO, "Tracking event '#{event_key}' for user '#{user_id}'.") - log_event = EventFactory.create_log_event(user_event, @logger) - @notification_center.send_notifications( - NotificationCenter::NOTIFICATION_TYPES[:TRACK], - event_key, user_id, attributes, event_tags, log_event - ) + if @notification_center.notification_count(NotificationCenter::NOTIFICATION_TYPES[:TRACK]).positive? + log_event = EventFactory.create_log_event(user_event, @logger) + @notification_center.send_notifications( + NotificationCenter::NOTIFICATION_TYPES[:TRACK], + event_key, user_id, attributes, event_tags, log_event + ) + end nil end @@ -708,6 +710,7 @@ def send_impression(config, experiment, variation_key, user_id, attributes = nil variation_id = config.get_variation_id_from_key(experiment_key, variation_key) user_event = UserEventFactory.create_impression_event(config, experiment, variation_id, user_id, attributes) @event_processor.process(user_event) + return unless @notification_center.notification_count(NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE]).positive? @logger.log(Logger::INFO, "Activating user '#{user_id}' in experiment '#{experiment_key}'.") variation = config.get_variation_from_id(experiment_key, variation_id) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 6575bc32..f6785d03 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -39,7 +39,8 @@ def initialize( event_dispatcher:, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, - logger: NoOpLogger.new + logger: NoOpLogger.new, + notification_center: nil ) @event_queue = event_queue @logger = logger @@ -50,7 +51,13 @@ def initialize( @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.") DEFAULT_BATCH_SIZE end - @flush_interval = positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL + @flush_interval = if positive_number?(flush_interval) + flush_interval + else + @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.") + DEFAULT_BATCH_INTERVAL + end + @notification_center = notification_center @mutex = Mutex.new @received = ConditionVariable.new @current_batch = [] @@ -152,6 +159,10 @@ def flush_queue! log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger) begin @event_dispatcher.dispatch_event(log_event) + @notification_center&.send_notifications( + NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event + ) rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") end diff --git a/lib/optimizely/event/forwarding_event_processor.rb b/lib/optimizely/event/forwarding_event_processor.rb index 679174ae..8970f301 100644 --- a/lib/optimizely/event/forwarding_event_processor.rb +++ b/lib/optimizely/event/forwarding_event_processor.rb @@ -20,9 +20,10 @@ module Optimizely class ForwardingEventProcessor < EventProcessor # ForwardingEventProcessor is a basic transformation stage for converting # the event batch into a LogEvent to be dispatched. - def initialize(event_dispatcher, logger = nil) + def initialize(event_dispatcher, logger = nil, notification_center = nil) @event_dispatcher = event_dispatcher @logger = logger || NoOpLogger.new + @notification_center = notification_center end def process(user_event) @@ -30,6 +31,10 @@ def process(user_event) begin @event_dispatcher.dispatch_event(log_event) + @notification_center&.send_notifications( + NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event + ) rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") end diff --git a/lib/optimizely/notification_center.rb b/lib/optimizely/notification_center.rb index 53d6dc3d..4a3b0169 100644 --- a/lib/optimizely/notification_center.rb +++ b/lib/optimizely/notification_center.rb @@ -24,6 +24,7 @@ class NotificationCenter # DEPRECATED: ACTIVATE notification type is deprecated since relase 3.1.0. ACTIVATE: 'ACTIVATE: experiment, user_id, attributes, variation, event', DECISION: 'DECISION: type, user_id, attributes, decision_info', + LOG_EVENT: 'LOG_EVENT: type, log_event', OPTIMIZELY_CONFIG_UPDATE: 'optimizely_config_update', TRACK: 'TRACK: event_key, user_id, attributes, event_tags, event' }.freeze @@ -137,6 +138,10 @@ def send_notifications(notification_type, *args) end end + def notification_count(notification_type) + @notifications.include?(notification_type) ? @notifications[notification_type].count : 0 + end + private def notification_type_valid?(notification_type) diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index a5351ca4..8411cbdc 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -41,13 +41,16 @@ @event_queue = SizedQueue.new(100) @event_dispatcher = Optimizely::EventDispatcher.new allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) + @notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler) + allow(@notification_center).to receive(:send_notifications) @event_processor = Optimizely::BatchEventProcessor.new( event_queue: @event_queue, event_dispatcher: @event_dispatcher, batch_size: MAX_BATCH_SIZE, flush_interval: MAX_DURATION_MS, - logger: spy_logger + logger: spy_logger, + notification_center: @notification_center ) end @@ -236,18 +239,19 @@ event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test') + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: []) + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0) + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5) + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times end it 'should set flush interval when provided valid' do @@ -258,4 +262,38 @@ expect(event_processor.flush_interval).to eq(2000.5) event_processor.stop! end + + it 'should send log event notification when event is dispatched' do + conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + + @event_processor.process(conversion_event) + sleep 1.5 + + expect(@notification_center).to have_received(:send_notifications).with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event + ).once + + expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + end + + it 'should log an error when dispatch event raises timeout exception' do + conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) + + timeout_error = Timeout::Error.new + allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error) + + @event_processor.process(conversion_event) + sleep 1.5 + + expect(@notification_center).not_to have_received(:send_notifications) + + expect(spy_logger).to have_received(:log).once.with( + Logger::ERROR, + "Error dispatching event: #{log_event} Timeout::Error." + ) + end end diff --git a/spec/event/forwarding_event_processor_spec.rb b/spec/event/forwarding_event_processor_spec.rb index 7be502f0..f58d0e90 100644 --- a/spec/event/forwarding_event_processor_spec.rb +++ b/spec/event/forwarding_event_processor_spec.rb @@ -69,19 +69,29 @@ end describe '.process' do - it 'should dispatch log event when valid event is provided' do + it 'should dispatch and send log event when valid event is provided' do + notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler) + allow(notification_center).to receive(:send_notifications) forwarding_event_processor = Optimizely::ForwardingEventProcessor.new( - @event_dispatcher, spy_logger + @event_dispatcher, spy_logger, notification_center ) forwarding_event_processor.process(@conversion_event) + expect(notification_center).to have_received(:send_notifications).with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers) + ).once + expect(@event_dispatcher).to have_received(:dispatch_event).with( Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers) ).once end it 'should log an error when dispatch event raises timeout exception' do + notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler) + allow(notification_center).to receive(:send_notifications) + log_event = Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers) allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) @@ -89,11 +99,13 @@ allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error) forwarding_event_processor = Optimizely::ForwardingEventProcessor.new( - @event_dispatcher, spy_logger + @event_dispatcher, spy_logger, notification_center ) forwarding_event_processor.process(@conversion_event) + expect(notification_center).not_to have_received(:send_notifications) + expect(spy_logger).to have_received(:log).once.with( Logger::ERROR, "Error dispatching event: #{log_event} Timeout::Error." diff --git a/spec/notification_center_spec.rb b/spec/notification_center_spec.rb index 540c4420..be3a4b8e 100644 --- a/spec/notification_center_spec.rb +++ b/spec/notification_center_spec.rb @@ -486,6 +486,37 @@ def deliver_three; end end end + describe '.notification_count' do + it 'should return count of added notification types' do + notification_center.add_notification_listener( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], + @callback_reference + ) + + notification_center.add_notification_listener( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], + method(:test) + ) + + notification_center.add_notification_listener( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK], + method(:test) + ) + + expect( + notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE]) + ).to eq(2) + + expect( + notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK]) + ).to eq(1) + + expect( + notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT]) + ).to eq(0) + end + end + describe '@error_handler' do let(:raise_error_handler) { Optimizely::RaiseErrorHandler.new } let(:notification_center) { Optimizely::NotificationCenter.new(spy_logger, raise_error_handler) } diff --git a/spec/project_spec.rb b/spec/project_spec.rb index ece7b934..fa462c16 100644 --- a/spec/project_spec.rb +++ b/spec/project_spec.rb @@ -587,6 +587,11 @@ class InvalidErrorHandler; end end it 'should log and send activate notification when an impression event is dispatched' do + def callback(_args); end + project_instance.notification_center.add_notification_listener( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], + method(:callback) + ) variation_to_return = project_instance.config_manager.config.get_variation_from_id('test_experiment', '111128') allow(project_instance.decision_service.bucketer).to receive(:bucket).and_return(variation_to_return) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) @@ -600,6 +605,11 @@ class InvalidErrorHandler; end Optimizely::NotificationCenter::NOTIFICATION_TYPES[:DECISION], any_args ).ordered + # Log event + expect(project_instance.notification_center).to receive(:send_notifications).with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args + ).ordered + # Activate listener expect(project_instance.notification_center).to receive(:send_notifications).with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], @@ -886,13 +896,26 @@ class InvalidErrorHandler; end params = @expected_track_event_params params[:visitors][0][:snapshots][0][:events][0].merge!(revenue: 42, tags: {'revenue' => 42}) + + def callback(_args); end + project_instance.notification_center.add_notification_listener( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK], + method(:callback) + ) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) conversion_event = Optimizely::Event.new(:post, conversion_log_url, params, post_headers) + + expect(project_instance.notification_center).to receive(:send_notifications) + .with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args + ).ordered + expect(project_instance.notification_center).to receive(:send_notifications) .with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK], 'test_event', 'test_user', nil, {'revenue' => 42}, conversion_event - ).once + ).ordered + project_instance.track('test_event', 'test_user', nil, 'revenue' => 42) expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end @@ -1463,6 +1486,12 @@ class InvalidErrorHandler; end end it 'should return true, send activate notification and an impression if the user is bucketed into a feature experiment' do + def callback(_args); end + project_instance.notification_center.add_notification_listener( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], + method(:callback) + ) + allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) experiment_to_return = config_body['experiments'][3] variation_to_return = experiment_to_return['variations'][0] @@ -1472,6 +1501,11 @@ class InvalidErrorHandler; end Optimizely::DecisionService::DECISION_SOURCES['FEATURE_TEST'] ) + expect(project_instance.notification_center).to receive(:send_notifications) + .with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args + ).ordered + expect(project_instance.notification_center).to receive(:send_notifications) .with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], @@ -1709,6 +1743,12 @@ class InvalidErrorHandler; end describe '.decision listener' do it 'should return enabled features and call decision listener for all features' do + def callback(_args); end + project_instance.notification_center.add_notification_listener( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], + method(:callback) + ) + allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) enabled_features = %w[boolean_feature integer_single_variable_feature] @@ -1739,6 +1779,10 @@ class InvalidErrorHandler; end nil ) + expect(project_instance.notification_center).to receive(:send_notifications).twice.with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args + ) + expect(project_instance.notification_center).to receive(:send_notifications).twice.with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], any_args )