Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve initial metadata read using bson instead of unmarshal #88

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 120 additions & 46 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ type Tailer struct {
// Raw oplog entry from Mongo
type rawOplogEntry struct {
Timestamp primitive.Timestamp `bson:"ts"`
HistoryID int64 `bson:"h"`
MongoVersion int `bson:"v"`
Operation string `bson:"op"`
Namespace string `bson:"ns"`
Doc bson.Raw `bson:"o"`
Update rawOplogEntryID `bson:"o2"`
Update bson.Raw `bson:"o2"`
}

// Parsed Cursor Result
Expand All @@ -52,10 +50,6 @@ type cursorResultStatus struct {
DidLosePosition bool
}

type rawOplogEntryID struct {
ID interface{} `bson:"_id"`
}

const requeryDuration = time.Second

var (
Expand Down Expand Up @@ -214,7 +208,7 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr
continue
}

ts, pubs, sendMetricsData := tailer.unmarshalEntry(rawData, tailer.Denylist, readOrdinal)
ts, pubs, sendMetricsData := tailer.processEntry(rawData, readOrdinal)

if ts != nil {
lastTimestamp = *ts
Expand Down Expand Up @@ -366,16 +360,13 @@ func closeCursor(cursor *mongo.Cursor) {
}
}

// unmarshalEntry unmarshals a single entry from the oplog.
// processEntry processes a single entry from the oplog.
//
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it
// ignored it or failed at some later step.
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) {
var result rawOplogEntry

err := bson.Unmarshal(rawData, &result)
if err != nil {
log.Log.Errorw("Error unmarshalling oplog entry", "error", err)
func (tailer *Tailer) processEntry(rawData bson.Raw, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) {
result := tailer.unmarshalEntryMetadata(rawData)
if result == nil {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this code, if there is any error in the unmarshal, we just returned here. Now, even if the various lookups or type checks fail, we log a message and continue. Should we just return if any step of this lookup process fails?

I'm also curious about when the previous unmarshal used to error. Did it error if the struct field type didn't match the type in the document? Probably. The change I proposed above deals with that. However, did it error if a matching key in the document for the struct wasn't found? I don't think so, and doing my change would mean different behavior in that case. I think looking into when bson unmarshalling errors and making sure behavior is the same as before (splitting the lookup and type check if necessary) is a good move.

Copy link
Contributor Author

@eparker-tulip eparker-tulip Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did it error if the struct field type didn't match the type in the document?

Yes, I tested this. If a field was missing or extra, there was no error. Only if a field existed but with a different type would the original unmarshal error. So this tries to mostly follow that pattern, but does error on namespace which would seem to be a required field. However, maybe none are -- I don't know what namespace would be set to in the case of a transaction. In that case, splitting up the lookup and type conversion and exactly matching the old behavior is probably best -- I had started that way, but wanted to avoid it since it makes for terribly messy code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it does get gnarly. Maybe put the conversion from bson to rawOplogEntry in a helper function? That might help that, and also make it easier to make changes in the future if necessary,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work -- I'd also have to pass in the denylist to allow for the early filtering, but that's probably fine

}

Expand All @@ -387,6 +378,10 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readO
status := "ignored"
database := "(no database)"
messageLen := float64(len(rawData))

if len(entries) > 0 {
database = entries[0].Database
}

sendMetricsData = func() {
// TODO: remove these in a future version
Expand All @@ -398,17 +393,6 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readO
metricLastReceivedStaleness.WithLabelValues(strconv.Itoa(readOrdinal)).Set(float64(time.Since(time.Unix(int64(timestamp.T), 0))))
}

if len(entries) > 0 {
database = entries[0].Database
}

if _, denied := denylist.Load(database); denied {
log.Log.Debugw("Skipping oplog entry", "database", database)
metricOplogEntriesFiltered.WithLabelValues(database).Add(1)

return
}

type errEntry struct {
err error
op *oplogEntry
Expand Down Expand Up @@ -483,8 +467,87 @@ func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry
return primitive.Timestamp{T: uint32(time.Now().Unix() << 32)}
}

func parseID(idRaw bson.RawValue) (id interface{}, err error) {
if idRaw.IsZero() {
log.Log.Error("failed to get objectId: _id is empty or not set")
err = errors.New("empty or missing objectId")
return
}
err = idRaw.Unmarshal(&id)
if err != nil {
log.Log.Errorf("failed to unmarshal objectId: %v", err)
}
return
}

// unmarshalEntryMetadata processes the top-level data from an entry and returns a rawOplogEntry object.
// This avoids using bson.Unmarshal on the whole document as that has very poor performance, even with the
// bson.Raw type to limit depth. While messy, using these raw bson methods here provides far better performance.
func (tailer *Tailer) unmarshalEntryMetadata(rawData bson.Raw) *rawOplogEntry {
var result rawOplogEntry
var ok bool
nsLookup, err := rawData.LookupErr("ns");
if err == nil {
result.Namespace, ok = nsLookup.StringValueOK()
if !ok {
// this means there was a type mismatch
log.Log.Error("Error unmarshalling oplog namespace entry")
return nil
}
}

// filter if db is in denylist
if len(result.Namespace) > 0 && result.Namespace != "admin.$cmd" {
db, _ := parseNamespace(result.Namespace)
if _, denied := tailer.Denylist.Load(db); denied {
log.Log.Debugw("Skipping oplog entry", "database", db)
metricOplogEntriesFiltered.WithLabelValues(db).Add(1)
return nil
}
}

tsLookup, err := rawData.LookupErr("ts")
if err == nil {
t, i, ok := tsLookup.TimestampOK()
if !ok {
log.Log.Error("Error unmarshalling oplog timestamp entry")
return nil
}
result.Timestamp = primitive.Timestamp{T: t, I: i}
}

opLookup, err := rawData.LookupErr("op")
if err == nil {
result.Operation, ok = opLookup.StringValueOK()
if !ok {
log.Log.Error("Error unmarshalling oplog operation entry")
return nil
}
}

oLookup, err := rawData.LookupErr("o")
if err == nil {
result.Doc, ok = oLookup.DocumentOK()
if !ok {
log.Log.Error("Error unmarshalling oplog document entry")
return nil
}
}

o2Lookup, err := rawData.LookupErr("o2")
if err == nil {
result.Update, ok = o2Lookup.DocumentOK()
if !ok {
log.Log.Error("Error unmarshalling oplog update entry")
return nil
}
}

return &result
}

// converts a rawOplogEntry to an oplogEntry
func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry {
func (tailer *Tailer) parseRawOplogEntry(entry *rawOplogEntry, txIdx *uint) []oplogEntry {
if txIdx == nil {
idx := uint(0)
txIdx = &idx
Expand All @@ -505,19 +568,14 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl

out.Database, out.Collection = parseNamespace(out.Namespace)

var errID error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: What about calling this just err? errID sounds like it stores an error ID, rather than it being the error for the ID parsing part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is a pattern Yotam likes -- to be clear what each error variable is for, rather than just always err. But I'm fine either way, and I agree this could be confusing.

if out.Operation == operationUpdate {
out.DocID = entry.Update.ID
out.DocID, errID = parseID(entry.Update.Lookup("_id"))
} else {
idLookup := entry.Doc.Lookup("_id")
if idLookup.IsZero() {
log.Log.Error("failed to get objectId: _id is empty or not set")
return nil
}
err := idLookup.Unmarshal(&out.DocID)
if err != nil {
log.Log.Errorf("failed to unmarshal objectId: %v", err)
return nil
}
out.DocID, errID = parseID(entry.Doc.Lookup("_id"))
}
if errID != nil {
return nil
}

return []oplogEntry{out}
Expand All @@ -527,20 +585,36 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl
return nil
}

var txData struct {
ApplyOps []rawOplogEntry `bson:"applyOps"`
applyOpsLookup, err := entry.Doc.LookupErr("applyOps")
if err != nil {
log.Log.Errorf("Looking up transaction data: %v", err)
return nil
}

applyOpsValues, ok := applyOpsLookup.ArrayOK()
if !ok {
log.Log.Error("Failed to access transaction data as array")
return nil
}

if err := bson.Unmarshal(entry.Doc, &txData); err != nil {
log.Log.Errorf("unmarshaling transaction data: %v", err)
applyOpsArray, err := applyOpsValues.Values()
if err != nil {
log.Log.Errorf("Getting transaction ops array: %v", err)
return nil
}

var ret []oplogEntry

for _, v := range txData.ApplyOps {
v.Timestamp = entry.Timestamp
ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...)
for _, rawEntry := range applyOpsArray {
rawDoc, ok := rawEntry.DocumentOK()
if ok {
v := tailer.unmarshalEntryMetadata(rawDoc)
if v != nil {
v.Timestamp = entry.Timestamp
ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...)
}
} else {
log.Log.Error("Getting transaction op doc")
}
}

return ret
Expand Down
40 changes: 17 additions & 23 deletions lib/oplog/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ func TestGetStartTime(t *testing.T) {
}
}

func mustRaw(t *testing.T, data interface{}) bson.Raw {
b, err := bson.Marshal(data)
require.NoError(t, err)

var raw bson.Raw
require.NoError(t, bson.Unmarshal(b, &raw))

return raw
}

func TestParseRawOplogEntry(t *testing.T) {
tests := map[string]struct {
in rawOplogEntry
Expand All @@ -124,7 +114,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"_id": "someid", "foo": "bar"}),
Doc: rawBson(t, map[string]interface{}{"_id": "someid", "foo": "bar"}),
},
want: []oplogEntry{{
Timestamp: primitive.Timestamp{T: 1234},
Expand All @@ -141,8 +131,8 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "u",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"new": "data"}),
Update: rawOplogEntryID{ID: "updateid"},
Doc: rawBson(t, map[string]interface{}{"new": "data"}),
Update: rawBson(t, map[string]interface{}{"_id": "updateid"}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you used rawBson above and mustRaw in the other test cases. Why so? The pattern I notice seems to be that mustRaw is used for the input, and rawBson for the expected result. mustRaw seems weird actually. Other than what rawBson does, it checks if a byte array can be unmarshalled into bson.Raw, but that doesn't seem helpful, because they both have the same underlying type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, mustRaw was already there, and I added rawBson for the other tests, so when I had to update this I used the rawBson, but I see now they're basically the same so I replaced all the mustRaw with rawBson to simplify.

},
want: []oplogEntry{{
Timestamp: primitive.Timestamp{T: 1234},
Expand All @@ -159,7 +149,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "d",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"_id": "someid"}),
Doc: rawBson(t, map[string]interface{}{"_id": "someid"}),
},
want: []oplogEntry{{
Timestamp: primitive.Timestamp{T: 1234},
Expand All @@ -176,7 +166,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "foo.$cmd",
Doc: mustRaw(t, map[string]interface{}{"drop": "Foo"}),
Doc: rawBson(t, map[string]interface{}{"drop": "Foo"}),
},
want: nil,
},
Expand All @@ -185,47 +175,51 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "admin.$cmd",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"applyOps": []rawOplogEntry{
{
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "admin.$cmd",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"applyOps": []rawOplogEntry{
{
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id1",
"foo": "baz",
}),
Update: rawBson(t, map[string]interface{}{}),
},
},
}),
Update: rawBson(t, map[string]interface{}{}),
},
{
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id1",
"foo": "bar",
}),
Update: rawBson(t, map[string]interface{}{}),
},
{
Operation: "u",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"foo": "quux",
}),
Update: rawOplogEntryID{"id2"},
Update: rawBson(t, map[string]interface{}{"_id": "id2"}),
},
{
Operation: "d",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id3",
}),
Update: rawBson(t, map[string]interface{}{}),
},
},
}),
Expand Down Expand Up @@ -287,7 +281,7 @@ func TestParseRawOplogEntry(t *testing.T) {

for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(test.in, nil)
got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(&test.in, nil)

if diff := pretty.Compare(parseEntry(t, got), parseEntry(t, test.want)); diff != "" {
t.Errorf("Got incorrect result (-got +want)\n%s", diff)
Expand Down
Loading