Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup: remove asynchronous experiments #1613

Merged
merged 5 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 73 additions & 98 deletions internal/engine/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,104 +96,6 @@ func (e *experiment) ReportID() string {
return report.ReportID()
}

// experimentAsyncWrapper makes a sync experiment behave like it was async.
//
// It embeds the wrapped *experiment; its RunAsync method uses the embedded
// experiment's measurer, callbacks, and session to perform a synchronous run
// and then exposes the result through a channel.
type experimentAsyncWrapper struct {
// experiment is the embedded synchronous experiment being adapted.
*experiment
}

var _ model.ExperimentMeasurerAsync = &experimentAsyncWrapper{}

// RunAsync implements ExperimentMeasurerAsync.RunAsync.
//
// It runs the wrapped synchronous measurer to completion and then exposes the
// outcome through a channel, so that sync experiments can be consumed using
// the async API.
//
// Arguments:
//
// - ctx is the context passed to the underlying measurer;
//
// - sess and callbacks are part of the interface signature but are NOT used
// here: the session and callbacks stored in the wrapped experiment are the
// ones given to the measurer;
//
// - input is the input used to create the measurement.
//
// Return value:
//
// - on success, a channel yielding exactly one ExperimentAsyncTestKeys, which
// is already closed when this function returns, and a nil error;
//
// - on failure, a nil channel and the error returned by the measurer.
func (eaw *experimentAsyncWrapper) RunAsync(
	ctx context.Context, sess model.ExperimentSession, input string,
	callbacks model.ExperimentCallbacks) (<-chan *model.ExperimentAsyncTestKeys, error) {
	measurement := eaw.experiment.newMeasurement(input)

	// Measure how long the synchronous run takes to fill MeasurementRuntime.
	start := time.Now()
	args := &model.ExperimentArgs{
		Callbacks:   eaw.callbacks,
		Measurement: measurement,
		Session:     eaw.session,
	}
	err := eaw.experiment.measurer.Run(ctx, args)
	stop := time.Now()
	if err != nil {
		return nil, err
	}

	// We only ever post a single value, so a buffered channel with capacity
	// one lets us send and close right away. This avoids spawning a goroutine
	// that would block forever if the consumer never reads from the channel.
	out := make(chan *model.ExperimentAsyncTestKeys, 1)
	out <- &model.ExperimentAsyncTestKeys{
		Extensions:         measurement.Extensions,
		Input:              measurement.Input,
		MeasurementRuntime: stop.Sub(start).Seconds(),
		TestKeys:           measurement.TestKeys,
		TestHelpers:        measurement.TestHelpers,
	}
	close(out) // signal the reader we're done!
	return out, nil
}

// MeasureAsync implements [model.Experiment].
//
// It looks up the probe location, arranges for byte counting, starts the
// experiment through its async entry point (wrapping sync measurers when
// needed), and converts each set of test keys it receives into a scrubbed
// measurement posted on the returned channel. The returned channel is closed
// once the experiment's channel has been drained.
func (e *experiment) MeasureAsync(
	ctx context.Context, input string) (<-chan *model.Measurement, error) {
	// The location lookup already tracks session bytes, hence it runs
	// before we attach the byte counters to the context.
	if err := e.session.MaybeLookupLocationContext(ctx); err != nil {
		return nil, err
	}
	ctx = bytecounter.WithSessionByteCounter(ctx, e.session.byteCounter)
	ctx = bytecounter.WithExperimentByteCounter(ctx, e.byteCounter)

	// Use the measurer's native async implementation when available and
	// fall back to the sync-to-async adapter otherwise.
	runner, isAsync := e.measurer.(model.ExperimentMeasurerAsync)
	if !isAsync {
		runner = &experimentAsyncWrapper{e}
	}

	testKeysCh, err := runner.RunAsync(ctx, e.session, input, e.callbacks)
	if err != nil {
		return nil, err
	}

	measurements := make(chan *model.Measurement)
	go func() {
		// Closing the channel tells the consumer that we're done.
		defer close(measurements)
		for tk := range testKeysCh {
			m := e.newMeasurement(input)
			m.Extensions = tk.Extensions
			m.Input = tk.Input
			m.MeasurementRuntime = tk.MeasurementRuntime
			m.TestHelpers = tk.TestHelpers
			m.TestKeys = tk.TestKeys

			// A measurement we cannot scrub is never handed to the consumer
			// (and therefore never submitted). Failures here are unlikely,
			// e.g., the TestKeys being not serializable.
			scrubErr := model.ScrubMeasurement(m, e.session.ProbeIP())
			if scrubErr != nil {
				e.session.Logger().Warnf("can't scrub measurement: %s", scrubErr.Error())
				continue
			}
			measurements <- m
		}
	}()
	return measurements, nil
}

