diff --git a/cmd/ooniprobe/internal/cli/geoip/geoip.go b/cmd/ooniprobe/internal/cli/geoip/geoip.go index 21d4ea59d8..b5074b29dd 100644 --- a/cmd/ooniprobe/internal/cli/geoip/geoip.go +++ b/cmd/ooniprobe/internal/cli/geoip/geoip.go @@ -8,6 +8,7 @@ import ( "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/cli/root" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/output" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" ) @@ -20,13 +21,13 @@ func init() { type dogeoipconfig struct { Logger log.Interface - NewProbeCLI func() (ooni.ProbeCLI, error) + NewProbeCLI func() (*ooni.Probe, error) SectionTitle func(string) } var defaultconfig = dogeoipconfig{ Logger: log.Log, - NewProbeCLI: root.NewProbeCLI, + NewProbeCLI: root.Init, SectionTitle: output.SectionTitle, } @@ -37,23 +38,48 @@ func dogeoip(config dogeoipconfig) error { return err } - engine, err := probeCLI.NewProbeEngine(context.Background(), model.RunTypeManual) + // create a measurement session + sessConfig := probeCLI.NewSessionConfig(model.RunTypeManual) + sess, err := miniengine.NewSession(sessConfig) if err != nil { + log.WithError(err).Error("Failed to create a measurement session") return err } - defer engine.Close() + defer sess.Close() - err = engine.MaybeLookupLocation() + // XXX: not very lightweight to perform a full bootstrap here + + // bootstrap the measurement session + bootstrapConfig := &miniengine.BootstrapConfig{ + BackendURL: "", + CategoryCodes: []string{}, + Charging: true, + OnWiFi: true, + ProxyURL: probeCLI.ProxyURL(), + RunType: model.RunTypeManual, + SnowflakeRendezvousMethod: "", + TorArgs: []string{}, + TorBinary: "", + } + bootstrapTask := sess.Bootstrap(context.Background(), bootstrapConfig) + // XXX: skipping log messages here + <-bootstrapTask.Done() + if _, err := bootstrapTask.Result(); err != nil { + log.WithError(err).Error("Failed to bootstrap a measurement session") + return err + } + + location, err := sess.GeolocateResult() if err != nil { return err } config.Logger.WithFields(log.Fields{ "type": "table", - "asn": engine.ProbeASNString(), - "network_name": engine.ProbeNetworkName(), - "country_code": engine.ProbeCC(), - "ip": engine.ProbeIP(), + "asn": location.ProbeASNString, + "network_name": location.ProbeNetworkName, + "country_code": location.ProbeCC, + "ip": location.ProbeIP, }).Info("Looked up your location") return nil diff --git a/cmd/ooniprobe/internal/cli/info/info.go b/cmd/ooniprobe/internal/cli/info/info.go index 09974cdcb7..83c82724b1 100644 --- a/cmd/ooniprobe/internal/cli/info/info.go +++ b/cmd/ooniprobe/internal/cli/info/info.go @@ -16,12 +16,12 @@ func init() { type doinfoconfig struct { Logger log.Interface - NewProbeCLI func() (ooni.ProbeCLI, error) + NewProbeCLI func() (*ooni.Probe, error) } var defaultconfig = doinfoconfig{ Logger: log.Log, - NewProbeCLI: root.NewProbeCLI, + NewProbeCLI: root.Init, } func doinfo(config doinfoconfig) error { diff --git a/cmd/ooniprobe/internal/cli/root/root.go b/cmd/ooniprobe/internal/cli/root/root.go index 3bbcce9eb2..bbd51b02fd 100644 --- a/cmd/ooniprobe/internal/cli/root/root.go +++ b/cmd/ooniprobe/internal/cli/root/root.go @@ -20,15 +20,6 @@ var Command = Cmd.Command // Init should be called by all subcommand that care to have a ooni.Context instance var Init func() (*ooni.Probe, error) -// NewProbeCLI is like Init but returns a ooni.ProbeCLI instead. -func NewProbeCLI() (ooni.ProbeCLI, error) { - probeCLI, err := Init() - if err != nil { - return nil, err - } - return probeCLI, nil -} - func init() { configPath := Cmd.Flag("config", "Set a custom config file path").Short('c').String() diff --git a/cmd/ooniprobe/internal/nettests/dash.go b/cmd/ooniprobe/internal/nettests/dash.go index 80f8618dff..59ac3b39d4 100644 --- a/cmd/ooniprobe/internal/nettests/dash.go +++ b/cmd/ooniprobe/internal/nettests/dash.go @@ -6,9 +6,9 @@ type Dash struct { // Run starts the test func (d Dash) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("dash") - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) + return ctl.Run( + "dash", + "", // TODO(bassosimone) + []string{""}, + ) } diff --git a/cmd/ooniprobe/internal/nettests/dnscheck.go b/cmd/ooniprobe/internal/nettests/dnscheck.go index beebcd306d..d0d5deffb3 100644 --- a/cmd/ooniprobe/internal/nettests/dnscheck.go +++ b/cmd/ooniprobe/internal/nettests/dnscheck.go @@ -1,42 +1,11 @@ package nettests -import ( - "context" - - engine "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/model" -) +import "errors" // DNSCheck nettest implementation. type DNSCheck struct{} -func (n DNSCheck) lookupURLs(ctl *Controller) ([]string, error) { - inputloader := &engine.InputLoader{ - CheckInConfig: &model.OOAPICheckInConfig{ - // not needed because we have default static input in the engine - }, - ExperimentName: "dnscheck", - InputPolicy: model.InputOrStaticDefault, - Session: ctl.Session, - SourceFiles: ctl.InputFiles, - StaticInputs: ctl.Inputs, - } - testlist, err := inputloader.Load(context.Background()) - if err != nil { - return nil, err - } - return ctl.BuildAndSetInputIdxMap(testlist) -} - // Run starts the nettest. func (n DNSCheck) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("dnscheck") - if err != nil { - return err - } - urls, err := n.lookupURLs(ctl) - if err != nil { - return err - } - return ctl.Run(builder, urls) + return errors.New("not implemented") } diff --git a/cmd/ooniprobe/internal/nettests/facebook_messenger.go b/cmd/ooniprobe/internal/nettests/facebook_messenger.go index 1316babee5..2fef9ebca0 100644 --- a/cmd/ooniprobe/internal/nettests/facebook_messenger.go +++ b/cmd/ooniprobe/internal/nettests/facebook_messenger.go @@ -6,11 +6,9 @@ type FacebookMessenger struct { // Run starts the test func (h FacebookMessenger) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "facebook_messenger", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/http_header_field_manipulation.go b/cmd/ooniprobe/internal/nettests/http_header_field_manipulation.go index 6fdd39688f..7ebe15cfcb 100644 --- a/cmd/ooniprobe/internal/nettests/http_header_field_manipulation.go +++ b/cmd/ooniprobe/internal/nettests/http_header_field_manipulation.go @@ -6,11 +6,9 @@ type HTTPHeaderFieldManipulation struct { // Run starts the test func (h HTTPHeaderFieldManipulation) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "http_header_field_manipulation", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/http_invalid_request_line.go b/cmd/ooniprobe/internal/nettests/http_invalid_request_line.go index fb87e462d6..6292f1a26c 100644 --- a/cmd/ooniprobe/internal/nettests/http_invalid_request_line.go +++ b/cmd/ooniprobe/internal/nettests/http_invalid_request_line.go @@ -6,11 +6,9 @@ type HTTPInvalidRequestLine struct { // Run starts the test func (h HTTPInvalidRequestLine) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "http_invalid_request_line", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/ndt.go b/cmd/ooniprobe/internal/nettests/ndt.go index b8848b0056..11de96d4f0 100644 --- a/cmd/ooniprobe/internal/nettests/ndt.go +++ b/cmd/ooniprobe/internal/nettests/ndt.go @@ -7,9 +7,9 @@ type NDT struct { // Run starts the test func (n NDT) Run(ctl *Controller) error { // Since 2020-03-18 probe-engine exports v7 as "ndt". - builder, err := ctl.Session.NewExperimentBuilder("ndt") - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) + return ctl.Run( + "ndt", + "", // TODO(bassosimone) + []string{""}, + ) } diff --git a/cmd/ooniprobe/internal/nettests/nettests.go b/cmd/ooniprobe/internal/nettests/nettests.go index bee5be39f7..aac6f95248 100644 --- a/cmd/ooniprobe/internal/nettests/nettests.go +++ b/cmd/ooniprobe/internal/nettests/nettests.go @@ -3,14 +3,16 @@ package nettests import ( "context" "database/sql" + "encoding/json" "fmt" + "os" "time" "github.com/apex/log" "github.com/fatih/color" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/output" - engine "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" "github.com/pkg/errors" ) @@ -22,7 +24,7 @@ type Nettest interface { // NewController creates a nettest controller func NewController( - nt Nettest, probe *ooni.Probe, res *model.DatabaseResult, sess *engine.Session) *Controller { + nt Nettest, probe *ooni.Probe, res *model.DatabaseResult, sess *miniengine.Session) *Controller { return &Controller{ Probe: probe, nt: nt, @@ -35,7 +37,7 @@ func NewController( // each nettest instance has one controller type Controller struct { Probe *ooni.Probe - Session *engine.Session + Session *miniengine.Session res *model.DatabaseResult nt Nettest ntCount int @@ -115,22 +117,19 @@ func (c *Controller) SetNettestIndex(i, n int) { c.ntIndex = i } +var _ model.ExperimentCallbacks = &Controller{} + // Run runs the selected nettest using the related experiment // with the specified inputs. // // This function will continue to run in most cases but will // immediately halt if something's wrong with the file system. -func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error { +func (c *Controller) Run( + experimentName string, + checkInReportID string, + inputs []string, +) error { db := c.Probe.DB() - // This will configure the controller as handler for the callbacks - // called by ooni/probe-engine/experiment.Experiment. - builder.SetCallbacks(model.ExperimentCallbacks(c)) - c.numInputs = len(inputs) - exp := builder.NewExperiment() - defer func() { - c.res.DataUsageDown += exp.KibiBytesReceived() - c.res.DataUsageUp += exp.KibiBytesSent() - }() c.msmts = make(map[int64]*model.DatabaseMeasurement) @@ -141,15 +140,9 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error log.Debug(color.RedString("status.queued")) log.Debug(color.RedString("status.started")) - if c.Probe.Config().Sharing.UploadResults { - if err := exp.OpenReportContext(context.Background()); err != nil { - log.Debugf( - "%s: %s", color.RedString("failure.report_create"), err.Error(), - ) - } else { - log.Debugf(color.RedString("status.report_create")) - reportID = sql.NullString{String: exp.ReportID(), Valid: true} - } + canSubmit := c.Probe.Config().Sharing.UploadResults && checkInReportID != "" + if canSubmit { + reportID = sql.NullString{String: checkInReportID, Valid: true} } maxRuntime := time.Duration(c.Probe.Config().Nettests.WebsitesMaxRuntime) * time.Second @@ -186,7 +179,7 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error } msmt, err := db.CreateMeasurement( - reportID, exp.Name(), c.res.MeasurementDir, idx, resultID, urlID, + reportID, experimentName, c.res.MeasurementDir, idx, resultID, urlID, ) if err != nil { return errors.Wrap(err, "failed to create measurement") @@ -196,7 +189,10 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error if input != "" { c.OnProgress(0, fmt.Sprintf("processing input: %s", input)) } - measurement, err := exp.MeasureWithContext(context.Background(), input) + options := make(map[string]any) + measurementTask := c.Session.Measure(context.Background(), experimentName, options, input) + awaitTask(measurementTask, c) + measurementResult, err := measurementTask.Result() if err != nil { log.WithError(err).Debug(color.RedString("failure.measurement")) if err := db.Failed(c.msmts[idx64], err.Error()); err != nil { @@ -212,12 +208,21 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error continue } + // update the data usage counters + c.res.DataUsageDown += measurementResult.KibiBytesReceived + c.res.DataUsageUp += measurementResult.KibiBytesSent + + // set the measurement's reportID + measurementResult.Measurement.ReportID = checkInReportID + saveToDisk := true - if c.Probe.Config().Sharing.UploadResults { + if canSubmit { // Implementation note: SubmitMeasurement will fail here if we did fail // to open the report but we still want to continue. There will be a // bit of a spew in the logs, perhaps, but stopping seems less efficient. - if err := exp.SubmitAndUpdateMeasurementContext(context.Background(), measurement); err != nil { + submitTask := c.Session.Submit(context.Background(), measurementResult.Measurement) + awaitTask(submitTask, model.NewPrinterCallbacks(taskLogger)) + if _, err := submitTask.Result(); err != nil { log.Debug(color.RedString("failure.measurement_submission")) if err := db.UploadFailed(c.msmts[idx64], err.Error()); err != nil { return errors.Wrap(err, "failed to mark upload as failed") @@ -231,35 +236,36 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error } // We only save the measurement to disk if we failed to upload the measurement if saveToDisk { - if err := exp.SaveMeasurement(measurement, msmt.MeasurementFilePath.String); err != nil { + if err := c.saveMeasurement(measurementResult.Measurement, msmt.MeasurementFilePath.String); err != nil { return errors.Wrap(err, "failed to save measurement on disk") } } + // make the measurement as done if err := db.Done(c.msmts[idx64]); err != nil { return errors.Wrap(err, "failed to mark measurement as done") } - // We're not sure whether it's enough to log the error or we should - // instead also mark the measurement as failed. Strictly speaking this - // is an inconsistency between the code that generate the measurement - // and the code that process the measurement. We do have some data - // but we're not gonna have a summary. To be reconsidered. - tk, err := exp.GetSummaryKeys(measurement) - if err != nil { - log.WithError(err).Error("failed to obtain testKeys") - continue - } - log.Debugf("Fetching: %d %v", idx, c.msmts[idx64]) - if err := db.AddTestKeys(c.msmts[idx64], tk); err != nil { + // write the measurement summary into the database + if err := db.AddTestKeys(c.msmts[idx64], measurementResult.Summary); err != nil { return errors.Wrap(err, "failed to add test keys to summary") } } + db.UpdateUploadedStatus(c.res) log.Debugf("status.end") return nil } +// saveMeasurement saves a measurement to disk +func (c *Controller) saveMeasurement(meas *model.Measurement, filepath string) error { + data, err := json.Marshal(meas) + if err != nil { + return err + } + return os.WriteFile(filepath, data, 0600) +} + // OnProgress should be called when a new progress event is available. func (c *Controller) OnProgress(perc float64, msg string) { // when we have maxRuntime, honor it diff --git a/cmd/ooniprobe/internal/nettests/psiphon.go b/cmd/ooniprobe/internal/nettests/psiphon.go index 940340cc4e..d22e195668 100644 --- a/cmd/ooniprobe/internal/nettests/psiphon.go +++ b/cmd/ooniprobe/internal/nettests/psiphon.go @@ -6,11 +6,9 @@ type Psiphon struct { // Run starts the test func (h Psiphon) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "psiphon", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/riseupvpn.go b/cmd/ooniprobe/internal/nettests/riseupvpn.go index 185fbcefe4..f87b2701f5 100644 --- a/cmd/ooniprobe/internal/nettests/riseupvpn.go +++ b/cmd/ooniprobe/internal/nettests/riseupvpn.go @@ -6,12 +6,9 @@ type RiseupVPN struct { // Run starts the test func (h RiseupVPN) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "riseupvpn", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/run.go b/cmd/ooniprobe/internal/nettests/run.go index 25385a647e..0e34179294 100644 --- a/cmd/ooniprobe/internal/nettests/run.go +++ b/cmd/ooniprobe/internal/nettests/run.go @@ -8,7 +8,9 @@ import ( "github.com/apex/log" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/runtimex" "github.com/pkg/errors" ) @@ -59,28 +61,45 @@ func RunGroup(config RunGroupConfig) error { return nil } - sess, err := config.Probe.NewSession(context.Background(), config.RunType) + // create a measurement session + sessConfig := config.Probe.NewSessionConfig(config.RunType) + sess, err := miniengine.NewSession(sessConfig) if err != nil { log.WithError(err).Error("Failed to create a measurement session") return err } defer sess.Close() - err = sess.MaybeLookupLocation() - if err != nil { - log.WithError(err).Error("Failed to lookup the location of the probe") + // bootstrap the measurement session + log.Debugf("Enabled category codes are the following %v", config.Probe.Config().Nettests.WebsitesEnabledCategoryCodes) + bootstrapConfig := &miniengine.BootstrapConfig{ + BackendURL: "", + CategoryCodes: config.Probe.Config().Nettests.WebsitesEnabledCategoryCodes, + Charging: true, + OnWiFi: true, + ProxyURL: config.Probe.ProxyURL(), + RunType: config.RunType, + SnowflakeRendezvousMethod: "", + TorArgs: []string{}, + TorBinary: "", + } + bootstrapTask := sess.Bootstrap(context.Background(), bootstrapConfig) + awaitTask(bootstrapTask, model.NewPrinterCallbacks(taskLogger)) + if _, err := bootstrapTask.Result(); err != nil { + log.WithError(err).Error("Failed to bootstrap a measurement session") return err } + + // obtain the probe location + location := runtimex.Try1(sess.GeolocateResult()) + + // create the corresponding network inside the database db := config.Probe.DB() - network, err := db.CreateNetwork(sess) + network, err := db.CreateNetwork(location) if err != nil { log.WithError(err).Error("Failed to create the network row") return err } - if err := sess.MaybeLookupBackends(); err != nil { - log.WithError(err).Warn("Failed to discover OONI backends") - return err - } group, ok := All[config.GroupName] if !ok { diff --git a/cmd/ooniprobe/internal/nettests/signal.go b/cmd/ooniprobe/internal/nettests/signal.go index 3a2df6b874..3f95d57620 100644 --- a/cmd/ooniprobe/internal/nettests/signal.go +++ b/cmd/ooniprobe/internal/nettests/signal.go @@ -5,11 +5,9 @@ type Signal struct{} // Run starts the nettest. func (h Signal) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "signal", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/stunreachability.go b/cmd/ooniprobe/internal/nettests/stunreachability.go index 186bb7bb98..f8fa58377b 100644 --- a/cmd/ooniprobe/internal/nettests/stunreachability.go +++ b/cmd/ooniprobe/internal/nettests/stunreachability.go @@ -1,42 +1,13 @@ package nettests import ( - "context" - - engine "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/model" + "errors" ) // STUNReachability nettest implementation. type STUNReachability struct{} -func (n STUNReachability) lookupURLs(ctl *Controller) ([]string, error) { - inputloader := &engine.InputLoader{ - CheckInConfig: &model.OOAPICheckInConfig{ - // not needed because we have default static input in the engine - }, - ExperimentName: "stunreachability", - InputPolicy: model.InputOrStaticDefault, - Session: ctl.Session, - SourceFiles: ctl.InputFiles, - StaticInputs: ctl.Inputs, - } - testlist, err := inputloader.Load(context.Background()) - if err != nil { - return nil, err - } - return ctl.BuildAndSetInputIdxMap(testlist) -} - // Run starts the nettest. func (n STUNReachability) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("stunreachability") - if err != nil { - return err - } - urls, err := n.lookupURLs(ctl) - if err != nil { - return err - } - return ctl.Run(builder, urls) + return errors.New("not implemented") } diff --git a/cmd/ooniprobe/internal/nettests/tasks.go b/cmd/ooniprobe/internal/nettests/tasks.go new file mode 100644 index 0000000000..a242e823d0 --- /dev/null +++ b/cmd/ooniprobe/internal/nettests/tasks.go @@ -0,0 +1,55 @@ +package nettests + +import ( + "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/miniengine" + "github.com/ooni/probe-cli/v3/internal/model" +) + +// taskLogger is the logger used for logging tasks. +var taskLogger = log.WithFields(log.Fields{ + "type": "engine", +}) + +// runningTask is a [miniengine.Task] that is still running. +type runningTask interface { + Done() <-chan any + Events() <-chan *miniengine.Event +} + +// TODO(bassosimone): we need to set the verbosity + +// processTaskEvent processes an event emitted by a runingTask. +func processTaskEvent(callbacks model.ExperimentCallbacks, ev *miniengine.Event) { + switch ev.EventType { + case miniengine.EventTypeDebug: + taskLogger.Debug(ev.Message) + case miniengine.EventTypeInfo: + taskLogger.Info(ev.Message) + case miniengine.EventTypeProgress: + callbacks.OnProgress(ev.Progress, ev.Message) + case miniengine.EventTypeWarning: + taskLogger.Warn(ev.Message) + default: + taskLogger.Warnf("UNHANDLED EVENT: %+v", ev) + } +} + +// awaitTask waits for the given runningTask to terminate. +func awaitTask(task runningTask, callbacks model.ExperimentCallbacks) { + for { + select { + case <-task.Done(): + for { + select { + case ev := <-task.Events(): + processTaskEvent(callbacks, ev) + default: + return + } + } + case ev := <-task.Events(): + processTaskEvent(callbacks, ev) + } + } +} diff --git a/cmd/ooniprobe/internal/nettests/telegram.go b/cmd/ooniprobe/internal/nettests/telegram.go index 82d75d82c2..6a0ee3334b 100644 --- a/cmd/ooniprobe/internal/nettests/telegram.go +++ b/cmd/ooniprobe/internal/nettests/telegram.go @@ -6,11 +6,9 @@ type Telegram struct { // Run starts the test func (h Telegram) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "telegram", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/tor.go b/cmd/ooniprobe/internal/nettests/tor.go index 96bfbb7d2f..307a73278b 100644 --- a/cmd/ooniprobe/internal/nettests/tor.go +++ b/cmd/ooniprobe/internal/nettests/tor.go @@ -6,11 +6,9 @@ type Tor struct { // Run starts the test func (h Tor) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "tor", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/torsf.go b/cmd/ooniprobe/internal/nettests/torsf.go index 494dae4743..02db3550a7 100644 --- a/cmd/ooniprobe/internal/nettests/torsf.go +++ b/cmd/ooniprobe/internal/nettests/torsf.go @@ -6,11 +6,11 @@ type TorSf struct { // Run starts the test func (h TorSf) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("torsf") - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) + return ctl.Run( + "torsf", + "", // TODO(bassosimone) + []string{""}, + ) } func (h TorSf) onlyBackground() {} diff --git a/cmd/ooniprobe/internal/nettests/vanillator.go b/cmd/ooniprobe/internal/nettests/vanillator.go index 11d7266549..2aca20ac2d 100644 --- a/cmd/ooniprobe/internal/nettests/vanillator.go +++ b/cmd/ooniprobe/internal/nettests/vanillator.go @@ -6,11 +6,11 @@ type VanillaTor struct { // Run starts the test func (h VanillaTor) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("vanilla_tor") - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) + return ctl.Run( + "vanilla_tor", + "", // TODO(bassosimone) + []string{""}, + ) } func (h VanillaTor) onlyBackground() {} diff --git a/cmd/ooniprobe/internal/nettests/web_connectivity.go b/cmd/ooniprobe/internal/nettests/web_connectivity.go index fc643c0091..8a44f27202 100644 --- a/cmd/ooniprobe/internal/nettests/web_connectivity.go +++ b/cmd/ooniprobe/internal/nettests/web_connectivity.go @@ -1,52 +1,28 @@ package nettests import ( - "context" - - "github.com/apex/log" - engine "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/model" + "errors" ) -func (n WebConnectivity) lookupURLs(ctl *Controller, categories []string) ([]string, error) { - inputloader := &engine.InputLoader{ - CheckInConfig: &model.OOAPICheckInConfig{ - // Setting Charging and OnWiFi to true causes the CheckIn - // API to return to us as much URL as possible with the - // given RunType hint. - Charging: true, - OnWiFi: true, - RunType: ctl.RunType, - WebConnectivity: model.OOAPICheckInConfigWebConnectivity{ - CategoryCodes: categories, - }, - }, - ExperimentName: "web_connectivity", - InputPolicy: model.InputOrQueryBackend, - Session: ctl.Session, - SourceFiles: ctl.InputFiles, - StaticInputs: ctl.Inputs, - } - testlist, err := inputloader.Load(context.Background()) - if err != nil { - return nil, err - } - return ctl.BuildAndSetInputIdxMap(testlist) -} - // WebConnectivity test implementation type WebConnectivity struct{} // Run starts the test func (n WebConnectivity) Run(ctl *Controller) error { - log.Debugf("Enabled category codes are the following %v", ctl.Probe.Config().Nettests.WebsitesEnabledCategoryCodes) - urls, err := n.lookupURLs(ctl, ctl.Probe.Config().Nettests.WebsitesEnabledCategoryCodes) + results, err := ctl.Session.CheckInResult() if err != nil { return err } - builder, err := ctl.Session.NewExperimentBuilder("web_connectivity") + if results.Tests.WebConnectivity == nil { + return errors.New("no web_connectivity data") + } + urls, err := ctl.BuildAndSetInputIdxMap(results.Tests.WebConnectivity.URLs) if err != nil { return err } - return ctl.Run(builder, urls) + return ctl.Run( + "web_connectivity", + results.Tests.WebConnectivity.ReportID, + urls, + ) } diff --git a/cmd/ooniprobe/internal/nettests/whatsapp.go b/cmd/ooniprobe/internal/nettests/whatsapp.go index 4660abe006..682faa7b23 100644 --- a/cmd/ooniprobe/internal/nettests/whatsapp.go +++ b/cmd/ooniprobe/internal/nettests/whatsapp.go @@ -6,11 +6,9 @@ type WhatsApp struct { // Run starts the test func (h WhatsApp) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "whatsapp", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/ooni/ooni.go b/cmd/ooniprobe/internal/ooni/ooni.go index dcffdab6c4..3c18218742 100644 --- a/cmd/ooniprobe/internal/ooni/ooni.go +++ b/cmd/ooniprobe/internal/ooni/ooni.go @@ -3,7 +3,6 @@ package ooni import ( "context" _ "embed" // because we embed a file - "io/ioutil" "net/url" "os" "os/signal" @@ -14,9 +13,8 @@ import ( "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/config" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/utils" "github.com/ooni/probe-cli/v3/internal/database" - "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/legacy/assetsdir" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" "github.com/pkg/errors" ) @@ -24,11 +22,6 @@ import ( // DefaultSoftwareName is the default software name. const DefaultSoftwareName = "ooniprobe-cli" -// logger is the logger used by the engine. -var logger = log.WithFields(log.Fields{ - "type": "engine", -}) - // ProbeCLI is the OONI Probe CLI context. type ProbeCLI interface { Config() *config.Config @@ -110,6 +103,14 @@ func (p *Probe) Terminate() { p.isTerminated.Add(1) } +// ProxyURL returns the configured proxy URL +func (p *Probe) ProxyURL() string { + if p.proxyURL != nil { + return p.proxyURL.String() + } + return "" +} + // ListenForSignals will listen for SIGINT and SIGTERM. When it receives those // signals it will set isTerminatedAtomicInt to non-zero, which will cleanly // shutdown the test logic. @@ -193,7 +194,7 @@ func (p *Probe) Init(softwareName, softwareVersion, proxy string) error { // the return value as it does not matter to us here. _, _ = assetsdir.Cleanup(utils.AssetsDir(p.home)) - tempDir, err := ioutil.TempDir("", "ooni") + tempDir, err := os.MkdirTemp("", "ooni") if err != nil { return errors.Wrap(err, "creating TempDir") } @@ -211,19 +212,8 @@ func (p *Probe) Init(softwareName, softwareVersion, proxy string) error { return nil } -// NewSession creates a new ooni/probe-engine session using the -// current configuration inside the context. The caller must close -// the session when done using it, by calling sess.Close(). -func (p *Probe) NewSession(ctx context.Context, runType model.RunType) (*engine.Session, error) { - kvstore, err := kvstore.NewFS( - utils.EngineDir(p.home), - ) - if err != nil { - return nil, errors.Wrap(err, "creating engine's kvstore") - } - if err := os.MkdirAll(p.tunnelDir, 0700); err != nil { - return nil, errors.Wrap(err, "creating tunnel dir") - } +// NewSessionConfig creates a new [miniengine.SessionConfig]. +func (p *Probe) NewSessionConfig(runType model.RunType) *miniengine.SessionConfig { // When the software name is the default software name and we're running // in unattended mode, adjust the software name accordingly. // @@ -232,24 +222,14 @@ func (p *Probe) NewSession(ctx context.Context, runType model.RunType) (*engine. if runType == model.RunTypeTimed && softwareName == DefaultSoftwareName { softwareName = DefaultSoftwareName + "-unattended" } - return engine.NewSession(ctx, engine.SessionConfig{ - KVStore: kvstore, - Logger: logger, + return &miniengine.SessionConfig{ SoftwareName: softwareName, SoftwareVersion: p.softwareVersion, + StateDir: utils.EngineDir(p.home), TempDir: p.tempDir, TunnelDir: p.tunnelDir, - ProxyURL: p.proxyURL, - }) -} - -// NewProbeEngine creates a new ProbeEngine instance. -func (p *Probe) NewProbeEngine(ctx context.Context, runType model.RunType) (ProbeEngine, error) { - sess, err := p.NewSession(ctx, runType) - if err != nil { - return nil, err + Verbose: false, } - return sess, nil } // NewProbe creates a new probe instance. diff --git a/cmd/ooniprobe/internal/ooni/ooni_test.go b/cmd/ooniprobe/internal/ooni/ooni_test.go index 4f534c7f52..60b441323c 100644 --- a/cmd/ooniprobe/internal/ooni/ooni_test.go +++ b/cmd/ooniprobe/internal/ooni/ooni_test.go @@ -1,14 +1,13 @@ package ooni import ( - "io/ioutil" "os" "path" "testing" ) func TestInit(t *testing.T) { - ooniHome, err := ioutil.TempDir("", "oonihome") + ooniHome, err := os.MkdirTemp("", "oonihome") if err != nil { t.Fatal(err) } diff --git a/internal/cmd/exp/main.go b/internal/cmd/exp/main.go new file mode 100644 index 0000000000..70e9820bb2 --- /dev/null +++ b/internal/cmd/exp/main.go @@ -0,0 +1,117 @@ +package main + +// +// Demonstrates using the fundamental OONI Engine API +// + +import ( + "context" + "path/filepath" + + "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/miniengine" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +// awaitableTask is a [miniengine.Task] that we can await and for which +// we can obtain the interim events while it's running. +type awaitableTask interface { + Done() <-chan any + Events() <-chan *miniengine.Event +} + +// awaitTask awaits for the task to be done and emits interim events +func awaitTask(task awaitableTask) { + for { + select { + case <-task.Done(): + return + case ev := <-task.Events(): + switch ev.EventType { + case miniengine.EventTypeProgress: + log.Infof("PROGRESS %f %s", ev.Progress, ev.Message) + case miniengine.EventTypeInfo: + log.Infof("%s", ev.Message) + case miniengine.EventTypeWarning: + log.Warnf("%s", ev.Message) + case miniengine.EventTypeDebug: + log.Debugf("%s", ev.Message) + } + } + } +} + +func main() { + log.SetLevel(log.DebugLevel) + ctx := context.Background() + + // create session config + sessionConfig := &miniengine.SessionConfig{ + SoftwareName: "miniooni", + SoftwareVersion: "0.1.0-dev", + StateDir: filepath.Join("x", "state"), + TempDir: filepath.Join("x", "tmp"), + TunnelDir: filepath.Join("x", "tunnel"), + Verbose: false, + } + + // create session + sess := runtimex.Try1(miniengine.NewSession(sessionConfig)) + defer sess.Close() + + // create the bootstrap config + bootstrapConfig := &miniengine.BootstrapConfig{ + BackendURL: "", + CategoryCodes: []string{}, + Charging: false, + OnWiFi: false, + ProxyURL: "", + RunType: model.RunTypeTimed, + SnowflakeRendezvousMethod: "", + TorArgs: []string{}, + TorBinary: "", + } + + // bootstrap the session + bootstrapTask := sess.Bootstrap(ctx, bootstrapConfig) + awaitTask(bootstrapTask) + _ = runtimex.Try1(bootstrapTask.Result()) + + // obtain the probe geolocation + location := runtimex.Try1(sess.GeolocateResult()) + log.Infof("%+v", location) + + // obtain the check-in API response + checkInResult := runtimex.Try1(sess.CheckInResult()) + log.Infof("%+v", checkInResult) + + // obtain check-in information for the web connectivity experiment + runtimex.Assert(checkInResult.Tests.WebConnectivity != nil, "nil WebConnectivity") + webConnectivity := checkInResult.Tests.WebConnectivity + + log.Infof("report ID: %s", webConnectivity.ReportID) + + // measure and submit all the URLs + for _, entry := range webConnectivity.URLs { + // perform the measurement + options := make(map[string]any) + measurementTask := sess.Measure(ctx, "web_connectivity", options, entry.URL) + awaitTask(measurementTask) + measurementResult := runtimex.Try1(measurementTask.Result()) + log.Infof("%+v", measurementResult) + + // set the report ID + measurementResult.Measurement.ReportID = webConnectivity.ReportID + + // submit the measurement + submitTask := sess.Submit(ctx, measurementResult.Measurement) + awaitTask(submitTask) + _ = runtimex.Try1(submitTask.Result()) + log.Infof( + "https://explorer.ooni.org/measurement/%s?input=%s", + webConnectivity.ReportID, + entry.URL, + ) + } +} diff --git a/internal/database/actions.go b/internal/database/actions.go index ac6bfa390d..7d2f841f65 100644 --- a/internal/database/actions.go +++ b/internal/database/actions.go @@ -12,6 +12,7 @@ import ( "time" "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" "github.com/pkg/errors" "github.com/upper/db/v4" @@ -33,8 +34,6 @@ type Database struct { sess db.Session } -var _ model.WritableDatabase = &Database{} - // Session implements Writable/ReadableDatabase.Session func (d *Database) Session() db.Session { return d.sess @@ -293,14 +292,14 @@ func (d *Database) CreateResult(homePath string, testGroupName string, networkID } // CreateNetwork implements WritableDatabase.CreateNetwork -func (d *Database) CreateNetwork(loc model.LocationProvider) (*model.DatabaseNetwork, error) { +func (d *Database) CreateNetwork(loc *miniengine.Location) (*model.DatabaseNetwork, error) { network := model.DatabaseNetwork{ - ASN: loc.ProbeASN(), - CountryCode: loc.ProbeCC(), - NetworkName: loc.ProbeNetworkName(), + ASN: uint(loc.ProbeASN), + CountryCode: loc.ProbeCC, + NetworkName: loc.ProbeNetworkName, // On desktop we consider it to always be wifi NetworkType: "wifi", - IP: loc.ProbeIP(), + IP: loc.ProbeIP, } newID, err := d.sess.Collection("networks").Insert(network) if err != nil { diff --git a/internal/miniengine/bootstrap.go b/internal/miniengine/bootstrap.go new file mode 100644 index 0000000000..1e42622009 --- /dev/null +++ b/internal/miniengine/bootstrap.go @@ -0,0 +1,272 @@ +package miniengine + +// +// The "bootstrap" task +// + +import ( + "net/url" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/kvstore" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/optional" + "github.com/ooni/probe-cli/v3/internal/platform" + "github.com/ooni/probe-cli/v3/internal/probeservices" + "golang.org/x/net/context" +) + +// BootstrapConfig contains the config for [Session.Bootstrap]. The zero value +// is invalid; please fill all fields marked as MANDATORY. +type BootstrapConfig struct { + // BackendURL allows you to OPTIONALLY force the + // usage of a specific OONI backend instance. + BackendURL string `json:"backend_url"` + + // CategoryCodes contains OPTIONAL category codes for the check-in API. + CategoryCodes []string `json:"category_codes"` + + // Charging is the OPTIONAL charging hint for the check-in API. + Charging bool `json:"charging"` + + // OnWiFi is the OPTIONAL on-wifi hint for the check-in API. + OnWiFi bool `json:"on_wifi"` + + // ProxyURL allows you to OPTIONALLY force a specific proxy + // rather than using no proxy (the default). + // + // Use `psiphon:///` to force using Psiphon with the + // embedded configuration file. Not all builds have + // an embedded configuration file, but OONI builds have + // such a file, so they can use this functionality. + // + // Use `tor:///` and `torsf:///` to respectively use vanilla tor + // and tor plus snowflake as tunnels. + // + // Use `socks5://127.0.0.1:9050/` to connect to a SOCKS5 + // proxy running on 127.0.0.1:9050. This could be, for example, + // a suitably configured `tor` instance. + ProxyURL string `json:"proxy_url"` + + // RunType is the MANDATORY run-type for the check-in API. + RunType model.RunType `json:"run_type"` + + // SnowflakeRendezvousMethod OPTIONALLY allows you to specify + // which snowflake rendezvous method to use. Valid methods to use + // here are "amp" and "domain_fronting". + SnowflakeRendezvousMethod string `json:"snowflake_rendezvous_method"` + + // TorArgs contains OPTIONAL arguments to pass to the "tor" binary + // when ProxyURL is `tor:///` or `torsf:///`. + TorArgs []string `json:"tor_args"` + + // TorBinary is the OPTIONAL "tor" binary to use. When using this code + // on mobile devices, we link with tor directly, so there is no need to + // specify this argument when running on a mobile device. + TorBinary string `json:"tor_binary"` +} + +// TODO(bassosimone): rather than having calls that return the geolocation and +// the result of the check-in, we should modify Bootstrap to return something +// like a Task[BootstrapResult] that contains both. The Bootstrap will still be +// idempotent and short circuit already existing results if they are available. +// +// By doing that, we would simplify the corresponding C API. + +// Bootstrap ensures that we bootstrap the [Session]. This function +// is safe to call multiple times. We'll only bootstrap on the first +// invocation and do nothing for subsequent invocations. +func (s *Session) Bootstrap(ctx context.Context, config *BootstrapConfig) *Task[Void] { + task := &Task[Void]{ + done: make(chan any), + events: s.emitter, + failure: nil, + result: Void{}, + } + go s.bootstrapAsync(ctx, config, task) + return task +} + +// bootstrapAsync runs the bootstrap in a background goroutine. +func (s *Session) bootstrapAsync(ctx context.Context, config *BootstrapConfig, task *Task[Void]) { + // synchronize with Task.Result + defer close(task.done) + + // make the whole operation locked with respect to s + defer s.mu.Unlock() + s.mu.Lock() + + // handle the case where bootstrap already occurred while we were locked + if !s.state.IsNone() { + return + } + + // perform a sync bootstrap + err := s.bootstrapSyncLocked(ctx, config) + + // pass result to the caller + task.failure = err +} + +// bootstrapSyncLocked executes a synchronous bootstrap. This function MUST be +// run while holding the s.mu mutex because it mutates s. +func (s *Session) bootstrapSyncLocked(ctx context.Context, config *BootstrapConfig) error { + // create a new instance of the [engineSessionState] type. + ess, err := s.newEngineSessionState(ctx, config) + if err != nil { + return err + } + + // MUTATE s to store the state + s.state = optional.Some(ess) + return nil +} + +// newEngineSessionState creates a new instance of [engineSessionState]. +func (s *Session) newEngineSessionState( + ctx context.Context, config *BootstrapConfig) (*engineSessionState, error) { + // create configuration for [engine.NewSession] + engineConfig, err := s.newEngineSessionConfig(config) + if err != nil { + return nil, err + } + + // create a new underlying session instance + child, err := engine.NewSession(ctx, *engineConfig) + if err != nil { + return nil, err + } + + // create a probeservices client + psc, err := child.NewProbeServicesClient(ctx) + if err != nil { + child.Close() + return nil, err + } + + // geolocate the probe. + location, err := s.geolocate(ctx, child) + if err != nil { + child.Close() + return nil, err + } + + // lookup the available backends. + if err := child.MaybeLookupBackendsContext(ctx); err != nil { + child.Close() + return nil, err + } + + // call the check-in API. + resp, err := s.checkIn(ctx, location, child, psc, config) + if err != nil { + child.Close() + return nil, err + } + + // create [engineSessionState] + ess := &engineSessionState{ + checkIn: resp, + geoloc: location, + psc: psc, + sess: child, + } + return ess, nil +} + +// geolocate performs the geolocation during the bootstrap. +func (s *Session) geolocate(ctx context.Context, sess *engine.Session) (*Location, error) { + // perform geolocation and handle failure + if err := sess.MaybeLookupLocationContext(ctx); err != nil { + return nil, err + } + + // copy the result of the geolocation + location := &Location{ + ProbeASN: int64(sess.ProbeASN()), + ProbeASNString: sess.ProbeASNString(), + ProbeCC: sess.ProbeCC(), + ProbeNetworkName: sess.ProbeNetworkName(), + ProbeIP: sess.ProbeIP(), + ResolverASN: int64(sess.ResolverASN()), + ResolverASNString: sess.ResolverASNString(), + ResolverIP: sess.ResolverIP(), + ResolverNetworkName: sess.ResolverNetworkName(), + } + return location, nil +} + +// checkIn invokes the checkIn API. +func (s *Session) checkIn( + ctx context.Context, + location *Location, + sess *engine.Session, + psc *probeservices.Client, + config *BootstrapConfig, +) (*model.OOAPICheckInResult, error) { + categoryCodes := config.CategoryCodes + if len(categoryCodes) <= 0 { + // make sure it not nil because this would + // actually break the check-in API + categoryCodes = []string{} + } + apiConfig := model.OOAPICheckInConfig{ + Charging: config.Charging, + OnWiFi: config.OnWiFi, + Platform: platform.Name(), + ProbeASN: location.ProbeASNString, + ProbeCC: location.ProbeCC, + RunType: config.RunType, + SoftwareName: sess.SoftwareName(), + SoftwareVersion: sess.SoftwareVersion(), + WebConnectivity: model.OOAPICheckInConfigWebConnectivity{ + CategoryCodes: categoryCodes, + }, + } + return psc.CheckIn(ctx, apiConfig) +} + +// newEngineSessionConfig creates a new [engine.SessionConfig] instance. +func (s *Session) newEngineSessionConfig(config *BootstrapConfig) (*engine.SessionConfig, error) { + // create keyvalue store inside the user provided stateDir. + kvstore, err := kvstore.NewFS(s.stateDir) + if err != nil { + return nil, err + } + + // honor user-provided backend service, if any. + var availableps []model.OOAPIService + if config.BackendURL != "" { + availableps = append(availableps, model.OOAPIService{ + Address: config.BackendURL, + Type: "https", + }) + } + + // honor user-provided proxy, if any. + var proxyURL *url.URL + if config.ProxyURL != "" { + var err error + proxyURL, err = url.Parse(config.ProxyURL) + if err != nil { + return nil, err + } + } + + // create the underlying session using the [engine] package. + engineConfig := &engine.SessionConfig{ + AvailableProbeServices: availableps, + KVStore: kvstore, + Logger: s.logger, + ProxyURL: proxyURL, + SoftwareName: s.softwareName, + SoftwareVersion: s.softwareVersion, + TempDir: s.tempDir, + TorArgs: config.TorArgs, + TorBinary: config.TorBinary, + SnowflakeRendezvous: config.SnowflakeRendezvousMethod, + TunnelDir: s.tunnelDir, + } + + return engineConfig, nil +} diff --git a/internal/miniengine/callbacks.go b/internal/miniengine/callbacks.go new file mode 100644 index 0000000000..47ee11583a --- /dev/null +++ b/internal/miniengine/callbacks.go @@ -0,0 +1,30 @@ +package miniengine + +// +// Measurement callbacks +// + +import "github.com/ooni/probe-cli/v3/internal/model" + +// callbacks implements [model.ExperimentCallbacks] and emits +// the callbacks events using the given channel. +type callbacks struct { + // emitter is the channel where to emit events. + emitter chan<- *Event +} + +var _ model.ExperimentCallbacks = &callbacks{} + +// OnProgress implements model.ExperimentCallbacks +func (c *callbacks) OnProgress(progress float64, message string) { + event := &Event{ + EventType: EventTypeProgress, + Message: message, + Progress: progress, + } + // Implementation note: it's fine to lose interim events + select { + case c.emitter <- event: + default: + } +} diff --git a/internal/miniengine/close.go b/internal/miniengine/close.go new file mode 100644 index 0000000000..400c71c9ee --- /dev/null +++ b/internal/miniengine/close.go @@ -0,0 +1,36 @@ +package miniengine + +// +// The "close" task +// + +import "github.com/ooni/probe-cli/v3/internal/optional" + +// TODO(bassosimone): we should refactor this code to return a Task[Void], which +// allows us to print logs while closing the session. + +// Close closes the [Session]. This function is safe to call multiple +// times. We'll close underlying resources on the first invocation and +// otherwise do nothing for subsequent invocations. +func (s *Session) Close() (err error) { + s.closeJustOnce.Do(func() { + // make sure the cleanup is synchronized. + defer s.mu.Unlock() + s.mu.Lock() + + // handle the case where there is no state. + if s.state.IsNone() { + return + } + + // obtain the underlying state + state := s.state.Unwrap() + + // replace with empty state + s.state = optional.None[*engineSessionState]() + + // close the underlying session + err = state.sess.Close() + }) + return err +} diff --git a/internal/miniengine/doc.go b/internal/miniengine/doc.go new file mode 100644 index 0000000000..d7bea4b205 --- /dev/null +++ b/internal/miniengine/doc.go @@ -0,0 +1 @@ +package miniengine diff --git a/internal/miniengine/event.go b/internal/miniengine/event.go new file mode 100644 index 0000000000..6b4f755ee9 --- /dev/null +++ b/internal/miniengine/event.go @@ -0,0 +1,29 @@ +package miniengine + +// +// Log and progress events. +// + +// EventTypeDebug is an [Event] containing a DEBUG message. +const EventTypeDebug = "DEBUG" + +// EventTypeInfo is an [Event] containing an INFO message. +const EventTypeInfo = "INFO" + +// EventTypeProgress is an [Event] containing a PROGRESS message. +const EventTypeProgress = "PROGRESS" + +// EventTypeWarning is an [Event] containing a WARNING message. +const EventTypeWarning = "WARNING" + +// Event is an interim event emitted by this implementation. +type Event struct { + // EventType is one of "DEBUG", "INFO", "PROGRESS", and "WARNING". + EventType string + + // Message is the string message. + Message string + + // Progress is the progress as a number between zero and one. + Progress float64 +} diff --git a/internal/miniengine/location.go b/internal/miniengine/location.go new file mode 100644 index 0000000000..774541ccfb --- /dev/null +++ b/internal/miniengine/location.go @@ -0,0 +1,35 @@ +package miniengine + +// +// Probe location +// + +// Location is the probe location. +type Location struct { + // ProbeASN is the probe AS number. + ProbeASN int64 `json:"probe_asn"` + + // ProbeASNString is the probe AS number as a string. + ProbeASNString string `json:"probe_asn_string"` + + // ProbeCC is the probe country code. + ProbeCC string `json:"probe_cc"` + + // ProbeNetworkName is the probe network name. + ProbeNetworkName string `json:"probe_network_name"` + + // IP is the probe IP. + ProbeIP string `json:"probe_ip"` + + // ResolverASN is the resolver ASN. + ResolverASN int64 `json:"resolver_asn"` + + // ResolverASNString is the resolver AS number as a string. + ResolverASNString string `json:"resolver_asn_string"` + + // ResolverIP is the resolver IP. + ResolverIP string `json:"resolver_ip"` + + // ResolverNetworkName is the resolver network name. + ResolverNetworkName string `json:"resolver_network_name"` +} diff --git a/internal/miniengine/logger.go b/internal/miniengine/logger.go new file mode 100644 index 0000000000..9c885ef9da --- /dev/null +++ b/internal/miniengine/logger.go @@ -0,0 +1,79 @@ +package miniengine + +// +// Emitting log messages as events. +// + +import ( + "fmt" + + "github.com/ooni/probe-cli/v3/internal/model" +) + +// loggerEmitter is a [model.Logger] and emits events using the given channel. +type loggerEmitter struct { + // emitter is the channel where to emit events. + emitter chan<- *Event + + // isVerbose indicates whether to emit debug logs. + isVerbose bool +} + +// ensure that taskLogger implements model.Logger. +var _ model.Logger = &loggerEmitter{} + +// newLoggerEmitter creates a new [loggerEmitter] instance. +func newLoggerEmitter(emitter chan<- *Event, isVerbose bool) *loggerEmitter { + return &loggerEmitter{ + emitter: emitter, + isVerbose: isVerbose, + } +} + +// Debug implements model.Logger.Debug. +func (cl *loggerEmitter) Debug(msg string) { + if cl.isVerbose { + cl.emit(EventTypeDebug, msg) + } +} + +// Debugf implements model.Logger.Debugf. +func (cl *loggerEmitter) Debugf(format string, v ...interface{}) { + if cl.isVerbose { + cl.Debug(fmt.Sprintf(format, v...)) + } +} + +// Info implements model.Logger.Info. +func (cl *loggerEmitter) Info(msg string) { + cl.emit(EventTypeInfo, msg) +} + +// Infof implements model.Logger.Infof. +func (cl *loggerEmitter) Infof(format string, v ...interface{}) { + cl.Info(fmt.Sprintf(format, v...)) +} + +// Warn implements model.Logger.Warn. +func (cl *loggerEmitter) Warn(msg string) { + cl.emit(EventTypeWarning, msg) +} + +// Warnf implements model.Logger.Warnf. +func (cl *loggerEmitter) Warnf(format string, v ...interface{}) { + cl.Warn(fmt.Sprintf(format, v...)) +} + +// emit is the code that actually emits the log event. +func (cl *loggerEmitter) emit(level string, message string) { + event := &Event{ + EventType: level, + Message: message, + Progress: 0, + } + // Implementation note: it's fine to lose interim events + select { + case cl.emitter <- event: + default: + } +} diff --git a/internal/miniengine/measure.go b/internal/miniengine/measure.go new file mode 100644 index 0000000000..6e898d856f --- /dev/null +++ b/internal/miniengine/measure.go @@ -0,0 +1,107 @@ +package miniengine + +// +// The "measure" task +// + +import ( + "github.com/ooni/probe-cli/v3/internal/model" + "golang.org/x/net/context" +) + +// MeasurementResult contains the results of [Session.Measure] +type MeasurementResult struct { + // KibiBytesReceived contains the KiB we received + KibiBytesReceived float64 `json:"kibi_bytes_received"` + + // KibiBytesSent contains the KiB we sent + KibiBytesSent float64 `json:"kibi_bytes_sent"` + + // Measurement is the generated [model.Measurement] + Measurement *model.Measurement `json:"measurement"` + + // Summary is the corresponding summary. + Summary any `json:"summary"` +} + +// Measure performs a measurement using the given experiment and input. +func (s *Session) Measure( + ctx context.Context, + name string, + options map[string]any, + input string, +) *Task[*MeasurementResult] { + task := &Task[*MeasurementResult]{ + done: make(chan any), + events: s.emitter, + failure: nil, + result: nil, + } + go s.measureAsync(ctx, name, options, input, task) + return task +} + +// measureAsync runs the measurement in a background goroutine. +func (s *Session) measureAsync( + ctx context.Context, + name string, + options map[string]any, + input string, + task *Task[*MeasurementResult], +) { + // synchronize with Task.Result + defer close(task.done) + + // lock and access the underlying session + s.mu.Lock() + defer s.mu.Unlock() + + // handle the case where we did not bootstrap + if s.state.IsNone() { + task.failure = ErrNoBootstrap + return + } + sess := s.state.Unwrap().sess + + // create a [model.ExperimentBuilder] + builder, err := sess.NewExperimentBuilder(name) + if err != nil { + task.failure = err + return + } + + // set the proper callbacks for the experiment + callbacks := &callbacks{s.emitter} + builder.SetCallbacks(callbacks) + + // set the proper options for the experiment + if err := builder.SetOptionsAny(options); err != nil { + task.failure = err + return + } + + // create an experiment instance + exp := builder.NewExperiment() + + // perform the measurement + meas, err := exp.MeasureWithContext(ctx, input) + if err != nil { + task.failure = err + return + } + + // obtain the summary + summary, err := exp.GetSummaryKeys(meas) + if err != nil { + task.failure = err + return + } + + // pass response to the caller + task.result = &MeasurementResult{ + KibiBytesReceived: exp.KibiBytesReceived(), + KibiBytesSent: exp.KibiBytesSent(), + Measurement: meas, + Summary: summary, + } +} diff --git a/internal/miniengine/session.go b/internal/miniengine/session.go new file mode 100644 index 0000000000..8d76b8e5eb --- /dev/null +++ b/internal/miniengine/session.go @@ -0,0 +1,198 @@ +package miniengine + +// +// Measurement session +// + +import ( + "errors" + "fmt" + "os" + "sync" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/optional" + "github.com/ooni/probe-cli/v3/internal/probeservices" +) + +// SessionConfig contains configuration for a [Session]. The zero value is +// invalid; please, initialize all the fields marked as MANDATORY. +type SessionConfig struct { + // SoftwareName is the MANDATORY name of the application + // that will be using the new [Session]. + SoftwareName string `json:"software_name"` + + // SoftwareVersion is the MANDATORY version of the application + // that will be using the new [Session]. + SoftwareVersion string `json:"software_version"` + + // StateDir is the MANDATORY directory where to store state + // information required by a [Session]. + StateDir string `json:"state_dir"` + + // TempDir is the MANDATORY directory inside which the [Session] shall + // store temporary files deleted when we close the [Session]. + TempDir string `json:"temp_dir"` + + // TunnelDir is the MANDATORY directory where the [Session] shall store + // persistent data regarding circumvention tunnels. + TunnelDir string `json:"tunnel_dir"` + + // Verbose OPTIONALLY configures the [Session] logger to be verbose. + Verbose bool `json:"verbose"` +} + +// ErrSessionConfig indicates that the [SessionConfig] is invalid. +var ErrSessionConfig = errors.New("invalid SessionConfig") + +// check checks whether the [SessionConfig] is valid. +func (sc *SessionConfig) check() error { + if sc.SoftwareName == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "SoftwareName is empty") + } + if sc.SoftwareVersion == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "SoftwareVersion is empty") + } + if sc.StateDir == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "StateDir is empty") + } + if sc.TempDir == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "TempDir is empty") + } + if sc.TunnelDir == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "TunnelDir is empty") + } + return nil +} + +// mkdirAll ensures all the required directories exist. +func (sc *SessionConfig) mkdirAll() error { + if err := os.MkdirAll(sc.StateDir, 0700); err != nil { + return err + } + if err := os.MkdirAll(sc.TempDir, 0700); err != nil { + return err + } + if err := os.MkdirAll(sc.TunnelDir, 0700); err != nil { + return err + } + return nil +} + +// Session is a measurement session. The zero value is invalid; please +// create a new instance using the [NewSession] factory. +type Session struct { + // closeJustOnce ensures we close this [Session] just once. + closeJustOnce sync.Once + + // emitter is the [emitter] to use. + emitter chan *Event + + // logger is the [model.Logger] to use. + logger model.Logger + + // mu provides mutual exclusion. + mu sync.Mutex + + // softwareName is the software name. + softwareName string + + // softwareVersion is the software version. + softwareVersion string + + // stateDir is the directory containing state. + stateDir string + + // state contains the optional state. + state optional.Value[*engineSessionState] + + // tempDir is the temporary directory root. + tempDir string + + // tunnelDir is the directory containing tunnel state. + tunnelDir string +} + +// engineSessionState contains the state associated with an [engine.Session]. +type engineSessionState struct { + // checkIn contains the check-in API response. + checkIn *model.OOAPICheckInResult + + // geoloc contains the geolocation. + geoloc *Location + + // psc is the [probeservices.Client] to use. + psc *probeservices.Client + + // sess is the underlying [engine.Session]. + sess *engine.Session +} + +// NewSession creates a new [Session] instance. +func NewSession(config *SessionConfig) (*Session, error) { + // check whether the [SessionConfig] is valid. + if err := config.check(); err != nil { + return nil, err + } + + // make sure all the required directories exist. + if err := config.mkdirAll(); err != nil { + return nil, err + } + + // create the base event emitter + const buffer = 1024 + emitter := make(chan *Event, buffer) + + // create a logger using the base event emitter + logger := newLoggerEmitter(emitter, config.Verbose) + + // assemble and return a session. + sess := &Session{ + closeJustOnce: sync.Once{}, + emitter: emitter, + logger: logger, + mu: sync.Mutex{}, + softwareName: config.SoftwareName, + softwareVersion: config.SoftwareVersion, + stateDir: config.StateDir, + state: optional.None[*engineSessionState](), + tempDir: config.TempDir, + tunnelDir: config.TunnelDir, + } + return sess, nil +} + +// ErrNoBootstrap indicates that you did not bootstrap the [Session]. +var ErrNoBootstrap = errors.New("bootstrap the session first") + +// CheckInResult returns the check-in API result. +func (s *Session) CheckInResult() (*model.OOAPICheckInResult, error) { + // make sure this method is synchronized + defer s.mu.Unlock() + s.mu.Lock() + + // handle the case where there's no state. + if s.state.IsNone() { + return nil, ErrNoBootstrap + } + + // return the underlying value + return s.state.Unwrap().checkIn, nil +} + +// GeolocateResult returns the geolocation result. +func (s *Session) GeolocateResult() (*Location, error) { + // make sure this method is synchronized + defer s.mu.Unlock() + s.mu.Lock() + + // handle the case where there's no state. + if s.state.IsNone() { + return nil, ErrNoBootstrap + } + + // return the underlying value + return s.state.Unwrap().geoloc, nil +} diff --git a/internal/miniengine/submit.go b/internal/miniengine/submit.go new file mode 100644 index 0000000000..bceac2f7f9 --- /dev/null +++ b/internal/miniengine/submit.go @@ -0,0 +1,44 @@ +package miniengine + +// +// The "submit" task +// + +import ( + "github.com/ooni/probe-cli/v3/internal/model" + "golang.org/x/net/context" +) + +// Submit submits a [model.Measurement] to the OONI backend. You MUST initialize +// the measurement's report ID. You can find the report ID for each experiment +// in the results of the check-in API. +func (s *Session) Submit(ctx context.Context, meas *model.Measurement) *Task[Void] { + task := &Task[Void]{ + done: make(chan any), + events: s.emitter, + failure: nil, + result: Void{}, + } + go s.submitAsync(ctx, meas, task) + return task +} + +// submitAsync submits the measurement in a background goroutine. +func (s *Session) submitAsync(ctx context.Context, meas *model.Measurement, task *Task[Void]) { + // synchronize with Task.Result + defer close(task.done) + + // lock and access the underlying session + s.mu.Lock() + defer s.mu.Unlock() + + // handle the case where we did not bootstrap + if s.state.IsNone() { + task.failure = ErrNoBootstrap + return + } + state := s.state.Unwrap() + + // submit the measurement to the backend + task.failure = state.psc.SubmitMeasurement(ctx, meas) +} diff --git a/internal/miniengine/task.go b/internal/miniengine/task.go new file mode 100644 index 0000000000..a944947933 --- /dev/null +++ b/internal/miniengine/task.go @@ -0,0 +1,90 @@ +package miniengine + +// +// Task +// + +import ( + "github.com/ooni/probe-cli/v3/internal/model" + "golang.org/x/net/context" +) + +// Task is a long running operation that emits [Event] while it is running and +// produces a given Result. The zero value of this struct is invalid; you cannot +// create a valid [Task] outside of this package. +type Task[Result any] struct { + // done is closed when the [Task] is done. + done chan any + + // events is where the [Task] emits [Event]. + events chan *Event + + // failure is the [Task] failure or nil. + failure error + + // result is the [Task] result (zero on failure). + result Result +} + +// TODO(bassosimone): +// +// 1. we need a way to cancel/interrupt a running Task, which would +// simplify the C API implementation a bit. + +// Done returns a channel closed when the [Task] is done. +func (t *Task[Result]) Done() <-chan any { + return t.done +} + +// Events returns a channel where a running [Task] emits [Event]. +func (t *Task[Result]) Events() <-chan *Event { + return t.events +} + +// Result returns the [Task] result (if the task succeded) or the error that +// occurred (in case of failure). This method blocks until the channel returned +// by the [Task.Done] method has been closed. +func (t *Task[Result]) Result() (Result, error) { + <-t.done // synchronize with TaskRunner.Main + return t.result, t.failure +} + +// Await waits for the task to complete and properly emits log messages +// using the given logger and the given callbacks for progress. +func (t *Task[Result]) Await( + ctx context.Context, + logger model.Logger, + callbacks model.ExperimentCallbacks, +) { + for { + select { + case <-ctx.Done(): + return + case <-t.Done(): + for { + select { + case ev := <-t.Events(): + t.emit(logger, callbacks, ev) + default: + return + } + } + case ev := <-t.Events(): + t.emit(logger, callbacks, ev) + } + } +} + +// emit is the helper function for emitting events called by Await. +func (t *Task[Result]) emit(logger model.Logger, callbacks model.ExperimentCallbacks, ev *Event) { + switch ev.EventType { + case EventTypeProgress: + callbacks.OnProgress(ev.Progress, ev.Message) + case EventTypeDebug: + logger.Debug(ev.Message) + case EventTypeWarning: + logger.Warn(ev.Message) + default: + logger.Info(ev.Message) + } +} diff --git a/internal/miniengine/void.go b/internal/miniengine/void.go new file mode 100644 index 0000000000..f598d2dde0 --- /dev/null +++ b/internal/miniengine/void.go @@ -0,0 +1,4 @@ +package miniengine + +// Void is a structure without any content. +type Void struct{} diff --git a/internal/probeservices/collector.go b/internal/probeservices/collector.go index 54818f2749..132795aa20 100644 --- a/internal/probeservices/collector.go +++ b/internal/probeservices/collector.go @@ -23,6 +23,25 @@ var ( ErrJSONFormatNotSupported = errors.New("JSON format not supported") ) +// ErrEmptyReportID indicates you passed a measurement with an empty ReportID +// to a function that submits measurements directly. +var ErrEmptyReportID = errors.New("probeservices: empty report ID") + +// SubmitMeasurement submits the given measurement to the OONI collector. You MUST initialize +// the report ID of the measurement, otherwise the submission will fail immediately. +func (c Client) SubmitMeasurement(ctx context.Context, m *model.Measurement) error { + if m.ReportID == "" { + return ErrEmptyReportID + } + var updateResponse model.OOAPICollectorUpdateResponse + return c.APIClientTemplate.WithBodyLogging().Build().PostJSON( + ctx, fmt.Sprintf("/report/%s", m.ReportID), model.OOAPICollectorUpdateRequest{ + Format: "json", + Content: m, + }, &updateResponse, + ) +} + // NewReportTemplate creates a new ReportTemplate from a Measurement. func NewReportTemplate(m *model.Measurement) model.OOAPIReportTemplate { return model.OOAPIReportTemplate{ diff --git a/pkg/oonimkall/tasklogger.go b/pkg/oonimkall/tasklogger.go index b6550dd3d8..7793b2a76f 100644 --- a/pkg/oonimkall/tasklogger.go +++ b/pkg/oonimkall/tasklogger.go @@ -1,17 +1,17 @@ package oonimkall -import ( - "fmt" - - "github.com/ooni/probe-cli/v3/internal/model" -) - // // This file implements the logger used by a task. Outside // of this file, the rest of the codebase just sees a generic // model.Logger that can log events. // +import ( + "fmt" + + "github.com/ooni/probe-cli/v3/internal/model" +) + // taskLogger is the logger used by a task. type taskLogger struct { // emitter is the event emitter.