Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ The `sdk_key` is used to compose the outbound HTTP request to the default datafi
You can provide an initial datafile to bootstrap the `DataFileProjectConfig` so that it can be used immediately. The initial datafile also serves as a fallback datafile if HTTP connection cannot be established. The initial datafile will be discarded after the first successful datafile poll.

**polling_interval**
The polling interval is used to specify a fixed delay between consecutive HTTP requests for the datafile. Valid duration is between 1 and 2592000 seconds. Default is 5 minutes.
The polling interval is used to specify a fixed delay between consecutive HTTP requests for the datafile. Valid duration is greater than 0 and less than 2592000 seconds. Default is 5 minutes.

**url_template**
A string with placeholder `{sdk_key}` can be provided so that this template along with the provided `sdk_key` is used to form the target URL.
Expand Down
4 changes: 2 additions & 2 deletions lib/optimizely/config_manager/http_project_config_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def polling_interval(polling_interval)
return
end

unless polling_interval.is_a? Integer
unless polling_interval.is_a? Numeric
@logger.log(
Logger::ERROR,
"Polling interval '#{polling_interval}' has invalid type. Defaulting to #{Helpers::Constants::CONFIG_MANAGER['DEFAULT_UPDATE_INTERVAL']} seconds."
Expand All @@ -219,7 +219,7 @@ def polling_interval(polling_interval)
return
end

unless polling_interval.between?(Helpers::Constants::CONFIG_MANAGER['MIN_SECONDS_LIMIT'], Helpers::Constants::CONFIG_MANAGER['MAX_SECONDS_LIMIT'])
unless polling_interval.positive? && polling_interval <= Helpers::Constants::CONFIG_MANAGER['MAX_SECONDS_LIMIT']
@logger.log(
Logger::DEBUG,
"Polling interval '#{polling_interval}' has invalid range. Defaulting to #{Helpers::Constants::CONFIG_MANAGER['DEFAULT_UPDATE_INTERVAL']} seconds."
Expand Down
21 changes: 18 additions & 3 deletions lib/optimizely/event/batch_event_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class BatchEventProcessor < EventProcessor
# the BlockingQueue and buffers them for either a configured batch size or for a
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.

attr_reader :event_queue, :current_batch, :started, :batch_size, :flush_interval
attr_reader :event_queue, :event_dispatcher, :current_batch, :started, :batch_size, :flush_interval

DEFAULT_BATCH_SIZE = 10
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
Expand All @@ -36,7 +36,7 @@ class BatchEventProcessor < EventProcessor

def initialize(
event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
event_dispatcher:,
event_dispatcher: Optimizely::EventDispatcher.new,
batch_size: DEFAULT_BATCH_SIZE,
flush_interval: DEFAULT_BATCH_INTERVAL,
logger: NoOpLogger.new,
Expand All @@ -51,7 +51,12 @@ def initialize(
@logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
DEFAULT_BATCH_SIZE
end
@flush_interval = positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
@flush_interval = if positive_number?(flush_interval)
flush_interval
else
@logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
DEFAULT_BATCH_INTERVAL
end
@notification_center = notification_center
@mutex = Mutex.new
@received = ConditionVariable.new
Expand Down Expand Up @@ -146,6 +151,16 @@ def run

add_to_batch(item) if item.is_a? Optimizely::UserEvent
end
rescue SignalException
@logger.log(Logger::INFO, 'Interrupted while processing buffer.')
rescue Exception => e
@logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}")
ensure
@logger.log(
Logger::INFO,
'Exiting processing loop. Attempting to flush pending events.'
)
flush_queue!
end

def flush_queue!
Expand Down
43 changes: 37 additions & 6 deletions spec/event/batch_event_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@
expected_batch.pop # Removes 11th element
expect(@event_processor.current_batch.size).to be 10

expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once
expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).twice
expect(@event_dispatcher).to have_received(:dispatch_event).with(
Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
).once
).twice
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once
end

Expand Down Expand Up @@ -239,18 +239,19 @@
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test')
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [])
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0)
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5)
event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger)
expect(event_processor.flush_interval).to eq(30_000)
event_processor.stop!
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times
end

it 'should set flush interval when provided valid' do
Expand Down Expand Up @@ -295,4 +296,34 @@
"Error dispatching event: #{log_event} Timeout::Error."
)
end

it 'should flush pending events when stop is called' do
allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args)
expected_batch = []
counter = 0
until counter >= 10
event['key'] = event['key'] + counter.to_s
user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
expected_batch << user_event
@event_processor.process(user_event)
counter += 1
end

sleep 0.25

# max batch size not occurred and batch is not dispatched.
expect(@event_processor.current_batch.size).to be < 10
expect(@event_dispatcher).not_to have_received(:dispatch_event)

# Stop should flush the queue!
@event_processor.stop!
sleep 0.75

expect(spy_logger).to have_received(:log).with(Logger::INFO, 'Exiting processing loop. Attempting to flush pending events.')
expect(@event_dispatcher).to have_received(:dispatch_event).with(
Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
)

expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!')
end
end