Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ Style/RescueStandardError:

Style/SignalException:
Enabled: false

Lint/RescueException:
Enabled: false
9 changes: 5 additions & 4 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ Lint/LiteralAsCondition:
Metrics/ParameterLists:
Max: 6
Exclude:
- 'lib/optimizely/config_manager/http_project_config_manager.rb'
- 'lib/optimizely.rb'
- 'lib/optimizely/optimizely_factory.rb'
- 'lib/optimizely/event/entity/impression_event.rb'
- 'lib/optimizely/event/entity/snapshot_event.rb'
- 'lib/optimizely/config_manager/http_project_config_manager.rb'
- 'lib/optimizely/event/batch_event_processor.rb'
- 'lib/optimizely/event/entity/conversion_event.rb'
- 'lib/optimizely/event/entity/event_context.rb'
- 'lib/optimizely/event/entity/impression_event.rb'
- 'lib/optimizely/event/entity/snapshot_event.rb'
- 'lib/optimizely/optimizely_factory.rb'

Naming/AccessorMethodName:
Exclude:
Expand Down
204 changes: 204 additions & 0 deletions lib/optimizely/event/batch_event_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# frozen_string_literal: true

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. add #

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rubocop's rule Layout/EmptyLineAfterMagicComment

#
# Copyright 2019, Optimizely and contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require_relative 'event_processor'
require_relative '../helpers/validator'
module Optimizely
class BatchEventProcessor < EventProcessor
# BatchEventProcessor is a batched implementation of the Interface EventProcessor.
# Events passed to the BatchEventProcessor are immediately added to a EventQueue.
# The BatchEventProcessor maintains a single consumer thread that pulls events off of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sentence seems incomplete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

# the BlockingQueue and buffers them for either a configured batch size or for a
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.

attr_reader :event_queue, :current_batch, :batch_size, :flush_interval

DEFAULT_BATCH_SIZE = 10
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
DEFAULT_QUEUE_CAPACITY = 1000

FLUSH_SIGNAL = 'FLUSH_SIGNAL'
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'

def initialize(
event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
event_dispatcher:,
batch_size: DEFAULT_BATCH_SIZE,
flush_interval: DEFAULT_BATCH_INTERVAL,
logger: NoOpLogger.new
)
@event_queue = event_queue
@logger = logger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it assign NoOpLogger if its nil

@event_dispatcher = event_dispatcher
@batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size)
batch_size
else
@logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
DEFAULT_BATCH_SIZE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A message should be logger here that batch_size is being set to default value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

end
@flush_interval = positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
@mutex = Mutex.new
@received = ConditionVariable.new
@current_batch = []
@is_started = false
start!
end

def start!
if @is_started == true
@logger.log(Logger::WARN, 'Service already started.')
return
end
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
@thread = Thread.new { run }
@is_started = true
end

def flush
@mutex.synchronize do
@event_queue << FLUSH_SIGNAL
@received.signal
end
end

def process(user_event)
@logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

unless @thread.alive?
@logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
return
end

@mutex.synchronize do
begin
@event_queue << user_event
@received.signal
rescue Exception
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
return
end
end
end

def stop!
return unless @thread.alive?

@mutex.synchronize do
@event_queue << SHUTDOWN_SIGNAL
@received.signal
end

@is_started = false
@logger.log(Logger::WARN, 'Stopping scheduler.')
@thread.exit
end

private

def run
loop do
if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
@logger.log(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @msohailhussain mentioned that during testing this was very verbose as it was logged many times. If he is removing from C# SDK it should be removed here as well.

Copy link
Contributor

@msohailhussain msohailhussain Aug 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not the place, it's under this condition, am going to remove.

        if item.nil?
          @logger.log(Logger::DEBUG, 'Empty item, sleeping for 50ms.')
          sleep(0.05)
          next
        end

Logger::DEBUG,
'Deadline exceeded flushing current batch.'
)
flush_queue!
end

item = nil

@mutex.synchronize do
@received.wait(@mutex, 0.05)
item = @event_queue.pop if @event_queue.length.positive?
end

if item.nil?
sleep(0.05)
next
end

if item == SHUTDOWN_SIGNAL
@logger.log(Logger::INFO, 'Received shutdown signal.')
break
end

if item == FLUSH_SIGNAL
@logger.log(Logger::DEBUG, 'Received flush signal.')
flush_queue!
next
end

add_to_batch(item) if item.is_a? Optimizely::UserEvent
end
end

def flush_queue!
return if @current_batch.empty?

log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think,

  1. We should put current_batch in temp_queue.
  2. Clear current_batch
  3. Pass temp_queue in create_log_event
    All this should happen inside synchornized block.
    See Csharp for reference.

@msohailhussain what do you suggest?

begin
@event_dispatcher.dispatch_event(log_event)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}")
end
@current_batch = []
end

def add_to_batch(user_event)
if should_split?(user_event)
flush_queue!
@current_batch = []
end

# Reset the deadline if starting a new batch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this condition can't b in the if

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inline condition; Rubocop's auto-fix

@flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, create_timestamp creates time in local timezone: https://github.com/optimizely/ruby-sdk/blob/master/lib/optimizely/helpers/date_time_utils.rb#L23-L26

Shouldn't that time be in UTC?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


@logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to batch.")
@current_batch << user_event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also add mutex lock here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add mutex lock, when we add event into EventQueue!

return unless @current_batch.length >= @batch_size

@logger.log(Logger::DEBUG, 'Flushing on max batch size!')
flush_queue!
end

def should_split?(user_event)
return false if @current_batch.empty?

current_context = @current_batch.last.event_context
new_context = user_event.event_context

# Revisions should match
unless current_context[:revision] == new_context[:revision]
@logger.log(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.')
return true
end

# Projects should match
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. New line above this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

unless current_context[:project_id] == new_context[:project_id]
@logger.log(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.')
return true
end
false
end

def positive_number?(value)
# Returns true if the given value is positive finite number.
# false otherwise.
Helpers::Validator.finite_number?(value) && value.positive?
end
end
end
2 changes: 1 addition & 1 deletion lib/optimizely/event/event_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def build_attribute_list(user_attributes, project_config)
)
end

return unless Helpers::Validator.boolean? project_config.bot_filtering
return visitor_attributes unless Helpers::Validator.boolean? project_config.bot_filtering

# Append Bot Filtering Attribute
visitor_attributes.push(
Expand Down
25 changes: 25 additions & 0 deletions lib/optimizely/event/event_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

#
# Copyright 2019, Optimizely and contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Optimizely
class EventProcessor
# EventProcessor interface is used to provide an intermediary processing stage within
# event production. It's assumed that the EventProcessor dispatches events via a provided
# EventDispatcher.
def process(user_event); end
end
end
6 changes: 3 additions & 3 deletions lib/optimizely/helpers/date_time_utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ module DateTimeUtils
module_function

def create_timestamp
# Returns Integer Current timestamp

(Time.now.to_f * 1000).to_i
# Returns Integer current UTC timestamp
utc = Time.now.getutc
(utc.to_f * 1000).to_i
end
end
end
Expand Down
Loading