Skip to content

Commit

Permalink
Merge pull request #107 from padelt/master
Browse files Browse the repository at this point in the history
Fix deadlock where opening database fails
  • Loading branch information
mpenkov authored Jan 30, 2021
2 parents 2959991 + 9171bc6 commit 9cc029f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Do not create tables when in read-only mode (PR [#128](https://github.com/RaRe-Technologies/sqlitedict/pull/128), [@hholst80](https://github.com/hholst80))
- Use tempfile.mkstemp for safer temp file creation (PR [#106](https://github.com/RaRe-Technologies/sqlitedict/pull/106), [@ergoithz](https://github.com/ergoithz))
- Fix deadlock where opening database fails (PR [#107](https://github.com/RaRe-Technologies/sqlitedict/pull/107), [@padelt](https://github.com/padelt))

## 1.7.0, 04/09/2018

Expand Down
68 changes: 56 additions & 12 deletions sqlitedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import sys
import tempfile
import logging
import time
import traceback

from threading import Thread
Expand All @@ -56,6 +57,9 @@ def exec_(_code_, _globs_=None, _locs_=None):
_locs_ = _globs_
exec("""exec _code_ in _globs_, _locs_""")

class TimeoutError(OSError):
pass

exec_("def reraise(tp, value, tb=None):\n"
" raise tp, value, tb\n")
else:
Expand Down Expand Up @@ -105,7 +109,7 @@ class SqliteDict(DictClass):
VALID_FLAGS = ['c', 'r', 'w', 'n']

def __init__(self, filename=None, tablename='unnamed', flag='c',
autocommit=False, journal_mode="DELETE", encode=encode, decode=decode):
autocommit=False, journal_mode="DELETE", encode=encode, decode=decode, timeout=5):
"""
Initialize a thread-safe sqlite-backed dictionary. The dictionary will
be a table `tablename` in database file `filename`. A single file (=database)
Expand Down Expand Up @@ -136,6 +140,8 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
object.
The default is to use pickle.
The `timeout` defines the maximum time (in seconds) to wait for initial Thread startup.
"""
self.in_temp = filename is None
if self.in_temp:
Expand Down Expand Up @@ -165,6 +171,7 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
self.journal_mode = journal_mode
self.encode = encode
self.decode = decode
self.timeout = timeout

logger.info("opening Sqlite table %r in %r" % (tablename, filename))
self.conn = self._new_conn()
Expand All @@ -180,7 +187,8 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
self.clear()

def _new_conn(self):
return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode)
return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode,
timeout=self.timeout)

def __enter__(self):
if not hasattr(self, 'conn') or self.conn is None:
Expand Down Expand Up @@ -381,7 +389,7 @@ class SqliteMultithread(Thread):
in a separate thread (in the same order they arrived).
"""
def __init__(self, filename, autocommit, journal_mode):
def __init__(self, filename, autocommit, journal_mode, timeout):
super(SqliteMultithread, self).__init__()
self.filename = filename
self.autocommit = autocommit
Expand All @@ -390,19 +398,34 @@ def __init__(self, filename, autocommit, journal_mode):
self.reqs = Queue()
self.setDaemon(True) # python2.5-compatible
self.exception = None
self._sqlitedict_thread_initialized = None
self.timeout = timeout
self.log = logging.getLogger('sqlitedict.SqliteMultithread')
self.start()

def run(self):
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')
try:
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
except Exception:
self.log.exception("Failed to initialize connection for filename: %s" % self.filename)
self.exception = sys.exc_info()
raise

try:
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')
except Exception:
self.log.exception("Failed to execute PRAGMA statements.")
self.exception = sys.exc_info()
raise

self._sqlitedict_thread_initialized = True

res = None
while True:
Expand Down Expand Up @@ -488,6 +511,7 @@ def execute(self, req, arg=None, res=None):
"""
`execute` calls are non-blocking: just queue up the request and return immediately.
"""
self._wait_for_initialization()
self.check_raise_error()

# NOTE: This might be a lot of information to pump into an input
Expand Down Expand Up @@ -552,6 +576,26 @@ def close(self, force=False):
self.select_one('--close--')
self.join()

def _wait_for_initialization(self):
"""
Polls the 'initialized' flag to be set by the started Thread in run().
"""
# A race condition may occur without waiting for initialization:
# __init__() finishes with the start() call, but the Thread needs some time to actually start working.
# If opening the database file fails in run(), an exception will occur and self.exception will be set.
# But if we run check_raise_error() before run() had a chance to set self.exception, it will report
# a false negative: An exception occured and the thread terminates but self.exception is unset.
# This leads to a deadlock while waiting for the results of execute().
# By waiting for the Thread to set the initialized flag, we can ensure the thread has successfully
# opened the file - and possibly set self.exception to be detected by check_raise_error().

start_time = time.time()
while time.time() - start_time < self.timeout:
if self._sqlitedict_thread_initialized or self.exception:
return
time.sleep(0.1)
raise TimeoutError("SqliteMultithread failed to flag initialization withing %0.0f seconds." % self.timeout)


if __name__ == '__main__':
print(__version__)

0 comments on commit 9cc029f

Please sign in to comment.