46 changes: 40 additions & 6 deletions internal/verifier/change_reader.go
@@ -31,13 +31,24 @@ const (
changeReaderCollectionName = "changeReader"
)

type readerCurrentTimes struct {
LastResumeTime bson.Timestamp
LastClusterTime bson.Timestamp
}

func (rp readerCurrentTimes) Lag() time.Duration {
return time.Second * time.Duration(
int(rp.LastClusterTime.T)-int(rp.LastResumeTime.T),
)
}
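
A minimal usage sketch of the new readerCurrentTimes.Lag() helper (the timestamp values below are hypothetical): the lag is just the whole-second difference between the reader's last-observed cluster time and the last resume-token time.

```go
// exampleLag is an illustrative, hypothetical helper; it assumes the verifier
// package's readerCurrentTimes type and go.mongodb.org/mongo-driver/v2/bson.
func exampleLag() time.Duration {
	times := readerCurrentTimes{
		LastResumeTime:  bson.Timestamp{T: 1_700_000_000},
		LastClusterTime: bson.Timestamp{T: 1_700_000_007},
	}

	return times.Lag() // 7 * time.Second
}
```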

type changeReader interface {
getWhichCluster() whichCluster
getReadChannel() <-chan changeEventBatch
getStartTimestamp() bson.Timestamp
getLastSeenClusterTime() option.Option[bson.Timestamp]
getEventsPerSecond() option.Option[float64]
getLag() option.Option[time.Duration]
getCurrentTimes() option.Option[readerCurrentTimes]
getBufferSaturation() float64
setWritesOff(bson.Timestamp)
start(context.Context, *errgroup.Group) error
@@ -64,9 +75,10 @@ type ChangeReaderCommon struct {

lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]]

currentTimes *msync.TypedAtomic[option.Option[readerCurrentTimes]]

startAtTs *bson.Timestamp

lag *msync.TypedAtomic[option.Option[time.Duration]]
batchSizeHistory *history.History[int]

onDDLEvent ddlEventHandling
Expand All @@ -77,7 +89,7 @@ func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon {
readerType: clusterName,
changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize),
writesOffTs: util.NewEventual[bson.Timestamp](),
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
currentTimes: msync.NewTypedAtomic(option.None[readerCurrentTimes]()),
lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()),
batchSizeHistory: history.New[int](time.Minute),
onDDLEvent: lo.Ternary(
@@ -123,11 +135,23 @@ func (rc *ChangeReaderCommon) getBufferSaturation() float64 {
return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan))
}

func (rc *ChangeReaderCommon) getCurrentTimes() option.Option[readerCurrentTimes] {
return rc.currentTimes.Load()
}

/*
// getLag returns the observed change stream lag (i.e., the delta between
// cluster time and the most-recently-seen change event).
func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] {
return rc.lag.Load()
if prog, has := rc.progress.Load().Get(); has {
return option.Some(
time.Duration(int(prog.lastClusterTime.T)-int(prog.lastResumeTime.T)) * time.Second,
)
}

return option.None[time.Duration]()
}
*/

// getEventsPerSecond returns the number of change events per second we’ve been
// seeing “recently”. (See implementation for the actual period over which we
@@ -221,8 +245,18 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio
func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) {
tokenTs, err := rc.resumeTokenTSExtractor(token)
if err == nil {
lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T)
rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs)))
cTime, err := util.GetClusterTimeFromSession(sess)
if err != nil {
rc.logger.Warn().
Err(err).
Str("reader", string(rc.getWhichCluster())).
Msg("Failed to extract cluster time from session.")
} else {
rc.currentTimes.Store(option.Some(readerCurrentTimes{
LastResumeTime: tokenTs,
LastClusterTime: cTime,
}))
}
} else {
rc.logger.Warn().
Err(err).
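Since getLag() is gone from the reader interface, callers now read the paired timestamps and compute the lag on demand. A minimal sketch, assuming a reader implementing changeReader and a zerolog logger (both hypothetical names here):

```go
if times, has := reader.getCurrentTimes().Get(); has {
	logger.Info().
		Stringer("lag", times.Lag()).
		Msg("Observed change reader lag.")
}
```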
4 changes: 2 additions & 2 deletions internal/verifier/change_stream_test.go
@@ -593,7 +593,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() {
verifierRunner.AwaitGenerationEnd(),
)

return verifier.srcChangeReader.getLag().IsSome()
return verifier.srcChangeReader.getCurrentTimes().IsSome()
},
time.Minute,
100*time.Millisecond,
@@ -602,7 +602,7 @@
// NB: The lag will include whatever time elapsed above before
// verifier read the event, so it can be several seconds.
suite.Assert().Less(
verifier.srcChangeReader.getLag().MustGet(),
verifier.srcChangeReader.getCurrentTimes().MustGet().Lag(),
10*time.Minute,
"verifier lag is as expected",
)
6 changes: 0 additions & 6 deletions internal/verifier/check.go
@@ -246,11 +246,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh

verifier.logger.Debug().Msg("Starting Check")

verifier.phase = Check
defer func() {
verifier.phase = Idle
}()

if err := verifier.startChangeHandling(ctx); err != nil {
return err
}
@@ -362,7 +357,6 @@
// on enqueued rechecks. Meanwhile, generation 3’s recheck tasks will
// derive from rechecks enqueued during generation 2.
verifier.generation++
verifier.phase = Recheck
verifier.mux.Unlock()

// Generation of recheck tasks can partial-fail. The following will
24 changes: 9 additions & 15 deletions internal/verifier/migration_verifier.go
@@ -89,7 +89,6 @@ type Verifier struct {
lastGeneration bool
running bool
generation int
phase string
port int
metaURI string
metaClient *mongo.Client
@@ -181,7 +180,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier {
logger: logger,
writer: logWriter,

phase: Idle,
numWorkers: NumWorkers,
readPreference: readpref.Primary(),
partitionSizeInBytes: 400 * 1024 * 1024,
@@ -1255,9 +1253,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(
}

func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) {
taskCollection := verifier.verificationTaskCollection()
generation, _ := verifier.getGeneration()

return verifier.getVerificationStatusForGeneration(ctx, generation)
}

func (verifier *Verifier) getVerificationStatusForGeneration(
ctx context.Context,
generation int,
) (*VerificationStatus, error) {
taskCollection := verifier.verificationTaskCollection()

var results []bson.Raw

err := retry.New().WithCallback(
@@ -1396,18 +1402,6 @@ func (verifier *Verifier) StartServer() error {
return server.Run(context.Background())
}

func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
status, err := verifier.GetVerificationStatus(ctx)
if err != nil {
return Progress{Error: err}, err
}
return Progress{
Phase: verifier.phase,
Generation: verifier.generation,
Status: status,
}, nil
}

// Returned boolean indicates that namespaces are cached, and
// whatever needs them can proceed.
func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool {
10 changes: 7 additions & 3 deletions internal/verifier/mismatches.go
@@ -85,7 +85,7 @@ func countMismatchesForTasks(
ctx context.Context,
db *mongo.Database,
taskIDs []bson.ObjectID,
filter bson.D,
filter any,
) (int64, int64, error) {
cursor, err := db.Collection(mismatchesCollectionName).Aggregate(
ctx,
@@ -116,8 +116,12 @@ func countMismatchesForTasks(
return 0, 0, errors.Wrap(err, "reading mismatch counts")
}

if len(got) != 1 {
return 0, 0, fmt.Errorf("unexpected mismatch count result: %+v", got)
switch len(got) {
case 0:
return 0, 0, nil
case 1:
default:
return 0, 0, fmt.Errorf("unexpected mismatch count (%d) result: %+v", len(got), got)
}

totalRV, err := got[0].LookupErr("total")
Expand Down
174 changes: 174 additions & 0 deletions internal/verifier/progress.go
@@ -0,0 +1,174 @@
package verifier

import (
"context"
"time"

"github.com/10gen/migration-verifier/contextplus"
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/mslices"
"github.com/pkg/errors"
"github.com/samber/lo"
"go.mongodb.org/mongo-driver/v2/bson"
)

func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
verifier.mux.RLock()
defer verifier.mux.RUnlock()

var vStatus *VerificationStatus

generation := verifier.generation

progressTime := time.Now()
genElapsed := progressTime.Sub(verifier.generationStartTime)

genStats := ProgressGenerationStats{
TimeElapsed: genElapsed.String(),
}

eg, egCtx := contextplus.ErrGroup(ctx)
eg.Go(
func() error {
var err error
vStatus, err = verifier.getVerificationStatusForGeneration(egCtx, generation)

return errors.Wrapf(err, "fetching generation %d’s tasks’ status", generation)
},
)
eg.Go(
func() error {
var err error
nsStats, err := verifier.GetPersistedNamespaceStatisticsForGeneration(ctx, generation)

if err != nil {
return errors.Wrapf(err, "fetching generation %d’s persisted namespace stats", generation)
}

var totalDocs, comparedDocs types.DocumentCount
var totalBytes, comparedBytes types.ByteCount
var totalNss, completedNss types.NamespaceCount

for _, result := range nsStats {
totalDocs += result.TotalDocs
comparedDocs += result.DocsCompared
totalBytes += result.TotalBytes
comparedBytes += result.BytesCompared

totalNss++
if result.PartitionsDone > 0 {
partitionsPending := result.PartitionsAdded + result.PartitionsProcessing
if partitionsPending == 0 {
completedNss++
}
}
}

var activeWorkers int
perNamespaceWorkerStats := verifier.getPerNamespaceWorkerStats()
for _, nsWorkerStats := range perNamespaceWorkerStats {
for _, workerStats := range nsWorkerStats {
activeWorkers++
comparedDocs += workerStats.SrcDocCount
comparedBytes += workerStats.SrcByteCount
}
}

genStats.DocsCompared = comparedDocs
genStats.TotalDocs = totalDocs

genStats.SrcBytesCompared = comparedBytes
genStats.TotalSrcBytes = totalBytes

genStats.ActiveWorkers = activeWorkers

return nil
},
)
eg.Go(
func() error {
failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
ctx,
verifier.logger,
verifier.verificationTaskCollection(),
verificationTaskVerifyDocuments,
generation,
)
if err != nil {
return errors.Wrapf(err, "fetching generation %d’s failed & incomplete tasks", generation)
}

taskIDsToQuery := lo.Map(
lo.Flatten(mslices.Of(failedTasks, incompleteTasks)),
func(ft VerificationTask, _ int) bson.ObjectID {
return ft.PrimaryKey
},
)

mismatchCount, _, err := countMismatchesForTasks(
egCtx,
verifier.verificationDatabase(),
taskIDsToQuery,
true,
)
if err != nil {
return errors.Wrapf(err, "counting mismatches seen during generation %d", generation)
}

genStats.MismatchesFound = mismatchCount

return nil
},
)
eg.Go(
func() error {
recheckColl := verifier.getRecheckQueueCollection(1 + generation)
count, err := recheckColl.EstimatedDocumentCount(ctx)
if err != nil {
return errors.Wrapf(err, "counting rechecks enqueued during generation %d", generation)
}

genStats.RechecksEnqueued = count

return nil
},
)

if err := eg.Wait(); err != nil {
return Progress{Error: err}, err
}

return Progress{
Phase: lo.Ternary(
verifier.running,
lo.Ternary(generation > 0, Recheck, Check),
Idle,
),
Generation: verifier.generation,
GenerationStats: genStats,
SrcChangeStats: ProgressChangeStats{
EventsPerSecond: verifier.srcChangeReader.getEventsPerSecond(),
CurrentTimes: verifier.srcChangeReader.getCurrentTimes(),
BufferSaturation: verifier.srcChangeReader.getBufferSaturation(),
},
DstChangeStats: ProgressChangeStats{
EventsPerSecond: verifier.dstChangeReader.getEventsPerSecond(),
CurrentTimes: verifier.dstChangeReader.getCurrentTimes(),
BufferSaturation: verifier.dstChangeReader.getBufferSaturation(),
},
Status: vStatus,
}, nil

}

/*
func optDurationToOptString(dur option.Option[time.Duration]) option.Option[string] {
var ret option.Option[string]

if dur, has := dur.Get(); has {
ret = option.Some(dur.String())
}

return ret
}
*/
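
GetProgress now fans out its four lookups (task status, persisted namespace stats, mismatch counts, and enqueued rechecks) concurrently under an errgroup and derives the reported phase from verifier.running and the generation. A hypothetical polling loop over the result (names such as pollCtx are illustrative, not part of this change):

```go
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for range ticker.C {
	progress, err := verifier.GetProgress(pollCtx)
	if err != nil {
		log.Printf("failed to fetch progress: %v", err)
		continue
	}

	log.Printf(
		"phase=%s generation=%d docs=%d/%d mismatches=%d rechecks=%d",
		progress.Phase,
		progress.Generation,
		progress.GenerationStats.DocsCompared,
		progress.GenerationStats.TotalDocs,
		progress.GenerationStats.MismatchesFound,
		progress.GenerationStats.RechecksEnqueued,
	)
}
```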
7 changes: 7 additions & 0 deletions internal/verifier/statistics.go
@@ -179,6 +179,13 @@ var jsonTemplate *template.Template
func (verifier *Verifier) GetPersistedNamespaceStatistics(ctx context.Context) ([]NamespaceStats, error) {
generation, _ := verifier.getGeneration()

return verifier.GetPersistedNamespaceStatisticsForGeneration(ctx, generation)
}

func (verifier *Verifier) GetPersistedNamespaceStatisticsForGeneration(
ctx context.Context,
generation int,
) ([]NamespaceStats, error) {
templateOnce.Do(func() {
tmpl, err := template.New("").Parse(perNsStatsPipelineTemplate)
if err != nil {