diff --git a/internal/cmd/miniooni/libminiooni.go b/internal/cmd/miniooni/libminiooni.go index 934e83a294..465e71088d 100644 --- a/internal/cmd/miniooni/libminiooni.go +++ b/internal/cmd/miniooni/libminiooni.go @@ -42,6 +42,7 @@ type Options struct { MaxRuntime int64 NoJSON bool NoCollector bool + Parallelism int ProbeServicesURL string Proxy string Random bool @@ -99,6 +100,9 @@ func init() { &globalOptions.ProbeServicesURL, "probe-services", 0, "Set the URL of the probe-services instance you want to use", "URL", ) + getopt.FlagLong( + &globalOptions.Parallelism, "parallelism", 1, "Parallelism for performing measurements", "NUMBER", + ) getopt.FlagLong( &globalOptions.Proxy, "proxy", 0, "Set the proxy URL", "URL", ) @@ -404,6 +408,7 @@ func mainSingleIteration(logger model.Logger, experimentName string, currentOpti Name: experimentName, NoCollector: currentOptions.NoCollector, NoJSON: currentOptions.NoJSON, + Parallelism: currentOptions.Parallelism, Random: currentOptions.Random, ReportFile: currentOptions.ReportFile, Session: sess, diff --git a/internal/engine/inputprocessor.go b/internal/engine/inputprocessor.go index ed33b3e9b3..07512f2476 100644 --- a/internal/engine/inputprocessor.go +++ b/internal/engine/inputprocessor.go @@ -2,6 +2,7 @@ package engine import ( "context" + "sync" "time" "github.com/ooni/probe-cli/v3/internal/model" @@ -14,31 +15,6 @@ type InputProcessorExperiment interface { ctx context.Context, input string) (<-chan *model.Measurement, error) } -// InputProcessorExperimentWrapper is a wrapper for an -// Experiment that also allow to pass around the input index. -type InputProcessorExperimentWrapper interface { - MeasureAsync( - ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) -} - -// NewInputProcessorExperimentWrapper creates a new -// instance of InputProcessorExperimentWrapper. 
-func NewInputProcessorExperimentWrapper( - exp InputProcessorExperiment) InputProcessorExperimentWrapper { - return inputProcessorExperimentWrapper{exp: exp} -} - -type inputProcessorExperimentWrapper struct { - exp InputProcessorExperiment -} - -func (ipew inputProcessorExperimentWrapper) MeasureAsync( - ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { - return ipew.exp.MeasureAsync(ctx, input) -} - -var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{} - // InputProcessor processes inputs. We perform a Measurement // for each input using the given Experiment. type InputProcessor struct { @@ -46,11 +22,14 @@ type InputProcessor struct { Annotations map[string]string // Experiment is the code that will run the experiment. - Experiment InputProcessorExperimentWrapper + Experiment InputProcessorExperiment // Inputs is the list of inputs to measure. Inputs []model.OOAPIURLInfo + // Logger is the MANDATORY logger to use. + Logger model.Logger + // MaxRuntime is the optional maximum runtime // when looping over a list of inputs (e.g. when // running Web Connectivity). Zero means that @@ -60,54 +39,18 @@ type InputProcessor struct { // Options contains command line options for this experiment. Options []string + // Parallelism contains the OPTIONAL parallelism + // for performing measurements. A zero or negative + // value implies we want just one goroutine. + Parallelism int + // Saver is the code that will save measurement results // on persistent storage (e.g. the file system). - Saver InputProcessorSaverWrapper + Saver Saver // Submitter is the code that will submit measurements // to the OONI collector. - Submitter InputProcessorSubmitterWrapper -} - -// InputProcessorSaverWrapper is InputProcessor's -// wrapper for a Saver implementation. 
-type InputProcessorSaverWrapper interface { - SaveMeasurement(idx int, m *model.Measurement) error -} - -type inputProcessorSaverWrapper struct { - saver Saver -} - -// NewInputProcessorSaverWrapper wraps a Saver for InputProcessor. -func NewInputProcessorSaverWrapper(saver Saver) InputProcessorSaverWrapper { - return inputProcessorSaverWrapper{saver: saver} -} - -func (ipsw inputProcessorSaverWrapper) SaveMeasurement( - idx int, m *model.Measurement) error { - return ipsw.saver.SaveMeasurement(m) -} - -// InputProcessorSubmitterWrapper is InputProcessor's -// wrapper for a Submitter implementation. -type InputProcessorSubmitterWrapper interface { - Submit(ctx context.Context, idx int, m *model.Measurement) error -} - -type inputProcessorSubmitterWrapper struct { - submitter Submitter -} - -// NewInputProcessorSubmitterWrapper wraps a Submitter -// for the InputProcessor. -func NewInputProcessorSubmitterWrapper(submitter Submitter) InputProcessorSubmitterWrapper { - return inputProcessorSubmitterWrapper{submitter: submitter} -} - -func (ipsw inputProcessorSubmitterWrapper) Submit( - ctx context.Context, idx int, m *model.Measurement) error { - return ipsw.submitter.Submit(ctx, m) + Submitter Submitter } // Run processes all the input subject to the duration of the @@ -123,8 +66,10 @@ func (ipsw inputProcessorSubmitterWrapper) Submit( // though is free to choose different policies by configuring // the Experiment, Submitter, and Saver fields properly. func (ip *InputProcessor) Run(ctx context.Context) error { - _, err := ip.run(ctx) - return err + // TODO(bassosimone): it's unclear how to report errors back + // now that we're in a parallel context. + ip.run(ctx) + return nil } // These are the reasons why run could stop. @@ -135,37 +80,74 @@ const ( // run is like Run but, in addition to returning an error, it // also returns the reason why we stopped. 
-func (ip *InputProcessor) run(ctx context.Context) (int, error) { +func (ip *InputProcessor) run(ctx context.Context) { + saver := StartAsyncSaver(ip.Saver) + submitter := StartAsyncSubmitter(ip.Logger, ip.Submitter, saver) + wg := &sync.WaitGroup{} + urls := ip.generateInputs() + parallelism := ip.Parallelism + if parallelism < 1 { + parallelism = 1 + } start := time.Now() - for idx, url := range ip.Inputs { + for cnt := 0; cnt < parallelism; cnt++ { + wg.Add(1) + go ip.performMeasurement(ctx, wg, urls, start, submitter) + } + // wait for measurers to join + wg.Wait() + // termination protocol for saver and submitter + submitter.Stop() + submitter.Wait() + + saver.Stop() + saver.Wait() +} + +func (ip *InputProcessor) performMeasurement( + ctx context.Context, wg *sync.WaitGroup, urls <-chan *inputWithIndex, + start time.Time, submitter AsyncSubmitter) (int, error) { + defer wg.Done() // synchronize with the parent + for inputIdx := range urls { + idx := inputIdx.idx + input := inputIdx.input if ip.MaxRuntime > 0 && time.Since(start) > ip.MaxRuntime { return stopMaxRuntime, nil } - input := url.URL - var measurements []*model.Measurement - source, err := ip.Experiment.MeasureAsync(ctx, input, idx) + if input != "" { + ip.Logger.Infof("[%d/%d] running with input: %s", idx+1, len(ip.Inputs), input) + } + source, err := ip.Experiment.MeasureAsync(ctx, input) if err != nil { return 0, err } - // NOTE: we don't want to intermix measuring with submitting - // therefore we collect all measurements first for meas := range source { - measurements = append(measurements, meas) - } - for _, meas := range measurements { - meas.AddAnnotations(ip.Annotations) - meas.Options = ip.Options - err = ip.Submitter.Submit(ctx, idx, meas) - if err != nil { - return 0, err - } - // Note: must be after submission because submission modifies - // the measurement to include the report ID. 
- err = ip.Saver.SaveMeasurement(idx, meas) - if err != nil { - return 0, err - } + submitter.Submit(idx, meas) } } return stopNormal, nil } + +// inputWithIndex combines an input with its index. +type inputWithIndex struct { + // idx is the index + idx int + + // input contains the URL input + input string +} + +// generateInputs returns a channel where each input is emitted. +func (ip *InputProcessor) generateInputs() <-chan *inputWithIndex { + out := make(chan *inputWithIndex) + go func() { + defer close(out) + for idx, url := range ip.Inputs { + out <- &inputWithIndex{ + idx: idx, + input: url.URL, + } + } + }() + return out +} diff --git a/internal/engine/probeservices/collector.go b/internal/engine/probeservices/collector.go index 3504dbbac8..e794052bde 100644 --- a/internal/engine/probeservices/collector.go +++ b/internal/engine/probeservices/collector.go @@ -199,7 +199,7 @@ func NewSubmitter(opener ReportOpener, logger model.Logger) *Submitter { // Submit submits the current measurement to the OONI backend created using // the ReportOpener passed to the constructor. -func (sub *Submitter) Submit(ctx context.Context, m *model.Measurement) error { +func (sub *Submitter) Submit(ctx context.Context, idx int, m *model.Measurement) error { var err error sub.mu.Lock() defer sub.mu.Unlock() diff --git a/internal/engine/saver.go b/internal/engine/saver.go index 471be329cc..570c80ad88 100644 --- a/internal/engine/saver.go +++ b/internal/engine/saver.go @@ -1,14 +1,104 @@ package engine import ( + "context" "errors" + "sync" + "github.com/ooni/probe-cli/v3/internal/atomicx" "github.com/ooni/probe-cli/v3/internal/model" ) +// AsyncSaver is an asynchronous [Saver]. +type AsyncSaver interface { + // SaveMeasurement requests the saver to save a measurement. The return + // value indicates whether the measurement has been accepted by the + // saver or whether it has been rejected. The saver will reject + // measurements after you've called its Stop method. 
+ SaveMeasurement(idx int, m *model.Measurement) bool + + // Stop tells the saver that it should stop running as soon + // as possible, which entails trying to finish saving all the queued + // measurements to avoid losing them. Use Wait to know when the + // saver has finished saving all measurements. + Stop() + + // Wait waits for the saver to finish saving all the + // measurements that are currently queued for writing. Note + // that you should call Stop before calling Wait to inform + // the saver that it should stop running ASAP. + Wait() +} + // Saver saves a measurement on some persistent storage. type Saver interface { - SaveMeasurement(m *model.Measurement) error + SaveMeasurement(idx int, m *model.Measurement) error +} + +// asyncSaver implements AsyncSaver. +type asyncSaver struct { + // logger is the Logger to use. + logger model.Logger + + // queue is the queue containing measurements to save. + queue chan *measurementWithIndex + + // running tracks the running goroutines. + running *sync.WaitGroup + + // stopped indicates that the saver has stopped. + stopped *atomicx.Int64 +} + +// StartAsyncSaver creates a new [AsyncSaver] with the given [Saver]. We'll +// run the [AsyncSaver] in a background goroutine. You should call Stop to tell +// it it's time to stop and Wait to wait for it to complete. +func StartAsyncSaver(saver Saver) AsyncSaver { + as := &asyncSaver{ + logger: nil, + queue: make(chan *measurementWithIndex), + running: &sync.WaitGroup{}, + stopped: &atomicx.Int64{}, + } + go as.run(context.Background(), saver) + as.running.Add(1) + return as +} + +// SaveMeasurement implements AsyncSaver.SaveMeasurement. +func (as *asyncSaver) SaveMeasurement(idx int, m *model.Measurement) bool { + if as.stopped.Load() > 0 { + return false + } + as.queue <- &measurementWithIndex{ + idx: idx, + m: m, + } + return true +} + +// run saves measurements in FIFO order. 
+func (as *asyncSaver) run(ctx context.Context, saver Saver) { + defer as.running.Done() + for m := range as.queue { + // TODO(bassosimone): should we tell anyone about this error? + err := saver.SaveMeasurement(m.idx, m.m) + if err != nil { + as.logger.Warnf("asyncSaver: cannot save measurement: %s", err.Error()) + continue + } + } +} + +// Stop implements AsyncSaver.Stop. +func (as *asyncSaver) Stop() { + as.stopped.Add(1) // must happen BEFORE closing the channel + close(as.queue) +} + +// Wait implements AsyncSaver.Wait. +func (as *asyncSaver) Wait() { + as.running.Wait() } // SaverConfig is the configuration for creating a new Saver. @@ -49,7 +139,7 @@ func NewSaver(config SaverConfig) (Saver, error) { type fakeSaver struct{} -func (fs fakeSaver) SaveMeasurement(m *model.Measurement) error { +func (fs fakeSaver) SaveMeasurement(idx int, m *model.Measurement) error { return nil } @@ -61,7 +151,7 @@ type realSaver struct { Logger model.Logger } -func (rs realSaver) SaveMeasurement(m *model.Measurement) error { +func (rs realSaver) SaveMeasurement(idx int, m *model.Measurement) error { rs.Logger.Info("saving measurement to disk") return rs.Experiment.SaveMeasurement(m, rs.FilePath) } diff --git a/internal/engine/submitter.go b/internal/engine/submitter.go index 074b820a0b..f01d870846 100644 --- a/internal/engine/submitter.go +++ b/internal/engine/submitter.go @@ -2,18 +2,140 @@ package engine import ( "context" + "sync" + "github.com/ooni/probe-cli/v3/internal/atomicx" "github.com/ooni/probe-cli/v3/internal/model" ) // TODO(bassosimone): maybe keep track of which measurements // could not be submitted by a specific submitter? +// AsyncSubmitter is an async submitter. It runs one or more [Submitter] +// in the background and will use them for submitting measurements. +type AsyncSubmitter interface { + // Submit requests the submitter to submit a measurement. 
The return + // value indicates whether the measurement has been accepted by the + // submitter or whether it has been rejected. The submitter will reject + // measurements after you've called its Stop method. + Submit(idx int, m *model.Measurement) bool + + // Stop tells the submitter that it should stop running as soon + // as possible, which entails trying to submit all the queued + // measurements to avoid losing them. Use Wait to know when the + // submitter has finished submitting all measurements. + Stop() + + // Wait waits for the submitter to finish submitting all the + // measurements that are currently queued for submission. Note + // that you should call Stop before calling Wait to inform + // the submitter that it should stop running ASAP. + Wait() +} + +// measurementWithIndex is a measurement along with its index. +type measurementWithIndex struct { + // idx is the index + idx int + + // m is the measurement + m *model.Measurement +} + +// asyncSubmitterBuffer is the buffer used by the async submitter's queue. +const asyncSubmitterBuffer = 4 + +// asyncSubmitter implements AsyncSubmitter. +type asyncSubmitter struct { + // asyncSaver is the AsyncSaver instance to use. + asyncSaver AsyncSaver + + // logger is the Logger to use. + logger model.Logger + + // queue is the queue containing measurements to submit. + queue chan *measurementWithIndex + + // running tracks the running goroutines. + running *sync.WaitGroup + + // stopped indicates that the submitter has stopped. + stopped *atomicx.Int64 +} + +// StartAsyncSubmitter creates a new AsyncSubmitter using the +// given underlying [submitter]. We'll create a queue with a +// maximum buffer. When the queue is full, Submit blocks +// waiting for pending submissions to complete. This factory +// will create a single background goroutine that submits the +// measurements. You must use Stop to kill such a goroutine +// and Wait to wait for the goroutine to join. 
The [logger] argument +// contains the logger to be used. The [asyncSaver] argument is +// the saver that should write to disk the measurements +// that have been submitted, with their correct report ID. +func StartAsyncSubmitter( + logger model.Logger, submitter Submitter, asyncSaver AsyncSaver) AsyncSubmitter { + asub := &asyncSubmitter{ + asyncSaver: asyncSaver, + logger: logger, + queue: make(chan *measurementWithIndex, asyncSubmitterBuffer), + running: &sync.WaitGroup{}, + stopped: &atomicx.Int64{}, + } + go asub.run(context.Background(), submitter) + asub.running.Add(1) + return asub +} + +// Submit implements AsyncSubmitter.Submit. +func (asub *asyncSubmitter) Submit(idx int, m *model.Measurement) bool { + if asub.stopped.Load() > 0 { + return false + } + asub.queue <- &measurementWithIndex{ + idx: idx, + m: m, + } + return true +} + +// run submits measurements in FIFO order. +func (asub *asyncSubmitter) run(ctx context.Context, submitter Submitter) { + defer asub.running.Done() + for m := range asub.queue { + // TODO(bassosimone): add support for knowing when we could not + // submit measurements. We will discuss this once we've tried + // out this simple concept in a real-world experiment. + // + // Likewise, we should discuss policies regarding retries, which + // we're not implementing at the moment for brevity. + err := submitter.Submit(ctx, m.idx, m.m) + if err != nil { + asub.logger.Warnf("asyncSubmitter: cannot submit measurement: %s", err.Error()) + // FALLTHRU + } + // We chain saving after the submission such that the reportID, which is + // modified by Submit (a choice that I regret), finally gets saved. + _ = asub.asyncSaver.SaveMeasurement(m.idx, m.m) + } +} + +// Stop implements AsyncSubmitter.Stop. +func (asub *asyncSubmitter) Stop() { + asub.stopped.Add(1) // must happen BEFORE closing the channel + close(asub.queue) +} + +// Wait implements AsyncSubmitter.Wait. 
+func (asub *asyncSubmitter) Wait() { + asub.running.Wait() +} + // Submitter submits a measurement to the OONI collector. type Submitter interface { // Submit submits the measurement and updates its // report ID field in case of success. - Submit(ctx context.Context, m *model.Measurement) error + Submit(ctx context.Context, idx int, m *model.Measurement) error } // SubmitterSession is the Submitter's view of the Session. @@ -50,7 +172,7 @@ func NewSubmitter(ctx context.Context, config SubmitterConfig) (Submitter, error type stubSubmitter struct{} -func (stubSubmitter) Submit(ctx context.Context, m *model.Measurement) error { +func (stubSubmitter) Submit(ctx context.Context, idx int, m *model.Measurement) error { return nil } @@ -61,7 +183,7 @@ type realSubmitter struct { logger model.Logger } -func (rs realSubmitter) Submit(ctx context.Context, m *model.Measurement) error { +func (rs realSubmitter) Submit(ctx context.Context, idx int, m *model.Measurement) error { rs.logger.Info("submitting measurement to OONI collector; please be patient...") - return rs.subm.Submit(ctx, m) + return rs.subm.Submit(ctx, idx, m) } diff --git a/internal/oonirun/experiment.go b/internal/oonirun/experiment.go index 10cfdfa96e..05fb77d19e 100644 --- a/internal/oonirun/experiment.go +++ b/internal/oonirun/experiment.go @@ -42,6 +42,10 @@ type Experiment struct { // NoJSON OPTIONALLY indicates we don't want to save measurements to a JSON file. NoJSON bool + // Parallelism OPTIONALLY indicates the number of goroutines to use + // to perform measurements. A zero or negative value implies one goroutine. + Parallelism int + // Random OPTIONALLY indicates we should randomize inputs. 
Random bool @@ -121,19 +125,14 @@ func (ed *Experiment) newInputProcessor(experiment engine.Experiment, inputList []model.OOAPIURLInfo, saver engine.Saver, submitter engine.Submitter) inputProcessor { return &engine.InputProcessor{ Annotations: ed.Annotations, - Experiment: &experimentWrapper{ - child: engine.NewInputProcessorExperimentWrapper(experiment), - logger: ed.Session.Logger(), - total: len(inputList), - }, - Inputs: inputList, - MaxRuntime: time.Duration(ed.MaxRuntime) * time.Second, - Options: experimentOptionsToStringList(ed.ExtraOptions), - Saver: engine.NewInputProcessorSaverWrapper(saver), - Submitter: &experimentSubmitterWrapper{ - child: engine.NewInputProcessorSubmitterWrapper(submitter), - logger: ed.Session.Logger(), - }, + Experiment: experiment, + Inputs: inputList, + Logger: ed.Session.Logger(), + MaxRuntime: time.Duration(ed.MaxRuntime) * time.Second, + Options: experimentOptionsToStringList(ed.ExtraOptions), + Parallelism: ed.Parallelism, + Saver: saver, + Submitter: submitter, } } @@ -190,41 +189,3 @@ func experimentOptionsToStringList(options map[string]any) (out []string) { } return } - -// experimentWrapper wraps an experiment and logs progress -type experimentWrapper struct { - // child is the child experiment wrapper - child engine.InputProcessorExperimentWrapper - - // logger is the logger to use - logger model.Logger - - // total is the total number of inputs - total int -} - -func (ew *experimentWrapper) MeasureAsync( - ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { - if input != "" { - ew.logger.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input) - } - return ew.child.MeasureAsync(ctx, input, idx) -} - -// experimentSubmitterWrapper implements a submission policy where we don't -// fail if we cannot submit a measurement -type experimentSubmitterWrapper struct { - // child is the child submitter wrapper - child engine.InputProcessorSubmitterWrapper - - // logger is the logger to use - 
logger model.Logger -} - -func (sw *experimentSubmitterWrapper) Submit(ctx context.Context, idx int, m *model.Measurement) error { - if err := sw.child.Submit(ctx, idx, m); err != nil { - sw.logger.Warnf("submitting measurement failed: %s", err.Error()) - } - // policy: we do not stop the loop if measurement submission fails - return nil -} diff --git a/internal/oonirun/v2.go b/internal/oonirun/v2.go index b7455ebf15..4af20cba1f 100644 --- a/internal/oonirun/v2.go +++ b/internal/oonirun/v2.go @@ -177,6 +177,7 @@ func v2MeasureDescriptor(ctx context.Context, config *LinkConfig, desc *v2Descri Name: nettest.TestName, NoCollector: config.NoCollector, NoJSON: config.NoJSON, + Parallelism: 1, // TODO(bassosimone): support this use case Random: config.Random, ReportFile: config.ReportFile, Session: config.Session,