Skip to content

Commit c612c08

Browse files
feat: add native storage mode for MongoDB
Add support for storing dictionaries as native BSON objects in MongoDB, similar to the native storage feature implemented for Elasticsearch in #149.

Changes:
- Add native_storage parameter to MongoDBStore (defaults to True)
- Use two-field approach: value.dict for native BSON, value.string for legacy JSON strings
- Support reading both formats for backward compatibility and migration
- Update serialization/deserialization functions to handle both modes
- Add comprehensive tests for native mode, legacy mode, and migration scenarios
- Update sync library via codegen

The native storage mode stores values as BSON documents instead of JSON strings, enabling better query support and more efficient storage in MongoDB.

Resolves #159

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: William Easton <strawgate@users.noreply.github.com>
1 parent 681aa5e commit c612c08

File tree

4 files changed

+407
-36
lines changed

4 files changed

+407
-36
lines changed

key-value/key-value-aio/src/key_value/aio/stores/mongodb/store.py

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,36 +37,66 @@ def document_to_managed_entry(document: dict[str, Any]) -> ManagedEntry:
3737
"""Convert a MongoDB document back to a ManagedEntry.
3838
3939
This function deserializes a MongoDB document (created by `managed_entry_to_document`) back to a
40-
ManagedEntry object, parsing the stringified value field and preserving all metadata.
40+
ManagedEntry object. It supports both native BSON storage (dict in value.dict field) and legacy
41+
JSON string storage (string in value.string field) for migration support.
4142
4243
Args:
4344
document: The MongoDB document to convert.
4445
4546
Returns:
4647
A ManagedEntry object reconstructed from the document.
4748
"""
49+
# Check if we have the new two-field structure
50+
value_field = document.get("value")
51+
52+
if isinstance(value_field, dict):
53+
# New format: check for dict or string subfields
54+
if "dict" in value_field:
55+
# Native storage mode - value is already a dict
56+
return ManagedEntry.from_dict(
57+
data={"value": value_field["dict"], **{k: v for k, v in document.items() if k != "value"}}, stringified_value=False
58+
)
59+
if "string" in value_field:
60+
# Legacy storage mode - value is a JSON string
61+
return ManagedEntry.from_dict(
62+
data={"value": value_field["string"], **{k: v for k, v in document.items() if k != "value"}}, stringified_value=True
63+
)
64+
65+
# Old format: value field is directly a string
4866
return ManagedEntry.from_dict(data=document, stringified_value=True)
4967

5068

51-
def managed_entry_to_document(key: str, managed_entry: ManagedEntry) -> dict[str, Any]:
69+
def managed_entry_to_document(key: str, managed_entry: ManagedEntry, *, native_storage: bool = True) -> dict[str, Any]:
5270
"""Convert a ManagedEntry to a MongoDB document for storage.
5371
5472
This function serializes a ManagedEntry to a MongoDB document format, including the key and all
55-
metadata (TTL, creation, and expiration timestamps). The value is stringified to ensure proper
56-
storage in MongoDB. The serialization is designed to preserve all entry information for round-trip
57-
conversion back to a ManagedEntry.
73+
metadata (TTL, creation, and expiration timestamps). The value storage format depends on the
74+
native_storage parameter.
5875
5976
Args:
6077
key: The key associated with this entry.
6178
managed_entry: The ManagedEntry to serialize.
79+
native_storage: If True (default), store value as native BSON dict in value.dict field.
80+
If False, store as JSON string in value.string field for backward compatibility.
6281
6382
Returns:
6483
A MongoDB document dict containing the key, value, and all metadata.
6584
"""
66-
return {
67-
"key": key,
68-
**managed_entry.to_dict(include_metadata=True, include_expiration=True, include_creation=True, stringify_value=True),
69-
}
85+
document: dict[str, Any] = {"key": key, "value": {}}
86+
87+
# Store in appropriate field based on mode
88+
if native_storage:
89+
document["value"]["dict"] = managed_entry.value_as_dict
90+
else:
91+
document["value"]["string"] = managed_entry.value_as_json
92+
93+
# Add metadata fields
94+
if managed_entry.created_at:
95+
document["created_at"] = managed_entry.created_at
96+
if managed_entry.expires_at:
97+
document["expires_at"] = managed_entry.expires_at
98+
99+
return document
70100

71101

