Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement limit on max-concurrency. #2032

Merged
merged 17 commits into from
Jun 20, 2024
Merged
6 changes: 5 additions & 1 deletion internal/cache/file/cache_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path"
"path/filepath"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"golang.org/x/sync/semaphore"
)

const CacheMaxSize = 100 * util.MiB
Expand Down Expand Up @@ -136,7 +138,7 @@ func (cht *cacheHandleTest) SetupTest() {
readLocalFileHandle, err := util.CreateFile(cht.fileSpec, os.O_RDONLY)
assert.Nil(cht.T(), err)

fileDownloadJob := downloader.NewJob(cht.object, cht.bucket, cht.cache, DefaultSequentialReadSizeMb, cht.fileSpec, func() {}, &config.FileCacheConfig{EnableCrcCheck: true, EnableParallelDownloads: false})
fileDownloadJob := downloader.NewJob(cht.object, cht.bucket, cht.cache, DefaultSequentialReadSizeMb, cht.fileSpec, func() {}, &config.FileCacheConfig{EnableCrcCheck: true, EnableParallelDownloads: false}, semaphore.NewWeighted(math.MaxInt64))

cht.cacheHandle = NewCacheHandle(readLocalFileHandle, fileDownloadJob, cht.cache, false, 0)
}
Expand Down Expand Up @@ -835,6 +837,7 @@ func (cht *cacheHandleTest) Test_Read_Sequential_Parallel_Download_True() {
cht.fileSpec,
func() {},
&config.FileCacheConfig{EnableCrcCheck: true, EnableParallelDownloads: true, DownloadParallelismPerFile: 2, ReadRequestSizeMB: 2},
semaphore.NewWeighted(math.MaxInt64),
)
cht.cacheHandle.fileDownloadJob = fileDownloadJob

Expand All @@ -861,6 +864,7 @@ func (cht *cacheHandleTest) Test_Read_Random_Parallel_Download_True() {
cht.fileSpec,
func() {},
&config.FileCacheConfig{EnableCrcCheck: true, EnableParallelDownloads: true, DownloadParallelismPerFile: 5, ReadRequestSizeMB: 2},
semaphore.NewWeighted(math.MaxInt64),
)
cht.cacheHandle.fileDownloadJob = fileDownloadJob

Expand Down
15 changes: 12 additions & 3 deletions internal/cache/file/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package downloader

import (
"math"
"os"

"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/data"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/config"
"github.com/googlecloudplatform/gcsfuse/v2/internal/locker"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"golang.org/x/sync/semaphore"
)

// JobManager is responsible for maintaining, getting and removing file download
Expand Down Expand Up @@ -55,19 +57,26 @@ type JobManager struct {
// concatenation of bucket name, "/", and object name. e.g. object path for an
// object named "a/b/foo.txt" in bucket named "test_bucket" would be
// "test_bucket/a/b/foo.txt"
jobs map[string]*Job
mu locker.Locker
jobs map[string]*Job
mu locker.Locker
maxParallelismSem *semaphore.Weighted
}

func NewJobManager(fileInfoCache *lru.Cache, filePerm os.FileMode, dirPerm os.FileMode,
cacheDir string, sequentialReadSizeMb int32, c *config.FileCacheConfig) (jm *JobManager) {
maxDownloadParallelism := int64(math.MaxInt64)
if c.MaxDownloadParallelism > 0 {
kislaykishore marked this conversation as resolved.
Show resolved Hide resolved
maxDownloadParallelism = int64(c.MaxDownloadParallelism)
}
jm = &JobManager{
fileInfoCache: fileInfoCache,
filePerm: filePerm,
dirPerm: dirPerm,
cacheDir: cacheDir,
sequentialReadSizeMb: sequentialReadSizeMb,
fileCacheConfig: c,
// Shared between jobs - Limits the overall concurrency of downloads.
maxParallelismSem: semaphore.NewWeighted(maxDownloadParallelism),
}
jm.mu = locker.New("JobManager", func() {})
jm.jobs = make(map[string]*Job)
Expand Down Expand Up @@ -105,7 +114,7 @@ func (jm *JobManager) CreateJobIfNotExists(object *gcs.MinObject, bucket gcs.Buc
removeJobCallback := func() {
jm.removeJob(object.Name, bucket.Name())
}
job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig)
job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig, jm.maxParallelismSem)
jm.jobs[objectPath] = job
return job
}
Expand Down
199 changes: 199 additions & 0 deletions internal/cache/file/downloader/jm_parallel_downloads_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package downloader

