Skip to content

Commit 2a2094f

Browse files
feat: update DuckDB store for PR #204 serialization changes
- Add version, key, and collection fields to serialized JSON documents - Update DuckDBSerializationAdapter to store metadata in JSON columns - Split prepare_load into helper methods for better code organization - Add type annotations to satisfy type checker - Regenerate sync library Co-authored-by: William Easton <strawgate@users.noreply.github.com>
1 parent 36f674c commit 2a2094f

File tree

2 files changed

+122
-58
lines changed
  • key-value
    • key-value-aio/src/key_value/aio/stores/duckdb
    • key-value-sync/src/key_value/sync/code_gen/stores/duckdb

2 files changed

+122
-58
lines changed

key-value/key-value-aio/src/key_value/aio/stores/duckdb/store.py

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
from datetime import datetime, timezone
33
from pathlib import Path
4-
from typing import Any, overload
4+
from typing import Any, cast, overload
55

66
from key_value.shared.errors import DeserializationError
77
from key_value.shared.utils.managed_entry import ManagedEntry
@@ -41,58 +41,92 @@ def prepare_dump(self, data: dict[str, Any]) -> dict[str, Any]:
4141
"""Prepare data for dumping to DuckDB.
4242
4343
Moves the value to the appropriate column (value_dict or value_json)
44-
and sets the other column to None.
44+
and sets the other column to None. Also includes version, key, and collection
45+
fields in the JSON for compatibility with deserialization.
4546
"""
4647
value = data.pop("value")
4748

49+
# Extract version, key, and collection to include in the JSON
50+
version = data.pop("version", None)
51+
key = data.pop("key", None)
52+
collection_name = data.pop("collection", None)
53+
54+
# Build the document to store in JSON columns
55+
json_document: dict[str, Any] = {"value": value}
56+
57+
if version is not None:
58+
json_document["version"] = version
59+
if key is not None:
60+
json_document["key"] = key
61+
if collection_name is not None:
62+
json_document["collection"] = collection_name
63+
4864
# Set both columns to None, then populate the appropriate one
4965
data["value_json"] = None
5066
data["value_dict"] = None
5167

5268
if self._native_storage:
53-
# For native storage, we pass the JSON string to DuckDB's JSON column
69+
# For native storage, convert the document to JSON string for DuckDB's JSON column
5470
# DuckDB will parse it and store it as native JSON
55-
data["value_dict"] = value
71+
data["value_dict"] = json.dumps(json_document)
5672
else:
57-
# For TEXT storage, value should be a JSON string
58-
data["value_json"] = value
73+
# For TEXT storage, store as JSON string
74+
data["value_json"] = json.dumps(json_document)
5975

6076
return data
6177

6278
@override
6379
def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]:
6480
"""Prepare data loaded from DuckDB for conversion to ManagedEntry.
6581
66-
Extracts value from the appropriate column and handles timezone conversion
67-
for DuckDB's naive timestamps.
82+
Extracts value, version, key, and collection from the JSON columns
83+
and handles timezone conversion for DuckDB's naive timestamps.
6884
"""
6985
value_json = data.pop("value_json", None)
7086
value_dict = data.pop("value_dict", None)
7187

72-
# Determine which value column to use (prefer value_dict if present)
88+
# Parse the JSON document from the appropriate column
89+
json_document = self._parse_json_column(value_dict, value_json)
90+
91+
# Extract fields from the JSON document
92+
data["value"] = json_document.get("value")
93+
if "version" in json_document:
94+
data["version"] = json_document["version"]
95+
if "key" in json_document:
96+
data["key"] = json_document["key"]
97+
if "collection" in json_document:
98+
data["collection"] = json_document["collection"]
99+
100+
# DuckDB always returns naive timestamps, but ManagedEntry expects timezone-aware ones
101+
self._convert_timestamps_to_utc(data)
102+
103+
return data
104+
105+
def _parse_json_column(self, value_dict: Any, value_json: Any) -> dict[str, Any]: # noqa: ANN401
106+
"""Parse JSON from value_dict or value_json column."""
73107
if value_dict is not None:
74108
# Native storage mode - value_dict can be dict or string (DuckDB JSON returns as string)
75109
if isinstance(value_dict, dict):
76-
data["value"] = value_dict
77-
elif isinstance(value_dict, str):
78-
# DuckDB sometimes returns JSON as string, parse it
79-
data["value"] = json.loads(value_dict)
80-
else:
81-
msg = f"value_dict has unexpected type: {type(value_dict)}"
82-
raise DeserializationError(message=msg)
83-
elif value_json is not None:
110+
return cast(dict[str, Any], value_dict)
111+
if isinstance(value_dict, str):
112+
parsed: dict[str, Any] = json.loads(value_dict)
113+
return parsed
114+
msg = f"value_dict has unexpected type: {type(value_dict)}"
115+
raise DeserializationError(message=msg)
116+
117+
if value_json is not None:
84118
# Stringified JSON mode - parse from string
85119
if isinstance(value_json, str):
86-
data["value"] = json.loads(value_json)
87-
else:
88-
msg = f"value_json has unexpected type: {type(value_json)}"
89-
raise DeserializationError(message=msg)
90-
else:
91-
msg = "Neither value_dict nor value_json column contains data"
120+
parsed_json: dict[str, Any] = json.loads(value_json)
121+
return parsed_json
122+
msg = f"value_json has unexpected type: {type(value_json)}"
92123
raise DeserializationError(message=msg)
93124

