Fix using ThreadRunner with dataset factories #4093
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
The issue appears for both ThreadRunner and ParallelRunner when loading the same pattern dataset.
Does this issue also exist in ParallelRunner? I see all GitHub issues are from ThreadRunner.
I am a bit concerned about the performance penalty; the main issue here is a very simple check that is causing duplicate dataset registration.
Even if we have to go with Lock, I think AbstractRunner is not the right place to add it. See issues like this: in the past we had something called ShelveStore, and simply importing multiprocess will cause issues in more restricted environments.
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Thank you, @noklam!
I see the point about the performance issues, but it feels wrong to write to a shared object without locking it. Theoretically, it should only affect the case when load() is done via calling an external API, because otherwise we still cannot do simultaneous reads because of the GIL. The solution of replacing the dataset when registering it does not seem good either, since we would have to distinguish the cases where we replace it to bypass this issue from the rest.
I agree it's wrong to use a shared object like this; it's definitely a bug and this is good enough for a temporary fix.
My question is: would it make sense to resolve dataset patterns slightly earlier? We introduced dataset patterns to make catalog.yml simpler, and they are resolved during pipeline execution. Most of the information is available once the pipeline is ready (roughly at before_pipeline_run). If we resolve the patterns here, it may be sufficient. (That is, of course, ignoring the "dynamic pipeline" case where a new dataset is being injected. But I think those cases will suffer from the same problem anyway and most likely won't work with ParallelRunner/ThreadRunner in the first place.)
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Yes, it can be done as well, see the update. Not sure which fix is worse though 🤔 |
My worry is that, as already pointed out, both fixes touch parts of Kedro that should be unencumbered by this.
What's the cost of keeping the issue unfixed and instead make the upcoming
One thing is not clear to me: is it enough to redesign the |
This particular issue will go in the new version if we implement this: #4110 (that's basically what we do in the second solution) and splitting reading from writing should address the rest of the cases. But there might still be cases if we do some conditional writings within the catalog that require changes on the runners' side to make them safe. So, I would say that we will most probably need to redesign runners as well. |
Then my vote goes for the solution that touches the AbstractRunner and leaves the Session alone |
When working on DC2.0 I noticed an issue with the second solution. Converted to draft until resolved.
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Solution 2 is fixed to consider only datasets from the target pipeline that match patterns. |
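A rough sketch of what "consider only datasets from the target pipeline that match patterns" could look like. Everything here is hypothetical: the catalog is a plain dict, `resolve_patterns_up_front` is an illustrative helper, and `fnmatch` wildcards stand in for the real dataset factory pattern syntax. It is not the code from this PR.

```python
from fnmatch import fnmatch


def resolve_patterns_up_front(catalog, patterns, pipeline_datasets, make_dataset):
    """Register every pipeline dataset that matches a pattern before the run.

    Hypothetical helper: `catalog` is a plain dict, `patterns` are fnmatch-style
    wildcards, and `make_dataset` builds a dataset from its name.
    """
    for name in pipeline_datasets:
        if name in catalog:
            continue  # explicitly configured datasets are left untouched
        if any(fnmatch(name, pattern) for pattern in patterns):
            catalog[name] = make_dataset(name)
    return catalog


catalog = {"companies": "<explicit companies>"}
patterns = ["*_csv"]
pipeline_datasets = ["companies", "reviews_csv", "shuttles_csv", "in_memory_result"]

resolved = resolve_patterns_up_front(
    catalog, patterns, pipeline_datasets, lambda name: f"<csv dataset {name}>"
)
print(sorted(resolved))
```

Datasets that neither exist in the catalog nor match a pattern (here `in_memory_result`) are skipped, so after this step the threads only read pattern datasets and never need to register them.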
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Ugh... yeah difficult tradeoffs. I see this has been reported by at least 2 different users, but worsening the Spark/Databricks experience is not a great outcome either. That, on top of the solutions being ugly already, makes me think that maybe we should indeed not do anything for now. @noklam what are the implications of resolving |
@lrcouto Would it be possible to test this since you are working on the pipeline setup? I think this may be a good test case. |
Alright, let's wait until the performance tests are out and then decide on the temporary approach based on the results.
I tried running a test project with spark, just to get an idea. Ran each 10 times or so. Running it with the ThreadRunner before the changes:
Running it with the ThreadRunner after the changes:
I still want to try it again with something a little more complex, but I did perceive a small performance decrease. The test project I'm using is not super realistic and is not using any actual data, just generating and grouping 1.000.000 row dataframes to make the computer do things, so I still want to test it with something more akin to reality. |
Thanks a lot @lrcouto for sharing these results. From 3 minutes to 4:30 is still a 50 % performance decrease... do you have any deviation measurements? |
I think I still want to test it a few more times just to make sure it wasn't an external factor (another process running in the background or something) that caused that difference, because it does look a bit high to me. I'll run it a couple more times and bring the details.
I re-did the test making sure that there was nothing running in the background that could affect the results, and ran them in separate environments. The results were similar to what I've posted before. Running 10x with regular Kedro, without the changes:
Running 10x after the changes:
🤓 we have enough evidence to reject that the times didn't worsen (read that twice) Stats jokes aside, it's still a 40 % increase in run time. Can potentially be very annoying for some users. |
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
…fix/4007-thread-runner-with-dataset-factories
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Reverted to solution 2 after the test results for solution 1 |
@pytest.mark.usefixtures("mock_settings_context_class")
@pytest.mark.parametrize("fake_pipeline_name", [None, _FAKE_PIPELINE_NAME])
@pytest.mark.parametrize("match_pattern", [True, False])
def test_run_thread_runner(
Does this test also verify that dataset factories are resolved? Or is it not possible to test that specifically because the behaviour is not deterministic?
It does not test that because we mock all the calls to runner, pipelines, catalog, etc., as for the rest of the session tests. It only tests that when using ThreadRunner, we enter into the branches to resolve the patterns.
The resolution logic is tested with the catalog tests, though it can be tested here. But for that, we'll have to properly initialise runner, pipelines, and catalog - which might be too complex given that the resolution logic is already tested separately.
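To illustrate the kind of branch-only test described above, here is a small sketch using `unittest.mock`. The `ThreadRunner` and `SequentialRunner` classes, the `run` function, and the `match_pattern` / `resolve` catalog methods are all hypothetical stand-ins defined for this example; they are not Kedro's classes or the session code from this PR.

```python
from unittest import mock


class SequentialRunner:
    """Hypothetical stand-in runner."""

    def run(self, pipeline, catalog):
        raise NotImplementedError


class ThreadRunner:
    """Hypothetical stand-in runner."""

    def run(self, pipeline, catalog):
        raise NotImplementedError


def run(runner, pipeline, catalog):
    """Simplified, hypothetical session-like entry point."""
    if isinstance(runner, ThreadRunner):
        # Only the threaded runner triggers up-front pattern resolution.
        for name in pipeline.datasets():
            if catalog.match_pattern(name):
                catalog.resolve(name)
    return runner.run(pipeline, catalog)


def count_resolutions(runner_cls, match_pattern):
    """Run with fully mocked collaborators and report how often resolve() ran."""
    runner = mock.Mock(spec=runner_cls)  # spec makes isinstance() checks pass
    pipeline = mock.Mock()
    pipeline.datasets.return_value = {"ds_a"}
    catalog = mock.Mock()
    catalog.match_pattern.return_value = match_pattern

    run(runner, pipeline, catalog)

    runner.run.assert_called_once_with(pipeline, catalog)
    return catalog.resolve.call_count
```

Because every collaborator is a mock, this only shows which branch is entered; whether a pattern is resolved correctly would still need a separate test against a real catalog.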
Makes sense. Thanks for clarifying! 🙂
Thanks @ElenaKhaustova , great work investigating the issue and coming up with a suitable solution!
Minor comment regarding the release notes but looks good! ⭐
RELEASE.md
@@ -1,8 +1,10 @@
# Upcoming Release
Keep the heading here as it was; it should be changed during the release so the automatic release notes extraction works properly!
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
⭐️ Great work on figuring out the mocking, and it is great to see the pipeline testing already helping us make some decisions 🔥 @lrcouto
Description
Fixes #4007
The issue is not deterministic and appears depending on the order in which threads are executed.
The issue appears for ThreadRunner when loading the same pattern dataset. Since patterns are resolved at runtime, the corresponding datasets are added upon execution: load() -> _get_dataset() -> add(). Inside _get_dataset() (kedro/kedro/io/data_catalog.py, line 423 in 2a97dd4), add() is called based on the state of the preceding check. The issue appears when the first thread has saved the state "dataset is not in catalog" but another thread adds this dataset right after. So the first thread holds an outdated state, based on which we try to add the dataset to the catalog again.
This issue does not affect ParallelRunner because we never share objects between processes, so each process has a separate catalog.
Development notes
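The check-then-add race described above can be sketched with a toy catalog. `ToyCatalog` is a hypothetical stand-in, not Kedro's DataCatalog, and the interleaving of the two threads is replayed by hand so that the failure is reproducible rather than timing-dependent.

```python
class ToyCatalog:
    """Hypothetical stand-in for a catalog that rejects duplicate registration."""

    def __init__(self):
        self._datasets = {}

    def __contains__(self, name):
        return name in self._datasets

    def add(self, name, dataset):
        if name in self._datasets:
            raise ValueError(f"Dataset '{name}' has already been registered")
        self._datasets[name] = dataset


catalog = ToyCatalog()

# Step 1: both threads run the membership check before either reaches add().
thread_a_saw_missing = "model_input" not in catalog
thread_b_saw_missing = "model_input" not in catalog

# Step 2: thread B registers the dataset first.
if thread_b_saw_missing:
    catalog.add("model_input", object())

# Step 3: thread A acts on its outdated check and tries to register again.
error = None
if thread_a_saw_missing:
    try:
        catalog.add("model_input", object())
    except ValueError as exc:
        error = str(exc)

print(error)
```

With real threads the same sequence only happens when the scheduler switches between the check and the add, which is why the error shows up on some runs and not others.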
The main source of the issue is that catalog.load() does not only read but also writes, and the catalog itself is not thread-safe at the moment. So it feels like a major problem, which we can temporarily address as suggested and fix properly as part of the runners redesign work.
There are two solutions suggested:
1. Lock the load() dataset function (which does writing by calling add() inside) that is applied to threads. With it, the check for whether the dataset is in the catalog and the addition of the dataset happen together, as the whole dataset.load() is protected by the lock. The possible downside of this solution is reduced performance, but it should only affect the case when load() is done via calling an external API, because otherwise we still cannot do simultaneous reads because of the GIL.
2. Resolve the patterns before the run, avoiding the case where load() calls add(), as all patterns are resolved based on the pipeline datasets. This solution neglects lazy pattern resolution and will not work with dynamic pipelines.
I prefer the first solution because it feels wrong to write to a shared object without locking it.
Note: when testing, you need to run it multiple times to reproduce the error, as it's not deterministic.
Developer Certificate of Origin
We need all contributions to comply with the Developer Certificate of Origin (DCO). All commits must be signed off by including a Signed-off-by line in the commit message. See our wiki for guidance.
If your PR is blocked due to unsigned commits, then you must follow the instructions under "Rebase the branch" on the GitHub Checks page for your PR. This will retroactively add the sign-off to all unsigned commits and allow the DCO check to pass.