Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/autotests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:

- uses: actions/setup-python@v2
with:
python-version: '3.x'
python-version: '3.8'

- name: Install python package dependencies
run: |
Expand Down
44 changes: 28 additions & 16 deletions mergin/client_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@
import time
from typing import List, Tuple, Optional, ByteString

from .local_changes import LocalChange, LocalChanges

from .common import UPLOAD_CHUNK_ATTEMPT_WAIT, UPLOAD_CHUNK_ATTEMPTS, UPLOAD_CHUNK_SIZE, ClientError, ErrorCode
from .local_changes import ChangesValidationError, LocalChange, LocalChanges

from .common import (
MAX_UPLOAD_VERSIONED_SIZE,
UPLOAD_CHUNK_ATTEMPT_WAIT,
UPLOAD_CHUNK_ATTEMPTS,
UPLOAD_CHUNK_SIZE,
MAX_UPLOAD_MEDIA_SIZE,
ClientError,
)
from .merginproject import MerginProject
from .editor import filter_changes
from .utils import get_data_checksum
Expand Down Expand Up @@ -296,28 +303,23 @@ def push_project_async(mc, directory) -> Optional[UploadJob]:
mp.log.info(f"--- push {project_path} - nothing to do")
return

mp.log.debug("push changes:\n" + pprint.pformat(changes))
mp.log.debug("push changes:\n" + pprint.pformat(asdict(changes)))
tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")

# If there are any versioned files (aka .gpkg) that are not updated through a diff,
# we need to make a temporary copy somewhere to be sure that we are uploading full content.
# That's because if there are pending transactions, checkpointing or switching from WAL mode
# won't work, and we would end up with some changes left in -wal file which do not get
# uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
for f in changes["updated"]:
if mp.is_versioned_file(f["path"]) and "diff" not in f:
for f in changes.updated:
if mp.is_versioned_file(f.path) and not f.diff:
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

for f in changes["added"]:
if mp.is_versioned_file(f["path"]):
for f in changes.added:
if mp.is_versioned_file(f.path):
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

local_changes = LocalChanges(
added=[LocalChange(**change) for change in changes["added"]],
updated=[LocalChange(**change) for change in changes["updated"]],
removed=[LocalChange(**change) for change in changes["removed"]],
)
job = create_upload_job(mc, mp, local_changes, tmp_dir)
job = create_upload_job(mc, mp, changes, tmp_dir)
return job


Expand Down Expand Up @@ -471,12 +473,22 @@ def remove_diff_files(job: UploadJob) -> None:
os.remove(diff_file)


def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[LocalChanges, int]:
    """
    Get changes that need to be pushed to the server.

    :param mc: Mergin client used to filter changes by the user's role
    :param mp: local Mergin project to collect changes from
    :returns: validated local changes and the total number of changed files
    :raises ClientError: if some files exceed the maximum upload size
    """
    changes = mp.get_push_changes()
    project_role = mp.project_role()
    changes = filter_changes(mc, project_role, changes)

    try:
        local_changes = LocalChanges(
            added=[LocalChange(**change) for change in changes["added"]],
            updated=[LocalChange(**change) for change in changes["updated"]],
            removed=[LocalChange(**change) for change in changes["removed"]],
        )
    except ChangesValidationError as e:
        # Translate the validation error into the client-facing error type,
        # keeping the original exception chained for debugging.
        oversize = ", ".join(c.path for c in e.invalid_changes)
        raise ClientError(
            f"Some files exceeded maximum upload size. Files: {oversize}. "
            f"Maximum size for media files is {e.max_media_upload_size / (1024**3)} GB "
            f"and for geopackage files {e.max_versioned_upload_size / (1024**3)} GB."
        ) from e
    return local_changes, sum(len(v) for v in changes.values())
10 changes: 9 additions & 1 deletion mergin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
# seconds to wait between sync callback calls
SYNC_CALLBACK_WAIT = 0.01

# maximum size of media file able to upload in one push (in bytes)
MAX_UPLOAD_MEDIA_SIZE = 10 * (1024**3)

# maximum size of GPKG file able to upload in one push (in bytes)
MAX_UPLOAD_VERSIONED_SIZE = 5 * (1024**3)

# default URL for submitting logs
MERGIN_DEFAULT_LOGS_URL = "https://g4pfq226j0.execute-api.eu-west-1.amazonaws.com/mergin_client_log_submit"

Expand All @@ -39,7 +45,9 @@ class ErrorCode(Enum):


