Skip to content

Commit d4ac9d7

Browse files
committed
add max duration
1 parent b9471c1 commit d4ac9d7

File tree

4 files changed

+99
-138
lines changed

4 files changed

+99
-138
lines changed

internal/verifier/mismatches.go

Lines changed: 91 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/binary"
66
"fmt"
7+
"time"
78

89
"github.com/10gen/migration-verifier/agg"
910
"github.com/10gen/migration-verifier/agg/accum"
@@ -14,7 +15,6 @@ import (
1415
"go.mongodb.org/mongo-driver/v2/bson"
1516
"go.mongodb.org/mongo-driver/v2/mongo"
1617
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
17-
"golang.org/x/exp/slices"
1818
)
1919

2020
const (
@@ -92,14 +92,20 @@ func createMismatchesCollection(ctx context.Context, db *mongo.Database) error {
9292
}
9393

9494
type recheckCounts struct {
95-
// FromMismatch are rechecks from previously-seen mismatches.
95+
// FromMismatch are rechecks in the given generation from mismatches
96+
// in the prior generation.
9697
FromMismatch int64
9798

98-
// FromChange are rechecks from previously-seen changes.
99+
// FromChange are rechecks from changes seen in the prior generation.
99100
FromChange int64
100101

101-
// NewMismatches are newly-seen mismatches that *will* require rechecks.
102+
// NewMismatches are mismatches seen thus far in the current generation
103+
// that will be rechecked in the next generation.
102104
NewMismatches int64
105+
106+
// MaxMismatchDuration is the duration of the longest-lived mismatch
107+
// among the current and prior generations.
108+
MaxMismatchDuration time.Duration
103109
}
104110

105111
// NB: This is OK to call for generation==0. In this case it will only
@@ -119,13 +125,30 @@ func countRechecksForGeneration(
119125
{{"$match", bson.D{
120126
{"generation", bson.D{{"$in", []any{generation, generation - 1}}}},
121127
{"type", verificationTaskVerifyDocuments},
128+
129+
// Mismatches can exist only for “failed” tasks, or for tasks
130+
// that *will* be “failed” once they finish but are currently
131+
// “processing”.
132+
{"status", bson.D{{"$in", mslices.Of(
133+
verificationTaskProcessing,
134+
verificationTaskFailed,
135+
)}}},
122136
}}},
123137
{{"$lookup", bson.D{
124138
{"from", mismatchesCollectionName},
125139
{"localField", "_id"},
126140
{"foreignField", "task"},
127141
{"as", "mismatches"},
142+
{"pipeline", mongo.Pipeline{
143+
{{"$group", bson.D{
144+
{"_id", nil},
145+
146+
{"count", accum.Sum{1}},
147+
{"maxDurationMS", accum.Max{"$durationMS"}},
148+
}}},
149+
}},
128150
}}},
151+
{{"$unwind", "$mismatches"}},
129152
{{"$group", bson.D{
130153
{"_id", nil},
131154
{"allRechecks", accum.Sum{
@@ -138,17 +161,18 @@ func countRechecksForGeneration(
138161
{"rechecksFromMismatch", accum.Sum{
139162
agg.Cond{
140163
If: agg.Eq{"$generation", generation - 1},
141-
Then: agg.Size{"$mismatches"},
164+
// After the $lookup sub-pipeline's $group and the $unwind, "mismatches"
// is a single document whose "count" is already a number, so sum it
// directly; $size would fail because it only accepts arrays.
Then: "$mismatches.count",
142165
Else: 0,
143166
},
144167
}},
145168
{"newMismatches", accum.Sum{
146169
agg.Cond{
147170
If: agg.Eq{"$generation", generation},
148-
Then: agg.Size{"$mismatches"},
171+
// After the $lookup sub-pipeline's $group and the $unwind, "mismatches"
// is a single document whose "count" is already a number, so sum it
// directly; $size would fail because it only accepts arrays.
Then: "$mismatches.count",
149172
Else: 0,
150173
},
151174
}},
175+
// The $lookup stores its grouped output under "mismatches" (its "as"
// field), so the per-task maximum is at mismatches.maxDurationMS.
{"maxMismatchDurationMS", accum.Max{"$mismatches.maxDurationMS"}},
152176
}}},
153177
},
154178
)
@@ -163,14 +187,15 @@ func countRechecksForGeneration(
163187
return recheckCounts{}, errors.Wrap(err, "reading count of last generation’s found mismatches")
164188
}
165189

166-
// This happens if there were no tasks in the queried generations.
190+
// This happens if there were no failed or in-progress tasks in the queried generations, or if none of those tasks has any recorded mismatches (the $unwind drops such tasks).
167191
return recheckCounts{}, nil
168192
}
169193

170194
result := struct {
171-
AllRechecks int64
172-
RechecksFromMismatch int64
173-
NewMismatches int64
195+
AllRechecks int64
196+
RechecksFromMismatch int64
197+
NewMismatches int64
198+
MaxMismatchDurationMS int64
174199
}{}
175200

176201
err = cursor.Decode(&result)
@@ -189,9 +214,10 @@ func countRechecksForGeneration(
189214
}
190215

191216
return recheckCounts{
192-
FromMismatch: result.RechecksFromMismatch,
193-
FromChange: result.AllRechecks - result.RechecksFromMismatch,
194-
NewMismatches: result.NewMismatches,
217+
FromMismatch: result.RechecksFromMismatch,
218+
FromChange: result.AllRechecks - result.RechecksFromMismatch,
219+
NewMismatches: result.NewMismatches,
220+
MaxMismatchDuration: time.Duration(result.MaxMismatchDurationMS) * time.Millisecond,
195221
}, nil
196222
}
197223

@@ -257,63 +283,11 @@ func getMismatchesForTasks(
257283
return result, nil
258284
}
259285

260-
func getDocumentMismatchReportDataForGeneration(
261-
ctx context.Context,
262-
db *mongo.Database,
263-
generation int,
264-
limit int64,
265-
) (mismatchReportData, error) {
266-
reportData, err := getDocumentMismatchReportData(
267-
ctx,
268-
db.Collection(verificationTasksCollection),
269-
limit,
270-
mongo.Pipeline{
271-
{{"$match", bson.D{
272-
{"generation", generation},
273-
{"status", bson.D{{"$in", mslices.Of(
274-
verificationTaskProcessing,
275-
verificationTaskFailed,
276-
)}}},
277-
}}},
278-
{{"$lookup", bson.D{
279-
{"from", mismatchesCollectionName},
280-
{"localField", "_id"},
281-
{"foreignField", "task"},
282-
{"as", "mismatch"},
283-
}}},
284-
{{"$unwind", "$mismatch"}},
285-
{{"$replaceWith", "$mismatch"}},
286-
},
287-
)
288-
289-
return reportData, errors.Wrapf(err, "fetching generation %d’s discrepancies", generation)
290-
}
291-
292-
func getDocumentMismatchReportDataForTasks(
286+
func getDocumentMismatchReportData(
293287
ctx context.Context,
294288
db *mongo.Database,
295289
taskIDs []bson.ObjectID,
296290
limit int64,
297-
) (mismatchReportData, error) {
298-
reportData, err := getDocumentMismatchReportData(
299-
ctx,
300-
db.Collection(mismatchesCollectionName),
301-
limit,
302-
mongo.Pipeline{
303-
{{"$match", bson.D{
304-
{"task", bson.D{{"$in", taskIDs}}},
305-
}}},
306-
},
307-
)
308-
309-
return reportData, errors.Wrapf(err, "fetching %d tasks’ discrepancies", len(taskIDs))
310-
}
311-
312-
func getDocumentMismatchReportData(
313-
ctx context.Context,
314-
coll *mongo.Collection,
315-
limit int64,
316-
initialStages mongo.Pipeline,
317291
) (mismatchReportData, error) {
318292
// A filter to identify docs marked “missing” (on either src or dst)
319293
missingFilter := getMismatchDocMissingAggExpr("$$ROOT")
@@ -335,61 +309,61 @@ func getDocumentMismatchReportData(
335309

336310
contentDiffersFilter := agg.Not{missingFilter}
337311

338-
pl := append(
339-
slices.Clone(initialStages),
340-
mongo.Pipeline{
341-
{{"$sort", bson.D{
342-
{"detail.mismatchTimes.duration", -1},
343-
{"detail.id", 1},
344-
}}},
345-
{{"$facet", bson.D{
346-
{"counts", mongo.Pipeline{
347-
{{"$group", bson.D{
348-
{"_id", nil},
349-
350-
{"contentDiffers", accum.Sum{agg.Cond{
351-
If: contentDiffersFilter,
352-
Then: 1,
353-
Else: 0,
354-
}}},
355-
{"missingOnDst", accum.Sum{agg.Cond{
356-
If: missingOnDstFilter,
357-
Then: 1,
358-
Else: 0,
359-
}}},
360-
{"extraOnDst", accum.Sum{agg.Cond{
361-
If: extraOnDstFilter,
362-
Then: 1,
363-
Else: 0,
364-
}}},
312+
pl := mongo.Pipeline{
313+
{{"$match", bson.D{
314+
{"task", bson.D{{"$in", taskIDs}}},
315+
}}},
316+
{{"$sort", bson.D{
317+
{"detail.mismatchTimes.duration", -1},
318+
{"detail.id", 1},
319+
}}},
320+
{{"$facet", bson.D{
321+
{"counts", mongo.Pipeline{
322+
{{"$group", bson.D{
323+
{"_id", nil},
324+
325+
{"contentDiffers", accum.Sum{agg.Cond{
326+
If: contentDiffersFilter,
327+
Then: 1,
328+
Else: 0,
365329
}}},
366-
}},
367-
{"contentDiffers", mongo.Pipeline{
368-
{{"$match", bson.D{{"$expr", contentDiffersFilter}}}},
369-
{{"$limit", limit}},
370-
}},
371-
{"missingOnDst", mongo.Pipeline{
372-
{{"$match", bson.D{{"$expr", missingOnDstFilter}}}},
373-
{{"$limit", limit}},
374-
}},
375-
{"extraOnDst", mongo.Pipeline{
376-
{{"$match", bson.D{{"$expr", extraOnDstFilter}}}},
377-
{{"$limit", limit}},
378-
}},
379-
}}},
380-
{{"$addFields", bson.D{
381-
{"counts", agg.ArrayElemAt{
382-
Array: "$counts",
383-
Index: 0,
384-
}},
385-
}}},
386-
}...,
387-
)
330+
{"missingOnDst", accum.Sum{agg.Cond{
331+
If: missingOnDstFilter,
332+
Then: 1,
333+
Else: 0,
334+
}}},
335+
{"extraOnDst", accum.Sum{agg.Cond{
336+
If: extraOnDstFilter,
337+
Then: 1,
338+
Else: 0,
339+
}}},
340+
}}},
341+
}},
342+
{"contentDiffers", mongo.Pipeline{
343+
{{"$match", bson.D{{"$expr", contentDiffersFilter}}}},
344+
{{"$limit", limit}},
345+
}},
346+
{"missingOnDst", mongo.Pipeline{
347+
{{"$match", bson.D{{"$expr", missingOnDstFilter}}}},
348+
{{"$limit", limit}},
349+
}},
350+
{"extraOnDst", mongo.Pipeline{
351+
{{"$match", bson.D{{"$expr", extraOnDstFilter}}}},
352+
{{"$limit", limit}},
353+
}},
354+
}}},
355+
{{"$addFields", bson.D{
356+
{"counts", agg.ArrayElemAt{
357+
Array: "$counts",
358+
Index: 0,
359+
}},
360+
}}},
361+
}
388362

389-
cursor, err := coll.Aggregate(ctx, pl)
363+
cursor, err := db.Collection(mismatchesCollectionName).Aggregate(ctx, pl)
390364

391365
if err != nil {
392-
return mismatchReportData{}, errors.Wrap(err, "starting aggregation")
366+
return mismatchReportData{}, errors.Wrapf(err, "fetching %d tasks' discrepancies", len(taskIDs))
393367
}
394368

395369
var results []mismatchReportData

internal/verifier/progress.go

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,20 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
4242
recheckStats, err := countRechecksForGeneration(
4343
egCtx,
4444
verifier.metaClient.Database(verifier.metaDBName),
45-
generation-1,
45+
generation,
4646
)
4747

4848
if err != nil {
49-
return errors.Wrapf(err, "counting mismatches seen during generation %d", generation-1)
49+
return errors.Wrapf(err, "counting mismatches seen during generation %d", generation)
5050
}
5151

52-
genStats.CurrentRechecks = option.Some(ProgressRechecks{
52+
genStats.PriorRechecks = option.Some(ProgressRechecks{
5353
Changes: recheckStats.FromChange,
5454
Mismatches: recheckStats.FromMismatch,
5555
})
5656

5757
genStats.MismatchesFound = recheckStats.NewMismatches
58+
genStats.MaxMismatchDuration = recheckStats.MaxMismatchDuration
5859

5960
return nil
6061
},
@@ -110,22 +111,6 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
110111
},
111112
)
112113

113-
eg.Go(
114-
func() error {
115-
reportData, err := getDocumentMismatchReportDataForGeneration(
116-
ctx,
117-
verifier.metaClient.Database(verifier.metaDBName),
118-
generation,
119-
1,
120-
)
121-
122-
if err != nil {
123-
return errors.Wrapf(err, "running mismatch report for generation %d", generation)
124-
}
125-
126-
},
127-
)
128-
129114
if err := eg.Wait(); err != nil {
130115
return Progress{Error: err}, err
131116
}

internal/verifier/summary.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (verifier *Verifier) reportDocumentMismatches(ctx context.Context, strBuild
130130
)
131131
failedTaskIDs := slices.Collect(maps.Keys(failedTaskMap))
132132

133-
reportData, err := getDocumentMismatchReportDataForTasks(
133+
reportData, err := getDocumentMismatchReportData(
134134
ctx,
135135
verifier.verificationDatabase(),
136136
failedTaskIDs,

internal/verifier/webserver.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,9 @@ type ProgressGenerationStats struct {
257257
SrcBytesCompared types.ByteCount `json:"srcBytesCompared"`
258258
TotalSrcBytes types.ByteCount `json:"totalSrcBytes,omitempty"`
259259

260-
CurrentRechecks option.Option[ProgressRechecks] `json:"currentRechecks"`
260+
PriorRechecks option.Option[ProgressRechecks] `json:"priorRechecks"`
261+
MismatchesFound int64 `json:"mismatchesFound"`
262+
MaxMismatchDuration time.Duration `json:"maxMismatchDuration"`
261263
}
262264

263265
type ProgressChangeStats struct {

0 commit comments

Comments
 (0)