Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions lib/optimizely.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def initialize(
@event_processor = if event_processor.respond_to?(:process)
event_processor
else
ForwardingEventProcessor.new(@event_dispatcher, @logger)
ForwardingEventProcessor.new(@event_dispatcher, @logger, @notification_center)
end
end

Expand Down Expand Up @@ -258,11 +258,13 @@ def track(event_key, user_id, attributes = nil, event_tags = nil)
@event_processor.process(user_event)
@logger.log(Logger::INFO, "Tracking event '#{event_key}' for user '#{user_id}'.")

log_event = EventFactory.create_log_event(user_event, @logger)
@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:TRACK],
event_key, user_id, attributes, event_tags, log_event
)
if @notification_center.notification_count(NotificationCenter::NOTIFICATION_TYPES[:TRACK]).positive?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we gain by checking notification count, as opposed to just sending notifications regardless of count?

Copy link
Contributor Author

@rashidsp rashidsp Sep 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This validates that the listener has already been added before sending the notification.
Please view java-sdk as a reference: https://github.com/optimizely/java-sdk/blob/master/core-api/src/main/java/com/optimizely/ab/Optimizely.java#L304

log_event = EventFactory.create_log_event(user_event, @logger)
@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:TRACK],
event_key, user_id, attributes, event_tags, log_event
)
end
nil
end

Expand Down Expand Up @@ -708,6 +710,7 @@ def send_impression(config, experiment, variation_key, user_id, attributes = nil
variation_id = config.get_variation_id_from_key(experiment_key, variation_key)
user_event = UserEventFactory.create_impression_event(config, experiment, variation_id, user_id, attributes)
@event_processor.process(user_event)
return unless @notification_center.notification_count(NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE]).positive?

@logger.log(Logger::INFO, "Activating user '#{user_id}' in experiment '#{experiment_key}'.")
variation = config.get_variation_from_id(experiment_key, variation_id)
Expand Down
8 changes: 7 additions & 1 deletion lib/optimizely/event/batch_event_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def initialize(
event_dispatcher:,
batch_size: DEFAULT_BATCH_SIZE,
flush_interval: DEFAULT_BATCH_INTERVAL,
logger: NoOpLogger.new
logger: NoOpLogger.new,
notification_center: nil
)
@event_queue = event_queue
@logger = logger
Expand All @@ -51,6 +52,7 @@ def initialize(
DEFAULT_BATCH_SIZE
end
@flush_interval = positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
@notification_center = notification_center
@mutex = Mutex.new
@received = ConditionVariable.new
@current_batch = []
Expand Down Expand Up @@ -152,6 +154,10 @@ def flush_queue!
log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
begin
@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
Expand Down
7 changes: 6 additions & 1 deletion lib/optimizely/event/forwarding_event_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ module Optimizely
class ForwardingEventProcessor < EventProcessor
# ForwardingEventProcessor is a basic transformation stage for converting
# the event batch into a LogEvent to be dispatched.
def initialize(event_dispatcher, logger = nil)
def initialize(event_dispatcher, logger = nil, notification_center = nil)
@event_dispatcher = event_dispatcher
@logger = logger || NoOpLogger.new
@notification_center = notification_center
end

def process(user_event)
log_event = Optimizely::EventFactory.create_log_event(user_event, @logger)

begin
@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
Expand Down
5 changes: 5 additions & 0 deletions lib/optimizely/notification_center.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class NotificationCenter
# DEPRECATED: ACTIVATE notification type is deprecated since relase 3.1.0.
ACTIVATE: 'ACTIVATE: experiment, user_id, attributes, variation, event',
DECISION: 'DECISION: type, user_id, attributes, decision_info',
LOG_EVENT: 'LOG_EVENT: type, log_event',
OPTIMIZELY_CONFIG_UPDATE: 'optimizely_config_update',
TRACK: 'TRACK: event_key, user_id, attributes, event_tags, event'
}.freeze
Expand Down Expand Up @@ -137,6 +138,10 @@ def send_notifications(notification_type, *args)
end
end

def notification_count(notification_type)
@notifications.include?(notification_type) ? @notifications[notification_type].count : 0
end

private

def notification_type_valid?(notification_type)
Expand Down
39 changes: 38 additions & 1 deletion spec/event/batch_event_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
@event_queue = SizedQueue.new(100)
@event_dispatcher = Optimizely::EventDispatcher.new
allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
@notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler)
allow(@notification_center).to receive(:send_notifications)

@event_processor = Optimizely::BatchEventProcessor.new(
event_queue: @event_queue,
event_dispatcher: @event_dispatcher,
batch_size: MAX_BATCH_SIZE,
flush_interval: MAX_DURATION_MS,
logger: spy_logger
logger: spy_logger,
notification_center: @notification_center
)
end

Expand Down Expand Up @@ -258,4 +261,38 @@
expect(event_processor.flush_interval).to eq(2000.5)
event_processor.stop!
end

it 'should send log event notification when event is dispatched' do
conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger)

@event_processor.process(conversion_event)
sleep 1.5

expect(@notification_center).to have_received(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
).once

expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
end

