Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advanced split options #309

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions transport/split/stream_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@ import (
)

type splitDialer struct {
dialer transport.StreamDialer
splitPoint int64
dialer transport.StreamDialer
splitPoint int64
repeatsNumber int64
skipBytes int64
}

var _ transport.StreamDialer = (*splitDialer)(nil)

// NewStreamDialer creates a [transport.StreamDialer] that splits the outgoing stream after writing "prefixBytes" bytes
// using [SplitWriter].
func NewStreamDialer(dialer transport.StreamDialer, prefixBytes int64) (transport.StreamDialer, error) {
// using [SplitWriter]. If "repeatsNumber" is not 0, will split that many times, skipping "skipBytes" in between packets.
func NewStreamDialer(dialer transport.StreamDialer, prefixBytes int64, repeatsNumber int64, skipBytes int64) (transport.StreamDialer, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make splitDialer public (SplitDialer) and return that in this function, and add setters for the repeat and skip. Keep the member variables private so people don't mess with them.

As a design philosophy, we would rather keep the constructor with only the required parameters, with optional options as setters. That keeps the code extensible and simple for those that don't care about the advanced options.

This approach will preserve backwards compatibility as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aargh, the setters don't work as proposed beecause we return different implementations depending on the parameters. Need to rethink...

if dialer == nil {
return nil, errors.New("argument dialer must not be nil")
}
return &splitDialer{dialer: dialer, splitPoint: prefixBytes}, nil
return &splitDialer{dialer: dialer, splitPoint: prefixBytes, repeatsNumber: repeatsNumber, skipBytes: skipBytes}, nil
}

// DialStream implements [transport.StreamDialer].DialStream.
Expand All @@ -43,5 +45,5 @@ func (d *splitDialer) DialStream(ctx context.Context, remoteAddr string) (transp
if err != nil {
return nil, err
}
return transport.WrapConn(innerConn, innerConn, NewWriter(innerConn, d.splitPoint)), nil
return transport.WrapConn(innerConn, innerConn, NewWriter(innerConn, d.splitPoint, d.repeatsNumber, d.skipBytes)), nil
}
38 changes: 28 additions & 10 deletions transport/split/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
)

type splitWriter struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same note as for the Dialer. Let's make this public and return the implementation in the NewWriter call, with setters for the optional parameters.

writer io.Writer
prefixBytes int64
writer io.Writer
prefixBytes int64
repeatsNumberLeft int64 // How many times left to split
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move these comments to the public setters.

skipBytes int64 // When splitting multiple times, how many bytes to skip in between different splittings
}

