Skip to content

Commit 9875f8b

Browse files
committed
python: Add multi-database support for cluster and standalone modes
- Add database_id parameter to BaseClientConfiguration, GlideClientConfiguration, and GlideClusterClientConfiguration - Implement SELECT command for both cluster and standalone modes with comprehensive documentation - Add validation for database_id parameter (0-15 range, integer type) - Refactor configuration protobuf request creation with helper methods for better maintainability - Add extensive test coverage for database_id configuration and validation - Include production warnings and recommended approaches in SELECT command documentation - Support routing configuration for cluster mode SELECT operations - Ensure database_id persists across reconnections when set in client configuration Documentation: - Added comprehensive docstrings with warnings about SELECT command limitations - Included examples showing recommended database_id configuration approach - Documented reconnection behavior and cluster-specific routing requirements Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
1 parent 90f2b1a commit 9875f8b

File tree

9 files changed

+650
-30
lines changed

9 files changed

+650
-30
lines changed

python/glide-async/python/glide/async_commands/cluster_commands.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,64 @@ async def flushall(
10291029
await self._execute_command(RequestType.FlushAll, args, route),
10301030
)
10311031

1032+
async def select(
1033+
self, index: int, route: Optional[Route] = None
1034+
) -> TClusterResponse[TOK]:
1035+
"""
1036+
Change the currently selected database on cluster nodes.
1037+
1038+
**WARNING**: This command is NOT RECOMMENDED for production use.
1039+
Upon reconnection, nodes will revert to the database_id specified
1040+
in the client configuration (default: 0), NOT the database selected
1041+
via this command.
1042+
1043+
**RECOMMENDED APPROACH**: Use the database_id parameter in client
1044+
configuration instead:
1045+
1046+
```python
1047+
client = await GlideClusterClient.create_client(
1048+
GlideClusterClientConfiguration(
1049+
addresses=[NodeAddress("localhost", 6379)],
1050+
database_id=5 # Recommended: persists across reconnections
1051+
)
1052+
)
1053+
```
1054+
1055+
**CLUSTER BEHAVIOR**: This command routes to all nodes by default
1056+
to maintain consistency across the cluster.
1057+
1058+
**RECONNECTION BEHAVIOR**: After any reconnection (due to network issues,
1059+
timeouts, etc.), nodes will automatically revert to the database_id
1060+
specified during client creation, losing any database selection made via
1061+
this SELECT command.
1062+
1063+
See [valkey.io](https://valkey.io/commands/select/) for details.
1064+
1065+
Args:
1066+
index (int): The index of the database to select.
1067+
route (Optional[Route]): The command will be routed to all nodes by default,
1068+
unless `route` is provided, in which case the client will route the command
1069+
to the nodes defined by `route`.
1070+
1071+
Returns:
1072+
TClusterResponse[TOK]: A simple OK response from each node.
1073+
1074+
Examples:
1075+
>>> await client.select(1)
1076+
'OK' # All nodes in the cluster have selected database 1
1077+
>>> await client.select(2, AllNodes())
1078+
'OK' # Explicitly route to all nodes
1079+
"""
1080+
from glide_shared.routes import AllNodes
1081+
1082+
if route is None:
1083+
route = AllNodes() # Default routing for cluster mode
1084+
1085+
return cast(
1086+
TClusterResponse[TOK],
1087+
await self._execute_command(RequestType.Select, [str(index)], route),
1088+
)
1089+
10321090
async def flushdb(
10331091
self, flush_mode: Optional[FlushMode] = None, route: Optional[Route] = None
10341092
) -> TOK:

