Skip to content

Commit

Permalink
Add ability to set ServerSideEncryption on uploaded objects (#69)
Browse files Browse the repository at this point in the history
* storage: Add a parameter for setting SSE mode.

In our use case, we need to pass through the string `AES256` but others
may want `aws:kms` instead.  This gives us a place to store this.

* s3: Pass through ServerSideEncryption string

* s3stream: Also pass through SSE settings here too

* cli: Add `--s3-sse` option to the CLI

* collection: Add a pipeline step to set SSE options

* cli: Pass SSE setting from CLI to pipeline.
  • Loading branch information
sjlongland authored Nov 3, 2022
1 parent 134e850 commit 973ca1b
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 34 deletions.
27 changes: 21 additions & 6 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ type args struct {
TargetRegion string `arg:"--tr" help:"Target AWS Region"`
TargetEndpoint string `arg:"--te" help:"Target AWS Endpoint"`
// S3 config
S3Retry uint `arg:"--s3-retry" help:"Max numbers of retries to sync file"`
S3RetryInterval uint `arg:"--s3-retry-sleep" help:"Sleep interval (sec) between sync retries on error"`
S3Acl string `arg:"--s3-acl" help:"S3 ACL for uploaded files. Possible values: private, public-read, public-read-write, aws-exec-read, authenticated-read, bucket-owner-read, bucket-owner-full-control"`
S3CacheControl string `arg:"--s3-cache-control" help:"Cache-Control header for uploaded files."`
S3StorageClass string `arg:"--s3-storage-class" help:"S3 Storage Class for uploaded files."`
S3KeysPerReq int64 `arg:"--s3-keys-per-req" help:"Max numbers of keys retrieved via List request"`
S3Retry uint `arg:"--s3-retry" help:"Max numbers of retries to sync file"`
S3RetryInterval uint `arg:"--s3-retry-sleep" help:"Sleep interval (sec) between sync retries on error"`
S3Acl string `arg:"--s3-acl" help:"S3 ACL for uploaded files. Possible values: private, public-read, public-read-write, aws-exec-read, authenticated-read, bucket-owner-read, bucket-owner-full-control"`
S3CacheControl string `arg:"--s3-cache-control" help:"Cache-Control header for uploaded files."`
S3StorageClass string `arg:"--s3-storage-class" help:"S3 Storage Class for uploaded files."`
S3KeysPerReq int64 `arg:"--s3-keys-per-req" help:"Max numbers of keys retrieved via List request"`
S3ServerSideEncryption string `arg:"--s3-sse" help:"Use server-side encryption, if specified valid options are \"AES256\" and \"aws:kms\"."`
// FS config
FSFilePerm string `arg:"--fs-file-perm" help:"File permissions"`
FSDirPerm string `arg:"--fs-dir-perm" help:"Dir permissions"`
Expand Down Expand Up @@ -114,6 +115,7 @@ func GetCliArgs() (cli argsParsed, err error) {
rawCli.S3RetryInterval = 0
rawCli.S3Acl = ""
rawCli.S3KeysPerReq = 1000
rawCli.S3ServerSideEncryption = ""
rawCli.OnFail = "fatal"
rawCli.FSDirPerm = "0755"
rawCli.FSFilePerm = "0644"
Expand All @@ -137,6 +139,19 @@ func GetCliArgs() (cli argsParsed, err error) {
p.Fail("--acl must be one of \"copy, private, public-read, public-read-write, aws-exec-read, authenticated-read, bucket-owner-read, bucket-owner-full-control\"")
}

switch strings.ToLower(cli.args.S3ServerSideEncryption) {
case "":
break
case "aes256":
cli.args.S3ServerSideEncryption = "AES256"
break
case "aws:kms":
cli.args.S3ServerSideEncryption = "aws:kms"
break
default:
p.Fail("--s3-sse must be one of \"\", \"AES256\" or \"aws:kms\"")
}

cli.ErrorHandlingMask = storage.ErrHandlingMask(cli.args.ErrorHandlingMask)
switch cli.args.OnFail {
case "fatal":
Expand Down
8 changes: 8 additions & 0 deletions cli/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ func setupPipeline(syncGroup *pipeline.Group, cli *argsParsed) {
})
}

if cli.S3ServerSideEncryption != "" {
syncGroup.AddPipeStep(pipeline.Step{
Name: "ServerSideEncryption",
Fn: collection.ServerSideEncryptionUpdater,
Config: cli.S3ServerSideEncryption,
})
}

syncGroup.AddPipeStep(pipeline.Step{
Name: "UploadObj",
Fn: collection.UploadObjectData,
Expand Down
15 changes: 15 additions & 0 deletions pipeline/collection/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ var CacheControlUpdater pipeline.StepFn = func(group *pipeline.Group, stepNum in
}
}

// ServerSideEncryptionUpdater updates the SSE mode.
var ServerSideEncryptionUpdater pipeline.StepFn = func(group *pipeline.Group, stepNum int, input <-chan *storage.Object, output chan<- *storage.Object, errChan chan<- error) {
info := group.GetStepInfo(stepNum)
cfg, ok := info.Config.(string)
if !ok {
errChan <- &pipeline.StepConfigurationError{StepName: info.Name, StepNum: stepNum}
}
for obj := range input {
if ok {
obj.ServerSideEncryption = &cfg
output <- obj
}
}
}

// PipelineRateLimit read objects from input and slow down pipeline processing speed to given rate (obj/sec).
//
// This filter read configuration from Step.Config and assert it type to uint type.
Expand Down
26 changes: 15 additions & 11 deletions storage/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,21 @@ func (st *S3Storage) PutObject(obj *storage.Object) error {
rlReader := ratelimit.NewReadSeeker(objReader, st.rlBucket)

input := &s3.PutObjectInput{
Bucket: st.awsBucket,
Key: aws.String(st.prefix + *obj.Key),
Body: rlReader,
ContentType: obj.ContentType,
ContentDisposition: obj.ContentDisposition,
ContentEncoding: obj.ContentEncoding,
ContentLanguage: obj.ContentLanguage,
ACL: obj.ACL,
Metadata: obj.Metadata,
CacheControl: obj.CacheControl,
StorageClass: obj.StorageClass,
Bucket: st.awsBucket,
Key: aws.String(st.prefix + *obj.Key),
Body: rlReader,
ContentType: obj.ContentType,
ContentDisposition: obj.ContentDisposition,
ContentEncoding: obj.ContentEncoding,
ContentLanguage: obj.ContentLanguage,
ACL: obj.ACL,
Metadata: obj.Metadata,
CacheControl: obj.CacheControl,
StorageClass: obj.StorageClass,
}

if obj.ServerSideEncryption != nil {
input.ServerSideEncryption = aws.String(*obj.ServerSideEncryption)
}

if _, err := st.awsSvc.PutObjectWithContext(st.ctx, input); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions storage/s3stream/s3stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (st *S3StreamStorage) PutObject(obj *storage.Object) error {
StorageClass: obj.StorageClass,
}

if obj.ServerSideEncryption != nil {
input.ServerSideEncryption = aws.String(*obj.ServerSideEncryption)
}

if _, err := st.uploader.UploadWithContext(st.ctx, input); err != nil {
return err
}
Expand Down
35 changes: 18 additions & 17 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,24 @@ const (

// Object contain content and metadata of S3 object.
type Object struct {
Key *string `json:"-"`
ETag *string `json:"e_tag"`
Mtime *time.Time `json:"mtime"`
Content *[]byte `json:"-"`
ContentStream io.ReadCloser `json:"-"`
ContentLength *int64 `json:"-"`
ContentType *string `json:"content_type"`
ContentDisposition *string `json:"content_disposition"`
ContentEncoding *string `json:"content_encoding"`
ContentLanguage *string `json:"content_language"`
Metadata map[string]*string `json:"metadata"`
ACL *string `json:"acl"`
CacheControl *string `json:"cache_control"`
VersionId *string `json:"version_id"`
IsLatest *bool `json:"-"`
StorageClass *string `json:"storage_class"`
AccessControlPolicy *s3.AccessControlPolicy `json:"access_control_policy"`
Key *string `json:"-"`
ETag *string `json:"e_tag"`
Mtime *time.Time `json:"mtime"`
Content *[]byte `json:"-"`
ContentStream io.ReadCloser `json:"-"`
ContentLength *int64 `json:"-"`
ContentType *string `json:"content_type"`
ContentDisposition *string `json:"content_disposition"`
ContentEncoding *string `json:"content_encoding"`
ContentLanguage *string `json:"content_language"`
Metadata map[string]*string `json:"metadata"`
ACL *string `json:"acl"`
CacheControl *string `json:"cache_control"`
VersionId *string `json:"version_id"`
IsLatest *bool `json:"-"`
StorageClass *string `json:"storage_class"`
AccessControlPolicy *s3.AccessControlPolicy `json:"access_control_policy"`
ServerSideEncryption *string `json:"server_side_encryption"`
}

// Storage interface.
Expand Down

0 comments on commit 973ca1b

Please sign in to comment.