@@ -154,6 +154,8 @@ class KafkaClient(object):
154
154
sasl mechanism handshake. Default: one of bootstrap servers
155
155
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
156
156
instance. (See kafka.oauth.abstract). Default: None
157
+ raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception
158
+ upon socket error during wakeup(). Default: False
157
159
"""
158
160
159
161
DEFAULT_CONFIG = {
@@ -192,7 +194,8 @@ class KafkaClient(object):
192
194
'sasl_plain_password' : None ,
193
195
'sasl_kerberos_service_name' : 'kafka' ,
194
196
'sasl_kerberos_domain_name' : None ,
195
- 'sasl_oauth_token_provider' : None
197
+ 'sasl_oauth_token_provider' : None ,
198
+ 'raise_upon_socket_err_during_wakeup' : False
196
199
}
197
200
198
201
def __init__ (self , ** configs ):
@@ -243,6 +246,8 @@ def __init__(self, **configs):
243
246
check_timeout = self .config ['api_version_auto_timeout_ms' ] / 1000
244
247
self .config ['api_version' ] = self .check_version (timeout = check_timeout )
245
248
249
+ self ._raise_upon_socket_err_during_wakeup = self .config ['raise_upon_socket_err_during_wakeup' ]
250
+
246
251
def _can_bootstrap (self ):
247
252
effective_failures = self ._bootstrap_fails // self ._num_bootstrap_hosts
248
253
backoff_factor = 2 ** effective_failures
@@ -936,8 +941,10 @@ def wakeup(self):
936
941
except socket .timeout :
937
942
log .warning ('Timeout to send to wakeup socket!' )
938
943
raise Errors .KafkaTimeoutError ()
939
- except socket .error :
944
+ except socket .error as e :
940
945
log .warning ('Unable to send to wakeup socket!' )
946
+ if self ._raise_upon_socket_err_during_wakeup :
947
+ raise e
941
948
942
949
def _clear_wake_fd (self ):
943
950
# reading from wake socket should only happen in a single thread
0 commit comments