// MeasureWithContext implements [model.Experiment].
//
// It runs MeasureAsync, drains the resulting channel until it is closed, and
// returns the first measurement that was posted. When the channel yields no
// measurement at all, it returns a nil measurement and an error.
func (e *experiment) MeasureWithContext(
	ctx context.Context, input string,
) (measurement *model.Measurement, err error) {
	results, err := e.MeasureAsync(ctx, input)
	if err != nil {
		return nil, err
	}

	// Keep reading until the channel is closed so the producer is never
	// left blocked, but only remember the first measurement (as documented).
	var first *model.Measurement
	for m := range results {
		if first != nil {
			continue
		}
		first = m
	}

	if first == nil {
		return nil, errors.New("experiment returned no measurements")
	}
	return first, nil
}

// SubmitAndUpdateMeasurementContext implements [model.Experiment].
func (e *experiment) SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error {
Expand Down Expand Up @@ -277,6 +179,79 @@ func (e *experiment) OpenReportContext(ctx context.Context) error {
return nil
}

// MeasureWithContext implements [model.Experiment].
func (e *experiment) MeasureWithContext(ctx context.Context, input string) (*model.Measurement, error) {
// Here we ensure that we have already looked up the probe location
// information such that we correctly populate the measurement and also
// VERY IMPORTANTLY to scrub the IP address from the measurement.
//
// Also, this SHOULD happen before wrapping the context for byte counting
// since MaybeLookupLocationContext already accounts for bytes I/O.
//
// TODO(bassosimone,DecFox): historically we did this only for measuring
// and not for opening a report, which probably is not correct. Because the
// function call is idempotent, call it also when opening a report?
if err := e.session.MaybeLookupLocationContext(ctx); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if MaybeLookupLocationWithContext would be more descriptive?

Re. moment to call it, a subtle concern here might be the low likelihood that there's a network change between measurement and submission.

Thinking mostly in asynchronous submission, I think it'd be healthy to a) call it when submitting; b) raise an error (or warning) if submitting a msmt for a different (CC,ASN) than the report we've opened. I can work on that as part of my future deliverables, that include deferred submission (which is a special case).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ainghazal wrote:

I was wondering if MaybeLookupLocationWithContext would be more descriptive?

Yeah, so, first of all we can gain some more characters by removing WithContext, which is an artifact from when we also had a version of the call without a context. How would you suggest to change the name?

Re. moment to call it, a subtle concern here might be the low likelihood that there's a network change between measurement and submission.

Thinking mostly in asynchronous submission, I think it'd be healthy to a) call it when submitting; b) raise an error (or warning) if submitting a msmt for a different (CC,ASN) than the report we've opened. I can work on that as part of my future deliverables, that include deferred submission (which is a special case).

As far as I know, when we're resubmitting we're using a probeservices.Submitter: https://github.com/ooni/probe-cli/blob/master/internal/probeservices/collector.go#L185. The code should open reports only based on the data stored inside the measurement, so it should support the use case of measuring, say, w/o VPN, and then submitting with VPN. It would be great to have another pair of eyes looking at the code as well as having some manual testing to ensure whether this is still true. It might also be good to have a test guaranteeing that this is the case.

Copy link
Contributor

@ainghazal ainghazal Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you suggest to change the name?

if so, perhaps just MaybeLookupLocation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!

return nil, err
}

// Tweak the context such that the bytes sent and received are accounted
// to both the session's byte counter and to the experiment's byte counter.
ctx = bytecounter.WithSessionByteCounter(ctx, e.session.byteCounter)
ctx = bytecounter.WithExperimentByteCounter(ctx, e.byteCounter)

// Create a new measurement that the experiment measurer will finish filling
// by adding the test keys etc. Please, note that, as of 2024-06-05, we're using
// the measurement Input to provide input to an experiment. We'll probably
// change this, when we'll have finished implementing richer input.
measurement := e.newMeasurement(input)

// Record when we started the experiment, to compute the runtime.
start := time.Now()

// Prepare the arguments for the experiment measurer
args := &model.ExperimentArgs{
Callbacks: e.callbacks,
Measurement: measurement,
Session: e.session,
}

// Invoke the measurer. Conventionally, an error being returned here
// indicates that something went wrong during the measurement. For example,
// it could be that the user provided us with a malformed input. In case
// there's censorship, by all means the experiment should return a nil error
// and fill the measurement accordingly.
err := e.measurer.Run(ctx, args)

// Record when the experiment finished running.
stop := time.Now()

// Handle the case where there was a fundamental error.
if err != nil {
return nil, err
}

// Make sure we record the measurement runtime.
measurement.MeasurementRuntime = stop.Sub(start).Seconds()

// Scrub the measurement removing the probe IP addr from it. We are 100% sure we know
bassosimone marked this conversation as resolved.
Show resolved Hide resolved
// our own IP addr, since we called MaybeLookupLocation above. Obviously, we aren't
// going to submit the measurement in case we can't scrub it, so we just return an error
// if this specific corner case happens.
//
// TODO(bassosimone,DecFox): a dual stack client MAY be such that we discover its IPv4
// address but the IPv6 address is present inside the measurement. We should ensure that
// we improve our discovering capabilities to also cover this specific case.
if err := model.ScrubMeasurement(measurement, e.session.ProbeIP()); err != nil {
e.session.Logger().Warnf("can't scrub measurement: %s", err.Error())
return nil, err
}

// We're all good! Let us return the measurement to the caller, which will
// additionally take care that we're submitting it, if needed.
return measurement, nil
}

