-
Notifications
You must be signed in to change notification settings - Fork 9
/
client.go
194 lines (185 loc) · 6.31 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package awos
import (
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// Client interface
type Client interface {
Get(key string, options ...GetOptions) (string, error)
GetBytes(key string, options ...GetOptions) ([]byte, error)
GetAsReader(key string, options ...GetOptions) (io.ReadCloser, error)
GetWithMetaGZIP(key string, attributes []string, options ...GetOptions) (io.ReadCloser, map[string]string, error)
GetWithMeta(key string, attributes []string, options ...GetOptions) (io.ReadCloser, map[string]string, error)
Put(key string, reader io.ReadSeeker, meta map[string]string, options ...PutOptions) error
Del(key string) error
DelMulti(keys []string) error
Head(key string, meta []string) (map[string]string, error)
ListObject(key string, prefix string, marker string, maxKeys int, delimiter string) ([]string, error)
SignURL(key string, expired int64, options ...SignOptions) (string, error)
GetAndDecompress(key string) (string, error)
GetAndDecompressAsReader(key string) (io.ReadCloser, error)
CompressAndPut(key string, reader io.ReadSeeker, meta map[string]string, options ...PutOptions) error
Range(key string, offset int64, length int64) (io.ReadCloser, error)
Exists(key string) (bool, error)
}
// Options for New method
type Options struct {
// Required, value is one of oss/s3, case insensetive
StorageType string
// Required
AccessKeyID string
// Required
AccessKeySecret string
// Required
Endpoint string
// Required
Bucket string
// Optional, choose which bucket to use based on the last character of the key,
// if bucket is 'content', shards is ['abc', 'edf'],
// then the last character of the key with a/b/c will automatically use the content-abc bucket, and vice versa
Shards []string
// Only for s3-like
Region string
// Only for s3-like, whether to force path style URLs for S3 objects.
S3ForcePathStyle bool
// Only for s3-like
SSL bool
// Only for s3-like, set http client timeout.
// oss has default timeout, but s3 default timeout is 0 means no timeout.
S3HttpTimeoutSecs int64
S3HttpTransportMaxConnsPerHost int
S3HttpTransportMaxIdleConns int
S3HttpTransportIdleConnTimeout time.Duration
// EnableCompressor
EnableCompressor bool
// CompressType gzip
CompressType string
// CompressLimit 大于该值之后才压缩 单位字节
CompressLimit int
}
const (
DefaultHttpTimeout = int64(60)
)
// New awos Client instance
func New(options *Options) (Client, error) {
Register(DefaultGzipCompressor)
cfg := DefaultConfig()
cfg.StorageType = strings.ToLower(options.StorageType)
if cfg.StorageType == StorageTypeOSS {
client, err := oss.New(options.Endpoint, options.AccessKeyID, options.AccessKeySecret)
if err != nil {
return nil, err
}
var ossClient = &OSS{cfg: cfg}
if options.Shards != nil && len(options.Shards) > 0 {
buckets := make(map[string]*oss.Bucket)
for _, v := range options.Shards {
bucket, err := client.Bucket(options.Bucket + "-" + v)
if err != nil {
return nil, err
}
for i := 0; i < len(v); i++ {
buckets[strings.ToLower(v[i:i+1])] = bucket
}
}
ossClient.Shards = buckets
} else {
bucket, err := client.Bucket(options.Bucket)
if err != nil {
return nil, err
}
ossClient.Bucket = bucket
}
if options.EnableCompressor {
// 目前仅支持 gzip
ossClient.cfg.EnableCompressor = options.EnableCompressor
ossClient.cfg.CompressType = options.CompressType
ossClient.cfg.CompressLimit = options.CompressLimit
if comp, ok := compressors[options.CompressType]; ok {
ossClient.compressor = comp
} else {
fmt.Printf("unknown type is: %s", options.CompressType)
}
}
return ossClient, nil
} else if cfg.StorageType == StorageTypeS3 {
var config *aws.Config
// use minio
if options.S3ForcePathStyle {
config = &aws.Config{
Region: aws.String(options.Region),
DisableSSL: aws.Bool(!options.SSL),
Credentials: credentials.NewStaticCredentials(options.AccessKeyID, options.AccessKeySecret, ""),
Endpoint: aws.String(options.Endpoint),
S3ForcePathStyle: aws.Bool(true),
}
} else {
config = &aws.Config{
Region: aws.String(options.Region),
DisableSSL: aws.Bool(!options.SSL),
Credentials: credentials.NewStaticCredentials(options.AccessKeyID, options.AccessKeySecret, ""),
}
if options.Endpoint != "" {
config.Endpoint = aws.String(options.Endpoint)
}
}
httpTimeout := DefaultHttpTimeout
if options.S3HttpTimeoutSecs > 0 {
httpTimeout = options.S3HttpTimeoutSecs
}
httpClient := &http.Client{
Timeout: time.Second * time.Duration(httpTimeout),
}
if options.S3HttpTransportMaxConnsPerHost > 0 {
transport := &http.Transport{
MaxIdleConns: options.S3HttpTransportMaxConnsPerHost,
IdleConnTimeout: 30 * time.Second,
MaxConnsPerHost: options.S3HttpTransportMaxConnsPerHost,
ForceAttemptHTTP2: true,
}
if options.S3HttpTransportMaxIdleConns > 0 {
transport.MaxIdleConns = options.S3HttpTransportMaxIdleConns
}
if options.S3HttpTransportIdleConnTimeout != 0 {
transport.IdleConnTimeout = options.S3HttpTransportIdleConnTimeout
}
httpClient.Transport = transport
}
config.HTTPClient = httpClient
service := s3.New(session.Must(session.NewSession(config)))
var s3Client = &S3{Client: service, cfg: cfg}
if options.Shards != nil && len(options.Shards) > 0 {
buckets := make(map[string]string)
for _, v := range options.Shards {
for i := 0; i < len(v); i++ {
buckets[strings.ToLower(v[i:i+1])] = options.Bucket + "-" + v
}
}
s3Client.ShardsBucket = buckets
} else {
s3Client.BucketName = options.Bucket
}
if options.EnableCompressor {
// 目前仅支持 gzip
s3Client.cfg.EnableCompressor = options.EnableCompressor
s3Client.cfg.CompressType = options.CompressType
s3Client.cfg.CompressLimit = options.CompressLimit
if comp, ok := compressors[options.CompressType]; ok {
s3Client.compressor = comp
} else {
fmt.Printf("unknown type is: %s", options.CompressType)
}
}
return s3Client, nil
} else {
return nil, fmt.Errorf(`unknown StorageType:"%s", only supports oss or s3`, options.StorageType)
}
}