#
# There's a thread that holds the actual SQL connection (SqliteMultithread).
# We communicate with this thread via queues (requests and responses).
# The requests can either be SQL commands or one of the "special" commands
# below:
#
# _REQUEST_CLOSE: request that the SQL connection be closed
# _REQUEST_COMMIT: request that any changes be committed to the DB
#
# Responses are either SQL records (e.g. results of a SELECT) or the magic
# _RESPONSE_NO_MORE command, which indicates nothing else will ever be written
# to the response queue.
#
_REQUEST_CLOSE = '--close--'
_REQUEST_COMMIT = '--commit--'
_RESPONSE_NO_MORE = '--no more--'

#
# We work with weak references for better memory efficiency.
# Dereferencing, checking the referent queue still exists, and putting to it
# is boring and repetitive, so we have a _put function to handle it for us.
#
_PUT_OK, _PUT_REFERENT_DESTROYED, _PUT_NOOP = 0, 1, 2


def _put(queue_reference, item):
    """Put ``item`` on the queue behind ``queue_reference``, if it still exists.

    :param queue_reference: Either ``None`` or a callable (such as a
        ``weakref.ref``) that returns the target queue, or ``None`` once the
        queue no longer exists.
    :param item: The object handed to the queue's ``put`` method.
    :return: ``_PUT_NOOP`` when no reference was supplied,
        ``_PUT_REFERENT_DESTROYED`` when the reference no longer resolves to
        a queue, and ``_PUT_OK`` after the item has been put.
    """
    if queue_reference is None:
        #
        # We didn't get a reference to a queue, so do nothing (no-op).
        #
        return _PUT_NOOP

    queue = queue_reference()
    if queue is None:
        #
        # We got a reference to a queue, but that queue no longer exists
        #
        return _PUT_REFERENT_DESTROYED

    #
    # The local strong reference taken above disappears as soon as we return,
    # so there is no need to delete it explicitly.
    #
    queue.put(item)
    return _PUT_OK
+ + :param req: The request (an SQL command) + :param arg: Arguments to the SQL command + :param res: A queue in which to place responses as they become available """ self.check_raise_error() stack = None @@ -559,7 +623,16 @@ def execute(self, req, arg=None, res=None): # so often. stack = traceback.extract_stack()[:-1] - self.reqs.put((req, arg or tuple(), res, stack)) + # + # We pass a weak reference to the response queue instead of a regular + # reference, because we want the queues to be garbage-collected + # more aggressively. + # + res_ref = None + if res: + res_ref = weakref.ref(res) + + self.reqs.put((req, arg or tuple(), res_ref, stack)) def executemany(self, req, items): for item in items: @@ -579,7 +652,7 @@ def select(self, req, arg=None): while True: rec = res.get() self.check_raise_error() - if rec == '--no more--': + if rec == _RESPONSE_NO_MORE: break yield rec @@ -596,10 +669,10 @@ def commit(self, blocking=True): # blocking=False. This ensures any available exceptions for any # previous statement are thrown before returning, and that the # data has actually persisted to disk! - self.select_one('--commit--') + self.select_one(_REQUEST_COMMIT) else: # otherwise, we fire and forget as usual. - self.execute('--commit--') + self.execute(_REQUEST_COMMIT) def close(self, force=False): if force: @@ -608,12 +681,12 @@ def close(self, force=False): # can't process the request. Instead, push the close command to the requests # queue directly. If run() is still alive, it will exit gracefully. If not, # then there's nothing we can do anyway. 
    def test_cancel_iterate(self):
        """Check that cursor reads stop once the caller abandons iteration.

        ``sqlitedict.sqlite3`` is patched so that iterating the mocked cursor
        yields from an endless counting iterator. The test breaks out of
        ``d.keys()`` early and then asserts that the iterator's counter does
        not advance any further while the test waits.
        """
        import time

        class EndlessKeysIterator:
            # Never-ending iterator; ``value`` counts how many items have been
            # requested from it so far.
            def __init__(self) -> None:
                self.value = 0

            def __iter__(self):
                return self

            def __next__(self):
                self.value += 1
                # One-element list holding the counter -- presumably standing
                # in for a single-column result row (confirm against keys()).
                return [self.value]

        with patch('sqlitedict.sqlite3') as mock_sqlite3:
            ki = EndlessKeysIterator()
            # Iterating the mocked cursor hands back the endless iterator.
            cursor = mock_sqlite3.connect().cursor()
            cursor.__iter__.return_value = ki

            with SqliteDict(autocommit=True) as d:
                for i, k in enumerate(d.keys()):
                    # Keys are expected to arrive in order: 1, 2, 3, ...
                    assert i + 1 == k
                    if k > 100:
                        # Abandon the iteration after the 101st key.
                        break
                # The iterator must have been advanced beyond the 101 keys
                # consumed above -- presumably by the background thread
                # reading ahead (timing-dependent; confirm).
                assert ki.value > 101

                # Release GIL, let background threads run.
                # Don't use gc.collect because this simulates user code.
                time.sleep(0.01)

                # Snapshot the counter, wait, and require that it is unchanged.
                current = ki.value
                time.sleep(1)
                assert current == ki.value, 'Will not read more after iterate stop'