Skip to content

Commit c708153

Browse files
feat: add native storage mode for MongoDB (#164)
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: William Easton <strawgate@users.noreply.github.com>
1 parent 23f4a9c commit c708153

File tree

7 files changed

+488
-83
lines changed

7 files changed

+488
-83
lines changed

key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py

Lines changed: 98 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
from datetime import datetime
33
from typing import Any, overload
44

5-
from key_value.shared.utils.managed_entry import ManagedEntry
5+
from key_value.shared.errors import DeserializationError
6+
from key_value.shared.utils.managed_entry import ManagedEntry, verify_dict
67
from key_value.shared.utils.sanitize import ALPHANUMERIC_CHARACTERS, sanitize_string
8+
from key_value.shared.utils.time_to_live import timezone
79
from typing_extensions import Self, override
810

911
from key_value.aio.stores.base import BaseContextManagerStore, BaseDestroyCollectionStore, BaseEnumerateCollectionsStore, BaseStore
@@ -37,44 +39,100 @@ def document_to_managed_entry(document: dict[str, Any]) -> ManagedEntry:
3739
"""Convert a MongoDB document back to a ManagedEntry.
3840
3941
This function deserializes a MongoDB document (created by `managed_entry_to_document`) back to a
40-
ManagedEntry object, parsing the stringified value field and preserving all metadata.
42+
ManagedEntry object. It supports both native BSON storage (dict in value.object field) and legacy
43+
JSON string storage (string in value.string field) for migration support.
4144
4245
Args:
4346
document: The MongoDB document to convert.
4447
4548
Returns:
4649
A ManagedEntry object reconstructed from the document.
4750
"""
48-
return ManagedEntry.from_dict(data=document, stringified_value=True)
51+
if not (value_field := document.get("value")):
52+
msg = "Value field not found"
53+
raise DeserializationError(msg)
4954

55+
if not isinstance(value_field, dict):
56+
msg = "Expected `value` field to be an object"
57+
raise DeserializationError(msg)
5058

51-
def managed_entry_to_document(key: str, managed_entry: ManagedEntry) -> dict[str, Any]:
59+
value_holder: dict[str, Any] = verify_dict(obj=value_field)
60+
61+
data: dict[str, Any] = {}
62+
63+
# The Value field is an object with two possible fields: `object` and `string`
64+
# - `object`: The value is a native BSON dict
65+
# - `string`: The value is a JSON string
66+
# Mongo stores datetimes without timezones as UTC so we mark them as UTC
67+
68+
if created_at_datetime := document.get("created_at"):
69+
if not isinstance(created_at_datetime, datetime):
70+
msg = "Expected `created_at` field to be a datetime"
71+
raise DeserializationError(msg)
72+
data["created_at"] = created_at_datetime.replace(tzinfo=timezone.utc)
73+
74+
if expires_at_datetime := document.get("expires_at"):
75+
if not isinstance(expires_at_datetime, datetime):
76+
msg = "Expected `expires_at` field to be a datetime"
77+
raise DeserializationError(msg)
78+
data["expires_at"] = expires_at_datetime.replace(tzinfo=timezone.utc)
79+
80+
if value_object := value_holder.get("object"):
81+
return ManagedEntry.from_dict(data={"value": value_object, **data})
82+
83+
if value_string := value_holder.get("string"):
84+
return ManagedEntry.from_dict(data={"value": value_string, **data}, stringified_value=True)
85+
86+
msg = "Expected `value` field to be an object with `object` or `string` subfield"
87+
raise DeserializationError(msg)
88+
89+
90+
def managed_entry_to_document(key: str, managed_entry: ManagedEntry, *, native_storage: bool = True) -> dict[str, Any]:
5291
"""Convert a ManagedEntry to a MongoDB document for storage.
5392
5493
This function serializes a ManagedEntry to a MongoDB document format, including the key and all
55-
metadata (TTL, creation, and expiration timestamps). The value is stringified to ensure proper
56-
storage in MongoDB. The serialization is designed to preserve all entry information for round-trip
57-
conversion back to a ManagedEntry.
94+
metadata (TTL, creation, and expiration timestamps). The value storage format depends on the
95+
native_storage parameter.
5896
5997
Args:
6098
key: The key associated with this entry.
6199
managed_entry: The ManagedEntry to serialize.
100+
native_storage: If True (default), store value as native BSON dict in value.object field.
101+
If False, store as JSON string in value.string field for backward compatibility.
62102
63103
Returns:
64104
A MongoDB document dict containing the key, value, and all metadata.
65105
"""
66-
return {
67-
"key": key,
68-
**managed_entry.to_dict(include_metadata=True, include_expiration=True, include_creation=True, stringify_value=True),
69-
}
106+
document: dict[str, Any] = {"key": key, "value": {}}
107+
108+
# We convert to JSON even if we don't need to, this ensures that the value we were provided
109+
# can be serialized to JSON which helps ensure compatibility across stores. For example,
110+
# Mongo can natively handle datetime objects which other stores cannot, if we don't convert to JSON,
111+
# then using py-key-value with Mongo will return different values than if we used another store.
112+
json_str = managed_entry.value_as_json
113+
114+
# Store in appropriate field based on mode
115+
if native_storage:
116+
document["value"]["object"] = managed_entry.value_as_dict
117+
else:
118+
document["value"]["string"] = json_str
119+
120+
# Add metadata fields
121+
if managed_entry.created_at:
122+
document["created_at"] = managed_entry.created_at
123+
if managed_entry.expires_at:
124+
document["expires_at"] = managed_entry.expires_at
125+
126+
return document
70127

71128

72129
class MongoDBStore(BaseEnumerateCollectionsStore, BaseDestroyCollectionStore, BaseContextManagerStore, BaseStore):
73-
"""MongoDB-based key-value store using Motor (async MongoDB driver)."""
130+
"""MongoDB-based key-value store using pymongo."""
74131

75132
_client: AsyncMongoClient[dict[str, Any]]
76133
_db: AsyncDatabase[dict[str, Any]]
77134
_collections_by_name: dict[str, AsyncCollection[dict[str, Any]]]
135+
_native_storage: bool
78136

79137
@overload
80138
def __init__(
@@ -83,6 +141,7 @@ def __init__(
83141
client: AsyncMongoClient[dict[str, Any]],
84142
db_name: str | None = None,
85143
coll_name: str | None = None,
144+
native_storage: bool = True,
86145
default_collection: str | None = None,
87146
) -> None:
88147
"""Initialize the MongoDB store.
@@ -91,19 +150,27 @@ def __init__(
91150
client: The MongoDB client to use.
92151
db_name: The name of the MongoDB database.
93152
coll_name: The name of the MongoDB collection.
153+
native_storage: Whether to use native BSON storage (True, default) or JSON string storage (False).
94154
default_collection: The default collection to use if no collection is provided.
95155
"""
96156

97157
@overload
98158
def __init__(
99-
self, *, url: str, db_name: str | None = None, coll_name: str | None = None, default_collection: str | None = None
159+
self,
160+
*,
161+
url: str,
162+
db_name: str | None = None,
163+
coll_name: str | None = None,
164+
native_storage: bool = True,
165+
default_collection: str | None = None,
100166
) -> None:
101167
"""Initialize the MongoDB store.
102168
103169
Args:
104170
url: The url of the MongoDB cluster.
105171
db_name: The name of the MongoDB database.
106172
coll_name: The name of the MongoDB collection.
173+
native_storage: Whether to use native BSON storage (True, default) or JSON string storage (False).
107174
default_collection: The default collection to use if no collection is provided.
108175
"""
109176

@@ -114,9 +181,21 @@ def __init__(
114181
url: str | None = None,
115182
db_name: str | None = None,
116183
coll_name: str | None = None,
184+
native_storage: bool = True,
117185
default_collection: str | None = None,
118186
) -> None:
119-
"""Initialize the MongoDB store."""
187+
"""Initialize the MongoDB store.
188+
189+
Args:
190+
client: The MongoDB client to use (mutually exclusive with url).
191+
url: The url of the MongoDB cluster (mutually exclusive with client).
192+
db_name: The name of the MongoDB database.
193+
coll_name: The name of the MongoDB collection.
194+
native_storage: Whether to use native BSON storage (True, default) or JSON string storage (False).
195+
Native storage stores values as BSON dicts for better query support.
196+
Legacy mode stores values as JSON strings for backward compatibility.
197+
default_collection: The default collection to use if no collection is provided.
198+
"""
120199

121200
if client:
122201
self._client = client
@@ -131,6 +210,7 @@ def __init__(
131210

132211
self._db = self._client[db_name]
133212
self._collections_by_name = {}
213+
self._native_storage = native_storage
134214

135215
super().__init__(default_collection=default_collection)
136216

@@ -158,7 +238,7 @@ def _sanitize_collection_name(self, collection: str) -> str:
158238
Returns:
159239
A sanitized collection name that meets MongoDB requirements.
160240
"""
161-
return sanitize_string(value=collection, max_length=MAX_COLLECTION_LENGTH, allowed_characters=ALPHANUMERIC_CHARACTERS)
241+
return sanitize_string(value=collection, max_length=MAX_COLLECTION_LENGTH, allowed_characters=COLLECTION_ALLOWED_CHARACTERS)
162242

163243
@override
164244
async def _setup_collection(self, *, collection: str) -> None:
@@ -187,7 +267,7 @@ async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry
187267
sanitized_collection = self._sanitize_collection_name(collection=collection)
188268

189269
if doc := await self._collections_by_name[sanitized_collection].find_one(filter={"key": key}):
190-
return ManagedEntry.from_dict(data=doc, stringified_value=True)
270+
return document_to_managed_entry(document=doc)
191271

192272
return None
193273

@@ -217,7 +297,7 @@ async def _put_managed_entry(
217297
collection: str,
218298
managed_entry: ManagedEntry,
219299
) -> None:
220-
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry)
300+
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry, native_storage=self._native_storage)
221301

222302
sanitized_collection = self._sanitize_collection_name(collection=collection)
223303

@@ -248,7 +328,7 @@ async def _put_managed_entries(
248328

249329
operations: list[UpdateOne] = []
250330
for key, managed_entry in zip(keys, managed_entries, strict=True):
251-
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry)
331+
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry, native_storage=self._native_storage)
252332

253333
operations.append(
254334
UpdateOne(

key-value/key-value-aio/tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ def docker_container(
168168
finally:
169169
docker_stop(name, raise_on_error=False)
170170
docker_rm(name, raise_on_error=False)
171+
docker_wait_container_gone(name=name, max_tries=10, wait_time=1.0)
171172

172173
logger.info(f"Container {name} stopped and removed")
173174
return

0 commit comments

Comments
 (0)