Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setting bwh to nil after all fileHandles are closed #2823

Merged
merged 10 commits into from
Dec 24, 2024
Merged
15 changes: 12 additions & 3 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1737,7 +1737,8 @@ func (fs *fileSystem) CreateFile(
handleID := fs.nextHandleID
fs.nextHandleID++

fs.handles[handleID] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle)
// Creating new file is always a write operation, hence passing readOnly as false.
fs.handles[handleID] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, false)
op.Handle = handleID

fs.mu.Unlock()
Expand Down Expand Up @@ -2372,16 +2373,24 @@ func (fs *fileSystem) OpenFile(
ctx context.Context,
op *fuseops.OpenFileOp) (err error) {
fs.mu.Lock()
defer fs.mu.Unlock()

// Find the inode.
in := fs.fileInodeOrDie(op.Inode)
// Follow lock ordering rules to get inode lock.
// Inode lock is required to register fileHandle with the inode.
fs.mu.Unlock()
in.Lock()
defer in.Unlock()

// Get the fs lock again.
fs.mu.Lock()
defer fs.mu.Unlock()

// Allocate a handle.
handleID := fs.nextHandleID
fs.nextHandleID++

fs.handles[handleID] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle)
fs.handles[handleID] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, op.OpenFlags.IsReadOnly())
op.Handle = handleID

// When we observe object generations that we didn't create, we assign them
Expand Down
11 changes: 10 additions & 1 deletion internal/fs/handle/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,23 @@ type FileHandle struct {
// will be downloaded for random reads as well too.
cacheFileForRangeRead bool
metricHandle common.MetricHandle
// For now, we will consider the files which are open in append mode also as write,
// as we are not doing anything special for append. When required we will
// define an enum instead of boolean to hold the type of open.
readOnly bool
}