func (e *experiment) newReportTemplate() model.OOAPIReportTemplate {
return model.OOAPIReportTemplate{
DataFormatVersion: model.OOAPIReportDefaultDataFormatVersion,
Expand Down
7 changes: 0 additions & 7 deletions internal/mocks/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ type Experiment struct {

MockReportID func() string

MockMeasureAsync func(ctx context.Context, input string) (<-chan *model.Measurement, error)

MockMeasureWithContext func(
ctx context.Context, input string) (measurement *model.Measurement, err error)

Expand Down Expand Up @@ -45,11 +43,6 @@ func (e *Experiment) ReportID() string {
return e.MockReportID()
}

// MeasureAsync forwards the call to MockMeasureAsync and returns
// whatever channel and error the mock function produces.
func (e *Experiment) MeasureAsync(
	ctx context.Context, input string) (<-chan *model.Measurement, error) {
	ch, err := e.MockMeasureAsync(ctx, input)
	return ch, err
}

func (e *Experiment) MeasureWithContext(
ctx context.Context, input string) (measurement *model.Measurement, err error) {
return e.MockMeasureWithContext(ctx, input)
Expand Down
16 changes: 0 additions & 16 deletions internal/mocks/experiment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,6 @@ func TestExperiment(t *testing.T) {
}
})

t.Run("MeasureAsync", func(t *testing.T) {
expected := errors.New("mocked err")
e := &Experiment{
MockMeasureAsync: func(ctx context.Context, input string) (<-chan *model.Measurement, error) {
return nil, expected
},
}
out, err := e.MeasureAsync(context.Background(), "xo")
if !errors.Is(err, expected) {
t.Fatal("unexpected err", err)
}
if out != nil {
t.Fatal("expected nil")
}
})

t.Run("MeasureWithContext", func(t *testing.T) {
expected := errors.New("mocked err")
e := &Experiment{
Expand Down
86 changes: 6 additions & 80 deletions internal/model/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,57 +53,13 @@ type ExperimentSession interface {
UserAgent() string
}

// ExperimentAsyncTestKeys is the type of test keys returned by an experiment
// when running in async fashion rather than in sync fashion.
//
// Values of this type are posted on the channel returned by
// ExperimentMeasurerAsync.RunAsync, one for each measurement.
type ExperimentAsyncTestKeys struct {
// Extensions contains the extensions used by this experiment.
Extensions map[string]int64

// Input is the input this measurement refers to.
Input MeasurementInput

// MeasurementRuntime is the total measurement runtime.
//
// NOTE(review): presumably expressed in seconds; confirm against
// the code that fills this field.
MeasurementRuntime float64

// TestHelpers contains the test helpers used in the experiment.
TestHelpers map[string]interface{}

// TestKeys contains the actual test keys.
TestKeys interface{}
}

// ExperimentMeasurerAsync is a measurer that can run in async fashion.
//
// Currently this functionality is optional, but we will likely
// migrate all experiments to use this functionality in 2022.
type ExperimentMeasurerAsync interface {
// RunAsync runs the experiment in async fashion.
//
// Arguments:
//
// - ctx is the context for deadline/timeout/cancellation;
//
// - sess is the measurement session;
//
// - input is the input URL to measure;
//
// - callbacks contains the experiment callbacks.
//
// Returns either a channel where TestKeys are posted or an error.
//
// An error indicates that specific preconditions for running the experiment
// are not met (e.g., the input URL is invalid).
//
// On success, the experiment will post on the channel each new
// measurement until it is done and closes the channel. Consumers
// should therefore read from the channel until it is closed.
RunAsync(ctx context.Context, sess ExperimentSession, input string,
callbacks ExperimentCallbacks) (<-chan *ExperimentAsyncTestKeys, error)
}

// ExperimentCallbacks contains experiment event-handling callbacks
// ExperimentCallbacks contains experiment event-handling callbacks.
type ExperimentCallbacks interface {
// OnProgress provides information about an experiment progress.
OnProgress(percentage float64, message string)
// OnProgress provides information about the experiment's progress.
//
// The prog field is a number between 0.0 and 1.0 representing progress, where
// 0.0 corresponds to 0% and 1.0 corresponds to 100%.
OnProgress(prog float64, message string)
}

// PrinterCallbacks is the default event handler
Expand Down Expand Up @@ -166,51 +122,21 @@ type Experiment interface {

// ReportID returns the open report's ID, if we have opened a report
// successfully before, or an empty string, otherwise.
//
// Deprecated: new code should use a Submitter.
ReportID() string

// MeasureAsync runs an async measurement. This operation could post
// one or more measurements onto the returned channel. We'll close the
// channel when we've emitted all the measurements.
//
// Arguments:
//
// - ctx is the context for deadline/cancellation/timeout;
//
// - input is the input (typically a URL but it could also be
// just an endpoint or an empty string for input-less experiments
// such as, e.g., ndt7 and dash).
//
// Return value:
//
// - on success, channel where to post measurements (the channel
// will be closed when done) and nil error;
//
// - on failure, nil channel and non-nil error.
MeasureAsync(ctx context.Context, input string) (<-chan *Measurement, error)

// MeasureWithContext performs a synchronous measurement.
//
// Return value: strictly either a non-nil measurement and
// a nil error or a nil measurement and a non-nil error.
//
// CAVEAT: while this API is perfectly fine for experiments that
// return a single measurement, it will only return the first measurement
// when used with an asynchronous experiment.
MeasureWithContext(ctx context.Context, input string) (measurement *Measurement, err error)

// SubmitAndUpdateMeasurementContext submits a measurement and updates the
// fields whose value has changed as part of the submission.
//
// Deprecated: new code should use a Submitter.
SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *Measurement) error

// OpenReportContext will open a report using the given context
// to possibly limit the lifetime of this operation.
//
// Deprecated: new code should use a Submitter.
OpenReportContext(ctx context.Context) error
}

Expand Down
6 changes: 3 additions & 3 deletions internal/oonirun/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,12 @@ type experimentWrapper struct {
total int
}

func (ew *experimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
func (ew *experimentWrapper) MeasureWithContext(
ctx context.Context, input string, idx int) (*model.Measurement, error) {
if input != "" {
ew.logger.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input)
}
return ew.child.MeasureAsync(ctx, input, idx)
return ew.child.MeasureWithContext(ctx, input, idx)
}

// experimentSubmitterWrapper implements a submission policy where we don't
Expand Down
15 changes: 5 additions & 10 deletions internal/oonirun/experiment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,11 @@ func TestExperimentRunWithFailureToSubmitAndShuffle(t *testing.T) {
},
MockNewExperiment: func() model.Experiment {
exp := &mocks.Experiment{
MockMeasureAsync: func(ctx context.Context, input string) (<-chan *model.Measurement, error) {
out := make(chan *model.Measurement)
go func() {
defer close(out)
ff := &testingx.FakeFiller{}
var meas model.Measurement
ff.Fill(&meas)
out <- &meas
}()
return out, nil
MockMeasureWithContext: func(ctx context.Context, input string) (*model.Measurement, error) {
ff := &testingx.FakeFiller{}
var meas model.Measurement
ff.Fill(&meas)
return &meas, nil
},
MockKibiBytesReceived: func() float64 {
calledKibiBytesReceived++
Expand Down
Loading
Loading