Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/cmd/miniooni/libminiooni.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Options struct {
MaxRuntime int64
NoJSON bool
NoCollector bool
Parallelism int
ProbeServicesURL string
Proxy string
Random bool
Expand Down Expand Up @@ -99,6 +100,9 @@ func init() {
&globalOptions.ProbeServicesURL, "probe-services", 0,
"Set the URL of the probe-services instance you want to use", "URL",
)
getopt.FlagLong(
&globalOptions.Parallelism, "parallelism", 1, "Parallelism for performing measurements", "NUMBER",
)
getopt.FlagLong(
&globalOptions.Proxy, "proxy", 0, "Set the proxy URL", "URL",
)
Expand Down Expand Up @@ -404,6 +408,7 @@ func mainSingleIteration(logger model.Logger, experimentName string, currentOpti
Name: experimentName,
NoCollector: currentOptions.NoCollector,
NoJSON: currentOptions.NoJSON,
Parallelism: currentOptions.Parallelism,
Random: currentOptions.Random,
ReportFile: currentOptions.ReportFile,
Session: sess,
Expand Down
168 changes: 75 additions & 93 deletions internal/engine/inputprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
"sync"
"time"

"github.com/ooni/probe-cli/v3/internal/model"
Expand All @@ -14,43 +15,21 @@ type InputProcessorExperiment interface {
ctx context.Context, input string) (<-chan *model.Measurement, error)
}

// InputProcessorExperimentWrapper is a wrapper for an
// Experiment that also allow to pass around the input index.
type InputProcessorExperimentWrapper interface {
MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error)
}

// NewInputProcessorExperimentWrapper creates a new
// instance of InputProcessorExperimentWrapper.
func NewInputProcessorExperimentWrapper(
exp InputProcessorExperiment) InputProcessorExperimentWrapper {
return inputProcessorExperimentWrapper{exp: exp}
}

type inputProcessorExperimentWrapper struct {
exp InputProcessorExperiment
}

func (ipew inputProcessorExperimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
return ipew.exp.MeasureAsync(ctx, input)
}

var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{}

