diff --git a/.rubocop.yml b/.rubocop.yml index d923aba4..f9ae7181 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -44,3 +44,6 @@ Style/RescueStandardError: Style/SignalException: Enabled: false + +Lint/RescueException: + Enabled: false diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml index baf926ef..53c17b56 100644 --- a/.rubocop_todo.yml +++ b/.rubocop_todo.yml @@ -21,13 +21,14 @@ Lint/LiteralAsCondition: Metrics/ParameterLists: Max: 6 Exclude: - - 'lib/optimizely/config_manager/http_project_config_manager.rb' - 'lib/optimizely.rb' - - 'lib/optimizely/optimizely_factory.rb' - - 'lib/optimizely/event/entity/impression_event.rb' - - 'lib/optimizely/event/entity/snapshot_event.rb' + - 'lib/optimizely/config_manager/http_project_config_manager.rb' + - 'lib/optimizely/event/batch_event_processor.rb' - 'lib/optimizely/event/entity/conversion_event.rb' - 'lib/optimizely/event/entity/event_context.rb' + - 'lib/optimizely/event/entity/impression_event.rb' + - 'lib/optimizely/event/entity/snapshot_event.rb' + - 'lib/optimizely/optimizely_factory.rb' Naming/AccessorMethodName: Exclude: diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb new file mode 100644 index 00000000..eeb730d4 --- /dev/null +++ b/lib/optimizely/event/batch_event_processor.rb @@ -0,0 +1,204 @@ +# frozen_string_literal: true + +# +# Copyright 2019, Optimizely and contributors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+require_relative 'event_processor'
+require_relative '../helpers/validator'
+module Optimizely
+  class BatchEventProcessor < EventProcessor
+    # BatchEventProcessor is a batched implementation of the EventProcessor interface.
+    # Events passed to #process are placed on the event queue right away. A single
+    # consumer thread pulls events off that queue and buffers them until either
+    # batch_size events have accumulated or flush_interval milliseconds have elapsed;
+    # the buffered batch is then turned into one log event and handed to the event
+    # dispatcher.
+
+    attr_reader :event_queue, :current_batch, :batch_size, :flush_interval
+
+    DEFAULT_BATCH_SIZE = 10
+    DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
+    DEFAULT_QUEUE_CAPACITY = 1000
+
+    FLUSH_SIGNAL = 'FLUSH_SIGNAL'
+    SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
+
+    # Seconds stop! waits for the consumer thread to reach the shutdown signal
+    # before terminating it forcibly.
+    SHUTDOWN_GRACE_PERIOD = 1
+
+    # Builds the processor and immediately starts its consumer thread.
+    #
+    # event_queue      - Queue-like object used to hand events to the consumer thread
+    #                    (defaults to a SizedQueue bounded at DEFAULT_QUEUE_CAPACITY).
+    # event_dispatcher - Object responding to dispatch_event(log_event).
+    # batch_size       - Positive Integer; any other value falls back to DEFAULT_BATCH_SIZE.
+    # flush_interval   - Positive finite number of milliseconds; any other value falls
+    #                    back to DEFAULT_BATCH_INTERVAL.
+    # logger           - Object responding to log(level, message).
+    def initialize(
+      event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
+      event_dispatcher:,
+      batch_size: DEFAULT_BATCH_SIZE,
+      flush_interval: DEFAULT_BATCH_INTERVAL,
+      logger: NoOpLogger.new
+    )
+      @event_queue = event_queue
+      @logger = logger
+      @event_dispatcher = event_dispatcher
+      @batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size)
+                      batch_size
+                    else
+                      @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
+                      DEFAULT_BATCH_SIZE
+                    end
+      @flush_interval = if positive_number?(flush_interval)
+                          flush_interval
+                        else
+                          # Mirror the batch_size fallback so a rejected value is visible in the logs.
+                          @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
+                          DEFAULT_BATCH_INTERVAL
+                        end
+      @mutex = Mutex.new
+      @received = ConditionVariable.new
+      @current_batch = []
+      @is_started = false
+      start!
+    end
+
+    # Starts the consumer thread. Logs a warning and does nothing when already started.
+    def start!
+      if @is_started
+        @logger.log(Logger::WARN, 'Service already started.')
+        return
+      end
+      @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
+      @thread = Thread.new { run }
+      @is_started = true
+    end
+
+    # Asks the consumer thread to dispatch whatever is currently batched.
+    def flush
+      # Push outside the mutex: the consumer needs the mutex to pop, so a blocking
+      # push performed while holding it could never complete on a full queue.
+      @event_queue << FLUSH_SIGNAL
+      wake_consumer
+    end
+
+    # Queues a user event for batching.
+    #
+    # The event is dropped (with a warning) when the consumer thread is not running
+    # or when the queue refuses it, e.g. because a bounded queue is full.
+    def process(user_event)
+      @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")
+
+      unless @thread.alive?
+        @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
+        return
+      end
+
+      begin
+        enqueue_without_blocking(user_event)
+      rescue StandardError
+        # A full SizedQueue raises ThreadError on a non-blocking push. StandardError
+        # covers that without swallowing signals or SystemExit as Exception would.
+        @logger.log(Logger::WARN, 'Payload not accepted by the queue.')
+        return
+      end
+
+      wake_consumer
+    end
+
+    # Stops the consumer thread. Events still sitting in the current batch are not
+    # dispatched.
+    def stop!
+      return unless @thread.alive?
+
+      @event_queue << SHUTDOWN_SIGNAL
+      wake_consumer
+
+      @is_started = false
+      @logger.log(Logger::WARN, 'Stopping scheduler.')
+
+      # Let the consumer drain up to the shutdown signal so the signal does not stay
+      # in the queue and terminate the thread created by a later start!. Fall back to
+      # killing the thread when it does not finish in time (join returns nil then),
+      # or when stop! is invoked from the consumer thread itself.
+      finished = Thread.current != @thread && @thread.join(SHUTDOWN_GRACE_PERIOD)
+      @thread.exit unless finished
+    end
+
+    private
+
+    # Consumer loop: flushes on deadline, then handles at most one queue item per pass.
+    def run
+      loop do
+        if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
+          @logger.log(
+            Logger::DEBUG,
+            'Deadline exceeded flushing current batch.'
+          )
+          flush_queue!
+          # Re-arm the deadline; otherwise an idle processor would hit this branch
+          # (and log) on every pass once the first deadline has elapsed.
+          @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
+        end
+
+        item = nil
+
+        @mutex.synchronize do
+          # Only wait when there is nothing to consume. Producers signal while holding
+          # the mutex after pushing, so a wake-up cannot slip in between this check
+          # and the wait.
+          @received.wait(@mutex, 0.05) unless @event_queue.length.positive?
+          item = @event_queue.pop if @event_queue.length.positive?
+        end
+
+        if item.nil?
+          sleep(0.05)
+          next
+        end
+
+        if item == SHUTDOWN_SIGNAL
+          @logger.log(Logger::INFO, 'Received shutdown signal.')
+          break
+        end
+
+        if item == FLUSH_SIGNAL
+          @logger.log(Logger::DEBUG, 'Received flush signal.')
+          flush_queue!
+          next
+        end
+
+        add_to_batch(item) if item.is_a? Optimizely::UserEvent
+      end
+    end
+
+    # Turns the current batch into a log event, dispatches it and starts a new batch.
+    # StandardErrors raised while building or dispatching are logged, not propagated.
+    def flush_queue!
+      return if @current_batch.empty?
+
+      log_event = nil
+      begin
+        log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
+        @event_dispatcher.dispatch_event(log_event)
+      rescue StandardError => e
+        @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}")
+      end
+      @current_batch = []
+    end
+
+    # Appends a user event to the current batch, flushing first when the event belongs
+    # to a different project/revision and afterwards when the batch is full.
+    def add_to_batch(user_event)
+      # flush_queue! already leaves an empty batch behind.
+      flush_queue! if should_split?(user_event)
+
+      # Reset the deadline if starting a new batch.
+      @flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?
+
+      @logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to batch.")
+      @current_batch << user_event
+      return unless @current_batch.length >= @batch_size
+
+      @logger.log(Logger::DEBUG, 'Flushing on max batch size!')
+      flush_queue!
+    end
+
+    # Returns true when user_event must not share a batch with the events already
+    # buffered, i.e. when its revision or project id differs from the last one.
+    def should_split?(user_event)
+      return false if @current_batch.empty?
+
+      current_context = @current_batch.last.event_context
+      new_context = user_event.event_context
+
+      # Revisions should match
+      unless current_context[:revision] == new_context[:revision]
+        @logger.log(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.')
+        return true
+      end
+
+      # Projects should match
+      unless current_context[:project_id] == new_context[:project_id]
+        @logger.log(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.')
+        return true
+      end
+      false
+    end
+
+    # Adds item to the event queue without blocking the caller.
+    # Raises ThreadError when a bounded queue is full.
+    def enqueue_without_blocking(item)
+      if @event_queue.is_a?(SizedQueue)
+        @event_queue.push(item, true)
+      else
+        # The non-blocking flag is only known to be supported by SizedQueue; other
+        # injected queues get the plain push they always got.
+        @event_queue << item
+      end
+    end
+
+    # Wakes the consumer thread if it is waiting for new queue items.
+    def wake_consumer
+      @mutex.synchronize { @received.signal }
+    end
+
+    def positive_number?(value)
+      # Returns true if the given value is positive finite number.
+      # false otherwise.
+      Helpers::Validator.finite_number?(value) && value.positive?
+    end
+  end
+end
diff --git a/lib/optimizely/event/event_factory.rb b/lib/optimizely/event/event_factory.rb index 16f4d1f8..754117b5 100644 --- a/lib/optimizely/event/event_factory.rb +++ b/lib/optimizely/event/event_factory.rb @@ -87,7 +87,7 @@ def build_attribute_list(user_attributes, project_config) ) end - return unless Helpers::Validator.boolean? 
project_config.bot_filtering + return visitor_attributes unless Helpers::Validator.boolean? project_config.bot_filtering # Append Bot Filtering Attribute visitor_attributes.push(
diff --git a/lib/optimizely/event/event_processor.rb b/lib/optimizely/event/event_processor.rb
new file mode 100644
index 00000000..0ddfc648
--- /dev/null
+++ b/lib/optimizely/event/event_processor.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+#
+# Copyright 2019, Optimizely and contributors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+module Optimizely
+  class EventProcessor
+    # EventProcessor interface is used to provide an intermediary processing stage within
+    # event production. It's assumed that the EventProcessor dispatches events via a provided
+    # EventDispatcher.
+    #
+    # process(user_event) is the single entry point of the interface. The body here is
+    # intentionally empty (the call returns nil); BatchEventProcessor in
+    # lib/optimizely/event/batch_event_processor.rb subclasses this class and overrides it.
+    def process(user_event); end
+  end
+end
diff --git a/lib/optimizely/helpers/date_time_utils.rb b/lib/optimizely/helpers/date_time_utils.rb index aa62639b..97fb987b 100644 --- a/lib/optimizely/helpers/date_time_utils.rb +++ b/lib/optimizely/helpers/date_time_utils.rb @@ -21,9 +21,9 @@ module DateTimeUtils module_function def create_timestamp - # Returns Integer Current timestamp - - (Time.now.to_f * 1000).to_i + # Returns Integer current UTC timestamp + utc = Time.now.getutc + (utc.to_f * 1000).to_i end end end
diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb
new file mode 100644
index 00000000..4794e075
--- /dev/null
+++ b/spec/event/batch_event_processor_spec.rb
@@ -0,0 +1,262 @@
+# frozen_string_literal: true
+
+#
+# Copyright 2019, Optimizely and contributors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+require 'spec_helper'
+require 'optimizely/event/batch_event_processor'
+require 'optimizely/event/user_event_factory'
+require 'optimizely/exceptions'
+require 'optimizely/event_dispatcher'
+require 'optimizely/error_handler'
+require 'optimizely/helpers/constants'
+require 'optimizely/helpers/validator'
+require 'optimizely/logger'
+describe Optimizely::BatchEventProcessor do
+  WebMock.allow_net_connect!
+  let(:config_body_JSON) { OptimizelySpec::VALID_CONFIG_BODY_JSON }
+  let(:error_handler) { Optimizely::NoOpErrorHandler.new }
+  let(:spy_logger) { spy('logger') }
+  let(:project_config) { Optimizely::DatafileProjectConfig.new(config_body_JSON, spy_logger, error_handler) }
+  let(:event) { project_config.get_event_from_key('test_event') }
+  let(:log_url) { 'https://logx.optimizely.com/v1/events' }
+  let(:post_headers) { {'Content-Type' => 'application/json'} }
+
+  MAX_BATCH_SIZE = 10
+  MAX_DURATION_MS = 1000
+
+  # Every example gets a running processor (@event_processor) that consumes
+  # @event_queue and logs to spy_logger; it is stopped again in the after hook.
+  before(:example) do
+    @event_queue = SizedQueue.new(100)
+    @event_dispatcher = Optimizely::EventDispatcher.new
+    allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
+
+    @event_processor = Optimizely::BatchEventProcessor.new(
+      event_queue: @event_queue,
+      event_dispatcher: @event_dispatcher,
+      batch_size: MAX_BATCH_SIZE,
+      flush_interval: MAX_DURATION_MS,
+      logger: spy_logger
+    )
+  end
+
+  after(:example) do
+    @event_processor.stop!
+    @event_queue.clear
+  end
+
+  it 'should log warning when service is already started' do
+    @event_processor.start!
+    expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once
+  end
+
+  it 'should return empty event queue and dispatch log event when event is processed' do
+    conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+    log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger)
+
+    @event_processor.process(conversion_event)
+    sleep 1.5
+
+    expect(@event_processor.event_queue.length).to eq(0)
+    expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
+  end
+
+  it 'should flush the current batch when deadline exceeded' do
+    user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+    logger = spy('logger')
+    # Dedicated queue: sharing @event_queue would let the consumer thread of
+    # @event_processor take the event instead of the processor under test.
+    event_processor = Optimizely::BatchEventProcessor.new(
+      event_queue: SizedQueue.new(100),
+      event_dispatcher: @event_dispatcher,
+      batch_size: MAX_BATCH_SIZE,
+      flush_interval: MAX_DURATION_MS / 4,
+      logger: logger
+    )
+    sleep 0.025
+    event_processor.process(user_event)
+
+    # The flush interval is well below this sleep, so the deadline passes before the assertions.
+    sleep 1
+    expect(event_processor.event_queue.length).to eq(0)
+    # Assert on the logger handed to the processor under test, not on spy_logger.
+    expect(logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.').at_least(:once)
+    expect(@event_dispatcher).to have_received(:dispatch_event).once
+    event_processor.stop!
+  end
+
+  it 'should flush the current batch when max batch size' do
+    # NOTE(review): create_log_event is stubbed without a return value here, while
+    # dispatch_event is only stubbed for Optimizely::Event arguments. The size-10
+    # assertion below (rather than 1 leftover event) looks like it depends on the
+    # flush not completing -- confirm this is the intended behavior.
+    allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args)
+    expected_batch = []
+    counter = 0
+    until counter >= 11
+      event['key'] = event['key'] + counter.to_s
+      user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+      expected_batch << user_event
+      @event_processor.process(user_event)
+      counter += 1
+    end
+
+    sleep 1
+
+    expected_batch.pop # Removes 11th element
+    expect(@event_processor.current_batch.size).to be 10
+
+    expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once
+    expect(@event_dispatcher).to have_received(:dispatch_event).with(
+      Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
+    ).once
+    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, "Adding user event: #{event['key']} to batch.").exactly(10).times
+    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once
+  end
+
+  it 'should dispatch the event when flush is called' do
+    conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+    log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger)
+
+    # Dedicated queue so that only the processor under test consumes the events
+    # and flush signals pushed below.
+    event_processor = Optimizely::BatchEventProcessor.new(
+      event_queue: SizedQueue.new(100),
+      event_dispatcher: @event_dispatcher,
+      batch_size: MAX_BATCH_SIZE,
+      flush_interval: MAX_DURATION_MS / 2,
+      logger: spy_logger
+    )
+
+    event_processor.process(conversion_event)
+    event_processor.flush
+    sleep 1.5
+
+    event_processor.process(conversion_event)
+    event_processor.flush
+    sleep 1.5
+
+    expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice
+
+    expect(event_processor.event_queue.length).to eq(0)
+    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received flush signal.').twice
+    # Do not leave the extra consumer thread running after the example.
+    event_processor.stop!
+  end
+
+  it 'should flush on mismatch revision' do
+    allow(project_config).to receive(:revision).and_return('1', '2')
+    user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+    user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+    log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger)
+
+    expect(user_event1.event_context[:revision]).to eq('1')
+    @event_processor.process(user_event1)
+
+    expect(user_event2.event_context[:revision]).to eq('2')
+    @event_processor.process(user_event2)
+
+    sleep 0.25
+
+    expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
+
+    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once
+    expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.')
+  end
+
+  it 'should flush on mismatch project id' do
+    allow(project_config).to receive(:project_id).and_return('X', 'Y')
+    user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+    user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+    log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger)
+
+    expect(user_event1.event_context[:project_id]).to eq('X')
+    @event_processor.process(user_event1)
+
+    expect(user_event2.event_context[:project_id]).to eq('Y')
+    @event_processor.process(user_event2)
+
+    sleep 0.25
+
+    expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
+
+    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once
+    expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.')
+  end
+
+  it 'should process and halt event when start or stop are called' do
+    conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+    log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger)
+
+    @event_processor.process(conversion_event)
+    sleep 1.75
+    expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
+
+    @event_processor.stop!
+    @event_processor.process(conversion_event)
+    expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
+    @event_processor.start!
+    @event_processor.stop!
+    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.').at_least(1).times
+  end
+
+  it 'should not dispatch event when close is called during process' do
+    conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
+
+    @event_processor.process(conversion_event)
+    @event_processor.stop!
+    expect(@event_dispatcher).not_to have_received(:dispatch_event)
+  end
+
+  it 'should set default batch size when provided invalid' do
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher)
+    expect(event_processor.batch_size).to eq(10)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 'test', logger: spy_logger)
+    expect(event_processor.batch_size).to eq(10)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: [], logger: spy_logger)
+    expect(event_processor.batch_size).to eq(10)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 0, logger: spy_logger)
+    expect(event_processor.batch_size).to eq(10)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: -5, logger: spy_logger)
+    expect(event_processor.batch_size).to eq(10)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5.5, logger: spy_logger)
+    expect(event_processor.batch_size).to eq(10)
+    event_processor.stop!
+    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default batch_size: 10.').exactly(5).times
+  end
+
+  it 'should set batch size when provided valid' do
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5)
+    expect(event_processor.batch_size).to eq(5)
+    event_processor.stop!
+  end
+
+  it 'should set default flush interval when provided invalid' do
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher)
+    expect(event_processor.flush_interval).to eq(30_000)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test')
+    expect(event_processor.flush_interval).to eq(30_000)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [])
+    expect(event_processor.flush_interval).to eq(30_000)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0)
+    expect(event_processor.flush_interval).to eq(30_000)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5)
+    expect(event_processor.flush_interval).to eq(30_000)
+    event_processor.stop!
+  end
+
+  it 'should set flush interval when provided valid' do
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000)
+    expect(event_processor.flush_interval).to eq(2000)
+    event_processor.stop!
+    event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000.5)
+    expect(event_processor.flush_interval).to eq(2000.5)
+    event_processor.stop!
+  end
+end