diff --git a/internal/cache/file/downloader/downloader_test.go b/internal/cache/file/downloader/downloader_test.go
index 4d766ad593..c28cde3fb8 100644
--- a/internal/cache/file/downloader/downloader_test.go
+++ b/internal/cache/file/downloader/downloader_test.go
@@ -76,7 +76,7 @@ func (dt *downloaderTest) waitForCrcCheckToBeCompleted() {
 	// Hence, explicitly waiting till the CRC check is done.
 	for {
 		dt.job.mu.Lock()
-		if dt.job.status.Name == Completed || dt.job.status.Name == Failed {
+		if dt.job.status.Name == Completed || dt.job.status.Name == Failed || dt.job.status.Name == Invalid {
 			dt.job.mu.Unlock()
 			break
 		}
diff --git a/internal/cache/file/downloader/job.go b/internal/cache/file/downloader/job.go
index 6a22f1f467..541b37b366 100644
--- a/internal/cache/file/downloader/job.go
+++ b/internal/cache/file/downloader/job.go
@@ -305,13 +305,6 @@ func (job *Job) downloadObjectAsync() {
 		}
 	}()
 
-	notifyInvalid := func() {
-		job.mu.Lock()
-		job.status.Name = Invalid
-		job.notifySubscribers()
-		job.mu.Unlock()
-	}
-
 	var newReader io.ReadCloser
 	var start, end, sequentialReadSize, newReaderLimit int64
 	end = int64(job.object.Size)
@@ -337,14 +330,8 @@ func (job *Job) downloadObjectAsync() {
 							ReadCompressed: job.object.HasContentEncodingGzip(),
 						})
 					if err != nil {
-						// Context is canceled when job.cancel is called at the time of
-						// invalidation and hence caller should be notified as invalid.
-						if errors.Is(err, context.Canceled) {
-							notifyInvalid()
-							return
-						}
 						err = fmt.Errorf("downloadObjectAsync: error in creating NewReader with start %d and limit %d: %w", start, newReaderLimit, err)
-						job.failWhileDownloading(err)
+						job.handleError(err)
 						return
 					}
 					monitor.CaptureGCSReadMetrics(job.cancelCtx, util.Sequential, newReaderLimit-start)
@@ -361,14 +348,8 @@ func (job *Job) downloadObjectAsync() {
 				// Copy the contents from NewReader to cache file.
 				_, readErr := io.CopyN(cacheFile, newReader, maxRead)
 				if readErr != nil {
-					// Context is canceled when job.cancel is called at the time of
-					// invalidation and hence caller should be notified as invalid.
-					if errors.Is(readErr, context.Canceled) {
-						notifyInvalid()
-						return
-					}
 					err = fmt.Errorf("downloadObjectAsync: error at the time of copying content to cache file %w", readErr)
-					job.failWhileDownloading(err)
+					job.handleError(err)
 					return
 				}
 				start += maxRead
@@ -406,7 +387,7 @@ func (job *Job) downloadObjectAsync() {
 			} else {
 				err = job.validateCRC()
 				if err != nil {
-					job.failWhileDownloading(err)
+					job.handleError(err)
 					return
 				}
 
@@ -483,7 +464,7 @@ func (job *Job) validateCRC() (err error) {
 		return
 	}
 
-	crc32Val, err := cacheutil.CalculateFileCRC32(job.fileSpec.Path)
+	crc32Val, err := cacheutil.CalculateFileCRC32(job.cancelCtx, job.fileSpec.Path)
 	if err != nil {
 		return
 	}
@@ -515,3 +496,27 @@ func (job *Job) validateCRC() (err error) {
 
 	return
 }
+
+// Performs different actions based on the type of error.
+// For context.Canceled it marks the job as invalid and notifies subscribers.
+// For other errors, marks the job as failed and notifies subscribers.
+func (job *Job) handleError(err error) {
+	// Context is canceled when job.cancel is called at the time of
+	// invalidation and hence caller should be notified as invalid.
+	if errors.Is(err, context.Canceled) {
+		job.notifyInvalid()
+		return
+	}
+
+	job.failWhileDownloading(err)
+}
+
+// Sets the status as invalid and notifies the subscribers.
+//
+// Acquires and releases LOCK(job.mu)
+func (job *Job) notifyInvalid() {
+	job.mu.Lock()
+	job.status.Name = Invalid
+	job.notifySubscribers()
+	job.mu.Unlock()
+}
diff --git a/internal/cache/file/downloader/job_test.go b/internal/cache/file/downloader/job_test.go
index de859f17c7..26299f8128 100644
--- a/internal/cache/file/downloader/job_test.go
+++ b/internal/cache/file/downloader/job_test.go
@@ -807,6 +807,7 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsTr
 	err = os.WriteFile(dt.fileSpec.Path, []byte("test"), 0644)
 	AssertEq(nil, err)
 
+	dt.job.cancelCtx, dt.job.cancelFunc = context.WithCancel(context.Background())
 	err = dt.job.validateCRC()
 
 	AssertNe(nil, err)
@@ -840,6 +841,7 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsFa
 	AssertEq(nil, err)
 	dt.job.fileCacheConfig.EnableCrcCheck = false
 
+	dt.job.cancelCtx, dt.job.cancelFunc = context.WithCancel(context.Background())
 	err = dt.job.validateCRC()
 
 	AssertEq(nil, err)
@@ -848,3 +850,53 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsFa
 	// Verify fileInfoCache update
 	dt.verifyFileInfoEntry(uint64(jobStatus.Offset))
 }
+
+func (dt *downloaderTest) Test_validateCRC_WhenContextIsCancelled() {
+	objectName := "path/in/gcs/file2.txt"
+	objectSize := 10 * util.MiB
+	objectContent := testutil.GenerateRandomBytes(objectSize)
+	dt.initJobTest(objectName, objectContent, DefaultSequentialReadSizeMb, uint64(2*objectSize), func() {})
+	// Start download
+	offset := int64(10 * util.MiB)
+	_, err := dt.job.Download(context.Background(), offset, true)
+	AssertEq(nil, err)
+	AssertEq(Downloading, dt.job.status.Name)
+	AssertEq(nil, dt.job.status.Err)
+	AssertGe(dt.job.status.Offset, offset)
+
+	dt.job.cancelFunc()
+	dt.waitForCrcCheckToBeCompleted()
+
+	AssertEq(Invalid, dt.job.status.Name)
+	AssertEq(nil, dt.job.status.Err)
+}
+
+func (dt *downloaderTest) Test_handleError_SetStatusAsInvalidWhenContextIsCancelled() {
+	subscriberOffset := int64(1)
+	notificationC := dt.job.subscribe(subscriberOffset)
+	err := errors.Join(context.Canceled)
+
+	err = fmt.Errorf("Wrapping with custom message %w", err)
+	dt.job.handleError(err)
+
+	AssertEq(0, dt.job.subscribers.Len())
+	notification, ok := <-notificationC
+	jobStatus := JobStatus{Name: Invalid, Err: nil, Offset: 0}
+	AssertTrue(reflect.DeepEqual(jobStatus, notification))
+	AssertEq(true, ok)
+}
+
+func (dt *downloaderTest) Test_handleError_SetStatusAsErrorWhenContextIsNotCancelled() {
+	subscriberOffset := int64(1)
+	notificationC := dt.job.subscribe(subscriberOffset)
+	err := errors.New("custom error")
+
+	updatedErr := fmt.Errorf("Custom message %w", err)
+	dt.job.handleError(updatedErr)
+
+	AssertEq(0, dt.job.subscribers.Len())
+	notification, ok := <-notificationC
+	jobStatus := JobStatus{Name: Failed, Err: updatedErr, Offset: 0}
+	AssertTrue(reflect.DeepEqual(jobStatus, notification))
+	AssertEq(true, ok)
+}
diff --git a/internal/cache/util/util.go b/internal/cache/util/util.go
index b6bcfbd9ee..a8333050d8 100644
--- a/internal/cache/util/util.go
+++ b/internal/cache/util/util.go
@@ -15,6 +15,7 @@
 package util
 
 import (
+	"context"
 	"fmt"
 	"hash/crc32"
 	"io"
@@ -132,24 +133,34 @@ func CreateCacheDirectoryIfNotPresentAt(dirPath string, dirPerm os.FileMode) err
 	return nil
 }
 
-func calculateCRC32(reader io.Reader) (uint32, error) {
+// calculateCRC32 returns the CRC-32 (Castagnoli) checksum of everything read
+// from reader. ctx is polled before every read so a cancelled caller stops the
+// computation early; in that case the returned error wraps ctx.Err().
+func calculateCRC32(ctx context.Context, reader io.Reader) (uint32, error) {
 	table := crc32.MakeTable(crc32.Castagnoli)
 	checksum := crc32.Checksum([]byte(""), table)
 	buf := make([]byte, BufferSizeForCRC)
 	for {
-		switch n, err := reader.Read(buf); err {
-		case nil:
-			checksum = crc32.Update(checksum, table, buf[:n])
-		case io.EOF:
-			return checksum, nil
+		select {
+		case <-ctx.Done():
+			return 0, fmt.Errorf("CRC computation is cancelled: %w", ctx.Err())
 		default:
-			return 0, err
+			n, err := reader.Read(buf)
+			// An io.Reader may return n > 0 together with an error (including
+			// io.EOF), so hash the bytes that were read before inspecting err.
+			checksum = crc32.Update(checksum, table, buf[:n])
+			if err == io.EOF {
+				return checksum, nil
+			}
+			if err != nil {
+				return 0, err
+			}
 		}
 	}
 }
 
 // CalculateFileCRC32 calculates and returns the CRC-32 checksum of a file.
-func CalculateFileCRC32(filePath string) (uint32, error) {
+func CalculateFileCRC32(ctx context.Context, filePath string) (uint32, error) {
 	// Open file with simplified flags and permissions
 	file, err := os.Open(filePath)
 	if err != nil {
@@ -157,5 +168,5 @@ func CalculateFileCRC32(filePath string) (uint32, error) {
 	}
 	defer file.Close() // Ensure file closure
 
-	return calculateCRC32(file)
+	return calculateCRC32(ctx, file)
 }
diff --git a/internal/cache/util/util_test.go b/internal/cache/util/util_test.go
index 709dbfbba8..1ac435adcd 100644
--- a/internal/cache/util/util_test.go
+++ b/internal/cache/util/util_test.go
@@ -28,6 +28,7 @@ import (
 	testutil "github.com/googlecloudplatform/gcsfuse/v2/internal/util"
 	"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations"
 	. "github.com/jacobsa/ogletest"
+	"golang.org/x/net/context"
 )
 
 func TestUtil(t *testing.T) { RunTests(t) }
@@ -247,26 +248,36 @@ func (ut *utilTest) Test_IsCacheHandleValid_False() {
 }
 
 func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnCrcForValidFile() {
-	crc, err := CalculateFileCRC32("testdata/validfile.txt")
+	crc, err := CalculateFileCRC32(context.Background(), "testdata/validfile.txt")
 
 	ExpectEq(nil, err)
 	ExpectEq(515179668, crc)
 }
 
 func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnZeroForEmptyFile() {
-	crc, err := CalculateFileCRC32("testdata/emptyfile.txt")
+	crc, err := CalculateFileCRC32(context.Background(), "testdata/emptyfile.txt")
 
 	ExpectEq(nil, err)
 	ExpectEq(0, crc)
 }
 
 func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorForFileNotExist() {
-	crc, err := CalculateFileCRC32("testdata/nofile.txt")
+	crc, err := CalculateFileCRC32(context.Background(), "testdata/nofile.txt")
 
 	ExpectTrue(strings.Contains(err.Error(), "no such file or directory"))
 	ExpectEq(0, crc)
 }
 
+func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorWhenContextIsCancelled() {
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	cancelFunc()
+	crc, err := CalculateFileCRC32(ctx, "testdata/validfile.txt")
+
+	ExpectTrue(errors.Is(err, context.Canceled))
+	ExpectTrue(strings.Contains(err.Error(), "CRC computation is cancelled"))
+	ExpectEq(0, crc)
+}
+
 func Test_CreateCacheDirectoryIfNotPresentAt_ShouldNotReturnAnyErrorWhenDirectoryExists(t *testing.T) {
 	base := path.Join("./", string(testutil.GenerateRandomBytes(4)))
 	dirPath := path.Join(base, "/", "path/cachedir")