import (
"context"
"crypto/rand"
"math"
"os"
"path"
"testing"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/data"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
"github.com/googlecloudplatform/gcsfuse/v2/internal/config"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/stretchr/testify/assert"
)

func createObjectInBucket(t *testing.T, objPath string, objSize int64, bucket gcs.Bucket) []byte {
t.Helper()
objectContent := make([]byte, objSize)
_, err := rand.Read(objectContent)
if err != nil {
t.Fatalf("Error while generating random object content: %v", err)
}
_, err = storageutil.CreateObject(context.Background(), bucket, objPath, objectContent)
if err != nil {
t.Fatalf("Error while creating object in fakestorage: %v", err)
}
return objectContent
}

func configureFakeStorage(t *testing.T) storage.StorageHandle {
t.Helper()
fakeStorage := storage.NewFakeStorage()
t.Cleanup(func() { fakeStorage.ShutDown() })
return fakeStorage.CreateStorageHandle()
}

func configureCache(t *testing.T, maxSize int64) (*lru.Cache, string) {
t.Helper()
cache := lru.NewCache(uint64(maxSize))
cacheDir, err := os.MkdirTemp("", "gcsfuse_test")
if err != nil {
t.Fatalf("Error while creating the cache directory: %v", err)
}
t.Cleanup(func() { _ = os.RemoveAll(cacheDir) })
return cache, cacheDir
}

func createObjectInStoreAndInitCache(t *testing.T, cache *lru.Cache, bucket gcs.Bucket, objectName string, objectSize int64) (gcs.MinObject, []byte) {
t.Helper()
content := createObjectInBucket(t, objectName, objectSize, bucket)
minObj := getMinObject(objectName, bucket)
fileInfoKey := data.FileInfoKey{
BucketName: storage.TestBucketName,
ObjectName: objectName,
}
fileInfo := data.FileInfo{
Key: fileInfoKey,
ObjectGeneration: minObj.Generation,
FileSize: minObj.Size,
Offset: 0,
}
fileInfoKeyName, err := fileInfoKey.Key()
if err != nil {
t.Fatalf("Error occurred while retrieving fileInfoKey: %v", err)
}
_, err = cache.Insert(fileInfoKeyName, fileInfo)
if err != nil {
t.Fatalf("Error occurred while inserting fileinfo into cache: %v", err)
}
return minObj, content
}

func TestParallelDownloads(t *testing.T) {
tbl := []struct {
name string
objectSize int64
readReqSize int
downloadParallelismPerFile int
maxDownloadParallelism int
downloadOffset int64
expectedOffset int64
subscribedOffset int64
}{
{
name: "download in chunks of concurrency * readReqSize",
objectSize: 15 * util.MiB,
readReqSize: 4,
downloadParallelismPerFile: math.MaxInt,
maxDownloadParallelism: 3,
subscribedOffset: 7,
downloadOffset: 10,
expectedOffset: 12 * util.MiB,
},
{
name: "download only upto the object size",
objectSize: 10 * util.MiB,
readReqSize: 4,
downloadParallelismPerFile: math.MaxInt,
maxDownloadParallelism: 3,
subscribedOffset: 7,
downloadOffset: 10,
expectedOffset: 10 * util.MiB,
},
}
for _, tc := range tbl {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
cache, cacheDir := configureCache(t, 2*tc.objectSize)
storageHandle := configureFakeStorage(t)
bucket := storageHandle.BucketHandle(storage.TestBucketName, "")
minObj, content := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", tc.objectSize)
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, &config.FileCacheConfig{EnableParallelDownloads: true,
DownloadParallelismPerFile: tc.downloadParallelismPerFile, ReadRequestSizeMB: tc.readReqSize, EnableCrcCheck: true, MaxDownloadParallelism: tc.maxDownloadParallelism})
job := jm.CreateJobIfNotExists(&minObj, bucket)
subscriberC := job.subscribe(tc.subscribedOffset)

_, err := job.Download(context.Background(), 10, false)

timeout := time.After(1 * time.Second)
for {
select {
case jobStatus := <-subscriberC:
if assert.Nil(t, err) {
assert.Equal(t, tc.expectedOffset, jobStatus.Offset)
verifyFileTillOffset(t,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm}, tc.expectedOffset,
content)
}
return
case <-timeout:
assert.Fail(t, "Test timed out")
return
}
}
})
}
}

