diff --git a/internal/cache/data/object_range.go b/internal/cache/data/object_range.go
new file mode 100644
index 0000000000..70ad550ac3
--- /dev/null
+++ b/internal/cache/data/object_range.go
@@ -0,0 +1,7 @@
+package data
+
+// ObjectRange specifies a byte range within the GCS object.
+type ObjectRange struct {
+	Start int64
+	End   int64
+}
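Throughout this change, an ObjectRange is treated as a half-open byte range [Start, End) over the object, with one range produced per download chunk. A minimal sketch of that chunking (splitObject is an illustrative helper, not part of this change; it assumes Go 1.21+ for the built-in min):

```go
package main

import "fmt"

// ObjectRange mirrors the struct above: a half-open byte range
// [Start, End) within a GCS object.
type ObjectRange struct {
	Start int64
	End   int64
}

// splitObject cuts an object of the given size into chunk-sized ranges,
// clamping the final range to the object size.
func splitObject(objectSize, chunkSize int64) []ObjectRange {
	var ranges []ObjectRange
	for start := int64(0); start < objectSize; start += chunkSize {
		ranges = append(ranges, ObjectRange{Start: start, End: min(objectSize, start+chunkSize)})
	}
	return ranges
}

func main() {
	// A 10-byte object split into 4-byte chunks: [{0 4} {4 8} {8 10}].
	fmt.Println(splitObject(10, 4))
}
```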
diff --git a/internal/cache/file/downloader/jm_parallel_downloads_test.go b/internal/cache/file/downloader/jm_parallel_downloads_test.go
index 91d551365b..227dfe7199 100644
--- a/internal/cache/file/downloader/jm_parallel_downloads_test.go
+++ b/internal/cache/file/downloader/jm_parallel_downloads_test.go
@@ -17,7 +17,6 @@ package downloader
 import (
 	"context"
 	"crypto/rand"
-	"math"
 	"os"
 	"path"
 	"testing"
@@ -31,6 +30,7 @@ import (
 	"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
 	"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func createObjectInBucket(t *testing.T, objPath string, objSize int64, bucket gcs.Bucket) []byte {
@@ -98,31 +98,25 @@ func TestParallelDownloads(t *testing.T) {
 		parallelDownloadsPerFile int64
 		maxParallelDownloads     int64
 		downloadOffset           int64
-		expectedOffset           int64
 		subscribedOffset         int64
 	}{
 		{
-			name:                     "download in chunks of concurrency * readReqSize",
+			name:                     "download the entire object when object size > number of goroutines * readReqSize",
 			objectSize:               15 * util.MiB,
 			readReqSize:              3,
-			parallelDownloadsPerFile: math.MaxInt,
+			parallelDownloadsPerFile: 100,
 			maxParallelDownloads:     3,
 			subscribedOffset:         7,
 			downloadOffset:           10,
-			// Concurrency can go to (maxParallelDownloads + 1) in case
-			// parallelDownloadsPerFile > maxParallelDownloads because we always
-			// spawn a minimum of 1 go routine per async job.
-			expectedOffset: 12 * util.MiB,
 		},
 		{
 			name:                     "download only upto the object size",
 			objectSize:               10 * util.MiB,
 			readReqSize:              4,
-			parallelDownloadsPerFile: math.MaxInt,
+			parallelDownloadsPerFile: 100,
 			maxParallelDownloads:     3,
 			subscribedOffset:         7,
 			downloadOffset:           10,
-			expectedOffset:           10 * util.MiB,
 		},
 	}
 	for _, tc := range tbl {
@@ -145,9 +139,9 @@ func TestParallelDownloads(t *testing.T) {
 			select {
 			case jobStatus := <-subscriberC:
 				if assert.Nil(t, err) {
-					assert.Equal(t, tc.expectedOffset, jobStatus.Offset)
+					require.GreaterOrEqual(t, tc.objectSize, jobStatus.Offset)
 					verifyFileTillOffset(t,
-						data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm}, tc.expectedOffset,
+						data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm}, jobStatus.Offset,
 						content)
 				}
 				return
@@ -168,7 +162,7 @@ func TestMultipleConcurrentDownloads(t *testing.T) {
 	minObj1, content1 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", 10*util.MiB)
 	minObj2, content2 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/bar.txt", 5*util.MiB)
 	jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, &cfg.FileCacheConfig{EnableParallelDownloads: true,
-		ParallelDownloadsPerFile: math.MaxInt, DownloadChunkSizeMb: 2, EnableCrc: true, MaxParallelDownloads: 2})
+		ParallelDownloadsPerFile: 100, DownloadChunkSizeMb: 2, EnableCrc: true, MaxParallelDownloads: 2})
 	job1 := jm.CreateJobIfNotExists(&minObj1, bucket)
 	job2 := jm.CreateJobIfNotExists(&minObj2, bucket)
 	s1 := job1.subscribe(10 * util.MiB)
diff --git a/internal/cache/file/downloader/job.go b/internal/cache/file/downloader/job.go
index 7a1a3bce0f..6c5dbc5ea3 100644
--- a/internal/cache/file/downloader/job.go
+++ b/internal/cache/file/downloader/job.go
@@ -91,6 +91,10 @@ type Job struct {
 	// This semaphore is shared across all jobs spawned by the job manager and is
 	// used to limit the download concurrency.
 	maxParallelismSem *semaphore.Weighted
+
+	// Channel used by the download goroutines to learn which ranges need to be
+	// downloaded when parallel downloads are enabled.
+	rangeChan chan data.ObjectRange
 }
 
 // JobStatus represents the status of job.
@@ -249,8 +253,9 @@ func (job *Job) updateStatusAndNotifySubscribers(statusName jobStatusName, statu
 }
 
 // updateStatusOffset updates the offset in job's status and in file info cache
-// with the given offset. If the the update is successful, this function also
+// with the given offset. If the update is successful, this function also
 // notify the subscribers.
+// Not concurrency safe; the caller must hold LOCK(job.mu).
 func (job *Job) updateStatusOffset(downloadedOffset int64) (err error) {
 	fileInfoKey := data.FileInfoKey{
 		BucketName: job.bucket.Name(),
@@ -267,8 +272,7 @@ func (job *Job) updateStatusOffset(downloadedOffset int64) (err error) {
 		Key: fileInfoKey, ObjectGeneration: job.object.Generation,
 		FileSize: job.object.Size, Offset: uint64(downloadedOffset),
 	}
-	job.mu.Lock()
-	defer job.mu.Unlock()
+
 	err = job.fileInfoCache.UpdateWithoutChangingOrder(fileInfoKeyName, updatedFileInfo)
 	if err == nil {
 		job.status.Offset = downloadedOffset
@@ -338,7 +342,9 @@ func (job *Job) downloadObjectToFile(cacheFile *os.File) (err error) {
 			newReader = nil
 		}
 
+		job.mu.Lock()
 		err = job.updateStatusOffset(start)
+		job.mu.Unlock()
 		if err != nil {
 			return err
 		}
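The locking change above moves LOCK(job.mu) out of updateStatusOffset and onto its callers, so that a caller such as updateRangeMap (in the next file) can hold the lock across both its range-map merge and the status update. A pared-down sketch of the resulting contract (types heavily simplified, not the real Job):

```go
package main

import (
	"fmt"
	"sync"
)

// job is a stand-in for downloader.Job, showing only the locking
// contract introduced by this change.
type job struct {
	mu     sync.Mutex
	offset int64
}

// updateStatusOffset now assumes the caller already holds j.mu,
// instead of locking internally.
func (j *job) updateStatusOffset(downloadedOffset int64) {
	j.offset = downloadedOffset
}

func main() {
	j := &job{}
	// Callers such as downloadObjectToFile wrap the call in the lock.
	j.mu.Lock()
	j.updateStatusOffset(4 << 20)
	j.mu.Unlock()
	fmt.Println(j.offset)
}
```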
diff --git a/internal/cache/file/downloader/parallel_downloads_job.go b/internal/cache/file/downloader/parallel_downloads_job.go
index bdae66efac..23ffe1a0fe 100644
--- a/internal/cache/file/downloader/parallel_downloads_job.go
+++ b/internal/cache/file/downloader/parallel_downloads_job.go
@@ -16,10 +16,12 @@ package downloader
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"os"
 
+	"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/data"
 	cacheutil "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
 	"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
 	"github.com/googlecloudplatform/gcsfuse/v2/internal/monitor"
@@ -67,52 +69,152 @@ func (job *Job) downloadRange(ctx context.Context, dstWriter io.Writer, start, e
 	return err
 }
 
+// updateRangeMap maintains, in rangeMap, the ranges downloaded so far by the
+// different goroutines. It takes a new range and merges it with the existing
+// ranges if they are contiguous. If the merged range starts at offset 0, it
+// also updates the job's status offset.
+//
+// E.g.:
+// Input: rangeMap entries 0-3, 5-7. New range 7-8.
+// Output: rangeMap entries 0-3, 5-8.
+func (job *Job) updateRangeMap(rangeMap map[int64]int64, offsetStart int64, offsetEnd int64) error {
+	// Check if the chunk downloaded completes a range [0, R) and find that R.
+	job.mu.Lock()
+	defer job.mu.Unlock()
+
+	finalStart := offsetStart
+	finalEnd := offsetEnd
+
+	if offsetStart != 0 {
+		leftStart, exist := rangeMap[offsetStart]
+		if exist {
+			finalStart = leftStart
+			delete(rangeMap, offsetStart)
+			delete(rangeMap, leftStart)
+		}
+	}
+
+	rightEnd, exist := rangeMap[offsetEnd]
+	if exist {
+		finalEnd = rightEnd
+		delete(rangeMap, offsetEnd)
+		delete(rangeMap, rightEnd)
+	}
+
+	rangeMap[finalStart] = finalEnd
+	rangeMap[finalEnd] = finalStart
+
+	if finalStart == 0 {
+		if updateErr := job.updateStatusOffset(finalEnd); updateErr != nil {
+			return updateErr
+		}
+	}
+
+	return nil
+}
+
+// downloadOffsets continuously reads ranges from the range channel and
+// downloads each of them from GCS. It exits when the range channel is closed.
+func (job *Job) downloadOffsets(ctx context.Context, goroutineIndex int64, cacheFile *os.File, rangeMap map[int64]int64) func() error {
+	return func() error {
+		// Since we keep one goroutine per job irrespective of maxParallelism,
+		// the first (default) goroutine does not hold a semaphore slot and so
+		// does not release one.
+		if goroutineIndex > 0 {
+			defer job.maxParallelismSem.Release(1)
+		}
+
+		for {
+			// Read the next range to be downloaded from the channel.
+			objectRange, ok := <-job.rangeChan
+			if !ok {
+				// The channel is closed; no more ranges to download.
+				return nil
+			}
+
+			offsetWriter := io.NewOffsetWriter(cacheFile, int64(objectRange.Start))
+			err := job.downloadRange(ctx, offsetWriter, objectRange.Start, objectRange.End)
+			if err != nil {
+				return err
+			}
+
+			err = job.updateRangeMap(rangeMap, objectRange.Start, objectRange.End)
+			if err != nil {
+				return err
+			}
+		}
+	}
+}
+
 // parallelDownloadObjectToFile does parallel download of the backing GCS object
 // into given file handle using multiple NewReader method of gcs.Bucket running
 // in parallel. This function is canceled if job.cancelCtx is canceled.
 func (job *Job) parallelDownloadObjectToFile(cacheFile *os.File) (err error) {
-	var start, end int64
-	end = int64(job.object.Size)
-	var parallelReadRequestSize = int64(job.fileCacheConfig.DownloadChunkSizeMb) * cacheutil.MiB
-
-	// Each iteration of this for loop downloads job.fileCacheConfig.DownloadChunkSizeMb * job.fileCacheConfig.ParallelDownloadsPerFile
-	// size of range of object from GCS into given file handle and updates the
-	// file info cache.
-	for start < end {
-		downloadErrGroup, downloadErrGroupCtx := errgroup.WithContext(job.cancelCtx)
-
-		for goRoutineIdx := int64(0); (goRoutineIdx < job.fileCacheConfig.ParallelDownloadsPerFile) && (start < end); goRoutineIdx++ {
-			rangeStart := start
-			rangeEnd := min(rangeStart+parallelReadRequestSize, end)
-			currGoRoutineIdx := goRoutineIdx
-
-			// Respect max download parallelism only beyond first go routine.
-			if currGoRoutineIdx > 0 && !job.maxParallelismSem.TryAcquire(1) {
-				break
-			}
+	rangeMap := make(map[int64]int64)
+	// Keep the channel size greater than ParallelDownloadsPerFile to ensure
+	// that no goroutine has to wait for the next range to be published to the
+	// channel.
+	job.rangeChan = make(chan data.ObjectRange, 2*job.fileCacheConfig.ParallelDownloadsPerFile)
+	var numGoRoutines int64
+	var start int64
+	downloadChunkSize := job.fileCacheConfig.DownloadChunkSizeMb * cacheutil.MiB
+	downloadErrGroup, downloadErrGroupCtx := errgroup.WithContext(job.cancelCtx)
 
-			downloadErrGroup.Go(func() error {
-				if currGoRoutineIdx > 0 {
-					defer job.maxParallelismSem.Release(1)
-				}
-				// Copy the contents from NewReader to cache file at appropriate offset.
-				offsetWriter := io.NewOffsetWriter(cacheFile, rangeStart)
-				return job.downloadRange(downloadErrGroupCtx, offsetWriter, rangeStart, rangeEnd)
-			})
-
-			start = rangeEnd
+	// Start the goroutines as per the config and the availability.
+	for numGoRoutines = 0; (numGoRoutines < job.fileCacheConfig.ParallelDownloadsPerFile) && (start < int64(job.object.Size)); numGoRoutines++ {
+		// Respect max download parallelism only beyond first go routine.
+		if numGoRoutines > 0 && !job.maxParallelismSem.TryAcquire(1) {
+			break
 		}
 
-		// If any of the go routines failed, consider the async job failed.
-		err = downloadErrGroup.Wait()
-		if err != nil {
-			return
+		downloadErrGroup.Go(job.downloadOffsets(downloadErrGroupCtx, numGoRoutines, cacheFile, rangeMap))
+		start = start + downloadChunkSize
+	}
+
+	for start = 0; start < int64(job.object.Size); {
+		nextRange := data.ObjectRange{
+			Start: start,
+			End:   min(int64(job.object.Size), start+downloadChunkSize),
 		}
 
-		err = job.updateStatusOffset(start)
-		if err != nil {
-			return err
+		select {
+		case job.rangeChan <- nextRange:
+			start = nextRange.End
+			// In case we haven't yet started goroutines up to the configured
+			// count, check whether any are available now. This may not be the
+			// ideal way, but since we have no way of listening for goroutines
+			// from other jobs freeing up, we check it here.
+			for numGoRoutines < job.fileCacheConfig.ParallelDownloadsPerFile && job.maxParallelismSem.TryAcquire(1) {
+				downloadErrGroup.Go(job.downloadOffsets(downloadErrGroupCtx, numGoRoutines, cacheFile, rangeMap))
+				numGoRoutines++
+			}
+		case <-downloadErrGroupCtx.Done():
+			return job.handleJobCompletion(downloadErrGroupCtx, downloadErrGroup)
 		}
 	}
-	return
+
+	return job.handleJobCompletion(downloadErrGroupCtx, downloadErrGroup)
+}
+
+// handleJobCompletion waits for the download goroutines and returns the
+// appropriate error, covering both success and failure of the job.
+func (job *Job) handleJobCompletion(ctx context.Context, group *errgroup.Group) error {
+	// Close the channel since we are ending the job.
+	close(job.rangeChan)
+
+	// First check whether the context has reported any error. This handles the
+	// scenario where the context is cancelled and no goroutines are running.
+	err := ctx.Err()
+	if err != nil {
+		// Ideally not required, but this is an additional check to ensure that
+		// no goroutines are left running.
+		waitErr := group.Wait()
+		return errors.Join(err, waitErr)
+	}
+
+	// If any of the go routines failed, consider the async job failed.
+	err = group.Wait()
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
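The new control flow is a bounded producer/consumer: the job goroutine publishes ObjectRange values into rangeChan while up to ParallelDownloadsPerFile workers, gated by the shared semaphore, drain it; closing the channel is what lets workers exit. A self-contained sketch of that shape, with the GCS read replaced by a print (all names are simplified stand-ins, and the semaphore is omitted):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type objectRange struct{ start, end int64 }

func main() {
	const objectSize, chunkSize int64 = 10 << 20, 3 << 20
	const workers = 4

	// Buffered channel so the producer rarely blocks, mirroring the
	// 2*ParallelDownloadsPerFile sizing used in the change.
	rangeChan := make(chan objectRange, 2*workers)
	group, ctx := errgroup.WithContext(context.Background())

	// Consumers: drain ranges until the channel is closed.
	for i := 0; i < workers; i++ {
		group.Go(func() error {
			for r := range rangeChan {
				// Stub for job.downloadRange(ctx, ..., r.start, r.end).
				fmt.Printf("downloaded [%d, %d)\n", r.start, r.end)
			}
			return nil
		})
	}

	// Producer: publish chunk-sized ranges, bailing out on cancellation.
produce:
	for start := int64(0); start < objectSize; {
		next := objectRange{start, min(objectSize, start+chunkSize)}
		select {
		case rangeChan <- next:
			start = next.end
		case <-ctx.Done():
			break produce // a worker failed; stop producing
		}
	}
	close(rangeChan)

	if err := group.Wait(); err != nil {
		fmt.Println("job failed:", err)
	}
}
```

Buffering the channel at twice the worker count keeps the producer from blocking even while every worker is in the middle of a chunk.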
diff --git a/internal/cache/file/downloader/parallel_downloads_job_test.go b/internal/cache/file/downloader/parallel_downloads_job_test.go
index e01207f8cc..184633ec03 100644
--- a/internal/cache/file/downloader/parallel_downloads_job_test.go
+++ b/internal/cache/file/downloader/parallel_downloads_job_test.go
@@ -64,7 +64,7 @@ func (dt *parallelDownloaderTest) Test_downloadRange() {
 		FilePerm: os.FileMode(0600), DirPerm: os.FileMode(0700)}, os.O_TRUNC|os.O_RDWR)
 	AssertEq(nil, err)
 	verifyContentAtOffset := func(file *os.File, start, end int64) {
-		_, err = file.Seek(start, 0)
+		_, err = file.Seek(int64(start), 0)
 		AssertEq(nil, err)
 		buf := make([]byte, end-start)
 		_, err = file.Read(buf)
@@ -152,3 +152,82 @@ func (dt *parallelDownloaderTest) Test_parallelDownloadObjectToFile_CtxCancelled
 	AssertTrue(errors.Is(err, context.Canceled),
 		fmt.Sprintf("didn't get context canceled error: %v", err))
 }
+
+func (dt *parallelDownloaderTest) Test_updateRangeMap_withNoEntries() {
+	rangeMap := make(map[int64]int64)
+
+	err := dt.job.updateRangeMap(rangeMap, 0, 10)
+
+	AssertEq(nil, err)
+	AssertEq(2, len(rangeMap))
+	AssertEq(10, rangeMap[0])
+	AssertEq(0, rangeMap[10])
+}
+
+func (dt *parallelDownloaderTest) Test_updateRangeMap_withInputContinuousWithEndOffset() {
+	rangeMap := make(map[int64]int64)
+	rangeMap[0] = 2
+	rangeMap[2] = 0
+	rangeMap[4] = 6
+	rangeMap[6] = 4
+
+	err := dt.job.updateRangeMap(rangeMap, 6, 8)
+
+	AssertEq(nil, err)
+	AssertEq(4, len(rangeMap))
+	AssertEq(2, rangeMap[0])
+	AssertEq(0, rangeMap[2])
+	AssertEq(8, rangeMap[4])
+	AssertEq(4, rangeMap[8])
+}
+
+func (dt *parallelDownloaderTest) Test_updateRangeMap_withInputContinuousWithStartOffset() {
+	rangeMap := make(map[int64]int64)
+	rangeMap[2] = 4
+	rangeMap[4] = 2
+	rangeMap[8] = 10
+	rangeMap[10] = 8
+
+	err := dt.job.updateRangeMap(rangeMap, 0, 2)
+
+	AssertEq(nil, err)
+	AssertEq(4, len(rangeMap))
+	AssertEq(4, rangeMap[0])
+	AssertEq(0, rangeMap[4])
+	AssertEq(10, rangeMap[8])
+	AssertEq(8, rangeMap[10])
+}
+
+func (dt *parallelDownloaderTest) Test_updateRangeMap_withInputFillingTheMissingRange() {
+	rangeMap := make(map[int64]int64)
+	rangeMap[0] = 4
+	rangeMap[4] = 0
+	rangeMap[6] = 8
+	rangeMap[8] = 6
+
+	err := dt.job.updateRangeMap(rangeMap, 4, 6)
+
+	AssertEq(nil, err)
+	AssertEq(2, len(rangeMap))
+	AssertEq(8, rangeMap[0])
+	AssertEq(0, rangeMap[8])
+}
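To make the encoding these tests exercise concrete: rangeMap stores every known contiguous range under both of its endpoints (start→end and end→start), so a finished chunk can discover in O(1) whether an existing range ends at its start or begins at its end, and coalesce with it. A standalone sketch of just the merge step (no job lock, no status update):

```go
package main

import "fmt"

// mergeRange coalesces the half-open range [start, end) into rangeMap,
// which stores every known contiguous range under both of its endpoints.
func mergeRange(rangeMap map[int64]int64, start, end int64) {
	if left, ok := rangeMap[start]; ok && start != 0 {
		// An existing range ends exactly at start; extend leftwards.
		delete(rangeMap, start)
		delete(rangeMap, left)
		start = left
	}
	if right, ok := rangeMap[end]; ok {
		// An existing range begins exactly at end; extend rightwards.
		delete(rangeMap, end)
		delete(rangeMap, right)
		end = right
	}
	rangeMap[start] = end
	rangeMap[end] = start
}

func main() {
	m := map[int64]int64{0: 4, 4: 0, 6: 8, 8: 6} // ranges [0,4) and [6,8)
	mergeRange(m, 4, 6)                          // fills the gap
	fmt.Println(m[0], m[8])                      // coalesced into [0,8): prints 8 0
}
```

This matches the "filling the missing range" test above, where merging 4-6 into {0-4, 6-8} leaves exactly the two entries 0→8 and 8→0.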
+
+func (dt *parallelDownloaderTest) Test_updateRangeMap_withInputNotOverlappingWithAnyRanges() {
+	rangeMap := make(map[int64]int64)
+	rangeMap[0] = 4
+	rangeMap[4] = 0
+	rangeMap[12] = 14
+	rangeMap[14] = 12
+
+	err := dt.job.updateRangeMap(rangeMap, 8, 10)
+
+	AssertEq(nil, err)
+	AssertEq(6, len(rangeMap))
+	AssertEq(0, rangeMap[4])
+	AssertEq(4, rangeMap[0])
+	AssertEq(8, rangeMap[10])
+	AssertEq(10, rangeMap[8])
+	AssertEq(12, rangeMap[14])
+	AssertEq(14, rangeMap[12])
+}
diff --git a/tools/integration_tests/read_cache/job_chunk_test.go b/tools/integration_tests/read_cache/job_chunk_test.go
index 72e6ad4b03..83ff16a6d5 100644
--- a/tools/integration_tests/read_cache/job_chunk_test.go
+++ b/tools/integration_tests/read_cache/job_chunk_test.go
@@ -17,7 +17,6 @@ package read_cache
 import (
 	"context"
 	"log"
-	"math"
 	"path"
 	"sync"
 	"testing"
@@ -39,11 +38,10 @@ import (
 ////////////////////////////////////////////////////////////////////////
 
 type jobChunkTest struct {
-	flags                           []string
-	storageClient                   *storage.Client
-	ctx                             context.Context
-	chunkSize                       int64
-	isLimitedByMaxParallelDownloads bool
+	flags         []string
+	storageClient *storage.Client
+	ctx           context.Context
+	chunkSize     int64
 }
 
 func (s *jobChunkTest) Setup(t *testing.T) {
@@ -83,7 +81,6 @@ func createConfigFileForJobChunkTest(cacheSize int64, cacheFileForRangeRead bool
 
 func (s *jobChunkTest) TestJobChunkSizeForSingleFileReads(t *testing.T) {
 	var fileSize int64 = 24 * util.MiB
-	chunkCount := math.Ceil(float64(fileSize) / float64(s.chunkSize))
 	testFileName := setupFileInTestDir(s.ctx, s.storageClient, testDirName, fileSize, t)
 
 	expectedOutcome := readFileAndValidateCacheWithGCS(s.ctx, s.storageClient, testFileName, fileSize, false, t)
@@ -92,16 +89,23 @@ func (s *jobChunkTest) TestJobChunkSizeForSingleFileReads(t *testing.T) {
 	structuredJobLogs := read_logs.GetJobLogsSortedByTimestamp(setup.LogFile(), t)
 	assert.Equal(t, expectedOutcome.BucketName, structuredJobLogs[0].BucketName)
 	assert.Equal(t, expectedOutcome.ObjectName, structuredJobLogs[0].ObjectName)
-	require.EqualValues(t, chunkCount, len(structuredJobLogs[0].JobEntries))
-	for i := 0; int64(i) < int64(chunkCount); i++ {
-		offset := min(s.chunkSize*int64(i+1), fileSize)
-		assert.Equal(t, offset, structuredJobLogs[0].JobEntries[i].Offset)
+
+	// Check that each downloadedOffset is greater than the previous one and
+	// that consecutive offsets differ by a multiple of chunkSize.
+	for i := 1; i < len(structuredJobLogs[0].JobEntries); i++ {
+		offsetDiff := structuredJobLogs[0].JobEntries[i].Offset - structuredJobLogs[0].JobEntries[i-1].Offset
+		assert.Greater(t, offsetDiff, int64(0))
+		// This holds for every entry except possibly the last one; it holds
+		// for the last entry only if fileSize is a multiple of chunkSize.
+		assert.Equal(t, int64(0), offsetDiff%s.chunkSize)
 	}
+
+	// Validate that the last downloadedOffset is the same as fileSize.
+	assert.Equal(t, fileSize, structuredJobLogs[0].JobEntries[len(structuredJobLogs[0].JobEntries)-1].Offset)
 }
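Worked numbers for the check above: with a 24 MiB file and a 3 MiB chunk, a job might log offsets 3, 9, 12, 24 MiB, since an entry is only emitted when the downloaded prefix [0, R) grows; every gap is then a positive multiple of 3 MiB, and the final offset equals the file size. The same validation, compactly (validateOffsets is illustrative, not test code):

```go
package main

import "fmt"

// validateOffsets applies the same three checks as the test: offsets
// strictly increase, consecutive gaps are multiples of chunkSize, and
// the final offset equals fileSize.
func validateOffsets(offsets []int64, chunkSize, fileSize int64) error {
	if len(offsets) == 0 {
		return fmt.Errorf("no job entries")
	}
	for i := 1; i < len(offsets); i++ {
		diff := offsets[i] - offsets[i-1]
		if diff <= 0 || diff%chunkSize != 0 {
			return fmt.Errorf("bad gap %d at entry %d", diff, i)
		}
	}
	if last := offsets[len(offsets)-1]; last != fileSize {
		return fmt.Errorf("last offset %d != file size %d", last, fileSize)
	}
	return nil
}

func main() {
	const mib int64 = 1 << 20
	// One plausible log sequence for a 24 MiB file with 3 MiB chunks.
	fmt.Println(validateOffsets([]int64{3 * mib, 9 * mib, 12 * mib, 24 * mib}, 3*mib, 24*mib))
}
```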
 
 func (s *jobChunkTest) TestJobChunkSizeForMultipleFileReads(t *testing.T) {
 	var fileSize int64 = 24 * util.MiB
-	chunkCount := fileSize / s.chunkSize
 	var testFileNames [2]string
 	var expectedOutcome [2]*Expected
 	testFileNames[0] = setupFileInTestDir(s.ctx, s.storageClient, testDirName, fileSize, t)
@@ -129,22 +133,24 @@ func (s *jobChunkTest) TestJobChunkSizeForMultipleFileReads(t *testing.T) {
 		expectedOutcome[0], expectedOutcome[1] = expectedOutcome[1], expectedOutcome[0]
 		testFileNames[0], testFileNames[1] = testFileNames[1], testFileNames[0]
 	}
-	assert.Equal(t, expectedOutcome[0].BucketName, structuredJobLogs[0].BucketName)
-	assert.Equal(t, expectedOutcome[1].BucketName, structuredJobLogs[1].BucketName)
-	assert.Equal(t, expectedOutcome[0].ObjectName, structuredJobLogs[0].ObjectName)
-	assert.Equal(t, expectedOutcome[1].ObjectName, structuredJobLogs[1].ObjectName)
-	if s.isLimitedByMaxParallelDownloads {
-		require.LessOrEqual(t, chunkCount, int64(len(structuredJobLogs[0].JobEntries)))
-		for i := 0; int64(i) < int64(len(structuredJobLogs[0].JobEntries)); i++ {
-			offset := min(s.chunkSize*int64(i+1), fileSize)
-			assert.GreaterOrEqual(t, offset, structuredJobLogs[0].JobEntries[i].Offset)
-		}
-	} else {
-		require.EqualValues(t, chunkCount, len(structuredJobLogs[0].JobEntries))
-		for i := 0; int64(i) < chunkCount; i++ {
-			offset := min(s.chunkSize*int64(i+1), fileSize)
-			assert.Equal(t, offset, structuredJobLogs[0].JobEntries[i].Offset)
+
+	for fileIndex := 0; fileIndex < 2; fileIndex++ {
+		assert.Equal(t, expectedOutcome[fileIndex].BucketName, structuredJobLogs[fileIndex].BucketName)
+		assert.Equal(t, expectedOutcome[fileIndex].ObjectName, structuredJobLogs[fileIndex].ObjectName)
+
+		// Check that each downloadedOffset is greater than the previous one and
+		// that consecutive offsets differ by a multiple of chunkSize.
+		entriesLen := len(structuredJobLogs[fileIndex].JobEntries)
+		for entryIndex := 1; entryIndex < entriesLen; entryIndex++ {
+			offsetDiff := structuredJobLogs[fileIndex].JobEntries[entryIndex].Offset - structuredJobLogs[fileIndex].JobEntries[entryIndex-1].Offset
+			assert.Greater(t, offsetDiff, int64(0))
+			// This holds for every entry except possibly the last one; it holds
+			// for the last entry only if fileSize is a multiple of chunkSize.
+			assert.Equal(t, int64(0), offsetDiff%s.chunkSize)
 		}
+
+		// Validate that the last downloadedOffset is the same as fileSize.
+		assert.Equal(t, fileSize, structuredJobLogs[fileIndex].JobEntries[entriesLen-1].Offset)
 	}
 }
@@ -182,7 +188,7 @@ func TestJobChunkTest(t *testing.T) {
 	// with unlimited max parallel downloads.
 	ts.flags = []string{"--config-file=" + createConfigFileForJobChunkTest(cacheSizeMB, false, "unlimitedMaxParallelDownloads", parallelDownloadsPerFile, maxParallelDownloads, downloadChunkSizeMB)}
-	ts.chunkSize = parallelDownloadsPerFile * downloadChunkSizeMB * util.MiB
+	ts.chunkSize = downloadChunkSizeMB * util.MiB
 	log.Printf("Running tests with flags: %s", ts.flags)
 	test_setup.RunTests(t, ts)
 
@@ -193,7 +199,7 @@ func TestJobChunkTest(t *testing.T) {
 	downloadChunkSizeMB := 3
 	ts.flags = []string{"--config-file=" + createConfigFileForJobChunkTest(cacheSizeMB, false, "limitedMaxParallelDownloadsNotEffectingChunkSize", parallelDownloadsPerFile, maxParallelDownloads, downloadChunkSizeMB)}
-	ts.chunkSize = int64(parallelDownloadsPerFile) * int64(downloadChunkSizeMB) * util.MiB
+	ts.chunkSize = int64(downloadChunkSizeMB) * util.MiB
 	log.Printf("Running tests with flags: %s", ts.flags)
 	test_setup.RunTests(t, ts)
 
@@ -204,8 +210,7 @@ func TestJobChunkTest(t *testing.T) {
 	downloadChunkSizeMB = 3
 	ts.flags = []string{"--config-file=" + createConfigFileForJobChunkTest(cacheSizeMB, false, "limitedMaxParallelDownloadsEffectingChunkSize", parallelDownloadsPerFile, maxParallelDownloads, downloadChunkSizeMB)}
-	ts.chunkSize = int64(maxParallelDownloads+1) * int64(downloadChunkSizeMB) * util.MiB
-	ts.isLimitedByMaxParallelDownloads = true
+	ts.chunkSize = int64(downloadChunkSizeMB) * util.MiB
 	log.Printf("Running tests with flags: %s", ts.flags)
 	test_setup.RunTests(t, ts)