55import errno
66import jsonpickle
77from threading import Event , Lock , Thread
8+ import six
89import time
10+ import urllib3
911
1012# noinspection PyBroadException
1113try :
1416 # noinspection PyUnresolvedReferences,PyPep8Naming
1517 import Queue as queue
1618
17- import requests
18- from requests .packages .urllib3 .exceptions import ProtocolError
19-
20- import six
21-
2219from ldclient .event_summarizer import EventSummarizer
2320from ldclient .fixed_thread_pool import FixedThreadPool
2421from ldclient .lru_cache import SimpleLRUCache
2522from ldclient .user_filter import UserFilter
2623from ldclient .interfaces import EventProcessor
2724from ldclient .repeating_timer import RepeatingTimer
25+ from ldclient .util import UnsuccessfulResponseException
2826from ldclient .util import _headers
27+ from ldclient .util import create_http_pool_manager
2928from ldclient .util import log
29+ from ldclient .util import http_error_message , is_http_error_recoverable , throw_if_unsuccessful_response
3030
3131
3232__MAX_FLUSH_THREADS__ = 5
@@ -144,8 +144,8 @@ def make_summary_event(self, summary):
144144
145145
146146class EventPayloadSendTask (object ):
147- def __init__ (self , session , config , formatter , payload , response_fn ):
148- self ._session = session
147+ def __init__ (self , http , config , formatter , payload , response_fn ):
148+ self ._http = http
149149 self ._config = config
150150 self ._formatter = formatter
151151 self ._payload = payload
@@ -154,43 +154,30 @@ def __init__(self, session, config, formatter, payload, response_fn):
154154 def run (self ):
155155 try :
156156 output_events = self ._formatter .make_output_events (self ._payload .events , self ._payload .summary )
157- resp = self ._do_send (output_events , True )
158- if resp is not None :
159- self ._response_fn (resp )
157+ resp = self ._do_send (output_events )
160158 except Exception :
161159 log .warning (
162160 'Unhandled exception in event processor. Analytics events were not processed.' ,
163161 exc_info = True )
164162
165- def _do_send (self , output_events , should_retry ):
163+ def _do_send (self , output_events ):
166164 # noinspection PyBroadException
167165 try :
168166 json_body = jsonpickle .encode (output_events , unpicklable = False )
169167 log .debug ('Sending events payload: ' + json_body )
170168 hdrs = _headers (self ._config .sdk_key )
171169 hdrs ['X-LaunchDarkly-Event-Schema' ] = str (__CURRENT_EVENT_SCHEMA__ )
172170 uri = self ._config .events_uri
173- r = self ._session . post ( uri ,
171+ r = self ._http . request ( 'POST' , uri ,
174172 headers = hdrs ,
175- timeout = (self ._config .connect_timeout , self ._config .read_timeout ),
176- data = json_body )
177- r .raise_for_status ()
173+ timeout = urllib3 .Timeout (connect = self ._config .connect_timeout , read = self ._config .read_timeout ),
174+ body = json_body ,
175+ retries = 1 )
176+ self ._response_fn (r )
178177 return r
179- except ProtocolError as e :
180- if e .args is not None and len (e .args ) > 1 and e .args [1 ] is not None :
181- inner = e .args [1 ]
182- if inner .errno is not None and inner .errno == errno .ECONNRESET and should_retry :
183- log .warning (
184- 'ProtocolError exception caught while sending events. Retrying.' )
185- self ._do_send (output_events , False )
186- else :
187- log .warning (
188- 'Unhandled exception in event processor. Analytics events were not processed.' ,
189- exc_info = True )
190- except Exception :
178+ except Exception as e :
191179 log .warning (
192- 'Unhandled exception in event processor. Analytics events were not processed.' ,
193- exc_info = True )
180+ 'Unhandled exception in event processor. Analytics events were not processed. [%s]' , e )
194181
195182
196183FlushPayload = namedtuple ('FlushPayload' , ['events' , 'summary' ])
@@ -224,11 +211,11 @@ def clear(self):
224211
225212
226213class EventDispatcher (object ):
227- def __init__ (self , queue , config , session ):
214+ def __init__ (self , queue , config , http_client ):
228215 self ._queue = queue
229216 self ._config = config
230- self ._session = requests . Session ( ) if session is None else session
231- self ._close_session = (session is None ) # so we know whether to close it later
217+ self ._http = create_http_pool_manager ( num_pools = 1 , verify_ssl = config . verify_ssl ) if http_client is None else http_client
218+ self ._close_http = (http_client is None ) # so we know whether to close it later
232219 self ._disabled = False
233220 self ._buffer = EventBuffer (config .events_max_pending )
234221 self ._user_keys = SimpleLRUCache (config .user_keys_capacity )
@@ -261,7 +248,6 @@ def _run_main_loop(self):
261248 return
262249 except Exception :
263250 log .error ('Unhandled exception in event processor' , exc_info = True )
264- self ._session .close ()
265251
266252 def _process_event (self , event ):
267253 if self ._disabled :
@@ -320,7 +306,7 @@ def _trigger_flush(self):
320306 return
321307 payload = self ._buffer .get_payload ()
322308 if len (payload .events ) > 0 or len (payload .summary .counters ) > 0 :
323- task = EventPayloadSendTask (self ._session , self ._config , self ._formatter , payload ,
309+ task = EventPayloadSendTask (self ._http , self ._config , self ._formatter , payload ,
324310 self ._handle_response )
325311 if self ._flush_workers .execute (task .run ):
326312 # The events have been handed off to a flush worker; clear them from our buffer.
@@ -330,34 +316,35 @@ def _trigger_flush(self):
330316 pass
331317
332318 def _handle_response (self , r ):
333- server_date_str = r .headers . get ('Date' )
319+ server_date_str = r .getheader ('Date' )
334320 if server_date_str is not None :
335321 server_date = parsedate (server_date_str )
336322 if server_date is not None :
337323 timestamp = int (time .mktime (server_date ) * 1000 )
338324 self ._last_known_past_time = timestamp
339- if r .status_code == 401 :
340- log .error ('Received 401 error, no further events will be posted since SDK key is invalid' )
341- self ._disabled = True
342- return
325+ if r .status > 299 :
326+ log .error (http_error_message (r .status , "event delivery" , "some events were dropped" ))
327+ if not is_http_error_recoverable (r .status ):
328+ self ._disabled = True
329+ return
343330
344331 def _do_shutdown (self ):
345332 self ._flush_workers .stop ()
346333 self ._flush_workers .wait ()
347- if self ._close_session :
348- self ._session . close ()
334+ if self ._close_http :
335+ self ._http . clear ()
349336
350337
351338class DefaultEventProcessor (EventProcessor ):
352- def __init__ (self , config , session = None ):
339+ def __init__ (self , config , http = None ):
353340 self ._queue = queue .Queue (config .events_max_pending )
354341 self ._flush_timer = RepeatingTimer (config .flush_interval , self .flush )
355342 self ._users_flush_timer = RepeatingTimer (config .user_keys_flush_interval , self ._flush_users )
356343 self ._flush_timer .start ()
357344 self ._users_flush_timer .start ()
358345 self ._close_lock = Lock ()
359346 self ._closed = False
360- EventDispatcher (self ._queue , config , session )
347+ EventDispatcher (self ._queue , config , http )
361348
362349 def send_event (self , event ):
363350 event ['creationDate' ] = int (time .time () * 1000 )
0 commit comments