From da525f42c912586a2cc2b695b595b02f04e11cdd Mon Sep 17 00:00:00 2001 From: Justin Gilmer Date: Fri, 23 Jun 2023 16:39:48 -0400 Subject: [PATCH] Optimize logging calls Previously, the debug logging in the api would create the f-strings no matter if logging.DEBUG was the current log level or not. This can impact the performance, especially for benchmarking. Now, a cached IS_DEBUG flag is created for the stream operations, and other locations, the logger.isEnabledFor boolean is checked. Note that in the stream.py, this same function call is only executed once, and the results are cached for the rest of the logic. --- btrdb/conn.py | 54 +++++++++++++++++++++++++++++++++++------------ btrdb/endpoint.py | 13 ++++++++---- btrdb/stream.py | 45 ++++++++++++++++++++++++++------------- 3 files changed, 79 insertions(+), 33 deletions(-) diff --git a/btrdb/conn.py b/btrdb/conn.py index dc48d28..83aa5e1 100644 --- a/btrdb/conn.py +++ b/btrdb/conn.py @@ -110,20 +110,20 @@ def __init__(self, addrportstr, apikey=None): else: self.channel = grpc.insecure_channel(addrportstr, chan_ops) if apikey is not None: + class AuthCallDetails(grpc.ClientCallDetails): def __init__(self, apikey, client_call_details): metadata = [] if client_call_details.metadata is not None: metadata = list(client_call_details.metadata) - metadata.append( - ('authorization', "Bearer " + apikey) - ) + metadata.append(("authorization", "Bearer " + apikey)) self.method = client_call_details.method self.timeout = client_call_details.timeout self.credentials = client_call_details.credentials self.wait_for_ready = client_call_details.wait_for_ready self.compression = client_call_details.compression self.metadata = metadata + class AuthorizationInterceptor( grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, @@ -132,19 +132,43 @@ class AuthorizationInterceptor( ): def __init__(self, apikey): self.apikey = apikey - def intercept_unary_unary(self, continuation, client_call_details, request): - return continuation(AuthCallDetails(self.apikey, client_call_details), request) - def intercept_unary_stream(self, continuation, client_call_details, request): - return continuation(AuthCallDetails(self.apikey, client_call_details), request) - def intercept_stream_unary(self, continuation, client_call_details, request_iterator): - return continuation(AuthCallDetails(self.apikey, client_call_details), request) - def intercept_stream_stream(self, continuation, client_call_details, request_iterator): - return continuation(AuthCallDetails(self.apikey, client_call_details), request) + + def intercept_unary_unary( + self, continuation, client_call_details, request + ): + return continuation( + AuthCallDetails(self.apikey, client_call_details), request + ) + + def intercept_unary_stream( + self, continuation, client_call_details, request + ): + return continuation( + AuthCallDetails(self.apikey, client_call_details), request + ) + + def intercept_stream_unary( + self, continuation, client_call_details, request_iterator + ): + return continuation( + AuthCallDetails(self.apikey, client_call_details), + request_iterator, + ) + + def intercept_stream_stream( + self, continuation, client_call_details, request_iterator + ): + return continuation( + AuthCallDetails(self.apikey, client_call_details), + request_iterator, + ) + self.channel = grpc.intercept_channel( self.channel, AuthorizationInterceptor(apikey), ) + def _is_arrow_enabled(info): info = { "majorVersion": info.majorVersion, @@ -152,8 +176,9 @@ def _is_arrow_enabled(info): } major = 
info.get("majorVersion", -1) minor = info.get("minorVersion", -1) - logger.debug(f"major version: {major}") - logger.debug(f"minor version: {minor}") + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"major version: {major}") + logger.debug(f"minor version: {minor}") if major >= 5 and minor >= 30: return True else: @@ -169,7 +194,8 @@ def __init__(self, endpoint): self.ep = endpoint self._executor = ThreadPoolExecutor() self._ARROW_ENABLED = True # _is_arrow_enabled(self.ep.info()) - logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}") + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}") def query(self, stmt, params=[]): """ diff --git a/btrdb/endpoint.py b/btrdb/endpoint.py index 6eec064..8346c4e 100644 --- a/btrdb/endpoint.py +++ b/btrdb/endpoint.py @@ -274,14 +274,19 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations): @error_handler def nearest(self, uu, time, version, backward): - logger.debug(f"nearest function params: {uu}\t{time}\t{version}\t{backward}") + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + f"nearest function params: {uu}\t{time}\t{version}\t{backward}" + ) params = btrdb_pb2.NearestParams( uuid=uu.bytes, time=time, versionMajor=version, backward=backward ) - logger.debug(f"params from nearest: {params}") - logger.debug(f"uuid: {uu}") + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"params from nearest: {params}") + logger.debug(f"uuid: {uu}") result = self.stub.Nearest(params) - logger.debug(f"nearest, results: {result}") + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"nearest, results: {result}") check_proto_stat(result.stat) return result.value, result.versionMajor diff --git a/btrdb/stream.py b/btrdb/stream.py index 0296fe6..3698289 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -46,6 +46,7 @@ ## Module Variables ########################################################################## logger = logging.getLogger(__name__) +IS_DEBUG = logger.isEnabledFor(logging.DEBUG) INSERT_BATCH_SIZE = 50000 MINIMUM_TIME = -(16 << 56) MAXIMUM_TIME = (48 << 56) - 1 @@ -511,7 +512,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int: chunksize = INSERT_BATCH_SIZE assert isinstance(data, pa.Table) tmp_table = data.rename_columns(["time", "value"]) - logger.debug(f"tmp_table schema: {tmp_table.schema}") + if IS_DEBUG: + logger.debug(f"tmp_table schema: {tmp_table.schema}") new_schema = pa.schema( [ (pa.field("time", pa.timestamp(unit="ns", tz="UTC"), nullable=False)), @@ -536,7 +538,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int: # Process the batches as needed version = [] for tab in table_slices: - logger.debug(f"Table Slice: {tab}") + if IS_DEBUG: + logger.debug(f"Table Slice: {tab}") feather_bytes = _table_slice_to_feather_bytes(table_slice=tab) version.append( self._btrdb.ep.arrowInsertValues( @@ -730,7 +733,8 @@ def values(self, start, end, version=0): materialized = [] start = to_nanoseconds(start) end = to_nanoseconds(end) - logger.debug(f"For stream - {self.uuid} - {self.name}") + if IS_DEBUG: + logger.debug(f"For stream - {self.uuid} - {self.name}") point_windows = self._btrdb.ep.rawValues(self._uuid, start, end, version) for point_list, version in point_windows: for point in point_list: @@ -883,7 +887,8 @@ def arrow_aligned_windows( _arrow_not_impl_str.format("arrow_aligned_windows") ) - logger.debug(f"For stream - {self.uuid} - {self.name}") + if IS_DEBUG: + logger.debug(f"For stream - {self.uuid} - {self.name}") 
start = to_nanoseconds(start) end = to_nanoseconds(end) arr_bytes = self._btrdb.ep.arrowAlignedWindows( @@ -892,8 +897,9 @@ def arrow_aligned_windows( # exhausting the generator from above bytes_materialized = list(arr_bytes) - logger.debug(f"Length of materialized list: {len(bytes_materialized)}") - logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}") + if IS_DEBUG: + logger.debug(f"Length of materialized list: {len(bytes_materialized)}") + logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}") # ignore versions for now materialized_table = _materialize_stream_as_table(bytes_materialized) stream_names = [ @@ -1005,8 +1011,9 @@ def arrow_windows( # exhausting the generator from above bytes_materialized = list(arr_bytes) - logger.debug(f"Length of materialized list: {len(bytes_materialized)}") - logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}") + if IS_DEBUG: + logger.debug(f"Length of materialized list: {len(bytes_materialized)}") + logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}") # ignore versions for now materialized = _materialize_stream_as_table(bytes_materialized) stream_names = [ @@ -1042,11 +1049,15 @@ def nearest(self, time, version, backward=False): """ try: - logger.debug(f"checking nearest for: {self.uuid}\t\t{time}\t\t{version}") + if IS_DEBUG: + logger.debug( + f"checking nearest for: {self.uuid}\t\t{time}\t\t{version}" + ) rp, version = self._btrdb.ep.nearest( self._uuid, to_nanoseconds(time), version, backward ) - logger.debug(f"Nearest for stream: {self.uuid} - {rp}") + if IS_DEBUG: + logger.debug(f"Nearest for stream: {self.uuid} - {rp}") except BTrDBError as exc: if not isinstance(exc, NoSuchPoint): raise @@ -1223,7 +1234,8 @@ def earliest(self): params = self._params_from_filters() start = params.get("start", MINIMUM_TIME) versions = self.versions() - logger.debug(f"versions: {versions}") + if IS_DEBUG: + logger.debug(f"versions: {versions}") earliest_points_gen = self._btrdb._executor.map( lambda s: s.nearest(start, version=versions.get(s.uuid, 0), backward=False), self._streams, @@ -1842,8 +1854,9 @@ def _arrow_multivalues(self, period_ns: int): # exhausting the generator from above bytes_materialized = list(arr_bytes) - logger.debug(f"Length of materialized list: {len(bytes_materialized)}") - logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}") + if IS_DEBUG: + logger.debug(f"Length of materialized list: {len(bytes_materialized)}") + logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}") data = _materialize_stream_as_table(bytes_materialized) return data @@ -1919,9 +1932,11 @@ def _materialize_stream_as_table(arrow_bytes): for b, _ in arrow_bytes: with pa.ipc.open_stream(b) as reader: schema = reader.schema - logger.debug(f"schema: {schema}") + if IS_DEBUG: + logger.debug(f"schema: {schema}") table_list.append(reader.read_all()) - logger.debug(f"table list: {table_list}") + if IS_DEBUG: + logger.debug(f"table list: {table_list}") table = pa.concat_tables(table_list) return table
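
A minimal, self-contained sketch of the pattern this patch applies, for reference. The describe()/process() helpers below are hypothetical and not part of btrdb; only the logger.isEnabledFor(logging.DEBUG) guard and the module-level IS_DEBUG flag mirror what the patch does in conn.py, endpoint.py, and stream.py.

import logging

logger = logging.getLogger(__name__)

# Evaluated once at import time, like the IS_DEBUG flag added to stream.py.
# Trade-off: a log level configured after this module is imported is not
# reflected in the cached value.
IS_DEBUG = logger.isEnabledFor(logging.DEBUG)


def describe(values):
    # Hypothetical stand-in for an expensive formatting step (e.g. a large
    # schema or table repr).
    return ", ".join(f"{v:.6f}" for v in values)


def process(values):
    # Guarded f-string: neither the string nor describe() is evaluated
    # unless DEBUG is actually enabled, matching the guards added in
    # conn.py and endpoint.py.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"processing {len(values)} values: {describe(values)}")

    total = 0.0
    for v in values:
        # In hot paths the cached flag avoids repeated isEnabledFor calls.
        if IS_DEBUG:
            logger.debug(f"adding value: {v}")
        total += v
    return total


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(process([1.5, 2.5, 3.0]))

The standard library's lazy formatting (logger.debug("value: %s", v)) also defers string interpolation until a handler actually emits the record; the explicit guard used in this patch goes a step further by skipping argument construction and the logging call entirely when DEBUG is off.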