[Storage] R2 Cloud Object Storage integration #1736

Merged: 100 commits, Mar 26, 2023
Changes from 46 commits
Commits
987b0bb
first r2 integration code added
Feb 24, 2023
fba15d0
minor fix
Feb 24, 2023
a3de58d
integrating R2 as part of TestStorageCredentials
Feb 24, 2023
4d655ae
session error fix
Feb 24, 2023
f4e4f87
session var fix
Feb 24, 2023
df6f527
fix test_public_bucket
Feb 24, 2023
aa2b244
temp change gs for test
Feb 24, 2023
f3dc326
debug print
Feb 25, 2023
d4f1be1
print handle check
Feb 26, 2023
dfb995e
print update
Feb 26, 2023
e90af35
debug print _add_store
Feb 26, 2023
e8ffe13
debug update
Feb 26, 2023
ef66f56
update debug
Feb 26, 2023
f1e589c
test_smoke_change
Feb 26, 2023
0417e63
test bulk deletion
Feb 26, 2023
ffcbbad
debug logger
Feb 26, 2023
f56fb31
debug
Feb 26, 2023
487525a
dbg
Feb 26, 2023
7e915ef
dbg
Feb 26, 2023
769a470
dbg
Feb 26, 2023
27b2953
dbg
Feb 26, 2023
ff0da6e
dbg
Feb 26, 2023
3848aaa
dbg
Feb 26, 2023
0313246
dbg
Feb 26, 2023
6bc4291
dbg
Feb 26, 2023
a7d4a51
dbg
Feb 26, 2023
ae03628
dbg
Feb 26, 2023
b345459
chage R2Store parent
Feb 27, 2023
164a9e2
dbg test_private_bucket
Feb 27, 2023
f875bff
fix_nonexistent_bucket test
Feb 27, 2023
053e33f
fix cli_ls_cmd
Feb 27, 2023
2cf01a3
fix tmp_awscli_bucket_r2
Feb 27, 2023
0c62cb0
fix2 tmp_awscli_bucket_r2 and nonexistent test
Feb 27, 2023
22ba2a7
quick fix
Feb 27, 2023
70d78ad
nonexistent test fix
Feb 27, 2023
87c98bd
add comment
Feb 27, 2023
e0f7eed
storage.py order change
Feb 27, 2023
5909122
add gs:// to test_smoke
Feb 27, 2023
7a8916f
pylint error fix
Feb 28, 2023
3d7407b
fix format
Feb 28, 2023
0014838
first review fix
Mar 2, 2023
570ec51
formatted
landscapepainter Mar 2, 2023
5ce573d
resolve merge conflict
Mar 2, 2023
c83faae
commit before resolving merge confilct
Mar 3, 2023
75ffa92
'origin' to r2-integration resolve merge conflict
Mar 3, 2023
e3c77b2
adding instructions for R2 in docs
Mar 3, 2023
02d52e6
Merge remote-tracking branch 'origin' into r2-integration
landscapepainter Mar 8, 2023
32556aa
gs test failure fix
landscapepainter Mar 14, 2023
19ced23
undo installation doc.
landscapepainter Mar 16, 2023
95f2959
Merge branch 'master' into r2-integration
landscapepainter Mar 16, 2023
53ddf90
adding **kwargs adaptor/cloudflare.py
landscapepainter Mar 16, 2023
5ac2bb8
undo **kwargs
landscapepainter Mar 16, 2023
8c191fa
update kwargs and delete_r2_bucket
landscapepainter Mar 16, 2023
3bab24c
update
landscapepainter Mar 16, 2023
bfeccb0
check if R2 test skip works
landscapepainter Mar 16, 2023
1f690a1
testing for failure
landscapepainter Mar 16, 2023
e15e084
test list comp.
landscapepainter Mar 16, 2023
95e6547
testing
landscapepainter Mar 16, 2023
7f173fb
testing
landscapepainter Mar 16, 2023
0a99bfa
testing
landscapepainter Mar 16, 2023
2207d72
test delete_r2 change
landscapepainter Mar 16, 2023
b54b614
test skipif syntax
landscapepainter Mar 16, 2023
fc1a645
test
landscapepainter Mar 16, 2023
03ec8c3
test
landscapepainter Mar 16, 2023
91609a6
test
landscapepainter Mar 16, 2023
2a135d8
test
landscapepainter Mar 16, 2023
3e952b8
check if conditional skipping works
landscapepainter Mar 16, 2023
ebc63c4
check if conditionally skipping works
landscapepainter Mar 16, 2023
99ab367
test
landscapepainter Mar 16, 2023
3924cfd
test
landscapepainter Mar 16, 2023
bd82146
test
landscapepainter Mar 16, 2023
8ae98b7
test
landscapepainter Mar 16, 2023
6a1f346
test
landscapepainter Mar 16, 2023
3071499
test
landscapepainter Mar 16, 2023
8a8f54a
format
landscapepainter Mar 16, 2023
779c200
reverting back to conditional skip
landscapepainter Mar 16, 2023
a918c37
revert back to conditional skip
landscapepainter Mar 16, 2023
3f5ab45
debug
landscapepainter Mar 16, 2023
90ab019
format
landscapepainter Mar 16, 2023
0708cad
Merge branch 'r2-integration' of https://github.com/landscapepainter/…
landscapepainter Mar 16, 2023
6802426
r2 multi-file support COPY
landscapepainter Mar 16, 2023
1bdc992
Merge remote-tracking branch 'origin/r2-integration' into r2-integration
landscapepainter Mar 16, 2023
cfc7c2d
multifile COPY R2 Fix
landscapepainter Mar 16, 2023
9146395
fix
landscapepainter Mar 16, 2023
5277e59
fix
landscapepainter Mar 16, 2023
12789e1
fix
landscapepainter Mar 16, 2023
fa7f4df
format fix
landscapepainter Mar 16, 2023
6011108
add r2 storage mounting test
landscapepainter Mar 19, 2023
060a54e
fix indent
landscapepainter Mar 19, 2023
609ad95
fix bug
landscapepainter Mar 20, 2023
cab4df0
test fix and format
landscapepainter Mar 20, 2023
c7e0a2b
Merge branch 'master' into r2-integration
landscapepainter Mar 20, 2023
9667d94
fix generic_cloud
landscapepainter Mar 21, 2023
01cf583
update --cloudflare functionality
landscapepainter Mar 21, 2023
6167ebb
format
landscapepainter Mar 21, 2023
a6a1249
fix conftest
landscapepainter Mar 22, 2023
1b7475e
fixes
landscapepainter Mar 22, 2023
f640db8
comment on conftest.py
landscapepainter Mar 25, 2023
bc59e9d
minor fix
landscapepainter Mar 25, 2023
28b3ea6
format
landscapepainter Mar 25, 2023
40 changes: 39 additions & 1 deletion docs/source/getting-started/installation.rst
@@ -18,7 +18,7 @@ Install SkyPilot using pip:
$ # pip install "skypilot[all]"


SkyPilot currently supports four cloud providers: AWS, GCP, Azure, and Lambda Cloud.
SkyPilot currently supports five cloud providers: AWS, GCP, Azure, Lambda Cloud, and Cloudflare (R2, storage only).
If you only have access to certain clouds, use any combination of
:code:`"[aws,azure,gcp,lambda]"` (e.g., :code:`"[aws,gcp]"`) to reduce the
dependencies installed.
@@ -118,6 +118,44 @@ To configure Lambda Cloud access, go to the `API Keys <https://cloud.lambdalabs.

.. _verify-cloud-access:

Cloudflare R2
~~~~~~~~~~~~~~~~~~

R2 is S3-compatible, so its credentials are set up with :code:`aws configure`, just like AWS credentials. To generate an Access Key ID and Secret Access Key for R2, follow these `instructions <https://developers.cloudflare.com/r2/data-access/s3-api/tokens/>`_. If you have already run :code:`aws configure` for AWS, skip the step below.

.. code-block:: console

$ # Install boto
$ pip install boto3

$ # Configure your R2 credentials
$ aws configure

When prompted, enter the following values:

.. code-block:: text

AWS Access Key ID [None]: <access_key_id>
AWS Secret Access Key [None]: <access_key_secret>
Default region name [None]: auto
Default output format [None]: json

Open :code:`~/.aws/credentials`. If you did not configure AWS above, rename the :code:`[default]` section to :code:`[r2]`. If you already have an AWS configuration (and skipped the step above), add a separate :code:`[r2]` section below it:

.. code-block:: text

...

[r2]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <access_key_secret>

Get your Account ID from your R2 dashboard and store it in :code:`~/.cloudflare/accountid`.

.. code-block:: text

<account_id>
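
To sanity-check the setup, you can list your R2 buckets with boto3. This optional check is a minimal sketch, assuming the :code:`[r2]` profile and :code:`~/.cloudflare/accountid` file created above.

.. code-block:: python

    import os

    import boto3

    # Read the account ID stored in ~/.cloudflare/accountid.
    with open(os.path.expanduser('~/.cloudflare/accountid')) as f:
        account_id = f.read().strip()

    # Use the [r2] profile against the per-account R2 endpoint.
    session = boto3.session.Session(profile_name='r2')
    r2 = session.client(
        's3', endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com')
    print([b['Name'] for b in r2.list_buckets()['Buckets']])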

Verifying cloud access
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

120 changes: 120 additions & 0 deletions sky/adaptors/cloudflare.py
@@ -0,0 +1,120 @@
"""Cloudflare cloud adaptors"""

# pylint: disable=import-outside-toplevel

import functools
import threading
import os

boto3 = None
botocore = None
_session_creation_lock = threading.RLock()
ACCOUNT_ID_PATH = '~/.cloudflare/accountid'
R2_PROFILE_NAME = 'r2'


def import_package(func):

@functools.wraps(func)
def wrapper(*args, **kwargs):
global boto3, botocore
if boto3 is None or botocore is None:
try:
import boto3 as _boto3
import botocore as _botocore
boto3 = _boto3
botocore = _botocore
except ImportError:
raise ImportError('Fail to import dependencies for Cloudflare.'
'Try pip install "skypilot[aws]"') from None
return func(*args, **kwargs)

return wrapper


# lru_cache() is thread-safe and it will return the same session object
# for different threads.
# Reference: https://docs.python.org/3/library/functools.html#functools.lru_cache # pylint: disable=line-too-long
@functools.lru_cache()
@import_package
def session():
    """Create an AWS session."""
    # Creating the session object is not thread-safe for boto3,
    # so we add a reentrant lock to synchronize the session creation.
    # Reference: https://github.com/boto/boto3/issues/1592
    # However, the session object itself is thread-safe, so we are
    # able to use lru_cache() to cache the session object.
    with _session_creation_lock:
        return boto3.session.Session(profile_name=R2_PROFILE_NAME)


@functools.lru_cache()
@import_package
def resource(resource_name: str):
    """Create a Cloudflare resource.

    Args:
        resource_name: Cloudflare resource name (e.g., 's3').
    """
    # Need to use the resource retrieved from the per-thread session
    # to avoid thread-safety issues (Directly creating the client
    # with boto3.resource() is not thread-safe).
    # Reference: https://stackoverflow.com/a/59635814

    session_ = session()
    cloudflare_credentials = session_.get_credentials().get_frozen_credentials()
    endpoint = create_endpoint()

    return session_.resource(
        resource_name,
        endpoint_url=endpoint,
        aws_access_key_id=cloudflare_credentials.access_key,
        aws_secret_access_key=cloudflare_credentials.secret_key,
        region_name='auto')


@functools.lru_cache()
def client(service_name: str, region):
    """Create a Cloudflare client of a certain service.

    Args:
        service_name: Cloudflare service name (e.g., 's3').
        region: Region for the client; R2 uses 'auto'.
    """
    # Need to use the client retrieved from the per-thread session
    # to avoid thread-safety issues (Directly creating the client
    # with boto3.client() is not thread-safe).
    # Reference: https://stackoverflow.com/a/59635814

    session_ = session()
    cloudflare_credentials = session_.get_credentials().get_frozen_credentials()
    endpoint = create_endpoint()

    return session_.client(
        service_name,
        endpoint_url=endpoint,
        aws_access_key_id=cloudflare_credentials.access_key,
        aws_secret_access_key=cloudflare_credentials.secret_key,
        region_name=region)


@import_package
def botocore_exceptions():
    """AWS botocore exception."""
    from botocore import exceptions
    return exceptions


def create_endpoint():
    """Reads the account ID necessary to interact with R2."""

    accountid_path = os.path.expanduser(ACCOUNT_ID_PATH)
    with open(accountid_path, 'r') as f:
        lines = f.readlines()
        accountid = lines[0]

    accountid = accountid.strip()
    endpoint = 'https://' + accountid + '.r2.cloudflarestorage.com'

    return endpoint
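
Taken together, the adaptor lazily imports boto3, caches one session per process, and points every client or resource at the per-account R2 endpoint. A minimal usage sketch (hypothetical, assuming the [r2] profile and ~/.cloudflare/accountid are set up as in the docs above):

    from sky.adaptors import cloudflare

    # Both helpers reuse the cached boto3 session under the hood.
    r2 = cloudflare.resource('s3')  # S3-compatible resource for R2
    for bucket in r2.buckets.all():  # enumerate R2 buckets
        print(bucket.name)

    r2_client = cloudflare.client('s3', 'auto')  # low-level client, region 'auto'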
60 changes: 59 additions & 1 deletion sky/cloud_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from sky.clouds import gcp
from sky.data import data_utils
from sky.adaptors import aws
from sky.adaptors import aws, cloudflare


class CloudStorage:
@@ -146,6 +146,63 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
        return ' && '.join(all_commands)


class R2CloudStorage(CloudStorage):
    """Cloudflare Cloud Storage."""

    # List of commands to install AWS CLI
    _GET_AWSCLI = [
        'aws --version >/dev/null 2>&1 || pip3 install awscli',
    ]

    def is_directory(self, url: str) -> bool:
        """Returns whether R2 'url' is a directory.

        In cloud object stores, a "directory" refers to a regular object whose
        name is a prefix of other objects.
        """
        r2 = cloudflare.resource('s3')
        bucket_name, path = data_utils.split_r2_path(url)
        bucket = r2.Bucket(bucket_name)

        num_objects = 0
        for obj in bucket.objects.filter(Prefix=path):
            num_objects += 1
            if obj.key == path:
                return False
            # If there is more than one object under the prefix, it is a
            # directory; exit early after seeing a few objects.
            if num_objects == 3:
                return True

        # A directory with few or no items
        return True

    def make_sync_dir_command(self, source: str, destination: str) -> str:
        """Downloads using AWS CLI."""
        # AWS Sync by default uses 10 threads to upload files to the bucket.
        # To increase parallelism, modify max_concurrent_requests in your
        # aws config file (Default path: ~/.aws/config).
        endpoint_url = cloudflare.create_endpoint()
        # awscli does not understand the r2:// scheme; map it to s3:// and
        # point the CLI at the R2 endpoint instead.
        source = source.replace('r2://', 's3://')
        download_via_awscli = ('aws s3 sync --no-follow-symlinks '
                               f'{source} {destination} '
                               f'--endpoint {endpoint_url} '
                               f'--profile={cloudflare.R2_PROFILE_NAME}')

        all_commands = list(self._GET_AWSCLI)
        all_commands.append(download_via_awscli)
        return ' && '.join(all_commands)

    def make_sync_file_command(self, source: str, destination: str) -> str:
        """Downloads a file using AWS CLI."""
        endpoint_url = cloudflare.create_endpoint()
        source = source.replace('r2://', 's3://')
        download_via_awscli = (f'aws s3 cp {source} {destination} '
                               f'--endpoint {endpoint_url} '
                               f'--profile={cloudflare.R2_PROFILE_NAME}')

        all_commands = list(self._GET_AWSCLI)
        all_commands.append(download_via_awscli)
        return ' && '.join(all_commands)


def get_storage_from_path(url: str) -> CloudStorage:
    """Returns a CloudStorage by identifying the scheme:// in a URL."""
    result = urllib.parse.urlsplit(url)
Expand All @@ -159,4 +216,5 @@ def get_storage_from_path(url: str) -> CloudStorage:
_REGISTRY = {
    'gs': GcsCloudStorage(),
    's3': S3CloudStorage(),
    'r2': R2CloudStorage(),
}
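
With the registry entry in place, r2:// URLs dispatch to the new class. A hypothetical sketch (bucket name invented for illustration):

    from sky import cloud_stores

    storage = cloud_stores.get_storage_from_path('r2://my-bucket/data/')
    assert isinstance(storage, cloud_stores.R2CloudStorage)
    # Yields 'aws s3 sync ... --endpoint https://<account_id>.r2.cloudflarestorage.com ...'
    cmd = storage.make_sync_dir_command('r2://my-bucket/data/', '/tmp/data')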
58 changes: 58 additions & 0 deletions sky/data/data_transfer.py
@@ -13,6 +13,7 @@

TODO:
- All combinations of Azure Transfer
- All combinations of R2 Transfer
- GCS -> S3
"""
import json
@@ -116,6 +117,21 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
        f'Transfer finished in {(time.time() - start) / 60:.2f} minutes.')


def s3_to_r2(s3_bucket_name: str, r2_bucket_name: str) -> None:
    """Creates a one-time transfer from Amazon S3 to Cloudflare R2.

    Args:
        s3_bucket_name: str; Name of the Amazon S3 Bucket
        r2_bucket_name: str; Name of the Cloudflare R2 Bucket
    """
    raise NotImplementedError('Moving data directly from clouds to R2 is '
                              'currently not supported. Please specify '
                              'a local source for the storage object.')


def gcs_to_s3(gs_bucket_name: str, s3_bucket_name: str) -> None:
    """Creates a one-time transfer from Google Cloud Storage to Amazon S3.

@@ -129,6 +145,48 @@ def gcs_to_s3(gs_bucket_name: str, s3_bucket_name: str) -> None:
    subprocess.call(sync_command, shell=True)


def gcs_to_r2(gs_bucket_name: str, r2_bucket_name: str) -> None:
    """Creates a one-time transfer from Google Cloud Storage to Cloudflare R2.

    Args:
        gs_bucket_name: str; Name of the Google Cloud Storage Bucket
        r2_bucket_name: str; Name of the Cloudflare R2 Bucket
    """
    raise NotImplementedError('Moving data directly from clouds to R2 is '
                              'currently not supported. Please specify '
                              'a local source for the storage object.')


def r2_to_gcs(r2_bucket_name: str, gs_bucket_name: str) -> None:
    """Creates a one-time transfer from Cloudflare R2 to Google Cloud Storage.

    Args:
        r2_bucket_name: str; Name of the Cloudflare R2 Bucket
        gs_bucket_name: str; Name of the Google Cloud Storage Bucket
    """
    raise NotImplementedError('Moving data directly from R2 to clouds is '
                              'currently not supported. Please specify '
                              'a local source for the storage object.')


def r2_to_s3(r2_bucket_name: str, s3_bucket_name: str) -> None:
    """Creates a one-time transfer from Cloudflare R2 to Amazon S3.

    Args:
        r2_bucket_name: str; Name of the Cloudflare R2 Bucket
        s3_bucket_name: str; Name of the Amazon S3 Bucket
    """
    raise NotImplementedError('Moving data directly from R2 to clouds is '
                              'currently not supported. Please specify '
                              'a local source for the storage object.')


def _add_bucket_iam_member(bucket_name: str, role: str, member: str) -> None:
    storage_client = gcp.storage_client()
    bucket = storage_client.bucket(bucket_name)
36 changes: 34 additions & 2 deletions sky/data/data_utils.py
@@ -8,7 +8,7 @@

from sky import exceptions
from sky import sky_logging
from sky.adaptors import aws, gcp
from sky.adaptors import aws, gcp, cloudflare
from sky.utils import ux_utils

Client = Any
@@ -40,6 +40,18 @@ def split_gcs_path(gcs_path: str) -> Tuple[str, str]:
    return bucket, key


def split_r2_path(r2_path: str) -> Tuple[str, str]:
    """Splits R2 Path into Bucket name and Relative Path to Bucket

    Args:
        r2_path: str; R2 Path, e.g. r2://imagenet/train/
    """
    path_parts = r2_path.replace('r2://', '').split('/')
    bucket = path_parts.pop(0)
    key = '/'.join(path_parts)
    return bucket, key
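
# Example (hypothetical): split_r2_path('r2://imagenet/train/images')
# returns ('imagenet', 'train/images').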


def create_s3_client(region: str = 'us-east-2') -> Client:
    """Helper method that connects to Boto3 client for S3 Bucket

@@ -73,6 +85,26 @@ def verify_gcs_bucket(name: str) -> bool:
    return False


def create_r2_client(region: str = 'auto') -> Client:
    """Helper method that connects to Boto3 client for R2 Bucket

    Args:
        region: str; Region for Cloudflare R2, which is always 'auto'
    """
    return cloudflare.client('s3', region)


def verify_r2_bucket(name: str) -> bool:
    """Helper method that checks if the R2 bucket exists

    Args:
        name: str; Name of R2 Bucket (without r2:// prefix)
    """
    r2 = cloudflare.resource('s3')
    bucket = r2.Bucket(name)
    return bucket in r2.buckets.all()
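
# Example (hypothetical): create_r2_client() returns a boto3 S3 client bound
# to the R2 endpoint, and verify_r2_bucket('my-bucket') is True iff a bucket
# named 'my-bucket' exists under the configured R2 account.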


def is_cloud_store_url(url):
    result = urllib.parse.urlsplit(url)
    # '' means non-cloud URLs.
@@ -118,7 +150,7 @@
max_concurrent_uploads: Optional[int] = None) -> None:
"""Helper function to run parallel uploads for a list of paths.

Used by S3Store and GCSStore to run rsync commands in parallel by
Used by S3Store, GCSStore, and R2Store to run rsync commands in parallel by
providing appropriate command generators.

Args: