Skip to content

Commit d89d28c

Browse files
authored
Optimize logging calls (#30)
Previously, the debug logging in the API would create the f-strings regardless of whether logging.DEBUG was the current log level. This can impact performance, especially for benchmarking. Now, a cached IS_DEBUG flag is created for the stream operations, and in other locations the logger.isEnabledFor boolean is checked. Note that in stream.py this function call is executed only once, and the result is cached for the rest of the logic.
1 parent caa919a commit d89d28c

File tree

3 files changed

+79
-33
lines changed

3 files changed

+79
-33
lines changed

btrdb/conn.py

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,20 @@ def __init__(self, addrportstr, apikey=None):
110110
else:
111111
self.channel = grpc.insecure_channel(addrportstr, chan_ops)
112112
if apikey is not None:
113+
113114
class AuthCallDetails(grpc.ClientCallDetails):
114115
def __init__(self, apikey, client_call_details):
115116
metadata = []
116117
if client_call_details.metadata is not None:
117118
metadata = list(client_call_details.metadata)
118-
metadata.append(
119-
('authorization', "Bearer " + apikey)
120-
)
119+
metadata.append(("authorization", "Bearer " + apikey))
121120
self.method = client_call_details.method
122121
self.timeout = client_call_details.timeout
123122
self.credentials = client_call_details.credentials
124123
self.wait_for_ready = client_call_details.wait_for_ready
125124
self.compression = client_call_details.compression
126125
self.metadata = metadata
126+
127127
class AuthorizationInterceptor(
128128
grpc.UnaryUnaryClientInterceptor,
129129
grpc.UnaryStreamClientInterceptor,
@@ -132,28 +132,53 @@ class AuthorizationInterceptor(
132132
):
133133
def __init__(self, apikey):
134134
self.apikey = apikey
135-
def intercept_unary_unary(self, continuation, client_call_details, request):
136-
return continuation(AuthCallDetails(self.apikey, client_call_details), request)
137-
def intercept_unary_stream(self, continuation, client_call_details, request):
138-
return continuation(AuthCallDetails(self.apikey, client_call_details), request)
139-
def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
140-
return continuation(AuthCallDetails(self.apikey, client_call_details), request)
141-
def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
142-
return continuation(AuthCallDetails(self.apikey, client_call_details), request)
135+
136+
def intercept_unary_unary(
137+
self, continuation, client_call_details, request
138+
):
139+
return continuation(
140+
AuthCallDetails(self.apikey, client_call_details), request
141+
)
142+
143+
def intercept_unary_stream(
144+
self, continuation, client_call_details, request
145+
):
146+
return continuation(
147+
AuthCallDetails(self.apikey, client_call_details), request
148+
)
149+
150+
def intercept_stream_unary(
151+
self, continuation, client_call_details, request_iterator
152+
):
153+
return continuation(
154+
AuthCallDetails(self.apikey, client_call_details),
155+
request_iterator,
156+
)
157+
158+
def intercept_stream_stream(
159+
self, continuation, client_call_details, request_iterator
160+
):
161+
return continuation(
162+
AuthCallDetails(self.apikey, client_call_details),
163+
request_iterator,
164+
)
165+
143166
self.channel = grpc.intercept_channel(
144167
self.channel,
145168
AuthorizationInterceptor(apikey),
146169
)
147170

171+
148172
def _is_arrow_enabled(info):
149173
info = {
150174
"majorVersion": info.majorVersion,
151175
"minorVersion": info.minorVersion,
152176
}
153177
major = info.get("majorVersion", -1)
154178
minor = info.get("minorVersion", -1)
155-
logger.debug(f"major version: {major}")
156-
logger.debug(f"minor version: {minor}")
179+
if logger.isEnabledFor(logging.DEBUG):
180+
logger.debug(f"major version: {major}")
181+
logger.debug(f"minor version: {minor}")
157182
if major >= 5 and minor >= 30:
158183
return True
159184
else:
@@ -169,7 +194,8 @@ def __init__(self, endpoint):
169194
self.ep = endpoint
170195
self._executor = ThreadPoolExecutor()
171196
self._ARROW_ENABLED = True # _is_arrow_enabled(self.ep.info())
172-
logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}")
197+
if logger.isEnabledFor(logging.DEBUG):
198+
logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}")
173199

