Skip to content

Commit

Permalink
Fix deadlock where opening database fails without setting self.except…
Browse files Browse the repository at this point in the history
…ion; avoid deadlock after race condition where command is enqueued before thread can signal an exception piskvorky#90
  • Loading branch information
Philipp Adelt committed Feb 21, 2020
1 parent f5c57f1 commit 7b06f85
Showing 1 changed file with 54 additions and 12 deletions.
66 changes: 54 additions & 12 deletions sqlitedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tempfile
import random
import logging
import time
import traceback

from threading import Thread
Expand Down Expand Up @@ -109,7 +110,7 @@ class SqliteDict(DictClass):
VALID_FLAGS = ['c', 'r', 'w', 'n']

def __init__(self, filename=None, tablename='unnamed', flag='c',
autocommit=False, journal_mode="DELETE", encode=encode, decode=decode):
autocommit=False, journal_mode="DELETE", encode=encode, decode=decode, timeout=5):
"""
Initialize a thread-safe sqlite-backed dictionary. The dictionary will
be a table `tablename` in database file `filename`. A single file (=database)
Expand Down Expand Up @@ -140,6 +141,8 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
object.
The default is to use pickle.
The `timeout` defines the maximum time (in seconds) to wait for initial Thread startup.
"""
self.in_temp = filename is None
if self.in_temp:
Expand Down Expand Up @@ -167,6 +170,7 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
self.journal_mode = journal_mode
self.encode = encode
self.decode = decode
self.timeout = timeout

logger.info("opening Sqlite table %r in %s" % (tablename, filename))
MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS "%s" (key TEXT PRIMARY KEY, value BLOB)' % self.tablename
Expand All @@ -177,7 +181,8 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
self.clear()

def _new_conn(self):
return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode)
return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode,
timeout=self.timeout)

def __enter__(self):
if not hasattr(self, 'conn') or self.conn is None:
Expand Down Expand Up @@ -377,7 +382,7 @@ class SqliteMultithread(Thread):
in a separate thread (in the same order they arrived).
"""
def __init__(self, filename, autocommit, journal_mode):
def __init__(self, filename, autocommit, journal_mode, timeout):
super(SqliteMultithread, self).__init__()
self.filename = filename
self.autocommit = autocommit
Expand All @@ -386,19 +391,34 @@ def __init__(self, filename, autocommit, journal_mode):
self.reqs = Queue()
self.setDaemon(True) # python2.5-compatible
self.exception = None
self.initialized = None
self.timeout = timeout
self.log = logging.getLogger('sqlitedict.SqliteMultithread')
self.start()

def run(self):
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')
try:
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
except Exception as ex:
self.log.exception("Failed to initialize connection for filename: %s" % self.filename)
self.exception = (e_type, e_value, e_tb) = sys.exc_info()
raise

try:
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')
except Exception as ex:
self.log.exception("Failed to execute PRAGMA statements.")
self.exception = (e_type, e_value, e_tb) = sys.exc_info()
raise

self.initialized = True

res = None
while True:
Expand Down Expand Up @@ -484,6 +504,7 @@ def execute(self, req, arg=None, res=None):
"""
`execute` calls are non-blocking: just queue up the request and return immediately.
"""
self.wait_for_initialization()
self.check_raise_error()

# NOTE: This might be a lot of information to pump into an input
Expand Down Expand Up @@ -547,4 +568,25 @@ def close(self, force=False):
# returning (by semaphore '--no more--'
self.select_one('--close--')
self.join()

def wait_for_initialization(self):
"""
Polls the 'initialized' flag to be set by the started Thread in run().
"""
# A race condition may occur without waiting for initialization:
# __init__() finishes with the start() call, but the Thread needs some time to actually start working.
# If opening the database file fails in run(), an exception will occur and self.exception will be set.
# But if we run check_raise_error() before run() had a chance to set self.exception, it will report
# a false negative: An exception occured and the thread terminates but self.exception is unset.
# This leads to a deadlock while waiting for the results of execute().
# By waiting for the Thread to set the initialized flag, we can ensure the thread has successfully
# opened the file - and possibly set self.exception to be detected by check_raise_error().

start_time = time.time()
while time.time() - start_time < self.timeout:
if self.initialized or self.exception:
return
time.sleep(0.1)
raise TimeoutError("SqliteMultithread failed to flag initialization withing %0.0f seconds." % self.timeout)

#endclass SqliteMultithread

0 comments on commit 7b06f85

Please sign in to comment.