python/glide-async/python/glide/async_commands/standalone_commands.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,28 @@ async def select(self, index: int) -> TOK:
166166
"""
167167
Change the currently selected database.
168168
169+
**WARNING**: This command is NOT RECOMMENDED for production use.
170+
Upon reconnection, the client will revert to the database_id specified
171+
in the client configuration (default: 0), NOT the database selected
172+
via this command.
173+
174+
**RECOMMENDED APPROACH**: Use the database_id parameter in client
175+
configuration instead:
176+
177+
```python
178+
client = await GlideClient.create_client(
179+
GlideClientConfiguration(
180+
addresses=[NodeAddress("localhost", 6379)],
181+
database_id=5 # Recommended: persists across reconnections
182+
)
183+
)
184+
```
185+
186+
**RECONNECTION BEHAVIOR**: After any reconnection (due to network issues,
187+
timeouts, etc.), the client will automatically revert to the database_id
188+
specified during client creation, losing any database selection made via
189+
this SELECT command.
190+
169191
See [valkey.io](https://valkey.io/commands/select/) for details.
170192
171193
Args:

python/glide-shared/glide_shared/config.py

Lines changed: 66 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ class BaseClientConfiguration:
249249
reconnect_strategy (Optional[BackoffStrategy]): Strategy used to determine how and when to reconnect, in case of
250250
connection failures.
251251
If not set, a default backoff strategy will be used.
252+
database_id (Optional[int]): Index of the logical database to connect to.
253+
If not set, the client will connect to database 0.
252254
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command
253255
during connection establishment.
254256
protocol (ProtocolVersion): Serialization protocol to be used. If not set, `RESP3` will be used.
@@ -292,6 +294,7 @@ def __init__(
292294
read_from: ReadFrom = ReadFrom.PRIMARY,
293295
request_timeout: Optional[int] = None,
294296
reconnect_strategy: Optional[BackoffStrategy] = None,
297+
database_id: Optional[int] = None,
295298
client_name: Optional[str] = None,
296299
protocol: ProtocolVersion = ProtocolVersion.RESP3,
297300
inflight_requests_limit: Optional[int] = None,
@@ -305,13 +308,23 @@ def __init__(
305308
self.read_from = read_from
306309
self.request_timeout = request_timeout
307310
self.reconnect_strategy = reconnect_strategy
311+
self.database_id = database_id
308312
self.client_name = client_name
309313
self.protocol = protocol
310314
self.inflight_requests_limit = inflight_requests_limit
311315
self.client_az = client_az
312316
self.advanced_config = advanced_config
313317
self.lazy_connect = lazy_connect
314318

319+
# Validate database_id parameter
320+
if database_id is not None:
321+
if not isinstance(database_id, int):
322+
raise ValueError("database_id must be an integer")
323+
if database_id < 0:
324+
raise ValueError("database_id must be non-negative")
325+
if database_id > 15:
326+
raise ValueError("database_id must be less than or equal to 15")
327+
315328
if read_from == ReadFrom.AZ_AFFINITY and not client_az:
316329
raise ValueError(
317330
"client_az must be set when read_from is set to AZ_AFFINITY"
@@ -322,6 +335,39 @@ def __init__(
322335
"client_az must be set when read_from is set to AZ_AFFINITY_REPLICAS_AND_PRIMARY"
323336
)
324337

338+
def _set_addresses_in_request(self, request: ConnectionRequest) -> None:
339+
"""Set addresses in the protobuf request."""
340+
for address in self.addresses:
341+
address_info = request.addresses.add()
342+
address_info.host = address.host
343+
address_info.port = address.port
344+
345+
def _set_reconnect_strategy_in_request(self, request: ConnectionRequest) -> None:
346+
"""Set reconnect strategy in the protobuf request."""
347+
if not self.reconnect_strategy:
348+
return
349+
350+
request.connection_retry_strategy.number_of_retries = (
351+
self.reconnect_strategy.num_of_retries
352+
)
353+
request.connection_retry_strategy.factor = self.reconnect_strategy.factor
354+
request.connection_retry_strategy.exponent_base = (
355+
self.reconnect_strategy.exponent_base
356+
)
357+
if self.reconnect_strategy.jitter_percent is not None:
358+
request.connection_retry_strategy.jitter_percent = (
359+
self.reconnect_strategy.jitter_percent
360+
)
361+
362+
def _set_credentials_in_request(self, request: ConnectionRequest) -> None:
363+
"""Set credentials in the protobuf request."""
364+
if not self.credentials:
365+
return
366+
367+
if self.credentials.username:
368+
request.authentication_info.username = self.credentials.username
369+
request.authentication_info.password = self.credentials.password
370+
325371
def _create_a_protobuf_conn_request(
326372
self, cluster_mode: bool = False
327373
) -> ConnectionRequest:
@@ -335,44 +381,34 @@ def _create_a_protobuf_conn_request(
335381
ConnectionRequest: Protobuf ConnectionRequest.
336382
"""
337383
request = ConnectionRequest()
338-
for address in self.addresses:
339-
address_info = request.addresses.add()
340-
address_info.host = address.host
341-
address_info.port = address.port
384+
385+
# Set basic configuration
386+
self._set_addresses_in_request(request)
342387
request.tls_mode = TlsMode.SecureTls if self.use_tls else TlsMode.NoTls
343388
request.read_from = self.read_from.value
389+
request.cluster_mode_enabled = cluster_mode
390+
request.protocol = self.protocol.value
391+
392+
# Set optional configuration
344393
if self.request_timeout:
345394
request.request_timeout = self.request_timeout
346-
if self.reconnect_strategy:
347-
request.connection_retry_strategy.number_of_retries = (
348-
self.reconnect_strategy.num_of_retries
349-
)
350-
request.connection_retry_strategy.factor = self.reconnect_strategy.factor
351-
request.connection_retry_strategy.exponent_base = (
352-
self.reconnect_strategy.exponent_base
353-
)
354-
if self.reconnect_strategy.jitter_percent is not None:
355-
request.connection_retry_strategy.jitter_percent = (
356-
self.reconnect_strategy.jitter_percent
357-
)
358395

