Skip to content

Commit

Permalink
cleanup: remove asynchronous experiments
Browse files Browse the repository at this point in the history
We originally developed asynchronous experiments as a mean to
support websteps, a nettest that returned more than one measurement
per invocation of its Measure method.

Since then, we removed websteps. Therefore, this code is currently
technically unused. Additionally, this code further complicates
implementing richer input, because it is another way of performing
measurements.

As such, in the interest of switfly moving forward with richer
input _and_ of simplifying the engine, we are now removing this
unused functionality from the tree.

Part of ooni/probe#2607
  • Loading branch information
bassosimone committed Jun 5, 2024
1 parent a5076b5 commit 236988b
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 259 deletions.
171 changes: 73 additions & 98 deletions internal/engine/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,104 +96,6 @@ func (e *experiment) ReportID() string {
return report.ReportID()
}

// experimentAsyncWrapper makes a sync experiment behave like it was async
type experimentAsyncWrapper struct {
*experiment
}

var _ model.ExperimentMeasurerAsync = &experimentAsyncWrapper{}

// RunAsync implements ExperimentMeasurerAsync.RunAsync.
func (eaw *experimentAsyncWrapper) RunAsync(
ctx context.Context, sess model.ExperimentSession, input string,
callbacks model.ExperimentCallbacks) (<-chan *model.ExperimentAsyncTestKeys, error) {
out := make(chan *model.ExperimentAsyncTestKeys)
measurement := eaw.experiment.newMeasurement(input)
start := time.Now()
args := &model.ExperimentArgs{
Callbacks: eaw.callbacks,
Measurement: measurement,
Session: eaw.session,
}
err := eaw.experiment.measurer.Run(ctx, args)
stop := time.Now()
if err != nil {
return nil, err
}
go func() {
defer close(out) // signal the reader we're done!
out <- &model.ExperimentAsyncTestKeys{
Extensions: measurement.Extensions,
Input: measurement.Input,
MeasurementRuntime: stop.Sub(start).Seconds(),
TestKeys: measurement.TestKeys,
TestHelpers: measurement.TestHelpers,
}
}()
return out, nil
}

// MeasureAsync implements [model.Experiment].
func (e *experiment) MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error) {
err := e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes
if err != nil {
return nil, err
}
ctx = bytecounter.WithSessionByteCounter(ctx, e.session.byteCounter)
ctx = bytecounter.WithExperimentByteCounter(ctx, e.byteCounter)
var async model.ExperimentMeasurerAsync
if v, okay := e.measurer.(model.ExperimentMeasurerAsync); okay {
async = v
} else {
async = &experimentAsyncWrapper{e}
}
in, err := async.RunAsync(ctx, e.session, input, e.callbacks)
if err != nil {
return nil, err
}
out := make(chan *model.Measurement)
go func() {
defer close(out) // we need to signal the consumer we're done
for tk := range in {
measurement := e.newMeasurement(input)
measurement.Extensions = tk.Extensions
measurement.Input = tk.Input
measurement.MeasurementRuntime = tk.MeasurementRuntime
measurement.TestHelpers = tk.TestHelpers
measurement.TestKeys = tk.TestKeys
if err := model.ScrubMeasurement(measurement, e.session.ProbeIP()); err != nil {
// If we fail to scrub the measurement then we are not going to
// submit it. Most likely causes of error here are unlikely,
// e.g., the TestKeys being not serializable.
e.session.Logger().Warnf("can't scrub measurement: %s", err.Error())
continue
}
out <- measurement
}
}()
return out, nil
}

// MeasureWithContext implements [model.Experiment].
func (e *experiment) MeasureWithContext(
ctx context.Context, input string,
) (measurement *model.Measurement, err error) {
out, err := e.MeasureAsync(ctx, input)
if err != nil {
return nil, err
}
for m := range out {
if measurement == nil {
measurement = m // as documented just return the first one
}
}
if measurement == nil {
err = errors.New("experiment returned no measurements")
}
return
}

