Skip to content

Split push for blocking and non-blocking #244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ Commands:
share Fetch permissions to project
share-add Add permissions to [users] to project
share-remove Remove [users] permissions from project
show-file-changeset Displays information about project changes.
show-file-history Displays information about a single version of a...
show-version Displays information about a single version of a...
show-file-changeset Display information about project changes.
show-file-history Display information about a single version of a...
show-version Display information about a single version of a...
status Show all changes in project files - upstream and...
```

Expand All @@ -99,7 +99,7 @@ To download a specific version of a project:
$ mergin --username john download --version v42 john/project1 ~/mergin/project1
```

To download a sepecific version of a single file:
To download a specific version of a single file:

1. First you need to download the project:
```
Expand Down
23 changes: 15 additions & 8 deletions mergin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
download_project_is_running,
)
from mergin.client_pull import pull_project_async, pull_project_is_running, pull_project_finalize, pull_project_cancel
from mergin.client_push import push_project_async, push_project_is_running, push_project_finalize, push_project_cancel
from mergin.client_push import push_next_change, push_project_is_running, push_project_finalize, push_project_cancel


from pygeodiff import GeoDiff
Expand Down Expand Up @@ -412,17 +412,24 @@ def push(ctx):
return
directory = os.getcwd()
try:
job = push_project_async(mc, directory)
if job is not None: # if job is none, we don't upload any files, and the transaction is finished already
with click.progressbar(length=job.total_size) as bar:
# keep going until there are no more changes
while True:
job = push_next_change(mc, directory)
if job is None:
click.echo("All changes uploaded.")
break

# show progress for this single change upload
with click.progressbar(length=job.total_size, label="Uploading change") as bar:
last_transferred_size = 0
while push_project_is_running(job):
time.sleep(1 / 10) # 100ms
new_transferred_size = job.transferred_size
bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only
last_transferred_size = new_transferred_size
# finalize this change upload (bump versions on server & locally)
push_project_finalize(job)
click.echo("Done")
click.echo("Change pushed, checking for more…")
except InvalidProject as e:
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
except ClientError as e:
Expand Down Expand Up @@ -473,7 +480,7 @@ def pull(ctx):
@click.argument("version")
@click.pass_context
def show_version(ctx, version):
"""Displays information about a single version of a project. `version` is 'v1', 'v2', etc."""
"""Display information about a single version of a project. `version` is 'v1', 'v2', etc."""
mc = ctx.obj["client"]
if mc is None:
return
Expand All @@ -492,7 +499,7 @@ def show_version(ctx, version):
@click.argument("path")
@click.pass_context
def show_file_history(ctx, path):
"""Displays information about a single version of a project."""
"""Display information about a single version of a project."""
mc = ctx.obj["client"]
if mc is None:
return
Expand All @@ -516,7 +523,7 @@ def show_file_history(ctx, path):
@click.argument("version")
@click.pass_context
def show_file_changeset(ctx, path, version):
"""Displays information about project changes."""
"""Display information about project changes."""
mc = ctx.obj["client"]
if mc is None:
return
Expand Down
18 changes: 10 additions & 8 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
download_diffs_finalize,
)
from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
from .client_push import push_project_async, push_project_wait, push_project_finalize
from .client_push import push_next_change, push_project_wait, push_project_finalize
from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
from .version import __version__

Expand Down Expand Up @@ -93,7 +93,7 @@ def __init__(
plugin_version=None,
proxy_config=None,
):
self.url = url if url is not None else MerginClient.default_url()
self.url = (url if url is not None else MerginClient.default_url()).rstrip("/") + "/"
self._auth_params = None
self._auth_session = None
self._user_info = None
Expand Down Expand Up @@ -473,7 +473,8 @@ def create_project(self, project_name, is_public=False, namespace=None):

def create_project_and_push(self, project_name, directory, is_public=False, namespace=None):
"""
Convenience method to create project and push the initial version right after that.
Convenience method to create project and push the files right after that.
Creates two versions when directory contains blocking and non-blocking changes.

:param project_name: Project's full name (<namespace>/<name>)
:type project_name: String
Expand Down Expand Up @@ -896,11 +897,12 @@ def push_project(self, directory):
:param directory: Project's directory
:type directory: String
"""
job = push_project_async(self, directory)
if job is None:
return # there is nothing to push (or we only deleted some files)
push_project_wait(job)
push_project_finalize(job)
while True:
job = push_next_change(self, directory)
if not job:
return # there is nothing to push (or we only deleted some files)
push_project_wait(job)
push_project_finalize(job)

def pull_project(self, directory):
"""
Expand Down
2 changes: 1 addition & 1 deletion mergin/client_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def download_project_async(mc, project_path, directory, project_version=None):
"""

if "/" not in project_path:
raise ClientError("Project name needs to be fully qualified, e.g. <username>/<projectname>")
raise ClientError("Project name needs to be fully qualified, e.g. <workspacename>/<projectname>")
if os.path.exists(directory):
raise ClientError("Project directory already exists")
os.makedirs(directory)
Expand Down
127 changes: 105 additions & 22 deletions mergin/client_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import tempfile
import concurrent.futures
import os
from typing import Dict, List, Optional
import click

from .common import UPLOAD_CHUNK_SIZE, ClientError
from .merginproject import MerginProject
from .editor import filter_changes
from .editor import is_editor_enabled, _apply_editor_filters
from .utils import is_qgis_file, is_versioned_file


class UploadJob:
Expand Down Expand Up @@ -82,8 +85,83 @@ def upload_blocking(self, mc, mp):
raise ClientError("Mismatch between uploaded file chunk {} and local one".format(self.chunk_id))


def push_project_async(mc, directory):
"""Starts push of a project and returns pending upload job"""
class ChangesHandler:
    """
    Handles preparation of file changes to be uploaded to the server.

    Responsibilities:
    - Filtering project file changes.
    - Splitting changes into blocking and non-blocking groups.
    - TODO: Applying limits such as max file count or size to break large uploads into smaller batches.
    - Generating upload-ready change groups for asynchronous job creation.
    """

    def __init__(self, client, project_info, changes):
        self.client = client
        self.project_info = project_info
        self._raw_changes = changes

    @staticmethod
    def is_blocking_file(file):
        """Return True when the file must go into the blocking batch (QGIS project file or versioned file)."""
        path = file["path"]
        return is_qgis_file(path) or is_versioned_file(path)

    def _filter_changes(self, changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
        """
        Filter the changes dictionary based on the editor's enabled state.

        When the editor is disabled the input is returned untouched; otherwise
        the changes are passed through `_apply_editor_filters` to apply any
        configured filters.

        Args:
            changes (dict[str, list[dict]]): Mapping of change type to lists of change dictionaries.

        Returns:
            dict[str, list[dict]]: The (possibly filtered) changes dictionary.
        """
        if is_editor_enabled(self.client, self.project_info):
            return _apply_editor_filters(changes)
        return changes

    def _split_by_type(self, changes: Dict[str, List[dict]]) -> List[Dict[str, List[dict]]]:
        """
        Split raw filtered changes into up to two batches:
        1. Blocking: updated/removed files plus added files that are blocking
        2. Non-blocking: added files that are not blocking
        """
        # NOTE(review): the legacy "renamed" key is unused for a while and
        # could likely be dropped — kept here for backward compatibility.
        blocking = {key: [] for key in ("added", "updated", "removed", "renamed")}
        non_blocking = {key: [] for key in ("added", "updated", "removed", "renamed")}

        # Added files are routed by whether they block concurrent edits;
        # updates and removals are always treated as blocking.
        for item in changes.get("added", []):
            bucket = blocking if self.is_blocking_file(item) else non_blocking
            bucket["added"].append(item)

        blocking["updated"].extend(changes.get("updated", []))
        blocking["removed"].extend(changes.get("removed", []))

        batches = []
        for group in (blocking, non_blocking):
            if any(group.values()):
                batches.append(group)
        return batches

    def split(self) -> List[Dict[str, List[dict]]]:
        """
        Apply all configured internal filters and return a list of changes ready to be uploaded.
        """
        filtered = self._filter_changes(self._raw_changes)
        # TODO: apply limits (e.g. max file count / total size per batch)
        return self._split_by_type(filtered)


def push_next_change(mc, directory) -> Optional[UploadJob]:
"""Starts push of a change of a project and returns pending upload job"""

mp = MerginProject(directory)
if mp.has_unfinished_pull():
Expand All @@ -107,8 +185,8 @@ def push_project_async(mc, directory):

username = mc.username()
# permissions field contains information about update, delete and upload privileges of the user
# on a specific project. This is more accurate information then "writernames" field, as it takes
# into account namespace privileges. So we have to check only "permissions", namely "upload" one
# on a specific project. This is more accurate information than "writernames" field, as it takes
# into account namespace privileges. So we have to check only "permissions", namely the "upload" one
if not mc.has_writing_permissions(project_path):
mp.log.error(f"--- push {project_path} - username {username} does not have write access")
raise ClientError(f"You do not seem to have write access to the project (username '{username}')")
Expand All @@ -120,9 +198,14 @@ def push_project_async(mc, directory):
+ f"\n\nLocal version: {local_version}\nServer version: {server_version}"
)

changes = mp.get_push_changes()
changes = filter_changes(mc, project_info, changes)
mp.log.debug("push changes:\n" + pprint.pformat(changes))
all_changes = mp.get_push_changes()
changes_list = ChangesHandler(mc, project_info, all_changes).split()
if not changes_list:
return None

# take only the first change
change = changes_list[0]
mp.log.debug("push change:\n" + pprint.pformat(change))

tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")

Expand All @@ -131,22 +214,22 @@ def push_project_async(mc, directory):
# That's because if there are pending transactions, checkpointing or switching from WAL mode
# won't work, and we would end up with some changes left in -wal file which do not get
# uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
for f in changes["updated"]:
for f in change["updated"]:
if mp.is_versioned_file(f["path"]) and "diff" not in f:
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

for f in changes["added"]:
for f in change["added"]:
if mp.is_versioned_file(f["path"]):
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

if not sum(len(v) for v in changes.values()):
if not any(len(v) for v in change.values()):
mp.log.info(f"--- push {project_path} - nothing to do")
return

# drop internal info from being sent to server
for item in changes["updated"]:
for item in change["updated"]:
item.pop("origin_checksum", None)
data = {"version": local_version, "changes": changes}
data = {"version": local_version, "changes": change}

try:
resp = mc.post(
Expand All @@ -163,7 +246,7 @@ def push_project_async(mc, directory):
upload_files = data["changes"]["added"] + data["changes"]["updated"]

transaction_id = server_resp["transaction"] if upload_files else None
job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir)
job = UploadJob(project_path, change, transaction_id, mp, mc, tmp_dir)

if not upload_files:
mp.log.info("not uploading any files")
Expand Down Expand Up @@ -196,16 +279,16 @@ def push_project_async(mc, directory):

total_size += file_size

job.total_size = total_size
job.upload_queue_items = upload_queue_items
job.total_size = total_size
job.upload_queue_items = upload_queue_items

mp.log.info(f"will upload {len(upload_queue_items)} items with total size {total_size}")
mp.log.info(f"will upload {len(upload_queue_items)} items with total size {total_size}")

# start uploads in background
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
for item in upload_queue_items:
future = job.executor.submit(_do_upload, item, job)
job.futures.append(future)
# start uploads in background
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
Reviewer comment (Contributor): this would already start sending some chunks from the second upload before the first one is finalized. I would rather keep push_project_async for a single job only (including start/upload/finalize) and process the split changes sequentially, as a number of serial push_project_async calls, with or without a pull before each (in principle, a non-blocking push should not need a pull).

for item in upload_queue_items:
future = job.executor.submit(_do_upload, item, job)
job.futures.append(future)

return job

Expand Down
2 changes: 1 addition & 1 deletion mergin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
this_dir = os.path.dirname(os.path.realpath(__file__))


# Error code from the public API, add to the end of enum as we handle more eror
# Error codes from the public API; add to the end of the enum as we handle more errors
class ErrorCode(Enum):
ProjectsLimitHit = "ProjectsLimitHit"
StorageLimitHit = "StorageLimitHit"
Expand Down
19 changes: 1 addition & 18 deletions mergin/editor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from itertools import filterfalse
from typing import Callable, Dict, List

from .utils import is_mergin_config, is_qgis_file, is_versioned_file
from .utils import is_qgis_file

EDITOR_ROLE_NAME = "editor"

Expand Down Expand Up @@ -40,23 +40,6 @@ def _apply_editor_filters(changes: Dict[str, List[dict]]) -> Dict[str, List[dict
return changes


def filter_changes(mc, project_info: dict, changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
"""
Filters the given changes dictionary based on the editor's enabled state.

If the editor is not enabled, the changes dictionary is returned as-is. Otherwise, the changes are passed through the `_apply_editor_filters` method to apply any configured filters.

Args:
changes (dict[str, list[dict]]): A dictionary mapping file paths to lists of change dictionaries.

Returns:
dict[str, list[dict]]: The filtered changes dictionary.
"""
if not is_editor_enabled(mc, project_info):
return changes
return _apply_editor_filters(changes)


def prevent_conflicted_copy(path: str, mc, project_info: dict) -> bool:
"""
Decides whether a file path should be blocked from creating a conflicted copy.
Expand Down
Loading
Loading