Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making compute crc method context cancellable #2013

Merged
merged 7 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cache/file/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (dt *downloaderTest) waitForCrcCheckToBeCompleted() {
// Hence, explicitly waiting till the CRC check is done.
for {
dt.job.mu.Lock()
if dt.job.status.Name == Completed || dt.job.status.Name == Failed {
if dt.job.status.Name == Completed || dt.job.status.Name == Failed || dt.job.status.Name == Invalid {
dt.job.mu.Unlock()
break
}
Expand Down
51 changes: 28 additions & 23 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,6 @@ func (job *Job) downloadObjectAsync() {
}
}()

notifyInvalid := func() {
job.mu.Lock()
job.status.Name = Invalid
job.notifySubscribers()
job.mu.Unlock()
}

var newReader io.ReadCloser
var start, end, sequentialReadSize, newReaderLimit int64
end = int64(job.object.Size)
Expand All @@ -337,14 +330,8 @@ func (job *Job) downloadObjectAsync() {
ReadCompressed: job.object.HasContentEncodingGzip(),
})
if err != nil {
// Context is canceled when job.cancel is called at the time of
// invalidation and hence caller should be notified as invalid.
if errors.Is(err, context.Canceled) {
notifyInvalid()
return
}
err = fmt.Errorf("downloadObjectAsync: error in creating NewReader with start %d and limit %d: %w", start, newReaderLimit, err)
job.failWhileDownloading(err)
job.handleError(err)
return
}
monitor.CaptureGCSReadMetrics(job.cancelCtx, util.Sequential, newReaderLimit-start)
Expand All @@ -361,14 +348,8 @@ func (job *Job) downloadObjectAsync() {
// Copy the contents from NewReader to cache file.
_, readErr := io.CopyN(cacheFile, newReader, maxRead)
if readErr != nil {
// Context is canceled when job.cancel is called at the time of
// invalidation and hence caller should be notified as invalid.
if errors.Is(readErr, context.Canceled) {
notifyInvalid()
return
}
err = fmt.Errorf("downloadObjectAsync: error at the time of copying content to cache file %w", readErr)
job.failWhileDownloading(err)
job.handleError(err)
return
}
start += maxRead
Expand Down Expand Up @@ -406,7 +387,7 @@ func (job *Job) downloadObjectAsync() {
} else {
err = job.validateCRC()
if err != nil {
job.failWhileDownloading(err)
job.handleError(err)
return
}

Expand Down Expand Up @@ -483,7 +464,7 @@ func (job *Job) validateCRC() (err error) {
return
}

crc32Val, err := cacheutil.CalculateFileCRC32(job.fileSpec.Path)
crc32Val, err := cacheutil.CalculateFileCRC32(job.cancelCtx, job.fileSpec.Path)
if err != nil {
return
vadlakondaswetha marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down Expand Up @@ -515,3 +496,27 @@ func (job *Job) validateCRC() (err error) {

return
}

// handleError moves the job into the terminal state that matches err and
// notifies subscribers through the helper it delegates to.
//
// An error that wraps context.Canceled marks the job as invalid; any other
// error marks the job as failed.
func (job *Job) handleError(err error) {
	if !errors.Is(err, context.Canceled) {
		job.failWhileDownloading(err)
		return
	}

	// Context is canceled when job.cancel is called at the time of
	// invalidation and hence caller should be notified as invalid.
	job.notifyInvalid()
}

// notifyInvalid sets the job status to Invalid and notifies the subscribers
// while holding the job mutex.
//
// Acquires and releases LOCK(job.mu)
func (job *Job) notifyInvalid() {
	job.mu.Lock()
	defer job.mu.Unlock()

	job.status.Name = Invalid
	job.notifySubscribers()
}
53 changes: 53 additions & 0 deletions internal/cache/file/downloader/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsTr
err = os.WriteFile(dt.fileSpec.Path, []byte("test"), 0644)
AssertEq(nil, err)

dt.job.cancelCtx, dt.job.cancelFunc = context.WithCancel(context.Background())
err = dt.job.validateCRC()

AssertNe(nil, err)
Expand Down Expand Up @@ -840,6 +841,7 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsFa
AssertEq(nil, err)
dt.job.fileCacheConfig.EnableCrcCheck = false

dt.job.cancelCtx, dt.job.cancelFunc = context.WithCancel(context.Background())
err = dt.job.validateCRC()

AssertEq(nil, err)
Expand All @@ -848,3 +850,54 @@ func (dt *downloaderTest) Test_validateCRC_ForTamperedFileWhenEnableCrcCheckIsFa
// Verify fileInfoCache update
dt.verifyFileInfoEntry(uint64(jobStatus.Offset))
}

func (dt *downloaderTest) Test_validateCRC_WheContextIsCancelled() {
objectName := "path/in/gcs/file2.txt"
objectSize := 10 * util.MiB
sethiay marked this conversation as resolved.
Show resolved Hide resolved
objectContent := testutil.GenerateRandomBytes(objectSize)
dt.initJobTest(objectName, objectContent, DefaultSequentialReadSizeMb, uint64(2*objectSize), func() {})
// Start download
offset := int64(10 * util.MiB)
_, err := dt.job.Download(context.Background(), offset, true)
AssertEq(nil, err)
AssertEq(Downloading, dt.job.status.Name)
AssertEq(nil, dt.job.status.Err)
AssertGe(dt.job.status.Offset, offset)

dt.job.cancelFunc()
dt.waitForCrcCheckToBeCompleted()

AssertEq(Invalid, dt.job.status.Name)
AssertEq(nil, dt.job.status.Err)
}

// Test_handleError_SetStatusAsInvalidWhenContextIsCancelled checks that
// handleError, given an error chain containing context.Canceled, removes the
// subscriber and delivers an Invalid status with no error attached.
func (dt *downloaderTest) Test_handleError_SetStatusAsInvalidWhenContextIsCancelled() {
	const subscriberOffset int64 = 1
	notificationC := dt.job.subscribe(subscriberOffset)
	// Join and then wrap the sentinel so it is only reachable through the chain.
	wrappedErr := fmt.Errorf("Wrapping with custom message %w", errors.Join(context.Canceled))

	dt.job.handleError(wrappedErr)

	AssertEq(0, dt.job.subscribers.Len())
	got, ok := <-notificationC
	want := JobStatus{Name: Invalid, Err: nil, Offset: 0}
	AssertTrue(reflect.DeepEqual(want, got))
	AssertEq(true, ok)
}

// Test_handleError_SetStatusAsErrorWhenContextIsNotCancelled checks that
// handleError, given an error unrelated to context cancellation, removes the
// subscriber and delivers a Failed status carrying that exact error.
func (dt *downloaderTest) Test_handleError_SetStatusAsErrorWhenContextIsNotCancelled() {
	subscriberOffset := int64(1)
	notificationC := dt.job.subscribe(subscriberOffset)
	err := errors.New("custom error")

	updatedErr := fmt.Errorf("Custom message %w", err)
	dt.job.handleError(updatedErr)

	AssertEq(0, dt.job.subscribers.Len())
	notification, ok := <-notificationC
	jobStatus := JobStatus{Name: Failed, Err: updatedErr, Offset: 0}
	AssertTrue(reflect.DeepEqual(jobStatus, notification))
	AssertEq(true, ok)
}
24 changes: 15 additions & 9 deletions internal/cache/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package util

import (
"context"
"fmt"
"hash/crc32"
"io"
Expand Down Expand Up @@ -132,30 +133,35 @@ func CreateCacheDirectoryIfNotPresentAt(dirPath string, dirPerm os.FileMode) err
return nil
}

func calculateCRC32(reader io.Reader) (uint32, error) {
func calculateCRC32(ctx context.Context, reader io.Reader) (uint32, error) {
table := crc32.MakeTable(crc32.Castagnoli)
checksum := crc32.Checksum([]byte(""), table)
buf := make([]byte, BufferSizeForCRC)
for {
switch n, err := reader.Read(buf); err {
case nil:
checksum = crc32.Update(checksum, table, buf[:n])
case io.EOF:
return checksum, nil
select {
case <-ctx.Done():
return 0, fmt.Errorf("CRC computation is cancelled: %w", ctx.Err())
default:
return 0, err
switch n, err := reader.Read(buf); err {
case nil:
checksum = crc32.Update(checksum, table, buf[:n])
case io.EOF:
return checksum, nil
default:
return 0, err
}
}
}
}

// CalculateFileCRC32 calculates and returns the CRC-32 checksum of a file.
func CalculateFileCRC32(filePath string) (uint32, error) {
func CalculateFileCRC32(ctx context.Context, filePath string) (uint32, error) {
// Open file with simplified flags and permissions
file, err := os.Open(filePath)
if err != nil {
return 0, fmt.Errorf("error opening file: %w", err)
}
defer file.Close() // Ensure file closure

return calculateCRC32(file)
return calculateCRC32(ctx, file)
}
17 changes: 14 additions & 3 deletions internal/cache/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
testutil "github.com/googlecloudplatform/gcsfuse/v2/internal/util"
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations"
. "github.com/jacobsa/ogletest"
"golang.org/x/net/context"
)

func TestUtil(t *testing.T) { RunTests(t) }
Expand Down Expand Up @@ -247,26 +248,36 @@ func (ut *utilTest) Test_IsCacheHandleValid_False() {
}

// Test_CalculateFileCRC32_ShouldReturnCrcForValidFile checks that the
// checksum of the testdata/validfile.txt fixture matches the expected value
// when the context is not cancelled.
func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnCrcForValidFile() {
	crc, err := CalculateFileCRC32(context.Background(), "testdata/validfile.txt")

	ExpectEq(nil, err)
	ExpectEq(515179668, crc)
}

// Test_CalculateFileCRC32_ShouldReturnZeroForEmptyFile checks that the
// testdata/emptyfile.txt fixture yields a checksum of 0 and no error.
func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnZeroForEmptyFile() {
	crc, err := CalculateFileCRC32(context.Background(), "testdata/emptyfile.txt")

	ExpectEq(nil, err)
	ExpectEq(0, crc)
}

// Test_CalculateFileCRC32_ShouldReturnErrorForFileNotExist checks that a
// missing file produces a "no such file or directory" error and a checksum
// of 0.
func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorForFileNotExist() {
	crc, err := CalculateFileCRC32(context.Background(), "testdata/nofile.txt")

	// Stop here on a nil error so err.Error() below cannot panic.
	AssertNe(nil, err)
	ExpectTrue(strings.Contains(err.Error(), "no such file or directory"))
	ExpectEq(0, crc)
}

// Test_CalculateFileCRC32_ShouldReturnErrorWhenContextIsCancelled checks that
// an already-cancelled context yields an error wrapping context.Canceled, the
// "CRC computation is cancelled" message, and a checksum of 0.
func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorWhenContextIsCancelled() {
	// Cancel before the call so the context is already done when it starts.
	ctx, cancelFunc := context.WithCancel(context.Background())
	cancelFunc()

	crc, err := CalculateFileCRC32(ctx, "testdata/validfile.txt")

	// Stop here on a nil error so err.Error() below cannot panic.
	AssertNe(nil, err)
	ExpectTrue(errors.Is(err, context.Canceled))
	ExpectTrue(strings.Contains(err.Error(), "CRC computation is cancelled"))
	ExpectEq(0, crc)
}

func Test_CreateCacheDirectoryIfNotPresentAt_ShouldNotReturnAnyErrorWhenDirectoryExists(t *testing.T) {
base := path.Join("./", string(testutil.GenerateRandomBytes(4)))
dirPath := path.Join(base, "/", "path/cachedir")
Expand Down
Loading