Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for bucket to bucket sync of the latest versions of files #165

Merged
merged 23 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
de1f5aa
Add support for bucket to bucket sync
mlech-reef Sep 1, 2020
5702cb6
Add unit tests
mlech-reef Sep 9, 2020
66cb55a
Merge remote-tracking branch 'upstream/master' into b2b-sync
mlech-reef Sep 11, 2020
3eb5189
Improve unit testing
mlech-reef Sep 12, 2020
cabba13
Fix emerge planner
mlech-reef Sep 12, 2020
8b5328d
Increase the timeout for server-side copy
mlech-reef Sep 13, 2020
f089bd9
Add small improvements in sync
mlech-reef Sep 13, 2020
dd3e6ec
Update unit tests after review
mlech-reef Sep 17, 2020
e196a02
Fix reporter for copy, few fixes
mlech-reef Sep 17, 2020
c1b51f9
Fix unit tests for windows
mlech-reef Sep 17, 2020
2f0f77f
Fix usage of progress listener
mlech-reef Sep 17, 2020
0df6236
Merge remote-tracking branch 'upstream/master' into b2b-sync
mlech-reef Sep 30, 2020
ded4499
Add better API handling in conftest
mlech-reef Oct 1, 2020
3a3da88
Change error class in sync
mlech-reef Oct 1, 2020
8da4823
Merge remote-tracking branch 'upstream/master' into b2b-sync
mlech-reef Oct 10, 2020
9dba8b0
Update after PR
mlech-reef Oct 10, 2020
3d5a0b8
Revert "Fix usage of progress listener"
mlech-reef Oct 10, 2020
93013c1
Update b2sdk/sync/action.py
mlech-reef Oct 14, 2020
c96d741
Merge remote-tracking branch 'upstream/master' into b2b-sync
mlech-reef Oct 14, 2020
5b0fdd2
Update docstring of Synchronizer
mlech-reef Oct 28, 2020
56c4dcb
Update Synchronizer docstring
mlech-reef Oct 28, 2020
2d6e955
Update after PR
mlech-reef Oct 28, 2020
a7429eb
Remove unused import from the tests
mlech-reef Oct 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ b2sdk>=0.0.0,<1.0.0
## Not released yet

* Drop Python 2 and Python 3.4 support :tada:
* Support for bucket to bucket sync

## 1.1.4 (2020-07-15)

