diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go
index 056fdbc5..e4245b56 100644
--- a/internal/verifier/change_reader.go
+++ b/internal/verifier/change_reader.go
@@ -31,13 +31,24 @@ const (
 	changeReaderCollectionName = "changeReader"
 )
 
+type readerCurrentTimes struct {
+	LastResumeTime  bson.Timestamp
+	LastClusterTime bson.Timestamp
+}
+
+func (rp readerCurrentTimes) Lag() time.Duration {
+	return time.Second * time.Duration(
+		int(rp.LastClusterTime.T)-int(rp.LastResumeTime.T),
+	)
+}
+
 type changeReader interface {
 	getWhichCluster() whichCluster
 	getReadChannel() <-chan changeEventBatch
 	getStartTimestamp() bson.Timestamp
 	getLastSeenClusterTime() option.Option[bson.Timestamp]
 	getEventsPerSecond() option.Option[float64]
-	getLag() option.Option[time.Duration]
+	getCurrentTimes() option.Option[readerCurrentTimes]
 	getBufferSaturation() float64
 	setWritesOff(bson.Timestamp)
 	start(context.Context, *errgroup.Group) error
@@ -64,9 +75,10 @@ type ChangeReaderCommon struct {
 
 	lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]]
 
+	currentTimes *msync.TypedAtomic[option.Option[readerCurrentTimes]]
+
 	startAtTs *bson.Timestamp
 
-	lag              *msync.TypedAtomic[option.Option[time.Duration]]
 	batchSizeHistory *history.History[int]
 
 	onDDLEvent ddlEventHandling
@@ -77,7 +89,7 @@ func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon {
 		readerType:           clusterName,
 		changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize),
 		writesOffTs:          util.NewEventual[bson.Timestamp](),
-		lag:                  msync.NewTypedAtomic(option.None[time.Duration]()),
+		currentTimes:         msync.NewTypedAtomic(option.None[readerCurrentTimes]()),
 		lastChangeEventTime:  msync.NewTypedAtomic(option.None[bson.Timestamp]()),
 		batchSizeHistory:     history.New[int](time.Minute),
 		onDDLEvent: lo.Ternary(
@@ -123,11 +135,23 @@ func (rc *ChangeReaderCommon) getBufferSaturation() float64 {
 	return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan))
 }
 
+func (rc *ChangeReaderCommon) getCurrentTimes() option.Option[readerCurrentTimes] {
+	return rc.currentTimes.Load()
+}
+
+/*
 // getLag returns the observed change stream lag (i.e., the delta between
 // cluster time and the most-recently-seen change event).
 func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] {
-	return rc.lag.Load()
+	if prog, has := rc.progress.Load().Get(); has {
+		return option.Some(
+			time.Duration(int(prog.lastClusterTime.T)-int(prog.lastResumeTime.T)) * time.Second,
+		)
+	}
+
+	return option.None[time.Duration]()
 }
+*/
 
 // getEventsPerSecond returns the number of change events per second we’ve been
 // seeing “recently”. (See implementation for the actual period over which we
@@ -221,8 +245,18 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio
 func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) {
 	tokenTs, err := rc.resumeTokenTSExtractor(token)
 	if err == nil {
-		lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T)
-		rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs)))
+		cTime, err := util.GetClusterTimeFromSession(sess)
+		if err != nil {
+			rc.logger.Warn().
+				Err(err).
+				Str("reader", string(rc.getWhichCluster())).
+				Msg("Failed to extract cluster time from session.")
+		} else {
+			rc.currentTimes.Store(option.Some(readerCurrentTimes{
+				LastResumeTime:  tokenTs,
+				LastClusterTime: cTime,
+			}))
+		}
 	} else {
 		rc.logger.Warn().
 			Err(err).
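
For illustration only (not part of the patch): change_reader.go now stores a readerCurrentTimes snapshot (the last resume-token time and the last observed cluster time) and derives lag on demand via Lag(). A minimal sketch of that arithmetic, assuming the readerCurrentTimes type above and that bson.Timestamp.T carries seconds since the Unix epoch; the values below are made up:

	rct := readerCurrentTimes{
		LastResumeTime:  bson.Timestamp{T: 100, I: 1}, // hypothetical resume-token time
		LastClusterTime: bson.Timestamp{T: 107, I: 1}, // hypothetical cluster time
	}
	_ = rct.Lag() // 7 * time.Second: cluster time minus resume-token time
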
diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go
index 62a39bca..69807dae 100644
--- a/internal/verifier/change_stream_test.go
+++ b/internal/verifier/change_stream_test.go
@@ -593,7 +593,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() {
 				verifierRunner.AwaitGenerationEnd(),
 			)
 
-			return verifier.srcChangeReader.getLag().IsSome()
+			return verifier.srcChangeReader.getCurrentTimes().IsSome()
 		},
 		time.Minute,
 		100*time.Millisecond,
@@ -602,7 +602,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() {
 	// NB: The lag will include whatever time elapsed above before
 	// verifier read the event, so it can be several seconds.
 	suite.Assert().Less(
-		verifier.srcChangeReader.getLag().MustGet(),
+		verifier.srcChangeReader.getCurrentTimes().MustGet().Lag(),
 		10*time.Minute,
 		"verifier lag is as expected",
 	)
diff --git a/internal/verifier/check.go b/internal/verifier/check.go
index be003ba6..4f2fe703 100644
--- a/internal/verifier/check.go
+++ b/internal/verifier/check.go
@@ -246,11 +246,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
 
 	verifier.logger.Debug().Msg("Starting Check")
 
-	verifier.phase = Check
-	defer func() {
-		verifier.phase = Idle
-	}()
-
 	if err := verifier.startChangeHandling(ctx); err != nil {
 		return err
 	}
@@ -362,7 +357,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
 		// on enqueued rechecks. Meanwhile, generation 3’s recheck tasks will
 		// derive from rechecks enqueued during generation 2.
 		verifier.generation++
-		verifier.phase = Recheck
 		verifier.mux.Unlock()
 
 		// Generation of recheck tasks can partial-fail. The following will
diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go
index e23445cb..0e741395 100644
--- a/internal/verifier/migration_verifier.go
+++ b/internal/verifier/migration_verifier.go
@@ -89,7 +89,6 @@ type Verifier struct {
 	lastGeneration bool
 	running        bool
 	generation     int
-	phase          string
 	port           int
 	metaURI        string
 	metaClient     *mongo.Client
@@ -181,7 +180,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier {
 		logger:     logger,
 		writer:     logWriter,
-		phase:      Idle,
 		numWorkers: NumWorkers,
 
 		readPreference:       readpref.Primary(),
 		partitionSizeInBytes: 400 * 1024 * 1024,
@@ -1255,9 +1253,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(
 }
 
 func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) {
-	taskCollection := verifier.verificationTaskCollection()
 	generation, _ := verifier.getGeneration()
 
+	return verifier.getVerificationStatusForGeneration(ctx, generation)
+}
+
+func (verifier *Verifier) getVerificationStatusForGeneration(
+	ctx context.Context,
+	generation int,
+) (*VerificationStatus, error) {
+	taskCollection := verifier.verificationTaskCollection()
+
 	var results []bson.Raw
 
 	err := retry.New().WithCallback(
@@ -1396,18 +1402,6 @@ func (verifier *Verifier) StartServer() error {
 	return server.Run(context.Background())
 }
 
-func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
-	status, err := verifier.GetVerificationStatus(ctx)
-	if err != nil {
-		return Progress{Error: err}, err
-	}
-	return Progress{
-		Phase:      verifier.phase,
-		Generation: verifier.generation,
-		Status:     status,
-	}, nil
-}
-
 // Returned boolean indicates that namespaces are cached, and
 // whatever needs them can proceed.
 func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool {
diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go
index 6cafb852..4a4002ba 100644
--- a/internal/verifier/mismatches.go
+++ b/internal/verifier/mismatches.go
@@ -85,7 +85,7 @@ func countMismatchesForTasks(
 	ctx context.Context,
 	db *mongo.Database,
 	taskIDs []bson.ObjectID,
-	filter bson.D,
+	filter any,
 ) (int64, int64, error) {
 	cursor, err := db.Collection(mismatchesCollectionName).Aggregate(
 		ctx,
@@ -116,8 +116,12 @@ func countMismatchesForTasks(
 		return 0, 0, errors.Wrap(err, "reading mismatch counts")
 	}
 
-	if len(got) != 1 {
-		return 0, 0, fmt.Errorf("unexpected mismatch count result: %+v", got)
+	switch len(got) {
+	case 0:
+		return 0, 0, nil
+	case 1:
+	default:
+		return 0, 0, fmt.Errorf("unexpected mismatch count (%d) result: %+v", len(got), got)
 	}
 
 	totalRV, err := got[0].LookupErr("total")
diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go
new file mode 100644
index 00000000..419641d9
--- /dev/null
+++ b/internal/verifier/progress.go
@@ -0,0 +1,174 @@
+package verifier
+
+import (
+	"context"
+	"time"
+
+	"github.com/10gen/migration-verifier/contextplus"
+	"github.com/10gen/migration-verifier/internal/types"
+	"github.com/10gen/migration-verifier/mslices"
+	"github.com/pkg/errors"
+	"github.com/samber/lo"
+	"go.mongodb.org/mongo-driver/v2/bson"
+)
+
+func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
+	verifier.mux.RLock()
+	defer verifier.mux.RUnlock()
+
+	var vStatus *VerificationStatus
+
+	generation := verifier.generation
+
+	progressTime := time.Now()
+	genElapsed := progressTime.Sub(verifier.generationStartTime)
+
+	genStats := ProgressGenerationStats{
+		TimeElapsed: genElapsed.String(),
+	}
+
+	eg, egCtx := contextplus.ErrGroup(ctx)
+	eg.Go(
+		func() error {
+			var err error
+			vStatus, err = verifier.getVerificationStatusForGeneration(egCtx, generation)
+
+			return errors.Wrapf(err, "fetching generation %d’s tasks’ status", generation)
+		},
+	)
+	eg.Go(
+		func() error {
+			var err error
+			nsStats, err := verifier.GetPersistedNamespaceStatisticsForGeneration(ctx, generation)
+
+			if err != nil {
+				return errors.Wrapf(err, "fetching generation %d’s persisted namespace stats", generation)
+			}
+
+			var totalDocs, comparedDocs types.DocumentCount
+			var totalBytes, comparedBytes types.ByteCount
+			var totalNss, completedNss types.NamespaceCount
+
+			for _, result := range nsStats {
+				totalDocs += result.TotalDocs
+				comparedDocs += result.DocsCompared
+				totalBytes += result.TotalBytes
+				comparedBytes += result.BytesCompared
+
+				totalNss++
+				if result.PartitionsDone > 0 {
+					partitionsPending := result.PartitionsAdded + result.PartitionsProcessing
+					if partitionsPending == 0 {
+						completedNss++
+					}
+				}
+			}
+
+			var activeWorkers int
+			perNamespaceWorkerStats := verifier.getPerNamespaceWorkerStats()
+			for _, nsWorkerStats := range perNamespaceWorkerStats {
+				for _, workerStats := range nsWorkerStats {
+					activeWorkers++
+					comparedDocs += workerStats.SrcDocCount
+					comparedBytes += workerStats.SrcByteCount
+				}
+			}
+
+			genStats.DocsCompared = comparedDocs
+			genStats.TotalDocs = totalDocs
+
+			genStats.SrcBytesCompared = comparedBytes
+			genStats.TotalSrcBytes = totalBytes
+
+			genStats.ActiveWorkers = activeWorkers
+
+			return nil
+		},
+	)
+	eg.Go(
+		func() error {
+			failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
+				ctx,
+				verifier.logger,
+				verifier.verificationTaskCollection(),
+				verificationTaskVerifyDocuments,
+				generation,
+			)
+			if err != nil {
+				return errors.Wrapf(err, "fetching generation %d’s failed & incomplete tasks", generation)
+			}
+
+			taskIDsToQuery := lo.Map(
+				lo.Flatten(mslices.Of(failedTasks, incompleteTasks)),
+				func(ft VerificationTask, _ int) bson.ObjectID {
+					return ft.PrimaryKey
+				},
+			)
+
+			mismatchCount, _, err := countMismatchesForTasks(
+				egCtx,
+				verifier.verificationDatabase(),
+				taskIDsToQuery,
+				true,
+			)
+			if err != nil {
+				return errors.Wrapf(err, "counting mismatches seen during generation %d", generation)
+			}
+
+			genStats.MismatchesFound = mismatchCount
+
+			return nil
+		},
+	)
+	eg.Go(
+		func() error {
+			recheckColl := verifier.getRecheckQueueCollection(1 + generation)
+			count, err := recheckColl.EstimatedDocumentCount(ctx)
+			if err != nil {
+				return errors.Wrapf(err, "counting rechecks enqueued during generation %d", generation)
+			}
+
+			genStats.RechecksEnqueued = count
+
+			return nil
+		},
+	)
+
+	if err := eg.Wait(); err != nil {
+		return Progress{Error: err}, err
+	}
+
+	return Progress{
+		Phase: lo.Ternary(
+			verifier.running,
+			lo.Ternary(generation > 0, Recheck, Check),
+			Idle,
+		),
+		Generation:      verifier.generation,
+		GenerationStats: genStats,
+		SrcChangeStats: ProgressChangeStats{
+			EventsPerSecond:  verifier.srcChangeReader.getEventsPerSecond(),
+			CurrentTimes:     verifier.srcChangeReader.getCurrentTimes(),
+			BufferSaturation: verifier.srcChangeReader.getBufferSaturation(),
+		},
+		DstChangeStats: ProgressChangeStats{
+			EventsPerSecond:  verifier.dstChangeReader.getEventsPerSecond(),
+			CurrentTimes:     verifier.dstChangeReader.getCurrentTimes(),
+			BufferSaturation: verifier.dstChangeReader.getBufferSaturation(),
+		},
+		Status: vStatus,
+	}, nil
+
+}
+
+/*
+func optDurationToOptString(dur option.Option[time.Duration]) option.Option[string] {
+	var ret option.Option[string]
+
+	if dur, has := dur.Get(); has {
+		ret = option.Some(dur.String())
+	}
+
+	return ret
+}
+*/
diff --git a/internal/verifier/statistics.go b/internal/verifier/statistics.go
index 73aa0725..29ad3038 100644
--- a/internal/verifier/statistics.go
+++ b/internal/verifier/statistics.go
@@ -179,6 +179,13 @@ var jsonTemplate *template.Template
 func (verifier *Verifier) GetPersistedNamespaceStatistics(ctx context.Context) ([]NamespaceStats, error) {
 	generation, _ := verifier.getGeneration()
 
+	return verifier.GetPersistedNamespaceStatisticsForGeneration(ctx, generation)
+}
+
+func (verifier *Verifier) GetPersistedNamespaceStatisticsForGeneration(
+	ctx context.Context,
+	generation int,
+) ([]NamespaceStats, error) {
 	templateOnce.Do(func() {
 		tmpl, err := template.New("").Parse(perNsStatsPipelineTemplate)
 		if err != nil {
diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go
index d204ed88..01080bb5 100644
--- a/internal/verifier/summary.go
+++ b/internal/verifier/summary.go
@@ -587,10 +587,10 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) {
 		if eventsPerSec, has := cluster.csReader.getEventsPerSecond().Get(); has {
 			var lagNote string
 
-			lag, hasLag := cluster.csReader.getLag().Get()
+			prog, hasProg := cluster.csReader.getCurrentTimes().Get()
 
-			if hasLag {
-				lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(lag))
+			if hasProg {
+				lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(prog.Lag()))
 			}
 
 			saturation := cluster.csReader.getBufferSaturation()
@@ -604,7 +604,7 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) {
 				reportutils.FmtReal(100*saturation),
 			)
 
-			if hasLag && lag > lagWarnThreshold {
+			if hasProg && prog.Lag() > lagWarnThreshold {
 				fmt.Fprint(
 					builder,
 					"⚠️ Lag is excessive. Verification may fail. See documentation.\n",
diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go
index 57c52624..7cbd5465 100644
--- a/internal/verifier/webserver.go
+++ b/internal/verifier/webserver.go
@@ -11,7 +11,9 @@ import (
 
 	"github.com/10gen/migration-verifier/contextplus"
 	"github.com/10gen/migration-verifier/internal/logger"
+	"github.com/10gen/migration-verifier/internal/types"
 	"github.com/10gen/migration-verifier/internal/verifier/webserver"
+	"github.com/10gen/migration-verifier/option"
 	"github.com/gin-contrib/pprof"
 	"github.com/gin-gonic/gin"
 	"github.com/google/uuid"
@@ -240,12 +242,38 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) {
 	successResponse(c)
 }
 
+type ProgressGenerationStats struct {
+	TimeElapsed   string
+	ActiveWorkers int
+
+	DocsCompared types.DocumentCount
+	TotalDocs    types.DocumentCount
+
+	SrcBytesCompared types.ByteCount
+	TotalSrcBytes    types.ByteCount
+
+	MismatchesFound  int64
+	RechecksEnqueued int64
+}
+
+type ProgressChangeStats struct {
+	EventsPerSecond  option.Option[float64]
+	CurrentTimes     option.Option[readerCurrentTimes]
+	BufferSaturation float64
+}
+
 // Progress represents the structure of the JSON response from the Progress end point.
 type Progress struct {
-	Phase      string              `json:"phase"`
-	Generation int                 `json:"generation"`
-	Error      error               `json:"error"`
-	Status     *VerificationStatus `json:"verificationStatus"`
+	Phase string
+
+	Generation      int
+	GenerationStats ProgressGenerationStats
+
+	SrcChangeStats ProgressChangeStats
+	DstChangeStats ProgressChangeStats
+
+	Error  error
+	Status *VerificationStatus `json:"verificationStatus"`
 }
 
 // progressEndpoint implements the gin handle for the progress endpoint.
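
For illustration only (not part of the patch): with the phase field removed from Verifier, GetProgress in progress.go derives the reported phase from the running flag and the generation counter. The nested lo.Ternary there is equivalent to this sketch, assuming the existing Idle, Check, and Recheck constants:

	// Phase reported by GetProgress (equivalent control flow).
	phase := Idle
	if verifier.running {
		if generation > 0 {
			phase = Recheck // any generation after the first is a recheck pass
		} else {
			phase = Check
		}
	}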