72102
class MongoDBStore(BaseEnumerateCollectionsStore, BaseDestroyCollectionStore, BaseContextManagerStore, BaseStore):
@@ -75,6 +105,7 @@ class MongoDBStore(BaseEnumerateCollectionsStore, BaseDestroyCollectionStore, Ba
75105
_client: AsyncMongoClient[dict[str, Any]]
76106
_db: AsyncDatabase[dict[str, Any]]
77107
_collections_by_name: dict[str, AsyncCollection[dict[str, Any]]]
108+
_native_storage: bool
78109

79110
@overload
80111
def __init__(
@@ -83,6 +114,7 @@ def __init__(
83114
client: AsyncMongoClient[dict[str, Any]],
84115
db_name: str | None = None,
85116
coll_name: str | None = None,
117+
native_storage: bool = True,
86118
default_collection: str | None = None,
87119
) -> None:
88120
"""Initialize the MongoDB store.
@@ -91,19 +123,27 @@ def __init__(
91123
client: The MongoDB client to use.
92124
db_name: The name of the MongoDB database.
93125
coll_name: The name of the MongoDB collection.
126+
native_storage: Whether to use native BSON storage (True, default) or JSON string storage (False).
94127
default_collection: The default collection to use if no collection is provided.
95128
"""
96129

97130
@overload
98131
def __init__(
99-
self, *, url: str, db_name: str | None = None, coll_name: str | None = None, default_collection: str | None = None
132+
self,
133+
*,
134+
url: str,
135+
db_name: str | None = None,
136+
coll_name: str | None = None,
137+
native_storage: bool = True,
138+
default_collection: str | None = None,
100139
) -> None:
101140
"""Initialize the MongoDB store.
102141
103142
Args:
104143
url: The url of the MongoDB cluster.
105144
db_name: The name of the MongoDB database.
106145
coll_name: The name of the MongoDB collection.
146+
native_storage: Whether to use native BSON storage (True, default) or JSON string storage (False).
107147
default_collection: The default collection to use if no collection is provided.
108148
"""
109149

@@ -114,9 +154,21 @@ def __init__(
114154
url: str | None = None,
115155
db_name: str | None = None,
116156
coll_name: str | None = None,
157+
native_storage: bool = True,
117158
default_collection: str | None = None,
118159
) -> None:
119-
"""Initialize the MongoDB store."""
160+
"""Initialize the MongoDB store.
161+
162+
Args:
163+
client: The MongoDB client to use (mutually exclusive with url).
164+
url: The url of the MongoDB cluster (mutually exclusive with client).
165+
db_name: The name of the MongoDB database.
166+
coll_name: The name of the MongoDB collection.
167+
native_storage: Whether to use native BSON storage (True, default) or JSON string storage (False).
168+
Native storage stores values as BSON dicts for better query support.
169+
Legacy mode stores values as JSON strings for backward compatibility.
170+
default_collection: The default collection to use if no collection is provided.
171+
"""
120172

121173
if client:
122174
self._client = client
@@ -131,6 +183,7 @@ def __init__(
131183

132184
self._db = self._client[db_name]
133185
self._collections_by_name = {}
186+
self._native_storage = native_storage
134187

135188
super().__init__(default_collection=default_collection)
136189

@@ -217,7 +270,7 @@ async def _put_managed_entry(
217270
collection: str,
218271
managed_entry: ManagedEntry,
219272
) -> None:
220-
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry)
273+
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry, native_storage=self._native_storage)
221274

222275
sanitized_collection = self._sanitize_collection_name(collection=collection)
223276

@@ -248,7 +301,7 @@ async def _put_managed_entries(
248301

249302
operations: list[UpdateOne] = []
250303
for key, managed_entry in zip(keys, managed_entries, strict=True):
251-
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry)
304+
mongo_doc: dict[str, Any] = managed_entry_to_document(key=key, managed_entry=managed_entry, native_storage=self._native_storage)
252305

253306
operations.append(
254307
UpdateOne(

key-value/key-value-aio/tests/stores/mongodb/test_mongodb.py

Lines changed: 140 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,19 @@ class MongoDBFailedToStartError(Exception):
4444
pass
4545

4646

47-
def test_managed_entry_document_conversion():
47+
def test_managed_entry_document_conversion_native_mode():
4848
created_at = datetime(year=2025, month=1, day=1, hour=0, minute=0, second=0, tzinfo=timezone.utc)
4949
expires_at = created_at + timedelta(seconds=10)
5050

5151
managed_entry = ManagedEntry(value={"test": "test"}, created_at=created_at, expires_at=expires_at)
52-
document = managed_entry_to_document(key="test", managed_entry=managed_entry)
52+
document = managed_entry_to_document(key="test", managed_entry=managed_entry, native_storage=True)
5353

5454
assert document == snapshot(
5555
{
5656
"key": "test",
57-
"value": '{"test": "test"}',
58-
"created_at": "2025-01-01T00:00:00+00:00",
59-
"expires_at": "2025-01-01T00:00:10+00:00",
57+
"value": {"dict": {"test": "test"}},
58+
"created_at": datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc),
59+
"expires_at": datetime(2025, 1, 1, 0, 0, 10, tzinfo=timezone.utc),
6060
}
6161
)
6262

@@ -68,8 +68,55 @@ def test_managed_entry_document_conversion():
6868
assert round_trip_managed_entry.expires_at == expires_at
6969

7070

71+
def test_managed_entry_document_conversion_legacy_mode():
72+
created_at = datetime(year=2025, month=1, day=1, hour=0, minute=0, second=0, tzinfo=timezone.utc)
73+
expires_at = created_at + timedelta(seconds=10)
74+
75+
managed_entry = ManagedEntry(value={"test": "test"}, created_at=created_at, expires_at=expires_at)
76+
document = managed_entry_to_document(key="test", managed_entry=managed_entry, native_storage=False)
77+
78+
assert document == snapshot(
79+
{
80+
"key": "test",
81+
"value": {"string": '{"test": "test"}'},
82+
"created_at": datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc),
83+
"expires_at": datetime(2025, 1, 1, 0, 0, 10, tzinfo=timezone.utc),
84+
}
85+
)
86+
87+
round_trip_managed_entry = document_to_managed_entry(document=document)
88+
89+
assert round_trip_managed_entry.value == managed_entry.value
90+
assert round_trip_managed_entry.created_at == created_at
91+
assert round_trip_managed_entry.ttl == IsFloat(lt=0)
92+
assert round_trip_managed_entry.expires_at == expires_at
93+
94+
95+
def test_managed_entry_document_conversion_old_format():
96+
"""Test backward compatibility with old format where value is directly a string."""
97+
created_at = datetime(year=2025, month=1, day=1, hour=0, minute=0, second=0, tzinfo=timezone.utc)
98+
expires_at = created_at + timedelta(seconds=10)
99+
100+
# Simulate old document format
101+
old_document = {
102+
"key": "test",
103+
"value": '{"test": "test"}',
104+
"created_at": "2025-01-01T00:00:00+00:00",
105+
"expires_at": "2025-01-01T00:00:10+00:00",
106+
}
107+
108+
round_trip_managed_entry = document_to_managed_entry(document=old_document)
109+
110+
assert round_trip_managed_entry.value == {"test": "test"}
111+
assert round_trip_managed_entry.created_at == created_at
112+
assert round_trip_managed_entry.ttl == IsFloat(lt=0)
113+
assert round_trip_managed_entry.expires_at == expires_at
114+
115+
71116
@pytest.mark.skipif(should_skip_docker_tests(), reason="Docker is not available")
72117
class TestMongoDBStore(ContextManagerStoreTestMixin, BaseStoreTests):
118+
"""Test MongoDBStore with native_storage=False (legacy mode) for backward compatibility."""
119+
73120
@pytest.fixture(autouse=True, scope="session", params=MONGODB_VERSIONS_TO_TEST)
74121
async def setup_mongodb(self, request: pytest.FixtureRequest) -> AsyncGenerator[None, None]:
75122
version = request.param
@@ -84,7 +131,8 @@ async def setup_mongodb(self, request: pytest.FixtureRequest) -> AsyncGenerator[
84131
@override
85132
@pytest.fixture
86133
async def store(self, setup_mongodb: None) -> MongoDBStore:
87-
store = MongoDBStore(url=f"mongodb://{MONGODB_HOST}:{MONGODB_HOST_PORT}", db_name=MONGODB_TEST_DB)
134+
# Use legacy mode (native_storage=False) to test backward compatibility
135+
store = MongoDBStore(url=f"mongodb://{MONGODB_HOST}:{MONGODB_HOST_PORT}", db_name=MONGODB_TEST_DB, native_storage=False)
88136
# Ensure a clean db by dropping our default test collection if it exists
89137
with contextlib.suppress(Exception):
90138
_ = await store._client.drop_database(name_or_database=MONGODB_TEST_DB) # pyright: ignore[reportPrivateUsage]
@@ -106,3 +154,89 @@ async def test_mongodb_collection_name_sanitization(self, mongodb_store: MongoDB
106154

107155
collections = await mongodb_store.collections()
108156
assert collections == snapshot(["test_collection_-daf4a2ec"])
157+
158+
159+
@pytest.mark.skipif(should_skip_docker_tests(), reason="Docker is not available")
160+
class TestMongoDBStoreNativeMode(ContextManagerStoreTestMixin, BaseStoreTests):
161+
"""Test MongoDBStore with native_storage=True (default)."""
162+
163+
@pytest.fixture(autouse=True, scope="session", params=MONGODB_VERSIONS_TO_TEST)
164+
async def setup_mongodb(self, request: pytest.FixtureRequest) -> AsyncGenerator[None, None]:
165+
version = request.param
166+
167+
with docker_container(f"mongodb-test-native-{version}", f"mongo:{version}", {str(MONGODB_HOST_PORT): MONGODB_HOST_PORT}):
168+
if not await async_wait_for_true(bool_fn=ping_mongodb, tries=WAIT_FOR_MONGODB_TIMEOUT, wait_time=1):
169+
msg = f"MongoDB {version} failed to start"
170+
raise MongoDBFailedToStartError(msg)
171+
172+
yield
173+
174+
@override
175+
@pytest.fixture
176+
async def store(self, setup_mongodb: None) -> MongoDBStore:
177+
store = MongoDBStore(url=f"mongodb://{MONGODB_HOST}:{MONGODB_HOST_PORT}", db_name=f"{MONGODB_TEST_DB}-native", native_storage=True)
178+
# Ensure a clean db by dropping our default test collection if it exists
179+
with contextlib.suppress(Exception):
180+
_ = await store._client.drop_database(name_or_database=f"{MONGODB_TEST_DB}-native") # pyright: ignore[reportPrivateUsage]
181+
182+
return store
183+
184+
@pytest.fixture
185+
async def mongodb_store(self, store: MongoDBStore) -> MongoDBStore:
186+
return store
187+
188+
@pytest.mark.skip(reason="Distributed Caches are unbounded")
189+
@override
190+
async def test_not_unbounded(self, store: BaseStore): ...
191+
192+
async def test_value_stored_as_bson_dict(self, mongodb_store: MongoDBStore):
193+
"""Verify values are stored as BSON dicts, not JSON strings."""
194+
await mongodb_store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30})
195+
196+
# Get the raw MongoDB document
197+
await mongodb_store._setup_collection(collection="test") # pyright: ignore[reportPrivateUsage]
198+
sanitized_collection = mongodb_store._sanitize_collection_name(collection="test") # pyright: ignore[reportPrivateUsage]
199+
collection = mongodb_store._collections_by_name[sanitized_collection] # pyright: ignore[reportPrivateUsage]
200+
doc = await collection.find_one({"key": "test_key"})
201+
202+
# In native mode, value should be a dict with "dict" subfield
203+
assert doc is not None
204+
assert isinstance(doc["value"], dict)
205+
assert "dict" in doc["value"]
206+
assert doc["value"]["dict"] == {"name": "Alice", "age": 30}
207+
208+
async def test_migration_from_legacy_mode(self, mongodb_store: MongoDBStore):
209+
"""Verify native mode can read legacy JSON string data."""
210+
# Manually insert a legacy document with JSON string value in the new format
211+
await mongodb_store._setup_collection(collection="test") # pyright: ignore[reportPrivateUsage]
212+
sanitized_collection = mongodb_store._sanitize_collection_name(collection="test") # pyright: ignore[reportPrivateUsage]
213+
collection = mongodb_store._collections_by_name[sanitized_collection] # pyright: ignore[reportPrivateUsage]
214+
215+
await collection.insert_one(
216+
{
217+
"key": "legacy_key",
218+
"value": {"string": '{"legacy": "data"}'}, # New format with JSON string
219+
}
220+
)
221+
222+
# Should be able to read it in native mode
223+
result = await mongodb_store.get(collection="test", key="legacy_key")
224+
assert result == {"legacy": "data"}
225+
226+
async def test_migration_from_old_format(self, mongodb_store: MongoDBStore):
227+
"""Verify native mode can read old format where value is directly a string."""
228+
# Manually insert an old document with value directly as JSON string
229+
await mongodb_store._setup_collection(collection="test") # pyright: ignore[reportPrivateUsage]
230+
sanitized_collection = mongodb_store._sanitize_collection_name(collection="test") # pyright: ignore[reportPrivateUsage]
231+
collection = mongodb_store._collections_by_name[sanitized_collection] # pyright: ignore[reportPrivateUsage]
232+
233+
await collection.insert_one(
234+
{
235+
"key": "old_key",
236+
"value": '{"old": "format"}', # Old format: value directly as JSON string
237+
}
238+
)
239+
240+
# Should be able to read it in native mode
241+
result = await mongodb_store.get(collection="test", key="old_key")
242+
assert result == {"old": "format"}

0 commit comments

Comments
 (0)