Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: advanced split options [BREAKING] #311

Merged
merged 14 commits into from
Nov 7, 2024
15 changes: 8 additions & 7 deletions transport/split/stream_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,21 @@ import (
"github.com/Jigsaw-Code/outline-sdk/transport"
)

// splitDialer is a [transport.StreamDialer] that implements the split strategy.
// Use [NewStreamDialer] to create new instances.
type splitDialer struct {
dialer transport.StreamDialer
splitPoint int64
dialer transport.StreamDialer
nextSplit SplitIterator
}

var _ transport.StreamDialer = (*splitDialer)(nil)

// NewStreamDialer creates a [transport.StreamDialer] that splits the outgoing stream after writing "prefixBytes" bytes
// using [SplitWriter].
func NewStreamDialer(dialer transport.StreamDialer, prefixBytes int64) (transport.StreamDialer, error) {
// NewStreamDialer creates a [transport.StreamDialer] that splits the outgoing stream according to nextSplit.
func NewStreamDialer(dialer transport.StreamDialer, nextSplit SplitIterator) (transport.StreamDialer, error) {
if dialer == nil {
return nil, errors.New("argument dialer must not be nil")
}
return &splitDialer{dialer: dialer, splitPoint: prefixBytes}, nil
return &splitDialer{dialer: dialer, nextSplit: nextSplit}, nil
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
}

// DialStream implements [transport.StreamDialer].DialStream.
Expand All @@ -43,5 +44,5 @@ func (d *splitDialer) DialStream(ctx context.Context, remoteAddr string) (transp
if err != nil {
return nil, err
}
return transport.WrapConn(innerConn, innerConn, NewWriter(innerConn, d.splitPoint)), nil
return transport.WrapConn(innerConn, innerConn, NewWriter(innerConn, d.nextSplit)), nil
}
108 changes: 92 additions & 16 deletions transport/split/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
)

type splitWriter struct {
writer io.Writer
prefixBytes int64
writer io.Writer
// Bytes until the next split. This must always be > 0, unless splits are done.
nextSplitBytes int64
nextSegmentLength func() int64
}

