Skip to content

Commit e42d621

Browse files
[serve][llm] Ray LLM Cloud Filesystem Restructuring: Provider-Specific Implementations (#58469)
Signed-off-by: ahao-anyscale <ahao@anyscale.com>
1 parent fa625a6 commit e42d621

File tree

15 files changed

+2787
-1469
lines changed

15 files changed

+2787
-1469
lines changed

python/ray/llm/_internal/common/callbacks/cloud_downloader.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ def on_before_download_model_files_distributed(self) -> None:
7979
paths = self.kwargs["paths"]
8080
start_time = time.monotonic()
8181
for cloud_uri, local_path in paths:
82-
CloudFileSystem.download_files_parallel(
83-
path=local_path, bucket_uri=cloud_uri
84-
)
82+
CloudFileSystem.download_files(path=local_path, bucket_uri=cloud_uri)
8583
end_time = time.monotonic()
8684
logger.info(
8785
f"CloudDownloader: Files downloaded in {end_time - start_time} seconds"
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Cloud filesystem module for provider-specific implementations.
2+
3+
This module provides a unified interface for cloud storage operations across
4+
different providers (S3, GCS, Azure) while allowing provider-specific optimizations.
5+
"""
6+
7+
from ray.llm._internal.common.utils.cloud_filesystem.azure_filesystem import (
8+
AzureFileSystem,
9+
)
10+
from ray.llm._internal.common.utils.cloud_filesystem.base import (
11+
BaseCloudFileSystem,
12+
)
13+
from ray.llm._internal.common.utils.cloud_filesystem.gcs_filesystem import (
14+
GCSFileSystem,
15+
)
16+
from ray.llm._internal.common.utils.cloud_filesystem.pyarrow_filesystem import (
17+
PyArrowFileSystem,
18+
)
19+
from ray.llm._internal.common.utils.cloud_filesystem.s3_filesystem import (
20+
S3FileSystem,
21+
)
22+
23+
__all__ = [
24+
"BaseCloudFileSystem",
25+
"PyArrowFileSystem",
26+
"GCSFileSystem",
27+
"AzureFileSystem",
28+
"S3FileSystem",
29+
]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Azure-specific filesystem implementation.
2+
3+
This module provides an Azure-specific implementation that delegates to PyArrowFileSystem.
4+
This maintains backward compatibility while allowing for future optimizations using
5+
native Azure tools (azcopy, azure-storage-blob SDK).
6+
"""
7+
8+
from typing import List, Optional, Union
9+
10+
from ray.llm._internal.common.utils.cloud_filesystem.base import BaseCloudFileSystem
11+
from ray.llm._internal.common.utils.cloud_filesystem.pyarrow_filesystem import (
12+
PyArrowFileSystem,
13+
)
14+
15+
16+
class AzureFileSystem(BaseCloudFileSystem):
    """Cloud filesystem backend for Azure storage URIs.

    **Note**: every operation here is forwarded unchanged to
    ``PyArrowFileSystem``; the class contains no Azure-specific logic yet.
    A native implementation (azure-storage-blob SDK, azcopy) is planned for
    a later change.
    """

    @staticmethod
    def get_file(
        object_uri: str, decode_as_utf_8: bool = True
    ) -> Optional[Union[str, bytes]]:
        """Read a single remote object into memory.

        Args:
            object_uri: Location of the object (``abfss://`` or ``azure://``).
            decode_as_utf_8: When True, the contents are decoded as UTF-8.

        Returns:
            The object's contents as ``str`` or ``bytes``, or ``None`` when the
            object does not exist.
        """
        contents = PyArrowFileSystem.get_file(object_uri, decode_as_utf_8)
        return contents

    @staticmethod
    def list_subfolders(folder_uri: str) -> List[str]:
        """Return the names of the directories directly under ``folder_uri``.

        Args:
            folder_uri: Location of the directory (``abfss://`` or
                ``azure://``).

        Returns:
            Subfolder names, without trailing slashes.
        """
        subfolder_names = PyArrowFileSystem.list_subfolders(folder_uri)
        return subfolder_names

    @staticmethod
    def download_files(
        path: str,
        bucket_uri: str,
        substrings_to_include: Optional[List[str]] = None,
        suffixes_to_exclude: Optional[List[str]] = None,
    ) -> None:
        """Copy files from a remote directory into a local directory.

        Args:
            path: Local destination directory.
            bucket_uri: Location of the remote directory.
            substrings_to_include: If given, only files containing one of
                these substrings are downloaded.
            suffixes_to_exclude: If given, files ending with one of these
                suffixes are skipped (e.g. ``.safetensors``).
        """
        # Arguments are forwarded positionally, in the same order as received.
        forwarded = (path, bucket_uri, substrings_to_include, suffixes_to_exclude)
        PyArrowFileSystem.download_files(*forwarded)

    @staticmethod
    def upload_files(
        local_path: str,
        bucket_uri: str,
    ) -> None:
        """Copy local files to a remote location.

        Args:
            local_path: Local path of the files to upload.
            bucket_uri: Destination URI; must start with ``abfss://`` or
                ``azure://``.
        """
        PyArrowFileSystem.upload_files(local_path, bucket_uri)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""Abstract base class for cloud filesystem implementations.
2+
3+
This module defines the interface that all cloud storage provider implementations
4+
must follow, ensuring consistency across different providers while allowing
5+
provider-specific optimizations.
6+
"""
7+
8+
from abc import ABC, abstractmethod
9+
from typing import List, Optional, Union
10+
11+
12+
class BaseCloudFileSystem(ABC):
    """Interface shared by every cloud storage provider implementation.

    Provider classes (S3FileSystem, GCSFileSystem, etc.) subclass this and
    supply all four static operations declared below. The class cannot be
    instantiated until every abstract method has been overridden.
    """

    @staticmethod
    @abstractmethod
    def get_file(
        object_uri: str, decode_as_utf_8: bool = True
    ) -> Optional[Union[str, bytes]]:
        """Read a single remote object into memory.

        Args:
            object_uri: Location of the object (``s3://``, ``gs://``,
                ``abfss://``, or ``azure://``).
            decode_as_utf_8: When True, the contents are decoded as UTF-8.

        Returns:
            The object's contents as ``str`` or ``bytes``, or ``None`` when the
            object does not exist.
        """

    @staticmethod
    @abstractmethod
    def list_subfolders(folder_uri: str) -> List[str]:
        """Return the names of the directories directly under ``folder_uri``.

        Args:
            folder_uri: Location of the directory (``s3://``, ``gs://``,
                ``abfss://``, or ``azure://``).

        Returns:
            Subfolder names, without trailing slashes.
        """

    @staticmethod
    @abstractmethod
    def download_files(
        path: str,
        bucket_uri: str,
        substrings_to_include: Optional[List[str]] = None,
        suffixes_to_exclude: Optional[List[str]] = None,
    ) -> None:
        """Copy files from a remote directory into a local directory.

        Args:
            path: Local destination directory.
            bucket_uri: Location of the remote directory.
            substrings_to_include: If given, only files containing one of
                these substrings are downloaded.
            suffixes_to_exclude: If given, files ending with one of these
                suffixes are skipped (e.g. ``.safetensors``).
        """

    @staticmethod
    @abstractmethod
    def upload_files(
        local_path: str,
        bucket_uri: str,
    ) -> None:
        """Copy local files to a remote location.

        Args:
            local_path: Local path of the files to upload.
            bucket_uri: Destination URI; must start with ``s3://``, ``gs://``,
                ``abfss://``, or ``azure://``.
        """
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""GCS-specific filesystem implementation.
2+
3+
This module provides a GCS-specific implementation that delegates to PyArrowFileSystem.
4+
This maintains backward compatibility while allowing for future optimizations using
5+
native GCS tools (gsutil, google-cloud-storage SDK).
6+
"""
7+
8+
from typing import List, Optional, Union
9+
10+
from ray.llm._internal.common.utils.cloud_filesystem.base import BaseCloudFileSystem
11+
from ray.llm._internal.common.utils.cloud_filesystem.pyarrow_filesystem import (
12+
PyArrowFileSystem,
13+
)
14+
15+
16+
class GCSFileSystem(BaseCloudFileSystem):
    """Cloud filesystem backend for Google Cloud Storage URIs.

    **Note**: every operation here is forwarded unchanged to
    ``PyArrowFileSystem``; the class contains no GCS-specific logic yet.
    A native implementation (google-cloud-storage SDK, gsutil) is planned for
    a later change.
    """

    @staticmethod
    def get_file(
        object_uri: str, decode_as_utf_8: bool = True
    ) -> Optional[Union[str, bytes]]:
        """Read a single remote object into memory.

        Args:
            object_uri: Location of the object (``gs://``).
            decode_as_utf_8: When True, the contents are decoded as UTF-8.

        Returns:
            The object's contents as ``str`` or ``bytes``, or ``None`` when the
            object does not exist.
        """
        contents = PyArrowFileSystem.get_file(object_uri, decode_as_utf_8)
        return contents

    @staticmethod
    def list_subfolders(folder_uri: str) -> List[str]:
        """Return the names of the directories directly under ``folder_uri``.

        Args:
            folder_uri: Location of the directory (``gs://``).

        Returns:
            Subfolder names, without trailing slashes.
        """
        subfolder_names = PyArrowFileSystem.list_subfolders(folder_uri)
        return subfolder_names

    @staticmethod
    def download_files(
        path: str,
        bucket_uri: str,
        substrings_to_include: Optional[List[str]] = None,
        suffixes_to_exclude: Optional[List[str]] = None,
    ) -> None:
        """Copy files from a remote directory into a local directory.

        Args:
            path: Local destination directory.
            bucket_uri: Location of the remote directory.
            substrings_to_include: If given, only files containing one of
                these substrings are downloaded.
            suffixes_to_exclude: If given, files ending with one of these
                suffixes are skipped (e.g. ``.safetensors``).
        """
        # Arguments are forwarded positionally, in the same order as received.
        forwarded = (path, bucket_uri, substrings_to_include, suffixes_to_exclude)
        PyArrowFileSystem.download_files(*forwarded)

    @staticmethod
    def upload_files(
        local_path: str,
        bucket_uri: str,
    ) -> None:
        """Copy local files to a remote location.

        Args:
            local_path: Local path of the files to upload.
            bucket_uri: Destination URI; must start with ``gs://``.
        """
        PyArrowFileSystem.upload_files(local_path, bucket_uri)

0 commit comments

Comments
 (0)