Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
28ea204
feat(eventProcessor): Adds EventProcessor and BatchEventProcessor
rashidsp Aug 1, 2019
bcfca16
feat(forwarding-EP): Implements forwarding event processor
rashidsp Aug 2, 2019
ccbea99
feat(notification-center): Adds LogEvent notification
rashidsp Jul 31, 2019
f7a5b97
log event in forward-EP
rashidsp Aug 2, 2019
f5db35b
Addressing feedback
rashidsp Aug 2, 2019
32a566e
Addressing feedback
rashidsp Aug 2, 2019
c8ed3f7
Addressing feedback
rashidsp Aug 2, 2019
c0946ea
fixes log event issue
rashidsp Aug 2, 2019
cfcf5a8
Addresses review
rashidsp Aug 5, 2019
717adb0
resolves test issue
rashidsp Aug 5, 2019
6eec6bd
fixes logger issue
rashidsp Aug 5, 2019
7ac5896
feat(eventprocess): Integrate Event Processor with Optimizely
rashidsp Aug 6, 2019
899f3a6
fixes: failing test
rashidsp Aug 6, 2019
a31a9da
tests: adds SizedQueue
rashidsp Aug 7, 2019
93562a1
resolves conflicts
rashidsp Aug 7, 2019
0b7e53d
resolves conflicts
rashidsp Aug 7, 2019
e1f6715
Merge branch 'rashid/log-event' into rashid/optly-EP-integration
rashidsp Aug 7, 2019
309ee3a
Merge branch 'master' into rashid/batch-event-processor
rashidsp Aug 8, 2019
096ae95
Addresses feedback
rashidsp Aug 8, 2019
1341177
Addresses feedback
rashidsp Aug 9, 2019
ededaa7
reverts log-event changes
rashidsp Aug 9, 2019
2512412
resolves conflicts
rashidsp Aug 9, 2019
61e3426
config manager update
rashidsp Aug 9, 2019
b409edf
Removed verbose log.
msohailhussain Aug 15, 2019
9c019c7
Merge branch 'master' into rashid/batch-event-processor
rashidsp Aug 16, 2019
2fd378f
Resolves unit test
rashidsp Aug 16, 2019
efab9f4
removes log
rashidsp Aug 16, 2019
3c4a980
adds positive number conditions
rashidsp Aug 16, 2019
29fc092
resolves commits
rashidsp Aug 18, 2019
6634c54
feat(notification-center): Adds LogEvent notification
rashidsp Aug 19, 2019
cbdce5e
fixes visitor_attributes bug
rashidsp Aug 19, 2019
1b03cb5
Merge branch 'rashid/optly-EP-integration' into rashid/Batch-EP-logEvent
rashidsp Aug 19, 2019
6021aef
fixes visitor_attributes bug
rashidsp Aug 19, 2019
184fd06
Addressed review
rashidsp Aug 20, 2019
432933c
Merge branch 'rashid/batch-event-processor' into rashid/optly-EP-inte…
rashidsp Aug 20, 2019
06ca7e3
resolves conflicts
rashidsp Aug 20, 2019
f807970
Fixes event bug
rashidsp Aug 21, 2019
0e12040
resolves conflicts
rashidsp Aug 23, 2019
58ae248
Merge branch 'rashid/optly-EP-integration' into rashid/Batch-EP-logEvent
rashidsp Aug 23, 2019
d3a7495
Fixes event bug
rashidsp Aug 21, 2019
e802860
review changes
rashidsp Aug 27, 2019
83668d2
Merge branch 'rashid/optly-EP-integration' into rashid/Batch-EP-logEvent
rashidsp Aug 28, 2019
87b1d61
resolves conflicts
rashidsp Aug 29, 2019
ecf2d6e
log default interval message.
rashidsp Aug 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions lib/optimizely.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def initialize(
@event_processor = if event_processor.respond_to?(:process)
event_processor
else
ForwardingEventProcessor.new(@event_dispatcher, @logger)
ForwardingEventProcessor.new(@event_dispatcher, @logger, @notification_center)
end
end

Expand Down Expand Up @@ -258,11 +258,13 @@ def track(event_key, user_id, attributes = nil, event_tags = nil)
@event_processor.process(user_event)
@logger.log(Logger::INFO, "Tracking event '#{event_key}' for user '#{user_id}'.")

log_event = EventFactory.create_log_event(user_event, @logger)
@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:TRACK],
event_key, user_id, attributes, event_tags, log_event
)
if @notification_center.notification_count(NotificationCenter::NOTIFICATION_TYPES[:TRACK]).positive?
log_event = EventFactory.create_log_event(user_event, @logger)
@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:TRACK],
event_key, user_id, attributes, event_tags, log_event
)
end
nil
end

