From 120b9759fe77a94adca9e5579672c5a43bf3cc32 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Mon, 28 Jun 2021 15:07:47 +0000 Subject: [PATCH 1/2] concurrent reads: use sequential requests for ReadAt as well --- client.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index aa17a392..c7868f3e 100644 --- a/client.go +++ b/client.go @@ -1028,7 +1028,17 @@ func (f *File) ReadAt(b []byte, off int64) (int, error) { cancel := make(chan struct{}) + concurrency := len(b)/f.c.maxPacket + 1 + if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { + concurrency = f.c.maxConcurrentRequests + } + + resPool := newResChanPool(concurrency) + type work struct { + id uint32 + res chan result + b []byte off int64 } @@ -1048,8 +1058,18 @@ func (f *File) ReadAt(b []byte, off int64) (int, error) { rb = rb[:chunkSize] } + id := f.c.nextID() + res := resPool.Get() + + f.c.dispatchRequest(res, &sshFxpReadPacket{ + ID: id, + Handle: f.handle, + Offset: uint64(offset), + Len: uint32(chunkSize), + }) + select { - case workCh <- work{rb, offset}: + case workCh <- work{id, res, rb, offset}: case <-cancel: return } @@ -1065,11 +1085,6 @@ func (f *File) ReadAt(b []byte, off int64) (int, error) { } errCh := make(chan rErr) - concurrency := len(b)/f.c.maxPacket + 1 - if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { - concurrency = f.c.maxConcurrentRequests - } - var wg sync.WaitGroup wg.Add(concurrency) for i := 0; i < concurrency; i++ { @@ -1077,10 +1092,33 @@ func (f *File) ReadAt(b []byte, off int64) (int, error) { go func() { defer wg.Done() - ch := make(chan result, 1) // reusable channel per mapper. - for packet := range workCh { - n, err := f.readChunkAt(ch, packet.b, packet.off) + var n int + + s := <-packet.res + resPool.Put(packet.res) + + err := s.err + if err == nil { + switch s.typ { + case sshFxpStatus: + err = normaliseError(unmarshalStatus(packet.id, s.data)) + + case sshFxpData: + sid, data := unmarshalUint32(s.data) + if packet.id != sid { + err = &unexpectedIDErr{packet.id, sid} + + } else { + l, data := unmarshalUint32(data) + n = copy(packet.b, data[:l]) + } + + default: + err = unimplementedPacketErr(s.typ) + } + } + if err != nil { // return the offset as the start + how much we read before the error. errCh <- rErr{packet.off + int64(n), err} From 6617a3a1aa546aebee58f9b22758abe17378e5da Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Mon, 28 Jun 2021 18:27:13 +0000 Subject: [PATCH 2/2] [bugfix] short reads indicate EOF --- client.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/client.go b/client.go index c7868f3e..a1aad7b3 100644 --- a/client.go +++ b/client.go @@ -1112,6 +1112,13 @@ func (f *File) ReadAt(b []byte, off int64) (int, error) { } else { l, data := unmarshalUint32(data) n = copy(packet.b, data[:l]) + + // For normal disk files, it is guaranteed that this will read + // the specified number of bytes, or up to end of file. + // This implies, if we have a short read, that means EOF. + if n < len(packet.b) { + err = io.EOF + } } default: