Skip to content

Commit 67c0de8

Browse files
authored
Optimize BSON for rechecks & mismatches. (#165)
This optimizes a few BSON marshal/unmarshal operations: - Unmarshaling document rechecks - This optimizes generation of recheck tasks, which entails reading & unmarshaling rechecks in series. - Marshaling VerificationResult and MismatchInfo - This optimizes behavior when there are lots of mismatches. This also includes a new UnmarshalCursor function that works like mongo.Cursor.All but for structs that implement UnmarshalFromBSON (and forbid bson.Unmarshal). It also fixes 2 intermittently-failing tests: - TestChangeStream_Resume_NoSkip - TestRecheckDocsWithDstChangeEvents
1 parent c5053a4 commit 67c0de8

File tree

14 files changed

+509
-99
lines changed

14 files changed

+509
-99
lines changed

internal/util/cluster_time.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,19 @@
11
package util
22

33
import (
4+
"github.com/10gen/migration-verifier/mbson"
45
"github.com/pkg/errors"
56
"go.mongodb.org/mongo-driver/v2/bson"
67
"go.mongodb.org/mongo-driver/v2/mongo"
78
)
89

910
func GetClusterTimeFromSession(sess *mongo.Session) (bson.Timestamp, error) {
10-
ctStruct := struct {
11-
ClusterTime struct {
12-
ClusterTime bson.Timestamp `bson:"clusterTime"`
13-
} `bson:"$clusterTime"`
14-
}{}
15-
1611
clusterTimeRaw := sess.ClusterTime()
17-
err := bson.Unmarshal(sess.ClusterTime(), &ctStruct)
12+
13+
ctrv, err := clusterTimeRaw.LookupErr("$clusterTime", "clusterTime")
1814
if err != nil {
19-
return bson.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw)
15+
return bson.Timestamp{}, errors.Wrapf(err, "finding clusterTime in session cluster time document (%v)", clusterTimeRaw)
2016
}
2117

22-
return ctStruct.ClusterTime.ClusterTime, nil
18+
return mbson.CastRawValue[bson.Timestamp](ctrv)
2319
}

internal/verifier/change_stream_test.go

Lines changed: 49 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"go.mongodb.org/mongo-driver/v2/bson"
2626
"go.mongodb.org/mongo-driver/v2/mongo"
2727
"go.mongodb.org/mongo-driver/v2/mongo/options"
28-
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
2928
)
3029