func TestMultipleConcurrentDownloads(t *testing.T) {
t.Parallel()
storageHandle := configureFakeStorage(t)
cache, cacheDir := configureCache(t, 30*util.MiB)
bucket := storageHandle.BucketHandle(storage.TestBucketName, "")
minObj1, content1 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", 10*util.MiB)
minObj2, content2 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/bar.txt", 5*util.MiB)
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, &config.FileCacheConfig{EnableParallelDownloads: true,
DownloadParallelismPerFile: math.MaxInt, ReadRequestSizeMB: 2, EnableCrcCheck: true, MaxDownloadParallelism: 2})
job1 := jm.CreateJobIfNotExists(&minObj1, bucket)
job2 := jm.CreateJobIfNotExists(&minObj2, bucket)
s1 := job1.subscribe(10 * util.MiB)
s2 := job2.subscribe(5 * util.MiB)
ctx := context.Background()

_, err1 := job1.Download(ctx, 10*util.MiB, false)
_, err2 := job2.Download(ctx, 5*util.MiB, false)

notif1, notif2 := false, false
timeout := time.After(1 * time.Second)
for {
select {
case <-s1:
notif1 = true
case <-s2:
notif2 = true
case <-timeout:
assert.Fail(t, "Test timed out")
return
}
if assert.Nil(t, err1) && assert.Nil(t, err2) && notif1 && notif2 {
verifyFileTillOffset(t,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm},
10*util.MiB, content1)
verifyFileTillOffset(t,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/bar.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm},
5*util.MiB, content2)
return
}
}
}
6 changes: 6 additions & 0 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/util"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
)

type jobStatusName string
Expand Down Expand Up @@ -87,6 +88,9 @@ type Job struct {
removeJobCallback func()

mu locker.Locker
// This semaphore is shared across all jobs spawned by the job manager and is
// used to limit the download concurrency.
maxParallelismSem *semaphore.Weighted
}

// JobStatus represents the status of job.
Expand All @@ -111,6 +115,7 @@ func NewJob(
fileSpec data.FileSpec,
removeJobCallback func(),
fileCacheConfig *config.FileCacheConfig,
maxParallelismSem *semaphore.Weighted,
) (job *Job) {
job = &Job{
object: object,
Expand All @@ -120,6 +125,7 @@ func NewJob(
fileSpec: fileSpec,
removeJobCallback: removeJobCallback,
fileCacheConfig: fileCacheConfig,
maxParallelismSem: maxParallelismSem,
}
job.mu = locker.New("Job-"+fileSpec.Path, job.checkInvariants)
job.init()
Expand Down
21 changes: 4 additions & 17 deletions internal/cache/file/downloader/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"math"
"os"
"path"
"reflect"
Expand All @@ -31,10 +32,10 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
testutil "github.com/googlecloudplatform/gcsfuse/v2/internal/util"
. "github.com/jacobsa/ogletest"
"golang.org/x/sync/semaphore"
)

////////////////////////////////////////////////////////////////////////
Expand All @@ -45,34 +46,20 @@ const CacheMaxSize = 50
const DefaultObjectName = "foo"
const DefaultSequentialReadSizeMb = 100

func (dt *downloaderTest) getMinObject(objectName string) gcs.MinObject {
ctx := context.Background()
minObject, _, err := dt.bucket.StatObject(ctx, &gcs.StatObjectRequest{Name: objectName,
ForceFetchFromGcs: true})
if err != nil {
panic(fmt.Errorf("error whlie stating object: %w", err))
}

if minObject != nil {
return *minObject
}
return gcs.MinObject{}
}

func (dt *downloaderTest) initJobTest(objectName string, objectContent []byte, sequentialReadSize int32, lruCacheSize uint64, removeCallback func()) {
ctx := context.Background()
objects := map[string][]byte{objectName: objectContent}
err := storageutil.CreateObjects(ctx, dt.bucket, objects)
AssertEq(nil, err)
dt.object = dt.getMinObject(objectName)
dt.object = getMinObject(objectName, dt.bucket)
dt.fileSpec = data.FileSpec{
Path: dt.fileCachePath(dt.bucket.Name(), dt.object.Name),
FilePerm: util.DefaultFilePerm,
DirPerm: util.DefaultDirPerm,
}
dt.cache = lru.NewCache(lruCacheSize)

dt.job = NewJob(&dt.object, dt.bucket, dt.cache, sequentialReadSize, dt.fileSpec, removeCallback, dt.defaultFileCacheConfig)
dt.job = NewJob(&dt.object, dt.bucket, dt.cache, sequentialReadSize, dt.fileSpec, removeCallback, dt.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64))
fileInfoKey := data.FileInfoKey{
BucketName: storage.TestBucketName,
ObjectName: objectName,
Expand Down
Loading
Loading