// SubmitAndUpdateMeasurementContext implements [model.Experiment].
func (e *experiment) SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error {
Expand Down Expand Up @@ -277,6 +179,79 @@ func (e *experiment) OpenReportContext(ctx context.Context) error {
return nil
}

// MeasureWithContext implements [model.Experiment].
func (e *experiment) MeasureWithContext(ctx context.Context, input string) (*model.Measurement, error) {
// Here we ensure that we have already looked up the probe location
// information such that we correctly populate the measurement and also
// VERY IMPORTANTLY to scrub the IP address from the measurement.
//
// Also, this SHOULD happen before wrapping the context for byte counting
// since MaybeLookupLocationContext already accounts for bytes I/O.
//
// TODO(bassosimone,DecFox): historically we did this only for measuring
// and not for opening a report, which probably is not correct. Because the
// function call is idempotent, call it also when opening a report?
if err := e.session.MaybeLookupLocationContext(ctx); err != nil {
return nil, err
}

// Tweak the context such that the bytes sent and received are accounted
// to both the session's byte counter and to the experiment's byte counter.
ctx = bytecounter.WithSessionByteCounter(ctx, e.session.byteCounter)
ctx = bytecounter.WithExperimentByteCounter(ctx, e.byteCounter)

// Create a new measurement that the experiment measurer will finish filling
// by adding the test keys etc. Please, note that, as of 2024-06-05, we're using
// the measurement Input to provide input to an experiment. We'll probably
// change this, when we'll have finished implementing richer input.
measurement := e.newMeasurement(input)

// Record when we started the experiment, to compute the runtime.
start := time.Now()

// Prepare the arguments for the experiment measurer
args := &model.ExperimentArgs{
Callbacks: e.callbacks,
Measurement: measurement,
Session: e.session,
}

// Invoke the measurer. Conventionally, an error being returned here
// indicates that something went wrong during the measurement. For example,
// it could be that the user provided us with a malformed input. In case
// there's censorship, by all means the experiment should return a nil error
// and fill the measurement accordingly.
err := e.measurer.Run(ctx, args)

// Record when the experiment finished running.
stop := time.Now()

// Handle the case where there was a fundamental error.
if err != nil {
return nil, err
}

// Make sure we record the measurement runtime.
measurement.MeasurementRuntime = stop.Sub(start).Seconds()

// Scub the measurement removing the probe IP addr from it. We are 100% sure we know
// our own IP addr, since we called MaybeLookupLocation above. Obviously, we aren't
// going to submit the measurement in case we can't scrub it, so we just return an error
// if this specific corner case happens.
//
// TODO(bassosimone,DecFox): a dual stack client MAY be such that we discover its IPv4
// address but the IPv6 address is present inside the measurement. We should ensure that
// we improve our discovering capabilities to also cover this specific case.
if err := model.ScrubMeasurement(measurement, e.session.ProbeIP()); err != nil {
e.session.Logger().Warnf("can't scrub measurement: %s", err.Error())
return nil, err
}

// We're all good! Let us return the measurement to the caller, which will
// addtionally take care that we're submitting it, if needed.
return measurement, nil
}

func (e *experiment) newReportTemplate() model.OOAPIReportTemplate {
return model.OOAPIReportTemplate{
DataFormatVersion: model.OOAPIReportDefaultDataFormatVersion,
Expand Down
7 changes: 0 additions & 7 deletions internal/mocks/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ type Experiment struct {

MockReportID func() string

MockMeasureAsync func(ctx context.Context, input string) (<-chan *model.Measurement, error)

MockMeasureWithContext func(
ctx context.Context, input string) (measurement *model.Measurement, err error)

Expand Down Expand Up @@ -45,11 +43,6 @@ func (e *Experiment) ReportID() string {
return e.MockReportID()
}

func (e *Experiment) MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error) {
return e.MockMeasureAsync(ctx, input)
}

func (e *Experiment) MeasureWithContext(
ctx context.Context, input string) (measurement *model.Measurement, err error) {
return e.MockMeasureWithContext(ctx, input)
Expand Down
16 changes: 0 additions & 16 deletions internal/mocks/experiment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,6 @@ func TestExperiment(t *testing.T) {
}
})

t.Run("MeasureAsync", func(t *testing.T) {
expected := errors.New("mocked err")
e := &Experiment{
MockMeasureAsync: func(ctx context.Context, input string) (<-chan *model.Measurement, error) {
return nil, expected
},
}
out, err := e.MeasureAsync(context.Background(), "xo")
if !errors.Is(err, expected) {
t.Fatal("unexpected err", err)
}
if out != nil {
t.Fatal("expected nil")
}
})

t.Run("MeasureWithContext", func(t *testing.T) {
expected := errors.New("mocked err")
e := &Experiment{
Expand Down
86 changes: 6 additions & 80 deletions internal/model/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,57 +53,13 @@ type ExperimentSession interface {
UserAgent() string
}

// ExperimentAsyncTestKeys is the type of test keys returned by an experiment
// when running in async fashion rather than in sync fashion.
type ExperimentAsyncTestKeys struct {
// Extensions contains the extensions used by this experiment.
Extensions map[string]int64

// Input is the input this measurement refers to.
Input MeasurementInput

// MeasurementRuntime is the total measurement runtime.
MeasurementRuntime float64

// TestHelpers contains the test helpers used in the experiment
TestHelpers map[string]interface{}

// TestKeys contains the actual test keys.
TestKeys interface{}
}

// ExperimentMeasurerAsync is a measurer that can run in async fashion.
//
// Currently this functionality is optional, but we will likely
// migrate all experiments to use this functionality in 2022.
type ExperimentMeasurerAsync interface {
// RunAsync runs the experiment in async fashion.
//
// Arguments:
//
// - ctx is the context for deadline/timeout/cancellation
//
// - sess is the measurement session
//
// - input is the input URL to measure
//
// - callbacks contains the experiment callbacks
//
// Returns either a channel where TestKeys are posted or an error.
//
// An error indicates that specific preconditions for running the experiment
// are not met (e.g., the input URL is invalid).
//
// On success, the experiment will post on the channel each new
// measurement until it is done and closes the channel.
RunAsync(ctx context.Context, sess ExperimentSession, input string,
callbacks ExperimentCallbacks) (<-chan *ExperimentAsyncTestKeys, error)
}

// ExperimentCallbacks contains experiment event-handling callbacks
// ExperimentCallbacks contains experiment event-handling callbacks.
type ExperimentCallbacks interface {
// OnProgress provides information about an experiment progress.
OnProgress(percentage float64, message string)
// OnProgress provides information about the experiment's progress.
//
// The prog field is a number between 0.0 and 1.0 representing progress, where
// 0.0 corresponds to 0% and 1.0 corresponds to 100%.
OnProgress(prog float64, message string)
}

// PrinterCallbacks is the default event handler
Expand Down Expand Up @@ -166,51 +122,21 @@ type Experiment interface {

// ReportID returns the open report's ID, if we have opened a report
// successfully before, or an empty string, otherwise.
//
// Deprecated: new code should use a Submitter.
ReportID() string

// MeasureAsync runs an async measurement. This operation could post
// one or more measurements onto the returned channel. We'll close the
// channel when we've emitted all the measurements.
//
// Arguments:
//
// - ctx is the context for deadline/cancellation/timeout;
//
// - input is the input (typically a URL but it could also be
// just an endpoint or an empty string for input-less experiments
// such as, e.g., ndt7 and dash).
//
// Return value:
//
// - on success, channel where to post measurements (the channel
// will be closed when done) and nil error;
//
// - on failure, nil channel and non-nil error.
MeasureAsync(ctx context.Context, input string) (<-chan *Measurement, error)

// MeasureWithContext performs a synchronous measurement.
//
// Return value: strictly either a non-nil measurement and
// a nil error or a nil measurement and a non-nil error.
//
// CAVEAT: while this API is perfectly fine for experiments that
// return a single measurement, it will only return the first measurement
// when used with an asynchronous experiment.
MeasureWithContext(ctx context.Context, input string) (measurement *Measurement, err error)

// SubmitAndUpdateMeasurementContext submits a measurement and updates the
// fields whose value has changed as part of the submission.
//
// Deprecated: new code should use a Submitter.
SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *Measurement) error

// OpenReportContext will open a report using the given context
// to possibly limit the lifetime of this operation.
//
// Deprecated: new code should use a Submitter.
OpenReportContext(ctx context.Context) error
}

Expand Down
6 changes: 3 additions & 3 deletions internal/oonirun/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,12 @@ type experimentWrapper struct {
total int
}

func (ew *experimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
func (ew *experimentWrapper) MeasureWithContext(
ctx context.Context, input string, idx int) (*model.Measurement, error) {
if input != "" {
ew.logger.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input)
}
return ew.child.MeasureAsync(ctx, input, idx)
return ew.child.MeasureWithContext(ctx, input, idx)
}

// experimentSubmitterWrapper implements a submission policy where we don't
Expand Down
15 changes: 5 additions & 10 deletions internal/oonirun/experiment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,11 @@ func TestExperimentRunWithFailureToSubmitAndShuffle(t *testing.T) {
},
MockNewExperiment: func() model.Experiment {
exp := &mocks.Experiment{
MockMeasureAsync: func(ctx context.Context, input string) (<-chan *model.Measurement, error) {
out := make(chan *model.Measurement)
go func() {
defer close(out)
ff := &testingx.FakeFiller{}
var meas model.Measurement
ff.Fill(&meas)
out <- &meas
}()
return out, nil
MockMeasureWithContext: func(ctx context.Context, input string) (*model.Measurement, error) {
ff := &testingx.FakeFiller{}
var meas model.Measurement
ff.Fill(&meas)
return &meas, nil
},
MockKibiBytesReceived: func() float64 {
calledKibiBytesReceived++
Expand Down
Loading

0 comments on commit 236988b

Please sign in to comment.