// InputProcessor processes inputs. We perform a Measurement
// for each input using the given Experiment.
type InputProcessor struct {
// Annotations contains the measurement annotations
Annotations map[string]string

// Experiment is the code that will run the experiment.
Experiment InputProcessorExperimentWrapper
Experiment InputProcessorExperiment

// Inputs is the list of inputs to measure.
Inputs []model.OOAPIURLInfo

// Logger is the MANDATORY logger to use.
Logger model.Logger

// MaxRuntime is the optional maximum runtime
// when looping over a list of inputs (e.g. when
// running Web Connectivity). Zero means that
Expand All @@ -60,54 +39,18 @@ type InputProcessor struct {
// Options contains command line options for this experiment.
Options []string

// Parallelism contains the OPTIONAL parallelism
// for performing measurements. A zero or negative
// value implies we want just one goroutine.
Parallelism int

// Saver is the code that will save measurement results
// on persistent storage (e.g. the file system).
Saver InputProcessorSaverWrapper
Saver Saver

// Submitter is the code that will submit measurements
// to the OONI collector.
Submitter InputProcessorSubmitterWrapper
}

// InputProcessorSaverWrapper is InputProcessor's
// wrapper for a Saver implementation.
type InputProcessorSaverWrapper interface {
SaveMeasurement(idx int, m *model.Measurement) error
}

type inputProcessorSaverWrapper struct {
saver Saver
}

// NewInputProcessorSaverWrapper wraps a Saver for InputProcessor.
func NewInputProcessorSaverWrapper(saver Saver) InputProcessorSaverWrapper {
return inputProcessorSaverWrapper{saver: saver}
}

func (ipsw inputProcessorSaverWrapper) SaveMeasurement(
idx int, m *model.Measurement) error {
return ipsw.saver.SaveMeasurement(m)
}

// InputProcessorSubmitterWrapper is InputProcessor's
// wrapper for a Submitter implementation.
type InputProcessorSubmitterWrapper interface {
Submit(ctx context.Context, idx int, m *model.Measurement) error
}

type inputProcessorSubmitterWrapper struct {
submitter Submitter
}

// NewInputProcessorSubmitterWrapper wraps a Submitter
// for the InputProcessor.
func NewInputProcessorSubmitterWrapper(submitter Submitter) InputProcessorSubmitterWrapper {
return inputProcessorSubmitterWrapper{submitter: submitter}
}

func (ipsw inputProcessorSubmitterWrapper) Submit(
ctx context.Context, idx int, m *model.Measurement) error {
return ipsw.submitter.Submit(ctx, m)
Submitter Submitter
}

// Run processes all the input subject to the duration of the
Expand All @@ -123,8 +66,10 @@ func (ipsw inputProcessorSubmitterWrapper) Submit(
// though is free to choose different policies by configuring
// the Experiment, Submitter, and Saver fields properly.
func (ip *InputProcessor) Run(ctx context.Context) error {
_, err := ip.run(ctx)
return err
// TODO(bassosimone): it's unclear how to report errors back
// now that we're in a parallel context.
ip.run(ctx)
return nil
}

// These are the reasons why run could stop.
Expand All @@ -135,37 +80,74 @@ const (

// run is like Run but, in addition to returning an error, it
// also returns the reason why we stopped.
func (ip *InputProcessor) run(ctx context.Context) (int, error) {
func (ip *InputProcessor) run(ctx context.Context) {
saver := StartAsyncSaver(ip.Saver)
submitter := StartAsyncSubmitter(ip.Logger, ip.Submitter, saver)
wg := &sync.WaitGroup{}
urls := ip.generateInputs()
parallelism := ip.Parallelism
if parallelism < 1 {
parallelism = 1
}
start := time.Now()
for idx, url := range ip.Inputs {
for cnt := 0; cnt < parallelism; cnt++ {
wg.Add(1)
go ip.performMeasurement(ctx, wg, urls, start, submitter)
}
// wait for measurers to join
wg.Wait()
// termination protocol for saver and submitter
submitter.Stop()
submitter.Wait()

saver.Stop()
saver.Wait()
}

func (ip *InputProcessor) performMeasurement(
ctx context.Context, wg *sync.WaitGroup, urls <-chan *inputWithIndex,
start time.Time, submitter AsyncSubmitter) (int, error) {
defer wg.Done() // synchronize with the parent
for inputIdx := range urls {
idx := inputIdx.idx
input := inputIdx.input
if ip.MaxRuntime > 0 && time.Since(start) > ip.MaxRuntime {
return stopMaxRuntime, nil
}
input := url.URL
var measurements []*model.Measurement
source, err := ip.Experiment.MeasureAsync(ctx, input, idx)
if input != "" {
ip.Logger.Infof("[%d/%d] running with input: %s", idx+1, len(ip.Inputs), input)
}
source, err := ip.Experiment.MeasureAsync(ctx, input)
if err != nil {
return 0, err
}
// NOTE: we don't want to intermix measuring with submitting
// therefore we collect all measurements first
for meas := range source {
measurements = append(measurements, meas)
}
for _, meas := range measurements {
meas.AddAnnotations(ip.Annotations)
meas.Options = ip.Options
err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
return 0, err
}
// Note: must be after submission because submission modifies
// the measurement to include the report ID.
err = ip.Saver.SaveMeasurement(idx, meas)
if err != nil {
return 0, err
}
submitter.Submit(idx, meas)
}
}
return stopNormal, nil
}

// inputWithIndex combines an input with its index, so that the position
// of the input travels together with the input over a channel.
type inputWithIndex struct {
// idx is the position of the input in the list of inputs
idx int

// input contains the URL input
input string
}

// generateInputs returns a channel where each input is emitted. Inputs are
// emitted in the order of ip.Inputs, each paired with its index, and the
// channel is closed after the last one, so readers can simply range over it.
//
// The channel is buffered to hold every input and is filled before this
// method returns. No background goroutine is needed, hence nothing can stay
// blocked on a send (and leak) when the readers stop consuming early.
func (ip *InputProcessor) generateInputs() <-chan *inputWithIndex {
	out := make(chan *inputWithIndex, len(ip.Inputs))
	for idx, url := range ip.Inputs {
		// Never blocks: the buffer has room for all the inputs.
		out <- &inputWithIndex{
			idx:   idx,
			input: url.URL,
		}
	}
	close(out)
	return out
}
2 changes: 1 addition & 1 deletion internal/engine/probeservices/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func NewSubmitter(opener ReportOpener, logger model.Logger) *Submitter {

// Submit submits the current measurement to the OONI backend created using
// the ReportOpener passed to the constructor.
func (sub *Submitter) Submit(ctx context.Context, m *model.Measurement) error {
func (sub *Submitter) Submit(ctx context.Context, idx int, m *model.Measurement) error {
var err error
sub.mu.Lock()
defer sub.mu.Unlock()
Expand Down
96 changes: 93 additions & 3 deletions internal/engine/saver.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,104 @@
package engine

import (
"context"
"errors"
"sync"

"github.com/ooni/probe-cli/v3/internal/atomicx"
"github.com/ooni/probe-cli/v3/internal/model"
)

// AsyncSaver is an asynchronous [Saver].
type AsyncSaver interface {
// SaveMeasurement requests the saver to save a measurement. The return
// value indicates whether the measurement has been accepted by the
// saver or whether it has been rejected. The saver will reject
// measurements after you've called its Stop method.
SaveMeasurement(idx int, m *model.Measurement) bool

// Stop tells the saver that it should stop running as soon
// as possible, which entails trying to finish saving all the queued
// measurements to avoid losing them. Use Wait to know when the
// saver has finished saving all measurements.
Stop()

// Wait waits for the saver to finish saving all the
// measurements that are currently queued for writing. Note
// that you should call Stop before calling Wait to inform
// the saver that it should stop running ASAP.
Wait()
}

// Saver saves a measurement on some persistent storage.
type Saver interface {
SaveMeasurement(m *model.Measurement) error
SaveMeasurement(idx int, m *model.Measurement) error
}

// asyncSaver implements AsyncSaver.
type asyncSaver struct {
// logger is the Logger used to warn about measurements that
// could not be saved.
logger model.Logger

// queue is the channel carrying the measurements to save.
queue chan *measurementWithIndex

// running tracks the background goroutine, so that Wait can
// block until it has returned.
running *sync.WaitGroup

// stopped is greater than zero once Stop has been called.
stopped *atomicx.Int64
}

// StartAsyncSaver creates a new [AsyncSaver] with the given [Saver]. We'll
// run the [AsyncSaver] in a background goroutine. You should call Stop to tell
// it it's time to stop and Wait to wait for it to complete.
//
// No logger is available to this constructor, so the logger field of the
// returned saver is left nil.
func StartAsyncSaver(saver Saver) AsyncSaver {
	as := &asyncSaver{
		logger:  nil,
		queue:   make(chan *measurementWithIndex),
		running: &sync.WaitGroup{},
		stopped: &atomicx.Int64{},
	}
	// Register the goroutine BEFORE starting it, so that the matching
	// Done can never run ahead of the Add that accounts for it.
	as.running.Add(1)
	go as.run(context.Background(), saver)
	return as
}

// SaveMeasurement implements AsyncSaver.SaveMeasurement. It hands the
// measurement and its index over to the background goroutine through the
// queue and reports whether the measurement was accepted; measurements are
// refused once the saver has been stopped.
//
// NOTE(review): the stopped check and the send are two separate steps; a
// Stop landing in between would close the queue before the send. Confirm
// that callers never run SaveMeasurement concurrently with Stop.
func (as *asyncSaver) SaveMeasurement(idx int, m *model.Measurement) bool {
	accepted := as.stopped.Load() <= 0
	if accepted {
		entry := &measurementWithIndex{idx: idx, m: m}
		as.queue <- entry
	}
	return accepted
}

// run saves measurements in FIFO order. It consumes the queue until the
// queue is closed, passing each measurement to the given saver, and signals
// the running wait group when it returns. The ctx argument is currently
// unused.
func (as *asyncSaver) run(ctx context.Context, saver Saver) {
	defer as.running.Done()
	for m := range as.queue {
		// TODO(bassosimone): should we tell anyone about this error?
		err := saver.SaveMeasurement(m.idx, m.m)
		if err == nil {
			continue
		}
		// The logger may be nil, so guard the call: a failed save must
		// not crash the goroutine that drains the queue.
		if as.logger != nil {
			as.logger.Warnf("asyncSaver: cannot save measurement: %s", err.Error())
		}
	}
}

// Stop implements AsyncSaver.Stop. It first marks the saver as stopped, so
// that SaveMeasurement starts refusing measurements, and then closes the
// queue, which ends the loop in run.
//
// NOTE(review): closing an already-closed channel panics, so this looks
// like it must be called at most once — confirm with callers.
func (as *asyncSaver) Stop() {
as.stopped.Add(1) // must happen BEFORE closing the channel
close(as.queue)
}

// Wait implements AsyncSaver.Wait. It blocks until the background
// goroutine tracked by the running wait group has returned.
func (as *asyncSaver) Wait() {
as.running.Wait()
}

// SaverConfig is the configuration for creating a new Saver.
Expand Down Expand Up @@ -49,7 +139,7 @@ func NewSaver(config SaverConfig) (Saver, error) {

type fakeSaver struct{}

func (fs fakeSaver) SaveMeasurement(m *model.Measurement) error {
func (fs fakeSaver) SaveMeasurement(idx int, m *model.Measurement) error {
return nil
}

Expand All @@ -61,7 +151,7 @@ type realSaver struct {
Logger model.Logger
}

func (rs realSaver) SaveMeasurement(m *model.Measurement) error {
func (rs realSaver) SaveMeasurement(idx int, m *model.Measurement) error {
rs.Logger.Info("saving measurement to disk")
return rs.Experiment.SaveMeasurement(m, rs.FilePath)
}
Expand Down
Loading