Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write length and payload separately for double the throughput #388

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2033,7 +2033,7 @@ func benchmarkWrite(b *testing.B, bufsize int, delay time.Duration) {
// open sftp client
sftp, cmd := testClient(b, false, delay)
defer cmd.Wait()
// defer sftp.Close()
defer sftp.Close()

data := make([]byte, size)

Expand Down
86 changes: 62 additions & 24 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"os"
"reflect"
"sync"

"github.com/pkg/errors"
)
Expand All @@ -26,6 +27,14 @@ const (
debugDumpRxPacketBytes = false
)

type marshalerWithPayload interface {
// marshalWithPayload returns the marshaled representation of its
// receiver, with a variable-length payload field omitted and returned
// separately. The length of that payload should still be marshaled in
// the packet.
marshalWithPayload() (packet, payload []byte)
}

func marshalUint32(b []byte, v uint32) []byte {
return append(b, byte(v>>24), byte(v>>16), byte(v>>8), byte(v))
}
Expand Down Expand Up @@ -116,27 +125,49 @@ func unmarshalStringSafe(b []byte) (string, []byte, error) {
return string(b[:n]), b[n:], nil
}

var lenBufPool = sync.Pool{New: func() interface{} { return new([4]byte) }}

// sendPacket marshals p according to RFC 4234.
func sendPacket(w io.Writer, m encoding.BinaryMarshaler) error {
bb, err := m.MarshalBinary()
if err != nil {
return errors.Errorf("binary marshaller failed: %v", err)
var (
packet, payload []byte
err error
separatePayload bool
)

if mp, ok := m.(marshalerWithPayload); ok {
packet, payload = mp.marshalWithPayload()
separatePayload = true
} else {
packet, err = m.MarshalBinary()
if err != nil {
return errors.Errorf("binary marshaller failed: %v", err)
}
}

if debugDumpTxPacketBytes {
debug("send packet: %s %d bytes %x", fxp(bb[0]), len(bb), bb[1:])
debug("send packet: %s %d bytes %x",
fxp(packet[0]), len(packet), append(packet[1:], payload...))
} else if debugDumpTxPacket {
debug("send packet: %s %d bytes", fxp(bb[0]), len(bb))
debug("send packet: %s %d bytes", fxp(packet[0]), len(packet)+len(payload))
}
// Slide packet down 4 bytes to make room for length header.
packet := append(bb, make([]byte, 4)...) // optimistically assume bb has capacity
copy(packet[4:], bb)
binary.BigEndian.PutUint32(packet[:4], uint32(len(bb)))

_, err = w.Write(packet)
lenBuf := lenBufPool.Get().(*[4]byte)
binary.BigEndian.PutUint32(lenBuf[:], uint32(len(packet)+len(payload)))

_, err = w.Write(lenBuf[:])
if err == nil {
_, err = w.Write(packet)
}
if separatePayload && err == nil {
_, err = w.Write(payload)
}

lenBufPool.Put(lenBuf)
if err != nil {
return errors.Errorf("failed to send packet: %v", err)
}
return nil
return err
}

func recvPacket(r io.Reader, alloc *allocator, orderID uint32) (uint8, []byte, error) {
Expand Down Expand Up @@ -670,20 +701,23 @@ type sshFxpWritePacket struct {

func (p sshFxpWritePacket) id() uint32 { return p.ID }

func (p sshFxpWritePacket) MarshalBinary() ([]byte, error) {
func (p sshFxpWritePacket) marshalWithPayload() (packet, payload []byte) {
l := 1 + 4 + // type(byte) + uint32
4 + len(p.Handle) +
8 + 4 + // uint64 + uint32
len(p.Data)
8 + 4 // uint64 + uint32

b := make([]byte, 0, l)
b = append(b, sshFxpWrite)
b = marshalUint32(b, p.ID)
b = marshalString(b, p.Handle)
b = marshalUint64(b, p.Offset)
b = marshalUint32(b, p.Length)
b = append(b, p.Data...)
return b, nil
return b, p.Data
}

func (p sshFxpWritePacket) MarshalBinary() ([]byte, error) {
b, _ := p.marshalWithPayload()
return append(b, p.Data...), nil
}

func (p *sshFxpWritePacket) UnmarshalBinary(b []byte) error {
Expand Down Expand Up @@ -838,15 +872,19 @@ type sshFxpDataPacket struct {
Data []byte
}

// MarshalBinary encodes the receiver into a binary form and returns the result.
// To avoid a new allocation the Data slice must have a capacity >= Length + 9
func (p sshFxpDataPacket) marshalWithPayload() (packet, payload []byte) {
l := 1 + 4 + 4 // type(byte) + uint32 + uint32

b := make([]byte, 0, l)
b = append(b, sshFxpData)
b = marshalUint32(b, p.ID)
b = marshalUint32(b, p.Length)
return b, p.Data
}

func (p sshFxpDataPacket) MarshalBinary() ([]byte, error) {
b := append(p.Data, make([]byte, 9)...)
copy(b[9:], p.Data[:p.Length])
b[0] = sshFxpData
binary.BigEndian.PutUint32(b[1:5], p.ID)
binary.BigEndian.PutUint32(b[5:9], p.Length)
return b, nil
b, _ := p.marshalWithPayload()
return append(b, p.Data...), nil
}

func (p *sshFxpDataPacket) UnmarshalBinary(b []byte) error {
Expand Down