Skip to content

Commit

Permalink
added sync_from_fileserver and sync_to_fileserver
Browse files Browse the repository at this point in the history
closes issue #5
  • Loading branch information
thawn committed Oct 19, 2022
1 parent e75762e commit 53e689a
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 50 deletions.
152 changes: 112 additions & 40 deletions biapol_taurus/_project_file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,45 @@ def save_file(self, save_function: callable, filename: str, *args, target: str =
return save_to_project(save_function, str(target_path), *args, cache_workspace=self.cache,
path_to_datamover=self.datamover.path_to_exe, path_to_workspace_tools=self.workspace_exe_path, quiet=self.quiet, **kw)

def sync_with_fileserver(self, direction: str = 'from fileserver', delete: bool = False,
overwrite_newer: bool = False, im_sure: bool = False, dry_run: bool = False, background: bool = True):
'''Synchronize a whole directory tree with the fileserver (using rsync). By default, Does not delete files, but overwrites existing files if they are older.
def sync_from_fileserver(self, delete: bool = False, overwrite_newer: bool = False,
                         im_sure: bool = False, dry_run: bool = False, background: bool = True):
    '''Synchronize a whole directory tree from the fileserver to the project space (using rsync).

    By default, does not delete files, but overwrites existing files if they are older.

    Beware that this recursively copies _all_ the data. So if you have an error in source mount or target project space, this might create a mess.
    If you are unsure, first call the method with dry_run=True. That way, no data will be transferred, and you can check the output.
    This behavior is enforced for dangerous operations that might cause data loss.
    Dangerous operations are:
    1. setting delete=True
    2. setting overwrite_newer=True
    3. syncing the entire fileserver with the entire project space.

    Parameters
    ----------
    delete : bool, optional
        Delete files on the target project space that do not exist on the fileserver (add rsync --delete flag), by default False
    overwrite_newer : bool, optional
        Overwrite files on the target project space even if they are newer (removes rsync -u flag), by default False
    im_sure : bool, optional
        Confirm that you are sure and skip the dry-run for dangerous operations, by default False
    dry_run : bool, optional
        Enforce a dry-run (add rsync -n flag), by default False
    background : bool, optional
        Forwarded unchanged to ``_sync``, by default True.
        NOTE(review): ``_sync`` currently always waits for rsync to finish, so this flag has no effect yet.

    Returns
    -------
    Whatever ``_sync`` returns: the result of ``communicate()`` on the finished rsync process
    (presumably a ``(stdout, stderr)`` tuple -- confirm against ``datamover.dtrsync``).

    Raises
    ------
    ConfirmationRequiredException
        Raised by ``_sync`` after a forced dry-run if a dangerous operation was requested without im_sure=True or dry_run=True.
    '''
    # Forward every argument, including `background`, so this wrapper stays
    # correct if `_sync` ever starts honouring that flag.
    return self._sync(direction='from fileserver', delete=delete,
                      overwrite_newer=overwrite_newer, im_sure=im_sure,
                      dry_run=dry_run, background=background)

def sync_to_fileserver(self, delete: bool = False, overwrite_newer: bool = False,
im_sure: bool = False, dry_run: bool = False, background: bool = True):
'''Synchronize a whole directory tree from the project space on the cluster to the fileserver (using rsync). By default, does not delete files, but overwrites existing files if they are older.
Beware that this recursively copies _all_ the data. So if you have an error in source mount or target project space, this might create a mess.
If you are unsure, first call the method with dry_run=True. That way, no data will be transferred, and you can check the output.
Expand All @@ -446,8 +480,6 @@ def sync_with_fileserver(self, direction: str = 'from fileserver', delete: bool
Parameters
----------
direction : str, optional
The direction in which to sync, by default 'from fileserver'
delete : bool, optional
Delete files on the fileserver that do not exist in the project space (add rsync --delete flag), by default False
overwrite_newer : bool, optional
Expand All @@ -466,40 +498,9 @@ def sync_with_fileserver(self, direction: str = 'from fileserver', delete: bool
tuple of strings (if background=False)
the first element of the tuple is the standard output (stdout) of the process, the second element is the error (stderr).
'''
if overwrite_newer:
options = ['-av']
else:
options = ['-auv']
if delete:
options.append('--delete')
if direction == 'from_fileserver' or direction == 'from fileserver':
options.append(str(self.source_fileserver_dir) + '/')
options.append(str(self.target_project_space_dir))
else:
options.append(str(self.target_project_space_dir) + '/')
options.append(str(self.source_fileserver_dir))
confirmation_required = delete or overwrite_newer
if self.target_project_space_dir.parent == list(self.target_project_space_dir.parents)[
-2] and self.source_fileserver_dir.parent == list(self.source_fileserver_dir.parents)[-2]:
# syncing the entire fileserver directly into target_project_space_dir
# requires confirmation because it might affect data of other users of the
# same project space
confirmation_required = True
if dry_run:
confirmation_required = False # dry runs are never dangerous
options[0] += 'n'
if confirmation_required and not im_sure:
warnings.warn(
'What you are trying to do requires confirmation. Enforcing dry-run...')
options[0] += 'n'
process = self.datamover.dtrsync(*options)
waitfor(process, discard_output=False, quiet=self.quiet)
out, _ = process.communicate()
raise ConfirmationRequiredException(
'If you are sure you know what you are doing, call this method again with te keyword argument "im_sure=True".\nBut before you do that, please carefully check the output of the dry-run and make sure that is what you intended: {}'.format(out))
process = self.datamover.dtrsync(*options)
waitfor(process, discard_output=False, quiet=self.quiet)
return process.communicate()

return self._sync(direction='to fileserver', delete=delete,
overwrite_newer=overwrite_newer, im_sure=im_sure, dry_run=dry_run)

def get_file(self, filename: str, timeout_in_s: float = -1,
wait_for_finish: bool = True) -> Path:
Expand Down Expand Up @@ -621,6 +622,77 @@ def remove_file(self, filename, wait_for_finish: bool = False):

return exit_code

def _sync(self, direction: str = 'from fileserver', delete: bool = False,
          overwrite_newer: bool = False, im_sure: bool = False, dry_run: bool = False, background: bool = True):
    '''Synchronize a whole directory tree with the fileserver (using rsync).

    By default, does not delete files, but overwrites existing files if they are older.
    By default, we sync from the fileserver to the project space (direction='from fileserver'). If you want to synchronize from the project space to the fileserver, use direction='to fileserver'.

    Beware that this recursively copies _all_ the data. So if you have an error in source mount or target project space, this might create a mess.
    If you are unsure, first call the method with dry_run=True. That way, no data will be transferred, and you can check the output.
    This behavior is enforced for dangerous operations that might cause data loss.
    Dangerous operations are:
    1. setting delete=True
    2. setting overwrite_newer=True
    3. syncing the entire fileserver with the entire project space.

    Parameters
    ----------
    direction : str, optional
        The direction in which to sync, by default 'from fileserver'.
        'from fileserver' and 'from_fileserver' sync fileserver -> project space;
        any other value syncs project space -> fileserver.
    delete : bool, optional
        Delete files on the receiving side that do not exist on the sending side (add rsync --delete flag), by default False
    overwrite_newer : bool, optional
        Overwrite files on the receiving side even if they are newer (removes rsync -u flag), by default False
    im_sure : bool, optional
        Confirm that you are sure and skip the dry-run for dangerous operations, by default False
    dry_run : bool, optional
        Enforce a dry-run (add rsync -n flag), by default False
    background : bool, optional
        Currently unused: the method always waits for rsync to finish before returning. Kept for interface compatibility, by default True

    Returns
    -------
    The result of ``communicate()`` on the finished rsync process
    (presumably a ``(stdout, stderr)`` tuple -- confirm against ``datamover.dtrsync``).

    Raises
    ------
    ConfirmationRequiredException
        If a dangerous operation was requested without im_sure=True or dry_run=True. A dry-run is performed first and its output is included in the exception message.
    '''
    def _is_top_level(path) -> bool:
        # For paths with at least two ancestors this is exactly the former
        # `path.parent == list(path.parents)[-2]` test (true iff the path has
        # two ancestors). Shallower paths used to raise IndexError; they are
        # at least as broad, so they count as top-level too.
        return len(path.parents) <= 2

    # options[0] holds the combined short flags; 'n' is appended to it for dry-runs.
    options = ['-av' if overwrite_newer else '-auv']
    if delete:
        options.append('--delete')

    if direction in ('from_fileserver', 'from fileserver'):
        source, destination = self.source_fileserver_dir, self.target_project_space_dir
    else:
        # Any other value means "to fileserver" (unchanged, permissive behavior).
        source, destination = self.target_project_space_dir, self.source_fileserver_dir
    # Trailing slash on the source: sync the directory's contents, not the directory itself.
    options.append(str(source) + '/')
    options.append(str(destination))

    # Syncing the entire fileserver directly into the project space requires
    # confirmation because it might affect data of other users of the same
    # project space.
    confirmation_required = (
        delete
        or overwrite_newer
        or (_is_top_level(self.target_project_space_dir)
            and _is_top_level(self.source_fileserver_dir))
    )

    if dry_run:
        confirmation_required = False  # dry runs are never dangerous
        options[0] += 'n'

    if confirmation_required and not im_sure:
        warnings.warn(
            'What you are trying to do requires confirmation. Enforcing dry-run...')
        options[0] += 'n'
        process = self.datamover.dtrsync(*options)
        waitfor(process, discard_output=False, quiet=self.quiet)
        out, _ = process.communicate()
        raise ConfirmationRequiredException(
            'If you are sure you know what you are doing, call this method again with the keyword argument "im_sure=True".\nBut before you do that, please carefully check the output of the dry-run and make sure that is what you intended: {}'.format(out))

    process = self.datamover.dtrsync(*options)
    waitfor(process, discard_output=False, quiet=self.quiet)
    return process.communicate()

def _initialize_tmp(self):
'''Delete all temporary data and create a new, empty temp directory.
'''
Expand Down
18 changes: 8 additions & 10 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ def test_ensure_project_dir_exists(self):
self.assertTrue(self.project_userdir.exists())

def test_sync_from_fileserver(self):
self.pft.sync_with_fileserver()
self.pft.sync_from_fileserver()
self.assertTrue((self.project_userdir / 'testdata.npy').exists())
self.assertTrue((self.project_userdir / 'testimage.tif').exists())
numpy_data = np.load(self.project_userdir / 'testdata.npy')
self.assertTrue(np.array_equal(self.testdata, numpy_data))

def test_sync_to_fileserver(self):
np.save(self.project_userdir / 'testdata_new.npy', self.testdata, allow_pickle=False)
self.pft.sync_with_fileserver(direction='to fileserver')
self.pft.sync_to_fileserver()
self.assertTrue((self.fileserver_userdir / 'testdata_new.npy').exists())
self.assertTrue((self.fileserver_userdir / 'testdata.npy').exists())
self.assertTrue((self.fileserver_userdir / 'testimage.tif').exists())
Expand All @@ -74,22 +74,21 @@ def test_sync_to_fileserver(self):
def test_sync_delete_not_confirmed(self):
self.assertRaises(
biapol_taurus.ConfirmationRequiredException,
self.pft.sync_with_fileserver,
direction='to fileserver',
self.pft.sync_to_fileserver,
delete=True)
self.assertTrue((self.fileserver_userdir / 'testdata.npy').exists())
self.assertTrue((self.fileserver_userdir / 'testimage.tif').exists())

def test_sync_delete_confirmed(self):
self.pft.sync_with_fileserver(direction='to fileserver', delete=True, im_sure=True)
self.pft.sync_to_fileserver(delete=True, im_sure=True)
self.assertFalse((self.fileserver_userdir / 'testdata.npy').exists())
self.assertFalse((self.fileserver_userdir / 'testimage.tif').exists())

def test_sync_overwrite_older(self):
new_testdata = np.random.randn(64, 64)
sleep(1) # ensure that the source testdata file is at least 1s newer than the target file
np.save(self.project_userdir / 'testdata.npy', new_testdata, allow_pickle=False)
self.pft.sync_with_fileserver(direction='to fileserver')
self.pft.sync_to_fileserver()
self.assertTrue((self.fileserver_userdir / 'testdata.npy').exists())
self.assertTrue((self.fileserver_userdir / 'testimage.tif').exists())
numpy_data = np.load(self.fileserver_userdir / 'testdata.npy')
Expand All @@ -101,7 +100,7 @@ def test_sync_not_overwrite_newer(self):
np.save(self.project_userdir / 'testdata.npy', new_testdata, allow_pickle=False)
sleep(1) # ensure that the target testdata file is at least 1s newer than the source file
np.save(self.fileserver_userdir / 'testdata.npy', self.testdata, allow_pickle=False)
self.pft.sync_with_fileserver(direction='to fileserver')
self.pft.sync_to_fileserver()
self.assertTrue((self.fileserver_userdir / 'testdata.npy').exists())
self.assertTrue((self.fileserver_userdir / 'testimage.tif').exists())
numpy_data = np.load(self.fileserver_userdir / 'testdata.npy')
Expand All @@ -113,8 +112,7 @@ def test_sync_not_overwrite_newer_unconfirmed(self):
np.save(self.project_userdir / 'testdata.npy', new_testdata, allow_pickle=False)
self.assertRaises(
biapol_taurus.ConfirmationRequiredException,
self.pft.sync_with_fileserver,
direction='to fileserver',
self.pft.sync_to_fileserver,
overwrite_newer=True)
self.assertTrue((self.fileserver_userdir / 'testdata.npy').exists())
self.assertTrue((self.fileserver_userdir / 'testimage.tif').exists())
Expand All @@ -127,7 +125,7 @@ def test_sync_overwrite_newer_confirmed(self):
np.save(self.project_userdir / 'testdata.npy', new_testdata, allow_pickle=False)
sleep(1) # ensure that the target testdata file is at least 1s newer than the source file
np.save(self.fileserver_userdir / 'testdata.npy', self.testdata, allow_pickle=False)
self.pft.sync_with_fileserver(direction='to fileserver', overwrite_newer=True, im_sure=True)
self.pft.sync_to_fileserver(overwrite_newer=True, im_sure=True)
self.assertTrue((self.fileserver_userdir / 'testdata.npy').exists())
self.assertTrue((self.fileserver_userdir / 'testimage.tif').exists())
numpy_data = np.load(self.fileserver_userdir / 'testdata.npy')
Expand Down

0 comments on commit 53e689a

Please sign in to comment.