Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ReaderFrom/WriterTo for Linux (splice) #29

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions raw_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
Copyright The containerd Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fifo

import (
"io"
"sync"
"sync/atomic"
"syscall"

"golang.org/x/sys/unix"
)

const (
spliceMax int64 = 1 << 62
spliceToEOF int64 = -1
)

var (
spliceSupported int32 = 1
bufPool = &sync.Pool{
New: func() interface{} {
buf := make([]byte, 1<<20)
return &buf
},
}
)

func (f *fifo) ReadFrom(r io.Reader) (int64, error) {
if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
return 0, ErrWrToRDONLY
}
select {
case <-f.opened:
return f.readFrom(r)
default:
}
select {
case <-f.opened:
return f.readFrom(r)
Comment on lines +47 to +54
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of trying to read from <-f.opened twice here?

case <-f.closed:
return 0, ErrWriteClosed
}
}

func (f *fifo) readFrom(r io.Reader) (int64, error) {
if atomic.LoadInt32(&spliceSupported) != 1 {
return copyBuffer(f.file, r)
samuelkarp marked this conversation as resolved.
Show resolved Hide resolved
}

remain := spliceToEOF
lr, ok := r.(*io.LimitedReader)
if ok {
remain, r = lr.N, lr.R
if remain <= 0 {
return 0, nil
}
}

rscI, ok := r.(syscall.Conn)
if !ok {
return copyBuffer(f.file, r)
}
Comment on lines +75 to +77
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this error case also need to handle lr != nil?

Suggested change
if !ok {
return copyBuffer(f.file, r)
}
if !ok {
if lr != nil {
r = lr
}
return copyBuffer(f.file, r)
}


rsc, err := rscI.SyscallConn()
if err != nil {
if lr != nil {
r = lr
}
return copyBuffer(f.file, r)
}

wsc, err := f.SyscallConn()
if err != nil {
if lr != nil {
r = lr
}
return copyBuffer(f.file, r)
}

handled, written, err := doRawCopy(rsc, wsc, remain)
if err != nil {
return written, err
}
if !handled {
if lr != nil {
r = lr
}
return copyBuffer(f.file, r)
}

return written, nil
}

func (f *fifo) WriteTo(w io.Writer) (int64, error) {
if f.flag&syscall.O_WRONLY > 0 {
return 0, ErrRdFrmWRONLY
}
select {
case <-f.opened:
return f.writeTo(w)
default:
}

select {
case <-f.opened:
return f.writeTo(w)
Comment on lines +113 to +121
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here about the repeated read from f.opened.

case <-f.closed:
return 0, ErrWriteClosed
}
}

func (f *fifo) writeTo(w io.Writer) (int64, error) {
if atomic.LoadInt32(&spliceSupported) != 1 {
return copyBuffer(w, f.file)
}

wscI, ok := w.(syscall.Conn)
if !ok {
return copyBuffer(w, f.file)
}

wsc, err := wscI.SyscallConn()
if err != nil {
return copyBuffer(w, f.file)
}

rsc, err := f.SyscallConn()
if err != nil {
return copyBuffer(w, f.file)
}

handled, written, err := doRawCopy(rsc, wsc, spliceToEOF)
if err != nil {
return written, err
}
if !handled {
return copyBuffer(w, f.file)
}
return written, err
}

func doRawCopy(rsc, wsc syscall.RawConn, remain int64) (handled bool, written int64, _ error) {
var (
spliceErr error
writeErr error
copyToEOF = remain == spliceToEOF
)

if remain == spliceToEOF {
remain = spliceMax
}

// Hear the RawConn Read/Write methods allow us to utilize the go runtime
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
// Hear the RawConn Read/Write methods allow us to utilize the go runtime
// Here the RawConn Read/Write methods allow us to utilize the go runtime

// poller to wait for the file descriptors to be ready for reads or writes.
//
// Read/Write will sleep the goroutine until the file descriptor is ready.
// Once we are inside the function we've passed in, we know the FD is ready.
//
// Read/Write both run the function they are passed repeatedly until the
// function returns true.
err := rsc.Read(func(rfd uintptr) bool {
err := wsc.Write(func(wfd uintptr) bool {
for copyToEOF || remain > 0 {
// We always use NONBLOCK here. If the file descriptor(s) is not
// opened with O_NONBLOCK then splice just blocks like normal.
// If they opened with O_NONBLOCK, then `unix.Splice` returns
// with EAGAIN when either the read or write would block.
n, err := unix.Splice(int(rfd), nil, int(wfd), nil, int(remain), unix.SPLICE_F_MOVE|unix.SPLICE_F_NONBLOCK)
if n > 0 {
written += n
if !copyToEOF {
remain -= n
}
}

switch err {
case unix.ENOSYS:
// splice not supported on kernel
atomic.StoreInt32(&spliceSupported, 0)
return true
case syscall.EINVAL, syscall.EOPNOTSUPP, syscall.EPERM:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would syscall.EOPNOTSUPP here indicate the process is unable to perform the syscall regardless of whether the kernel supports it? If this doesn't depend on input would it make sense to treat it like ENOSYS?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think EOPNOTSUP is more about the transport than the actual system.
For instance, I'd expect EOPNOTSUP if the read side is a unix socket which does not support splicing... but I have not tested this... it may actually be worth adding tests for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the specific case may actually be EINVAL...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair, its not super clear when these standard return values would be returned in this case and better not to set a global value

// In all these cases, there is no data transferred
return true
case nil:
handled = true
if n == 0 {
// At EOF
return true
}
case unix.EINTR:
continue
Comment on lines +199 to +206
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it'd be slightly more readable to have an explicit continue in here.

Suggested change
case nil:
handled = true
if n == 0 {
// At EOF
return true
}
case unix.EINTR:
continue
case nil:
handled = true
if n == 0 {
// At EOF
return true
}
continue
case unix.EINTR:
continue

Also, what do you think about reordering the switch to order successful cases before errors and have all the errors grouped together? Something like nil, unix.EINTR, unix.EAGAIN, unix.ENOSYS, syscall.EINVAL, syscall.EOPNOTSUPP, syscall.EPERM, default?

case unix.EAGAIN:
// Normally we'd want to return false here, because this just means
// we need it to wait for the fd to be ready again, however we don't know
// which fd needs to be waited on.
// So, break out of the write func and let `Read` return false so we end up
// waiting for both fd's to be ready.
return true
default:
spliceErr = err
handled = true
return true
}
}
return true
})
if err != nil {
writeErr = err
return true
}
if spliceErr != nil {
// I don't like this but it made the linter happy. All hail the mighty linter.
// If splice returned EAGAIN we should return false so we can
// wait for the FD to be ready again. Otherwise we just want to exit
// early.
return spliceErr != unix.EAGAIN
}

return true
})

if spliceErr != nil {
return handled, written, spliceErr
}
if writeErr != nil {
return handled, written, writeErr
}
return handled, written, err
}

func copyBuffer(w io.Writer, r io.Reader) (int64, error) {
buf := bufPool.Get().(*[]byte)
n, err := io.CopyBuffer(w, r, *buf)
bufPool.Put(buf)
return n, err
}
Loading