diff --git a/streamflow/data/remotepath.py b/streamflow/data/remotepath.py index c1ab9db0..29f198f2 100644 --- a/streamflow/data/remotepath.py +++ b/streamflow/data/remotepath.py @@ -56,6 +56,22 @@ def _get_filename_from_response(response: ClientResponse, url: str): return url.rsplit("/", 1)[-1] +def _get_inner_path( + context: StreamFlowContext, location: ExecutionLocation, path: StreamFlowPath +) -> StreamFlowPath: + for mount in sorted(location.mounts.keys(), reverse=True): + if path.is_relative_to(mount): + return _get_inner_path( + context=context, + location=location.wraps, + path=StreamFlowPath( + location.mounts[mount], context=context, location=location.wraps + ) + / path.relative_to(mount), + ) + return path + + async def _size( context: StreamFlowContext, location: ExecutionLocation | None, @@ -330,7 +346,8 @@ async def read_text(self, n=-1, encoding=None, errors=None) -> str: async def resolve(self, strict=False) -> LocalStreamFlowPath | None: if await self.exists(): return LocalStreamFlowPath( - super().resolve(strict=strict), context=self.context + super().resolve(strict=strict), + context=self.context, ) else: return None @@ -398,6 +415,14 @@ def __init__(self, *args, context: StreamFlowContext, location: ExecutionLocatio location.deployment ) self.location: ExecutionLocation = location + self._inner_path: StreamFlowPath | None = None + + def _get_inner_path(self) -> StreamFlowPath: + if self._inner_path is None: + self._inner_path = _get_inner_path( + context=self.context, location=self.location, path=self + ) + return self._inner_path async def _test(self, command: list[str]) -> bool: command = ["test"] + command @@ -414,36 +439,36 @@ async def _test(self, command: list[str]) -> bool: return not status async def checksum(self): - command = [ - "test", - "-f", - f"'{self.__str__()}'", - "&&", - "sha1sum", - f"'{self.__str__()}'", - "|", - "awk", - "'{print $1}'", - ] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - if status > 1: - raise WorkflowExecutionException( - "{} Command '{}' on location {}: {}".format( - status, command, self.location, result - ) - ) - return result.strip() - - async def exists(self, *, follow_symlinks=True) -> bool: - return await self._test( - command=( - ["-e", f"'{self.__str__()}'"] - if follow_symlinks - else ["-e", f"'{self.__str__()}'", "-o", "-L", f"'{self.__str__()}'"] + if (inner_path := self._get_inner_path()) != self: + return await inner_path.checksum() + else: + command = [ + "test", + "-f", + f"'{self.__str__()}'", + "&&", + "sha1sum", + f"'{self.__str__()}'", + "|", + "awk", + "'{print $1}'", + ] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) - ) + if status > 1: + raise WorkflowExecutionException( + "{} Command '{}' on location {}: {}".format( + status, command, self.location, result + ) + ) + return result.strip() + + async def exists(self) -> bool: + if (inner_path := self._get_inner_path()) != self: + return await inner_path.exists() + else: + return await self._test(command=(["-e", f"'{self.__str__()}'"])) async def glob( self, pattern, *, case_sensitive=None @@ -474,34 +499,49 @@ async def glob( ) async def is_dir(self) -> bool: - return await self._test(command=["-d", f"'{self.__str__()}'"]) + if (inner_path := self._get_inner_path()) != self: + return await inner_path.is_dir() + else: + return await self._test(command=["-d", f"'{self.__str__()}'"]) async def is_file(self) -> bool: - return await self._test(command=["-f", f"'{self.__str__()}'"]) + if (inner_path := self._get_inner_path()) != self: + return await inner_path.is_file() + else: + return await self._test(command=["-f", f"'{self.__str__()}'"]) async def is_symlink(self) -> bool: - return await self._test(command=["-L", f"'{self.__str__()}'"]) + if (inner_path := self._get_inner_path()) != self: + return await inner_path.is_symlink() + else: + return await self._test(command=["-L", f"'{self.__str__()}'"]) async def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: - command = ["mkdir", "-m", f"{mode:o}"] - if parents or exist_ok: - command.append("-p") - command.append(self.__str__()) - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) + if (inner_path := self._get_inner_path()) != self: + await inner_path.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) + else: + command = ["mkdir", "-m", f"{mode:o}"] + if parents or exist_ok: + command.append("-p") + command.append(self.__str__()) + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) async def read_text(self, n=-1, encoding=None, errors=None) -> str: - command = ["head", "-c", str(n)] if n >= 0 else ["cat"] - command.append(self.__str__()) - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) - return result.strip() + if (inner_path := self._get_inner_path()) != self: + return await inner_path.read_text(n=n, encoding=encoding, errors=errors) + else: + command = ["head", "-c", str(n)] if n >= 0 else ["cat"] + command.append(self.__str__()) + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) + return result.strip() - async def resolve(self, strict=False) -> RemoteStreamFlowPath | None: + async def resolve(self, strict=False) -> StreamFlowPath | None: # If at least one primary location is present on the site, return its path if locations := self.context.data_manager.get_data_locations( path=self.__str__(), @@ -509,7 +549,7 @@ async def resolve(self, strict=False) -> RemoteStreamFlowPath | None: location_name=self.location.name, data_type=DataType.PRIMARY, ): - return RemoteStreamFlowPath( + return StreamFlowPath( next(iter(locations)).path, context=self.context, location=next(iter(locations)).location, @@ -542,36 +582,45 @@ async def resolve(self, strict=False) -> RemoteStreamFlowPath | None: ) async def rmtree(self) -> None: - command = ["rm", "-rf ", self.__str__()] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) + if (inner_path := self._get_inner_path()) != self: + await inner_path.rmtree() + else: + command = ["rm", "-rf ", self.__str__()] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) async def size(self) -> int: - command = [ - "".join( - [ - "find -L ", - f'"{self.__str__()}"', - " -type f -exec ls -ln {} \\+ | ", - "awk 'BEGIN {sum=0} {sum+=$5} END {print sum}'; ", - ] + if (inner_path := self._get_inner_path()) != self: + return await inner_path.size() + else: + command = [ + "".join( + [ + "find -L ", + f'"{self.__str__()}"', + " -type f -exec ls -ln {} \\+ | ", + "awk 'BEGIN {sum=0} {sum+=$5} END {print sum}'; ", + ] + ) + ] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) - ] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) - result = result.strip().strip("'\"") - return int(result) if result.isdigit() else 0 + _check_status(command, self.location, result, status) + result = result.strip().strip("'\"") + return int(result) if result.isdigit() else 0 async def symlink_to(self, target, target_is_directory=False) -> None: - command = ["ln", "-snf", str(target), self.__str__()] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) + if (inner_path := self._get_inner_path()) != self: + await inner_path.symlink_to(target, target_is_directory=target_is_directory) + else: + command = ["ln", "-snf", str(target), self.__str__()] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) async def walk( self, top_down=True, on_error=None, follow_symlinks=False @@ -582,87 +631,96 @@ async def walk( MutableSequence[str], ] ]: - paths = [self] - while paths: - path = paths.pop() - if isinstance(path, tuple): - yield path - continue - command = ["find"] - if follow_symlinks: - command.append("-L") - command.extend([f"'{str(path)}'", "-mindepth", "1", "-maxdepth", "1"]) - try: - content, status = await self.connector.run( - location=self.location, - command=command + ["-type", "d"], - capture_output=True, - ) - _check_status(command, self.location, content, status) - content = content.strip(" \n") - dirnames = ( - [ - str( - RemoteStreamFlowPath( - p, context=self.context, location=self.location - ).relative_to(path) - ) - for p in content.splitlines() - ] - if content - else [] - ) - content, status = await self.connector.run( - location=self.location, - command=command + ["-type", "f"], - capture_output=True, - ) - _check_status(command, self.location, content, status) - content = content.strip(" \n") - filenames = ( - [ - str( - RemoteStreamFlowPath( - p, context=self.context, location=self.location - ).relative_to(path) - ) - for p in content.splitlines() - ] - if content - else [] - ) - except WorkflowExecutionException as error: - if on_error is not None: - on_error(error) - continue - - if top_down: + if (inner_path := self._get_inner_path()) != self: + async for path, dirnames, filenames in inner_path.walk( + top_down=top_down, on_error=on_error, follow_symlinks=follow_symlinks + ): yield path, dirnames, filenames - else: - paths.append((path, dirnames, filenames)) - paths += [path._make_child_relpath(d) for d in reversed(dirnames)] + else: + paths = [self] + while paths: + path = paths.pop() + if isinstance(path, tuple): + yield path + continue + command = ["find"] + if follow_symlinks: + command.append("-L") + command.extend([f"'{str(path)}'", "-mindepth", "1", "-maxdepth", "1"]) + try: + content, status = await self.connector.run( + location=self.location, + command=command + ["-type", "d"], + capture_output=True, + ) + _check_status(command, self.location, content, status) + content = content.strip(" \n") + dirnames = ( + [ + str( + RemoteStreamFlowPath( + p, context=self.context, location=self.location + ).relative_to(path) + ) + for p in content.splitlines() + ] + if content + else [] + ) + content, status = await self.connector.run( + location=self.location, + command=command + ["-type", "f"], + capture_output=True, + ) + _check_status(command, self.location, content, status) + content = content.strip(" \n") + filenames = ( + [ + str( + RemoteStreamFlowPath( + p, context=self.context, location=self.location + ).relative_to(path) + ) + for p in content.splitlines() + ] + if content + else [] + ) + except WorkflowExecutionException as error: + if on_error is not None: + on_error(error) + continue + + if top_down: + yield path, dirnames, filenames + else: + paths.append((path, dirnames, filenames)) + paths += [path._make_child_relpath(d) for d in reversed(dirnames)] def with_segments(self, *pathsegments): return type(self)(*pathsegments, context=self.context, location=self.location) async def write_text(self, data: str, **kwargs) -> int: - if not isinstance(data, str): - raise TypeError("data must be str, not %s" % data.__class__.__name__) - command = [ - "echo", - base64.b64encode(data.encode("utf-8")).decode("utf-8"), - "|", - "base64", - "-d", - ] - result, status = await self.connector.run( - location=self.location, - command=command, - stdout=self.__str__(), - capture_output=True, - ) - _check_status(command, self.location, result, status) - return len(data) + if (inner_path := self._get_inner_path()) != self: + return await inner_path.write_text(data, **kwargs) + else: + if not isinstance(data, str): + raise TypeError("data must be str, not %s" % data.__class__.__name__) + command = [ + "echo", + base64.b64encode(data.encode("utf-8")).decode("utf-8"), + "|", + "base64", + "-d", + ] + result, status = await self.connector.run( + location=self.location, + command=command, + stdout=self.__str__(), + capture_output=True, + ) + _check_status(command, self.location, result, status) + return len(data) async def download(