Skip to content

Commit 5c3ec02

Browse files
feat: add native storage support to DuckDB store with JSON column option
- Add use_json_column parameter (defaults to True) for native JSON vs TEXT storage - Update schema to use native DuckDB types (JSON, TIMESTAMP) for better queryability - Store value data separately from metadata to eliminate duplication - Metadata (created_at, ttl, expires_at) in native columns for efficient SQL queries - Add comprehensive tests for both JSON and TEXT modes - Add test_native_sql_queryability() demonstrating direct SQL queries on stored data - Fix timezone handling to ensure consistency with ManagedEntry expectations - All 411 tests passing (404 passed, 7 skipped) This enables users to query the DuckDB database directly with SQL for analytics and data exploration while maintaining full compatibility with the key-value API. Co-authored-by: William Easton <strawgate@users.noreply.github.com>
1 parent 8458d68 commit 5c3ec02

File tree

2 files changed

+176
-24
lines changed

2 files changed

+176
-24
lines changed

key-value/key-value-aio/src/key_value/aio/stores/duckdb/store.py

Lines changed: 91 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ class DuckDBStore(BaseContextManagerStore, BaseStore):
2020
for analytical workloads while supporting standard SQL operations. This store
2121
can operate in memory-only mode or persist data to disk.
2222
23+
The store uses native DuckDB types (JSON, TIMESTAMP) to enable efficient SQL queries
24+
on stored data. Users can query the database directly for analytics or data exploration.
25+
2326
Note on connection ownership: When you provide an existing connection, the store
2427
will take ownership and close it when the store is closed or garbage collected.
2528
If you need to reuse a connection, create separate DuckDB connections for each store.
@@ -28,6 +31,7 @@ class DuckDBStore(BaseContextManagerStore, BaseStore):
2831
_connection: duckdb.DuckDBPyConnection
2932
_is_closed: bool
3033
_owns_connection: bool
34+
_use_json_column: bool
3135

