Skip to content

Commit

Permalink
Making compute crc method context cancellable (#2013)
Browse files Browse the repository at this point in the history
* making computecrc method context cancellable

* Updated the error message

* fix build

* fixing tests

* ADded test case to verify if crc computation is cancelled

* code review comments

* fix lint
  • Loading branch information
vadlakondaswetha authored Jun 17, 2024
1 parent 21e4ae4 commit b6eca68
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 36 deletions.
2 changes: 1 addition & 1 deletion internal/cache/file/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (dt *downloaderTest) waitForCrcCheckToBeCompleted() {
// Hence, explicitly waiting till the CRC check is done.
for {
dt.job.mu.Lock()
if dt.job.status.Name == Completed || dt.job.status.Name == Failed {
if dt.job.status.Name == Completed || dt.job.status.Name == Failed || dt.job.status.Name == Invalid {
dt.job.mu.Unlock()
break
}
Expand Down
51 changes: 28 additions & 23 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,6 @@ func (job *Job) downloadObjectAsync() {
}
}()

notifyInvalid := func() {
job.mu.Lock()
job.status.Name = Invalid
job.notifySubscribers()
job.mu.Unlock()
}

var newReader io.ReadCloser
var start, end, sequentialReadSize, newReaderLimit int64
end = int64(job.object.Size)
Expand All @@ -337,14 +330,8 @@ func (job *Job) downloadObjectAsync() {
ReadCompressed: job.object.HasContentEncodingGzip(),
})
if err != nil {
// Context is canceled when job.cancel is called at the time of
// invalidation and hence caller should be notified as invalid.
if errors.Is(err, context.Canceled) {
notifyInvalid()
return
}
err = fmt.Errorf("downloadObjectAsync: error in creating NewReader with start %d and limit %d: %w", start, newReaderLimit, err)
job.failWhileDownloading(err)
job.handleError(err)
return
}
monitor.CaptureGCSReadMetrics(job.cancelCtx, util.Sequential, newReaderLimit-start)
Expand All @@ -361,14 +348,8 @@ func (job *Job) downloadObjectAsync() {
// Copy the contents from NewReader to cache file.
_, readErr := io.CopyN(cacheFile, newReader, maxRead)
if readErr != nil {
// Context is canceled when job.cancel is called at the time of
// invalidation and hence caller should be notified as invalid.
if errors.Is(readErr, context.Canceled) {
notifyInvalid()
return
}
err = fmt.Errorf("downloadObjectAsync: error at the time of copying content to cache file %w", readErr)
job.failWhileDownloading(err)
job.handleError(err)
return
}
start += maxRead
Expand Down Expand Up @@ -406,7 +387,7 @@ func (job *Job) downloadObjectAsync() {
} else {
err = job.validateCRC()
if err != nil {
job.failWhileDownloading(err)
job.handleError(err)
return
}

Expand Down Expand Up @@ -483,7 +464,7 @@ func (job *Job) validateCRC() (err error) {
return
}

crc32Val, err := cacheutil.CalculateFileCRC32(job.fileSpec.Path)
crc32Val, err := cacheutil.CalculateFileCRC32(job.cancelCtx, job.fileSpec.Path)
if err != nil {
return
}
Expand Down Expand Up @@ -515,3 +496,27 @@ func (job *Job) validateCRC() (err error) {

return
}

// Performs different actions based on the type of error.
// For context.Canceled it marks the job as invalid and notifies subscribers.
// For other errors, marks the job as failed and notifies subscribers.
func (job *Job) handleError(err error) {
// Context is canceled when job.cancel is called at the time of
// invalidation and hence caller should be notified as invalid.
if errors.Is(err, context.Canceled) {
job.notifyInvalid()
return
}

job.failWhileDownloading(err)
}

// Sets the status as invalid and notifies the subscribers.
//
// Acquires and releases LOCK(job.mu)
func (job *Job) notifyInvalid() {
job.mu.Lock()
job.status.Name = Invalid
job.notifySubscribers()
job.mu.Unlock()
}
53 changes: 53 additions & 0 deletions internal/cache/file/downloader/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsTr
err = os.WriteFile(dt.fileSpec.Path, []byte("test"), 0644)
AssertEq(nil, err)

dt.job.cancelCtx, dt.job.cancelFunc = context.WithCancel(context.Background())
err = dt.job.validateCRC()

AssertNe(nil, err)
Expand Down Expand Up @@ -840,6 +841,7 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsFa
AssertEq(nil, err)
dt.job.fileCacheConfig.EnableCrcCheck = false

dt.job.cancelCtx, dt.job.cancelFunc = context.WithCancel(context.Background())
err = dt.job.validateCRC()

AssertEq(nil, err)
Expand All @@ -848,3 +850,54 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsFa
// Verify fileInfoCache update
dt.verifyFileInfoEntry(uint64(jobStatus.Offset))
}

func (dt *downloaderTest) Test_validateCRC_WheContextIsCancelled() {
objectName := "path/in/gcs/file2.txt"
objectSize := 10 * util.MiB
objectContent := testutil.GenerateRandomBytes(objectSize)
dt.initJobTest(objectName, objectContent, DefaultSequentialReadSizeMb, uint64(2*objectSize), func() {})
// Start download
offset := int64(10 * util.MiB)
_, err := dt.job.Download(context.Background(), offset, true)
AssertEq(nil, err)
AssertEq(Downloading, dt.job.status.Name)
AssertEq(nil, dt.job.status.Err)
AssertGe(dt.job.status.Offset, offset)

dt.job.cancelFunc()
dt.waitForCrcCheckToBeCompleted()

AssertEq(Invalid, dt.job.status.Name)
AssertEq(nil, dt.job.status.Err)
}

func (dt *downloaderTest) Test_handleError_SetStatusAsInvalidWhenContextIsCancelled() {
subscriberOffset := int64(1)
notificationC := dt.job.subscribe(subscriberOffset)
err := errors.Join(context.Canceled)

err = fmt.Errorf("Wrapping with custom message %w", err)
dt.job.handleError(err)

AssertEq(0, dt.job.subscribers.Len())
notification, ok := <-notificationC
jobStatus := JobStatus{Name: Invalid, Err: nil, Offset: 0}
AssertTrue(reflect.DeepEqual(jobStatus, notification))
AssertEq(true, ok)
}

func (dt *downloaderTest) Test_handleError_SetStatusAsErrorWhenContextIsNotCancelled() {
subscriberOffset := int64(1)
notificationC := dt.job.subscribe(subscriberOffset)
err := errors.New("custom error")

updatedErr := fmt.Errorf("Custom message %w", err)
dt.job.handleError(updatedErr)

AssertEq(0, dt.job.subscribers.Len())
notification, ok := <-notificationC
jobStatus := JobStatus{Name: Failed, Err: updatedErr, Offset: 0}
fmt.Println(notification)
AssertTrue(reflect.DeepEqual(jobStatus, notification))
AssertEq(true, ok)
}
24 changes: 15 additions & 9 deletions internal/cache/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package util

import (
"context"
"fmt"
"hash/crc32"
"io"
Expand Down Expand Up @@ -132,30 +133,35 @@ func CreateCacheDirectoryIfNotPresentAt(dirPath string, dirPerm os.FileMode) err
return nil
}

func calculateCRC32(reader io.Reader) (uint32, error) {
func calculateCRC32(ctx context.Context, reader io.Reader) (uint32, error) {
table := crc32.MakeTable(crc32.Castagnoli)
checksum := crc32.Checksum([]byte(""), table)
buf := make([]byte, BufferSizeForCRC)
for {
switch n, err := reader.Read(buf); err {
case nil:
checksum = crc32.Update(checksum, table, buf[:n])
case io.EOF:
return checksum, nil
select {
case <-ctx.Done():
return 0, fmt.Errorf("CRC computation is cancelled: %w", ctx.Err())
default:
return 0, err
switch n, err := reader.Read(buf); err {
case nil:
checksum = crc32.Update(checksum, table, buf[:n])
case io.EOF:
return checksum, nil
default:
return 0, err
}
}
}
}

// CalculateFileCRC32 calculates and returns the CRC-32 checksum of a file.
func CalculateFileCRC32(filePath string) (uint32, error) {
func CalculateFileCRC32(ctx context.Context, filePath string) (uint32, error) {
// Open file with simplified flags and permissions
file, err := os.Open(filePath)
if err != nil {
return 0, fmt.Errorf("error opening file: %w", err)
}
defer file.Close() // Ensure file closure

return calculateCRC32(file)
return calculateCRC32(ctx, file)
}
17 changes: 14 additions & 3 deletions internal/cache/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
testutil "github.com/googlecloudplatform/gcsfuse/v2/internal/util"
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations"
. "github.com/jacobsa/ogletest"
"golang.org/x/net/context"
)

func TestUtil(t *testing.T) { RunTests(t) }
Expand Down Expand Up @@ -247,26 +248,36 @@ func (ut *utilTest) Test_IsCacheHandleValid_False() {
}

func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnCrcForValidFile() {
crc, err := CalculateFileCRC32("testdata/validfile.txt")
crc, err := CalculateFileCRC32(context.Background(), "testdata/validfile.txt")

ExpectEq(nil, err)
ExpectEq(515179668, crc)
}

func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnZeroForEmptyFile() {
crc, err := CalculateFileCRC32("testdata/emptyfile.txt")
crc, err := CalculateFileCRC32(context.Background(), "testdata/emptyfile.txt")

ExpectEq(nil, err)
ExpectEq(0, crc)
}

func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorForFileNotExist() {
crc, err := CalculateFileCRC32("testdata/nofile.txt")
crc, err := CalculateFileCRC32(context.Background(), "testdata/nofile.txt")

ExpectTrue(strings.Contains(err.Error(), "no such file or directory"))
ExpectEq(0, crc)
}

func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorWhenContextIsCancelled() {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()
crc, err := CalculateFileCRC32(ctx, "testdata/validfile.txt")

ExpectTrue(errors.Is(err, context.Canceled))
ExpectTrue(strings.Contains(err.Error(), "CRC computation is cancelled"))
ExpectEq(0, crc)
}

func Test_CreateCacheDirectoryIfNotPresentAt_ShouldNotReturnAnyErrorWhenDirectoryExists(t *testing.T) {
base := path.Join("./", string(testutil.GenerateRandomBytes(4)))
dirPath := path.Join(base, "/", "path/cachedir")
Expand Down

0 comments on commit b6eca68

Please sign in to comment.