diff --git a/README.md b/README.md index 31a01e89..82e93d23 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,7 @@ The `sdk_key` is used to compose the outbound HTTP request to the default datafi You can provide an initial datafile to bootstrap the `DataFileProjectConfig` so that it can be used immediately. The initial datafile also serves as a fallback datafile if HTTP connection cannot be established. The initial datafile will be discarded after the first successful datafile poll. **polling_interval** -The polling interval is used to specify a fixed delay between consecutive HTTP requests for the datafile. Valid duration is between 1 and 2592000 seconds. Default is 5 minutes. +The polling interval is used to specify a fixed delay between consecutive HTTP requests for the datafile. Valid duration is greater than 0 and less than 2592000 seconds. Default is 5 minutes. **url_template** A string with placeholder `{sdk_key}` can be provided so that this template along with the provided `sdk_key` is used to form the target URL. diff --git a/lib/optimizely/config_manager/http_project_config_manager.rb b/lib/optimizely/config_manager/http_project_config_manager.rb index da92d0e4..fa37711a 100644 --- a/lib/optimizely/config_manager/http_project_config_manager.rb +++ b/lib/optimizely/config_manager/http_project_config_manager.rb @@ -210,7 +210,7 @@ def polling_interval(polling_interval) return end - unless polling_interval.is_a? Integer + unless polling_interval.is_a? Numeric @logger.log( Logger::ERROR, "Polling interval '#{polling_interval}' has invalid type. Defaulting to #{Helpers::Constants::CONFIG_MANAGER['DEFAULT_UPDATE_INTERVAL']} seconds." @@ -219,7 +219,7 @@ def polling_interval(polling_interval) return end - unless polling_interval.between?(Helpers::Constants::CONFIG_MANAGER['MIN_SECONDS_LIMIT'], Helpers::Constants::CONFIG_MANAGER['MAX_SECONDS_LIMIT']) + unless polling_interval.positive? && polling_interval <= Helpers::Constants::CONFIG_MANAGER['MAX_SECONDS_LIMIT'] @logger.log( Logger::DEBUG, "Polling interval '#{polling_interval}' has invalid range. Defaulting to #{Helpers::Constants::CONFIG_MANAGER['DEFAULT_UPDATE_INTERVAL']} seconds." diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 70b4382c..62f43045 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -25,7 +25,7 @@ class BatchEventProcessor < EventProcessor # the BlockingQueue and buffers them for either a configured batch size or for a # maximum duration before the resulting LogEvent is sent to the NotificationCenter. - attr_reader :event_queue, :current_batch, :started, :batch_size, :flush_interval + attr_reader :event_queue, :event_dispatcher, :current_batch, :started, :batch_size, :flush_interval DEFAULT_BATCH_SIZE = 10 DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds @@ -36,7 +36,7 @@ class BatchEventProcessor < EventProcessor def initialize( event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), - event_dispatcher:, + event_dispatcher: Optimizely::EventDispatcher.new, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, @@ -51,7 +51,12 @@ def initialize( @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.") DEFAULT_BATCH_SIZE end - @flush_interval = positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL + @flush_interval = if positive_number?(flush_interval) + flush_interval + else + @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.") + DEFAULT_BATCH_INTERVAL + end @notification_center = notification_center @mutex = Mutex.new @received = ConditionVariable.new @@ -146,6 +151,16 @@ def run add_to_batch(item) if item.is_a? Optimizely::UserEvent end + rescue SignalException + @logger.log(Logger::INFO, 'Interrupted while processing buffer.') + rescue Exception => e + @logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}") + ensure + @logger.log( + Logger::INFO, + 'Exiting processing loop. Attempting to flush pending events.' + ) + flush_queue! end def flush_queue! diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 86b46098..172d528a 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -110,10 +110,10 @@ expected_batch.pop # Removes 11th element expect(@event_processor.current_batch.size).to be 10 - expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once + expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).twice expect(@event_dispatcher).to have_received(:dispatch_event).with( Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) - ).once + ).twice expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once end @@ -239,18 +239,19 @@ event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test') + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: []) + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0) + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5) + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger) expect(event_processor.flush_interval).to eq(30_000) event_processor.stop! + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times end it 'should set flush interval when provided valid' do @@ -295,4 +296,34 @@ "Error dispatching event: #{log_event} Timeout::Error." ) end + + it 'should flush pending events when stop is called' do + allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args) + expected_batch = [] + counter = 0 + until counter >= 10 + event['key'] = event['key'] + counter.to_s + user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + expected_batch << user_event + @event_processor.process(user_event) + counter += 1 + end + + sleep 0.25 + + # max batch size not occurred and batch is not dispatched. + expect(@event_processor.current_batch.size).to be < 10 + expect(@event_dispatcher).not_to have_received(:dispatch_event) + + # Stop should flush the queue! + @event_processor.stop! + sleep 0.75 + + expect(spy_logger).to have_received(:log).with(Logger::INFO, 'Exiting processing loop. Attempting to flush pending events.') + expect(@event_dispatcher).to have_received(:dispatch_event).with( + Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) + ) + + expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!') + end end