Expand Down Expand Up @@ -708,6 +710,7 @@ def send_impression(config, experiment, variation_key, user_id, attributes = nil
variation_id = config.get_variation_id_from_key(experiment_key, variation_key)
user_event = UserEventFactory.create_impression_event(config, experiment, variation_id, user_id, attributes)
@event_processor.process(user_event)
return unless @notification_center.notification_count(NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE]).positive?

@logger.log(Logger::INFO, "Activating user '#{user_id}' in experiment '#{experiment_key}'.")
variation = config.get_variation_from_id(experiment_key, variation_id)
Expand Down
15 changes: 13 additions & 2 deletions lib/optimizely/event/batch_event_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def initialize(
event_dispatcher:,
batch_size: DEFAULT_BATCH_SIZE,
flush_interval: DEFAULT_BATCH_INTERVAL,
logger: NoOpLogger.new
logger: NoOpLogger.new,
notification_center: nil
)
@event_queue = event_queue
@logger = logger
Expand All @@ -50,7 +51,13 @@ def initialize(
@logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
DEFAULT_BATCH_SIZE
end
@flush_interval = positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
@flush_interval = if positive_number?(flush_interval)
flush_interval
else
@logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
DEFAULT_BATCH_INTERVAL
end
@notification_center = notification_center
@mutex = Mutex.new
@received = ConditionVariable.new
@current_batch = []
Expand Down Expand Up @@ -152,6 +159,10 @@ def flush_queue!
log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
begin
@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
Expand Down
7 changes: 6 additions & 1 deletion lib/optimizely/event/forwarding_event_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ module Optimizely
class ForwardingEventProcessor < EventProcessor
# ForwardingEventProcessor is a basic transformation stage for converting
# the event batch into a LogEvent to be dispatched.
def initialize(event_dispatcher, logger = nil)
def initialize(event_dispatcher, logger = nil, notification_center = nil)
@event_dispatcher = event_dispatcher
@logger = logger || NoOpLogger.new
@notification_center = notification_center
end

def process(user_event)
log_event = Optimizely::EventFactory.create_log_event(user_event, @logger)

begin
@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
Expand Down
5 changes: 5 additions & 0 deletions lib/optimizely/notification_center.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class NotificationCenter
# DEPRECATED: ACTIVATE notification type is deprecated since relase 3.1.0.
ACTIVATE: 'ACTIVATE: experiment, user_id, attributes, variation, event',
DECISION: 'DECISION: type, user_id, attributes, decision_info',
LOG_EVENT: 'LOG_EVENT: type, log_event',
OPTIMIZELY_CONFIG_UPDATE: 'optimizely_config_update',
TRACK: 'TRACK: event_key, user_id, attributes, event_tags, event'
}.freeze
Expand Down Expand Up @@ -137,6 +138,10 @@ def send_notifications(notification_type, *args)
end
end

def notification_count(notification_type)
@notifications.include?(notification_type) ? @notifications[notification_type].count : 0
end

private

def notification_type_valid?(notification_type)
Expand Down
48 changes: 43 additions & 5 deletions spec/event/batch_event_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
@event_queue = SizedQueue.new(100)
@event_dispatcher = Optimizely::EventDispatcher.new
allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
@notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler)
allow(@notification_center).to receive(:send_notifications)

@event_processor = Optimizely::BatchEventProcessor.new(
event_queue: @event_queue,
event_dispatcher: @event_dispatcher,
batch_size: MAX_BATCH_SIZE,
flush_interval: MAX_DURATION_MS,
logger: spy_logger
logger: spy_logger,
notification_center: @notification_center
)
end

Expand Down Expand Up @@ -236,18 +239,19 @@
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test')
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [])
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0)
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5)
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times
end

it 'should set flush interval when provided valid' do
Expand All @@ -258,4 +262,38 @@
expect(event_processor.flush_interval).to eq(2000.5)
event_processor.stop!
end

it 'should send log event notification when event is dispatched' do
conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger)

@event_processor.process(conversion_event)
sleep 1.5

expect(@notification_center).to have_received(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
).once

expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
end

it 'should log an error when dispatch event raises timeout exception' do
conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger)
allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event)

timeout_error = Timeout::Error.new
allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error)

@event_processor.process(conversion_event)
sleep 1.5

expect(@notification_center).not_to have_received(:send_notifications)

expect(spy_logger).to have_received(:log).once.with(
Logger::ERROR,
"Error dispatching event: #{log_event} Timeout::Error."
)
end
end
18 changes: 15 additions & 3 deletions spec/event/forwarding_event_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,43 @@
end