359-
request.cluster_mode_enabled = True if cluster_mode else False
360-
if self.credentials:
361-
if self.credentials.username:
362-
request.authentication_info.username = self.credentials.username
363-
request.authentication_info.password = self.credentials.password
396+
self._set_reconnect_strategy_in_request(request)
397+
self._set_credentials_in_request(request)
398+
364399
if self.client_name:
365400
request.client_name = self.client_name
366-
request.protocol = self.protocol.value
367401
if self.inflight_requests_limit:
368402
request.inflight_requests_limit = self.inflight_requests_limit
369403
if self.client_az:
370404
request.client_az = self.client_az
405+
if self.database_id is not None:
406+
request.database_id = self.database_id
371407
if self.advanced_config:
372408
self.advanced_config._create_a_protobuf_conn_request(request)
373-
374409
if self.lazy_connect is not None:
375410
request.lazy_connect = self.lazy_connect
411+
376412
return request
377413

378414
def _is_pubsub_configured(self) -> bool:
@@ -425,7 +461,8 @@ class GlideClientConfiguration(BaseClientConfiguration):
425461
reconnect_strategy (Optional[BackoffStrategy]): Strategy used to determine how and when to reconnect, in case of
426462
connection failures.
427463
If not set, a default backoff strategy will be used.
428-
database_id (Optional[int]): index of the logical database to connect to.
464+
database_id (Optional[int]): Index of the logical database to connect to.
465+
If not set, the client will connect to database 0.
429466
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during
430467
connection establishment.
431468
protocol (ProtocolVersion): The version of the RESP protocol to communicate with the server.
@@ -500,23 +537,21 @@ def __init__(
500537
read_from=read_from,
501538
request_timeout=request_timeout,
502539
reconnect_strategy=reconnect_strategy,
540+
database_id=database_id,
503541
client_name=client_name,
504542
protocol=protocol,
505543
inflight_requests_limit=inflight_requests_limit,
506544
client_az=client_az,
507545
advanced_config=advanced_config,
508546
lazy_connect=lazy_connect,
509547
)
510-
self.database_id = database_id
511548
self.pubsub_subscriptions = pubsub_subscriptions
512549

513550
def _create_a_protobuf_conn_request(
514551
self, cluster_mode: bool = False
515552
) -> ConnectionRequest:
516553
assert cluster_mode is False
517554
request = super()._create_a_protobuf_conn_request(cluster_mode)
518-
if self.database_id:
519-
request.database_id = self.database_id
520555

