-
Notifications
You must be signed in to change notification settings - Fork 27
feat(eventProcessor): Add EventProcessor and BatchEventProcessor #191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
mnoman09
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, Just added few comments.
|
|
||
| DEFAULT_BATCH_SIZE = 10 | ||
| DEFAULT_FLUSH_INTERVAL = 30_000 | ||
| DEFAULT_TIMEOUT_INTERVAL = (5000 * 60) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this variable as it is of no use.
| event_dispatcher:, | ||
| batch_size:, | ||
| flush_interval:, | ||
| timeout_interval:, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this as you are just passing it and it is not getting used anywhere.
| @received.wait(@mutex, 0.05) | ||
| end | ||
|
|
||
| item = @event_queue.pop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be in mutex.synchronize block?
| def flush_queue! | ||
| return if @current_batch.empty? | ||
|
|
||
| log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think,
- We should put current_batch in temp_queue.
- Clear current_batch
- Pass temp_queue in create_log_event
All this should happen inside synchornized block.
See Csharp for reference.
@msohailhussain what do you suggest?
| # Reset the deadline if starting a new batch. | ||
| @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval if @current_batch.empty? | ||
|
|
||
| @logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to btach.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo:
| @logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to btach.") | |
| @logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to batch.") |
| @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval if @current_batch.empty? | ||
|
|
||
| @logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to btach.") | ||
| @current_batch << user_event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also add mutex lock here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We add mutex lock, when we add event into EventQueue!
| class EventProcessor | ||
| # EventProcessor interface is used to provide an intermediary processing stage within | ||
| # event production. It's assumed that the EventProcessor dispatches events via a provided | ||
| # EventHandler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: change it to EventDispatcher.
| allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) | ||
| end | ||
|
|
||
| it 'return empty event queue event is processed' do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also make sure if the dispatchEvent is getting called and logevent is correct?
c7c2552 to
28ea204
Compare
msohailhussain
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please address comments
.rubocop_todo.yml
Outdated
| - 'lib/optimizely/event/entity/snapshot_event.rb' | ||
| - 'lib/optimizely/event/entity/conversion_event.rb' | ||
| - 'lib/optimizely/event/entity/event_context.rb' | ||
| - 'lib/optimizely/event/batch_event_processor.rb' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alphabetize order
| event_dispatcher:, | ||
| batch_size:, | ||
| flush_interval:, | ||
| start_by_default: false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think so it's really needed.
| @event_dispatcher = event_dispatcher | ||
| @batch_size = batch_size || DEFAULT_BATCH_SIZE | ||
| @flush_interval = flush_interval || DEFAULT_BATCH_INTERVAL | ||
| @logger = logger |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't it assign NoOpLogger if its nil
| @mutex = Mutex.new | ||
| @received = ConditionVariable.new | ||
| @current_batch = [] | ||
| @disposed = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
disposed is it also implemented in http_project_config_manager?
| return | ||
| end | ||
|
|
||
| if @event_queue.include? user_event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what this condition mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated for DEFAULT_QUEUE_CAPACITY
| @current_batch = [] | ||
| end | ||
|
|
||
| # Reset the deadline if starting a new batch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this condition can't b in the if
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inline condition; Rubocop's auto-fix
| attr_reader :event_queue | ||
|
|
||
| DEFAULT_BATCH_SIZE = 10 | ||
| DEFAULT_BATCH_INTERVAL = 30_000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's with the _?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RuboCop's change, following ruby's style guide.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment about what the unit of this number is.
| allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args) | ||
| expected_batch = [] | ||
| counter = 0 | ||
| until counter >= 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we process 11 and make sure the batch only has 10?
mikeproeng37
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good, I just have a few comments
| end | ||
|
|
||
| if item.is_a? Optimizely::UserEvent | ||
| @logger.log(Logger::DEBUG, "Received add to batch signal. with event: #{item.event['key']}.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not log here as it would be extremely noisy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not noisy. because when any impression conversion event occurs, then it will be logged, which is OK. I may remove, if you still think it's noisy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think it is noisy. Let's leave it out unless someone actually requests for this level of verbosity.
| def run | ||
| loop do | ||
| if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline | ||
| @logger.log( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @msohailhussain mentioned that during testing this was very verbose as it was logged many times. If he is removing from C# SDK it should be removed here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not the place, it's under this condition, am going to remove.
if item.nil?
@logger.log(Logger::DEBUG, 'Empty item, sleeping for 50ms.')
sleep(0.05)
next
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good. Just add conditions that if batch_size is None or batch_size <= 0 than use default batch size. same goes for flush_time_interval
mikeproeng37
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
aliabbasrizvi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly small changes
| @@ -0,0 +1,200 @@ | |||
| # frozen_string_literal: true | |||
|
|
|||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. add #
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rubocop's rule Layout/EmptyLineAfterMagicComment
| class BatchEventProcessor < EventProcessor | ||
| # BatchEventProcessor is a batched implementation of the Interface EventProcessor. | ||
| # Events passed to the BatchEventProcessor are immediately added to a EventQueue. | ||
| # The BatchEventProcessor maintains a single consumer thread that pulls events off of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sentence seems incomplete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
| attr_reader :event_queue | ||
|
|
||
| DEFAULT_BATCH_SIZE = 10 | ||
| DEFAULT_BATCH_INTERVAL = 30_000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment about what the unit of this number is.
| @batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size) | ||
| batch_size | ||
| else | ||
| DEFAULT_BATCH_SIZE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A message should be logger here that batch_size is being set to default value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| end | ||
|
|
||
| # Reset the deadline if starting a new batch. | ||
| @flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, create_timestamp creates time in local timezone: https://github.com/optimizely/ruby-sdk/blob/master/lib/optimizely/helpers/date_time_utils.rb#L23-L26
Shouldn't that time be in UTC?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| @logger.log(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.') | ||
| return true | ||
| end | ||
| # Projects should match |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. New line above this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
aliabbasrizvi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conditionally approving. Please address feedback before merging.
msohailhussain
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Summary
batch_event_processor.rbfrom rubocop's Metrics/ParameterLists rule.Buffering events within a queue before dispatching is an optimization that should prevent SDK implementations from exhausting resources while increasing throughput. This implementation relies on a
EventQueueto buffer events received from one-to-many producers. A single consumer thread continuously polls from this queue to build a batch before emitting the batchedLogEvent.Test Plan