Split push for blocking and non-blocking #244
Conversation
Follows up MerginMaps/server#464. Separates push changes into blocking and non-blocking changes that the server can handle concurrently.
Good stuff here. I am not sure if we want to run each job, wait for it, and then pull changes and run it again. If so, we could make some tweaks to the logic.
    k: sorted(v, key=lambda f: f["path"]) for k, v in d.items()
}

def test_changes_handler(mc):
this looks fine. Just add some integration test that makes 2 pushes in 2 threads (?). I think we could use mc and mc2 for that :)
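A minimal, self-contained sketch of the suggested test idea: two clients ("mc" and "mc2" in the real suite) pushing from two threads at once. The real test would use the actual MerginClient fixtures; here the server is simulated by a lock that serializes blocking pushes, so the names and the `fake_push` helper are purely illustrative.

```python
import threading

server_lock = threading.Lock()  # stands in for the server-side exclusive transaction
results = []

def fake_push(client_name):
    # A blocking push holds the server-side exclusive lock for its duration,
    # so concurrent blocking pushes are serialized rather than interleaved.
    with server_lock:
        results.append(client_name)

threads = [threading.Thread(target=fake_push, args=(name,)) for name in ("mc", "mc2")]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert sorted(results) == ["mc", "mc2"]  # both pushes completed
```

The assertion only checks that both pushes finish; a real integration test would additionally verify that neither client's uploaded changes were lost.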
@@ -2886,3 +2894,38 @@ def test_mc_without_login():
    # without login should not be able to access workspaces
    with pytest.raises(ClientError, match="Authentication information is missing or invalid."):
        mc.workspaces_list()

def sort_dict_of_files_by_path(d):
use a leading underscore to mark it as an internal helper
@@ -2440,7 +2446,7 @@ def test_project_rename(mc: MerginClient):

    # validate project info
    project_info = mc.project_info(project_renamed)
-   assert project_info["version"] == "v1"
+   assert project_info["version"] == "v2"  # two versions created in initial push
I think we can get rid of testing the exact version number in some places in the tests. I do not think we need to test it in project_rename; we could keep it only in the tests that are directly related to push testing. Just an idea :)
assert "Wrong version parameters" in str(e.value)
- assert "Available versions: [1, 2, 3, 4]" in str(e.value)
+ assert "Available versions: [1, 3, 4, 5]" in str(e.value)
I probably do not understand, but why is 2 missing here? :)
mergin/client_push.py
Outdated
if not sum(len(v) for v in changes.values()):
    mp.log.info(f"--- push {project_path} - nothing to do")
    return
for changes in changes_list:
Not sure if we want to submit all changes to jobs at once, or wait for each change set and then call finalize with a fresh pull of changes.
In case we want to run each change set and then recalculate changes again, I suggest making sure the queue of jobs is recalculated each time finalize is called. Something like recursion, or a while loop on top of that:
def push():
    changes = get_push_changes()
    changes = changes_handler.split()
    while len(changes):
        job = push_project_async()
        while not push_is_finished(job):
            yield some_progress  # we could probably move that here from cli.py in that case
        push_project_finalize(job)
        changes = get_push_changes()
        changes = changes_handler.split()
I agree, we need to split workload here into processing changes and then do serial push_async jobs.
1. Blocking: updated/removed and added files that are blocking
2. Non-blocking: added files that are not blocking
"""
blocking_changes = {"added": [], "updated": [], "removed": [], "renamed": []}
can we get rid of the legacy "renamed"? It has not been used for a while
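A minimal sketch of the split described in the docstring above, with "renamed" dropped per the review comment. The `is_blocking_file` predicate is hypothetical; the real client would decide this differently (e.g. based on whether the file is a versioned geopackage).

```python
def is_blocking_file(f):
    # Hypothetical predicate: treat geopackages as blocking uploads.
    return f["path"].endswith(".gpkg")

def split_changes(changes):
    # 1. Blocking: updated/removed files, plus added files that are blocking.
    # 2. Non-blocking: added files that are not blocking.
    blocking = {"added": [], "updated": [], "removed": []}
    non_blocking = {"added": [], "updated": [], "removed": []}
    for f in changes.get("added", []):
        target = blocking if is_blocking_file(f) else non_blocking
        target["added"].append(f)
    # updates and removals touch existing files, so they are always blocking
    blocking["updated"] = list(changes.get("updated", []))
    blocking["removed"] = list(changes.get("removed", []))
    return [blocking, non_blocking]

changes = {"added": [{"path": "a.gpkg"}, {"path": "b.txt"}], "updated": [], "removed": []}
blocking, non_blocking = split_changes(changes)
```

With the sample input, `blocking["added"]` holds only `a.gpkg` and `non_blocking["added"]` only `b.txt`.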
mergin/client_push.py
Outdated
Applies all configured internal filters and returns a list of changes ready to be uploaded.
"""
changes = self._filter_changes(self._raw_changes)
changes = self._split_by_type(changes)
we are changing the datatype here; I would not reuse the same variable name then
mp.log.info(f"will upload {len(upload_queue_items)} items with total size {total_size}")

# start uploads in background
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
this would already start sending some chunks from the second upload before the first one is finalized.
I would rather keep push_project_async
for a single job only (including start/upload/finalize) and process the split changes sequentially, as a number of serial push_project_async calls, with or without a pull before each (I do not know; in principle, for a non-blocking push you should not need to pull)
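The serial approach suggested here can be sketched as a small driver loop: one job per change set, waited on and finalized before the next starts. The callables are stand-ins for the real client functions (`push_project_async`, `push_project_finalize`), injected so the loop itself can be exercised in isolation.

```python
def push_change_sets(change_sets, start_job, wait_for, finalize):
    # Process split change sets strictly one after another: no chunks from
    # the second set are sent before the first job is finalized.
    finished = 0
    for changes in change_sets:
        if not sum(len(v) for v in changes.values()):
            continue  # nothing to upload for this set
        job = start_job(changes)   # stand-in for push_project_async
        wait_for(job)              # block until all chunks are uploaded
        finalize(job)              # stand-in for push_project_finalize
        finished += 1
    return finished

# Exercise the loop with one non-empty and one empty change set.
log = []
sets = [{"added": [{"path": "a.gpkg"}]}, {"added": []}]
done = push_change_sets(
    sets,
    start_job=lambda c: log.append("start") or c,
    wait_for=lambda j: log.append("wait"),
    finalize=lambda j: log.append("finalize"),
)

assert done == 1
assert log == ["start", "wait", "finalize"]
```

Whether a fresh pull of changes is needed between iterations (as in the earlier pseudocode) is left out here; it would slot in at the top of the `for` loop body.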
When two clients push at the same time, one client may remove changes uploaded by the other one. See the merged logs from client A and client B.
Resolves #240