94-
# DuckDB always returns naive timestamps, but ManagedEntry expects timezone-aware ones
95-
# Convert to timezone-aware UTC timestamps. Handle None values explicitly.
125+
msg = "Neither value_dict nor value_json column contains data"
126+
raise DeserializationError(message=msg)
127+
128+
def _convert_timestamps_to_utc(self, data: dict[str, Any]) -> None:
129+
"""Convert naive timestamps to UTC timezone-aware timestamps."""
96130
created_at = data.get("created_at")
97131
if created_at is not None and isinstance(created_at, datetime) and created_at.tzinfo is None:
98132
data["created_at"] = created_at.astimezone(tz=timezone.utc)
@@ -101,8 +135,6 @@ def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]:
101135
if expires_at is not None and isinstance(expires_at, datetime) and expires_at.tzinfo is None:
102136
data["expires_at"] = expires_at.astimezone(tz=timezone.utc)
103137

104-
return data
105-
106138

107139
class DuckDBStore(BaseContextManagerStore, BaseStore):
108140
"""A DuckDB-based key-value store supporting both in-memory and persistent storage.
@@ -378,8 +410,8 @@ async def _put_managed_entry(
378410
msg = "Cannot operate on closed DuckDBStore"
379411
raise RuntimeError(msg)
380412

381-
# Use adapter to dump the managed entry to a dict
382-
document = self._adapter.dump_dict(entry=managed_entry, exclude_none=False)
413+
# Use adapter to dump the managed entry to a dict with key and collection
414+
document = self._adapter.dump_dict(entry=managed_entry, exclude_none=False, key=key, collection=collection)
383415

384416
# Insert or replace the entry with metadata in separate columns
385417
self._connection.execute(

key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import json
55
from datetime import datetime, timezone
66
from pathlib import Path
7-
from typing import Any, overload
7+
from typing import Any, cast, overload
88

99
from key_value.shared.errors import DeserializationError
1010
from key_value.shared.utils.managed_entry import ManagedEntry
@@ -44,58 +44,92 @@ def prepare_dump(self, data: dict[str, Any]) -> dict[str, Any]:
4444
"""Prepare data for dumping to DuckDB.
4545
4646
Moves the value to the appropriate column (value_dict or value_json)
47-
and sets the other column to None.
47+
and sets the other column to None. Also includes version, key, and collection
48+
fields in the JSON for compatibility with deserialization.
4849
"""
4950
value = data.pop("value")
5051

52+
# Extract version, key, and collection to include in the JSON
53+
version = data.pop("version", None)
54+
key = data.pop("key", None)
55+
collection_name = data.pop("collection", None)
56+
57+
# Build the document to store in JSON columns
58+
json_document: dict[str, Any] = {"value": value}
59+
60+
if version is not None:
61+
json_document["version"] = version
62+
if key is not None:
63+
json_document["key"] = key
64+
if collection_name is not None:
65+
json_document["collection"] = collection_name
66+
5167
# Set both columns to None, then populate the appropriate one
5268
data["value_json"] = None
5369
data["value_dict"] = None
5470

5571
if self._native_storage:
56-
# For native storage, we pass the JSON string to DuckDB's JSON column
72+
# For native storage, convert the document to JSON string for DuckDB's JSON column
5773
# DuckDB will parse it and store it as native JSON
58-
data["value_dict"] = value
74+
data["value_dict"] = json.dumps(json_document)
5975
else:
60-
# For TEXT storage, value should be a JSON string
61-
data["value_json"] = value
76+
# For TEXT storage, store as JSON string
77+
data["value_json"] = json.dumps(json_document)
6278

6379
return data
6480

6581
@override
6682
def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]:
6783
"""Prepare data loaded from DuckDB for conversion to ManagedEntry.
6884
69-
Extracts value from the appropriate column and handles timezone conversion
70-
for DuckDB's naive timestamps.
85+
Extracts value, version, key, and collection from the JSON columns
86+
and handles timezone conversion for DuckDB's naive timestamps.
7187
"""
7288
value_json = data.pop("value_json", None)
7389
value_dict = data.pop("value_dict", None)
7490

75-
# Determine which value column to use (prefer value_dict if present)
91+
# Parse the JSON document from the appropriate column
92+
json_document = self._parse_json_column(value_dict, value_json)
93+
94+
# Extract fields from the JSON document
95+
data["value"] = json_document.get("value")
96+
if "version" in json_document:
97+
data["version"] = json_document["version"]
98+
if "key" in json_document:
99+
data["key"] = json_document["key"]
100+
if "collection" in json_document:
101+
data["collection"] = json_document["collection"]
102+
103+
# DuckDB always returns naive timestamps, but ManagedEntry expects timezone-aware ones
104+
self._convert_timestamps_to_utc(data)
105+
106+
return data
107+
108+
def _parse_json_column(self, value_dict: Any, value_json: Any) -> dict[str, Any]:
109+
"Parse JSON from value_dict or value_json column."
76110
if value_dict is not None:
77111
# Native storage mode - value_dict can be dict or string (DuckDB JSON returns as string)
78112
if isinstance(value_dict, dict):
79-
data["value"] = value_dict
80-
elif isinstance(value_dict, str):
81-
# DuckDB sometimes returns JSON as string, parse it
82-
data["value"] = json.loads(value_dict)
83-
else:
84-
msg = f"value_dict has unexpected type: {type(value_dict)}"
85-
raise DeserializationError(message=msg)
86-
elif value_json is not None:
113+
return cast("dict[str, Any]", value_dict)
114+
if isinstance(value_dict, str):
115+
parsed: dict[str, Any] = json.loads(value_dict)
116+
return parsed
117+
msg = f"value_dict has unexpected type: {type(value_dict)}"
118+
raise DeserializationError(message=msg)
119+
120+
if value_json is not None:
87121
# Stringified JSON mode - parse from string
88122
if isinstance(value_json, str):
89-
data["value"] = json.loads(value_json)
90-
else:
91-
msg = f"value_json has unexpected type: {type(value_json)}"
92-
raise DeserializationError(message=msg)
93-
else:
94-
msg = "Neither value_dict nor value_json column contains data"
123+
parsed_json: dict[str, Any] = json.loads(value_json)
124+
return parsed_json
125+
msg = f"value_json has unexpected type: {type(value_json)}"
95126
raise DeserializationError(message=msg)
96127

97-
# DuckDB always returns naive timestamps, but ManagedEntry expects timezone-aware ones
98-
# Convert to timezone-aware UTC timestamps. Handle None values explicitly.
128+
msg = "Neither value_dict nor value_json column contains data"
129+
raise DeserializationError(message=msg)
130+
131+
def _convert_timestamps_to_utc(self, data: dict[str, Any]) -> None:
132+
"""Convert naive timestamps to UTC timezone-aware timestamps."""
99133
created_at = data.get("created_at")
100134
if created_at is not None and isinstance(created_at, datetime) and (created_at.tzinfo is None):
101135
data["created_at"] = created_at.astimezone(tz=timezone.utc)
@@ -104,8 +138,6 @@ def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]:
104138
if expires_at is not None and isinstance(expires_at, datetime) and (expires_at.tzinfo is None):
105139
data["expires_at"] = expires_at.astimezone(tz=timezone.utc)
106140

107-
return data
108-
109141

110142
class DuckDBStore(BaseContextManagerStore, BaseStore):
111143
"""A DuckDB-based key-value store supporting both in-memory and persistent storage.
@@ -340,8 +372,8 @@ def _put_managed_entry(self, *, key: str, collection: str, managed_entry: Manage
340372
msg = "Cannot operate on closed DuckDBStore"
341373
raise RuntimeError(msg)
342374

343-
# Use adapter to dump the managed entry to a dict
344-
document = self._adapter.dump_dict(entry=managed_entry, exclude_none=False)
375+
# Use adapter to dump the managed entry to a dict with key and collection
376+
document = self._adapter.dump_dict(entry=managed_entry, exclude_none=False, key=key, collection=collection)
345377

346378
# Insert or replace the entry with metadata in separate columns
347379
self._connection.execute(

0 commit comments

Comments
 (0)