var _ io.Writer = (*splitWriter)(nil)
Expand All @@ -32,36 +34,110 @@ type splitWriterReaderFrom struct {

var _ io.ReaderFrom = (*splitWriterReaderFrom)(nil)

// NewWriter creates a [io.Writer] that ensures the byte sequence is split at prefixBytes.
// A write will end right after byte index prefixBytes - 1, before a write starting at byte index prefixBytes.
// For example, if you have a write of [0123456789] and prefixBytes = 3, you will get writes [012] and [3456789].
// If the input writer is a [io.ReaderFrom], the output writer will be too.
func NewWriter(writer io.Writer, prefixBytes int64) io.Writer {
sw := &splitWriter{writer, prefixBytes}
// Split Iterator is a function that returns how many bytes until the next split point, or zero if there are no more splits to do.
fortuna marked this conversation as resolved.
Show resolved Hide resolved
type SplitIterator func() int64

// NewFixedSplitIterator is a helper function that returns a [SplitIterator] that returns the input number once, followed by zero.
// This is helpful for when you want to split the stream once in a fixed position.
func NewFixedSplitIterator(n int64) SplitIterator {
return func() int64 {
next := n
n = 0
return next
}
}

// RepeatedSplit represents a split sequence of count segments with bytes length.
type RepeatedSplit struct {
Count int
Bytes int64
}

// NewRepeatedSplitIterator is a helper function that returns a [SplitIterator] that returns split points according to splits.
// The splits input represents pairs of (count, bytes), meaning a sequence of count splits with bytes length.
// This is helpful for when you want to split the stream repeatedly at different positions and lengths.
func NewRepeatedSplitIterator(splits ...RepeatedSplit) SplitIterator {
// Make sure we don't edit the original slice.
cleanSplits := make([]RepeatedSplit, 0, len(splits))
// Remove no-op splits.
for _, split := range splits {
if split.Count > 0 && split.Bytes > 0 {
cleanSplits = append(cleanSplits, split)
}
}
return func() int64 {
if len(cleanSplits) == 0 {
return 0
}
next := cleanSplits[0].Bytes
cleanSplits[0].Count -= 1
if cleanSplits[0].Count == 0 {
cleanSplits = cleanSplits[1:]
}
return next
}
}

// NewWriter creates a split Writer that calls the nextSegmentLength [SplitIterator] to determine the number bytes until the next split
// point until it returns zero.
func NewWriter(writer io.Writer, nextSegmentLength SplitIterator) io.Writer {
sw := &splitWriter{writer: writer, nextSegmentLength: nextSegmentLength}
sw.nextSplitBytes = nextSegmentLength()
if rf, ok := writer.(io.ReaderFrom); ok {
return &splitWriterReaderFrom{sw, rf}
}
return sw
}

// ReadFrom implements io.ReaderFrom.
func (w *splitWriterReaderFrom) ReadFrom(source io.Reader) (int64, error) {
reader := io.MultiReader(io.LimitReader(source, w.prefixBytes), source)
written, err := w.rf.ReadFrom(reader)
w.prefixBytes -= written
var written int64
for w.nextSplitBytes > 0 {
expectedBytes := w.nextSplitBytes
n, err := w.rf.ReadFrom(io.LimitReader(source, expectedBytes))
written += n
w.advance(n)
if err != nil {
return written, err
}
fortuna marked this conversation as resolved.
Show resolved Hide resolved
if n < expectedBytes {
// Source is done before the split happened. Return.
return written, err
}
}
n, err := w.rf.ReadFrom(source)
written += n
w.advance(n)
return written, err
}

func (w *splitWriter) advance(n int64) {
if w.nextSplitBytes == 0 {
// Done with splits: return.
return
}
w.nextSplitBytes -= int64(n)
if w.nextSplitBytes > 0 {
return
}
// Split done, set up the next split.
w.nextSplitBytes = w.nextSegmentLength()
}

// Write implements io.Writer.
func (w *splitWriter) Write(data []byte) (written int, err error) {
if 0 < w.prefixBytes && w.prefixBytes < int64(len(data)) {
written, err = w.writer.Write(data[:w.prefixBytes])
w.prefixBytes -= int64(written)
for 0 < w.nextSplitBytes && w.nextSplitBytes < int64(len(data)) {
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
dataToSend := data[:w.nextSplitBytes]
n, err := w.writer.Write(dataToSend)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we discovered during the experiments: it's not enough to just split data into multiple writes.

OS sometimes unites the writes anyways.

smth like https://github.com/hufrea/byedpi/blob/75671fa11c57aa458e87d876efc48b40985ed78f/desync.c#L88 or just a small Sleep is required to make it reliable.

Maybe there's a better way (somehow make the OS buffer sizes smaller?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were the experiments using Go? I have not seen that happen. I'd appreciate a way to reproduce it before we can add and wait.

In Go each TCPConn.Write will correspond to one write system call. With the NoDelay enabled (the default), it ends up being one write consistently. Make sure you don't have TCP_CORK enabled.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were the experiments using Go?

yes

With the NoDelay enabled (the default), it ends up being one write consistently. Make sure you don't have TCP_CORK enabled.

I'll try to run this on my machine (I don't remember the details about the test we did IRL)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For TCP sockets, Go will try to send packets ASAP without delays: https://pkg.go.dev/net#TCPConn.SetNoDelay

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing about NoDelay: Documentation says "meaning that data is sent as soon as possible after a Write".

We don't have any guarantees that "as soon is possible" is before the next write. It might happen later.

I took 4801a03, manually configured it to spit 999 times per 1 byte and see the following picture when running fetch:

00:18:59.021904 IP nyanpc.55852 > opennet.ru.http: Flags [S], seq 500399592, win 64240, options [mss 1460,sackOK,TS val 1345057543 ecr 0,nop,wscale 7], length 0
00:18:59.069457 IP opennet.ru.http > nyanpc.55852: Flags [S.], seq 4096694502, ack 500399593, win 28960, options [mss 1452,sackOK,TS val 4132463171 ecr 1345057543,nop,wscale 7], length 0
00:18:59.069476 IP nyanpc.55852 > opennet.ru.http: Flags [.], ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 0
00:18:59.069708 IP nyanpc.55852 > opennet.ru.http: Flags [P.], seq 1:2, ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 1: HTTP
00:18:59.069712 IP nyanpc.55852 > opennet.ru.http: Flags [P.], seq 2:4, ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 2: HTTP
00:18:59.069720 IP nyanpc.55852 > opennet.ru.http: Flags [P.], seq 4:20, ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 16: HTTP
00:18:59.069770 IP nyanpc.55852 > opennet.ru.http: Flags [P.], seq 20:92, ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 72: HTTP
00:18:59.117256 IP opennet.ru.http > nyanpc.55852: Flags [.], ack 2, win 227, options [nop,nop,TS val 4132463219 ecr 1345057590], length 0

The first packet is len 1 (expected), but then it's 2, 16, 72.

If I add time.Sleep(time.Millisecond) after the write I see what I expect to see:

00:23:20.956161 IP nyanpc.35648 > opennet.ru.http: Flags [S], seq 3537089940, win 64240, options [mss 1460,sackOK,TS val 1345319477 ecr 0,nop,wscale 7], length 0
00:23:21.003374 IP opennet.ru.http > nyanpc.35648: Flags [S.], seq 1632357611, ack 3537089941, win 28960, options [mss 1452,sackOK,TS val 4132725105 ecr 1345319477,nop,wscale 7], length 0
00:23:21.003389 IP nyanpc.35648 > opennet.ru.http: Flags [.], ack 1, win 502, options [nop,nop,TS val 1345319524 ecr 4132725105], length 0
00:23:21.003585 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 1:2, ack 1, win 502, options [nop,nop,TS val 1345319524 ecr 4132725105], length 1: HTTP
00:23:21.004670 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 2:3, ack 1, win 502, options [nop,nop,TS val 1345319525 ecr 4132725105], length 1: HTTP
00:23:21.005743 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 3:4, ack 1, win 502, options [nop,nop,TS val 1345319526 ecr 4132725105], length 1: HTTP
00:23:21.006810 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 4:5, ack 1, win 502, options [nop,nop,TS val 1345319527 ecr 4132725105], length 1: HTTP
00:23:21.007874 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 5:6, ack 1, win 502, options [nop,nop,TS val 1345319529 ecr 4132725105], length 1: HTTP
00:23:21.008934 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 6:7, ack 1, win 502, options [nop,nop,TS val 1345319530 ecr 4132725105], length 1: HTTP
00:23:21.010007 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 7:8, ack 1, win 502, options [nop,nop,TS val 1345319531 ecr 4132725105], length 1: HTTP
00:23:21.011070 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 8:9, ack 1, win 502, options [nop,nop,TS val 1345319532 ecr 4132725105], length 1: HTTP
00:23:21.012130 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 9:10, ack 1, win 502, options [nop,nop,TS val 1345319533 ecr 4132725105], length 1: HTTP
00:23:21.013168 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 10:11, ack 1, win 502, options [nop,nop,TS val 1345319534 ecr 4132725105], length 1: HTTP
00:23:21.051078 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 2, win 227, options [nop,nop,TS val 4132725152 ecr 1345319524], length 0
00:23:21.051084 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 11:46, ack 1, win 502, options [nop,nop,TS val 1345319572 ecr 4132725152], length 35: HTTP
00:23:21.051205 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 46:47, ack 1, win 502, options [nop,nop,TS val 1345319572 ecr 4132725152], length 1: HTTP
00:23:21.051745 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 3, win 227, options [nop,nop,TS val 4132725153 ecr 1345319525], length 0
00:23:21.052274 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 47:48, ack 1, win 502, options [nop,nop,TS val 1345319573 ecr 4132725153], length 1: HTTP
00:23:21.052719 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 4, win 227, options [nop,nop,TS val 4132725154 ecr 1345319526], length 0
00:23:21.053341 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 48:49, ack 1, win 502, options [nop,nop,TS val 1345319574 ecr 4132725154], length 1: HTTP
00:23:21.053974 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 5, win 227, options [nop,nop,TS val 4132725155 ecr 1345319527], length 0
00:23:21.054405 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 49:50, ack 1, win 502, options [nop,nop,TS val 1345319575 ecr 4132725155], length 1: HTTP
00:23:21.055214 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 6, win 227, options [nop,nop,TS val 4132725157 ecr 1345319529], length 0
00:23:21.055464 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 50:51, ack 1, win 502, options [nop,nop,TS val 1345319576 ecr 4132725157], length 1: HTTP
00:23:21.056222 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 7, win 227, options [nop,nop,TS val 4132725158 ecr 1345319530], length 0
00:23:21.056497 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 51:52, ack 1, win 502, options [nop,nop,TS val 1345319577 ecr 4132725158], length 1: HTTP

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added 100us sleep. I wonder if that's enough or if you need a full millisecond.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea why you are getting 35 there?

No idea, likely some kind of congestion control kicking it.

I added 100us sleep. I wonder if that's enough or if you need a full millisecond.

If congestion control is causing this, relying on delays won't be good enough for mobile network with high latencies. Need some experimentation, and maybe putting the delay into config.

Porting https://github.com/hufrea/byedpi/blob/75671fa11c57aa458e87d876efc48b40985ed78f/desync.c#L88 for android should also be possible, but tricky. I wonder if this option is also supported on ios, but there's a solution for windows.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://pkg.go.dev/net#TCPConn.SetWriteBuffer

hmm, is this what we need?

setting write buffer to a relatively low value when doing splits, then maybe restoring it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@derlaft What operating system are you using?

I get it working fine on macOS without the sleep:
image

FYI, I pushed the config changes. Can you try it again with the new code?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

linux

I get it working fine on macOS without the sleep:

looks like mac stack is simpler in that aspect (which is good news, means we won't likely have to do anything for ios as well?)

FYI, I pushed the config changes. Can you try it again with the new code?

sure, will do that today in the evening

written += n
w.advance(int64(n))
if err != nil {
return written, err
}
data = data[written:]
data = data[n:]
}
n, err := w.writer.Write(data)
written += n
w.prefixBytes -= int64(n)
w.advance(int64(n))
return written, err
}
88 changes: 80 additions & 8 deletions transport/split/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,52 @@ func (w *collectWrites) Write(data []byte) (int, error) {

func TestWrite_Split(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 3)
splitWriter := NewWriter(&innerWriter, NewFixedSplitIterator(3))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("Req"), []byte("uest")}, innerWriter.writes)
}

func TestWrite_SplitZero(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, NewRepeatedSplitIterator(RepeatedSplit{1, 0}, RepeatedSplit{0, 1}, RepeatedSplit{10, 0}, RepeatedSplit{0, 2}))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("Request")}, innerWriter.writes)
}

