Skip to content

Commit

Permalink
update reader length after write() (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored Jan 9, 2021
1 parent d23762a commit 7e13080
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 136 deletions.
12 changes: 6 additions & 6 deletions pkg/fuse/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ func NewJFS() *JFS {
}
}

func (fs *JFS) replyEntry(out *fuse.EntryOut, entry *meta.Entry) fuse.Status {
out.NodeId = uint64(entry.Inode)
func (fs *JFS) replyEntry(out *fuse.EntryOut, e *meta.Entry) fuse.Status {
out.NodeId = uint64(e.Inode)
out.Generation = 1
out.SetAttrTimeout(fs.attrTimeout)
if entry.Attr.Typ == meta.TypeDirectory {
if e.Attr.Typ == meta.TypeDirectory {
out.SetEntryTimeout(fs.direntryTimeout)
} else {
out.SetEntryTimeout(fs.entryTimeout)
}
if vfs.IsSpecialNode(entry.Inode) {
if vfs.IsSpecialNode(e.Inode) {
out.SetAttrTimeout(time.Hour)
}
attrToStat(entry.Inode, entry.Attr, &out.Attr)
attrToStat(e.Inode, e.Attr, &out.Attr)
return 0
}

Expand Down Expand Up @@ -368,7 +368,7 @@ func (fs *JFS) ReadDirPlus(cancel <-chan struct{}, in *fuse.ReadIn, out *fuse.Di
break
}
if e.Attr.Full {
vfs.UpdateEntry(e)
vfs.UpdateLength(e.Inode, e.Attr)
fs.replyEntry(eo, e)
} else {
eo.Ino = uint64(e.Inode)
Expand Down
35 changes: 10 additions & 25 deletions pkg/vfs/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type handle struct {
children []*meta.Entry

// for file
length uint64
mode uint8
locks uint8
flockOwner uint64 // kernel 3.1- does not pass lock_owner in release()
reader FileReader
Expand All @@ -51,25 +49,29 @@ type handle struct {
}

func (h *handle) addOp(ctx Context) {
h.Lock()
defer h.Unlock()
h.ops = append(h.ops, ctx)
}

func (h *handle) removeOp(ctx Context) {
h.Lock()
defer h.Unlock()
for i, c := range h.ops {
if c == ctx {
h.ops[i] = h.ops[len(h.ops)-1]
h.ops = h.ops[:len(h.ops)-1]
break
}
}
h.Unlock()
}

func (h *handle) cancelOp(pid uint32) {
if pid == 0 {
return
}
h.Lock()
defer h.Unlock()
for _, c := range h.ops {
if c.Pid() == pid || c.Pid() > 0 && c.Duration() > time.Second {
c.Cancel()
Expand All @@ -86,11 +88,8 @@ func (h *handle) Rlock(ctx Context) bool {
}
}
h.readers++
if h.reader == nil {
h.reader = reader.Open(h.inode, h.length)
}
h.addOp(ctx)
h.Unlock()
h.addOp(ctx)
return true
}

Expand All @@ -115,11 +114,8 @@ func (h *handle) Wlock(ctx Context) bool {
}
h.writers--
h.writing = 1
if h.writer == nil {
h.writer = writer.Open(h.inode, h.length)
}
h.addOp(ctx)
h.Unlock()
h.addOp(ctx)
return true
}

Expand Down Expand Up @@ -186,25 +182,14 @@ func releaseHandle(inode Ino, fh uint64) {
}
}

func updateHandleLength(inode Ino, length uint64) {
hanleLock.Lock()
for _, h := range handles[inode] {
h.Lock()
h.length = length
h.Unlock()
}
hanleLock.Unlock()
}

func newFileHandle(mode uint8, inode Ino, length uint64) uint64 {
h := newHandle(inode)
h.Lock()
defer h.Unlock()
h.length = length
h.mode = mode
if mode == modeRead {
if mode&modeRead != 0 {
h.reader = reader.Open(inode, length)
} else if mode == modeWrite {
}
if mode&modeWrite != 0 {
h.writer = writer.Open(inode, length)
}
return h.fh
Expand Down
32 changes: 29 additions & 3 deletions pkg/vfs/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type FileReader interface {
}

type DataReader interface {
Open(inode Ino, fleng uint64) FileReader
Open(inode Ino, length uint64) FileReader
Truncate(inode Ino, length uint64)
}

type frange struct {
Expand Down Expand Up @@ -298,6 +299,7 @@ type fileReader struct {
r *dataReader
}

// protected by f
func (f *fileReader) newSlice(block *frange) *sliceReader {
s := &sliceReader{}
s.file = f
Expand Down Expand Up @@ -498,6 +500,7 @@ func (f *fileReader) splitRange(block *frange) []uint64 {
return ranges
}

// protected by f
func (f *fileReader) readAhead(block *frange) {
f.visit(func(r *sliceReader) {
if r.state.valid() && r.block.off <= block.off && r.block.end() > block.off {
Expand Down Expand Up @@ -639,6 +642,12 @@ func (f *fileReader) Read(ctx meta.Context, offset uint64, buf []byte) (int, sys
return f.waitForIO(ctx, reqs, buf)
}

func (f *fileReader) Truncate(length uint64) {
f.Lock()
f.length = length
f.Unlock()
}

func (f *fileReader) visit(fn func(s *sliceReader)) {
var next *sliceReader
for s := f.slices; s != nil; s = next {
Expand Down Expand Up @@ -690,11 +699,11 @@ func NewDataReader(conf *Config, m meta.Meta, store chunk.ChunkStore) DataReader
}
}

func (r *dataReader) Open(inode Ino, len uint64) FileReader {
func (r *dataReader) Open(inode Ino, length uint64) FileReader {
f := &fileReader{
r: r,
inode: inode,
length: len,
length: length,
}
f.last = &(f.slices)

Expand All @@ -706,6 +715,23 @@ func (r *dataReader) Open(inode Ino, len uint64) FileReader {
return f
}

func (r *dataReader) Truncate(inode Ino, length uint64) {
// r could be hold inside f, so Unlock r first to avoid deadlock
r.Lock()
var fs []*fileReader
f := r.files[inode]
for f != nil {
fs = append(fs, f)
f = f.next
}
r.Unlock()
for _, f := range fs {
f.Lock()
f.length = length
f.Unlock()
}
}

func (r *dataReader) readSlice(ctx context.Context, s *meta.Slice, page *chunk.Page, off int) error {
buf := page.Data
read := 0
Expand Down
Loading

0 comments on commit 7e13080

Please sign in to comment.