66import logging
77import pathlib
88import time
9+ import tempfile
910from collections import OrderedDict
1011from typing import TypeVar , Tuple , Callable , Dict , Any , Optional , Union , Set , List
1112
3637
3738
3839# TODO: set up in the use temporary space
39- _store : Store = LocalFileStore ( "/tmp/dds/internal/" , "/tmp/dds/data/" )
40+ _store_var : Optional [ Store ] = None
4041_eval_ctx : Optional [EvalContext ] = None
4142
4243
@@ -64,11 +65,11 @@ def eval(
6465
def load(path: Union[str, DDSPath, pathlib.Path]) -> Any:
    """
    Fetches the object registered under the given path from the current store.

    The path is first normalized through DDSPathUtils. Raises DDSException
    when the store has no signature recorded for that path.
    """
    resolved = DDSPathUtils.create(path)
    store = _store()
    key = store.fetch_paths([resolved]).get(resolved)
    if key is None:
        raise DDSException(f"The store {store} did not return path {resolved}")
    else:
        return store.fetch_blob(key)
7273
7374
7475def set_store (
@@ -84,21 +85,23 @@ def set_store(
8485
8586 store: either a store, or 'local' or 'dbfs'
8687 """
87- global _store
88+ global _store_var
8889 if isinstance (store , Store ):
8990 if cache_objects is not None :
9091 raise DDSException (
9192 f"Cannot provide a caching option and a store object of type 'Store' at the same time"
9293 )
9394 # Directly setting the store
94- _store = store
95+ _store_var = store
9596 return
9697 elif store == "local" :
9798 if not internal_dir :
98- internal_dir = "/tmp"
99+ internal_dir = str (
100+ pathlib .Path (tempfile .gettempdir ()).joinpath ("dds" , "store" )
101+ )
99102 if not data_dir :
100- data_dir = "/tmp/ data"
101- _store = LocalFileStore (internal_dir , data_dir )
103+ data_dir = str ( pathlib . Path ( tempfile . gettempdir ()). joinpath ( "dds" , " data"))
104+ _store_var = LocalFileStore (internal_dir , data_dir )
102105 elif store == "dbfs" :
103106 if data_dir is None :
104107 raise DDSException ("Missing data_dir argument" )
@@ -115,7 +118,7 @@ def set_store(
115118 commit_type = str (commit_type or CommitType .FULL .name ).upper ()
116119 commit_type_ = CommitType [commit_type ]
117120
118- _store = DBFSStore (
121+ _store_var = DBFSStore (
119122 DBFSURI .parse (internal_dir ), DBFSURI .parse (data_dir ), dbutils , commit_type_
120123 )
121124 else :
@@ -136,8 +139,8 @@ def set_store(
136139 elif cache_objects > 0 :
137140 num_objects = cache_objects
138141 if num_objects is not None :
139- _store = LRUCacheStore (_store , num_elem = num_objects )
140- _logger .debug (f"Setting the store to { _store } " )
142+ _store_var = LRUCacheStore (_store () , num_elem = num_objects )
143+ _logger .debug (f"Setting the store to { _store () } " )
141144
142145
143146def _parse_stages (
@@ -196,9 +199,9 @@ def _eval(
196199 )
197200 key = None if path is None else _eval_ctx .requested_paths [path ]
198201 t = _time ()
199- if key is not None and _store .has_blob (key ):
202+ if key is not None and _store () .has_blob (key ):
200203 _logger .debug (f"_eval:Return cached { path } from { key } " )
201- blob = _store .fetch_blob (key )
204+ blob = _store () .fetch_blob (key )
202205 _add_delta (t , ProcessingStage .STORE_COMMIT )
203206 return blob
204207 else :
@@ -217,11 +220,27 @@ def _eval(
217220 if key is not None :
218221 _logger .info (f"_eval:Storing blob into key { key } " )
219222 t = _time ()
220- _store .store_blob (key , res , codec = None )
223+ _store () .store_blob (key , res , codec = None )
221224 _add_delta (t , ProcessingStage .STORE_COMMIT )
222225 return res
223226
224227
def _store() -> Store:
    """
    Returns the active store, lazily creating the default local store
    (rooted in the system temporary directory) on first access.
    """
    global _store_var
    # Fast path: a store has already been set or initialized.
    if _store_var is not None:
        return _store_var
    base = pathlib.Path(tempfile.gettempdir()).joinpath("dds")
    store_path = base.joinpath("store")
    data_path = base.joinpath("data")
    _logger.info(
        f"Initializing default store. store dir: {store_path} data dir: {data_path}"
    )
    _store_var = LocalFileStore(str(store_path), str(data_path))
    return _store_var
242+
243+
225244def _time () -> float :
226245 return time .monotonic ()
227246
@@ -272,7 +291,7 @@ def _eval_new_ctx(
272291 _logger .debug (
273292 f"_eval_new_ctx: need to resolve indirect references: { loads_to_check } "
274293 )
275- resolved_indirect_refs = _store .fetch_paths (loads_to_check )
294+ resolved_indirect_refs = _store () .fetch_paths (loads_to_check )
276295 _logger .debug (
277296 f"_eval_new_ctx: fetched indirect references: { resolved_indirect_refs } "
278297 )
@@ -296,7 +315,7 @@ def _eval_new_ctx(
296315 present_blobs : Optional [Set [PyHash ]]
297316 if extra_debug :
298317 present_blobs = set (
299- [key for key in set (store_paths .values ()) if _store .has_blob (key )]
318+ [key for key in set (store_paths .values ()) if _store () .has_blob (key )]
300319 )
301320 _logger .debug (f"_eval_new_ctx: { len (present_blobs )} present blobs" )
302321 else :
@@ -327,9 +346,9 @@ def _eval_new_ctx(
327346 current_sig = inters .fun_return_sig
328347 _logger .debug (f"_eval_new_ctx:current_sig: { current_sig } " )
329348 t = _time ()
330- if _store .has_blob (current_sig ):
349+ if _store () .has_blob (current_sig ):
331350 _logger .debug (f"_eval_new_ctx:Return cached signature { current_sig } " )
332- res = _store .fetch_blob (current_sig )
351+ res = _store () .fetch_blob (current_sig )
333352 _add_delta (t , ProcessingStage .STORE_COMMIT )
334353 else :
335354 arg_repr = [str (type (arg )) for arg in args ]
@@ -349,13 +368,13 @@ def _eval_new_ctx(
349368 # TODO: add a phase for storing the blobs
350369 _logger .info (f"_eval:Storing blob into key { obj_key } " )
351370 t = _time ()
352- _store .store_blob (obj_key , res , codec = None )
371+ _store () .store_blob (obj_key , res , codec = None )
353372 _add_delta (t , ProcessingStage .STORE_COMMIT )
354373
355374 if ProcessingStage .PATH_COMMIT in stages :
356375 _logger .debug (f"Starting stage { ProcessingStage .PATH_COMMIT } " )
357376 t = _time ()
358- _store .sync_paths (store_paths )
377+ _store () .sync_paths (store_paths )
359378 _add_delta (t , ProcessingStage .PATH_COMMIT )
360379 _logger .debug (f"Stage { ProcessingStage .PATH_COMMIT } done" )
361380 else :
0 commit comments