Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made the parallel downloads go routines running without synchronization #2287

Merged
merged 18 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/cache/data/object_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package data

// ObjectRange specifies the range within the gcs object.
type ObjectRange struct {
vadlakondaswetha marked this conversation as resolved.
Show resolved Hide resolved
Start int64
End int64
}
20 changes: 7 additions & 13 deletions internal/cache/file/downloader/jm_parallel_downloads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package downloader
import (
"context"
"crypto/rand"
"math"
"os"
"path"
"testing"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func createObjectInBucket(t *testing.T, objPath string, objSize int64, bucket gcs.Bucket) []byte {
Expand Down Expand Up @@ -98,31 +98,25 @@ func TestParallelDownloads(t *testing.T) {
parallelDownloadsPerFile int64
maxParallelDownloads int64
downloadOffset int64
expectedOffset int64
subscribedOffset int64
}{
{
name: "download in chunks of concurrency * readReqSize",
name: "download the entire object when object size > no of goroutines * readReqSize",
objectSize: 15 * util.MiB,
readReqSize: 3,
parallelDownloadsPerFile: math.MaxInt,
parallelDownloadsPerFile: 100,
vadlakondaswetha marked this conversation as resolved.
Show resolved Hide resolved
maxParallelDownloads: 3,
subscribedOffset: 7,
downloadOffset: 10,
// Concurrency can go to (maxParallelDownloads + 1) in case
// parallelDownloadsPerFile > maxParallelDownloads because we always
// spawn a minimum of 1 go routine per async job.
expectedOffset: 12 * util.MiB,
},
{
name: "download only upto the object size",
objectSize: 10 * util.MiB,
readReqSize: 4,
parallelDownloadsPerFile: math.MaxInt,
parallelDownloadsPerFile: 100,
maxParallelDownloads: 3,
subscribedOffset: 7,
downloadOffset: 10,
expectedOffset: 10 * util.MiB,
},
}
for _, tc := range tbl {
Expand All @@ -145,9 +139,9 @@ func TestParallelDownloads(t *testing.T) {
select {
case jobStatus := <-subscriberC:
if assert.Nil(t, err) {
assert.Equal(t, tc.expectedOffset, jobStatus.Offset)
require.GreaterOrEqual(t, tc.objectSize, jobStatus.Offset)
verifyFileTillOffset(t,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm}, tc.expectedOffset,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm}, jobStatus.Offset,
vadlakondaswetha marked this conversation as resolved.
Show resolved Hide resolved
content)
}
return
Expand All @@ -168,7 +162,7 @@ func TestMultipleConcurrentDownloads(t *testing.T) {
minObj1, content1 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", 10*util.MiB)
minObj2, content2 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/bar.txt", 5*util.MiB)
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, &cfg.FileCacheConfig{EnableParallelDownloads: true,
ParallelDownloadsPerFile: math.MaxInt, DownloadChunkSizeMb: 2, EnableCrc: true, MaxParallelDownloads: 2})
ParallelDownloadsPerFile: 100, DownloadChunkSizeMb: 2, EnableCrc: true, MaxParallelDownloads: 2})
job1 := jm.CreateJobIfNotExists(&minObj1, bucket)
job2 := jm.CreateJobIfNotExists(&minObj2, bucket)
s1 := job1.subscribe(10 * util.MiB)
Expand Down
12 changes: 9 additions & 3 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ type Job struct {
// This semaphore is shared across all jobs spawned by the job manager and is
// used to limit the download concurrency.
maxParallelismSem *semaphore.Weighted

// Channel which is used by goroutines to know which ranges need to be
// downloaded when parallel download is enabled.
rangeChan chan data.ObjectRange
}

// JobStatus represents the status of job.
Expand Down Expand Up @@ -249,8 +253,9 @@ func (job *Job) updateStatusAndNotifySubscribers(statusName jobStatusName, statu
}

// updateStatusOffset updates the offset in job's status and in file info cache
// with the given offset. If the the update is successful, this function also
// with the given offset. If the update is successful, this function also
// notify the subscribers.
// Not concurrency safe and requires LOCK(job.mu)
func (job *Job) updateStatusOffset(downloadedOffset int64) (err error) {
fileInfoKey := data.FileInfoKey{
BucketName: job.bucket.Name(),
Expand All @@ -267,8 +272,7 @@ func (job *Job) updateStatusOffset(downloadedOffset int64) (err error) {
Key: fileInfoKey, ObjectGeneration: job.object.Generation,
FileSize: job.object.Size, Offset: uint64(downloadedOffset),
}
job.mu.Lock()
defer job.mu.Unlock()

err = job.fileInfoCache.UpdateWithoutChangingOrder(fileInfoKeyName, updatedFileInfo)
if err == nil {
job.status.Offset = downloadedOffset
Expand Down Expand Up @@ -338,7 +342,9 @@ func (job *Job) downloadObjectToFile(cacheFile *os.File) (err error) {
newReader = nil
}

job.mu.Lock()
err = job.updateStatusOffset(start)
job.mu.Unlock()
if err != nil {
return err
}
Expand Down
176 changes: 139 additions & 37 deletions internal/cache/file/downloader/parallel_downloads_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package downloader

import (
"context"
"errors"
"fmt"
"io"
"os"

"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/data"
cacheutil "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/monitor"
Expand Down Expand Up @@ -67,52 +69,152 @@ func (job *Job) downloadRange(ctx context.Context, dstWriter io.Writer, start, e
return err
}

// RangeMap maintains the ranges downloaded by the different goroutines. This
vadlakondaswetha marked this conversation as resolved.
Show resolved Hide resolved
// function takes a new range and merges with existing ranges if they are continuous.
// If the start offset is 0, updates the job's status offset.
//
// Eg:
// Input: rangeMap entries 0-3, 5-6. New input 7-8.
// Output: rangeMap entries 0-3, 5-8.
func (job *Job) updateRangeMap(rangeMap map[int64]int64, offsetStart int64, offsetEnd int64) error {
// Check if the chunk downloaded completes a range [0, R) and find that
// R.
job.mu.Lock()
defer job.mu.Unlock()

finalStart := offsetStart
finalEnd := offsetEnd

if offsetStart != 0 {
leftStart, exist := rangeMap[offsetStart]
if exist {
finalStart = leftStart
delete(rangeMap, offsetStart)
delete(rangeMap, leftStart)
}
}

rightEnd, exist := rangeMap[offsetEnd]
if exist {
finalEnd = rightEnd
delete(rangeMap, offsetEnd)
delete(rangeMap, rightEnd)
}

rangeMap[finalStart] = finalEnd
rangeMap[finalEnd] = finalStart

if finalStart == 0 {
if updateErr := job.updateStatusOffset(finalEnd); updateErr != nil {
return updateErr
}
}

return nil
}

// Reads the range input from the range channel continuously and downloads that
// range from the GCS. If the range channel is closed, it will exit.
func (job *Job) downloadOffsets(ctx context.Context, goroutineIndex int64, cacheFile *os.File, rangeMap map[int64]int64) func() error {
return func() error {
// Since we keep a goroutine for each job irrespective of the maxParallelism,
// not releasing the default goroutine to the pool.
if goroutineIndex > 0 {
defer job.maxParallelismSem.Release(1)
}

for {
// Read the offset to be downloaded from the channel.
objectRange, ok := <-job.rangeChan
if !ok {
// In case channel is closed return.
return nil
}

offsetWriter := io.NewOffsetWriter(cacheFile, int64(objectRange.Start))
err := job.downloadRange(ctx, offsetWriter, objectRange.Start, objectRange.End)
if err != nil {
return err
}

err = job.updateRangeMap(rangeMap, objectRange.Start, objectRange.End)
if err != nil {
return err
}
}
}
}

// parallelDownloadObjectToFile does parallel download of the backing GCS object
// into given file handle using multiple NewReader method of gcs.Bucket running
// in parallel. This function is canceled if job.cancelCtx is canceled.
func (job *Job) parallelDownloadObjectToFile(cacheFile *os.File) (err error) {
var start, end int64
end = int64(job.object.Size)
var parallelReadRequestSize = int64(job.fileCacheConfig.DownloadChunkSizeMb) * cacheutil.MiB

// Each iteration of this for loop downloads job.fileCacheConfig.DownloadChunkSizeMb * job.fileCacheConfig.ParallelDownloadsPerFile
// size of range of object from GCS into given file handle and updates the
// file info cache.
for start < end {
downloadErrGroup, downloadErrGroupCtx := errgroup.WithContext(job.cancelCtx)

for goRoutineIdx := int64(0); (goRoutineIdx < job.fileCacheConfig.ParallelDownloadsPerFile) && (start < end); goRoutineIdx++ {
rangeStart := start
rangeEnd := min(rangeStart+parallelReadRequestSize, end)
currGoRoutineIdx := goRoutineIdx

// Respect max download parallelism only beyond first go routine.
if currGoRoutineIdx > 0 && !job.maxParallelismSem.TryAcquire(1) {
break
}
rangeMap := make(map[int64]int64)
// Trying to keep the channel size greater than ParallelDownloadsPerFile to ensure
// that there is no goroutine waiting for data(nextRange) to be published to channel.
job.rangeChan = make(chan data.ObjectRange, 2*job.fileCacheConfig.ParallelDownloadsPerFile)
var numGoRoutines int64
var start int64
downloadChunkSize := job.fileCacheConfig.DownloadChunkSizeMb * cacheutil.MiB
downloadErrGroup, downloadErrGroupCtx := errgroup.WithContext(job.cancelCtx)

downloadErrGroup.Go(func() error {
if currGoRoutineIdx > 0 {
defer job.maxParallelismSem.Release(1)
}
// Copy the contents from NewReader to cache file at appropriate offset.
offsetWriter := io.NewOffsetWriter(cacheFile, rangeStart)
return job.downloadRange(downloadErrGroupCtx, offsetWriter, rangeStart, rangeEnd)
})

start = rangeEnd
// Start the goroutines as per the config and the availability.
for numGoRoutines = 0; (numGoRoutines < job.fileCacheConfig.ParallelDownloadsPerFile) && (start < int64(job.object.Size)); numGoRoutines++ {
// Respect max download parallelism only beyond first go routine.
if numGoRoutines > 0 && !job.maxParallelismSem.TryAcquire(1) {
break
}

// If any of the go routines failed, consider the async job failed.
err = downloadErrGroup.Wait()
if err != nil {
return
downloadErrGroup.Go(job.downloadOffsets(downloadErrGroupCtx, numGoRoutines, cacheFile, rangeMap))
start = start + downloadChunkSize
}

for start = 0; start < int64(job.object.Size); {
nextRange := data.ObjectRange{
Start: start,
End: min(int64(job.object.Size), start+downloadChunkSize),
}

err = job.updateStatusOffset(start)
if err != nil {
return err
select {
case job.rangeChan <- nextRange:
start = nextRange.End
// In case we haven't started the goroutines as per the config, checking
// if any goroutines are available now.
// This may not be the ideal way, but since we don't have any way of
// listening if goroutines from other jobs have freed up, checking it here.
for numGoRoutines < job.fileCacheConfig.ParallelDownloadsPerFile && job.maxParallelismSem.TryAcquire(1) {
downloadErrGroup.Go(job.downloadOffsets(downloadErrGroupCtx, numGoRoutines, cacheFile, rangeMap))
numGoRoutines++
}
case <-downloadErrGroupCtx.Done():
return job.handleJobCompletion(downloadErrGroupCtx, downloadErrGroup)
}
}
return

return job.handleJobCompletion(downloadErrGroupCtx, downloadErrGroup)
}

// Job can be success or failure. This method will handle all the scenarios and
// return the appropriate error.
func (job *Job) handleJobCompletion(ctx context.Context, group *errgroup.Group) error {
// Close the channel since we are ending the job.
close(job.rangeChan)

// First check if the context has reported any error. This is to handle scenario
// where context is cancelled and no goroutines are running.
err := ctx.Err()
if err != nil {
// Ideally not required, but this is an additional check to ensure that
// no goroutines are running.
waitErr := group.Wait()
sethiay marked this conversation as resolved.
Show resolved Hide resolved
return errors.Join(err, waitErr)
}

// If any of the go routines failed, consider the async job failed.
err = group.Wait()
if err != nil {
return err
}

return nil
}
Loading
Loading