var _ io.Writer = (*splitWriter)(nil)
Expand All @@ -36,10 +38,19 @@ var _ io.ReaderFrom = (*splitWriterReaderFrom)(nil)
// A write will end right after byte index prefixBytes - 1, before a write starting at byte index prefixBytes.
// For example, if you have a write of [0123456789] and prefixBytes = 3, you will get writes [012] and [3456789].
// If the input writer is a [io.ReaderFrom], the output writer will be too.
func NewWriter(writer io.Writer, prefixBytes int64) io.Writer {
sw := &splitWriter{writer, prefixBytes}
if rf, ok := writer.(io.ReaderFrom); ok {
return &splitWriterReaderFrom{sw, rf}
// If repeatsNumber > 1, then packets will be split multiple times, skipping skipBytes in between splits.
// Example:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the slice points will be in positions prefixBytes + i * skipBytes, for 0 <= i < repeatsNumber.

It may be worth it writing down that formula. It clarifies things.

This formula made me realize that skipBytes and repeatsNumber are not meaningful by themselves, and must always be specified together.

So perhaps we need a setter like SplitWriter.SetMultiSplit(splitBytes, splitCount).
It could be other names, like Enable instead of Set, or Additional, Extra instead of Multi, ... Open to ideas.

I think splitCount in this case should be for the extra blocks, in order to not be confusing. In that case the formula would be:
prefixBytes + i * splitBytes, for 0 <= i <= splitCount. With the default being splitCount == 0.

// prefixBytes = 1
// repeatsNumber = 3
// skipBytes = 6
// Array of [0132456789 10 11 12 13 14 15 16 ...] will become
// [0] [123456] [789 10 11 12] [13 14 15 16 ...]
func NewWriter(writer io.Writer, prefixBytes int64, repeatsNumber int64, skipBytes int64) io.Writer {
sw := &splitWriter{writer, prefixBytes, repeatsNumber, skipBytes}
if repeatsNumber == 0 && skipBytes == 0 {
if rf, ok := writer.(io.ReaderFrom); ok {
return &splitWriterReaderFrom{sw, rf}
}
}
return sw
}
Expand All @@ -52,13 +63,20 @@ func (w *splitWriterReaderFrom) ReadFrom(source io.Reader) (int64, error) {
}

func (w *splitWriter) Write(data []byte) (written int, err error) {
if 0 < w.prefixBytes && w.prefixBytes < int64(len(data)) {
written, err = w.writer.Write(data[:w.prefixBytes])
w.prefixBytes -= int64(written)
for 0 < w.prefixBytes && w.prefixBytes < int64(len(data)) {
dataToSend := data[:w.prefixBytes]
n, err := w.writer.Write(dataToSend)
written += n
w.prefixBytes -= int64(n)
if err != nil {
return written, err
}
data = data[written:]
data = data[n:]

w.repeatsNumberLeft -= 1
if w.repeatsNumberLeft > 0 && w.prefixBytes == 0 {
w.prefixBytes = w.skipBytes
}
}
n, err := w.writer.Write(data)
written += n
Expand Down
41 changes: 33 additions & 8 deletions transport/split/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (w *collectWrites) Write(data []byte) (int, error) {

func TestWrite_Split(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 3)
splitWriter := NewWriter(&innerWriter, 3, 0, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can all be reverted with the backwards-compatible API.

n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
Expand All @@ -47,7 +47,7 @@ func TestWrite_Split(t *testing.T) {

func TestWrite_ShortWrite(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 10)
splitWriter := NewWriter(&innerWriter, 10, 0, 0)
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
Expand All @@ -56,7 +56,7 @@ func TestWrite_ShortWrite(t *testing.T) {

func TestWrite_Zero(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 0)
splitWriter := NewWriter(&innerWriter, 0, 0, 0)
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
Expand All @@ -65,7 +65,7 @@ func TestWrite_Zero(t *testing.T) {

func TestWrite_NeedsTwoWrites(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 5)
splitWriter := NewWriter(&innerWriter, 5, 0, 0)
n, err := splitWriter.Write([]byte("Re"))
require.NoError(t, err)
require.Equal(t, 2, n)
Expand All @@ -77,13 +77,38 @@ func TestWrite_NeedsTwoWrites(t *testing.T) {

func TestWrite_Compound(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(NewWriter(&innerWriter, 4), 1)
splitWriter := NewWriter(NewWriter(&innerWriter, 4, 0, 0), 1, 0, 0)
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("R"), []byte("equ"), []byte("est")}, innerWriter.writes)
}

func TestWrite_RepeatNumber3_SkipBytes5(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 1, 3, 5)
n, err := splitWriter.Write([]byte("RequestRequestRequest"))
require.NoError(t, err)
require.Equal(t, 7*3, n)
require.Equal(t, [][]byte{
[]byte("R"),
// split 1
[]byte("eques"),
// split 2
[]byte("tRequ"),
// split 3
[]byte("estRequest")}, innerWriter.writes)
}

func TestWrite_RepeatNumber3_SkipBytes0(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 1, 3, 0)
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("R"), []byte("equest")}, innerWriter.writes)
}

// collectReader is a [io.Reader] that appends each Read from the Reader to the reads slice.
type collectReader struct {
io.Reader
Expand All @@ -101,7 +126,7 @@ func (r *collectReader) Read(buf []byte) (int, error) {
}

func TestReadFrom(t *testing.T) {
splitWriter := NewWriter(&bytes.Buffer{}, 3)
splitWriter := NewWriter(&bytes.Buffer{}, 3, 0, 0)
rf, ok := splitWriter.(io.ReaderFrom)
require.True(t, ok)

Expand All @@ -119,7 +144,7 @@ func TestReadFrom(t *testing.T) {
}

func TestReadFrom_ShortRead(t *testing.T) {
splitWriter := NewWriter(&bytes.Buffer{}, 10)
splitWriter := NewWriter(&bytes.Buffer{}, 10, 0, 0)
rf, ok := splitWriter.(io.ReaderFrom)
require.True(t, ok)
cr := &collectReader{Reader: bytes.NewReader([]byte("Request1"))}
Expand All @@ -138,7 +163,7 @@ func TestReadFrom_ShortRead(t *testing.T) {
func BenchmarkReadFrom(b *testing.B) {
for n := 0; n < b.N; n++ {
reader := bytes.NewReader(make([]byte, n))
writer := NewWriter(io.Discard, 10)
writer := NewWriter(io.Discard, 10, 0, 0)
rf, ok := writer.(io.ReaderFrom)
require.True(b, ok)
_, err := rf.ReadFrom(reader)
Expand Down
46 changes: 42 additions & 4 deletions x/configurl/split.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's update configurl/doc.go with the new format.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"strconv"
"strings"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/split"
Expand All @@ -29,11 +30,48 @@ func registerSplitStreamDialer(r TypeRegistry[transport.StreamDialer], typeID st
if err != nil {
return nil, err
}
prefixBytesStr := config.URL.Opaque
prefixBytes, err := strconv.Atoi(prefixBytesStr)
prefixBytes, repeatsNumber, skipBytes, err := parseURL(config.URL.Opaque)
if err != nil {
return nil, fmt.Errorf("prefixBytes is not a number: %v. Split config should be in split:<number> format", prefixBytesStr)
return nil, err
}
return split.NewStreamDialer(sd, int64(prefixBytes))
return split.NewStreamDialer(sd, prefixBytes, repeatsNumber, skipBytes)
})
}

func parseURL(splitUrl string) (prefixBytes int64, repeatsNumber int64, skipBytes int64, err error) {
// Split the input string by commas
parts := strings.Split(splitUrl, ",")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a comma-separated value, let's use proper URL variables.
It would read something like prefix=1&count=2&length=10. Need to figure out proper names.

You can fallback to variables if it fails to parse as a number.


// Convert all parts to integers
values := make([]int64, len(parts))
for i, part := range parts {
values[i], err = strconv.ParseInt(part, 10, 64)
if err != nil {
return 0, 0, 0, err // Return immediately if any conversion fails
}
if values[i] < 0 {
return 0, 0, 0, fmt.Errorf("All numbers in split have to be positive, got %d", values[i])
}
}

if len(values) > 0 {
prefixBytes = values[0]
}
if len(values) > 1 {
repeatsNumber = values[1]
}
if len(values) > 2 {
skipBytes = values[2]
}
if len(values) > 3 {
return 0, 0, 0, fmt.Errorf("Got too many values to parse, expected split:<number>,[<number>,<number>]. Got %s", splitUrl)
}

if repeatsNumber > 0 && skipBytes == 0 {
return 0, 0, 0, fmt.Errorf(
"If repeatsNumber is >0, then skipBytes has to be >0. Got prefixBytes=%d repeatsNumber=%d skipBytes=%d",
prefixBytes, repeatsNumber, skipBytes)
}

return prefixBytes, repeatsNumber, skipBytes, nil
}
7 changes: 6 additions & 1 deletion x/examples/fetch-speed/main.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"strings"
"time"

"crypto/tls"

"github.com/Jigsaw-Code/outline-sdk/x/configurl"
)

Expand Down Expand Up @@ -72,7 +74,10 @@ func main() {
}
return dialer.DialStream(ctx, net.JoinHostPort(host, port))
}
httpClient := &http.Client{Transport: &http.Transport{DialContext: dialContext}, Timeout: *timeoutFlag}
httpClient := &http.Client{Transport: &http.Transport{
DialContext: dialContext,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}, Timeout: *timeoutFlag}

req, err := http.NewRequest(*methodFlag, url, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion x/examples/fetch/main.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn, I tried to revert these files on github and ended up removing them by accident. Still trying to figure out this cross-repo thing

Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
methodFlag := flag.String("method", "GET", "The HTTP method to use")
var headersFlag stringArrayFlagValue
flag.Var(&headersFlag, "H", "Raw HTTP Header line to add. It must not end in \\r\\n")
timeoutSecFlag := flag.Int("timeout", 5, "Timeout in seconds")
timeoutSecFlag := flag.Int("timeout", 100500, "Timeout in seconds")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert


flag.Parse()

Expand Down
2 changes: 2 additions & 0 deletions x/go.mod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert

Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,5 @@ require (
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/Jigsaw-Code/outline-sdk => /home/peter/outline-sdk