Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
feat(objstore): support Tencent COS object storage
Browse files Browse the repository at this point in the history
  • Loading branch information
scottzhlin committed Nov 25, 2022
1 parent c8e99f6 commit 1898856
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 8 deletions.
21 changes: 14 additions & 7 deletions pkg/objstore/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/grafana/phlare/pkg/objstore/providers/azure"
"github.com/grafana/phlare/pkg/objstore/providers/cos"
"github.com/grafana/phlare/pkg/objstore/providers/filesystem"
"github.com/grafana/phlare/pkg/objstore/providers/gcs"
"github.com/grafana/phlare/pkg/objstore/providers/s3"
Expand All @@ -31,6 +32,9 @@ const (
// Swift is the value for the Openstack Swift storage backend.
Swift = "swift"

// COS is the value for the Tencent Cloud COS storage backend.
COS = "cos"

// Filesystem is the value for the filesystem storage backend.
Filesystem = "filesystem"

Expand All @@ -39,7 +43,7 @@ const (
)

var (
SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem}
SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem, COS}

ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters")
Expand All @@ -53,6 +57,7 @@ type StorageBackendConfig struct {
GCS gcs.Config `yaml:"gcs"`
Azure azure.Config `yaml:"azure"`
Swift swift.Config `yaml:"swift"`
COS cos.Config `yaml:"cos"`
Filesystem filesystem.Config `yaml:"filesystem"`
}

Expand All @@ -72,6 +77,7 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(pref
cfg.Azure.RegisterFlagsWithPrefix(prefix, f, logger)
cfg.Swift.RegisterFlagsWithPrefix(prefix, f)
cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f)
cfg.COS.RegisterFlagsWithPrefix(prefix, f)
f.StringVar(&cfg.Backend, prefix+"backend", Filesystem, fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(cfg.supportedBackends(), ", ")))
}

Expand All @@ -84,13 +90,14 @@ func (cfg *StorageBackendConfig) Validate() error {
return ErrUnsupportedStorageBackend
}

if cfg.Backend == S3 {
if err := cfg.S3.Validate(); err != nil {
return err
}
switch cfg.Backend {
case S3:
return cfg.S3.Validate()
case COS:
return cfg.COS.Validate()
default:
return nil
}

return nil
}

// Config holds configuration for accessing long-term storage.
Expand Down
4 changes: 3 additions & 1 deletion pkg/objstore/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"

"github.com/prometheus/client_golang/prometheus"

"github.com/thanos-io/objstore"

phlareobjstore "github.com/grafana/phlare/pkg/objstore"
"github.com/grafana/phlare/pkg/objstore/client/parquet"
"github.com/grafana/phlare/pkg/objstore/providers/azure"
"github.com/grafana/phlare/pkg/objstore/providers/cos"
"github.com/grafana/phlare/pkg/objstore/providers/filesystem"
"github.com/grafana/phlare/pkg/objstore/providers/gcs"
"github.com/grafana/phlare/pkg/objstore/providers/s3"
Expand All @@ -35,6 +35,8 @@ func NewBucket(ctx context.Context, cfg Config, name string) (phlareobjstore.Buc
backendClient, err = azure.NewBucketClient(cfg.Azure, name, logger)
case Swift:
backendClient, err = swift.NewBucketClient(cfg.Swift, name, logger)
case COS:
backendClient, err = cos.NewBucketClient(cfg.COS, name, logger)
case Filesystem:
backendClient, err = filesystem.NewBucket(cfg.Filesystem.Directory)
default:
Expand Down
40 changes: 40 additions & 0 deletions pkg/objstore/providers/cos/bucket_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cos

import (
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/exthttp"
"github.com/thanos-io/objstore/providers/cos"
"gopkg.in/yaml.v3"
)

// NewBucketClient creates a bucket client for COS
func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
bucketConfig := &cos.Config{
Bucket: cfg.Bucket,
Region: cfg.Region,
AppId: cfg.AppID,
Endpoint: cfg.Endpoint,
SecretKey: cfg.SecretKey,
SecretId: cfg.SecretID,
HTTPConfig: exthttp.HTTPConfig{
IdleConnTimeout: model.Duration(cfg.HTTP.IdleConnTimeout),
ResponseHeaderTimeout: model.Duration(cfg.HTTP.ResponseHeaderTimeout),
InsecureSkipVerify: cfg.HTTP.InsecureSkipVerify,
TLSHandshakeTimeout: model.Duration(cfg.HTTP.TLSHandshakeTimeout),
ExpectContinueTimeout: model.Duration(cfg.HTTP.ExpectContinueTimeout),
MaxIdleConns: cfg.HTTP.MaxIdleConns,
MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost,
MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost,
Transport: cfg.HTTP.Transport,
},
}

serializedConfig, err := yaml.Marshal(bucketConfig)
if err != nil {
return nil, err
}

return cos.NewBucket(logger, serializedConfig, name)
}
87 changes: 87 additions & 0 deletions pkg/objstore/providers/cos/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package cos