3130
func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() {
@@ -255,12 +254,6 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() {
255254

256255
verifier1 := suite.BuildVerifier()
257256

258-
// Use of linearizable read concern below seems to freeze pre-4.4 servers.
259-
srcVersion := verifier1.srcClusterInfo.VersionArray
260-
if util.CmpMinorVersions([2]int(srcVersion), [2]int{4, 4}) == -1 {
261-
suite.T().Skipf("Source version (%v) is too old for this test.", srcVersion)
262-
}
263-
264257
srcDB := verifier1.srcClient.Database(suite.DBNameForTest())
265258
srcColl := srcDB.Collection("coll")
266259

@@ -289,9 +282,10 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() {
289282
"should see a change stream resume token persisted",
290283
)
291284

292-
insertCtx, cancelInserts := contextplus.WithCancelCause(ctx)
293-
defer cancelInserts(ctx.Err())
285+
stopInserts := make(chan struct{})
294286
insertsDone := make(chan struct{})
287+
288+
var insertedIDs []any
295289
go func() {
296290
defer func() {
297291
close(insertsDone)
@@ -302,21 +296,25 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() {
302296
)
303297
require.NoError(suite.T(), err)
304298

305-
sessCtx := mongo.NewSessionContext(insertCtx, sess)
299+
sessCtx := mongo.NewSessionContext(ctx, sess)
306300

307-
docID := int32(1)
308301
for {
309-
_, err := srcColl.InsertOne(
302+
select {
303+
case <-ctx.Done():
304+
return
305+
case <-stopInserts:
306+
return
307+
default:
308+
}
309+
310+
inserted, err := srcColl.InsertOne(
310311
sessCtx,
311-
bson.D{{"_id", docID}},
312+
bson.D{},
312313
)
313314

314-
if err != nil {
315-
require.ErrorIs(suite.T(), err, context.Canceled)
316-
return
317-
}
315+
require.NoError(suite.T(), err, "should insert")
318316

319-
docID++
317+
insertedIDs = append(insertedIDs, inserted.InsertedID)
320318
}
321319
}()
322320

@@ -340,58 +338,43 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() {
340338
verifier2 := suite.BuildVerifier()
341339
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2)
342340

343-
cancelInserts(fmt.Errorf("verifier2 started"))
341+
close(stopInserts)
344342
<-insertsDone
345343

346-
lastIDRes := srcColl.Database().Collection(
347-
srcColl.Name(),
348-
options.Collection().SetReadConcern(readconcern.Linearizable()),
349-
).FindOne(
350-
ctx,
351-
bson.D{},
352-
options.FindOne().
353-
SetSort(bson.D{{"_id", -1}}),
354-
)
355-
require.NoError(suite.T(), lastIDRes.Err())
356-
357-
lastDocID := lo.Must(lo.Must(lastIDRes.Raw()).LookupErr("_id")).Int32()
358-
344+
var foundIDs []any
359345
assert.Eventually(
360346
suite.T(),
361347
func() bool {
362348
rechecks := suite.fetchPendingVerifierRechecks(ctx, verifier2)
363349

364-
return lo.SomeBy(
350+
foundIDs = lo.Map(
365351
rechecks,
366-
func(cur bson.M) bool {
352+
func(cur bson.M, _ int) any {
367353
id := cur["_id"].(bson.D)
368354

369355
for _, el := range id {
370-
if el.Key != "docID" {
371-
continue
356+
if el.Key == "docID" {
357+
return el.Value
372358
}
373-
374-
return el.Value.(int32) == lastDocID
375359
}
376360

377361
panic(fmt.Sprintf("no docID in _id: %+v", id))
378362
},
379363
)
364+
365+
return len(foundIDs) == len(insertedIDs)
380366
},
381367
time.Minute,
382368
100*time.Millisecond,
383369
"last-inserted doc shows as recheck",
384370
)
385371

386-
sess := lo.Must(verifier2.verificationDatabase().Client().StartSession())
387-
sctx := mongo.NewSessionContext(ctx, sess)
388-
389-
rechecks := suite.fetchPendingVerifierRechecks(sctx, verifier2)
390-
if !assert.EqualValues(suite.T(), lastDocID, len(rechecks), "all source docs should be rechecked") {
391-
for _, recheck := range rechecks {
392-
suite.T().Logf("found recheck: %v", recheck)
393-
}
394-
}
372+
assert.ElementsMatch(
373+
suite.T(),
374+
insertedIDs,
375+
foundIDs,
376+
"all source docs should be rechecked",
377+
)
395378
}
396379

397380
// TestChangeStreamResumability creates a verifier, starts its change stream,
@@ -1069,15 +1052,28 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
10691052
require.Eventually(
10701053
suite.T(),
10711054
func() bool {
1072-
recheckColl := verifier.getRecheckQueueCollection(1 + verifier.generation)
1073-
cursor, err := recheckColl.Find(ctx, bson.D{})
1074-
if errors.Is(err, mongo.ErrNoDocuments) {
1055+
rechecksM := suite.fetchPendingVerifierRechecks(ctx, verifier)
1056+
1057+
if len(rechecksM) < 3 {
10751058
return false
10761059
}
10771060

1078-
suite.Require().NoError(err)
1079-
suite.Require().NoError(cursor.All(ctx, &rechecks))
1080-
return len(rechecks) == 3
1061+
require.Len(suite.T(), rechecksM, 3)
1062+
1063+
rechecks = lo.Map(
1064+
rechecksM,
1065+
func(m bson.M, _ int) recheck.Doc {
1066+
raw, err := bson.Marshal(m)
1067+
require.NoError(suite.T(), err)
1068+
1069+
doc := recheck.Doc{}
1070+
require.NoError(suite.T(), (&doc).UnmarshalFromBSON(raw))
1071+
1072+
return doc
1073+
},
1074+
)
1075+
1076+
return true
10811077
},
10821078
time.Minute,
10831079
500*time.Millisecond,

internal/verifier/mismatches.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package verifier
22

33
import (
44
"context"
5+
"encoding/binary"
6+
"fmt"
57

68
"github.com/10gen/migration-verifier/option"
79
"github.com/pkg/errors"
810
"github.com/samber/lo"
911
"go.mongodb.org/mongo-driver/v2/bson"
1012
"go.mongodb.org/mongo-driver/v2/mongo"
1113
"go.mongodb.org/mongo-driver/v2/mongo/options"
14+
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
1215
)
1316

1417
const (
@@ -20,6 +23,36 @@ type MismatchInfo struct {
2023
Detail VerificationResult
2124
}
2225

26+
var _ bson.Marshaler = MismatchInfo{}
27+
28+
func (mi MismatchInfo) MarshalBSON() ([]byte, error) {
29+
panic("Use MarshalToBSON().")
30+
}
31+
32+
func (mi MismatchInfo) MarshalToBSON() []byte {
33+
detail := mi.Detail.MarshalToBSON()
34+
35+
bsonLen := 4 + // header
36+
1 + 4 + 1 + len(bson.ObjectID{}) + // Task
37+
1 + 6 + 1 + len(detail) + // Detail
38+
1 // NUL
39+
40+
buf := make(bson.Raw, 4, bsonLen)
41+
42+
binary.LittleEndian.PutUint32(buf, uint32(bsonLen))
43+
44+
buf = bsoncore.AppendObjectIDElement(buf, "task", mi.Task)
45+
buf = bsoncore.AppendDocumentElement(buf, "detail", detail)
46+
47+
buf = append(buf, 0)
48+
49+
if len(buf) != bsonLen {
50+
panic(fmt.Sprintf("%T BSON length is %d but expected %d", mi, len(buf), bsonLen))
51+
}
52+
53+
return buf
54+
}
55+
2356
func createMismatchesCollection(ctx context.Context, db *mongo.Database) error {
2457
_, err := db.Collection(mismatchesCollectionName).Indexes().CreateMany(
2558
ctx,
@@ -108,7 +141,7 @@ func recordMismatches(
108141
Document: MismatchInfo{
109142
Task: taskID,
110143
Detail: r,
111-
},
144+
}.MarshalToBSON(),
112145
}
113146
},
114147
)
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package verifier
2+
3+
import (
4+
"testing"
5+
6+
"github.com/10gen/migration-verifier/mbson"
7+
"github.com/10gen/migration-verifier/option"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
"go.mongodb.org/mongo-driver/v2/bson"
11+
)
12+
13+
func TestMismatchesInfoMarshal(t *testing.T) {
14+
results := []VerificationResult{
15+
{},
16+
{
17+
ID: mbson.ToRawValue("hello"),
18+
},
19+
{
20+
SrcTimestamp: option.Some(bson.Timestamp{12, 23}),
21+
},
22+
{
23+
DstTimestamp: option.Some(bson.Timestamp{12, 23}),
24+
},
25+
{
26+
ID: mbson.ToRawValue("hello"),
27+
Field: "yeah",
28+
Cluster: "hoho",
29+
Details: "aaa",
30+
NameSpace: "xxxxx",
31+
SrcTimestamp: option.Some(bson.Timestamp{12, 23}),
32+
DstTimestamp: option.Some(bson.Timestamp{23, 24}),
33+
},
34+
}
35+
36+
for _, result := range results {
37+
mi := MismatchInfo{
38+
Task: bson.NewObjectID(),
39+
Detail: result,
40+
}
41+
42+
raw := mi.MarshalToBSON()
43+
44+
var rt MismatchInfo
45+
require.NoError(t, bson.Unmarshal(raw, &rt))
46+
47+
assert.Equal(t, mi, rt, "should round-trip")
48+
}
49+
}

internal/verifier/recheck.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,7 @@ func (verifier *Verifier) insertRecheckDocs(
139139
DataSize: dataSizes[i],
140140
}
141141

142-
recheckRaw, err := recheckDoc.MarshalToBSON()
143-
if err != nil {
144-
return errors.Wrapf(err, "marshaling recheck for %#q", dbName+"."+collNames[i])
145-
}
142+
recheckRaw := recheckDoc.MarshalToBSON()
146143

147144
curRechecks = append(
148145
curRechecks,
@@ -339,15 +336,12 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
339336
// subject to a 16MB limit on group size.
340337
for cursor.Next(ctx) {
341338
var doc recheck.Doc
342-
err = cursor.Decode(&doc)
339+
err = (&doc).UnmarshalFromBSON(cursor.Current)
343340
if err != nil {
344341
return err
345342
}
346343

347-
idRaw, err := cursor.Current.LookupErr("_id", "docID")
348-
if err != nil {
349-
return errors.Wrapf(err, "failed to find docID in enqueued recheck %v", cursor.Current)
350-
}
344+
idRaw := doc.PrimaryKey.DocumentID
351345

352346
// We persist rechecks if any of these happen:
353347
// - the namespace has changed
@@ -395,7 +389,7 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
395389

396390
err = cursor.Err()
397391
if err != nil {
398-
return err
392+
return errors.Wrapf(err, "reading persisted rechecks")
399393
}
400394

401395
err = persistBufferedRechecks()

0 commit comments

Comments
 (0)