diff --git a/fuse/opcode.go b/fuse/opcode.go index 39d56248..6bf0c53b 100644 --- a/fuse/opcode.go +++ b/fuse/opcode.go @@ -84,7 +84,7 @@ const ( //////////////////////////////////////////////////////////////// -func doInit(server *Server, req *request) { +func doInit(server *protocolServer, req *request) { input := (*InitIn)(req.inData()) if input.Major != _FUSE_KERNEL_VERSION { log.Printf("Major versions does not match. Given %d, want %d\n", input.Major, _FUSE_KERNEL_VERSION) @@ -98,7 +98,6 @@ func doInit(server *Server, req *request) { } kernelFlags := input.Flags64() - server.kernelSettings = *input kernelFlags &= (CAP_ASYNC_READ | CAP_BIG_WRITES | CAP_FILE_OPS | CAP_READDIRPLUS | CAP_NO_OPEN_SUPPORT | CAP_PARALLEL_DIROPS | CAP_MAX_PAGES | CAP_RENAME_SWAP | CAP_PASSTHROUGH) @@ -160,7 +159,7 @@ func doInit(server *Server, req *request) { req.status = OK } -func doOpen(server *Server, req *request) { +func doOpen(server *protocolServer, req *request) { out := (*OpenOut)(req.outData()) status := server.fileSystem.Open(req.cancel, (*OpenIn)(req.inData()), out) req.status = status @@ -169,13 +168,13 @@ func doOpen(server *Server, req *request) { } } -func doCreate(server *Server, req *request) { +func doCreate(server *protocolServer, req *request) { out := (*CreateOut)(req.outData()) status := server.fileSystem.Create(req.cancel, (*CreateIn)(req.inData()), req.filename(), out) req.status = status } -func doReadDir(server *Server, req *request) { +func doReadDir(server *protocolServer, req *request) { in := (*ReadIn)(req.inData()) out := NewDirEntryList(req.outPayload, uint64(in.Offset)) code := server.fileSystem.ReadDir(req.cancel, in, out) @@ -183,7 +182,7 @@ func doReadDir(server *Server, req *request) { req.status = code } -func doReadDirPlus(server *Server, req *request) { +func doReadDirPlus(server *protocolServer, req *request) { in := (*ReadIn)(req.inData()) out := NewDirEntryList(req.outPayload, uint64(in.Offset)) @@ -192,25 +191,25 @@ func doReadDirPlus(server *Server, req *request) { req.status = code } -func doOpenDir(server *Server, req *request) { +func doOpenDir(server *protocolServer, req *request) { out := (*OpenOut)(req.outData()) status := server.fileSystem.OpenDir(req.cancel, (*OpenIn)(req.inData()), out) req.status = status } -func doSetattr(server *Server, req *request) { +func doSetattr(server *protocolServer, req *request) { out := (*AttrOut)(req.outData()) req.status = server.fileSystem.SetAttr(req.cancel, (*SetAttrIn)(req.inData()), out) } -func doWrite(server *Server, req *request) { +func doWrite(server *protocolServer, req *request) { n, status := server.fileSystem.Write(req.cancel, (*WriteIn)(req.inData()), req.inPayload) o := (*WriteOut)(req.outData()) o.Size = n req.status = status } -func doNotifyReply(server *Server, req *request) { +func doNotifyReply(server *protocolServer, req *request) { reply := (*NotifyRetrieveIn)(req.inData()) server.retrieveMu.Lock() reading := server.retrieveTab[reply.Unique] @@ -252,7 +251,7 @@ const _SECURITY_CAPABILITY = "security.capability" const _SECURITY_ACL = "system.posix_acl_access" const _SECURITY_ACL_DEFAULT = "system.posix_acl_default" -func doGetXAttr(server *Server, req *request) { +func doGetXAttr(server *protocolServer, req *request) { if server.opts.DisableXAttrs { req.status = ENOSYS return @@ -296,21 +295,21 @@ func doGetXAttr(server *Server, req *request) { } } -func doGetAttr(server *Server, req *request) { +func doGetAttr(server *protocolServer, req *request) { out := (*AttrOut)(req.outData()) s := server.fileSystem.GetAttr(req.cancel, (*GetAttrIn)(req.inData()), out) req.status = s } // doForget - forget one NodeId -func doForget(server *Server, req *request) { +func doForget(server *protocolServer, req *request) { if !server.opts.RememberInodes { server.fileSystem.Forget(req.inHeader().NodeId, (*ForgetIn)(req.inData()).Nlookup) } } // doBatchForget - forget a list of NodeIds -func doBatchForget(server *Server, req *request) { +func doBatchForget(server *protocolServer, req *request) { in := (*_BatchForgetIn)(req.inData()) wantBytes := uintptr(in.Count) * unsafe.Sizeof(_ForgetOne{}) if uintptr(len(req.inPayload)) < wantBytes { @@ -332,40 +331,40 @@ func doBatchForget(server *Server, req *request) { } } -func doReadlink(server *Server, req *request) { +func doReadlink(server *protocolServer, req *request) { req.outPayload, req.status = server.fileSystem.Readlink(req.cancel, req.inHeader()) } -func doLookup(server *Server, req *request) { +func doLookup(server *protocolServer, req *request) { out := (*EntryOut)(req.outData()) req.status = server.fileSystem.Lookup(req.cancel, req.inHeader(), req.filename(), out) } -func doMknod(server *Server, req *request) { +func doMknod(server *protocolServer, req *request) { out := (*EntryOut)(req.outData()) req.status = server.fileSystem.Mknod(req.cancel, (*MknodIn)(req.inData()), req.filename(), out) } -func doMkdir(server *Server, req *request) { +func doMkdir(server *protocolServer, req *request) { out := (*EntryOut)(req.outData()) req.status = server.fileSystem.Mkdir(req.cancel, (*MkdirIn)(req.inData()), req.filename(), out) } -func doUnlink(server *Server, req *request) { +func doUnlink(server *protocolServer, req *request) { req.status = server.fileSystem.Unlink(req.cancel, req.inHeader(), req.filename()) } -func doRmdir(server *Server, req *request) { +func doRmdir(server *protocolServer, req *request) { req.status = server.fileSystem.Rmdir(req.cancel, req.inHeader(), req.filename()) } -func doLink(server *Server, req *request) { +func doLink(server *protocolServer, req *request) { out := (*EntryOut)(req.outData()) req.status = server.fileSystem.Link(req.cancel, (*LinkIn)(req.inData()), req.filename(), out) } -func doRead(server *Server, req *request) { +func doRead(server *protocolServer, req *request) { in := (*ReadIn)(req.inData()) req.readResult, req.status = server.fileSystem.Read(req.cancel, in, req.outPayload) if fd, ok := req.readResult.(*readResultFd); ok { @@ -375,47 +374,47 @@ func doRead(server *Server, req *request) { } } -func doFlush(server *Server, req *request) { +func doFlush(server *protocolServer, req *request) { req.status = server.fileSystem.Flush(req.cancel, (*FlushIn)(req.inData())) } -func doRelease(server *Server, req *request) { +func doRelease(server *protocolServer, req *request) { server.fileSystem.Release(req.cancel, (*ReleaseIn)(req.inData())) } -func doFsync(server *Server, req *request) { +func doFsync(server *protocolServer, req *request) { req.status = server.fileSystem.Fsync(req.cancel, (*FsyncIn)(req.inData())) } -func doReleaseDir(server *Server, req *request) { +func doReleaseDir(server *protocolServer, req *request) { server.fileSystem.ReleaseDir((*ReleaseIn)(req.inData())) } -func doFsyncDir(server *Server, req *request) { +func doFsyncDir(server *protocolServer, req *request) { req.status = server.fileSystem.FsyncDir(req.cancel, (*FsyncIn)(req.inData())) } -func doSetXAttr(server *Server, req *request) { +func doSetXAttr(server *protocolServer, req *request) { i := bytes.IndexByte(req.inPayload, 0) req.status = server.fileSystem.SetXAttr(req.cancel, (*SetXAttrIn)(req.inData()), string(req.inPayload[:i]), req.inPayload[i+1:]) } -func doRemoveXAttr(server *Server, req *request) { +func doRemoveXAttr(server *protocolServer, req *request) { req.status = server.fileSystem.RemoveXAttr(req.cancel, req.inHeader(), req.filename()) } -func doAccess(server *Server, req *request) { +func doAccess(server *protocolServer, req *request) { req.status = server.fileSystem.Access(req.cancel, (*AccessIn)(req.inData())) } -func doSymlink(server *Server, req *request) { +func doSymlink(server *protocolServer, req *request) { out := (*EntryOut)(req.outData()) n1, n2 := req.filenames() req.status = server.fileSystem.Symlink(req.cancel, req.inHeader(), n2, n1, out) } -func doRename(server *Server, req *request) { +func doRename(server *protocolServer, req *request) { if server.kernelSettings.supportsRenameSwap() { doRename2(server, req) return @@ -429,12 +428,12 @@ func doRename(server *Server, req *request) { req.status = server.fileSystem.Rename(req.cancel, &in, n1, n2) } -func doRename2(server *Server, req *request) { +func doRename2(server *protocolServer, req *request) { n1, n2 := req.filenames() req.status = server.fileSystem.Rename(req.cancel, (*RenameIn)(req.inData()), n1, n2) } -func doStatFs(server *Server, req *request) { +func doStatFs(server *protocolServer, req *request) { out := (*StatfsOut)(req.outData()) req.status = server.fileSystem.StatFs(req.cancel, req.inHeader(), out) if req.status == ENOSYS && runtime.GOOS == "darwin" { @@ -445,51 +444,51 @@ func doStatFs(server *Server, req *request) { } } -func doIoctl(server *Server, req *request) { +func doIoctl(server *protocolServer, req *request) { req.status = Status(syscall.ENOTTY) } -func doDestroy(server *Server, req *request) { +func doDestroy(server *protocolServer, req *request) { req.status = OK } -func doFallocate(server *Server, req *request) { +func doFallocate(server *protocolServer, req *request) { req.status = server.fileSystem.Fallocate(req.cancel, (*FallocateIn)(req.inData())) } -func doGetLk(server *Server, req *request) { +func doGetLk(server *protocolServer, req *request) { req.status = server.fileSystem.GetLk(req.cancel, (*LkIn)(req.inData()), (*LkOut)(req.outData())) } -func doSetLk(server *Server, req *request) { +func doSetLk(server *protocolServer, req *request) { req.status = server.fileSystem.SetLk(req.cancel, (*LkIn)(req.inData())) } -func doSetLkw(server *Server, req *request) { +func doSetLkw(server *protocolServer, req *request) { req.status = server.fileSystem.SetLkw(req.cancel, (*LkIn)(req.inData())) } -func doLseek(server *Server, req *request) { +func doLseek(server *protocolServer, req *request) { in := (*LseekIn)(req.inData()) out := (*LseekOut)(req.outData()) req.status = server.fileSystem.Lseek(req.cancel, in, out) } -func doCopyFileRange(server *Server, req *request) { +func doCopyFileRange(server *protocolServer, req *request) { in := (*CopyFileRangeIn)(req.inData()) out := (*WriteOut)(req.outData()) out.Size, req.status = server.fileSystem.CopyFileRange(req.cancel, in) } -func doInterrupt(server *Server, req *request) { +func doInterrupt(server *protocolServer, req *request) { input := (*InterruptIn)(req.inData()) req.status = server.interruptRequest(input.Unique) } //////////////////////////////////////////////////////////////// -type operationFunc func(*Server, *request) +type operationFunc func(*protocolServer, *request) type castPointerFunc func(unsafe.Pointer) interface{} type operationHandler struct { diff --git a/fuse/poll.go b/fuse/poll.go index 7627bc9e..937d1abb 100644 --- a/fuse/poll.go +++ b/fuse/poll.go @@ -8,7 +8,7 @@ package fuse const pollHackName = ".go-fuse-epoll-hack" const pollHackInode = ^uint64(0) -func doPollHackLookup(ms *Server, req *request) { +func doPollHackLookup(ms *protocolServer, req *request) { attr := Attr{ Ino: pollHackInode, Mode: S_IFREG | 0644, diff --git a/fuse/request.go b/fuse/request.go index 0b5ac440..1bcdd391 100644 --- a/fuse/request.go +++ b/fuse/request.go @@ -189,6 +189,7 @@ func (r *request) inData() unsafe.Pointer { return unsafe.Pointer(&r.inputBuf[0]) } +// note: outSize is without OutHeader func parseRequest(in []byte, kernelSettings *InitIn) (h *operationHandler, inSize, outSize, outPayloadSize int, errno Status) { inSize = int(unsafe.Sizeof(InHeader{})) if len(in) < inSize { @@ -206,7 +207,7 @@ func parseRequest(in []byte, kernelSettings *InitIn) (h *operationHandler, inSiz if h.InputSize > 0 { inSize = int(h.InputSize) } - if hdr.Opcode == _OP_RENAME && kernelSettings.supportsRenameSwap() { + if kernelSettings != nil && hdr.Opcode == _OP_RENAME && kernelSettings.supportsRenameSwap() { inSize = int(unsafe.Sizeof(RenameIn{})) } if hdr.Opcode == _OP_INIT && inSize > len(in) { diff --git a/fuse/server.go b/fuse/server.go index 4e46c6c2..4b53498e 100644 --- a/fuse/server.go +++ b/fuse/server.go @@ -35,12 +35,32 @@ const ( maxMaxReaders = 16 ) +type protocolServer struct { + fileSystem RawFileSystem + + interruptMu sync.Mutex + reqInflight []*request + connectionDead bool + + latencies LatencyMap + + // in-flight notify-retrieve queries + retrieveMu sync.Mutex + retrieveNext uint64 + retrieveTab map[uint64]*retrieveCacheRequest // notifyUnique -> retrieve request + + kernelSettings InitIn + + opts *MountOptions +} + // Server contains the logic for reading from the FUSE device and // translating it to RawFileSystem interface calls. type Server struct { + protocolServer + // Empty if unmounted. mountPoint string - fileSystem RawFileSystem // writeMu serializes close and notify writes writeMu sync.Mutex @@ -48,8 +68,6 @@ type Server struct { // I/O with kernel and daemon. mountFd int - latencies LatencyMap - opts *MountOptions // maxReaders is the maximum number of goroutines reading requests @@ -62,15 +80,9 @@ type Server struct { reqPool sync.Pool // Pool for raw requests data - readPool sync.Pool - reqMu sync.Mutex - reqReaders int - kernelSettings InitIn - - // in-flight notify-retrieve queries - retrieveMu sync.Mutex - retrieveNext uint64 - retrieveTab map[uint64]*retrieveCacheRequest // notifyUnique -> retrieve request + readPool sync.Pool + reqMu sync.Mutex + reqReaders int singleReader bool canSplice bool @@ -82,10 +94,6 @@ type Server struct { // for implementing single threaded processing. requestProcessingMu sync.Mutex - - interruptMu sync.Mutex - reqInflight []*request - connectionDead bool } // SetDebug is deprecated. Use MountOptions.Debug instead. @@ -207,10 +215,13 @@ func NewServer(fs RawFileSystem, mountPoint string, opts *MountOptions) (*Server } ms := &Server{ - fileSystem: fs, + protocolServer: protocolServer{ + fileSystem: fs, + retrieveTab: make(map[uint64]*retrieveCacheRequest), + opts: &o, + }, opts: &o, maxReaders: maxReaders, - retrieveTab: make(map[uint64]*retrieveCacheRequest), singleReader: useSingleReader, ready: make(chan error, 1), } @@ -543,14 +554,14 @@ exit: } } -func (ms *Server) addInflight(req *request) { +func (ms *protocolServer) addInflight(req *request) { ms.interruptMu.Lock() defer ms.interruptMu.Unlock() req.inflightIndex = len(ms.reqInflight) ms.reqInflight = append(ms.reqInflight, req) } -func (ms *Server) dropInflight(req *request) { +func (ms *protocolServer) dropInflight(req *request) { ms.interruptMu.Lock() defer ms.interruptMu.Unlock() this := req.inflightIndex @@ -562,7 +573,7 @@ func (ms *Server) dropInflight(req *request) { ms.reqInflight = ms.reqInflight[:last] } -func (ms *Server) interruptRequest(unique uint64) Status { +func (ms *protocolServer) interruptRequest(unique uint64) Status { ms.interruptMu.Lock() defer ms.interruptMu.Unlock() @@ -578,7 +589,7 @@ func (ms *Server) interruptRequest(unique uint64) Status { return EAGAIN } -func (ms *Server) cancelAll() { +func (ms *protocolServer) cancelAll() { ms.interruptMu.Lock() defer ms.interruptMu.Unlock() ms.connectionDead = true @@ -611,11 +622,10 @@ func (ms *Server) handleRequest(req *requestAlloc) Status { if outPayloadSize > 0 { req.outPayload = ms.buffers.AllocBuffer(uint32(outPayloadSize)) } - ms.innerHandleRequest(h, &req.request) + ms.protocolServer.handleRequest(h, &req.request) if req.suppressReply { return OK } - errno := ms.write(&req.request) if errno != 0 { // Ignore ENOENT for INTERRUPT responses which @@ -623,7 +633,7 @@ func (ms *Server) handleRequest(req *requestAlloc) Status { // known by the kernel. This is a normal if the // referred request already has completed. // - // Ignore ENOENT for RELEASE(DIR) responses. When the FS + // Ignore ENOENT for RELEASE responses. When the FS // is unmounted directly after a file close, the // device can go away while we are still processing // RELEASE. This is because RELEASE is analogous to @@ -639,7 +649,7 @@ func (ms *Server) handleRequest(req *requestAlloc) Status { return errno } -func (ms *Server) innerHandleRequest(h *operationHandler, req *request) { +func (ms *protocolServer) handleRequest(h *operationHandler, req *request) { ms.addInflight(req) defer ms.dropInflight(req)