describe '.process' do
it 'should dispatch log event when valid event is provided' do
it 'should dispatch and send log event when valid event is provided' do
notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler)
allow(notification_center).to receive(:send_notifications)
forwarding_event_processor = Optimizely::ForwardingEventProcessor.new(
@event_dispatcher, spy_logger
@event_dispatcher, spy_logger, notification_center
)

forwarding_event_processor.process(@conversion_event)

expect(notification_center).to have_received(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
).once

expect(@event_dispatcher).to have_received(:dispatch_event).with(
Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
).once
end

it 'should log an error when dispatch event raises timeout exception' do
notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler)
allow(notification_center).to receive(:send_notifications)

log_event = Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event)

timeout_error = Timeout::Error.new
allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error)

forwarding_event_processor = Optimizely::ForwardingEventProcessor.new(
@event_dispatcher, spy_logger
@event_dispatcher, spy_logger, notification_center
)

forwarding_event_processor.process(@conversion_event)

expect(notification_center).not_to have_received(:send_notifications)

expect(spy_logger).to have_received(:log).once.with(
Logger::ERROR,
"Error dispatching event: #{log_event} Timeout::Error."
Expand Down
31 changes: 31 additions & 0 deletions spec/notification_center_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,37 @@ def deliver_three; end
end
end

describe '.notification_count' do
it 'should return count of added notification types' do
notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
@callback_reference
)

notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
method(:test)
)

notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK],
method(:test)
)

expect(
notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE])
).to eq(2)

expect(
notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK])
).to eq(1)

expect(
notification_center.notification_count(Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT])
).to eq(0)
end
end

describe '@error_handler' do
let(:raise_error_handler) { Optimizely::RaiseErrorHandler.new }
let(:notification_center) { Optimizely::NotificationCenter.new(spy_logger, raise_error_handler) }
Expand Down
46 changes: 45 additions & 1 deletion spec/project_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ class InvalidErrorHandler; end
end

it 'should log and send activate notification when an impression event is dispatched' do
def callback(_args); end
project_instance.notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
method(:callback)
)
variation_to_return = project_instance.config_manager.config.get_variation_from_id('test_experiment', '111128')
allow(project_instance.decision_service.bucketer).to receive(:bucket).and_return(variation_to_return)
allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
Expand All @@ -600,6 +605,11 @@ class InvalidErrorHandler; end
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:DECISION], any_args
).ordered

# Log event
expect(project_instance.notification_center).to receive(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args
).ordered

# Activate listener
expect(project_instance.notification_center).to receive(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
Expand Down Expand Up @@ -886,13 +896,26 @@ class InvalidErrorHandler; end
params = @expected_track_event_params
params[:visitors][0][:snapshots][0][:events][0].merge!(revenue: 42,
tags: {'revenue' => 42})

def callback(_args); end
project_instance.notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK],
method(:callback)
)
allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
conversion_event = Optimizely::Event.new(:post, conversion_log_url, params, post_headers)

expect(project_instance.notification_center).to receive(:send_notifications)
.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args
).ordered

expect(project_instance.notification_center).to receive(:send_notifications)
.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:TRACK],
'test_event', 'test_user', nil, {'revenue' => 42}, conversion_event
).once
).ordered

project_instance.track('test_event', 'test_user', nil, 'revenue' => 42)
expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once
end
Expand Down Expand Up @@ -1463,6 +1486,12 @@ class InvalidErrorHandler; end
end

it 'should return true, send activate notification and an impression if the user is bucketed into a feature experiment' do
def callback(_args); end
project_instance.notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
method(:callback)
)

allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
experiment_to_return = config_body['experiments'][3]
variation_to_return = experiment_to_return['variations'][0]
Expand All @@ -1472,6 +1501,11 @@ class InvalidErrorHandler; end
Optimizely::DecisionService::DECISION_SOURCES['FEATURE_TEST']
)

expect(project_instance.notification_center).to receive(:send_notifications)
.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args
).ordered

expect(project_instance.notification_center).to receive(:send_notifications)
.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
Expand Down Expand Up @@ -1709,6 +1743,12 @@ class InvalidErrorHandler; end

describe '.decision listener' do
it 'should return enabled features and call decision listener for all features' do
def callback(_args); end
project_instance.notification_center.add_notification_listener(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
method(:callback)
)

allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))

enabled_features = %w[boolean_feature integer_single_variable_feature]
Expand Down Expand Up @@ -1739,6 +1779,10 @@ class InvalidErrorHandler; end
nil
)

expect(project_instance.notification_center).to receive(:send_notifications).twice.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], any_args
)

expect(project_instance.notification_center).to receive(:send_notifications).twice.with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], any_args
)
Expand Down