521556
if self.pubsub_subscriptions:
522557
if self.protocol == ProtocolVersion.RESP2:
@@ -592,6 +627,8 @@ class GlideClusterClientConfiguration(BaseClientConfiguration):
592627
reconnect_strategy (Optional[BackoffStrategy]): Strategy used to determine how and when to reconnect, in case of
593628
connection failures.
594629
If not set, a default backoff strategy will be used.
630+
database_id (Optional[int]): Index of the logical database to connect to.
631+
If not set, the client will connect to database 0.
595632
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during
596633
connection establishment.
597634
protocol (ProtocolVersion): The version of the RESP protocol to communicate with the server.
@@ -661,6 +698,7 @@ def __init__(
661698
read_from: ReadFrom = ReadFrom.PRIMARY,
662699
request_timeout: Optional[int] = None,
663700
reconnect_strategy: Optional[BackoffStrategy] = None,
701+
database_id: Optional[int] = None,
664702
client_name: Optional[str] = None,
665703
protocol: ProtocolVersion = ProtocolVersion.RESP3,
666704
periodic_checks: Union[
@@ -679,6 +717,7 @@ def __init__(
679717
read_from=read_from,
680718
request_timeout=request_timeout,
681719
reconnect_strategy=reconnect_strategy,
720+
database_id=database_id,
682721
client_name=client_name,
683722
protocol=protocol,
684723
inflight_requests_limit=inflight_requests_limit,

python/glide-sync/glide_sync/config.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ class GlideClusterClientConfiguration(SharedGlideClusterClientConfiguration):
123123
reconnect_strategy (Optional[BackoffStrategy]): Strategy used to determine how and when to reconnect, in case of
124124
connection failures.
125125
If not set, a default backoff strategy will be used.
126+
database_id (Optional[int]): Index of the logical database to connect to.
127+
If not set, the client will connect to database 0.
126128
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during
127129
connection establishment.
128130
protocol (ProtocolVersion): The version of the RESP protocol to communicate with the server.
@@ -153,6 +155,7 @@ def __init__(
153155
read_from: ReadFrom = ReadFrom.PRIMARY,
154156
request_timeout: Optional[int] = None,
155157
reconnect_strategy: Optional[BackoffStrategy] = None,
158+
database_id: Optional[int] = None,
156159
client_name: Optional[str] = None,
157160
protocol: ProtocolVersion = ProtocolVersion.RESP3,
158161
periodic_checks: Union[
@@ -168,9 +171,10 @@ def __init__(
168171
credentials=credentials,
169172
read_from=read_from,
170173
request_timeout=request_timeout,
174+
reconnect_strategy=reconnect_strategy,
175+
database_id=database_id,
171176
periodic_checks=periodic_checks,
172177
pubsub_subscriptions=None,
173-
reconnect_strategy=reconnect_strategy,
174178
client_name=client_name,
175179
protocol=protocol,
176180
inflight_requests_limit=None,

python/glide-sync/glide_sync/sync_commands/cluster_commands.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,64 @@ def flushall(
919919
self._execute_command(RequestType.FlushAll, args, route),
920920
)
921921

922+
def select(
923+
self, index: int, route: Optional[Route] = None
924+
) -> TClusterResponse[TOK]:
925+
"""
926+
Change the currently selected database on cluster nodes.
927+
928+
**WARNING**: This command is NOT RECOMMENDED for production use.
929+
Upon reconnection, nodes will revert to the database_id specified
930+
in the client configuration (default: 0), NOT the database selected
931+
via this command.
932+
933+
**RECOMMENDED APPROACH**: Use the database_id parameter in client
934+
configuration instead:
935+
936+
```python
937+
client = GlideClusterClient.create_client(
938+
GlideClusterClientConfiguration(
939+
addresses=[NodeAddress("localhost", 6379)],
940+
database_id=5 # Recommended: persists across reconnections
941+
)
942+
)
943+
```
944+
945+
**CLUSTER BEHAVIOR**: This command routes to all nodes by default
946+
to maintain consistency across the cluster.
947+
948+
**RECONNECTION BEHAVIOR**: After any reconnection (due to network issues,
949+
timeouts, etc.), nodes will automatically revert to the database_id
950+
specified during client creation, losing any database selection made via
951+
this SELECT command.
952+
953+
See [valkey.io](https://valkey.io/commands/select/) for details.
954+
955+
Args:
956+
index (int): The index of the database to select.
957+
route (Optional[Route]): The command will be routed to all nodes by default,
958+
unless `route` is provided, in which case the client will route the command
959+
to the nodes defined by `route`.
960+
961+
Returns:
962+
TClusterResponse[TOK]: A simple OK response from each node.
963+
964+
Examples:
965+
>>> client.select(1)
966+
'OK' # All nodes in the cluster have selected database 1
967+
>>> client.select(2, AllNodes())
968+
'OK' # Explicitly route to all nodes
969+
"""
970+
from glide_shared.routes import AllNodes
971+
972+
if route is None:
973+
route = AllNodes() # Default routing for cluster mode
974+
975+
return cast(
976+
TClusterResponse[TOK],
977+
self._execute_command(RequestType.Select, [str(index)], route),
978+
)
979+
922980
def flushdb(
923981
self, flush_mode: Optional[FlushMode] = None, route: Optional[Route] = None
924982
) -> TOK:

0 commit comments

Comments
 (0)