174200
def query(self, stmt, params=[]):
175201
"""

btrdb/endpoint.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,14 +274,19 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):
274274

275275
@error_handler
276276
def nearest(self, uu, time, version, backward):
277-
logger.debug(f"nearest function params: {uu}\t{time}\t{version}\t{backward}")
277+
if logger.isEnabledFor(logging.DEBUG):
278+
logger.debug(
279+
f"nearest function params: {uu}\t{time}\t{version}\t{backward}"
280+
)
278281
params = btrdb_pb2.NearestParams(
279282
uuid=uu.bytes, time=time, versionMajor=version, backward=backward
280283
)
281-
logger.debug(f"params from nearest: {params}")
282-
logger.debug(f"uuid: {uu}")
284+
if logger.isEnabledFor(logging.DEBUG):
285+
logger.debug(f"params from nearest: {params}")
286+
logger.debug(f"uuid: {uu}")
283287
result = self.stub.Nearest(params)
284-
logger.debug(f"nearest, results: {result}")
288+
if logger.isEnabledFor(logging.DEBUG):
289+
logger.debug(f"nearest, results: {result}")
285290
check_proto_stat(result.stat)
286291
return result.value, result.versionMajor
287292

btrdb/stream.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
## Module Variables
4747
##########################################################################
4848
logger = logging.getLogger(__name__)
49+
IS_DEBUG = logger.isEnabledFor(logging.DEBUG)
4950
INSERT_BATCH_SIZE = 50000
5051
MINIMUM_TIME = -(16 << 56)
5152
MAXIMUM_TIME = (48 << 56) - 1
@@ -511,7 +512,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int:
511512
chunksize = INSERT_BATCH_SIZE
512513
assert isinstance(data, pa.Table)
513514
tmp_table = data.rename_columns(["time", "value"])
514-
logger.debug(f"tmp_table schema: {tmp_table.schema}")
515+
if IS_DEBUG:
516+
logger.debug(f"tmp_table schema: {tmp_table.schema}")
515517
new_schema = pa.schema(
516518
[
517519
(pa.field("time", pa.timestamp(unit="ns", tz="UTC"), nullable=False)),
@@ -536,7 +538,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int:
536538
# Process the batches as needed
537539
version = []
538540
for tab in table_slices:
539-
logger.debug(f"Table Slice: {tab}")
541+
if IS_DEBUG:
542+
logger.debug(f"Table Slice: {tab}")
540543
feather_bytes = _table_slice_to_feather_bytes(table_slice=tab)
541544
version.append(
542545
self._btrdb.ep.arrowInsertValues(
@@ -730,7 +733,8 @@ def values(self, start, end, version=0):
730733
materialized = []
731734
start = to_nanoseconds(start)
732735
end = to_nanoseconds(end)
733-
logger.debug(f"For stream - {self.uuid} - {self.name}")
736+
if IS_DEBUG:
737+
logger.debug(f"For stream - {self.uuid} - {self.name}")
734738
point_windows = self._btrdb.ep.rawValues(self._uuid, start, end, version)
735739
for point_list, version in point_windows:
736740
for point in point_list:
@@ -883,7 +887,8 @@ def arrow_aligned_windows(
883887
_arrow_not_impl_str.format("arrow_aligned_windows")
884888
)
885889

886-
logger.debug(f"For stream - {self.uuid} - {self.name}")
890+
if IS_DEBUG:
891+
logger.debug(f"For stream - {self.uuid} - {self.name}")
887892
start = to_nanoseconds(start)
888893
end = to_nanoseconds(end)
889894
arr_bytes = self._btrdb.ep.arrowAlignedWindows(
@@ -892,8 +897,9 @@ def arrow_aligned_windows(
892897
# exhausting the generator from above
893898
bytes_materialized = list(arr_bytes)
894899

895-
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
896-
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
900+
if IS_DEBUG:
901+
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
902+
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
897903
# ignore versions for now
898904
materialized_table = _materialize_stream_as_table(bytes_materialized)
899905
stream_names = [
@@ -1005,8 +1011,9 @@ def arrow_windows(
10051011
# exhausting the generator from above
10061012
bytes_materialized = list(arr_bytes)
10071013

1008-
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
1009-
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
1014+
if IS_DEBUG:
1015+
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
1016+
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
10101017
# ignore versions for now
10111018
materialized = _materialize_stream_as_table(bytes_materialized)
10121019
stream_names = [
@@ -1042,11 +1049,15 @@ def nearest(self, time, version, backward=False):
10421049
10431050
"""
10441051
try:
1045-
logger.debug(f"checking nearest for: {self.uuid}\t\t{time}\t\t{version}")
1052+
if IS_DEBUG:
1053+
logger.debug(
1054+
f"checking nearest for: {self.uuid}\t\t{time}\t\t{version}"
1055+
)
10461056
rp, version = self._btrdb.ep.nearest(
10471057
self._uuid, to_nanoseconds(time), version, backward
10481058
)
1049-
logger.debug(f"Nearest for stream: {self.uuid} - {rp}")
1059+
if IS_DEBUG:
1060+
logger.debug(f"Nearest for stream: {self.uuid} - {rp}")
10501061
except BTrDBError as exc:
10511062
if not isinstance(exc, NoSuchPoint):
10521063
raise
@@ -1223,7 +1234,8 @@ def earliest(self):
12231234
params = self._params_from_filters()
12241235
start = params.get("start", MINIMUM_TIME)
12251236
versions = self.versions()
1226-
logger.debug(f"versions: {versions}")
1237+
if IS_DEBUG:
1238+
logger.debug(f"versions: {versions}")
12271239
earliest_points_gen = self._btrdb._executor.map(
12281240
lambda s: s.nearest(start, version=versions.get(s.uuid, 0), backward=False),
12291241
self._streams,
@@ -1842,8 +1854,9 @@ def _arrow_multivalues(self, period_ns: int):
18421854
# exhausting the generator from above
18431855
bytes_materialized = list(arr_bytes)
18441856

1845-
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
1846-
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
1857+
if IS_DEBUG:
1858+
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
1859+
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
18471860
data = _materialize_stream_as_table(bytes_materialized)
18481861
return data
18491862

@@ -1919,9 +1932,11 @@ def _materialize_stream_as_table(arrow_bytes):
19191932
for b, _ in arrow_bytes:
19201933
with pa.ipc.open_stream(b) as reader:
19211934
schema = reader.schema
1922-
logger.debug(f"schema: {schema}")
1935+
if IS_DEBUG:
1936+
logger.debug(f"schema: {schema}")
19231937
table_list.append(reader.read_all())
1924-
logger.debug(f"table list: {table_list}")
1938+
if IS_DEBUG:
1939+
logger.debug(f"table list: {table_list}")
19251940
table = pa.concat_tables(table_list)
19261941
return table
19271942

0 commit comments

Comments
 (0)