func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle) (fh *FileHandle) {
// LOCKS_REQUIRED(fh.inode.mu)
func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle, readOnly bool) (fh *FileHandle) {
fh = &FileHandle{
inode: inode,
fileCacheHandler: fileCacheHandler,
cacheFileForRangeRead: cacheFileForRangeRead,
metricHandle: metricHandle,
readOnly: readOnly,
}

fh.inode.RegisterFileHandle(fh.readOnly)
fh.mu = syncutil.NewInvariantMutex(fh.checkInvariants)

return
Expand All @@ -65,6 +72,8 @@ func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler,
// Destroy any resources associated with the handle, which must not be used
// again.
func (fh *FileHandle) Destroy() {
// Deregister the fileHandle with the inode.
fh.inode.DeRegisterFileHandle(fh.readOnly)
if fh.reader != nil {
fh.reader.Destroy()
}
Expand Down
40 changes: 40 additions & 0 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache"
"github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors"
"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/jacobsa/fuse/fuseops"
Expand Down Expand Up @@ -95,6 +96,15 @@ type FileInode struct {

bwh *bufferedwrites.BufferedWriteHandler
config *cfg.Config

// Once write is started on the file i.e, bwh is initialized, any fileHandles
// opened in write mode before or after this and not yet closed are considered
// as writing to the file even though they are not writing.
// In case of successful flush, we will set bwh to nil. But in case of error,
// we will keep returning that error to all the fileHandles open during that time
// and set bwh to nil after all fileHandlers are closed.
// writeHandleCount tracks the count of open fileHandles in write mode.
writeHandleCount int32
}

var _ Inode = &FileInode{}
Expand Down Expand Up @@ -369,6 +379,31 @@ func (f *FileInode) DecrementLookupCount(n uint64) (destroy bool) {
return
}

// LOCKS_REQUIRED(f.mu)
func (f *FileInode) RegisterFileHandle(readOnly bool) {
if !readOnly {
f.writeHandleCount++
}
}

// LOCKS_REQUIRED(f.mu)
func (f *FileInode) DeRegisterFileHandle(readOnly bool) {
if readOnly {
return
}

if f.writeHandleCount <= 0 {
logger.Errorf("Mismatch in number of write file handles for inode :%d", f.id)
}

f.writeHandleCount--

// All write fileHandles associated with bwh are closed. So safe to set bwh to nil.
if f.writeHandleCount == 0 {
f.bwh = nil
}
}

// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Destroy() (err error) {
f.destroyed = true
Expand Down Expand Up @@ -802,6 +837,11 @@ func (f *FileInode) CreateBufferedOrTempWriter(ctx context.Context) (err error)
}

func (f *FileInode) ensureBufferedWriteHandler(ctx context.Context) error {
// bwh already initialized, do nothing.
if f.bwh != nil {
return nil
}

var err error
var latestGcsObj *gcs.Object
if !f.local {
Expand Down
97 changes: 92 additions & 5 deletions internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/jacobsa/syncutil"
"github.com/jacobsa/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -1311,7 +1312,7 @@ func (t *FileTest) TestMultipleWritesToLocalFileWhenStreamingWritesAreEnabled()
assert.WithinDuration(t.T(), attrs.Mtime, createTime, Delta)
}

func (t *FileTest) WriteToEmptyGCSFileWhenStreamingWritesAreEnabled() {
func (t *FileTest) TestWriteToEmptyGCSFileWhenStreamingWritesAreEnabled() {
t.createInodeWithEmptyObject()
t.in.config = &cfg.Config{Write: *getWriteConfig()}
createTime := t.in.mtimeClock.Now()
Expand All @@ -1326,11 +1327,11 @@ func (t *FileTest) WriteToEmptyGCSFileWhenStreamingWritesAreEnabled() {
// The inode should agree about the new mtime.
attrs, err := t.in.Attributes(t.ctx)
assert.Nil(t.T(), err)
assert.Equal(t.T(), int64(2), attrs.Size)
assert.Equal(t.T(), uint64(2), attrs.Size)
assert.WithinDuration(t.T(), attrs.Mtime, createTime, Delta)
}

func (t *FileTest) SetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() {
func (t *FileTest) TestSetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() {
t.createInodeWithEmptyObject()
t.in.config = &cfg.Config{Write: *getWriteConfig()}
assert.Nil(t.T(), t.in.bwh)
Expand All @@ -1342,7 +1343,7 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() {
assert.Nil(t.T(), t.in.bwh)
}

func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnabled() {
func (t *FileTest) TestSetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnabled() {
t.createInodeWithEmptyObject()
t.in.config = &cfg.Config{Write: *getWriteConfig()}
assert.Nil(t.T(), t.in.bwh)
Expand All @@ -1351,7 +1352,7 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnable
assert.Nil(t.T(), err)
assert.NotNil(t.T(), t.in.bwh)
writeFileInfo := t.in.bwh.WriteFileInfo()
assert.Equal(t.T(), 2, writeFileInfo.TotalSize)
assert.Equal(t.T(), int64(2), writeFileInfo.TotalSize)

// Set mtime.
mtime := time.Now().UTC().Add(123 * time.Second)
Expand All @@ -1366,6 +1367,92 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnable
assert.Equal(t.T(), attrs.Atime, mtime)
}

func (t *FileTest) TestRegisterFileHandle() {
tbl := []struct {
name string
readonly bool
currentVal int32
expectedVal int32
}{
{
name: "ReadOnlyHandle",
readonly: true,
currentVal: 0,
expectedVal: 0,
},
{
name: "ZeroCurrentValueForWriteHandle",
readonly: false,
currentVal: 0,
expectedVal: 1,
},
{
name: "NonZeroCurrentValueForWriteHandle",
readonly: false,
currentVal: 5,
expectedVal: 6,
},
}
for _, tc := range tbl {
t.Run(tc.name, func() {
t.in.writeHandleCount = tc.currentVal

t.in.RegisterFileHandle(tc.readonly)

assert.Equal(t.T(), tc.expectedVal, t.in.writeHandleCount)
})
}
}

func (t *FileTest) TestDeRegisterFileHandle() {
tbl := []struct {
name string
readonly bool
currentVal int32
expectedVal int32
isBwhNil bool
}{
{
name: "ReadOnlyHandle",
readonly: true,
currentVal: 10,
expectedVal: 10,
isBwhNil: false,
},
{
name: "NonZeroCurrentValueForWriteHandle",
readonly: false,
currentVal: 10,
expectedVal: 9,
isBwhNil: false,
},
{
name: "LastWriteHandleToDeregister",
readonly: false,
currentVal: 1,
expectedVal: 0,
isBwhNil: true,
},
}
for _, tc := range tbl {
t.Run(tc.name, func() {
t.in.config = &cfg.Config{Write: *getWriteConfig()}
t.in.writeHandleCount = tc.currentVal
err := t.in.ensureBufferedWriteHandler(t.ctx)
require.NoError(t.T(), err)

t.in.DeRegisterFileHandle(tc.readonly)

assert.Equal(t.T(), tc.expectedVal, t.in.writeHandleCount)
if tc.isBwhNil {
assert.Nil(t.T(), t.in.bwh)
} else {
assert.NotNil(t.T(), t.in.bwh)
}
})
}
}

func getWriteConfig() *cfg.WriteConfig {
return &cfg.WriteConfig{
MaxBlocksPerFile: 10,
Expand Down
Loading