Skip to content

Commit

Permalink
Support sync of a single file
Browse files Browse the repository at this point in the history
  • Loading branch information
ylow committed Feb 6, 2024
1 parent 853af45 commit d2a0068
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
2 changes: 2 additions & 0 deletions python/pyxet/pyxet/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ def sync(source: Annotated[str, typer.Argument(help="Source folder to sync")],
print(f"Checking sync")
cmd.validate()
print(f"Starting sync")
if dryrun:
print("This is a dryrun")
stats = cmd.run()
if not dryrun:
print(f"Completed sync. Copied: {stats.copied} files, ignored: {stats.ignored} files")
Expand Down
38 changes: 28 additions & 10 deletions python/pyxet/pyxet/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime
from functools import partial

from pyxet.util import _get_fs_and_path, _isdir, _rel_path, _path_join, _path_dirname, _is_illegal_subdirectory_file_name
from pyxet.util import _get_fs_and_path, _isdir, _rel_path, _path_join, _path_split, _path_dirname, _is_illegal_subdirectory_file_name
from pyxet.file_operations import _single_file_copy_impl, CopyUnit

XET_MTIME_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
Expand Down Expand Up @@ -41,9 +41,6 @@ def validate(self):
# TODO: we may want to be able to sync remote location to a new branch?
self._dest_fs.branch_info(self._dest_root)

# source should be a directory
if not _isdir(self._src_fs, self._src_root):
raise ValueError(f"source: {self._src_root} needs to be a directory")
# s3 needs a bucket
if self._src_proto == 's3' and (self._src_root == '/' or self._src_root == ''):
raise ValueError(f"S3 source needs a specified bucket")
Expand Down Expand Up @@ -93,25 +90,44 @@ def _sync_with_ls(self, executor, futures, src_path, dest_path):
Note that ls on xet-fs doesn't return an mtime and thus, will not be able to compare
on that field.
"""

try:
dest_files = self._dest_fs.find(dest_path, detail=True)
except RuntimeError:
dest_files = {}
total_size = 0
for abs_path, src_info in self._src_fs.find(src_path, detail=True).items():
relpath = _rel_path(abs_path, src_path)

if _is_illegal_subdirectory_file_name(relpath):
print(f"{abs_path} is an invalid file (not copied).")
continue

dest_for_this_path = _path_join(self._dest_fs, dest_path, relpath)
if not _isdir(self._src_fs, src_path):
# syncing a single file
src_info = self._src_fs.info(src_path)
abs_path = src_info['name']
# get the last component name. This is the filename
fname = _path_split(self._src_fs, abs_path)[-1]
# Tack it on to the end of the dest path to make the output name
dest_for_this_path = _path_join(self._dest_fs, dest_path, fname)
dest_info = dest_files.get(dest_for_this_path)

partial_func = partial(self._sync_file_task, abs_path, src_info, dest_for_this_path, dest_info)
futures.append(executor.submit(partial_func))
total_size += src_info.get('size', 0)

else:
# syncing a folder
for abs_path, src_info in self._src_fs.find(src_path, detail=True).items():
relpath = _rel_path(abs_path, src_path)

if _is_illegal_subdirectory_file_name(relpath):
print(f"{abs_path} is an invalid file (not copied).")
continue

dest_for_this_path = _path_join(self._dest_fs, dest_path, relpath)
dest_info = dest_files.get(dest_for_this_path)

partial_func = partial(self._sync_file_task, abs_path, src_info, dest_for_this_path, dest_info)
futures.append(executor.submit(partial_func))
total_size += src_info.get('size', 0)

if self._update_size:
self._update_remote_size(total_size)

Expand Down Expand Up @@ -164,6 +180,8 @@ def _sync_file_task(self, src_path, src_info, dest_path, dest_info):
dest_dir = _path_dirname(self._dest_fs, dest_path)
cp_copy = CopyUnit(src_path=src_path, dest_path=dest_path, dest_dir = dest_dir, size = size)
_single_file_copy_impl(cp_copy, self._src_fs, self._dest_fs)
else:
print(f"Copying {src_path} to {dest_path}")
return True
# ignored
return False
Expand Down

0 comments on commit d2a0068

Please sign in to comment.