Skip to content

Commit

Permalink
Made the parallel downloads go routines running without synchronizati…
Browse files Browse the repository at this point in the history
…on (#2287)

* made the parallel goroutines run without synchronization

# Conflicts:
#	internal/cache/file/downloader/parallel_downloads_job.go

* added print statement

* fixing tests

# Conflicts:
#	internal/cache/file/downloader/jm_parallel_downloads_test.go

* added changes to update the marker file

* reverting the test change

* fixing the test

* Fixing the integration tests

* Fixing the test

* releasing the goroutine

* fixing tests

# Conflicts:
#	internal/cache/file/downloader/jm_parallel_downloads_test.go

* Code review comments

* comments fixed

* Added unit tests for updateRangeMap method

* code review comments

* code review comments

* changed to require

* code review comments

* adding comment for public struct
  • Loading branch information
vadlakondaswetha authored and Tulsishah committed Aug 13, 2024
1 parent 187ffae commit 5cdab7a
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 85 deletions.
7 changes: 7 additions & 0 deletions internal/cache/data/object_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package data

// ObjectRange describes a contiguous span of a GCS object, expressed as a
// pair of offsets into that object.
type ObjectRange struct {
	// Start is the offset at which the span begins.
	// End is the offset at which the span stops.
	// NOTE(review): End is presumably exclusive, i.e. the span is
	// [Start, End) — confirm against the code that produces these values.
	Start, End int64
}
20 changes: 7 additions & 13 deletions internal/cache/file/downloader/jm_parallel_downloads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package downloader
import (
"context"
"crypto/rand"
"math"
"os"
"path"
"testing"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func createObjectInBucket(t *testing.T, objPath string, objSize int64, bucket gcs.Bucket) []byte {
Expand Down Expand Up @@ -98,31 +98,25 @@ func TestParallelDownloads(t *testing.T) {
parallelDownloadsPerFile int64
maxParallelDownloads int64
downloadOffset int64
expectedOffset int64
subscribedOffset int64
}{
{
name: "download in chunks of concurrency * readReqSize",
name: "download the entire object when object size > no of goroutines * readReqSize",
objectSize: 15 * util.MiB,
readReqSize: 3,
parallelDownloadsPerFile: math.MaxInt,
parallelDownloadsPerFile: 100,
maxParallelDownloads: 3,
subscribedOffset: 7,
downloadOffset: 10,
// Concurrency can go to (maxParallelDownloads + 1) in case
// parallelDownloadsPerFile > maxParallelDownloads because we always
// spawn a minimum of 1 go routine per async job.
expectedOffset: 12 * util.MiB,
},
{
name: "download only upto the object size",
objectSize: 10 * util.MiB,
readReqSize: 4,
parallelDownloadsPerFile: math.MaxInt,
parallelDownloadsPerFile: 100,
maxParallelDownloads: 3,
subscribedOffset: 7,
downloadOffset: 10,
expectedOffset: 10 * util.MiB,
},
}
for _, tc := range tbl {
Expand All @@ -145,9 +139,9 @@ func TestParallelDownloads(t *testing.T) {
select {
case jobStatus := <-subscriberC:
if assert.Nil(t, err) {
assert.Equal(t, tc.expectedOffset, jobStatus.Offset)
require.GreaterOrEqual(t, tc.objectSize, jobStatus.Offset)
verifyFileTillOffset(t,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm}, tc.expectedOffset,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm}, jobStatus.Offset,
content)
}
return
Expand All @@ -168,7 +162,7 @@ func TestMultipleConcurrentDownloads(t *testing.T) {
minObj1, content1 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", 10*util.MiB)
minObj2, content2 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/bar.txt", 5*util.MiB)
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, &cfg.FileCacheConfig{EnableParallelDownloads: true,
ParallelDownloadsPerFile: math.MaxInt, DownloadChunkSizeMb: 2, EnableCrc: true, MaxParallelDownloads: 2})
ParallelDownloadsPerFile: 100, DownloadChunkSizeMb: 2, EnableCrc: true, MaxParallelDownloads: 2})
job1 := jm.CreateJobIfNotExists(&minObj1, bucket)
job2 := jm.CreateJobIfNotExists(&minObj2, bucket)
s1 := job1.subscribe(10 * util.MiB)
Expand Down
12 changes: 9 additions & 3 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ type Job struct {
// This semaphore is shared across all jobs spawned by the job manager and is
// used to limit the download concurrency.
maxParallelismSem *semaphore.Weighted

// Channel which is used by goroutines to know which ranges need to be
// downloaded when parallel download is enabled.
rangeChan chan data.ObjectRange
}

// JobStatus represents the status of job.
Expand Down Expand Up @@ -249,8 +253,9 @@ func (job *Job) updateStatusAndNotifySubscribers(statusName jobStatusName, statu
}

// updateStatusOffset updates the offset in job's status and in file info cache
// with the given offset. If the the update is successful, this function also
// with the given offset. If the update is successful, this function also
// notify the subscribers.
// Not concurrency safe and requires LOCK(job.mu)
func (job *Job) updateStatusOffset(downloadedOffset int64) (err error) {
fileInfoKey := data.FileInfoKey{
BucketName: job.bucket.Name(),
Expand All @@ -267,8 +272,7 @@ func (job *Job) updateStatusOffset(downloadedOffset int64) (err error) {
Key: fileInfoKey, ObjectGeneration: job.object.Generation,
FileSize: job.object.Size, Offset: uint64(downloadedOffset),
}
job.mu.Lock()
defer job.mu.Unlock()

err = job.fileInfoCache.UpdateWithoutChangingOrder(fileInfoKeyName, updatedFileInfo)
if err == nil {
job.status.Offset = downloadedOffset
Expand Down Expand Up @@ -338,7 +342,9 @@ func (job *Job) downloadObjectToFile(cacheFile *os.File) (err error) {
newReader = nil
}

job.mu.Lock()
err = job.updateStatusOffset(start)
job.mu.Unlock()
if err != nil {
return err
}
Expand Down
176 changes: 139 additions & 37 deletions internal/cache/file/downloader/parallel_downloads_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package downloader

import (
"context"
"errors"
"fmt"
"io"
"os"

"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/data"
cacheutil "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/monitor"
Expand Down Expand Up @@ -67,52 +69,152 @@ func (job *Job) downloadRange(ctx context.Context, dstWriter io.Writer, start, e
return err
}

// updateRangeMap records the newly downloaded range (offsetStart, offsetEnd)
// in rangeMap and merges it with any stored range that touches it on either
// side.
//
// rangeMap keeps two entries per range, start -> end and end -> start, so a
// neighbour can be found with a single lookup on the shared boundary offset.
// A stored range is merged on the left only when its end equals offsetStart,
// and on the right only when its start equals offsetEnd.
//
// When the merged range starts at offset 0, the job's status offset is
// updated to the end of the merged range via updateStatusOffset, and any
// error from that update is returned. Otherwise nil is returned.
//
// Acquires job.mu for the whole call.
//
// Eg:
// Input: rangeMap holds ranges 0-3 and 5-7. New input 7-8.
// Output: rangeMap holds ranges 0-3 and 5-8.
func (job *Job) updateRangeMap(rangeMap map[int64]int64, offsetStart int64, offsetEnd int64) error {
	job.mu.Lock()
	defer job.mu.Unlock()

	mergedStart, mergedEnd := offsetStart, offsetEnd

	// Absorb a stored range that ends exactly where the new one begins.
	// Offset 0 is skipped: nothing can end there, and the key 0 is used as
	// the start of the leading range.
	if offsetStart != 0 {
		if prevStart, found := rangeMap[offsetStart]; found {
			mergedStart = prevStart
			delete(rangeMap, offsetStart)
			delete(rangeMap, prevStart)
		}
	}

	// Absorb a stored range that begins exactly where the new one ends.
	if nextEnd, found := rangeMap[offsetEnd]; found {
		mergedEnd = nextEnd
		delete(rangeMap, offsetEnd)
		delete(rangeMap, nextEnd)
	}

	// Store the merged range under both of its boundary offsets.
	rangeMap[mergedStart] = mergedEnd
	rangeMap[mergedEnd] = mergedStart

	// Only a range anchored at offset 0 advances the job's status offset.
	if mergedStart != 0 {
		return nil
	}
	return job.updateStatusOffset(mergedEnd)
}

// downloadOffsets returns a worker function that repeatedly receives a range
// from job.rangeChan, downloads it with downloadRange into cacheFile at the
// range's start offset, and then records it through updateRangeMap.
//
// The worker returns nil once job.rangeChan is closed, and returns the first
// error reported by downloadRange or updateRangeMap otherwise.
//
// Workers with goroutineIndex > 0 release one unit of job.maxParallelismSem
// when they return. The worker with index 0 does not, since one goroutine is
// kept per job irrespective of the max parallelism.
func (job *Job) downloadOffsets(ctx context.Context, goroutineIndex int64, cacheFile *os.File, rangeMap map[int64]int64) func() error {
	return func() error {
		// The default goroutine (index 0) is not handed back to the pool.
		if goroutineIndex > 0 {
			defer job.maxParallelismSem.Release(1)
		}

		for {
			// Wait for the next range to download; a closed channel means
			// there is no more work for this worker.
			nextRange, open := <-job.rangeChan
			if !open {
				return nil
			}

			// Write the downloaded bytes at the range's own offset in the
			// cache file.
			writer := io.NewOffsetWriter(cacheFile, nextRange.Start)
			if err := job.downloadRange(ctx, writer, nextRange.Start, nextRange.End); err != nil {
				return err
			}

			if err := job.updateRangeMap(rangeMap, nextRange.Start, nextRange.End); err != nil {
				return err
			}
		}
	}
}

// parallelDownloadObjectToFile does parallel download of the backing GCS object
// into given file handle using multiple NewReader method of gcs.Bucket running
// in parallel. This function is canceled if job.cancelCtx is canceled.
func (job *Job) parallelDownloadObjectToFile(cacheFile *os.File) (err error) {
var start, end int64
end = int64(job.object.Size)
var parallelReadRequestSize = int64(job.fileCacheConfig.DownloadChunkSizeMb) * cacheutil.MiB

// Each iteration of this for loop downloads job.fileCacheConfig.DownloadChunkSizeMb * job.fileCacheConfig.ParallelDownloadsPerFile
// size of range of object from GCS into given file handle and updates the
// file info cache.
for start < end {
downloadErrGroup, downloadErrGroupCtx := errgroup.WithContext(job.cancelCtx)

for goRoutineIdx := int64(0); (goRoutineIdx < job.fileCacheConfig.ParallelDownloadsPerFile) && (start < end); goRoutineIdx++ {
rangeStart := start
rangeEnd := min(rangeStart+parallelReadRequestSize, end)
currGoRoutineIdx := goRoutineIdx

// Respect max download parallelism only beyond first go routine.
if currGoRoutineIdx > 0 && !job.maxParallelismSem.TryAcquire(1) {
break
}
rangeMap := make(map[int64]int64)
// Trying to keep the channel size greater than ParallelDownloadsPerFile to ensure
// that there is no goroutine waiting for data(nextRange) to be published to channel.
job.rangeChan = make(chan data.ObjectRange, 2*job.fileCacheConfig.ParallelDownloadsPerFile)
var numGoRoutines int64
var start int64
downloadChunkSize := job.fileCacheConfig.DownloadChunkSizeMb * cacheutil.MiB
downloadErrGroup, downloadErrGroupCtx := errgroup.WithContext(job.cancelCtx)

downloadErrGroup.Go(func() error {
if currGoRoutineIdx > 0 {
defer job.maxParallelismSem.Release(1)
}
// Copy the contents from NewReader to cache file at appropriate offset.
offsetWriter := io.NewOffsetWriter(cacheFile, rangeStart)
return job.downloadRange(downloadErrGroupCtx, offsetWriter, rangeStart, rangeEnd)
})

start = rangeEnd
// Start the goroutines as per the config and the availability.
for numGoRoutines = 0; (numGoRoutines < job.fileCacheConfig.ParallelDownloadsPerFile) && (start < int64(job.object.Size)); numGoRoutines++ {
// Respect max download parallelism only beyond first go routine.
if numGoRoutines > 0 && !job.maxParallelismSem.TryAcquire(1) {
break
}

// If any of the go routines failed, consider the async job failed.
err = downloadErrGroup.Wait()
if err != nil {
return
downloadErrGroup.Go(job.downloadOffsets(downloadErrGroupCtx, numGoRoutines, cacheFile, rangeMap))
start = start + downloadChunkSize
}

for start = 0; start < int64(job.object.Size); {
nextRange := data.ObjectRange{
Start: start,
End: min(int64(job.object.Size), start+downloadChunkSize),
}

err = job.updateStatusOffset(start)
if err != nil {
return err
select {
case job.rangeChan <- nextRange:
start = nextRange.End
// In case we haven't started the goroutines as per the config, checking
// if any goroutines are available now.
// This may not be the ideal way, but since we don't have any way of
// listening if goroutines from other jobs have freed up, checking it here.
for numGoRoutines < job.fileCacheConfig.ParallelDownloadsPerFile && job.maxParallelismSem.TryAcquire(1) {
downloadErrGroup.Go(job.downloadOffsets(downloadErrGroupCtx, numGoRoutines, cacheFile, rangeMap))
numGoRoutines++
}
case <-downloadErrGroupCtx.Done():
return job.handleJobCompletion(downloadErrGroupCtx, downloadErrGroup)
}
}
return

return job.handleJobCompletion(downloadErrGroupCtx, downloadErrGroup)
}

// handleJobCompletion finishes a parallel download: it closes job.rangeChan,
// waits for every goroutine in group, and returns the resulting error.
//
// If ctx already carries an error when this is called, that error is joined
// with whatever group.Wait reports. This covers the case where the context is
// cancelled while no goroutine is running. Otherwise the result of group.Wait
// is returned as is, which is nil when every goroutine succeeded.
func (job *Job) handleJobCompletion(ctx context.Context, group *errgroup.Group) error {
	// Closing the channel lets the workers drain and exit.
	close(job.rangeChan)

	// The context error must be sampled before waiting on the group, so that
	// it reflects the state at the time the job ended.
	ctxErr := ctx.Err()

	// Always wait, so that no goroutine outlives the job.
	waitErr := group.Wait()
	if ctxErr != nil {
		return errors.Join(ctxErr, waitErr)
	}

	// A failure in any goroutine fails the whole async job.
	return waitErr
}
Loading

0 comments on commit 5cdab7a

Please sign in to comment.