44require "thread"
55require "time"
66
7+ #
8+ # Analytics event processing in the SDK involves several components. The purpose of this design is to
9+ # minimize overhead on the application threads that are generating analytics events.
10+ #
11+ # EventProcessor receives an analytics event from the SDK client, on an application thread. It places
12+ # the event in a bounded queue, the "inbox", and immediately returns.
13+ #
14+ # On a separate worker thread, EventDispatcher consumes events from the inbox. These are considered
15+ # "input events" because they may or may not actually be sent to LaunchDarkly; most flag evaluation
16+ # events are not sent, but are counted and the counters become part of a single summary event.
17+ # EventDispatcher updates those counters, creates "index" events for any users that have not been seen
18+ # recently, and places any events that will be sent to LaunchDarkly into the "outbox" queue.
19+ #
20+ # When it is time to flush events to LaunchDarkly, the contents of the outbox are handed off to
21+ # another worker thread which sends the HTTP request.
22+ #
23+
724module LaunchDarkly
825 MAX_FLUSH_WORKERS = 5
926 CURRENT_SCHEMA_VERSION = 3
@@ -68,53 +85,73 @@ class StopMessage < SynchronousMessage
6885 # @private
6986 class EventProcessor
7087 def initialize ( sdk_key , config , client = nil )
71- @queue = Queue . new
88+ @logger = config . logger
89+ @inbox = SizedQueue . new ( config . capacity )
7290 @flush_task = Concurrent ::TimerTask . new ( execution_interval : config . flush_interval ) do
73- @queue << FlushMessage . new
91+ post_to_inbox ( FlushMessage . new )
7492 end
7593 @flush_task . execute
7694 @users_flush_task = Concurrent ::TimerTask . new ( execution_interval : config . user_keys_flush_interval ) do
77- @queue << FlushUsersMessage . new
95+ post_to_inbox ( FlushUsersMessage . new )
7896 end
7997 @users_flush_task . execute
8098 @stopped = Concurrent ::AtomicBoolean . new ( false )
81-
82- EventDispatcher . new ( @queue , sdk_key , config , client )
99+ @inbox_full = Concurrent ::AtomicBoolean . new ( false )
100+
101+ EventDispatcher . new ( @inbox , sdk_key , config , client )
83102 end
84103
85104 def add_event ( event )
86105 event [ :creationDate ] = ( Time . now . to_f * 1000 ) . to_i
87- @queue << EventMessage . new ( event )
106+ post_to_inbox ( EventMessage . new ( event ) )
88107 end
89108
90109 def flush
91110 # flush is done asynchronously
92- @queue << FlushMessage . new
111+ post_to_inbox ( FlushMessage . new )
93112 end
94113
95114 def stop
96115 # final shutdown, which includes a final flush, is done synchronously
97116 if @stopped . make_true
98117 @flush_task . shutdown
99118 @users_flush_task . shutdown
100- @queue << FlushMessage . new
119+ # Note that here we are not calling post_to_inbox, because we *do* want to wait if the inbox
120+ # is full; an orderly shutdown can't happen unless these messages are received.
121+ @inbox << FlushMessage . new
101122 stop_msg = StopMessage . new
102- @queue << stop_msg
123+ @inbox << stop_msg
103124 stop_msg . wait_for_completion
104125 end
105126 end
106127
107128 # exposed only for testing
108129 def wait_until_inactive
109130 sync_msg = TestSyncMessage . new
110- @queue << sync_msg
131+ @inbox << sync_msg
111132 sync_msg . wait_for_completion
112133 end
134+
135+ private
136+
137+ def post_to_inbox ( message )
138+ begin
139+ @inbox . push ( message , non_block = true )
140+ rescue ThreadError
141+ # If the inbox is full, it means the EventDispatcher thread is seriously backed up with not-yet-processed
142+ # events. This is unlikely, but if it happens, it means the application is probably doing a ton of flag
143+ # evaluations across many threads-- so if we wait for a space in the inbox, we risk a very serious slowdown
144+ # of the app. To avoid that, we'll just drop the event. The log warning about this will only be shown once.
145+ if @inbox_full . make_true
146+ @logger . warn { "[LDClient] Events are being produced faster than they can be processed; some events will be dropped" }
147+ end
148+ end
149+ end
113150 end
114151
115152 # @private
116153 class EventDispatcher
117- def initialize ( queue , sdk_key , config , client )
154+ def initialize ( inbox , sdk_key , config , client )
118155 @sdk_key = sdk_key
119156 @config = config
120157
@@ -129,10 +166,10 @@ def initialize(queue, sdk_key, config, client)
129166 @disabled = Concurrent ::AtomicBoolean . new ( false )
130167 @last_known_past_time = Concurrent ::AtomicReference . new ( 0 )
131168
132- buffer = EventBuffer . new ( config . capacity , config . logger )
169+ outbox = EventBuffer . new ( config . capacity , config . logger )
133170 flush_workers = NonBlockingThreadPool . new ( MAX_FLUSH_WORKERS )
134171
135- Thread . new { main_loop ( queue , buffer , flush_workers ) }
172+ Thread . new { main_loop ( inbox , outbox , flush_workers ) }
136173 end
137174
138175 private
@@ -141,16 +178,16 @@ def now_millis()
141178 ( Time . now . to_f * 1000 ) . to_i
142179 end
143180
144- def main_loop ( queue , buffer , flush_workers )
181+ def main_loop ( inbox , outbox , flush_workers )
145182 running = true
146183 while running do
147184 begin
148- message = queue . pop
185+ message = inbox . pop
149186 case message
150187 when EventMessage
151- dispatch_event ( message . event , buffer )
188+ dispatch_event ( message . event , outbox )
152189 when FlushMessage
153- trigger_flush ( buffer , flush_workers )
190+ trigger_flush ( outbox , flush_workers )
154191 when FlushUsersMessage
155192 @user_keys . clear
156193 when TestSyncMessage
@@ -181,11 +218,11 @@ def synchronize_for_testing(flush_workers)
181218 flush_workers . wait_all
182219 end
183220
184- def dispatch_event ( event , buffer )
221+ def dispatch_event ( event , outbox )
185222 return if @disabled . value
186223
187224 # Always record the event in the summary.
188- buffer . add_to_summary ( event )
225+ outbox . add_to_summary ( event )
189226
190227 # Decide whether to add the event to the payload. Feature events may be added twice, once for
191228 # the event (if tracked) and once for debugging.
@@ -205,16 +242,16 @@ def dispatch_event(event, buffer)
205242 # an identify event for that user.
206243 if !( will_add_full_event && @config . inline_users_in_events )
207244 if event . has_key? ( :user ) && !notice_user ( event [ :user ] ) && event [ :kind ] != "identify"
208- buffer . add_event ( {
245+ outbox . add_event ( {
209246 kind : "index" ,
210247 creationDate : event [ :creationDate ] ,
211248 user : event [ :user ]
212249 } )
213250 end
214251 end
215252
216- buffer . add_event ( event ) if will_add_full_event
217- buffer . add_event ( debug_event ) if !debug_event . nil?
253+ outbox . add_event ( event ) if will_add_full_event
254+ outbox . add_event ( debug_event ) if !debug_event . nil?
218255 end
219256
220257 # Add to the set of users we've noticed, and return true if the user was already known to us.
@@ -236,12 +273,12 @@ def should_debug_event(event)
236273 end
237274 end
238275
239- def trigger_flush ( buffer , flush_workers )
276+ def trigger_flush ( outbox , flush_workers )
240277 if @disabled . value
241278 return
242279 end
243280
244- payload = buffer . get_payload
281+ payload = outbox . get_payload
245282 if !payload . events . empty? || !payload . summary . counters . empty?
246283 # If all available worker threads are busy, success will be false and no job will be queued.
247284 success = flush_workers . post do
@@ -252,7 +289,7 @@ def trigger_flush(buffer, flush_workers)
252289 Util . log_exception ( @config . logger , "Unexpected error in event processor" , e )
253290 end
254291 end
255- buffer . clear if success # Reset our internal state, these events now belong to the flush worker
292+ outbox . clear if success # Reset our internal state, these events now belong to the flush worker
256293 end
257294 end
258295
0 commit comments