import (
"errors"
"flag"
"fmt"
"net/http"
"net/url"
"time"
)

// Config encapsulates the necessary config values to instantiate an cos client.
type Config struct {
Bucket string `yaml:"bucket"`
Region string `yaml:"region"`
AppID string `yaml:"app_id"`
Endpoint string `yaml:"endpoint"`
SecretKey string `yaml:"secret_key"`
SecretID string `yaml:"secret_id"`
HTTP HTTPConfig `yaml:"http"`
}

// Validate validates cos client config and returns error on failure
func (c *Config) Validate() error {
if len(c.Endpoint) != 0 {
if _, err := url.Parse(c.Endpoint); err != nil {
return fmt.Errorf("cos config: failed to parse endpoint: %w", err)
}

if empty(c.SecretKey) || empty(c.SecretID) {
return errors.New("secret id and secret key cannot be empty")
}
return nil
}

if empty(c.Bucket) || empty(c.AppID) || empty(c.Region) || empty(c.SecretID) || empty(c.SecretKey) {
return errors.New("cos config ")
}
return nil
}

func empty(s string) bool {
return len(s) == 0
}

// RegisterFlags registers the flags for COS storage
func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix register the flags for COS storage with provided prefix
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.Bucket, prefix+"cos.bucket", "", "COS bucket name")
f.StringVar(&c.Region, prefix+"cos.region", "", "COS region name")
f.StringVar(&c.AppID, prefix+"cos.app-id", "", "COS app id")
f.StringVar(&c.Endpoint, prefix+"cos.endpoint", "", "COS storage endpoint")
f.StringVar(&c.SecretID, prefix+"cos.secret-id", "", "COS secret id")
f.StringVar(&c.SecretKey, prefix+"cos.secret-key", "", "COS secret key")
c.HTTP.RegisterFlagsWithPrefix(prefix, f)
}

// HTTPConfig stores the http.Transport configuration for the COS client.
type HTTPConfig struct {
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout" category:"advanced"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout" category:"advanced"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify" category:"advanced"`
TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout" category:"advanced"`
ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout" category:"advanced"`
MaxIdleConns int `yaml:"max_idle_connections" category:"advanced"`
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"`
MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
}

// RegisterFlagsWithPrefix registers the flags for COS storage with the provided prefix
func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.IdleConnTimeout, prefix+"cos.http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"cos.http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"cos.http.insecure-skip-verify", false, "If the client connects to COS via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"cos.tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.")
f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"cos.expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.")
f.IntVar(&cfg.MaxIdleConns, prefix+"cos.max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"cos.max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
f.IntVar(&cfg.MaxConnsPerHost, prefix+"cos.max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
}
76 changes: 76 additions & 0 deletions pkg/objstore/providers/cos/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package cos

import "testing"

func TestConfig_Validate(t *testing.T) {
type fields struct {
Bucket string
Region string
AppID string
Endpoint string
SecretKey string
SecretID string
HTTP HTTPConfig
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "ok endpoint",
fields: fields{
Endpoint: "http://bucket-123.cos.ap-beijing.myqcloud.com",
SecretID: "sid",
SecretKey: "skey",
},
wantErr: false,
},
{
name: "ok bucket-AppID-region",
fields: fields{
Bucket: "bucket",
AppID: "123",
Region: "ap-beijing",
SecretID: "sid",
SecretKey: "skey",
},
wantErr: false,
},
{
name: "missing skey",
fields: fields{
Bucket: "bucket",
AppID: "123",
Region: "ap-beijing",
},
wantErr: true,
},
{
name: "missing bucket",
fields: fields{
AppID: "123",
Region: "ap-beijing",
SecretID: "sid",
SecretKey: "skey",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Config{
Bucket: tt.fields.Bucket,
Region: tt.fields.Region,
AppID: tt.fields.AppID,
Endpoint: tt.fields.Endpoint,
SecretKey: tt.fields.SecretKey,
SecretID: tt.fields.SecretID,
HTTP: tt.fields.HTTP,
}
if err := c.Validate(); (err != nil) != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

0 comments on commit 1898856

Please sign in to comment.