Replace Redis locking with a query state machine #434
…ome duped fixtures
…nherit from Query
Co-Authored-By: maxalbert <maxalbert@users.noreply.github.com>
Codecov Report
@@ Coverage Diff @@
## master #434 +/- ##
==========================================
+ Coverage 91.33% 91.34% +<.01%
==========================================
Files 96 97 +1
Lines 5459 5638 +179
Branches 641 663 +22
==========================================
+ Hits 4986 5150 +164
- Misses 344 359 +15
Partials 129 129
…ing of the strings) with dummy_redis.set()
Great work! 👍 I'm happy to merge this now. This will make a lot of things easier to implement and reason about. 🎉
When looking at some of the tests (especially in test_query_state.py, but also elsewhere) I was sometimes confused about why they actually test what they purport to test. The reason for this confusion is that they make certain assumptions about how functionality is implemented internally in flowmachine, and those assumptions aren't visible in the actual test.
A simple example is that QueryStateMachine.wait_until_complete() uses the _sleep function internally (so that monkeypatching it allows us to test certain conditions), but this isn't visible anywhere in the test. There are a bunch more subtle and difficult-to-reason-about assumptions (e.g. how Query uses QueryStateMachine internally and how certain methods call each other under certain conditions). I'd be happier if there was a way to make these assumptions really obvious in the tests, because leaving them implicit makes the tests somewhat brittle, but I can't see an easy way without reworking a bunch of other code, and it's only tangentially related to this PR, so let's just keep a mental note of it and aim to fix this in the future. But it's clear that the tests expose certain pains in the code base which often make it necessary to use monkeypatching or lengthy test setup code which ideally we shouldn't need. Anyway, not immediately related to this PR, just an observation from reviewing it.
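One way such a hidden dependency could be made visible in a test is to pass the sleep function in explicitly instead of monkeypatching it. The following is only a sketch under assumptions: the standalone `wait_until_complete` function, its `is_finished` callback, and the `sleep` and `sleep_duration` parameters are hypothetical and do not reflect the actual QueryStateMachine signature.

```python
import time
from typing import Callable, List


def wait_until_complete(
    is_finished: Callable[[], bool],
    sleep: Callable[[float], None] = time.sleep,
    sleep_duration: float = 1.0,
) -> int:
    """Poll `is_finished` until it returns True; return how many times we slept."""
    n_sleeps = 0
    while not is_finished():
        sleep(sleep_duration)  # injected, so a test can substitute a fake
        n_sleeps += 1
    return n_sleeps


def demo() -> List[float]:
    """Run the wait loop with a fake sleep that records its arguments."""
    recorded: List[float] = []
    remaining = [3]  # report "not finished" three times, then finish

    def is_finished() -> bool:
        remaining[0] -= 1
        return remaining[0] < 0

    # The fake sleep is passed in plainly, so the test shows what it relies on.
    wait_until_complete(is_finished, sleep=recorded.append, sleep_duration=0.5)
    return recorded
```

With this shape, the test itself states that it replaces the sleep call, rather than relying on the reader knowing which internal name is patched.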
mock_func_has_lock.return_value = True
assert "running" == query_proxy.poll()
qsm = QueryStateMachine(dummy_redis, query_id=q.md5)
dummy_redis._store[qsm.state_machine._name] = QueryState.EXECUTING.value.encode()
- dummy_redis._store[qsm.state_machine._name] = QueryState.EXECUTING.value.encode()
+ dummy_redis.set(qsm.state_machine._name, QueryState.EXECUTING.value)
Why are we using the internal property ._name here and in the other tests? This doesn't seem to be used anywhere in the actual flowmachine code, so I'm not sure what the purpose in the tests is. If we do want to use it, can we wrap it up in a non-private attribute on QueryStateMachine itself and give it a meaningful name to reveal its purpose?
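As a rough illustration of that suggestion, the key could be exposed through a named public property so that tests never touch a private attribute. Everything here is hypothetical: `FakeRedis`, `SketchStateMachine`, the `state_key` property name, and the `"<query_id>-state"` key format are assumptions made for the sketch, not flowmachine's API.

```python
from typing import Dict, Optional, Union


class FakeRedis:
    """Hypothetical test double that stores bytes, as a Redis client returns them."""

    def __init__(self) -> None:
        self._store: Dict[str, bytes] = {}

    def set(self, key: str, value: Union[str, bytes]) -> None:
        # Encode strings here so tests do not need to call .encode() themselves.
        self._store[key] = value.encode() if isinstance(value, str) else value

    def get(self, key: str) -> Optional[bytes]:
        return self._store.get(key)


class SketchStateMachine:
    """Hypothetical wrapper exposing the Redis key under a public name."""

    def __init__(self, redis_client: FakeRedis, query_id: str) -> None:
        self._redis = redis_client
        self._name = f"{query_id}-state"  # assumed key format, for illustration

    @property
    def state_key(self) -> str:
        """The Redis key under which this query's state is stored."""
        return self._name

    @property
    def current_state(self) -> Optional[str]:
        raw = self._redis.get(self.state_key)
        return raw.decode() if raw is not None else None
```

A test could then write `fake_redis.set(machine.state_key, "executing")` and read the result back through `machine.current_state`.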
mock_func_has_lock.return_value = False
dummy_redis._store[qsm.state_machine._name] = QueryState.COMPLETED.value.encode()
- dummy_redis._store[qsm.state_machine._name] = QueryState.COMPLETED.value.encode()
+ dummy_redis.set(qsm.state_machine._name, QueryState.COMPLETED.value)
assert "running" == query_proxy.poll()
qsm = QueryStateMachine(dummy_redis, query_id=q.md5)
dummy_redis._store[qsm.state_machine._name] = QueryState.EXECUTING.value.encode()
assert QueryState.EXECUTING.value == query_proxy.poll()
- assert QueryState.EXECUTING.value == query_proxy.poll()
+ assert QueryState.EXECUTING == query_proxy.poll()
)
# Set query state
qsm = QueryStateMachine(dummy_redis, query_id=q.md5)
dummy_redis._store[qsm.state_machine._name] = current_state.encode()
- dummy_redis._store[qsm.state_machine._name] = current_state.encode()
+ dummy_redis.set(qsm.state_machine._name, current_state)
"blocking_state", [QueryState.EXECUTING, QueryState.RESETTING, QueryState.QUEUED] | ||
)
def test_blocks(blocking_state, monkeypatch, dummy_redis):
"""Test that states which alter the executing state of the query block."""
Incomplete sentence in docstring?
Whoops sorry, not incomplete but I didn't understand the grammatical structure. 😂
"""Test that resetting a query's cache will error if in a state where that isn't possible.""" | ||
q = DummyQuery(1, sleep_time=5)
qsm = QueryStateMachine(q.redis, q.md5)
# Mark the query as in the process of resetting
According to this comment, should there be a qsm.reset() after qsm.execute() below? The test passes either way, just wondering what the intent is...
redis: StrictRedis,
query: "Query",
connection: "Connection",
ddl_ops_func: Callable[[str, str], List[str]],
Why are ddl_ops_func and write_func arguments that need to be passed in? Why shouldn't their functionality just be part of the internal responsibility of write_query_to_cache?
q_state_machine.raise_error()
logger.error(f"Error executing SQL. Error was {e}")
raise e
if schema == "cache":
Is there a reason to only conditionally write cache metadata? I'm a little confused about the meaning of this schema argument. I assumed its purpose was to allow a different name for the cache schema (e.g. for testing), but it looks like it's passed in from Query.to_sql() and ModelResult.to_sql() and is basically the schema that the queries are stored in? I trust that it does the right thing but I'm not sure I fully follow the logic.
f"Got a bad state for '{query_id}'. Original exception was {e}"
)

if query_state == QueryState.EXECUTING:
Just as a thought, I wonder whether it would be useful to have the error messages live in QueryState directly and simply propagate them here? That would allow us to get rid of the if/elif chain. But I haven't looked into it deeply, and it's possible this wouldn't work out.
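To make the idea concrete, here is a minimal sketch of an enum whose members carry their own message, so a caller can look the message up instead of branching. The class name `SketchQueryState`, the string values, and the message texts are all hypothetical; only the member names KNOWN, QUEUED, EXECUTING, RESETTING and COMPLETED appear in this PR.

```python
from enum import Enum


class SketchQueryState(Enum):
    """Hypothetical stand-in for QueryState: each member is (value, message)."""

    KNOWN = ("known", "Query is known but has not been run.")
    QUEUED = ("queued", "Query is queued and waiting to run.")
    EXECUTING = ("executing", "Query is currently running.")
    RESETTING = ("resetting", "Query is being removed from cache.")
    COMPLETED = ("completed", "Query has finished and is in cache.")

    def __new__(cls, value: str, message: str):
        obj = object.__new__(cls)
        obj._value_ = value      # keeps lookup by value working
        obj.message = message    # extra attribute carried by each member
        return obj


def describe(state: SketchQueryState) -> str:
    """Build a status line without an if/elif chain over the states."""
    return f"{state.value}: {state.message}"
```

Lookup by stored value still works, e.g. `SketchQueryState("executing")` returns the EXECUTING member, so a value read back from Redis could be mapped straight to its message.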
@pytest.mark.parametrize(
"current_state, expected_error",
[
(QueryState.KNOWN, MissingQueryError),
Similar to my comment above, I'm wondering whether it's worth always raising the same error. I'll keep that in mind for my re-working / removal of the QueryProxy class.
@greenape FYI, I have added a couple more commits on top of yours, but literally only cosmetics.
…se the logic seems to be subtly different and makes the test 'test_drop_query_blocks' hang. This reverts commit d5bf413.
Closes #283
I have:
Description
Previously we used a quasi-reentrant lock to prevent executing a query on FlowDB if it was already in the process of being executed. In the ancient times, this was implemented as a thread-based RLock, but in living memory it has used the Redis locking algorithm.
This doesn't quite suffice, because the lock is only acquired once a thread is available to start running the query, which, combined with the 1:1 relationship between the number of threads and the number of database connections, means this would happen fairly quickly.
In FlowMachine library terms, this manifests as shown in #283, but could equally result in a query that composes a subquery re-executing that subquery when it should actually wait for the subquery to be written to cache. In FlowMachine server terms, this means that with n_threads queries currently running, any additional query submitted to the server will appear to be AWOL until one of the currently running queries finishes.
This replaces the lock with a state machine, which actually runs atomically in Redis. Queries can be in one of several states, some of which indicate that you can safely get their SQL, and some of which indicate that you should wait to do so because it is likely to change.
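The general idea of atomic state transitions can be sketched as follows. This is an illustration only, not the implementation in this PR: it uses an in-memory dict guarded by a `threading.Lock` as a stand-in for Redis, and the transition table, the event names, and the "errored" state name are assumptions.

```python
import threading
from typing import Dict, Tuple

# Hypothetical transition table: (current state, event) -> next state.
TRANSITIONS: Dict[Tuple[str, str], str] = {
    ("known", "queue"): "queued",
    ("queued", "execute"): "executing",
    ("executing", "finish"): "completed",
    ("executing", "raise_error"): "errored",
    ("completed", "reset"): "resetting",
    ("resetting", "finish_resetting"): "known",
}


class InMemoryStateStore:
    """Stand-in for Redis: the check and the update happen under one lock."""

    def __init__(self) -> None:
        self._states: Dict[str, str] = {}
        self._lock = threading.Lock()

    def trigger(self, query_id: str, event: str) -> Tuple[str, bool]:
        """Apply `event` atomically; return (state afterwards, whether it changed)."""
        with self._lock:
            current = self._states.get(query_id, "known")
            new_state = TRANSITIONS.get((current, event))
            if new_state is None:
                # Event is not valid from the current state: nothing changes.
                return current, False
            self._states[query_id] = new_state
            return new_state, True


def race_to_queue(n_threads: int = 8) -> int:
    """Let several threads try to queue the same query; count how many succeed."""
    store = InMemoryStateStore()
    results = []
    threads = [
        threading.Thread(target=lambda: results.append(store.trigger("q1", "queue")))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(1 for _, changed in results if changed)
```

Because each transition is a single check-and-update, only one caller can move a query out of a given state; everyone else just learns the current state without ever waiting on a lock.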
#283 is hence mitigated, since nothing ever has to wait for a lock and hence marking a query as running can safely be done in the main thread. It also has some additional benefits, in that one can now get a more accurate view of a query's current state. This includes whether it is actively running, or just expected to at some point, whether it is being wiped from cache right now, if it failed to run successfully (even if somebody else last tried to run it), and so on.
(Looking forward to some debate on this one ;))