Merge pull request #408 from drakkan/345
add an option to disable concurrent reads
drakkan authored Mar 5, 2021
2 parents a889618 + c539fdb commit f50ad19
Showing 2 changed files with 131 additions and 17 deletions.
54 changes: 52 additions & 2 deletions client.go
@@ -112,6 +112,24 @@ func UseConcurrentWrites(value bool) ClientOption {
}
}

+// UseConcurrentReads allows the Client to perform concurrent Reads.
+//
+// Concurrent reads are generally safe to use, and disabling them will degrade
+// performance, so this option is enabled by default.
+//
+// When enabled, WriteTo will use Stat/Fstat to get the file size and determine
+// how many concurrent workers to use.
+// Some "read once" servers will delete the file if they receive a stat call on
+// an open file, and the download will then fail.
+// Disabling concurrent reads allows downloading files from such servers.
+// If concurrent reads are disabled, the UseFstat option is ignored.
+func UseConcurrentReads(value bool) ClientOption {
+	return func(c *Client) error {
+		c.disableConcurrentReads = !value
+		return nil
+	}
+}
+
// UseFstat sets whether to use Fstat or Stat when File.WriteTo is called
// (usually when copying files).
// Some servers limit the number of open files and calling Stat after opening
@@ -152,8 +170,9 @@ type Client struct {

// write concurrency is… error prone.
// Default behavior should be to not use it.
-	useConcurrentWrites bool
-	useFstat            bool
+	useConcurrentWrites    bool
+	useFstat               bool
+	disableConcurrentReads bool
}

// NewClient creates a new SFTP client on conn, using zero or more option
@@ -947,6 +966,29 @@ func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
return
}

+func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
+	for read < len(b) {
+		rb := b[read:]
+		if len(rb) > f.c.maxPacket {
+			rb = rb[:f.c.maxPacket]
+		}
+		n, err := f.readChunkAt(nil, rb, off+int64(read))
+		if n < 0 {
+			panic("sftp.File: returned negative count from readChunkAt")
+		}
+		if n > 0 {
+			read += n
+		}
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				return read, nil // return nil explicitly.
+			}
+			return read, err
+		}
+	}
+	return read, nil
+}
+
// ReadAt reads up to len(b) bytes from the File at the given offset `off`. It returns
// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
// so the file offset is not altered during the read.
@@ -957,6 +999,10 @@ func (f *File) ReadAt(b []byte, off int64) (int, error) {
return f.readChunkAt(nil, b, off)
}

+	if f.c.disableConcurrentReads {
+		return f.readAtSequential(b, off)
+	}
+
	// Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
	// This allows reads with a suitably large buffer to transfer data at a much faster rate
	// by overlapping round trip times.
@@ -1098,6 +1144,10 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
f.mu.Lock()
defer f.mu.Unlock()

+	if f.c.disableConcurrentReads {
+		return f.writeToSequential(w)
+	}
+
// For concurrency, we want to guess how many concurrent workers we should use.
var fileSize uint64
if f.c.useFstat {
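To put the new option in context, here is a minimal sketch of downloading a file from a "read once" server with concurrent reads disabled. The SSH connection setup, the package name, and the paths remote.dat and local.dat are illustrative assumptions, not part of the commit:

package example

import (
	"os"

	"github.com/pkg/sftp"
	"golang.org/x/crypto/ssh"
)

// download copies remote.dat to local.dat over an existing SSH connection.
// sshConn is assumed to be an already-connected *ssh.Client.
func download(sshConn *ssh.Client) error {
	// Disable concurrent reads so WriteTo never calls Stat/Fstat on the
	// open file; note that UseFstat is ignored in this mode.
	client, err := sftp.NewClient(sshConn, sftp.UseConcurrentReads(false))
	if err != nil {
		return err
	}
	defer client.Close()

	src, err := client.Open("remote.dat")
	if err != nil {
		return err
	}
	defer src.Close()

	dst, err := os.Create("local.dat")
	if err != nil {
		return err
	}
	defer dst.Close()

	// With concurrent reads disabled, WriteTo takes the sequential path,
	// reading one maxPacket-sized chunk per round trip.
	_, err = src.WriteTo(dst)
	return err
}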
94 changes: 79 additions & 15 deletions client_integration_test.go
@@ -1115,6 +1115,67 @@ func TestClientReadSimple(t *testing.T) {
}
}

+func TestClientReadSequential(t *testing.T) {
+	sftp, cmd := testClient(t, READONLY, NODELAY)
+	defer cmd.Wait()
+	defer sftp.Close()
+
+	sftp.disableConcurrentReads = true
+	d, err := ioutil.TempDir("", "sftptest-readsequential")
+	require.NoError(t, err)
+
+	defer os.RemoveAll(d)
+
+	f, err := ioutil.TempFile(d, "read-sequential-test")
+	require.NoError(t, err)
+	fname := f.Name()
+	content := []byte("hello world")
+	f.Write(content)
+	f.Close()
+
+	for _, maxPktSize := range []int{1, 2, 3, 4} {
+		sftp.maxPacket = maxPktSize
+
+		sftpFile, err := sftp.Open(fname)
+		require.NoError(t, err)
+
+		stuff := make([]byte, 32)
+		n, err := sftpFile.Read(stuff)
+		require.NoError(t, err)
+		require.Equal(t, len(content), n)
+		require.Equal(t, content, stuff[0:len(content)])
+
+		err = sftpFile.Close()
+		require.NoError(t, err)
+
+		sftpFile, err = sftp.Open(fname)
+		require.NoError(t, err)
+
+		stuff = make([]byte, 5)
+		n, err = sftpFile.Read(stuff)
+		require.NoError(t, err)
+		require.Equal(t, len(stuff), n)
+		require.Equal(t, content[:len(stuff)], stuff)
+
+		err = sftpFile.Close()
+		require.NoError(t, err)
+
+		// now read from an offset
+		off := int64(3)
+		sftpFile, err = sftp.Open(fname)
+		require.NoError(t, err)
+
+		stuff = make([]byte, 5)
+		n, err = sftpFile.ReadAt(stuff, off)
+		require.NoError(t, err)
+		require.Equal(t, len(stuff), n)
+		require.Equal(t, content[off:off+int64(len(stuff))], stuff)
+
+		err = sftpFile.Close()
+		require.NoError(t, err)
+	}
+}
+
func TestClientReadDir(t *testing.T) {
sftp1, cmd1 := testClient(t, READONLY, NODELAY)
sftp2, cmd2 := testClientGoSvr(t, READONLY, NODELAY)
@@ -1219,21 +1280,24 @@ func TestClientRead(t *testing.T) {
}
defer os.RemoveAll(d)

-	for _, tt := range clientReadTests {
-		f, err := ioutil.TempFile(d, "read-test")
-		if err != nil {
-			t.Fatal(err)
-		}
-		defer f.Close()
-		hash := writeN(t, f, tt.n)
-		f2, err := sftp.Open(f.Name())
-		if err != nil {
-			t.Fatal(err)
-		}
-		defer f2.Close()
-		hash2, n := readHash(t, f2)
-		if hash != hash2 || tt.n != n {
-			t.Errorf("Read: hash: want: %q, got %q, read: want: %v, got %v", hash, hash2, tt.n, n)
+	for _, disableConcurrentReads := range []bool{true, false} {
+		for _, tt := range clientReadTests {
+			f, err := ioutil.TempFile(d, "read-test")
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer f.Close()
+			hash := writeN(t, f, tt.n)
+			sftp.disableConcurrentReads = disableConcurrentReads
+			f2, err := sftp.Open(f.Name())
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer f2.Close()
+			hash2, n := readHash(t, f2)
+			if hash != hash2 || tt.n != n {
+				t.Errorf("Read: hash: want: %q, got %q, read: want: %v, got %v", hash, hash2, tt.n, n)
+			}
		}
	}
}
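The sequential path applies to ReadAt as well, as the test above exercises at several tiny maxPacket sizes. A small caller-side sketch, assuming a client already created with sftp.UseConcurrentReads(false); the package name and the path remote.dat are stand-ins:

package example

import "github.com/pkg/sftp"

// readSlice reads 5 bytes starting at offset 3, mirroring the test above.
// client is assumed to have been created with sftp.UseConcurrentReads(false).
func readSlice(client *sftp.Client) ([]byte, error) {
	f, err := client.Open("remote.dat")
	if err != nil {
		return nil, err
	}
	defer f.Close()

	buf := make([]byte, 5)
	// The sequential path fetches the range in maxPacket-sized chunks, one
	// outstanding request at a time; per io.ReaderAt semantics the file
	// offset is not altered by the call.
	if _, err := f.ReadAt(buf, 3); err != nil {
		return nil, err
	}
	return buf, nil
}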