class ClientError(Exception):
def __init__(self, detail: str, url=None, server_code=None, server_response=None, http_error=None, http_method=None):
def __init__(
self, detail: str, url=None, server_code=None, server_response=None, http_error=None, http_method=None
):
self.detail = detail
self.url = url
self.http_error = http_error
Expand Down
2 changes: 1 addition & 1 deletion mergin/editor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from itertools import filterfalse
from typing import Callable, Dict, List

from .utils import is_mergin_config, is_qgis_file, is_versioned_file
from .utils import is_qgis_file

EDITOR_ROLE_NAME = "editor"

Expand Down
41 changes: 40 additions & 1 deletion mergin/local_changes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional, List, Tuple
from typing import Optional, List, Tuple

from .utils import is_versioned_file
from .common import MAX_UPLOAD_MEDIA_SIZE, MAX_UPLOAD_VERSIONED_SIZE

MAX_UPLOAD_CHANGES = 100


class ChangesValidationError(Exception):
    """Raised when local changes fail validation before upload (e.g. files exceed size limits)."""

    def __init__(self, message, invalid_changes=None, max_media_upload_size=None, max_versioned_upload_size=None):
        """
        :param message: human-readable description of the validation failure
        :param invalid_changes: list of LocalChange objects that failed validation
        :param max_media_upload_size: media file size limit in bytes, if relevant
        :param max_versioned_upload_size: versioned (gpkg) file size limit in bytes, if relevant
        """
        super().__init__(message)
        # Use None as the default instead of a mutable [] default argument,
        # which would be shared across all instances.
        self.invalid_changes = invalid_changes if invalid_changes is not None else []
        self.max_media_upload_size = max_media_upload_size
        self.max_versioned_upload_size = max_versioned_upload_size


@dataclass
Expand Down Expand Up @@ -55,6 +69,31 @@ class LocalChanges:
updated: List[LocalChange] = field(default_factory=list)
removed: List[LocalChange] = field(default_factory=list)

def __post_init__(self):
    """
    Validate upload sizes and enforce a combined limit of MAX_UPLOAD_CHANGES
    changes from `added` and `updated`.

    :raises ChangesValidationError: if any media file exceeds MAX_UPLOAD_MEDIA_SIZE,
        or any versioned file uploaded in full (without a diff) exceeds
        MAX_UPLOAD_VERSIONED_SIZE.
    """
    upload_changes = self.get_upload_changes()
    oversize_changes = []
    for change in upload_changes:
        if is_versioned_file(change.path):
            # Versioned files pushed as a diff are not size-limited here;
            # only full uploads (no diff) are checked against the gpkg limit.
            if not change.diff and change.size > MAX_UPLOAD_VERSIONED_SIZE:
                oversize_changes.append(change)
        elif change.size > MAX_UPLOAD_MEDIA_SIZE:
            # Non-versioned (media) files are checked only against the media
            # limit. The previous logic fell through to the versioned-file
            # limit for large media files, wrongly flagging media files sized
            # between the two limits.
            oversize_changes.append(change)
    if oversize_changes:
        raise ChangesValidationError(
            "Some files exceed the maximum upload size",
            oversize_changes,
            max_media_upload_size=MAX_UPLOAD_MEDIA_SIZE,
            max_versioned_upload_size=MAX_UPLOAD_VERSIONED_SIZE,
        )

    if len(self.added) + len(self.updated) > MAX_UPLOAD_CHANGES:
        # Keep as many `added` changes as fit, then fill the remainder
        # of the batch with `updated` changes.
        added_limit = min(len(self.added), MAX_UPLOAD_CHANGES)
        updated_limit = MAX_UPLOAD_CHANGES - added_limit
        self.added = self.added[:added_limit]
        self.updated = self.updated[:updated_limit]

def to_server_payload(self) -> dict:
return {
"added": [change.to_server_data() for change in self.added],
Expand Down
14 changes: 7 additions & 7 deletions mergin/merginproject.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
conflicted_copy_file_name,
edit_conflict_file_name,
)

from .local_changes import LocalChange

this_dir = os.path.dirname(os.path.realpath(__file__))

Expand Down Expand Up @@ -470,20 +470,20 @@ def get_push_changes(self):
changes["updated"] = [f for f in changes["updated"] if f not in not_updated]
return changes

def copy_versioned_file_for_upload(self, f: LocalChange, tmp_dir: str) -> str:
    """
    Make a temporary copy of the versioned file using geodiff, to make sure that we have full
    content in a single file (nothing left in WAL journal).

    Updates the change object in place: size and checksum are recomputed from the
    copy, chunk ids are regenerated, and `upload_file` points at the temporary copy.

    :param f: change describing the versioned file to copy
    :param tmp_dir: directory to place the temporary copy in
    :returns: path of the temporary copy
    """
    path = f.path
    self.log.info("Making a temporary copy (full upload): " + path)
    tmp_file = os.path.join(tmp_dir, path)
    os.makedirs(os.path.dirname(tmp_file), exist_ok=True)
    # sqlite backup API via geodiff flushes any pending WAL content into the copy
    self.geodiff.make_copy_sqlite(self.fpath(path), tmp_file)
    f.size = os.path.getsize(tmp_file)
    f.checksum = generate_checksum(tmp_file)
    f.chunks = [str(uuid.uuid4()) for _ in range(math.ceil(f.size / UPLOAD_CHUNK_SIZE))]
    f.upload_file = tmp_file
    return tmp_file

def get_list_of_push_changes(self, push_changes):
Expand Down
23 changes: 22 additions & 1 deletion mergin/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2328,8 +2328,10 @@ def test_clean_diff_files(mc):
shutil.copy(mp.fpath("inserted_1_A.gpkg"), mp.fpath(f_updated))
mc.push_project(project_dir)

diff_files = glob.glob("*-diff-*", root_dir=os.path.split(mp.fpath_meta("inserted_1_A.gpkg"))[0])
directory = os.path.split(mp.fpath_meta("inserted_1_A.gpkg"))[0]
diff_files = [f for f in os.listdir(directory) if "-diff-" in f]

# Assert that no matching files are found
assert diff_files == []


Expand Down Expand Up @@ -3214,3 +3216,22 @@ def test_client_project_sync_retry(mc):
with pytest.raises(ClientError):
mc.sync_project(project_dir)
assert mock_push_project_async.call_count == 2


def test_push_file_limits(mc):
    """Pushing files larger than the (patched) upload size limits raises ClientError."""
    test_project = "test_push_file_limits"
    project = API_USER + "/" + test_project
    project_dir = os.path.join(TMP_DIR, test_project)
    cleanup(mc, project, [project_dir])
    mc.create_project(test_project)
    mc.download_project(project, project_dir)
    shutil.copy(os.path.join(TEST_DATA_DIR, "base.gpkg"), project_dir)
    # setting to some minimal value to mock limit hit
    with patch("mergin.local_changes.MAX_UPLOAD_VERSIONED_SIZE", 1):
        with pytest.raises(ClientError, match="Some files exceeded maximum upload size. Files: base.gpkg."):
            mc.push_project(project_dir)

    shutil.copy(os.path.join(TEST_DATA_DIR, "test.txt"), project_dir)
    with patch("mergin.local_changes.MAX_UPLOAD_MEDIA_SIZE", 1):
        with pytest.raises(ClientError, match="Some files exceeded maximum upload size. Files: test.txt."):
            mc.push_project(project_dir)
5 changes: 4 additions & 1 deletion mergin/test/test_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ..common import ClientError, ErrorCode


def test_client_error_is_blocked_sync():
"""Test the is_blocked_sync method of ClientError."""
error = ClientError(detail="", server_code=None)
Expand All @@ -12,6 +13,7 @@ def test_client_error_is_blocked_sync():
error.server_code = ErrorCode.ProjectVersionExists.value
assert error.is_blocking_sync() is True


def test_client_error_is_rate_limit():
"""Test the is_rate_limit method of ClientError."""
error = ClientError(detail="", http_error=None)
Expand All @@ -21,6 +23,7 @@ def test_client_error_is_rate_limit():
error.http_error = 429
assert error.is_rate_limit() is True


def test_client_error_is_retryable_sync():
"""Test the is_retryable_sync method of ClientError."""
error = ClientError(detail="", server_code=None, http_error=None)
Expand All @@ -43,4 +46,4 @@ def test_client_error_is_retryable_sync():
error.http_error = 500
assert error.is_retryable_sync() is False
error.http_error = 429
assert error.is_retryable_sync() is True
assert error.is_retryable_sync() is True
94 changes: 93 additions & 1 deletion mergin/test/test_local_changes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime
import pytest
from unittest.mock import patch

from ..local_changes import LocalChange, LocalChanges
from ..local_changes import ChangesValidationError, LocalChange, LocalChanges, MAX_UPLOAD_CHANGES


