Commit 66a23ca

[content-service] Refactor upload to GCS
1 parent 2f266fd commit 66a23ca

File tree: 4 files changed, +210 -219 lines


Diff for: components/content-service-api/go/config/config.go (-1)

@@ -96,7 +96,6 @@ type GCPConfig struct {
 	CredentialsFile string `json:"credentialsFile"`
 	Region string `json:"region"`
 	Project string `json:"projectId"`
-	ParallelUpload int `json:"parallelUpload"`
 
 	MaximumBackupCount int `json:"maximumBackupCount"`
 }

Diff for: components/content-service/pkg/archive/tar.go (+4 -3)

@@ -134,16 +134,17 @@ func ExtractTarbal(ctx context.Context, src io.Reader, dst string, opts ...TarOp
 	// We need to remap the UID and GID between the host and the container to avoid permission issues.
 	for _, p := range paths {
 		v := m[p]
-		uid := toHostID(v.UID, cfg.UIDMaps)
-		gid := toHostID(v.GID, cfg.GIDMaps)
 
 		if v.IsSymlink {
 			continue
 		}
 
+		uid := toHostID(v.UID, cfg.UIDMaps)
+		gid := toHostID(v.GID, cfg.GIDMaps)
+
 		err = remapFile(path.Join(dst, p), uid, gid, v.Xattrs)
 		if err != nil {
-			log.WithError(err).WithField("uid", uid).WithField("gid", gid).WithField("path", p).Warn("cannot chown")
+			log.WithError(err).WithField("uid", uid).WithField("gid", gid).WithField("path", p).Debug("cannot chown")
 		}
 	}
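
The reordering above means UID/GID translation now happens only for entries that will actually be remapped, since symlinks are skipped first. As background, here is a minimal sketch of the kind of container-to-host ID translation a helper like toHostID performs; the IDMapping type and the values in main are illustrative assumptions, not the actual types behind cfg.UIDMaps:

package main

import "fmt"

// IDMapping mirrors the shape of a user-namespace mapping entry:
// ContainerID..ContainerID+Size-1 maps onto HostID..HostID+Size-1.
type IDMapping struct {
	ContainerID int
	HostID      int
	Size        int
}

// toHostID translates a container UID/GID to the host's view.
// If no mapping covers the ID, it is returned unchanged.
func toHostID(id int, mapping []IDMapping) int {
	for _, m := range mapping {
		if id >= m.ContainerID && id < m.ContainerID+m.Size {
			return m.HostID + (id - m.ContainerID)
		}
	}
	return id
}

func main() {
	maps := []IDMapping{{ContainerID: 0, HostID: 100000, Size: 65536}}
	fmt.Println(toHostID(33, maps)) // prints 100033, the host's view of container UID 33
}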

Diff for: components/content-service/pkg/storage/gcloud.go (+36 -212)
@@ -9,15 +9,12 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"hash/crc32"
 	"io"
-	"math/rand"
 	"net/http"
 	"os"
 	"path/filepath"
 	"sort"
 	"strings"
-	"sync"
 	"time"
 
 	"cloud.google.com/go/storage"
@@ -174,8 +171,8 @@ func (rs *DirectGCPStorage) defaultObjectAccess(ctx context.Context, bkt, obj st
 		return nil, false, xerrors.Errorf("no gcloud client available - did you call Init()?")
 	}
 
-	hdl := rs.client.Bucket(bkt).Object(obj)
-	rc, err := hdl.NewReader(ctx)
+	objHandle := rs.client.Bucket(bkt).Object(obj)
+	rc, err := objHandle.NewReader(ctx)
 	if err != nil {
 		return nil, false, err
 	}
@@ -346,104 +343,68 @@ func (rs *DirectGCPStorage) Upload(ctx context.Context, source string, name stri
 	}
 	defer sfn.Close()
 
-	var totalSize int64
-	log.WithField("tasks", fmt.Sprintf("%d", rs.GCPConfig.ParallelUpload)).WithField("tmpfile", source).Debug("Uploading in parallel")
 	stat, err := sfn.Stat()
 	if err != nil {
 		return
 	}
-	totalSize = stat.Size()
-	span.SetTag("totalSize", totalSize)
-
-	uploadSpan := opentracing.StartSpan("remote-upload", opentracing.ChildOf(span.Context()))
-	uploadSpan.SetTag("bucket", rs.bucketName())
-	uploadSpan.SetTag("obj", rs.objectName(name))
-	/* Read back from the file in chunks. We don't wand a complicated composition operation,
-	 * so we'll have 32 chunks max. See https://cloud.google.com/storage/docs/composite-objects
-	 * for more details.
-	 */
-	var chunks []string
-	if chunks, err = rs.uploadChunks(opentracing.ContextWithSpan(ctx, uploadSpan), sfn, totalSize, rs.GCPConfig.ParallelUpload); err != nil {
-		tracing.FinishSpan(uploadSpan, &err)
-		return
-	}
-	defer func() {
-		err := rs.deleteChunks(opentracing.ContextWithSpan(ctx, uploadSpan), chunks)
-		if err != nil {
-			log.WithError(err).WithField("name", name).Warn("cannot clean up upload chunks")
-		}
-	}()
 
-	log.WithField("workspaceId", rs.WorkspaceName).WithField("bucketName", rs.bucketName()).Debug("Uploaded chunks")
+	totalSize := stat.Size()
+	span.SetTag("totalSize", totalSize)
 
-	// compose the uploaded chunks
 	bucket = rs.bucketName()
 	bkt := rs.client.Bucket(bucket)
-	src := make([]*gcpstorage.ObjectHandle, len(chunks))
-	for i := 0; i < len(chunks); i++ {
-		src[i] = bkt.Object(chunks[i])
-	}
+
 	object = rs.objectName(name)
 	obj := bkt.Object(object)
 
+	uploadSpan := opentracing.StartSpan("remote-upload", opentracing.ChildOf(span.Context()))
+	uploadSpan.SetTag("bucket", bucket)
+	uploadSpan.SetTag("obj", object)
+
 	var firstBackup bool
 	if _, e := obj.Attrs(ctx); e == gcpstorage.ErrObjectNotExist {
 		firstBackup = true
 	}
-	// maintain backup trail if we're asked to - we do this prior to overwriting the regular backup file
-	// to make sure we're trailign the previous backup.
-	if options.BackupTrail.Enabled && !firstBackup {
-		err := rs.trailBackup(ctx, bkt, obj, options.BackupTrail.ThisBackupID, options.BackupTrail.TrailLength)
-		if err != nil {
-			log.WithError(err).Error("cannot maintain backup trail")
-		}
+
+	wc := obj.NewWriter(ctx)
+
+	if options.ContentType != "" {
+		wc.ContentType = options.ContentType
 	}
 
-	// now that the upload is complete and the backup trail has been created, compose the chunks to
-	// create the actual backup
-	_, err = obj.ComposerFrom(src...).Run(ctx)
-	if err != nil {
-		tracing.FinishSpan(uploadSpan, &err)
-		return
+	if options.Annotations != nil {
+		wc.Metadata = options.Annotations
 	}
-	attrs, err := obj.Update(ctx, gcpstorage.ObjectAttrsToUpdate{
-		ContentType: options.ContentType,
-		Metadata:    options.Annotations,
-	})
+
+	written, err := io.Copy(wc, sfn)
 	if err != nil {
-		tracing.FinishSpan(uploadSpan, &err)
+		defer wc.Close()
+		log.WithError(err).WithField("name", name).Error("Error while uploading file")
 		return
 	}
-	log.WithField("chunkCount", fmt.Sprintf("%d", len(chunks))).Debug("Composited chunks")
-	uploadSpan.Finish()
 
-	// compare the MD5 sum of the composited object with the local tar file
-	remotehash := attrs.CRC32C
-	_, err = sfn.Seek(0, 0)
+	err = wc.Close()
 	if err != nil {
-		log.WithError(err).Debug("cannot compute local checksum")
-
-		// us being unable to produce the local checksum is not enough of a reason to fail the upload
-		// altogether. We did upload something after all.
-		err = nil
+		log.WithError(err).WithField("name", name).Error("Error while uploading file")
 		return
 	}
-	var localhash uint32
-	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
-	_, err = io.Copy(h, sfn)
-	if err != nil {
-		log.WithError(err).Debug("cannot compute local checksum")
-	} else {
-		localhash = h.Sum32()
+
+	if written != totalSize {
+		err = xerrors.Errorf("Wrote fewer bytes than it should have, %d instead of %d", written, totalSize)
+		return
 	}
-	if remotehash == 0 || localhash == 0 {
-		log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("one of the checksums is empty - not comparing")
-	} else if remotehash == localhash {
-		log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("checksums match")
-	} else {
-		log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("checksums do not match")
+
+	// maintain backup trail if we're asked to - we do this prior to overwriting the regular backup file
+	// to make sure we're trailign the previous backup.
+	if options.BackupTrail.Enabled && !firstBackup {
+		err := rs.trailBackup(ctx, bkt, obj, options.BackupTrail.ThisBackupID, options.BackupTrail.TrailLength)
+		if err != nil {
+			log.WithError(err).Error("cannot maintain backup trail")
+		}
 	}
 
+	uploadSpan.Finish()
+
 	err = nil
 	return
 }
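
This hunk is the core of the refactor: the chunked parallel upload plus server-side compose is replaced by streaming the backup through a single GCS object writer, which also removes the need for the CRC32C comparison against a composited object. A condensed, self-contained sketch of that pattern, with assumed bucket/object/file names and content type (the real Upload additionally wires in tracing, annotations, and the backup trail):

package gcsupload

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/storage"
)

func upload(ctx context.Context, client *storage.Client, bucket, object, src string) error {
	f, err := os.Open(src)
	if err != nil {
		return err
	}
	defer f.Close()

	stat, err := f.Stat()
	if err != nil {
		return err
	}

	wc := client.Bucket(bucket).Object(object).NewWriter(ctx)
	wc.ContentType = "application/tar"

	// io.Copy streams the whole file; the GCS client library chunks
	// and retries under the hood, so no manual parallelism is needed.
	written, err := io.Copy(wc, f)
	if err != nil {
		wc.Close()
		return err
	}
	// Close flushes the final chunk and finalizes the object; upload
	// errors often surface here, so this error must be checked.
	if err := wc.Close(); err != nil {
		return err
	}
	if written != stat.Size() {
		return fmt.Errorf("wrote %d of %d bytes", written, stat.Size())
	}
	return nil
}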
@@ -480,133 +441,6 @@ func (rs *DirectGCPStorage) ensureBackupSlotAvailable() error {
 	return nil
 }
 
-func (rs *DirectGCPStorage) uploadChunks(ctx context.Context, f io.ReaderAt, totalSize int64, desiredChunkCount int) (chnks []string, err error) {
-	//nolint:ineffassign
-	span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunks")
-	defer tracing.FinishSpan(span, &err)
-
-	if totalSize == 0 {
-		return []string{}, xerrors.Errorf("Total size must be greater than zero")
-	}
-	if desiredChunkCount < 1 {
-		return []string{}, xerrors.Errorf("Desired chunk count must be greater (or equal to) one")
-	}
-
-	minChunkSize := int64(256 * 1024)
-	chunkSize := totalSize / int64(desiredChunkCount)
-	chunkSize = (chunkSize / minChunkSize) * minChunkSize
-	if chunkSize < minChunkSize {
-		chunkSize = minChunkSize
-	}
-	chunkCount := int(totalSize/chunkSize) + 1
-
-	log.WithField("count", chunkCount).WithField("chunkSize", chunkSize).WithField("totalSize", totalSize).Debug("Computed chunk size")
-
-	pfx := fmt.Sprintf("uploads/%s", randomString(20))
-
-	// sync construct taken from https://play.golang.org/p/mqUvKFDQbfn
-	errChannel := make(chan error, 1)
-	chunks := make([]string, chunkCount)
-	wg := sync.WaitGroup{}
-
-	// we need to add ourselves to the working group here, and not in the
-	// go routines, as they might start after the "finished" go routine.
-	wg.Add(chunkCount)
-
-	for i := 0; i < chunkCount; i++ {
-		off := int64(i) * chunkSize
-		n := chunkSize
-		if off+n > totalSize {
-			n = totalSize - off
-		}
-		r := io.NewSectionReader(f, off, n)
-		chunkName := fmt.Sprintf("%s/%d-upload", pfx, i)
-		chunks[i] = chunkName
-
-		go rs.uploadChunk(opentracing.ContextWithSpan(ctx, span), chunkName, r, n, &wg, errChannel)
-	}
-
-	// Put the wait group in a go routine.
-	// By putting the wait group in the go routine we ensure either all pass
-	// and we close the "finished" channel or we wait forever for the wait group
-	// to finish.
-	//
-	// Waiting forever is okay because of the blocking select below.
-	finished := make(chan bool, 1)
-	go func() {
-		wg.Wait()
-		close(finished)
-	}()
-
-	// This select will block until one of the two channels returns a value.
-	// This means on the first failure in the go routines above the errChannel will release a
-	// value first. Because there is a "return" statement in the err check this function will
-	// exit when an error occurs.
-	//
-	// Due to the blocking on wg.Wait() the finished channel will not get a value unless all
-	// the go routines before were successful because not all the wg.Done() calls would have
-	// happened.
-	select {
-	case <-finished:
-		log.Debug("Finished uploading")
-	case err := <-errChannel:
-		log.WithError(err).Debug("Error while uploading chunks")
-		if err != nil {
-			// already logged in uploadChunk
-			return []string{}, err
-		}
-	}
-
-	return chunks, nil
-}
-
-func (rs *DirectGCPStorage) uploadChunk(ctx context.Context, name string, r io.Reader, size int64, wg *sync.WaitGroup, errchan chan error) {
-	//nolint:ineffassign
-	span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunk")
-	span.SetTag("size", size)
-	defer span.Finish()
-
-	defer wg.Done()
-
-	start := time.Now()
-	log.WithField("name", name).WithField("size", fmt.Sprintf("%d", size)).Debug("Uploading chunk")
-
-	wc := rs.client.Bucket(rs.bucketName()).Object(name).NewWriter(ctx)
-	defer wc.Close()
-
-	written, err := io.Copy(wc, r)
-	if err != nil {
-		log.WithError(err).WithField("name", name).Error("Error while uploading chunk")
-		errchan <- err
-		return
-	}
-	if written != size {
-		err := xerrors.Errorf("Wrote fewer bytes than it should have, %d instead of %d", written, size)
-		log.WithError(err).WithField("name", name).Error("Error while uploading chunk")
-		errchan <- err
-		return
-	}
-
-	log.WithField("name", name).WithField("duration", time.Since(start)).Debug("Upload complete")
-}
-
-func (rs *DirectGCPStorage) deleteChunks(ctx context.Context, chunks []string) (err error) {
-	//nolint:ineffassign
-	span, ctx := opentracing.StartSpanFromContext(ctx, "deleteChunks")
-	defer tracing.FinishSpan(span, &err)
-
-	for i := 0; i < len(chunks); i++ {
-		err = rs.client.Bucket(rs.bucketName()).Object(chunks[i]).Delete(ctx)
-	}
-
-	if err != nil {
-		log.WithError(err).Error("Error while deleting chunks")
-		return err
-	}
-
-	return nil
-}
-
 func (rs *DirectGCPStorage) trailBackup(ctx context.Context, bkt *gcpstorage.BucketHandle, obj *gcpstorage.ObjectHandle, backupID string, trailLength int) (err error) {
 	//nolint:ineffassign
 	span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunk")
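
For context on the deleted code: uploadChunks sized each chunk as the total divided by the desired parallelism, rounded down to a 256 KiB multiple with a 256 KiB floor, which kept the later compose step within GCS's 32-component limit the old comment referenced. The same arithmetic, lifted into a runnable form with an assumed example input:

package main

import "fmt"

// chunkLayout reproduces the sizing logic of the removed uploadChunks:
// align the chunk size down to 256 KiB, enforce a 256 KiB minimum, and
// derive the resulting chunk count.
func chunkLayout(totalSize int64, desiredChunkCount int) (chunkSize int64, chunkCount int) {
	const minChunkSize = int64(256 * 1024)
	chunkSize = totalSize / int64(desiredChunkCount)
	chunkSize = (chunkSize / minChunkSize) * minChunkSize
	if chunkSize < minChunkSize {
		chunkSize = minChunkSize
	}
	chunkCount = int(totalSize/chunkSize) + 1
	return
}

func main() {
	// A 100 MiB backup split for 6 parallel uploads yields
	// 17,301,504-byte chunks (66 × 256 KiB) and 7 chunks overall.
	size, count := chunkLayout(100*1024*1024, 6)
	fmt.Println(size, count)
}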
@@ -653,16 +487,6 @@ func (rs *DirectGCPStorage) trailBackup(ctx context.Context, bkt *gcpstorage.Buc
 	return nil
 }
 
-func randomString(len int) string {
-	min := 97
-	max := 122
-	bytes := make([]byte, len)
-	for i := 0; i < len; i++ {
-		bytes[i] = byte(min + rand.Intn(max-min))
-	}
-	return string(bytes)
-}
-
 func (rs *DirectGCPStorage) bucketName() string {
 	return gcpBucketName(rs.Stage, rs.Username)
 }
