33from datetime import datetime
44from typing import Any , overload
55
6- from elastic_transport import ObjectApiResponse
7- from elastic_transport import SerializationError as ElasticsearchSerializationError
86from key_value .shared .errors import DeserializationError , SerializationError
97from key_value .shared .utils .managed_entry import ManagedEntry
10- from key_value .shared .utils .sanitization import AlwaysHashStrategy , HashFragmentMode , HybridSanitizationStrategy
8+ from key_value .shared .utils .sanitization import (
9+ AlwaysHashStrategy ,
10+ HashFragmentMode ,
11+ HybridSanitizationStrategy ,
12+ SanitizationStrategy ,
13+ )
1114from key_value .shared .utils .sanitize import (
1215 ALPHANUMERIC_CHARACTERS ,
1316 LOWERCASE_ALPHABET ,
2932from key_value .aio .stores .elasticsearch .utils import LessCapableJsonSerializer , LessCapableNdjsonSerializer , new_bulk_action
3033
3134try :
35+ from elastic_transport import ObjectApiResponse
36+ from elastic_transport import SerializationError as ElasticsearchSerializationError
3237 from elasticsearch import AsyncElasticsearch
38+ from elasticsearch .exceptions import BadRequestError
3339
3440 from key_value .aio .stores .elasticsearch .utils import (
3541 get_aggregations_from_body ,
6369 },
6470 "value" : {
6571 "properties" : {
66- # You might think the `string` field should be a text/keyword field
67- # but this is the recommended mapping for large stringified json
68- "string" : {
69- "type" : "object" ,
70- "enabled" : False ,
71- },
7272 "flattened" : {
7373 "type" : "flattened" ,
7474 },
8383MAX_KEY_LENGTH = 256
8484ALLOWED_KEY_CHARACTERS : str = ALPHANUMERIC_CHARACTERS
8585
86- MAX_INDEX_LENGTH = 240
86+ MAX_INDEX_LENGTH = 200
8787ALLOWED_INDEX_CHARACTERS : str = LOWERCASE_ALPHABET + NUMBERS + "_" + "-" + "."
8888
8989
9090class ElasticsearchSerializationAdapter (SerializationAdapter ):
91- """Adapter for Elasticsearch with support for native and string storage modes."""
92-
93- _native_storage : bool
94-
95- def __init__ (self , * , native_storage : bool = True ) -> None :
96- """Initialize the Elasticsearch adapter.
91+ """Adapter for Elasticsearch."""
9792
98- Args:
99- native_storage: If True (default), store values as flattened dicts.
100- If False, store values as JSON strings.
101- """
93+ def __init__ (self ) -> None :
94+ """Initialize the Elasticsearch adapter"""
10295 super ().__init__ ()
10396
104- self ._native_storage = native_storage
10597 self ._date_format = "isoformat"
106- self ._value_format = "dict" if native_storage else "string"
98+ self ._value_format = "dict"
10799
108100 @override
109101 def prepare_dump (self , data : dict [str , Any ]) -> dict [str , Any ]:
110102 value = data .pop ("value" )
111103
112- data ["value" ] = {}
113-
114- if self ._native_storage :
115- data ["value" ]["flattened" ] = value
116- else :
117- data ["value" ]["string" ] = value
104+ data ["value" ] = {
105+ "flattened" : value ,
106+ }
118107
119108 return data
120109
121110 @override
122111 def prepare_load (self , data : dict [str , Any ]) -> dict [str , Any ]:
123- value = data .pop ("value" )
124-
125- if "flattened" in value :
126- data ["value" ] = value ["flattened" ]
127- elif "string" in value :
128- data ["value" ] = value ["string" ]
129- else :
130- msg = "Value field not found in Elasticsearch document"
131- raise DeserializationError (message = msg )
112+ data ["value" ] = data .pop ("value" ).get ("flattened" )
132113
133114 return data
134115
135116
117+ class ElasticsearchV1KeySanitizationStrategy (AlwaysHashStrategy ):
118+ def __init__ (self ) -> None :
119+ super ().__init__ (
120+ hash_length = 64 ,
121+ )
122+
123+
124+ class ElasticsearchV1CollectionSanitizationStrategy (HybridSanitizationStrategy ):
125+ def __init__ (self ) -> None :
126+ super ().__init__ (
127+ replacement_character = "_" ,
128+ max_length = MAX_INDEX_LENGTH ,
129+ allowed_characters = UPPERCASE_ALPHABET + ALLOWED_INDEX_CHARACTERS ,
130+ hash_fragment_mode = HashFragmentMode .ALWAYS ,
131+ )
132+
133+
136134class ElasticsearchStore (
137135 BaseEnumerateCollectionsStore , BaseEnumerateKeysStore , BaseDestroyCollectionStore , BaseCullStore , BaseContextManagerStore , BaseStore
138136):
139- """A elasticsearch-based store."""
137+ """An Elasticsearch-based store.
138+
139+ Stores collections in their own indices and stores values in Flattened fields.
140+
141+ This store has specific restrictions on what is allowed in keys and collections. Keys and collections are not sanitized
142+ by default which may result in errors when using the store.
143+
144+ To avoid issues, you may want to consider leveraging the `ElasticsearchV1KeySanitizationStrategy` and
145+ `ElasticsearchV1CollectionSanitizationStrategy` strategies.
146+ """
140147
141148 _client : AsyncElasticsearch
142149
143150 _is_serverless : bool
144151
145152 _index_prefix : str
146153
147- _native_storage : bool
154+ _default_collection : str | None
148155
149156 _serializer : SerializationAdapter
150157
158+ _key_sanitization_strategy : SanitizationStrategy
159+ _collection_sanitization_strategy : SanitizationStrategy
160+
151161 @overload
152162 def __init__ (
153163 self ,
154164 * ,
155165 elasticsearch_client : AsyncElasticsearch ,
156166 index_prefix : str ,
157- native_storage : bool = True ,
158167 default_collection : str | None = None ,
159- ) -> None : ...
168+ key_sanitization_strategy : SanitizationStrategy | None = None ,
169+ collection_sanitization_strategy : SanitizationStrategy | None = None ,
170+ ) -> None :
171+ """Initialize the elasticsearch store.
172+
173+ Args:
174+ elasticsearch_client: The elasticsearch client to use.
175+ index_prefix: The index prefix to use. Collections will be prefixed with this prefix.
176+ default_collection: The default collection to use if no collection is provided.
177+ key_sanitization_strategy: The sanitization strategy to use for keys.
178+ collection_sanitization_strategy: The sanitization strategy to use for collections.
179+ """
160180
161181 @overload
162182 def __init__ (
@@ -165,9 +185,18 @@ def __init__(
165185 url : str ,
166186 api_key : str | None = None ,
167187 index_prefix : str ,
168- native_storage : bool = True ,
169188 default_collection : str | None = None ,
170- ) -> None : ...
189+ key_sanitization_strategy : SanitizationStrategy | None = None ,
190+ collection_sanitization_strategy : SanitizationStrategy | None = None ,
191+ ) -> None :
192+ """Initialize the elasticsearch store.
193+
194+ Args:
195+ url: The url of the elasticsearch cluster.
196+ api_key: The api key to use.
197+ index_prefix: The index prefix to use. Collections will be prefixed with this prefix.
198+ default_collection: The default collection to use if no collection is provided.
199+ """
171200
172201 def __init__ (
173202 self ,
@@ -176,8 +205,9 @@ def __init__(
176205 url : str | None = None ,
177206 api_key : str | None = None ,
178207 index_prefix : str ,
179- native_storage : bool = True ,
180208 default_collection : str | None = None ,
209+ key_sanitization_strategy : SanitizationStrategy | None = None ,
210+ collection_sanitization_strategy : SanitizationStrategy | None = None ,
181211 ) -> None :
182212 """Initialize the elasticsearch store.
183213
@@ -186,9 +216,9 @@ def __init__(
186216 url: The url of the elasticsearch cluster.
187217 api_key: The api key to use.
188218 index_prefix: The index prefix to use. Collections will be prefixed with this prefix.
189- native_storage: Whether to use native storage mode (flattened field type) or serialize
190- all values to JSON strings. Defaults to True.
191219 default_collection: The default collection to use if no collection is provided.
220+ key_sanitization_strategy: The sanitization strategy to use for keys.
221+ collection_sanitization_strategy: The sanitization strategy to use for collections.
192222 """
193223 if elasticsearch_client is None and url is None :
194224 msg = "Either elasticsearch_client or url must be provided"
@@ -209,29 +239,14 @@ def __init__(
209239 LessCapableNdjsonSerializer .install_serializer (client = self ._client )
210240
211241 self ._index_prefix = index_prefix .lower ()
212- self ._native_storage = native_storage
213242 self ._is_serverless = False
214243
215- # We have 240 characters to work with
216- # We need to account for the index prefix and the hyphen.
217- max_index_length = MAX_INDEX_LENGTH - (len (self ._index_prefix ) + 1 )
218-
219- self ._serializer = ElasticsearchSerializationAdapter (native_storage = native_storage )
220-
221- # We allow uppercase through the sanitizer so we can lowercase them instead of them
222- # all turning into underscores.
223- collection_sanitization = HybridSanitizationStrategy (
224- replacement_character = "_" ,
225- max_length = max_index_length ,
226- allowed_characters = UPPERCASE_ALPHABET + ALLOWED_INDEX_CHARACTERS ,
227- hash_fragment_mode = HashFragmentMode .ALWAYS ,
228- )
229- key_sanitization = AlwaysHashStrategy ()
244+ self ._serializer = ElasticsearchSerializationAdapter ()
230245
231246 super ().__init__ (
232247 default_collection = default_collection ,
233- collection_sanitization_strategy = collection_sanitization ,
234- key_sanitization_strategy = key_sanitization ,
248+ collection_sanitization_strategy = collection_sanitization_strategy ,
249+ key_sanitization_strategy = key_sanitization_strategy ,
235250 )
236251
237252 @override
@@ -247,7 +262,12 @@ async def _setup_collection(self, *, collection: str) -> None:
247262 if await self ._client .options (ignore_status = 404 ).indices .exists (index = index_name ):
248263 return
249264
250- _ = await self ._client .options (ignore_status = 404 ).indices .create (index = index_name , mappings = DEFAULT_MAPPING , settings = {})
265+ try :
266+ _ = await self ._client .options (ignore_status = 404 ).indices .create (index = index_name , mappings = DEFAULT_MAPPING , settings = {})
267+ except BadRequestError as e :
268+ if "index_already_exists_exception" in str (e ).lower ():
269+ return
270+ raise
251271
252272 def _get_index_name (self , collection : str ) -> str :
253273 return self ._index_prefix + "-" + self ._sanitize_collection (collection = collection ).lower ()
0 commit comments