Skip to content

Commit

Permalink
feat(engine): allow runner to return many measurements (#527)
Browse files Browse the repository at this point in the history
This is required to implement websteps, which is currently tracked
by ooni/probe#1733.

We introduce the concept of async runner. An async runner will
post measurements on a channel until it is done. When it is done,
it will close the channel to notify the reader about that.

This change makes sync experiments strictly return either a non-nil
measurement and a nil error, or a nil measurement and a non-nil error.

While this is a pretty much obvious situation in golang, we had
some parts of the codebase that were not robust to this assumption
and attempted to submit a measurement after the measure call
returned an error.

Luckily, we had enough tests to catch this change in our assumptions,
which is why this commit also includes extra documentation and test changes.
  • Loading branch information
bassosimone authored Sep 29, 2021
1 parent 8931a36 commit ff1c170
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 44 deletions.
9 changes: 3 additions & 6 deletions internal/cmd/miniooni/libminiooni.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,15 +460,12 @@ type experimentWrapper struct {
total int
}

func (ew *experimentWrapper) MeasureWithContext(
ctx context.Context, idx int, input string) (*model.Measurement, error) {
func (ew *experimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
if input != "" {
log.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input)
}
measurement, err := ew.child.MeasureWithContext(ctx, idx, input)
warnOnError(err, "measurement failed")
// policy: we do not stop the loop if the measurement fails
return measurement, nil
return ew.child.MeasureAsync(ctx, input, idx)
}

type submitterWrapper struct {
Expand Down
2 changes: 2 additions & 0 deletions internal/engine/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package engine contains the engine API.
package engine
130 changes: 116 additions & 14 deletions internal/engine/experiment.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// Package engine contains the engine API.
package engine

import (
Expand Down Expand Up @@ -92,28 +91,130 @@ func (e *Experiment) ReportID() string {
// Measure runs the experiment once for the given input, using a
// background context. Callers are expected to have configured the
// available test helpers beforehand, either manually or by calling
// the session's MaybeLookupBackends() method.
//
// Return value: exactly one of the two results is non-nil, i.e., you
// get either a measurement with a nil error, or a nil measurement
// with a non-nil error.
//
// CAVEAT: this API suits experiments that produce a single
// measurement. With an asynchronous experiment only the first
// measurement is returned. We plan on eventually migrating all
// experiments to run in asynchronous fashion.
//
// Deprecated: use MeasureWithContext instead, please.
func (e *Experiment) Measure(input string) (*model.Measurement, error) {
	ctx := context.Background()
	return e.MeasureWithContext(ctx, input)
}

// experimentAsyncWrapper adapts a sync experiment so that it can be
// driven through the async measurer interface. It embeds the
// Experiment it wraps.
type experimentAsyncWrapper struct {
	*Experiment
}

// Compile-time check that *experimentAsyncWrapper implements
// model.ExperimentMeasurerAsync.
var _ model.ExperimentMeasurerAsync = (*experimentAsyncWrapper)(nil)

// RunAsync implements ExperimentMeasurerAsync.RunAsync.
//
// It runs the wrapped sync measurer to completion and only then
// publishes the single resulting set of test keys on the returned
// channel.
//
// Arguments:
//
// - ctx is the context for deadline/cancellation/timeout;
//
// - input is the input passed to the new measurement;
//
// - sess and callbacks are required by the interface but are not
// used here: the wrapped experiment's own session and callbacks are
// the ones passed to the measurer's Run method.
//
// Return value:
//
// - on failure, nil channel and the error returned by Run;
//
// - on success, a channel yielding exactly one ExperimentAsyncTestKeys
// and then reporting closed, along with a nil error.
func (eaw *experimentAsyncWrapper) RunAsync(
	ctx context.Context, sess model.ExperimentSession, input string,
	callbacks model.ExperimentCallbacks) (<-chan *model.ExperimentAsyncTestKeys, error) {
	measurement := eaw.Experiment.newMeasurement(input)
	start := time.Now()
	err := eaw.Experiment.measurer.Run(ctx, eaw.session, measurement, eaw.callbacks)
	elapsed := time.Since(start)
	if err != nil {
		return nil, err
	}
	// The result is already available, so a one-slot buffered channel
	// lets us publish it and close the channel right away. This avoids
	// a goroutine that would stay blocked forever on the send if the
	// consumer stopped reading. Readers still observe one value
	// followed by the close.
	out := make(chan *model.ExperimentAsyncTestKeys, 1)
	out <- &model.ExperimentAsyncTestKeys{
		Extensions:         measurement.Extensions,
		MeasurementRuntime: elapsed.Seconds(),
		TestKeys:           measurement.TestKeys,
	}
	close(out) // signal the reader we're done!
	return out, nil
}

// MeasureAsync starts an asynchronous measurement. Zero or more
// measurements are posted onto the returned channel, which is closed
// once the underlying runner has nothing more to emit.
//
// Experiments that do not implement model.ExperimentMeasurerAsync
// are wrapped so that they behave like async ones.
//
// Arguments:
//
// - ctx is the context for deadline/cancellation/timeout;
//
// - input is the input (typically a URL but it could also be
// just an endpoint or an empty string for input-less experiments
// such as, e.g., ndt7 and dash).
//
// Return value:
//
// - on success, the channel on which measurements are posted (it
// is closed when done) and a nil error;
//
// - on failure, nil channel and non-nil error.
func (e *Experiment) MeasureAsync(
	ctx context.Context, input string) (<-chan *model.Measurement, error) {
	// Note: the location lookup already tracks session bytes.
	if err := e.session.MaybeLookupLocationContext(ctx); err != nil {
		return nil, err
	}
	ctx = dialer.WithSessionByteCounter(ctx, e.session.byteCounter)
	ctx = dialer.WithExperimentByteCounter(ctx, e.byteCounter)
	runner, isAsync := e.measurer.(model.ExperimentMeasurerAsync)
	if !isAsync {
		runner = &experimentAsyncWrapper{e}
	}
	testKeys, err := runner.RunAsync(ctx, e.session, input, e.callbacks)
	if err != nil {
		return nil, err
	}
	measurements := make(chan *model.Measurement)
	go func() {
		// Closing the channel tells the consumer that we are done.
		defer close(measurements)
		for tk := range testKeys {
			m := e.newMeasurement(input)
			m.TestKeys = tk.TestKeys
			m.MeasurementRuntime = tk.MeasurementRuntime
			m.Extensions = tk.Extensions
			scrubErr := m.Scrub(e.session.ProbeIP())
			if scrubErr == nil {
				measurements <- m
				continue
			}
			// A measurement we cannot scrub is never emitted (and hence
			// never submitted). This should be rare; one possible cause
			// is TestKeys that cannot be serialized.
			e.session.Logger().Warnf("can't scrub measurement: %s", scrubErr.Error())
		}
	}()
	return measurements, nil
}

// MeasureWithContext is like Measure but with context.
//
// Return value: strictly either a non-nil measurement and
// a nil error or a nil measurement and a non-nil error.
//
// CAVEAT: while this API is perfectly fine for experiments that
// return a single measurement, it will only return the first measurement
// when used with an asynchronous experiment. We plan on eventually
// migrating all experiments to run in asynchronous fashion.
func (e *Experiment) MeasureWithContext(
ctx context.Context, input string,
) (measurement *model.Measurement, err error) {
err = e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes
out, err := e.MeasureAsync(ctx, input)
if err != nil {
return
return nil, err
}
ctx = dialer.WithSessionByteCounter(ctx, e.session.byteCounter)
ctx = dialer.WithExperimentByteCounter(ctx, e.byteCounter)
measurement = e.newMeasurement(input)
start := time.Now()
err = e.measurer.Run(ctx, e.session, measurement, e.callbacks)
stop := time.Now()
measurement.MeasurementRuntime = stop.Sub(start).Seconds()
scrubErr := measurement.Scrub(e.session.ProbeIP())
if err == nil {
err = scrubErr
for m := range out {
if measurement == nil {
measurement = m // as documented just return the first one
}
}
if measurement == nil {
err = errors.New("experiment returned no measurements")
}
return
}
Expand All @@ -139,11 +240,12 @@ func (e *Experiment) SubmitAndUpdateMeasurement(measurement *model.Measurement)
func (e *Experiment) SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error {
if e.report == nil {
return errors.New("Report is not open")
return errors.New("report is not open")
}
return e.report.SubmitMeasurement(ctx, measurement)
}

// newMeasurement creates a new measurement for this experiment with the given input.
func (e *Experiment) newMeasurement(input string) *model.Measurement {
utctimenow := time.Now().UTC()
m := &model.Measurement{
Expand Down
4 changes: 2 additions & 2 deletions internal/engine/experiment_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func TestMeasurementFailure(t *testing.T) {
if err.Error() != "mocked error" {
t.Fatal("unexpected error type")
}
if measurement == nil {
t.Fatal("expected non nil measurement here")
if measurement != nil {
t.Fatal("expected nil measurement here")
}
}

Expand Down
44 changes: 26 additions & 18 deletions internal/engine/inputprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
// InputProcessorExperiment is the Experiment
// according to InputProcessor.
type InputProcessorExperiment interface {
MeasureWithContext(
ctx context.Context, input string) (*model.Measurement, error)
MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error)
}

// InputProcessorExperimentWrapper is a wrapper for an
// Experiment that also allow to pass around the input index.
type InputProcessorExperimentWrapper interface {
MeasureWithContext(
ctx context.Context, idx int, input string) (*model.Measurement, error)
MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error)
}

// NewInputProcessorExperimentWrapper creates a new
Expand All @@ -32,9 +32,9 @@ type inputProcessorExperimentWrapper struct {
exp InputProcessorExperiment
}

func (ipew inputProcessorExperimentWrapper) MeasureWithContext(
ctx context.Context, idx int, input string) (*model.Measurement, error) {
return ipew.exp.MeasureWithContext(ctx, input)
func (ipew inputProcessorExperimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
return ipew.exp.MeasureAsync(ctx, input)
}

var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{}
Expand Down Expand Up @@ -142,21 +142,29 @@ func (ip *InputProcessor) run(ctx context.Context) (int, error) {
return stopMaxRuntime, nil
}
input := url.URL
meas, err := ip.Experiment.MeasureWithContext(ctx, idx, input)
var measurements []*model.Measurement
source, err := ip.Experiment.MeasureAsync(ctx, input, idx)
if err != nil {
return 0, err
}
meas.AddAnnotations(ip.Annotations)
meas.Options = ip.Options
err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
return 0, err
// NOTE: we don't want to intermix measuring with submitting
// therefore we collect all measurements first
for meas := range source {
measurements = append(measurements, meas)
}
// Note: must be after submission because submission modifies
// the measurement to include the report ID.
err = ip.Saver.SaveMeasurement(idx, meas)
if err != nil {
return 0, err
for _, meas := range measurements {
meas.AddAnnotations(ip.Annotations)
meas.Options = ip.Options
err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
return 0, err
}
// Note: must be after submission because submission modifies
// the measurement to include the report ID.
err = ip.Saver.SaveMeasurement(idx, meas)
if err != nil {
return 0, err
}
}
}
return stopNormal, nil
Expand Down
11 changes: 8 additions & 3 deletions internal/engine/inputprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type FakeInputProcessorExperiment struct {
M []*model.Measurement
}

func (fipe *FakeInputProcessorExperiment) MeasureWithContext(
ctx context.Context, input string) (*model.Measurement, error) {
func (fipe *FakeInputProcessorExperiment) MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error) {
if fipe.Err != nil {
return nil, fipe.Err
}
Expand All @@ -30,7 +30,12 @@ func (fipe *FakeInputProcessorExperiment) MeasureWithContext(
m.AddAnnotation("foo", "baz") // would be bar below
m.Input = model.MeasurementTarget(input)
fipe.M = append(fipe.M, m)
return m, nil
out := make(chan *model.Measurement)
go func() {
defer close(out)
out <- m
}()
return out, nil
}

func TestInputProcessorMeasurementFailed(t *testing.T) {
Expand Down
41 changes: 41 additions & 0 deletions internal/engine/model/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,47 @@ type ExperimentSession interface {
UserAgent() string
}

// ExperimentAsyncTestKeys is the type of test keys returned by an experiment
// when running in async fashion rather than in sync fashion. An async
// experiment posts one value of this type on its output channel for
// each measurement it produces.
type ExperimentAsyncTestKeys struct {
// MeasurementRuntime is the total measurement runtime. The engine's
// wrapper for sync experiments fills it with the elapsed seconds.
MeasurementRuntime float64

// TestKeys contains the actual test keys. The concrete type is
// chosen by the experiment.
TestKeys interface{}

// Extensions contains the extensions used by this experiment.
Extensions map[string]int64
}

// ExperimentMeasurerAsync is a measurer that can run in async fashion,
// that is, one that may emit several measurements for a single input.
//
// Currently this functionality is optional, but we will likely
// migrate all experiments to use this functionality in 2022.
type ExperimentMeasurerAsync interface {
// RunAsync runs the experiment in async fashion.
//
// Arguments:
//
// - ctx is the context for deadline/timeout/cancellation;
//
// - sess is the measurement session;
//
// - input is the input URL to measure;
//
// - callbacks contains the experiment callbacks.
//
// Returns either a channel where TestKeys are posted or an error.
//
// An error indicates that specific preconditions for running the experiment
// are not met (e.g., the input URL is invalid).
//
// On success, the experiment will post on the channel each new
// measurement until it is done, and then it closes the channel
// to tell the reader that no more measurements will follow.
RunAsync(ctx context.Context, sess ExperimentSession, input string,
callbacks ExperimentCallbacks) (<-chan *ExperimentAsyncTestKeys, error)
}

// ExperimentCallbacks contains experiment event-handling callbacks
type ExperimentCallbacks interface {
// OnProgress provides information about an experiment progress.
Expand Down
6 changes: 5 additions & 1 deletion pkg/oonimkall/internal/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,11 @@ func (r *Runner) Run(ctx context.Context) {
Idx: int64(idx),
Input: input,
})
// fallthrough: we want to submit the report anyway
// Historical note: here we used to fall through but, since we have
// implemented async measurements, the case where there is an error
// and we also have a valid measurement cannot happen anymore. So,
// now the only valid strategy here is to continue.
continue
}
data, err := json.Marshal(m)
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
Expand Down

0 comments on commit ff1c170

Please sign in to comment.