3236
@overload
3337
def __init__(
@@ -36,6 +40,7 @@ def __init__(
3640
connection: duckdb.DuckDBPyConnection,
3741
default_collection: str | None = None,
3842
seed: SEED_DATA_TYPE | None = None,
43+
use_json_column: bool = True,
3944
) -> None:
4045
"""Initialize the DuckDB store with an existing connection.
4146
@@ -47,6 +52,8 @@ def __init__(
4752
connection: An existing DuckDB connection to use.
4853
default_collection: The default collection to use if no collection is provided.
4954
seed: Optional seed data to pre-populate the store.
55+
use_json_column: If True, use native JSON column type; if False, use TEXT.
56+
Default is True for better queryability and native type support.
5057
"""
5158

5259
@overload
@@ -56,13 +63,16 @@ def __init__(
5663
database_path: Path | str | None = None,
5764
default_collection: str | None = None,
5865
seed: SEED_DATA_TYPE | None = None,
66+
use_json_column: bool = True,
5967
) -> None:
6068
"""Initialize the DuckDB store with a database path.
6169
6270
Args:
6371
database_path: Path to the database file. If None or ':memory:', uses in-memory database.
6472
default_collection: The default collection to use if no collection is provided.
6573
seed: Optional seed data to pre-populate the store.
74+
use_json_column: If True, use native JSON column type; if False, use TEXT.
75+
Default is True for better queryability and native type support.
6676
"""
6777

6878
def __init__(
@@ -72,6 +82,7 @@ def __init__(
7282
database_path: Path | str | None = None,
7383
default_collection: str | None = None,
7484
seed: SEED_DATA_TYPE | None = None,
85+
use_json_column: bool = True,
7586
) -> None:
7687
"""Initialize the DuckDB store.
7788
@@ -80,6 +91,8 @@ def __init__(
8091
database_path: Path to the database file. If None or ':memory:', uses in-memory database.
8192
default_collection: The default collection to use if no collection is provided.
8293
seed: Optional seed data to pre-populate the store.
94+
use_json_column: If True, use native JSON column type; if False, use TEXT.
95+
Default is True for better queryability and native type support.
8396
"""
8497
if connection is not None and database_path is not None:
8598
msg = "Provide only one of connection or database_path"
@@ -101,6 +114,7 @@ def __init__(
101114
self._owns_connection = True
102115

103116
self._is_closed = False
117+
self._use_json_column = use_json_column
104118
self._stable_api = False
105119

106120
super().__init__(default_collection=default_collection, seed=seed)
@@ -109,25 +123,30 @@ def __init__(
109123
async def _setup(self) -> None:
110124
"""Initialize the database schema for key-value storage.
111125
112-
Note: The schema stores created_at, ttl, and expires_at as separate columns
113-
in addition to the serialized ManagedEntry in value_json. This duplication
114-
is intentional for future features:
115-
- The expires_at column with its index enables efficient expiration-based
116-
cleanup queries (e.g., DELETE FROM kv_entries WHERE expires_at < now())
117-
- The separate columns allow for metadata queries without deserializing JSON
118-
- Currently, only value_json is read during _get_managed_entry
119-
120-
This design trades storage space for query flexibility and future extensibility.
126+
The schema uses native DuckDB types for efficient querying:
127+
- value: JSON or TEXT column storing the actual value data (not full ManagedEntry)
128+
- created_at: TIMESTAMP for native datetime operations
129+
- ttl: DOUBLE for time-to-live in seconds
130+
- expires_at: TIMESTAMP for native expiration queries
131+
132+
This design enables:
133+
- Direct SQL queries on the database for analytics
134+
- Efficient expiration cleanup: DELETE FROM kv_entries WHERE expires_at < now()
135+
- Metadata queries without JSON deserialization
136+
- No data duplication (metadata in columns, value in JSON/TEXT)
121137
"""
138+
# Determine column type based on use_json_column setting
139+
value_column_type = "JSON" if self._use_json_column else "TEXT"
140+
122141
# Create the main table for storing key-value entries
123-
self._connection.execute("""
142+
self._connection.execute(f"""
124143
CREATE TABLE IF NOT EXISTS kv_entries (
125144
collection VARCHAR NOT NULL,
126145
key VARCHAR NOT NULL,
127-
value_json TEXT NOT NULL,
128-
created_at DOUBLE,
146+
value {value_column_type} NOT NULL,
147+
created_at TIMESTAMP,
129148
ttl DOUBLE,
130-
expires_at DOUBLE,
149+
expires_at TIMESTAMP,
131150
PRIMARY KEY (collection, key)
132151
)
133152
""")
@@ -138,29 +157,58 @@ async def _setup(self) -> None:
138157
ON kv_entries(collection)
139158
""")
140159

141-
# Create index for expiration-based queries (for future cleanup features)
160+
# Create index for expiration-based queries
142161
self._connection.execute("""
143162
CREATE INDEX IF NOT EXISTS idx_kv_expires_at
144163
ON kv_entries(expires_at)
145164
""")
146165

147166
@override
148167
async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None:
149-
"""Retrieve a managed entry by key from the specified collection."""
168+
"""Retrieve a managed entry by key from the specified collection.
169+
170+
Reconstructs the ManagedEntry from the value column and metadata columns.
171+
The value column contains only the value data (not the full ManagedEntry),
172+
and metadata (created_at, ttl, expires_at) is stored in separate columns.
173+
"""
150174
if self._is_closed:
151175
msg = "Cannot operate on closed DuckDBStore"
152176
raise RuntimeError(msg)
153177

154178
result = self._connection.execute(
155-
"SELECT value_json FROM kv_entries WHERE collection = ? AND key = ?",
179+
"SELECT value, created_at, ttl, expires_at FROM kv_entries WHERE collection = ? AND key = ?",
156180
[collection, key],
157181
).fetchone()
158182

159183
if result is None:
160184
return None
161185

162-
value_json = result[0]
163-
return ManagedEntry.from_json(json_str=value_json)
186+
value_data, created_at, ttl, expires_at = result
187+
188+
# Convert value from JSON/TEXT to dict
189+
# If it's already a dict (from JSON column), use it; otherwise parse from string
190+
if isinstance(value_data, str):
191+
import json
192+
value = json.loads(value_data)
193+
else:
194+
value = value_data
195+
196+
# DuckDB always returns naive timestamps, but ManagedEntry expects timezone-aware ones
197+
# Convert to timezone-aware UTC timestamps
198+
from datetime import timezone
199+
200+
if created_at is not None and created_at.tzinfo is None:
201+
created_at = created_at.replace(tzinfo=timezone.utc)
202+
if expires_at is not None and expires_at.tzinfo is None:
203+
expires_at = expires_at.replace(tzinfo=timezone.utc)
204+
205+
# Reconstruct ManagedEntry with metadata from columns
206+
return ManagedEntry(
207+
value=value,
208+
created_at=created_at,
209+
ttl=ttl,
210+
expires_at=expires_at,
211+
)
164212

165213
@override
166214
async def _put_managed_entry(
@@ -170,25 +218,44 @@ async def _put_managed_entry(
170218
collection: str,
171219
managed_entry: ManagedEntry,
172220
) -> None:
173-
"""Store a managed entry by key in the specified collection."""
221+
"""Store a managed entry by key in the specified collection.
222+
223+
Stores the value and metadata separately:
224+
- value: JSON string of just the value data (not full ManagedEntry)
225+
- created_at, ttl, expires_at: Stored in native columns for efficient querying
226+
"""
174227
if self._is_closed:
175228
msg = "Cannot operate on closed DuckDBStore"
176229
raise RuntimeError(msg)
177230

178-
# Insert or replace the entry
231+
# Get just the value as JSON (not the full ManagedEntry)
232+
value_json = managed_entry.value_as_json
233+
234+
# Ensure timestamps are timezone-aware (convert naive to UTC if needed)
235+
from datetime import timezone
236+
237+
created_at = managed_entry.created_at
238+
if created_at is not None and created_at.tzinfo is None:
239+
created_at = created_at.replace(tzinfo=timezone.utc)
240+
241+
expires_at = managed_entry.expires_at
242+
if expires_at is not None and expires_at.tzinfo is None:
243+
expires_at = expires_at.replace(tzinfo=timezone.utc)
244+
245+
# Insert or replace the entry with metadata in separate columns
179246
self._connection.execute(
180247
"""
181248
INSERT OR REPLACE INTO kv_entries
182-
(collection, key, value_json, created_at, ttl, expires_at)
249+
(collection, key, value, created_at, ttl, expires_at)
183250
VALUES (?, ?, ?, ?, ?, ?)
184251
""",
185252
[
186253
collection,
187254
key,
188-
managed_entry.to_json(),
189-
managed_entry.created_at.timestamp() if managed_entry.created_at else None,
255+
value_json,
256+
created_at,
190257
managed_entry.ttl,
191-
managed_entry.expires_at.timestamp() if managed_entry.expires_at else None,
258+
expires_at,
192259
],
193260
)
194261

key-value/key-value-aio/tests/stores/duckdb/test_duckdb.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,21 @@ async def store(self) -> AsyncGenerator[DuckDBStore, None]:
3838
async def test_not_unbounded(self, store: BaseStore): ...
3939

4040

41+
class TestDuckDBStoreTextMode(ContextManagerStoreTestMixin, BaseStoreTests):
42+
"""Test DuckDB store with TEXT column mode instead of JSON."""
43+
44+
@override
45+
@pytest.fixture
46+
async def store(self) -> AsyncGenerator[DuckDBStore, None]:
47+
"""Test with in-memory DuckDB database using TEXT column."""
48+
duckdb_store = DuckDBStore(use_json_column=False)
49+
yield duckdb_store
50+
await duckdb_store.close()
51+
52+
@pytest.mark.skip(reason="Local disk stores are unbounded")
53+
async def test_not_unbounded(self, store: BaseStore): ...
54+
55+
4156
class TestDuckDBStoreSpecific:
4257
"""Test DuckDB-specific functionality."""
4358

@@ -48,6 +63,76 @@ async def store(self) -> AsyncGenerator[DuckDBStore, None]:
4863
yield duckdb_store
4964
await duckdb_store.close()
5065

66+
async def test_native_sql_queryability(self):
67+
"""Test that users can query the database directly with SQL."""
68+
store = DuckDBStore(use_json_column=True)
69+
70+
# Store some test data with known metadata
71+
await store.put(collection="products", key="item1", value={"name": "Widget", "price": 10.99}, ttl=3600)
72+
await store.put(collection="products", key="item2", value={"name": "Gadget", "price": 25.50}, ttl=7200)
73+
await store.put(collection="orders", key="order1", value={"total": 100.00, "items": 3})
74+
75+
# Query directly via SQL to verify native storage
76+
# Check that value is stored as JSON (can extract fields)
77+
result = store._connection.execute("""
78+
SELECT key, value->'name' as name, value->'price' as price
79+
FROM kv_entries
80+
WHERE collection = 'products'
81+
ORDER BY key
82+
""").fetchall()
83+
84+
assert len(result) == 2
85+
assert result[0][0] == "item1"
86+
assert result[0][1] == '"Widget"' # JSON strings are quoted
87+
assert result[1][0] == "item2"
88+
89+
# Query by expiration timestamp
90+
result = store._connection.execute("""
91+
SELECT COUNT(*)
92+
FROM kv_entries
93+
WHERE expires_at > now() OR expires_at IS NULL
94+
""").fetchone()
95+
96+
assert result[0] == 3 # All 3 entries should not be expired
97+
98+
# Query metadata columns directly
99+
result = store._connection.execute("""
100+
SELECT key, ttl, created_at IS NOT NULL as has_created
101+
FROM kv_entries
102+
WHERE collection = 'products' AND ttl > 3600
103+
""").fetchall()
104+
105+
assert len(result) == 1 # Only item2 has ttl > 3600
106+
assert result[0][0] == "item2"
107+
assert result[0][1] == 7200
108+
assert result[0][2] is True # has_created
109+
110+
await store.close()
111+
112+
async def test_text_mode_storage(self):
113+
"""Test that TEXT mode stores value as string instead of native JSON."""
114+
store = DuckDBStore(use_json_column=False)
115+
116+
await store.put(collection="test", key="key1", value={"data": "value"})
117+
118+
# Query to check column type - in TEXT mode, value should be a string
119+
result = store._connection.execute("""
120+
SELECT value, typeof(value) as value_type
121+
FROM kv_entries
122+
WHERE collection = 'test' AND key = 'key1'
123+
""").fetchone()
124+
125+
assert result is not None
126+
value_str, value_type = result
127+
128+
# In TEXT mode, value should be stored as VARCHAR/TEXT
129+
assert value_type in ("VARCHAR", "TEXT")
130+
# Value should be a JSON string
131+
assert isinstance(value_str, str)
132+
assert "data" in value_str
133+
134+
await store.close()
135+
51136
async def test_database_path_initialization(self):
52137
"""Test that store can be initialized with different database path options."""
53138
# In-memory (default)

0 commit comments

Comments
 (0)