Merge pull request #482 from pkg/patch/sequential-concurrent-write-requests

Sequentially issue write requests, process results concurrently
puellanivis authored Dec 15, 2021
2 parents 7adab6c + 84714f9 commit e0c1059
Showing 1 changed file with 69 additions and 25 deletions: client.go
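The substance of the patch: the old code had each worker call writeChunkAt, which sends one request and blocks for its reply; the new code splits the pipeline, so the slicer goroutine dispatches every sshFxpWritePacket in strict offset order and the workers only collect and validate the replies. Below is a minimal, self-contained sketch of that dispatch-sequentially/collect-concurrently pattern; fakeRoundTrip is a hypothetical stand-in for the client's real request plumbing, and the types are simplified.

// Sketch of the pattern this commit adopts: one goroutine issues requests
// strictly in order, while several goroutines reap the replies concurrently.
package main

import (
	"fmt"
	"sync"
)

type result struct{ err error }

// fakeRoundTrip stands in for the real client's dispatchRequest:
// the reply arrives asynchronously on the supplied channel.
func fakeRoundTrip(off int64, res chan result) {
	go func() { res <- result{} }()
}

func main() {
	type work struct {
		res chan result // where this request's reply will land
		off int64
	}
	workCh := make(chan work)

	// Dispatcher: requests leave in offset order, one after another.
	go func() {
		defer close(workCh)
		for off := int64(0); off < 4*32768; off += 32768 {
			res := make(chan result, 1)
			fakeRoundTrip(off, res)
			workCh <- work{res: res, off: off}
		}
	}()

	// Collectors: replies are consumed concurrently, whenever they land.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for w := range workCh {
				s := <-w.res
				fmt.Printf("write at offset %d: err=%v\n", w.off, s.err)
			}
		}()
	}
	wg.Wait()
}

In the real diff below, the per-request reply channels come from a pool (newResChanPool) and are returned with pool.Put, so the channel allocations are amortized across requests.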
@@ -1461,11 +1461,20 @@ func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
 	cancel := make(chan struct{})
 
 	type work struct {
-		b   []byte
+		id  uint32
+		res chan result
+
 		off int64
 	}
 	workCh := make(chan work)
 
+	concurrency := len(b)/f.c.maxPacket + 1
+	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
+		concurrency = f.c.maxConcurrentRequests
+	}
+
+	pool := newResChanPool(concurrency)
+
 	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
 	go func() {
 		defer close(workCh)
@@ -1479,8 +1488,20 @@ func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
 				wb = wb[:chunkSize]
 			}
 
+			id := f.c.nextID()
+			res := pool.Get()
+			off := off + int64(read)
+
+			f.c.dispatchRequest(res, &sshFxpWritePacket{
+				ID:     id,
+				Handle: f.handle,
+				Offset: uint64(off),
+				Length: uint32(len(wb)),
+				Data:   wb,
+			})
+
 			select {
-			case workCh <- work{wb, off + int64(read)}:
+			case workCh <- work{id, res, off}:
 			case <-cancel:
 				return
 			}
@@ -1495,25 +1516,29 @@ func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
 	}
 	errCh := make(chan wErr)
 
-	concurrency := len(b)/f.c.maxPacket + 1
-	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
-		concurrency = f.c.maxConcurrentRequests
-	}
-
 	var wg sync.WaitGroup
 	wg.Add(concurrency)
 	for i := 0; i < concurrency; i++ {
 		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
 		go func() {
 			defer wg.Done()
 
-			ch := make(chan result, 1) // reusable channel per mapper.
+			for work := range workCh {
+				s := <-work.res
+				pool.Put(work.res)
+
+				err := s.err
+				if err == nil {
+					switch s.typ {
+					case sshFxpStatus:
+						err = normaliseError(unmarshalStatus(work.id, s.data))
+					default:
+						err = unimplementedPacketErr(s.typ)
+					}
+				}
 
-			for packet := range workCh {
-				n, err := f.writeChunkAt(ch, packet.b, packet.off)
 				if err != nil {
-					// return the offset as the start + how much we wrote before the error.
-					errCh <- wErr{packet.off + int64(n), err}
+					errCh <- wErr{work.off, err}
 				}
 			}
 		}()
@@ -1598,8 +1623,9 @@ func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64
 	cancel := make(chan struct{})
 
 	type work struct {
-		b   []byte
-		n   int
+		id  uint32
+		res chan result
+
 		off int64
 	}
 	workCh := make(chan work)
@@ -1614,24 +1640,34 @@ func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64
 		concurrency = f.c.maxConcurrentRequests
 	}
 
-	pool := newBufPool(concurrency, f.c.maxPacket)
+	pool := newResChanPool(concurrency)
 
 	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
 	go func() {
 		defer close(workCh)
 
+		b := make([]byte, f.c.maxPacket)
 		off := f.offset
 
 		for {
-			b := pool.Get()
-
 			n, err := r.Read(b)
+
 			if n > 0 {
 				read += int64(n)
 
+				id := f.c.nextID()
+				res := pool.Get()
+
+				f.c.dispatchRequest(res, &sshFxpWritePacket{
+					ID:     id,
+					Handle: f.handle,
+					Offset: uint64(off),
+					Length: uint32(n),
+					Data:   b,
+				})
+
 				select {
-				case workCh <- work{b, n, off}:
-					// We need the pool.Put(b) to put the whole slice, not just trunced.
+				case workCh <- work{id, res, off}:
 				case <-cancel:
 					return
 				}
@@ -1655,15 +1691,23 @@ func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64
 		go func() {
 			defer wg.Done()
 
-			ch := make(chan result, 1) // reusable channel per mapper.
+			for work := range workCh {
+				s := <-work.res
+				pool.Put(work.res)
+
+				err := s.err
+				if err == nil {
+					switch s.typ {
+					case sshFxpStatus:
+						err = normaliseError(unmarshalStatus(work.id, s.data))
+					default:
+						err = unimplementedPacketErr(s.typ)
+					}
+				}
 
-			for packet := range workCh {
-				n, err := f.writeChunkAt(ch, packet.b[:packet.n], packet.off)
 				if err != nil {
-					// return the offset as the start + how much we wrote before the error.
-					errCh <- rwErr{packet.off + int64(n), err}
+					errCh <- rwErr{work.off, err}
 				}
-				pool.Put(packet.b)
 			}
 		}()
 	}
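
None of this machinery is visible to callers; it sits behind pkg/sftp's public concurrent-write options. A usage sketch under stated assumptions: conn is an already-established *ssh.Client, dataReader is an io.Reader, "upload.bin" and the option values are illustrative, and the needed imports are io, github.com/pkg/sftp, and golang.org/x/crypto/ssh.

// Hedged usage sketch: drive the patched ReadFromWithConcurrency path
// through the public API. Option values are illustrative, not recommendations.
func upload(conn *ssh.Client, dataReader io.Reader) error {
	client, err := sftp.NewClient(conn,
		sftp.UseConcurrentWrites(true),        // allow multiple in-flight write requests
		sftp.MaxConcurrentRequestsPerFile(64), // cap them per open file
	)
	if err != nil {
		return err
	}
	defer client.Close()

	f, err := client.Create("upload.bin")
	if err != nil {
		return err
	}
	defer f.Close()

	// With concurrent writes enabled, ReadFrom can take the
	// ReadFromWithConcurrency path patched above.
	_, err = f.ReadFrom(dataReader)
	return err
}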