Expand Down
4 changes: 2 additions & 2 deletions b2sdk/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,10 @@ def copy(
"""

copy_source = CopySource(file_id, offset=offset, length=length)
if length is None:
if not length:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is not the same when length is 0. I think this first branch should still happen in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first branch is using a simple copy manager and the second emerger created by @mzukowski-reef
The problem is that emerger doesn't work when copying 0-length files and actually is not needed for such files

# TODO: it feels like this should be checked on lower level - eg. RawApi
validate_b2_file_name(new_file_name)
return self.api.services.upload_manager.copy_file(
return self.api.services.copy_manager.copy_file(
copy_source,
new_file_name,
content_type=content_type,
Expand Down
69 changes: 69 additions & 0 deletions b2sdk/sync/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,75 @@ def __str__(self):
)


class B2CopyAction(AbstractAction):
"""
File copying action.
"""

def __init__(
self, relative_name, b2_file_name, file_id, dest_b2_file_name, mod_time_millis, size
):
"""
:param relative_name: a relative file name
:type relative_name: str
:param b2_file_name: a name of a remote file
:type b2_file_name: str
:param file_id: a file ID
:type file_id: str
:param dest_b2_file_name: a name of a destination remote file
:type dest_b2_file_name: str
:param mod_time_millis: file modification time in milliseconds
:type mod_time_millis: int
:param size: a file size
:type size: int
"""
self.relative_name = relative_name
self.b2_file_name = b2_file_name
self.file_id = file_id
self.dest_b2_file_name = dest_b2_file_name
self.mod_time_millis = mod_time_millis
self.size = size

def get_bytes(self):
"""
Return file size.

:rtype: int
"""
return self.size

def do_action(self, bucket, reporter):
"""
Perform the copying action, returning only after the action is completed.

:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
bucket.copy(
self.file_id,
self.dest_b2_file_name,
length=self.size,
progress_listener=SyncFileReporter(reporter)
)

def do_report(self, bucket, reporter):
"""
Report the copying action performed.

:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
reporter.print_completion('copy ' + self.relative_name)

def __str__(self):
return (
'b2_copy(%s, %s, %s, %d)' %
(self.b2_file_name, self.file_id, self.dest_b2_file_name, self.mod_time_millis)
)


class B2DeleteAction(AbstractAction):
def __init__(self, relative_name, b2_file_name, file_id, note):
"""
Expand Down
56 changes: 55 additions & 1 deletion b2sdk/sync/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging

from ..exception import DestFileNewer
from .action import LocalDeleteAction, B2DeleteAction, B2DownloadAction, B2HideAction, B2UploadAction
from .action import LocalDeleteAction, B2CopyAction, B2DeleteAction, B2DownloadAction, B2HideAction, B2UploadAction
from .exception import InvalidArgument

ONE_DAY_IN_MS = 24 * 60 * 60 * 1000
Expand Down Expand Up @@ -305,6 +305,60 @@ class DownAndKeepDaysPolicy(DownPolicy):
pass


class CopyPolicy(AbstractFileSyncPolicy):
"""
File is copied (server-side).
"""
DESTINATION_PREFIX = 'b2://'
SOURCE_PREFIX = 'b2://'

def _make_transfer_action(self):
return B2CopyAction(
self._source_file.name,
self._source_folder.make_full_path(self._source_file.name),
self._source_file.latest_version().id_,
self._dest_folder.make_full_path(self._source_file.name),
self._get_source_mod_time(),
self._source_file.latest_version().size,
)


class CopyAndDeletePolicy(CopyPolicy):
"""
File is copied (server-side) and the delete flag is SET.
"""

def _get_hide_delete_actions(self):
for action in super()._get_hide_delete_actions():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What actions might the parent class have that we want to use here? (It looks like the actual base class just returns the empty list.)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the parent class will start returning something, we'd like this class (and all descendants) to honor that without modification of the inheriting classes

yield action
for action in make_b2_delete_actions(
self._source_file,
self._dest_file,
self._dest_folder,
self._transferred,
):
yield action


class CopyAndKeepDaysPolicy(CopyPolicy):
"""
File is copied (server-side) and the keepDays flag is SET.
"""

def _get_hide_delete_actions(self):
for action in super()._get_hide_delete_actions():
yield action
for action in make_b2_keep_days_actions(
self._source_file,
self._dest_file,
self._dest_folder,
self._transferred,
self._keep_days,
self._now_millis,
):
yield action


def make_b2_delete_note(version, index, transferred):
"""
Create a note message for delete action.
Expand Down
22 changes: 16 additions & 6 deletions b2sdk/sync/policy_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
#
######################################################################

from .policy import DownAndDeletePolicy, DownAndKeepDaysPolicy, DownPolicy
from .policy import UpAndDeletePolicy, UpAndKeepDaysPolicy, UpPolicy
from .policy import CopyAndDeletePolicy, CopyAndKeepDaysPolicy, CopyPolicy, \
DownAndDeletePolicy, DownAndKeepDaysPolicy, DownPolicy, UpAndDeletePolicy, \
UpAndKeepDaysPolicy, UpPolicy


class SyncPolicyManager(object):
Expand Down Expand Up @@ -87,10 +88,19 @@ def get_policy_class(self, sync_type, delete, keep_days):
return DownAndKeepDaysPolicy
else:
return DownPolicy
assert False, 'invalid sync type: %s, keep_days: %s, delete: %s' % (
sync_type,
keep_days,
delete,
elif sync_type == 'b2-to-b2':
if delete:
return CopyAndDeletePolicy
elif keep_days:
return CopyAndKeepDaysPolicy
else:
return CopyPolicy
raise NotImplemented(
'invalid sync type: %s, keep_days: %s, delete: %s' % (
sync_type,
keep_days,
delete,
)
)


Expand Down
25 changes: 14 additions & 11 deletions b2sdk/sync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,17 @@ def sync_folders(self, source_folder, dest_folder, now_millis, reporter):
:param int now_millis: current time in milliseconds
:param b2sdk.sync.report.SyncReport,None reporter: progress reporter
"""
source_type = source_folder.folder_type()
dest_type = dest_folder.folder_type()

if source_type != 'b2' and dest_type != 'b2':
raise NotImplemented('Sync between two local folders is not supported!')

# For downloads, make sure that the target directory is there.
if dest_folder.folder_type() == 'local' and not self.dry_run:
if dest_type == 'local' and not self.dry_run:
dest_folder.ensure_present()

if source_folder.folder_type() == 'local' and not self.allow_empty_source:
if source_type == 'local' and not self.allow_empty_source:
source_folder.ensure_non_empty()

# Make an executor to count files and run all of the actions. This is
Expand All @@ -192,23 +198,20 @@ def sync_folders(self, source_folder, dest_folder, now_millis, reporter):
# First, start the thread that counts the local files. That's the operation
# that should be fastest, and it provides scale for the progress reporting.
local_folder = None
if source_folder.folder_type() == 'local':
if source_type == 'local':
local_folder = source_folder
if dest_folder.folder_type() == 'local':
local_folder = dest_folder
if local_folder is None:
raise ValueError('neither folder is a local folder')
if reporter:
if reporter and local_folder is not None:
sync_executor.submit(count_files, local_folder, reporter)

# Schedule each of the actions
bucket = None
if source_folder.folder_type() == 'b2':
bucket = source_folder.bucket
if dest_folder.folder_type() == 'b2':
if dest_type == 'b2':
bucket = dest_folder.bucket
if bucket is None:
raise ValueError('neither folder is a b2 folder')

total_files = 0
total_bytes = 0
for action in self.make_folder_sync_actions(
Expand Down Expand Up @@ -250,8 +253,8 @@ def make_folder_sync_actions(
source_type = source_folder.folder_type()
dest_type = dest_folder.folder_type()
sync_type = '%s-to-%s' % (source_type, dest_type)
if (source_type, dest_type) not in [('b2', 'local'), ('local', 'b2')]:
raise NotImplementedError("Sync support only local-to-b2 and b2-to-local")
if source_type != 'b2' and dest_type != 'b2':
raise NotImplementedError('Sync between two local folders is not supported!')

for source_file, dest_file in zip_folders(
source_folder,
Expand Down
12 changes: 7 additions & 5 deletions b2sdk/transfer/outbound/copy_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

class CopySource(OutboundTransferSource):
def __init__(self, file_id, offset=0, length=None):
if length is None and offset > 0:
raise ValueError('Cannot copy with non zero offset and unknown length')
if not length and offset > 0:
raise ValueError('Cannot copy with non zero offset and unknown or zero length')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Can't we copy a zero-length file?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but we'd rather not. Uploads are A-class transactions (free), but b2_copy_file are C-class transactions (2500/day for free, then $0.004 per 1,000 calls), so since we are not going to save much bandwidth on transfer, it's better to re-upload the "whole" file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an emerger code. See my previous comment.

self.file_id = file_id
self.length = length
self.offset = offset
Expand All @@ -38,13 +38,15 @@ def is_copy(self):
return True

def get_bytes_range(self):
if self.length is None:
if not self.length:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here, too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

if self.offset > 0:
# auto mode should get file info and create correct copy source (with length)
raise ValueError('cannot return bytes range for non zero offset and unknown length')
raise ValueError(
'cannot return bytes range for non zero offset and unknown or zero length'
)
return None

return (self.offset, self.offset + self.length - 1)
return self.offset, self.offset + self.length - 1

def get_copy_source_range(self, relative_offset, range_length):
if self.length is not None and range_length + relative_offset > self.length:
Expand Down
5 changes: 0 additions & 5 deletions test/unit/v0/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,11 +832,6 @@ def test_file_exclusions_inclusions_with_delete(self):


class TestMakeSyncActions(TestSync):
def test_illegal_b2_to_b2(self):
b2_folder = FakeFolder('b2', [])
with self.assertRaises(NotImplementedError):
list(make_folder_sync_actions(b2_folder, b2_folder, FakeArgs(), 0, self.reporter))

def test_illegal_local_to_local(self):
local_folder = FakeFolder('local', [])
with self.assertRaises(NotImplementedError):
Expand Down
7 changes: 0 additions & 7 deletions test/unit/v1/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,13 +862,6 @@ class IllegalEnum(Enum):


class TestMakeSyncActions(TestSync):
def test_illegal_b2_to_b2(self):
b2_folder = FakeFolder('b2', [])
with self.assertRaises(NotImplementedError):
fakeargs = FakeArgs()
syncronizer = fakeargs.get_synchronizer()
list(syncronizer.make_folder_sync_actions(b2_folder, b2_folder, 0, self.reporter))

def test_illegal_local_to_local(self):
local_folder = FakeFolder('local', [])
with self.assertRaises(NotImplementedError):
Expand Down