Skip to content

Commit

Permalink
✨ Support cos (#185)
Browse files Browse the repository at this point in the history
* ✨ Add cos storage support

* ♻️ Reimplement cos some of the logic

* ✨ Support cos

---------

Co-authored-by: 王春星 <wangchunxing@bytedance.com>
  • Loading branch information
tosone and wang-chunxing authored Aug 22, 2023
1 parent b8e3dca commit 20718db
Show file tree
Hide file tree
Showing 16 changed files with 524 additions and 242 deletions.
18 changes: 15 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ jobs:
restore-keys: |
${{ runner.os }}-go-test-sqlite3-
- name: Run tests
env:
COS_ENDPOINT: ${{ secrets.COS_ENDPOINT_SQLITE }}
COS_AK: ${{ secrets.COS_AK }}
COS_SK: ${{ secrets.COS_SK }}
run: |
CI_DATABASE_TYPE=sqlite3 go test -v -coverprofile=coverage.txt -covermode=atomic `go list ./... | grep -v "pkg/tests" | grep -v "pkg/dal/query" | grep -v "pkg/dal/cmd" | grep -v "pkg/types/enums" | grep -v "pkg/handlers/apidocs" | grep -v "pkg/utils/token/mocks" | grep -v "pkg/utils/password/mocks" | grep -v "pkg/handlers/distribution/clients/mocks"`
CI_DATABASE_TYPE=sqlite3 go test -timeout 30m -v -coverprofile=coverage.txt -covermode=atomic `go list ./... | grep -v "pkg/tests" | grep -v "pkg/dal/query" | grep -v "pkg/dal/cmd" | grep -v "pkg/types/enums" | grep -v "pkg/handlers/apidocs" | grep -v "pkg/utils/token/mocks" | grep -v "pkg/utils/password/mocks" | grep -v "pkg/handlers/distribution/clients/mocks"`
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
with:
Expand Down Expand Up @@ -137,8 +141,12 @@ jobs:
restore-keys: |
${{ runner.os }}-go-test-postgresql-
- name: Run tests
env:
COS_ENDPOINT: ${{ secrets.COS_ENDPOINT_POSTGRESQL }}
COS_AK: ${{ secrets.COS_AK }}
COS_SK: ${{ secrets.COS_SK }}
run: |
CI_DATABASE_TYPE=postgresql go test -v -coverprofile=coverage.txt -covermode=atomic `go list ./... | grep -v "pkg/tests" | grep -v "pkg/dal/query" | grep -v "pkg/dal/cmd" | grep -v "pkg/types/enums" | grep -v "pkg/handlers/apidocs" | grep -v "pkg/utils/token/mocks" | grep -v "pkg/utils/password/mocks" | grep -v "pkg/handlers/distribution/clients/mocks"`
CI_DATABASE_TYPE=postgresql go test -timeout 30m -v -coverprofile=coverage.txt -covermode=atomic `go list ./... | grep -v "pkg/tests" | grep -v "pkg/dal/query" | grep -v "pkg/dal/cmd" | grep -v "pkg/types/enums" | grep -v "pkg/handlers/apidocs" | grep -v "pkg/utils/token/mocks" | grep -v "pkg/utils/password/mocks" | grep -v "pkg/handlers/distribution/clients/mocks"`
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
with:
Expand Down Expand Up @@ -205,8 +213,12 @@ jobs:
restore-keys: |
${{ runner.os }}-go-test-mysql-
- name: Run tests
env:
COS_ENDPOINT: ${{ secrets.COS_ENDPOINT_MYSQL }}
COS_AK: ${{ secrets.COS_AK }}
COS_SK: ${{ secrets.COS_SK }}
run: |
CI_DATABASE_TYPE=mysql go test -v -coverprofile=coverage.txt -covermode=atomic `go list ./... | grep -v "pkg/tests" | grep -v "pkg/dal/query" | grep -v "pkg/dal/cmd" | grep -v "pkg/types/enums" | grep -v "pkg/handlers/apidocs" | grep -v "pkg/utils/token/mocks" | grep -v "pkg/utils/password/mocks" | grep -v "pkg/handlers/distribution/clients/mocks"`
CI_DATABASE_TYPE=mysql go test -timeout 30m -v -coverprofile=coverage.txt -covermode=atomic `go list ./... | grep -v "pkg/tests" | grep -v "pkg/dal/query" | grep -v "pkg/dal/cmd" | grep -v "pkg/types/enums" | grep -v "pkg/handlers/apidocs" | grep -v "pkg/utils/token/mocks" | grep -v "pkg/utils/password/mocks" | grep -v "pkg/handlers/distribution/clients/mocks"`
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
with:
Expand Down
2 changes: 1 addition & 1 deletion conf/config-full.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# this is a full config file, just show all avaliable config items.
# this is a full config file, just show all available config items.

log:
level: debug
Expand Down
5 changes: 5 additions & 0 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ storage:
region: cn-north-1
bucket: sigma
forcePathStyle: true
cos:
ak: xxxx
sk: xxxx
endpoint: https://xxxxxxx

# Notice: the tag never update after the first pulled from remote registry, unless you delete the image and pull again.
proxy:
enabled: false
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmds/distribution/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/spf13/viper"

"github.com/go-sigma/sigma/pkg/configs"
"github.com/go-sigma/sigma/pkg/handlers"
"github.com/go-sigma/sigma/pkg/middlewares"
"github.com/go-sigma/sigma/pkg/storage"
Expand Down Expand Up @@ -62,7 +63,7 @@ func Serve() error {
}

handlers.InitializeDistribution(e)
err := storage.Initialize()
err := storage.Initialize(configs.Configuration{})
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmds/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/spf13/viper"

"github.com/go-sigma/sigma/pkg/builder"
"github.com/go-sigma/sigma/pkg/configs"
"github.com/go-sigma/sigma/pkg/consts"
"github.com/go-sigma/sigma/pkg/daemon"
"github.com/go-sigma/sigma/pkg/handlers"
Expand Down Expand Up @@ -98,7 +99,7 @@ func Serve(config ServerConfig) error {
return err
}

err = storage.Initialize()
err = storage.Initialize(configs.Configuration{})
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/configs/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,23 @@ type ConfigurationStorageS3 struct {
ForcePathStyle bool `yaml:"forcePathStyle"`
}

// ConfigurationStorageCos ...
type ConfigurationStorageCos struct {
Ak string `yaml:"ak"`
Sk string `yaml:"sk"`
Endpoint string `yaml:"endpoint"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
ForcePathStyle bool `yaml:"forcePathStyle"`
}

// ConfigurationStorage ...
type ConfigurationStorage struct {
RootDirectory string `yaml:"rootDirectory"`
Type string `yaml:"type"`
Filesystem ConfigurationStorageFilesystem `yaml:"filesystem"`
S3 ConfigurationStorageS3 `yaml:"s3"`
Cos ConfigurationStorageCos `yaml:"cos"`
}

// ConfigurationProxy ...
Expand Down
226 changes: 182 additions & 44 deletions pkg/storage/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ package cos

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"reflect"
"time"
"strconv"

"github.com/spf13/viper"
cos "github.com/tencentyun/cos-go-sdk-v5"
gonanoid "github.com/matoous/go-nanoid"
"github.com/tencentyun/cos-go-sdk-v5"

"github.com/go-sigma/sigma/pkg/configs"
"github.com/go-sigma/sigma/pkg/consts"
"github.com/go-sigma/sigma/pkg/storage"
"github.com/go-sigma/sigma/pkg/utils"
)
Expand All @@ -39,79 +41,215 @@ type factory struct{}

var _ storage.Factory = factory{}

func (f factory) New() (storage.StorageDriver, error) {
endpoint := viper.GetString("storage.cos.endpoint")
ak := viper.GetString("storage.cos.ak")
sk := viper.GetString("storage.cos.sk")
// New ...
func (f factory) New(config configs.Configuration) (storage.StorageDriver, error) {
u, err := url.Parse(config.Storage.Cos.Endpoint)
if err != nil {
return nil, fmt.Errorf("Config [storage.cos.endpoint] is invalid")
}

u, _ := url.Parse(endpoint)
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
Timeout: 100 * time.Second,
c := cos.NewClient(&cos.BaseURL{BucketURL: u}, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: ak,
SecretKey: sk,
SecretID: config.Storage.Cos.Ak,
SecretKey: config.Storage.Cos.Sk,
},
})

return &tencentcos{
client: c,
client: c,
domain: u.Host,
rootDirectory: config.Storage.RootDirectory,
}, nil
}

type tencentcos struct {
client *cos.Client
client *cos.Client
domain string
rootDirectory string
}

// Stat retrieves the FileInfo for the given path, including the current
// size in bytes and the creation time.
func (t *tencentcos) Stat(ctx context.Context, path string) (storage.FileInfo, error) {
return os.Stat(path)
}

// Move moves an object stored at sourcePath to destPath, removing the
// original object.
// Note: This may be no more efficient than a copy followed by a delete for
// many implementations.
func (t *tencentcos) Move(ctx context.Context, sourcePath string, destPath string) error {
return nil
// Move moves a file from srcPath to dstPath.
func (t *tencentcos) Move(ctx context.Context, srcPath, dstPath string) (err error) {
srcPath = storage.SanitizePath(t.rootDirectory, srcPath)
dstPath = storage.SanitizePath(t.rootDirectory, dstPath)

srcFile, err := t.client.Object.Head(ctx, srcPath, nil)
if err != nil {
return err
}
srcSize := srcFile.ContentLength

if srcSize < storage.MultipartCopyThresholdSize {
_, _, err := t.client.Object.Copy(ctx, dstPath,
fmt.Sprintf("%s/%s", t.domain, srcPath), &cos.ObjectCopyOptions{})
return err
}

uploadRes, _, err := t.client.Object.InitiateMultipartUpload(ctx, dstPath, nil)
if err != nil {
return err
}
uploadID := uploadRes.UploadID

numParts := (srcSize + storage.MultipartCopyChunkSize - 1) / storage.MultipartCopyChunkSize
completedParts := make([]cos.Object, numParts)
errChan := make(chan error, numParts)
limiter := make(chan struct{}, storage.MultipartCopyMaxConcurrency)

for i := range completedParts {
i := i
go func() {
limiter <- struct{}{}
firstByte := i * storage.MultipartCopyChunkSize
lastByte := firstByte + storage.MultipartCopyChunkSize - 1
if int64(lastByte) >= srcSize {
lastByte = int(srcSize - 1)
}
partResp, _, err := t.client.Object.CopyPart(ctx, dstPath, uploadID, i+1,
fmt.Sprintf("%s/%s", t.domain, srcPath), &cos.ObjectCopyPartOptions{
XCosCopySourceRange: fmt.Sprintf("bytes=%d-%d", firstByte, lastByte),
})
if err == nil {
completedParts[i] = cos.Object{
ETag: partResp.ETag,
PartNumber: i + 1,
}
}
errChan <- err
<-limiter
}()
}

for range completedParts {
err := <-errChan
if err != nil {
return err
}
}

_, _, err = t.client.Object.CompleteMultipartUpload(ctx, dstPath, uploadID, &cos.CompleteMultipartUploadOptions{
Parts: completedParts,
})
return err
}

// Delete recursively deletes all objects stored at "path" and its subpaths.
// Delete removes the object at the given path.
// Note: if you delete 'test' then the following file and dir will be deleted:
// 'test/file', 'test', 'test/'
func (t *tencentcos) Delete(ctx context.Context, path string) error {
path = storage.SanitizePath(t.rootDirectory, path)

objects := make([]cos.Object, 0, storage.MaxPaginationKeys)
var marker string
for {
opt := &cos.BucketGetOptions{
Prefix: path,
Marker: marker,
MaxKeys: storage.MaxPaginationKeys,
}
resp, _, err := t.client.Bucket.Get(ctx, opt)
if err != nil {
return fmt.Errorf("List objects failed: %v", err)
}
if len(resp.Contents) == 0 {
break
}

for _, obj := range resp.Contents {
if len(obj.Key) > len(path) && obj.Key[len(path)] != '/' {
continue
}
objects = append(objects, cos.Object{
Key: obj.Key,
})
}

if len(objects) > 0 {
resp, _, err := t.client.Object.DeleteMulti(ctx, &cos.ObjectDeleteMultiOptions{
Quiet: false,
Objects: objects,
})
if err != nil {
return err
}

if len(resp.DeletedObjects) != len(objects) {
errs := make([]error, 0, len(resp.Errors))
for _, err := range resp.Errors {
errs = append(errs, fmt.Errorf(err.Message))
}
return fmt.Errorf("Delete objects failed: %+v", errs)
}
}
objects = objects[:0]
marker = resp.NextMarker
if !resp.IsTruncated {
break
}
}
return nil
}

// Reader retrieves an io.ReadCloser for the content stored at "path"
// with a given byte offset.
// May be used to resume reading a stream by providing a nonzero offset.
// Reader returns a reader for the given path.
func (t *tencentcos) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
f, _ := os.Create("test")
return f, nil
opt := &cos.ObjectGetOptions{
Range: fmt.Sprintf("bytes=%s-", strconv.FormatInt(offset, 10)),
}
resp, err := t.client.Object.Get(ctx, path, opt)
if err != nil {
return nil, fmt.Errorf("Get object failed: %v", err)
}
return resp.Body, nil
}

// CreateUploadID creates a new multipart upload and returns an
// opaque upload ID.
// CreateUploadID creates a new upload ID.
func (t *tencentcos) CreateUploadID(ctx context.Context, path string) (string, error) {
return "", nil
resp, _, err := t.client.Object.InitiateMultipartUpload(ctx, path, &cos.InitiateMultipartUploadOptions{})
if err != nil {
return "", fmt.Errorf("failed to create multipart upload: %v", err)
}
return resp.UploadID, nil
}

// WritePart writes a part of a multipart upload.
func (t *tencentcos) UploadPart(ctx context.Context, path, uploadID string, partNumber int64, body io.Reader) (string, error) {
return "", nil
// UploadPart WritePart writes a part of a multipart upload.
func (t *tencentcos) UploadPart(ctx context.Context, rPath, uploadID string, partNumber int64, body io.Reader) (string, error) {
partPath := path.Join(storage.SanitizePath(t.rootDirectory, consts.BlobUploadParts), rPath, fmt.Sprintf("%s-%s", uploadID, gonanoid.MustGenerate(consts.Alphanum, 6)))
_, err := t.client.Object.Put(ctx, partPath, body, nil)
if err != nil {
return "", fmt.Errorf("Upload part failed: %v", err)
}
res, _, err := t.client.Object.CopyPart(ctx, rPath, uploadID, int(partNumber), fmt.Sprintf("%s/%s", t.domain, partPath), nil)
if err != nil {
return "", fmt.Errorf("Upload part failed: %v", err)
}
err = t.Delete(ctx, partPath)
if err != nil {
return "", fmt.Errorf("Delete the part file failed: %v", err)
}
return res.ETag, nil
}

// CommitUpload commits a multipart upload.
func (t *tencentcos) CommitUpload(ctx context.Context, path string, uploadID string, parts []string) error {
return nil
func (t *tencentcos) CommitUpload(ctx context.Context, path, uploadID string, parts []string) error {
completeParts := make([]cos.Object, len(parts))
for i, part := range parts {
completeParts[i] = cos.Object{
ETag: part,
PartNumber: i + 1,
}
}
_, _, err := t.client.Object.CompleteMultipartUpload(ctx, path, uploadID, &cos.CompleteMultipartUploadOptions{Parts: completeParts})
return err
}

// AbortUpload aborts a multipart upload.
func (t *tencentcos) AbortUpload(ctx context.Context, path string, uploadID string) error {
return nil
_, err := t.client.Object.AbortMultipartUpload(ctx, path, uploadID)
return err
}

// Upload upload a file to the given path.
func (t *tencentcos) Upload(ctx context.Context, path string, body io.Reader) error {
return nil
_, err := t.client.Object.Put(ctx, path, body, nil)
return err
}
Loading

0 comments on commit 20718db

Please sign in to comment.