diff --git a/btrdb/conn.py b/btrdb/conn.py
index dc48d28..83aa5e1 100644
--- a/btrdb/conn.py
+++ b/btrdb/conn.py
@@ -110,20 +110,20 @@ def __init__(self, addrportstr, apikey=None):
         else:
             self.channel = grpc.insecure_channel(addrportstr, chan_ops)
             if apikey is not None:
+
                 class AuthCallDetails(grpc.ClientCallDetails):
                     def __init__(self, apikey, client_call_details):
                         metadata = []
                         if client_call_details.metadata is not None:
                             metadata = list(client_call_details.metadata)
-                        metadata.append(
-                            ('authorization', "Bearer " + apikey)
-                        )
+                        metadata.append(("authorization", "Bearer " + apikey))
                         self.method = client_call_details.method
                         self.timeout = client_call_details.timeout
                         self.credentials = client_call_details.credentials
                         self.wait_for_ready = client_call_details.wait_for_ready
                         self.compression = client_call_details.compression
                         self.metadata = metadata
+
                 class AuthorizationInterceptor(
                     grpc.UnaryUnaryClientInterceptor,
                     grpc.UnaryStreamClientInterceptor,
@@ -132,19 +132,43 @@ class AuthorizationInterceptor(
                 ):
                     def __init__(self, apikey):
                         self.apikey = apikey
-                    def intercept_unary_unary(self, continuation, client_call_details, request):
-                        return continuation(AuthCallDetails(self.apikey, client_call_details), request)
-                    def intercept_unary_stream(self, continuation, client_call_details, request):
-                        return continuation(AuthCallDetails(self.apikey, client_call_details), request)
-                    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
-                        return continuation(AuthCallDetails(self.apikey, client_call_details), request)
-                    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
-                        return continuation(AuthCallDetails(self.apikey, client_call_details), request)
+
+                    def intercept_unary_unary(
+                        self, continuation, client_call_details, request
+                    ):
+                        return continuation(
+                            AuthCallDetails(self.apikey, client_call_details), request
+                        )
+
+                    def intercept_unary_stream(
+                        self, continuation, client_call_details, request
+                    ):
+                        return continuation(
+                            AuthCallDetails(self.apikey, client_call_details), request
+                        )
+
+                    def intercept_stream_unary(
+                        self, continuation, client_call_details, request_iterator
+                    ):
+                        return continuation(
+                            AuthCallDetails(self.apikey, client_call_details),
+                            request_iterator,
+                        )
+
+                    def intercept_stream_stream(
+                        self, continuation, client_call_details, request_iterator
+                    ):
+                        return continuation(
+                            AuthCallDetails(self.apikey, client_call_details),
+                            request_iterator,
+                        )
+
                 self.channel = grpc.intercept_channel(
                     self.channel,
                     AuthorizationInterceptor(apikey),
                 )
 
+
 def _is_arrow_enabled(info):
     info = {
         "majorVersion": info.majorVersion,
@@ -152,8 +176,9 @@ def _is_arrow_enabled(info):
     }
     major = info.get("majorVersion", -1)
     minor = info.get("minorVersion", -1)
-    logger.debug(f"major version: {major}")
-    logger.debug(f"minor version: {minor}")
+    if logger.isEnabledFor(logging.DEBUG):
+        logger.debug(f"major version: {major}")
+        logger.debug(f"minor version: {minor}")
     if major >= 5 and minor >= 30:
         return True
     else:
@@ -169,7 +194,8 @@ def __init__(self, endpoint):
         self.ep = endpoint
         self._executor = ThreadPoolExecutor()
         self._ARROW_ENABLED = True  # _is_arrow_enabled(self.ep.info())
-        logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}")
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}")
 
     def query(self, stmt, params=[]):
         """
diff --git a/btrdb/endpoint.py b/btrdb/endpoint.py
index 6eec064..8346c4e 100644
--- a/btrdb/endpoint.py
+++ b/btrdb/endpoint.py
@@ -274,14 +274,19 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):
 
     @error_handler
     def nearest(self, uu, time, version, backward):
-        logger.debug(f"nearest function params: {uu}\t{time}\t{version}\t{backward}")
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(
+                f"nearest function params: {uu}\t{time}\t{version}\t{backward}"
+            )
         params = btrdb_pb2.NearestParams(
             uuid=uu.bytes, time=time, versionMajor=version, backward=backward
         )
-        logger.debug(f"params from nearest: {params}")
-        logger.debug(f"uuid: {uu}")
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"params from nearest: {params}")
+            logger.debug(f"uuid: {uu}")
         result = self.stub.Nearest(params)
-        logger.debug(f"nearest, results: {result}")
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"nearest, results: {result}")
         check_proto_stat(result.stat)
         return result.value, result.versionMajor
 
diff --git a/btrdb/stream.py b/btrdb/stream.py
index 0296fe6..3698289 100644
--- a/btrdb/stream.py
+++ b/btrdb/stream.py
@@ -46,6 +46,7 @@
 ## Module Variables
 ##########################################################################
 logger = logging.getLogger(__name__)
+IS_DEBUG = logger.isEnabledFor(logging.DEBUG)
 INSERT_BATCH_SIZE = 50000
 MINIMUM_TIME = -(16 << 56)
 MAXIMUM_TIME = (48 << 56) - 1
@@ -511,7 +512,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int:
         chunksize = INSERT_BATCH_SIZE
         assert isinstance(data, pa.Table)
         tmp_table = data.rename_columns(["time", "value"])
-        logger.debug(f"tmp_table schema: {tmp_table.schema}")
+        if IS_DEBUG:
+            logger.debug(f"tmp_table schema: {tmp_table.schema}")
         new_schema = pa.schema(
             [
                 (pa.field("time", pa.timestamp(unit="ns", tz="UTC"), nullable=False)),
@@ -536,7 +538,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int:
         # Process the batches as needed
         version = []
         for tab in table_slices:
-            logger.debug(f"Table Slice: {tab}")
+            if IS_DEBUG:
+                logger.debug(f"Table Slice: {tab}")
             feather_bytes = _table_slice_to_feather_bytes(table_slice=tab)
             version.append(
                 self._btrdb.ep.arrowInsertValues(
@@ -730,7 +733,8 @@ def values(self, start, end, version=0):
         materialized = []
         start = to_nanoseconds(start)
         end = to_nanoseconds(end)
-        logger.debug(f"For stream - {self.uuid} - {self.name}")
+        if IS_DEBUG:
+            logger.debug(f"For stream - {self.uuid} - {self.name}")
         point_windows = self._btrdb.ep.rawValues(self._uuid, start, end, version)
         for point_list, version in point_windows:
             for point in point_list:
@@ -883,7 +887,8 @@ def arrow_aligned_windows(
                 _arrow_not_impl_str.format("arrow_aligned_windows")
             )
 
-        logger.debug(f"For stream - {self.uuid} - {self.name}")
+        if IS_DEBUG:
+            logger.debug(f"For stream - {self.uuid} - {self.name}")
         start = to_nanoseconds(start)
         end = to_nanoseconds(end)
         arr_bytes = self._btrdb.ep.arrowAlignedWindows(
@@ -892,8 +897,9 @@
         # exhausting the generator from above
         bytes_materialized = list(arr_bytes)
 
-        logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
-        logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
+        if IS_DEBUG:
+            logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
+            logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
         # ignore versions for now
         materialized_table = _materialize_stream_as_table(bytes_materialized)
         stream_names = [
@@ -1005,8 +1011,9 @@
         # exhausting the generator from above
         bytes_materialized = list(arr_bytes)
 
-        logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
-        logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
+        if IS_DEBUG:
+            logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
+            logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
         # ignore versions for now
         materialized = _materialize_stream_as_table(bytes_materialized)
         stream_names = [
@@ -1042,11 +1049,15 @@ def nearest(self, time, version, backward=False):
 
         """
         try:
-            logger.debug(f"checking nearest for: {self.uuid}\t\t{time}\t\t{version}")
+            if IS_DEBUG:
+                logger.debug(
+                    f"checking nearest for: {self.uuid}\t\t{time}\t\t{version}"
+                )
             rp, version = self._btrdb.ep.nearest(
                 self._uuid, to_nanoseconds(time), version, backward
             )
-            logger.debug(f"Nearest for stream: {self.uuid} - {rp}")
+            if IS_DEBUG:
+                logger.debug(f"Nearest for stream: {self.uuid} - {rp}")
         except BTrDBError as exc:
             if not isinstance(exc, NoSuchPoint):
                 raise
@@ -1223,7 +1234,8 @@ def earliest(self):
         params = self._params_from_filters()
         start = params.get("start", MINIMUM_TIME)
         versions = self.versions()
-        logger.debug(f"versions: {versions}")
+        if IS_DEBUG:
+            logger.debug(f"versions: {versions}")
         earliest_points_gen = self._btrdb._executor.map(
             lambda s: s.nearest(start, version=versions.get(s.uuid, 0), backward=False),
             self._streams,
@@ -1842,8 +1854,9 @@ def _arrow_multivalues(self, period_ns: int):
         # exhausting the generator from above
         bytes_materialized = list(arr_bytes)
 
-        logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
-        logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
+        if IS_DEBUG:
+            logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
+            logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
         data = _materialize_stream_as_table(bytes_materialized)
 
         return data
@@ -1919,9 +1932,11 @@ def _materialize_stream_as_table(arrow_bytes):
     for b, _ in arrow_bytes:
         with pa.ipc.open_stream(b) as reader:
             schema = reader.schema
-            logger.debug(f"schema: {schema}")
+            if IS_DEBUG:
+                logger.debug(f"schema: {schema}")
             table_list.append(reader.read_all())
-    logger.debug(f"table list: {table_list}")
+    if IS_DEBUG:
+        logger.debug(f"table list: {table_list}")
 
     table = pa.concat_tables(table_list)
     return table
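For context, the `conn.py` hunks above attach the API key by wrapping the channel in gRPC client interceptors. The sketch below distills that pattern to the unary-unary case so it can be read in isolation; it is a minimal illustration, not the patched library code. `BearerTokenInterceptor` and `make_channel` are hypothetical names introduced here; only the `grpc` APIs used (`ClientCallDetails`, `UnaryUnaryClientInterceptor`, `intercept_channel`, `insecure_channel`) are real.

```python
# Minimal sketch of the metadata-injecting interceptor pattern from conn.py,
# reduced to the unary-unary case. Hypothetical names are marked as such.
import grpc


class _AuthCallDetails(grpc.ClientCallDetails):
    """A copy of the outgoing call details with an authorization header appended."""

    def __init__(self, apikey, details):
        metadata = list(details.metadata) if details.metadata is not None else []
        metadata.append(("authorization", "Bearer " + apikey))
        # ClientCallDetails is treated as immutable, so every field is rebuilt.
        self.method = details.method
        self.timeout = details.timeout
        self.credentials = details.credentials
        self.wait_for_ready = details.wait_for_ready
        self.compression = details.compression
        self.metadata = metadata


class BearerTokenInterceptor(grpc.UnaryUnaryClientInterceptor):  # hypothetical name
    def __init__(self, apikey):
        self.apikey = apikey

    def intercept_unary_unary(self, continuation, client_call_details, request):
        # Swap in the augmented call details, then let the RPC proceed unchanged.
        return continuation(_AuthCallDetails(self.apikey, client_call_details), request)


def make_channel(address, apikey):  # hypothetical helper
    # Every unary-unary RPC on the returned channel now carries the bearer token.
    return grpc.intercept_channel(
        grpc.insecure_channel(address), BearerTokenInterceptor(apikey)
    )


if __name__ == "__main__":
    channel = make_channel("localhost:4410", "my-api-key")
    channel.close()
```

On the logging changes: an f-string argument is formatted eagerly, so `logger.debug(f"...{table}")` pays the full rendering cost even when DEBUG is disabled. Guarding with `logger.isEnabledFor(logging.DEBUG)` skips that work, and the module-level `IS_DEBUG` constant in `stream.py` goes one step further by checking the level once at import time instead of on every call.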