func TestWrite_SplitZeroLong(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, NewRepeatedSplitIterator(RepeatedSplit{1, 0}, RepeatedSplit{1_000_000_000_000_000_000, 0}))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("Request")}, innerWriter.writes)
}

func TestWrite_SplitZeroPrefix(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, NewRepeatedSplitIterator(RepeatedSplit{1, 0}, RepeatedSplit{3, 2}))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("Re"), []byte("qu"), []byte("es"), []byte("t")}, innerWriter.writes)
}

func TestWrite_SplitMulti(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, NewRepeatedSplitIterator(RepeatedSplit{1, 1}, RepeatedSplit{3, 2}, RepeatedSplit{2, 3}))
n, err := splitWriter.Write([]byte("RequestRequestRequest"))
require.NoError(t, err)
require.Equal(t, 21, n)
require.Equal(t, [][]byte{[]byte("R"), []byte("eq"), []byte("ue"), []byte("st"), []byte("Req"), []byte("ues"), []byte("tRequest")}, innerWriter.writes)
}

func TestWrite_ShortWrite(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 10)
splitWriter := NewWriter(&innerWriter, NewFixedSplitIterator(10))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
Expand All @@ -56,7 +92,7 @@ func TestWrite_ShortWrite(t *testing.T) {

func TestWrite_Zero(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 0)
splitWriter := NewWriter(&innerWriter, NewFixedSplitIterator(0))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
Expand All @@ -65,7 +101,7 @@ func TestWrite_Zero(t *testing.T) {

func TestWrite_NeedsTwoWrites(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 5)
splitWriter := NewWriter(&innerWriter, NewFixedSplitIterator(5))
n, err := splitWriter.Write([]byte("Re"))
require.NoError(t, err)
require.Equal(t, 2, n)
Expand All @@ -77,13 +113,37 @@ func TestWrite_NeedsTwoWrites(t *testing.T) {

func TestWrite_Compound(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(NewWriter(&innerWriter, 4), 1)
splitWriter := NewWriter(NewWriter(&innerWriter, NewFixedSplitIterator(4)), NewFixedSplitIterator(1))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("R"), []byte("equ"), []byte("est")}, innerWriter.writes)
}

func TestWrite_RepeatNumber3_SkipBytes5(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, NewRepeatedSplitIterator(RepeatedSplit{1, 1}, RepeatedSplit{3, 5}))
n, err := splitWriter.Write([]byte("RequestRequestRequest."))
require.NoError(t, err)
require.Equal(t, 7*3+1, n)
require.Equal(t, [][]byte{
[]byte("R"), // prefix
[]byte("eques"), // split 1
[]byte("tRequ"), // split 2
[]byte("estRe"), // split 3
[]byte("quest."), // tail
}, innerWriter.writes)
}

