From c39cefe920a8ac15a47bedda2856189009e94803 Mon Sep 17 00:00:00 2001 From: vadlakondaswetha Date: Thu, 13 Jun 2024 10:55:26 +0000 Subject: [PATCH 1/7] making computecrc method context cancellable --- internal/cache/util/util.go | 25 ++++++++++++++++--------- internal/cache/util/util_test.go | 17 ++++++++++++++--- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/internal/cache/util/util.go b/internal/cache/util/util.go index b6bcfbd9ee..ee39d16c7a 100644 --- a/internal/cache/util/util.go +++ b/internal/cache/util/util.go @@ -15,6 +15,8 @@ package util import ( + "context" + "errors" "fmt" "hash/crc32" "io" @@ -132,24 +134,29 @@ func CreateCacheDirectoryIfNotPresentAt(dirPath string, dirPerm os.FileMode) err return nil } -func calculateCRC32(reader io.Reader) (uint32, error) { +func calculateCRC32(ctx context.Context, reader io.Reader) (uint32, error) { table := crc32.MakeTable(crc32.Castagnoli) checksum := crc32.Checksum([]byte(""), table) buf := make([]byte, BufferSizeForCRC) for { - switch n, err := reader.Read(buf); err { - case nil: - checksum = crc32.Update(checksum, table, buf[:n]) - case io.EOF: - return checksum, nil + select { + case <-ctx.Done(): + return 0, errors.New("CRC computation is cancelled") default: - return 0, err + switch n, err := reader.Read(buf); err { + case nil: + checksum = crc32.Update(checksum, table, buf[:n]) + case io.EOF: + return checksum, nil + default: + return 0, err + } } } } // CalculateFileCRC32 calculates and returns the CRC-32 checksum of a file. -func CalculateFileCRC32(filePath string) (uint32, error) { +func CalculateFileCRC32(ctx context.Context, filePath string) (uint32, error) { // Open file with simplified flags and permissions file, err := os.Open(filePath) if err != nil { @@ -157,5 +164,5 @@ func CalculateFileCRC32(filePath string) (uint32, error) { } defer file.Close() // Ensure file closure - return calculateCRC32(file) + return calculateCRC32(ctx, file) } diff --git a/internal/cache/util/util_test.go b/internal/cache/util/util_test.go index 709dbfbba8..891677824f 100644 --- a/internal/cache/util/util_test.go +++ b/internal/cache/util/util_test.go @@ -15,6 +15,7 @@ package util import ( + "context" "errors" "fmt" "os" @@ -247,26 +248,36 @@ func (ut *utilTest) Test_IsCacheHandleValid_False() { } func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnCrcForValidFile() { - crc, err := CalculateFileCRC32("testdata/validfile.txt") + crc, err := CalculateFileCRC32(context.Background(), "testdata/validfile.txt") ExpectEq(nil, err) ExpectEq(515179668, crc) } func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnZeroForEmptyFile() { - crc, err := CalculateFileCRC32("testdata/emptyfile.txt") + crc, err := CalculateFileCRC32(context.Background(), "testdata/emptyfile.txt") ExpectEq(nil, err) ExpectEq(0, crc) } func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorForFileNotExist() { - crc, err := CalculateFileCRC32("testdata/nofile.txt") + crc, err := CalculateFileCRC32(context.Background(), "testdata/nofile.txt") ExpectTrue(strings.Contains(err.Error(), "no such file or directory")) ExpectEq(0, crc) } +func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorWhenContextIsCancelled() { + ctx, cancelFunc := context.WithCancel(context.Background()) + cancelFunc() + + crc, err := CalculateFileCRC32(ctx, "testdata/validfile.txt") + + ExpectTrue(strings.Contains(err.Error(), "CRC computation is cancelled")) + ExpectEq(0, crc) +} + func Test_CreateCacheDirectoryIfNotPresentAt_ShouldNotReturnAnyErrorWhenDirectoryExists(t *testing.T) { base := path.Join("./", string(testutil.GenerateRandomBytes(4))) dirPath := path.Join(base, "/", "path/cachedir") From e3fbb924b686b1e83339d4614e21ea4cb30f64e4 Mon Sep 17 00:00:00 2001 From: vadlakondaswetha Date: Thu, 13 Jun 2024 10:57:46 +0000 Subject: [PATCH 2/7] Updated the error message --- internal/cache/util/util.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/cache/util/util.go b/internal/cache/util/util.go index ee39d16c7a..a8333050d8 100644 --- a/internal/cache/util/util.go +++ b/internal/cache/util/util.go @@ -16,7 +16,6 @@ package util import ( "context" - "errors" "fmt" "hash/crc32" "io" @@ -141,7 +140,7 @@ func calculateCRC32(ctx context.Context, reader io.Reader) (uint32, error) { for { select { case <-ctx.Done(): - return 0, errors.New("CRC computation is cancelled") + return 0, fmt.Errorf("CRC computation is cancelled: %w", ctx.Err()) default: switch n, err := reader.Read(buf); err { case nil: From bf4d3c3acddfa02255d93f8ee507069d2e606c3f Mon Sep 17 00:00:00 2001 From: vadlakondaswetha Date: Thu, 13 Jun 2024 11:01:37 +0000 Subject: [PATCH 3/7] fix build --- internal/cache/file/downloader/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cache/file/downloader/job.go b/internal/cache/file/downloader/job.go index 6a22f1f467..d81497411d 100644 --- a/internal/cache/file/downloader/job.go +++ b/internal/cache/file/downloader/job.go @@ -483,7 +483,7 @@ func (job *Job) validateCRC() (err error) { return } - crc32Val, err := cacheutil.CalculateFileCRC32(job.fileSpec.Path) + crc32Val, err := cacheutil.CalculateFileCRC32(job.cancelCtx, job.fileSpec.Path) if err != nil { return } From 5c1448770d90469c56d59ab5ecc5fb5adf03f9b7 Mon Sep 17 00:00:00 2001 From: vadlakondaswetha Date: Thu, 13 Jun 2024 11:49:23 +0000 Subject: [PATCH 4/7] fixing tests --- internal/cache/file/downloader/job_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/cache/file/downloader/job_test.go b/internal/cache/file/downloader/job_test.go index de859f17c7..e28e7cb68d 100644 --- a/internal/cache/file/downloader/job_test.go +++ b/internal/cache/file/downloader/job_test.go @@ -807,6 +807,7 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsTr err = os.WriteFile(dt.fileSpec.Path, []byte("test"), 0644) AssertEq(nil, err) + dt.job.cancelCtx, dt.job.cancelFunc = context.WithCancel(context.Background()) err = dt.job.validateCRC() AssertNe(nil, err) @@ -840,6 +841,7 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsFa AssertEq(nil, err) dt.job.fileCacheConfig.EnableCrcCheck = false + dt.job.cancelCtx, dt.job.cancelFunc = context.WithCancel(context.Background()) err = dt.job.validateCRC() AssertEq(nil, err) From b2d5ca82ca65377936ab53be3a41468aa4fc5c5d Mon Sep 17 00:00:00 2001 From: vadlakondaswetha Date: Thu, 13 Jun 2024 12:47:01 +0000 Subject: [PATCH 5/7] ADded test case to verify if crc computation is cancelled --- internal/cache/file/downloader/job_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/internal/cache/file/downloader/job_test.go b/internal/cache/file/downloader/job_test.go index e28e7cb68d..2b352ceaf9 100644 --- a/internal/cache/file/downloader/job_test.go +++ b/internal/cache/file/downloader/job_test.go @@ -850,3 +850,23 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsFa // Verify fileInfoCache update dt.verifyFileInfoEntry(uint64(jobStatus.Offset)) } + +func (dt *downloaderTest) Test_validateCRC_WheContextIsCancelled() { + objectName := "path/in/gcs/file2.txt" + objectSize := 10 * util.MiB + objectContent := testutil.GenerateRandomBytes(objectSize) + dt.initJobTest(objectName, objectContent, DefaultSequentialReadSizeMb, uint64(2*objectSize), func() {}) + // Start download + offset := int64(10 * util.MiB) + _, err := dt.job.Download(context.Background(), offset, true) + AssertEq(nil, err) + AssertEq(Downloading, dt.job.status.Name) + AssertEq(nil, dt.job.status.Err) + AssertGe(dt.job.status.Offset, offset) + + dt.job.cancelFunc() + dt.waitForCrcCheckToBeCompleted() + + AssertEq(Failed, dt.job.status.Name) + ExpectTrue(strings.Contains(dt.job.status.Err.Error(), "CRC computation is cancelled")) +} From ae3357ccadf05aa97eebcb0cc02916f4f253ea8b Mon Sep 17 00:00:00 2001 From: vadlakondaswetha Date: Sat, 15 Jun 2024 05:48:07 +0000 Subject: [PATCH 6/7] code review comments --- .../cache/file/downloader/downloader_test.go | 2 +- internal/cache/file/downloader/job.go | 50 +++++++++++-------- internal/cache/file/downloader/job_test.go | 35 ++++++++++++- internal/cache/util/util_test.go | 4 +- 4 files changed, 64 insertions(+), 27 deletions(-) diff --git a/internal/cache/file/downloader/downloader_test.go b/internal/cache/file/downloader/downloader_test.go index 4d766ad593..c28cde3fb8 100644 --- a/internal/cache/file/downloader/downloader_test.go +++ b/internal/cache/file/downloader/downloader_test.go @@ -76,7 +76,7 @@ func (dt *downloaderTest) waitForCrcCheckToBeCompleted() { // Hence, explicitly waiting till the CRC check is done. for { dt.job.mu.Lock() - if dt.job.status.Name == Completed || dt.job.status.Name == Failed { + if dt.job.status.Name == Completed || dt.job.status.Name == Failed || dt.job.status.Name == Invalid { dt.job.mu.Unlock() break } diff --git a/internal/cache/file/downloader/job.go b/internal/cache/file/downloader/job.go index d81497411d..d9496c49af 100644 --- a/internal/cache/file/downloader/job.go +++ b/internal/cache/file/downloader/job.go @@ -305,13 +305,6 @@ func (job *Job) downloadObjectAsync() { } }() - notifyInvalid := func() { - job.mu.Lock() - job.status.Name = Invalid - job.notifySubscribers() - job.mu.Unlock() - } - var newReader io.ReadCloser var start, end, sequentialReadSize, newReaderLimit int64 end = int64(job.object.Size) @@ -337,14 +330,8 @@ func (job *Job) downloadObjectAsync() { ReadCompressed: job.object.HasContentEncodingGzip(), }) if err != nil { - // Context is canceled when job.cancel is called at the time of - // invalidation and hence caller should be notified as invalid. - if errors.Is(err, context.Canceled) { - notifyInvalid() - return - } err = fmt.Errorf("downloadObjectAsync: error in creating NewReader with start %d and limit %d: %w", start, newReaderLimit, err) - job.failWhileDownloading(err) + job.handleError(err) return } monitor.CaptureGCSReadMetrics(job.cancelCtx, util.Sequential, newReaderLimit-start) @@ -361,14 +348,8 @@ func (job *Job) downloadObjectAsync() { // Copy the contents from NewReader to cache file. _, readErr := io.CopyN(cacheFile, newReader, maxRead) if readErr != nil { - // Context is canceled when job.cancel is called at the time of - // invalidation and hence caller should be notified as invalid. - if errors.Is(readErr, context.Canceled) { - notifyInvalid() - return - } err = fmt.Errorf("downloadObjectAsync: error at the time of copying content to cache file %w", readErr) - job.failWhileDownloading(err) + job.handleError(err) return } start += maxRead @@ -406,7 +387,7 @@ func (job *Job) downloadObjectAsync() { } else { err = job.validateCRC() if err != nil { - job.failWhileDownloading(err) + job.handleError(err) return } @@ -515,3 +496,28 @@ func (job *Job) validateCRC() (err error) { return } + +// Performs different actions based on the type of error. +// For context.Canceled it marks the job as invalid and notifies subscribers. +// For other errors, marks the job as failed and notifies subscribers. +func (job *Job) handleError(err error) { + // Context is canceled when job.cancel is called at the time of + // invalidation and hence caller should be notified as invalid. + if errors.Is(err, context.Canceled) { + job.notifyInvalid() + return + } + + job.failWhileDownloading(err) + return +} + +// Sets the status as invalid and notifies the subscribers. +// +// Acquires and releases LOCK(job.mu) +func (job *Job) notifyInvalid() { + job.mu.Lock() + job.status.Name = Invalid + job.notifySubscribers() + job.mu.Unlock() +} diff --git a/internal/cache/file/downloader/job_test.go b/internal/cache/file/downloader/job_test.go index 2b352ceaf9..26299f8128 100644 --- a/internal/cache/file/downloader/job_test.go +++ b/internal/cache/file/downloader/job_test.go @@ -867,6 +867,37 @@ func (dt *downloaderTest) Test_validateCRC_WheContextIsCancelled() { dt.job.cancelFunc() dt.waitForCrcCheckToBeCompleted() - AssertEq(Failed, dt.job.status.Name) - ExpectTrue(strings.Contains(dt.job.status.Err.Error(), "CRC computation is cancelled")) + AssertEq(Invalid, dt.job.status.Name) + AssertEq(nil, dt.job.status.Err) +} + +func (dt *downloaderTest) Test_handleError_SetStatusAsInvalidWhenContextIsCancelled() { + subscriberOffset := int64(1) + notificationC := dt.job.subscribe(subscriberOffset) + err := errors.Join(context.Canceled) + + err = fmt.Errorf("Wrapping with custom message %w", err) + dt.job.handleError(err) + + AssertEq(0, dt.job.subscribers.Len()) + notification, ok := <-notificationC + jobStatus := JobStatus{Name: Invalid, Err: nil, Offset: 0} + AssertTrue(reflect.DeepEqual(jobStatus, notification)) + AssertEq(true, ok) +} + +func (dt *downloaderTest) Test_handleError_SetStatusAsErrorWhenContextIsNotCancelled() { + subscriberOffset := int64(1) + notificationC := dt.job.subscribe(subscriberOffset) + err := errors.New("custom error") + + updatedErr := fmt.Errorf("Custom message %w", err) + dt.job.handleError(updatedErr) + + AssertEq(0, dt.job.subscribers.Len()) + notification, ok := <-notificationC + jobStatus := JobStatus{Name: Failed, Err: updatedErr, Offset: 0} + fmt.Println(notification) + AssertTrue(reflect.DeepEqual(jobStatus, notification)) + AssertEq(true, ok) } diff --git a/internal/cache/util/util_test.go b/internal/cache/util/util_test.go index 891677824f..1ac435adcd 100644 --- a/internal/cache/util/util_test.go +++ b/internal/cache/util/util_test.go @@ -15,7 +15,6 @@ package util import ( - "context" "errors" "fmt" "os" @@ -29,6 +28,7 @@ import ( testutil "github.com/googlecloudplatform/gcsfuse/v2/internal/util" "github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations" . "github.com/jacobsa/ogletest" + "golang.org/x/net/context" ) func TestUtil(t *testing.T) { RunTests(t) } @@ -271,9 +271,9 @@ func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorForFileNotExist() { func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorWhenContextIsCancelled() { ctx, cancelFunc := context.WithCancel(context.Background()) cancelFunc() - crc, err := CalculateFileCRC32(ctx, "testdata/validfile.txt") + ExpectTrue(errors.Is(err, context.Canceled)) ExpectTrue(strings.Contains(err.Error(), "CRC computation is cancelled")) ExpectEq(0, crc) } From 40bbed5917ed43efa0b737039d52559e52e05ac6 Mon Sep 17 00:00:00 2001 From: vadlakondaswetha Date: Sat, 15 Jun 2024 05:51:36 +0000 Subject: [PATCH 7/7] fix lint --- internal/cache/file/downloader/job.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/cache/file/downloader/job.go b/internal/cache/file/downloader/job.go index d9496c49af..541b37b366 100644 --- a/internal/cache/file/downloader/job.go +++ b/internal/cache/file/downloader/job.go @@ -509,7 +509,6 @@ func (job *Job) handleError(err error) { } job.failWhileDownloading(err) - return } // Sets the status as invalid and notifies the subscribers.