Skip to content

Commit

Permalink
Implement functionality to terminate ndt7 tests early given a client …
Browse files Browse the repository at this point in the history
…parameter (#390)

* Initial early exit changes

* Use parameter

* Address comments

* Move Params to sender package
  • Loading branch information
cristinaleonr committed Jun 27, 2023
1 parent ec2a361 commit 5824ad0
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 47 deletions.
23 changes: 16 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,43 @@ go 1.18

require (
github.com/apex/log v1.9.0
github.com/google/uuid v1.2.0
github.com/google/uuid v1.3.0
github.com/gorilla/handlers v1.5.1
github.com/gorilla/websocket v1.5.0
github.com/m-lab/access v0.0.11
github.com/m-lab/go v0.1.53
github.com/m-lab/go v0.1.66
github.com/m-lab/tcp-info v1.5.3
github.com/m-lab/uuid v1.0.1
github.com/prometheus/client_golang v1.13.0
go.uber.org/goleak v1.1.12
gopkg.in/m-lab/pipe.v3 v3.0.0-20180108231244-604e84f43ee0
)

require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
golang.org/x/tools v0.6.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)

require (
github.com/araddon/dateparse v0.0.0-20200409225146-d820a6159ab1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/gocarina/gocsv v0.0.0-20210408192840-02d7211d929d // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/justinas/alice v1.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
google.golang.org/protobuf v1.28.1 // indirect
golang.org/x/sys v0.7.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
)
68 changes: 33 additions & 35 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions ndt7/download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// for the subtest. The conn argument is the open WebSocket connection. The data
// argument is the archival data where results are saved. All arguments are
// owned by the caller of this function.
func Do(ctx context.Context, conn *websocket.Conn, data *model.ArchivalData) error {
func Do(ctx context.Context, conn *websocket.Conn, data *model.ArchivalData, params *sender.Params) error {
// Implementation note: use child contexts so the sender is strictly time
// bounded. After timeout, the sender closes the conn, which results in the
// receiver completing.
Expand All @@ -24,7 +24,7 @@ func Do(ctx context.Context, conn *websocket.Conn, data *model.ArchivalData) err

// Perform download and save server-measurements in data.
// TODO: move sender.Start logic to this file.
err := sender.Start(ctx, conn, data)
err := sender.Start(ctx, conn, data, params)

// Block on the receiver completing to guarantee that access to data is synchronous.
<-recv.Done()
Expand Down
18 changes: 16 additions & 2 deletions ndt7/download/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import (
"github.com/m-lab/ndt-server/ndt7/spec"
)

// Params defines the parameters for the sender to end the test early.
type Params struct {
IsEarlyExit bool
MaxBytes int64
}

func makePreparedMessage(size int) (*websocket.PreparedMessage, error) {
data := make([]byte, size)
_, err := rand.Read(data)
Expand All @@ -32,7 +38,7 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) {
// Liveness guarantee: the sender will not be stuck sending for more than the
// MaxRuntime of the subtest. This is enforced by setting the write deadline to
// Time.Now() + MaxRuntime.
func Start(ctx context.Context, conn *websocket.Conn, data *model.ArchivalData) error {
func Start(ctx context.Context, conn *websocket.Conn, data *model.ArchivalData, params *Params) error {
logging.Logger.Debug("sender: start")
proto := ndt7metrics.ConnLabel(conn)

Expand Down Expand Up @@ -70,7 +76,7 @@ func Start(ctx context.Context, conn *websocket.Conn, data *model.ArchivalData)
for {
select {
case m, ok := <-src:
if !ok { // This means that the measurer has terminated
if !ok { // This means that the measurer has terminated.
closer.StartClosing(conn)
ndt7metrics.ClientSenderErrors.WithLabelValues(
proto, string(spec.SubtestDownload), "measurer-closed").Inc()
Expand All @@ -90,6 +96,14 @@ func Start(ctx context.Context, conn *websocket.Conn, data *model.ArchivalData)
proto, string(spec.SubtestDownload), "ping-send-ticks").Inc()
return err
}
// End the test once enough bytes have been acked.
if params.IsEarlyExit && m.TCPInfo != nil &&
m.TCPInfo.BytesAcked >= params.MaxBytes {
closer.StartClosing(conn)
ndt7metrics.ClientSenderErrors.WithLabelValues(
proto, string(spec.SubtestDownload), "measurer-closed-early").Inc()
return nil
}
default:
if err := conn.WritePreparedMessage(preparedMessage); err != nil {
logging.Logger.WithError(err).Warn(
Expand Down
36 changes: 35 additions & 1 deletion ndt7/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"net/http"
"net/url"
"regexp"
"strconv"
"time"

"github.com/gorilla/websocket"
"golang.org/x/exp/slices"

"github.com/m-lab/access/controller"
"github.com/m-lab/go/prometheusx"
Expand All @@ -20,6 +22,7 @@ import (
"github.com/m-lab/ndt-server/metadata"
"github.com/m-lab/ndt-server/metrics"
"github.com/m-lab/ndt-server/ndt7/download"
"github.com/m-lab/ndt-server/ndt7/download/sender"
ndt7metrics "github.com/m-lab/ndt-server/ndt7/metrics"
"github.com/m-lab/ndt-server/ndt7/model"
"github.com/m-lab/ndt-server/ndt7/results"
Expand Down Expand Up @@ -64,6 +67,13 @@ func (h Handler) Upload(rw http.ResponseWriter, req *http.Request) {
// runMeasurement conditionally runs either download or upload based on kind.
// The kind argument must be spec.SubtestDownload or spec.SubtestUpload.
func (h Handler) runMeasurement(kind spec.SubtestKind, rw http.ResponseWriter, req *http.Request) {
// Validate client request before opening the connection.
params, err := validateEarlyExit(req.URL.Query())
if err != nil {
warnAndClose(rw, err.Error())
return
}

// Setup websocket connection.
conn := setupConn(rw, req)
if conn == nil {
Expand Down Expand Up @@ -109,7 +119,7 @@ func (h Handler) runMeasurement(kind spec.SubtestKind, rw http.ResponseWriter, r
var rate float64
if kind == spec.SubtestDownload {
result.Download = data
err = download.Do(ctx, conn, data)
err = download.Do(ctx, conn, data, params)
rate = downRate(data.ServerMeasurements)
} else if kind == spec.SubtestUpload {
result.Upload = data
Expand Down Expand Up @@ -240,3 +250,27 @@ func appendClientMetadata(data *model.ArchivalData, values url.Values) {
})
}
}

// validateEarlyExit verifies and returns the "early_exit" parameters.
func validateEarlyExit(values url.Values) (*sender.Params, error) {
for name, values := range values {
if name != spec.EarlyExitParameterName {
continue
}

value := values[0]
if !slices.Contains(spec.ValidEarlyExitValues, value) {
return nil, fmt.Errorf("Invalid %s parameter value %s", name, value)
}

// Convert string to int64.
bytes, _ := strconv.ParseInt(value, 10, 64)
return &sender.Params{
IsEarlyExit: true,
MaxBytes: bytes * 1000000, // Conver MB to bytes.
}, nil
}
return &sender.Params{
IsEarlyExit: false,
}, nil
}
66 changes: 66 additions & 0 deletions ndt7/handler/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Package handler implements the WebSocket handler for ndt7.
package handler

import (
"net/url"
"reflect"
"testing"

"github.com/m-lab/ndt-server/ndt7/download/sender"
"github.com/m-lab/ndt-server/ndt7/spec"
)

func Test_validateEarlyExit(t *testing.T) {
type args struct {
values url.Values
}
tests := []struct {
name string
values url.Values
want *sender.Params
wantErr bool
}{
{
name: "valid-param",
values: url.Values{"early_exit": {spec.ValidEarlyExitValues[0]}},
want: &sender.Params{
IsEarlyExit: true,
MaxBytes: 250000000,
},
wantErr: false,
},
{
name: "invalid-param",
values: url.Values{"early_exit": {"123"}},
want: nil,
wantErr: true,
},
{
name: "missing-value",
values: url.Values{"early_exit": {""}},
want: nil,
wantErr: true,
},
{
name: "absent-param",
values: url.Values{"foo": {"bar"}},
want: &sender.Params{
IsEarlyExit: false,
MaxBytes: 0,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := validateEarlyExit(tt.values)
if (err != nil) != tt.wantErr {
t.Errorf("validateEarlyExit() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("validateEarlyExit() = %v, want %v", got, tt.want)
}
})
}
}
15 changes: 15 additions & 0 deletions ndt7/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ const MaxMessageSize = 1 << 24
// a good compromise between Go and JavaScript as seen in cloud based tests.
const MaxScaledMessageSize = 1 << 20

// ValidEarlyExitValues contains the set of accepted MB transfer amounts after which
// ndt7 download tests can be prematurely terminated.
// Client requests with values outside of this set will result in a 400 error.
var ValidEarlyExitValues = []string{"250"}

// EarlyExitParameterName is the name of the parameter that clients can use to terminate
// ndt7 download tests once the test has transferred as many MB as the parameter's value.
const EarlyExitParameterName = "early_exit"

// DefaultWebsocketBufferSize is the read and write buffer sizes used when
// creating a websocket connection. This size is independent of the websocket
// message sizes defined above (which may be larger) and used to optimize read
Expand Down Expand Up @@ -65,3 +74,9 @@ const (
// SubtestUpload is a upload subtest
SubtestUpload = SubtestKind("upload")
)

// Params defines the client parameters for ndt7 requests.
type Params struct {
IsEarlyExit bool
MaxBytes int64
}

0 comments on commit 5824ad0

Please sign in to comment.