func TestWrite_RepeatNumber3_SkipBytes0(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, NewRepeatedSplitIterator(RepeatedSplit{1, 1}, RepeatedSplit{0, 3}))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("R"), []byte("equest")}, innerWriter.writes)
}

// collectReader is a [io.Reader] that appends each Read from the Reader to the reads slice.
type collectReader struct {
io.Reader
Expand All @@ -101,7 +161,7 @@ func (r *collectReader) Read(buf []byte) (int, error) {
}

func TestReadFrom(t *testing.T) {
splitWriter := NewWriter(&bytes.Buffer{}, 3)
splitWriter := NewWriter(&bytes.Buffer{}, NewFixedSplitIterator(3))
rf, ok := splitWriter.(io.ReaderFrom)
require.True(t, ok)

Expand All @@ -118,8 +178,20 @@ func TestReadFrom(t *testing.T) {
require.Equal(t, [][]byte{[]byte("Request2")}, cr.reads)
}

func TestReadFrom_Multi(t *testing.T) {
splitWriter := NewWriter(&bytes.Buffer{}, NewRepeatedSplitIterator(RepeatedSplit{1, 1}, RepeatedSplit{3, 2}, RepeatedSplit{2, 3}))
rf, ok := splitWriter.(io.ReaderFrom)
require.True(t, ok)

cr := &collectReader{Reader: bytes.NewReader([]byte("RequestRequestRequest"))}
n, err := rf.ReadFrom(cr)
require.NoError(t, err)
require.Equal(t, int64(21), n)
require.Equal(t, [][]byte{[]byte("R"), []byte("eq"), []byte("ue"), []byte("st"), []byte("Req"), []byte("ues"), []byte("tRequest")}, cr.reads)
}

func TestReadFrom_ShortRead(t *testing.T) {
splitWriter := NewWriter(&bytes.Buffer{}, 10)
splitWriter := NewWriter(&bytes.Buffer{}, NewFixedSplitIterator(10))
rf, ok := splitWriter.(io.ReaderFrom)
require.True(t, ok)
cr := &collectReader{Reader: bytes.NewReader([]byte("Request1"))}
Expand All @@ -138,7 +210,7 @@ func TestReadFrom_ShortRead(t *testing.T) {
func BenchmarkReadFrom(b *testing.B) {
for n := 0; n < b.N; n++ {
reader := bytes.NewReader(make([]byte, n))
writer := NewWriter(io.Discard, 10)
writer := NewWriter(io.Discard, NewFixedSplitIterator(10))
rf, ok := writer.(io.ReaderFrom)
require.True(b, ok)
_, err := rf.ReadFrom(reader)
Expand Down
Loading