Skip to content

Commit

Permalink
[content-service] Refactor upload to GCS
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf committed May 6, 2022
1 parent 2f266fd commit c3f442d
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 214 deletions.
1 change: 0 additions & 1 deletion components/content-service-api/go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ type GCPConfig struct {
CredentialsFile string `json:"credentialsFile"`
Region string `json:"region"`
Project string `json:"projectId"`
ParallelUpload int `json:"parallelUpload"`

MaximumBackupCount int `json:"maximumBackupCount"`
}
Expand Down
244 changes: 34 additions & 210 deletions components/content-service/pkg/storage/gcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@ import (
"encoding/hex"
"errors"
"fmt"
"hash/crc32"
"io"
"math/rand"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"cloud.google.com/go/storage"
Expand Down Expand Up @@ -346,104 +343,68 @@ func (rs *DirectGCPStorage) Upload(ctx context.Context, source string, name stri
}
defer sfn.Close()

var totalSize int64
log.WithField("tasks", fmt.Sprintf("%d", rs.GCPConfig.ParallelUpload)).WithField("tmpfile", source).Debug("Uploading in parallel")
stat, err := sfn.Stat()
if err != nil {
return
}
totalSize = stat.Size()
span.SetTag("totalSize", totalSize)

uploadSpan := opentracing.StartSpan("remote-upload", opentracing.ChildOf(span.Context()))
uploadSpan.SetTag("bucket", rs.bucketName())
uploadSpan.SetTag("obj", rs.objectName(name))
/* Read back from the file in chunks. We don't wand a complicated composition operation,
* so we'll have 32 chunks max. See https://cloud.google.com/storage/docs/composite-objects
* for more details.
*/
var chunks []string
if chunks, err = rs.uploadChunks(opentracing.ContextWithSpan(ctx, uploadSpan), sfn, totalSize, rs.GCPConfig.ParallelUpload); err != nil {
tracing.FinishSpan(uploadSpan, &err)
return
}
defer func() {
err := rs.deleteChunks(opentracing.ContextWithSpan(ctx, uploadSpan), chunks)
if err != nil {
log.WithError(err).WithField("name", name).Warn("cannot clean up upload chunks")
}
}()

log.WithField("workspaceId", rs.WorkspaceName).WithField("bucketName", rs.bucketName()).Debug("Uploaded chunks")
totalSize := stat.Size()
span.SetTag("totalSize", totalSize)

// compose the uploaded chunks
bucket = rs.bucketName()
bkt := rs.client.Bucket(bucket)
src := make([]*gcpstorage.ObjectHandle, len(chunks))
for i := 0; i < len(chunks); i++ {
src[i] = bkt.Object(chunks[i])
}

object = rs.objectName(name)
obj := bkt.Object(object)

uploadSpan := opentracing.StartSpan("remote-upload", opentracing.ChildOf(span.Context()))
uploadSpan.SetTag("bucket", bucket)
uploadSpan.SetTag("obj", object)

var firstBackup bool
if _, e := obj.Attrs(ctx); e == gcpstorage.ErrObjectNotExist {
firstBackup = true
}
// maintain backup trail if we're asked to - we do this prior to overwriting the regular backup file
// to make sure we're trailign the previous backup.
if options.BackupTrail.Enabled && !firstBackup {
err := rs.trailBackup(ctx, bkt, obj, options.BackupTrail.ThisBackupID, options.BackupTrail.TrailLength)
if err != nil {
log.WithError(err).Error("cannot maintain backup trail")
}

wc := obj.NewWriter(ctx)

if options.ContentType != "" {
wc.ContentType = options.ContentType
}

// now that the upload is complete and the backup trail has been created, compose the chunks to
// create the actual backup
_, err = obj.ComposerFrom(src...).Run(ctx)
if err != nil {
tracing.FinishSpan(uploadSpan, &err)
return
if options.Annotations != nil {
wc.Metadata = options.Annotations
}
attrs, err := obj.Update(ctx, gcpstorage.ObjectAttrsToUpdate{
ContentType: options.ContentType,
Metadata: options.Annotations,
})

written, err := io.Copy(wc, sfn)
if err != nil {
tracing.FinishSpan(uploadSpan, &err)
defer wc.Close()
log.WithError(err).WithField("name", name).Error("Error while uploading file")
return
}
log.WithField("chunkCount", fmt.Sprintf("%d", len(chunks))).Debug("Composited chunks")
uploadSpan.Finish()

// compare the MD5 sum of the composited object with the local tar file
remotehash := attrs.CRC32C
_, err = sfn.Seek(0, 0)
err = wc.Close()
if err != nil {
log.WithError(err).Debug("cannot compute local checksum")

// us being unable to produce the local checksum is not enough of a reason to fail the upload
// altogether. We did upload something after all.
err = nil
log.WithError(err).WithField("name", name).Error("Error while uploading file")
return
}
var localhash uint32
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
_, err = io.Copy(h, sfn)
if err != nil {
log.WithError(err).Debug("cannot compute local checksum")
} else {
localhash = h.Sum32()

if written != totalSize {
err = xerrors.Errorf("Wrote fewer bytes than it should have, %d instead of %d", written, totalSize)
return
}
if remotehash == 0 || localhash == 0 {
log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("one of the checksums is empty - not comparing")
} else if remotehash == localhash {
log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("checksums match")
} else {
log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("checksums do not match")

// maintain backup trail if we're asked to - we do this prior to overwriting the regular backup file
// to make sure we're trailign the previous backup.
if options.BackupTrail.Enabled && !firstBackup {
err := rs.trailBackup(ctx, bkt, obj, options.BackupTrail.ThisBackupID, options.BackupTrail.TrailLength)
if err != nil {
log.WithError(err).Error("cannot maintain backup trail")
}
}

uploadSpan.Finish()

err = nil
return
}
Expand Down Expand Up @@ -480,133 +441,6 @@ func (rs *DirectGCPStorage) ensureBackupSlotAvailable() error {
return nil
}

func (rs *DirectGCPStorage) uploadChunks(ctx context.Context, f io.ReaderAt, totalSize int64, desiredChunkCount int) (chnks []string, err error) {
//nolint:ineffassign
span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunks")
defer tracing.FinishSpan(span, &err)

if totalSize == 0 {
return []string{}, xerrors.Errorf("Total size must be greater than zero")
}
if desiredChunkCount < 1 {
return []string{}, xerrors.Errorf("Desired chunk count must be greater (or equal to) one")
}

minChunkSize := int64(256 * 1024)
chunkSize := totalSize / int64(desiredChunkCount)
chunkSize = (chunkSize / minChunkSize) * minChunkSize
if chunkSize < minChunkSize {
chunkSize = minChunkSize
}
chunkCount := int(totalSize/chunkSize) + 1

log.WithField("count", chunkCount).WithField("chunkSize", chunkSize).WithField("totalSize", totalSize).Debug("Computed chunk size")

pfx := fmt.Sprintf("uploads/%s", randomString(20))

// sync construct taken from https://play.golang.org/p/mqUvKFDQbfn
errChannel := make(chan error, 1)
chunks := make([]string, chunkCount)
wg := sync.WaitGroup{}

// we need to add ourselves to the working group here, and not in the
// go routines, as they might start after the "finished" go routine.
wg.Add(chunkCount)

for i := 0; i < chunkCount; i++ {
off := int64(i) * chunkSize
n := chunkSize
if off+n > totalSize {
n = totalSize - off
}
r := io.NewSectionReader(f, off, n)
chunkName := fmt.Sprintf("%s/%d-upload", pfx, i)
chunks[i] = chunkName

go rs.uploadChunk(opentracing.ContextWithSpan(ctx, span), chunkName, r, n, &wg, errChannel)
}

// Put the wait group in a go routine.
// By putting the wait group in the go routine we ensure either all pass
// and we close the "finished" channel or we wait forever for the wait group
// to finish.
//
// Waiting forever is okay because of the blocking select below.
finished := make(chan bool, 1)
go func() {
wg.Wait()
close(finished)
}()

// This select will block until one of the two channels returns a value.
// This means on the first failure in the go routines above the errChannel will release a
// value first. Because there is a "return" statement in the err check this function will
// exit when an error occurs.
//
// Due to the blocking on wg.Wait() the finished channel will not get a value unless all
// the go routines before were successful because not all the wg.Done() calls would have
// happened.
select {
case <-finished:
log.Debug("Finished uploading")
case err := <-errChannel:
log.WithError(err).Debug("Error while uploading chunks")
if err != nil {
// already logged in uploadChunk
return []string{}, err
}
}

return chunks, nil
}

func (rs *DirectGCPStorage) uploadChunk(ctx context.Context, name string, r io.Reader, size int64, wg *sync.WaitGroup, errchan chan error) {
//nolint:ineffassign
span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunk")
span.SetTag("size", size)
defer span.Finish()

defer wg.Done()

start := time.Now()
log.WithField("name", name).WithField("size", fmt.Sprintf("%d", size)).Debug("Uploading chunk")

wc := rs.client.Bucket(rs.bucketName()).Object(name).NewWriter(ctx)
defer wc.Close()

written, err := io.Copy(wc, r)
if err != nil {
log.WithError(err).WithField("name", name).Error("Error while uploading chunk")
errchan <- err
return
}
if written != size {
err := xerrors.Errorf("Wrote fewer bytes than it should have, %d instead of %d", written, size)
log.WithError(err).WithField("name", name).Error("Error while uploading chunk")
errchan <- err
return
}

log.WithField("name", name).WithField("duration", time.Since(start)).Debug("Upload complete")
}

func (rs *DirectGCPStorage) deleteChunks(ctx context.Context, chunks []string) (err error) {
//nolint:ineffassign
span, ctx := opentracing.StartSpanFromContext(ctx, "deleteChunks")
defer tracing.FinishSpan(span, &err)

for i := 0; i < len(chunks); i++ {
err = rs.client.Bucket(rs.bucketName()).Object(chunks[i]).Delete(ctx)
}

if err != nil {
log.WithError(err).Error("Error while deleting chunks")
return err
}

return nil
}

func (rs *DirectGCPStorage) trailBackup(ctx context.Context, bkt *gcpstorage.BucketHandle, obj *gcpstorage.ObjectHandle, backupID string, trailLength int) (err error) {
//nolint:ineffassign
span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunk")
Expand Down Expand Up @@ -653,16 +487,6 @@ func (rs *DirectGCPStorage) trailBackup(ctx context.Context, bkt *gcpstorage.Buc
return nil
}

func randomString(len int) string {
min := 97
max := 122
bytes := make([]byte, len)
for i := 0; i < len; i++ {
bytes[i] = byte(min + rand.Intn(max-min))
}
return string(bytes)
}

func (rs *DirectGCPStorage) bucketName() string {
return gcpBucketName(rs.Stage, rs.Username)
}
Expand Down
Loading

0 comments on commit c3f442d

Please sign in to comment.