Skip to content

Commit

Permalink
SplitHTTP: Do not produce too large upload (#3691)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmmray authored Aug 17, 2024
1 parent 1562e1f commit 160316d
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 6 deletions.
46 changes: 41 additions & 5 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,11 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

httpClient := getHTTPClient(ctx, dest, streamSettings)

uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(scMaxEachPostBytes.roll()))
maxUploadSize := scMaxEachPostBytes.roll()
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
// code relies on this behavior. Subtract 1 so that together with
// uploadWriter wrapper, exact size limits can be enforced
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))

go func() {
requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll()))
Expand Down Expand Up @@ -318,16 +322,48 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
},
}

// necessary in order to send larger chunks in upload
bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter)
bufferedUploadPipeWriter.SetBuffered(false)
writer := uploadWriter{
uploadPipeWriter,
maxUploadSize,
}

conn := splitConn{
writer: bufferedUploadPipeWriter,
writer: writer,
reader: lazyDownload,
remoteAddr: remoteAddr,
localAddr: localAddr,
}

return stat.Connection(&conn), nil
}

// A wrapper around pipe that ensures the size limit is exactly honored.
//
// The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that
// single MultiBuffer exceeds the size limit, and then starts blocking on the
// next WriteMultiBuffer call. This means that ReadMultiBuffer can return more
// bytes than the size limit. We work around this by splitting a potentially
// too large write up into multiple.
type uploadWriter struct {
*pipe.Writer
maxLen int32
}

func (w uploadWriter) Write(b []byte) (int, error) {
capacity := int(w.maxLen - w.Len())
if capacity > 0 && capacity < len(b) {
b = b[:capacity]
}

buffer := buf.New()
n, err := buffer.Write(b)
if err != nil {
return 0, err
}

err = w.WriteMultiBuffer([]*buf.Buffer{buffer})
if err != nil {
return 0, err
}
return n, nil
}
56 changes: 55 additions & 1 deletion transport/internet/splithttp/splithttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func Test_queryString(t *testing.T) {
ctx := context.Background()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{Path: "sh"},
ProtocolSettings: &Config{Path: "sh?ed=2048"},
}
conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)

Expand All @@ -407,3 +407,57 @@ func Test_queryString(t *testing.T) {
common.Must(conn.Close())
common.Must(listen.Close())
}

func Test_maxUpload(t *testing.T) {
listenPort := tcp.PickPort()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{
Path: "/sh",
ScMaxEachPostBytes: &RandRangeConfig{
From: 100,
To: 100,
},
},
}

var uploadSize int
listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
go func(c stat.Connection) {
defer c.Close()
var b [1024]byte
c.SetReadDeadline(time.Now().Add(2 * time.Second))
n, err := c.Read(b[:])
if err != nil {
return
}

uploadSize = n

common.Must2(c.Write([]byte("Response")))
}(conn)
})
common.Must(err)
ctx := context.Background()

conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)

// send a slightly too large upload
var upload [101]byte
_, err = conn.Write(upload[:])
common.Must(err)

var b [1024]byte
n, _ := io.ReadFull(conn, b[:])
fmt.Println("string is", n)
if string(b[:n]) != "Response" {
t.Error("response: ", string(b[:n]))
}
common.Must(conn.Close())

if uploadSize > 100 || uploadSize == 0 {
t.Error("incorrect upload size: ", uploadSize)
}

common.Must(listen.Close())
}
8 changes: 8 additions & 0 deletions transport/pipe/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ var (
errSlowDown = errors.New("slow down")
)

func (p *pipe) Len() int32 {
data := p.data
if data == nil {
return 0
}
return data.Len()
}

func (p *pipe) getState(forRead bool) error {
switch p.state {
case open:
Expand Down
4 changes: 4 additions & 0 deletions transport/pipe/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ func (w *Writer) Close() error {
return w.pipe.Close()
}

func (w *Writer) Len() int32 {
return w.pipe.Len()
}

// Interrupt implements common.Interruptible.
func (w *Writer) Interrupt() {
w.pipe.Interrupt()
Expand Down

0 comments on commit 160316d

Please sign in to comment.