it 'should log an error when dispatch event raises timeout exception' do
conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger)
allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event)

timeout_error = Timeout::Error.new
allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error)

@event_processor.process(conversion_event)
sleep 1.5

expect(@notification_center).not_to have_received(:send_notifications)

expect(spy_logger).to have_received(:log).once.with(
Logger::ERROR,
"Error dispatching event: #{log_event} Timeout::Error."
)
end
end
18 changes: 15 additions & 3 deletions spec/event/forwarding_event_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,43 @@
end

describe '.process' do
it 'should dispatch log event when valid event is provided' do
it 'should dispatch and send log event when valid event is provided' do
notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler)
allow(notification_center).to receive(:send_notifications)
forwarding_event_processor = Optimizely::ForwardingEventProcessor.new(
@event_dispatcher, spy_logger
@event_dispatcher, spy_logger, notification_center
)

forwarding_event_processor.process(@conversion_event)

expect(notification_center).to have_received(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
).once

expect(@event_dispatcher).to have_received(:dispatch_event).with(
Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
).once
end

it 'should log an error when dispatch event raises timeout exception' do
notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler)
allow(notification_center).to receive(:send_notifications)

log_event = Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event)

timeout_error = Timeout::Error.new
allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error)

forwarding_event_processor = Optimizely::ForwardingEventProcessor.new(
@event_dispatcher, spy_logger
@event_dispatcher, spy_logger, notification_center
)

forwarding_event_processor.process(@conversion_event)

expect(notification_center).not_to have_received(:send_notifications)

expect(spy_logger).to have_received(:log).once.with(
Logger::ERROR,
"Error dispatching event: #{log_event} Timeout::Error."
Expand Down
31 changes: 31 additions & 0 deletions spec/notification_center_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,37 @@ def deliver_three; end
end
end

describe '.notification_count' do
it 'should return count of added notification types' do
notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
@callback_reference
)

notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
method(:test)
)

notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK],
method(:test)
)

expect(
notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE])
).to eq(2)

expect(
notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK])
).to eq(1)

expect(
notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT])
).to eq(0)
end
end

describe '@error_handler' do
let(:raise_error_handler) { Optimizely::RaiseErrorHandler.new }
let(:notification_center) { Optimizely::NotificationCenter.new(spy_logger, raise_error_handler) }
Expand Down
46 changes: 45 additions & 1 deletion spec/project_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ class InvalidErrorHandler; end
end

it 'should log and send activate notification when an impression event is dispatched' do
def callback(_args); end
project_instance.notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
method(:callback)
)
variation_to_return = project_instance.config_manager.config.get_variation_from_id('test_experiment', '111128')
allow(project_instance.decision_service.bucketer).to receive(:bucket).and_return(variation_to_return)
allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
Expand All @@ -600,6 +605,11 @@ class InvalidErrorHandler; end
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:DECISION], any_args
).ordered

# Log event
expect(project_instance.notification_center).to receive(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args
).ordered

# Activate listener
expect(project_instance.notification_center).to receive(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
Expand Down Expand Up @@ -886,13 +896,26 @@ class InvalidErrorHandler; end
params = @expected_track_event_params
params[:visitors][0][:snapshots][0][:events][0].merge!(revenue: 42,
tags: {'revenue' => 42})

def callback(_args); end
project_instance.notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK],
method(:callback)
)
allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
conversion_event = Optimizely::Event.new(:post, conversion_log_url, params, post_headers)

expect(project_instance.notification_center).to receive(:send_notifications)
.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args
).ordered

expect(project_instance.notification_center).to receive(:send_notifications)
.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK],
'test_event', 'test_user', nil, {'revenue' => 42}, conversion_event
).once
).ordered

project_instance.track('test_event', 'test_user', nil, 'revenue' => 42)
expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once
end
Expand Down Expand Up @@ -1463,6 +1486,12 @@ class InvalidErrorHandler; end
end

it 'should return true, send activate notification and an impression if the user is bucketed into a feature experiment' do
def callback(_args); end
project_instance.notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
method(:callback)
)

allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
experiment_to_return = config_body['experiments'][3]
variation_to_return = experiment_to_return['variations'][0]
Expand All @@ -1472,6 +1501,11 @@ class InvalidErrorHandler; end
Optimizely::DecisionService::DECISION_SOURCES['FEATURE_TEST']
)

expect(project_instance.notification_center).to receive(:send_notifications)
.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args
).ordered

expect(project_instance.notification_center).to receive(:send_notifications)
.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
Expand Down Expand Up @@ -1709,6 +1743,12 @@ class InvalidErrorHandler; end

describe '.decision listener' do
it 'should return enabled features and call decision listener for all features' do
def callback(_args); end
project_instance.notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
method(:callback)
)

allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))

enabled_features = %w[boolean_feature integer_single_variable_feature]
Expand Down Expand Up @@ -1739,6 +1779,10 @@ class InvalidErrorHandler; end
nil
)

expect(project_instance.notification_center).to receive(:send_notifications).twice.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args
)

expect(project_instance.notification_center).to receive(:send_notifications).twice.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], any_args
)
Expand Down