def test_local_changes_from_dict():
Expand Down Expand Up @@ -118,3 +120,93 @@ def test_local_changes_get_upload_changes():
assert len(upload_changes) == 2 # Only added and updated should be included
assert upload_changes[0].path == "file1.txt" # First change is from added
assert upload_changes[1].path == "file2.txt" # Second change is from updated


def test_local_changes_post_init_validation_media():
    """LocalChanges.__post_init__ raises for media files above MAX_UPLOAD_MEDIA_SIZE."""
    # Define constants
    SIZE_LIMIT_MB = 5
    SIZE_LIMIT_BYTES = SIZE_LIMIT_MB * 1024 * 1024
    SMALL_FILE_SIZE = 1024
    LARGE_FILE_SIZE = 15 * 1024 * 1024

    # Create sample LocalChange instances
    added = [
        LocalChange(path="file1.txt", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
        LocalChange(path="file2.jpg", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now()),  # Over limit
    ]
    updated = [
        LocalChange(path="file3.mp4", checksum="lmn456", size=5 * 1024 * 1024, mtime=datetime.now()),
        LocalChange(path="file4.gpkg", checksum="opq123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
    ]

    # Initialize LocalChanges with a patched media limit; only file2.jpg exceeds it
    with patch("mergin.local_changes.MAX_UPLOAD_MEDIA_SIZE", SIZE_LIMIT_BYTES):
        with pytest.raises(ChangesValidationError, match="Some files exceed") as err:
            LocalChanges(added=added, updated=updated)
        assert len(err.value.invalid_changes) == 1
        assert err.value.invalid_changes[0].path == "file2.jpg"
        assert err.value.invalid_changes[0].size == LARGE_FILE_SIZE


def test_local_changes_post_init_validation_versioned():
    """LocalChanges.__post_init__ raises for full-upload gpkg files above MAX_UPLOAD_VERSIONED_SIZE.

    NOTE: renamed from `test_local_changes_post_init_validation_media` — the
    duplicate name shadowed the previous test so pytest only collected one of them.
    """
    # Define constants
    SIZE_LIMIT_MB = 10
    SIZE_LIMIT_BYTES = SIZE_LIMIT_MB * 1024 * 1024
    SMALL_FILE_SIZE = 1024
    LARGE_FILE_SIZE = 15 * 1024 * 1024

    # Create sample LocalChange instances
    added = [
        LocalChange(path="file1.gpkg", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
        LocalChange(
            path="file2.gpkg", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now(), diff=None
        ),  # Over limit
    ]
    updated = [
        # Over the limit by size, but uploaded as a diff, so it must NOT be flagged
        LocalChange(
            path="file3.gpkg",
            checksum="lmn456",
            size=SIZE_LIMIT_BYTES + 1,
            mtime=datetime.now(),
            diff={"path": "file3-diff.gpkg", "checksum": "diff123", "size": 1024, "mtime": datetime.now()},
        ),
        LocalChange(path="file4.txt", checksum="opq123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
    ]

    # Initialize LocalChanges with a patched versioned-file limit
    with patch("mergin.local_changes.MAX_UPLOAD_VERSIONED_SIZE", SIZE_LIMIT_BYTES):
        with pytest.raises(ChangesValidationError) as err:
            LocalChanges(added=added, updated=updated)
        assert len(err.value.invalid_changes) == 1
        assert err.value.invalid_changes[0].path == "file2.gpkg"
        assert err.value.invalid_changes[0].size == LARGE_FILE_SIZE


def test_local_changes_post_init():
    """LocalChanges.__post_init__ caps `added` + `updated` at MAX_UPLOAD_CHANGES."""
    # Fixture sizes: together they exceed the cap by one change.
    ADDED_COUNT = 80
    UPDATED_COUNT = 21
    SMALL_FILE_SIZE = 1024
    LARGE_FILE_SIZE = 2048

    # Build more than MAX_UPLOAD_CHANGES changes in total.
    added = []
    for idx in range(ADDED_COUNT):
        added.append(LocalChange(path=f"file{idx}.txt", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()))
    updated = []
    for idx in range(UPDATED_COUNT):
        updated.append(
            LocalChange(path=f"file{idx}.txt", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now())
        )

    local_changes = LocalChanges(added=added, updated=updated)

    # `added` is kept in full; `updated` is truncated to fill up to the cap.
    assert len(local_changes.added) == ADDED_COUNT
    assert len(local_changes.updated) == MAX_UPLOAD_CHANGES - ADDED_COUNT
    assert len(local_changes.added) + len(local_changes.updated) == MAX_UPLOAD_CHANGES