Skip to content

Commit

Permalink
[3.2.2 Backport] CBG-4495: do not put documents into rev cache for an on demand import (#7346)

Browse files Browse the repository at this point in the history

* [3.2.2 Backport] CBG-4495: do not put documents into rev cache for an on demand import (#7328)

* CBG-4494 do not put documents into rev cache when they are being loaded

* have all on demand imports skip rev cache

* skip test which requires import

* remove redundant argument

* Remove single duplicate test always (#7332)

* CBG-4499: fix leaking goroutines (#7335)

* CBG-4499: fix leaking goroutines

* updates to address review

---------

Co-authored-by: Tor Colvin <tor.colvin@couchbase.com>
Co-authored-by: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 31, 2025
1 parent 09e9bb5 commit 3c0abf2
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 61 deletions.
21 changes: 13 additions & 8 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ func realDocID(docid string) string {
return docid
}

// GetDocument returns the document with the given ID from the bucket,
// discarding the raw bucket representation that GetDocumentWithRaw also
// returns. This may perform an on-demand import.
func (c *DatabaseCollection) GetDocument(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
	doc, _, err = c.GetDocumentWithRaw(ctx, docid, unmarshalLevel)
	return doc, err
}

// Lowest-level method that reads a document from the bucket
// GetDocumentWithRaw returns the document from the bucket. This may perform an on-demand import.
func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
key := realDocID(docid)
if key == "" {
Expand Down Expand Up @@ -885,7 +886,8 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
}

allowImport := db.UseXattrs()
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &expiry, nil, nil, false, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
updateRevCache := true
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &expiry, nil, nil, false, updateRevCache, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
var isSgWrite bool
var crc32Match bool

Expand Down Expand Up @@ -1010,7 +1012,8 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithConflictResolution(ctx c
}

allowImport := db.UseXattrs()
doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, existingDoc, false, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
updateRevCache := true
doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, existingDoc, false, updateRevCache, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// (Be careful: this block can be invoked multiple times if there are races!)

var isSgWrite bool
Expand Down Expand Up @@ -1922,7 +1925,7 @@ type updateAndReturnDocCallback func(*Document) (resultDoc *Document, resultAtta
// 2. Specify the existing document body/xattr/cas, to avoid initial retrieval of the doc in cases that the current contents are already known (e.g. import).
// On cas failure, the document will still be reloaded from the bucket as usual.
// 3. If isImport=true, document body will not be updated - only metadata xattr(s)
func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry *uint32, opts *sgbucket.MutateInOptions, existingDoc *sgbucket.BucketDocument, isImport bool, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) {
func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry *uint32, opts *sgbucket.MutateInOptions, existingDoc *sgbucket.BucketDocument, isImport bool, updateRevCache bool, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) {
key := realDocID(docid)
if key == "" {
return nil, "", base.HTTPErrorf(400, "Invalid doc ID")
Expand Down Expand Up @@ -2166,10 +2169,12 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
Deleted: doc.History[newRevID].Deleted,
}

if createNewRevIDSkipped {
db.revisionCache.Upsert(ctx, documentRevision)
} else {
db.revisionCache.Put(ctx, documentRevision)
if updateRevCache {
if createNewRevIDSkipped {
db.revisionCache.Upsert(ctx, documentRevision)
} else {
db.revisionCache.Put(ctx, documentRevision)
}
}

if db.eventMgr().HasHandlerForEvent(DocumentChange) {
Expand Down
4 changes: 3 additions & 1 deletion db/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin
existingDoc.Expiry = *expiry
}

docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, existingDoc, true, func(doc *Document) (resultDocument *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// do not update rev cache for on-demand imports
updateRevCache := mode == ImportFromFeed
docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, existingDoc, true, updateRevCache, func(doc *Document) (resultDocument *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// Perform cas mismatch check first, as we want to identify cas mismatch before triggering migrate handling.
// If there's a cas mismatch, the doc has been updated since the version that triggered the import. Handling depends on import mode.
if doc.Cas != existingDoc.Cas {
Expand Down
1 change: 0 additions & 1 deletion db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ func TestImportWithCasFailureUpdate(t *testing.T) {
assert.NoError(t, err, "Error unmarshalling body")

runOnce = true

// Trigger import
_, err = collection.importDoc(ctx, testcase.docname, bodyD, nil, false, existingBucketDoc, ImportOnDemand)
assert.NoError(t, err)
Expand Down
167 changes: 117 additions & 50 deletions db/revision_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,55 +1237,6 @@ func TestRevCacheOnDemand(t *testing.T) {
testCtx, testCtxCancel := context.WithCancel(base.TestCtx(t))
defer testCtxCancel()

for i := 0; i < 2; i++ {
docID := fmt.Sprintf("extraDoc%d", i)
revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"})
require.NoError(t, err)
go func() {
for {
select {
case <-testCtx.Done():
return
default:
_, err = db.revisionCache.Get(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck
}
}
}()
}
log.Printf("Updating doc to trigger on-demand import")
err = collection.dataStore.Set(docID, 0, nil, []byte(`{"ver": "2"}`))
require.NoError(t, err)
log.Printf("Calling getRev for %s, %s", docID, revID)
rev, err := collection.getRev(ctx, docID, revID, 0, nil)
require.Error(t, err)
if base.IsEnterpriseEdition() {
fmt.Println("here")
}
require.ErrorContains(t, err, "missing")
// returns empty doc rev
assert.Equal(t, "", rev.DocID)
}

func TestRevCacheOnDemandMemoryEviction(t *testing.T) {
base.SkipImportTestsIfNotEnabled(t)

dbcOptions := DatabaseContextOptions{
RevisionCacheOptions: &RevisionCacheOptions{
MaxItemCount: 20,
ShardCount: 1,
MaxBytes: 112, // equivalent to max size 2 items
},
}
db, ctx := SetupTestDBWithOptions(t, dbcOptions)
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
docID := "doc1"
revID, _, err := collection.Put(ctx, docID, Body{"ver": "1"})
require.NoError(t, err)

testCtx, testCtxCancel := context.WithCancel(base.TestCtx(t))
defer testCtxCancel()

for i := 0; i < 2; i++ {
docID := fmt.Sprintf("extraDoc%d", i)
revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"})
Expand All @@ -1310,7 +1261,6 @@ func TestRevCacheOnDemandMemoryEviction(t *testing.T) {
require.ErrorContains(t, err, "missing")
// returns empty doc rev
assert.Equal(t, "", rev.DocID)

}

func TestLoadActiveDocFromBucketRevCacheChurn(t *testing.T) {
Expand Down Expand Up @@ -1519,3 +1469,120 @@ func createDocAndReturnSizeAndRev(t *testing.T, ctx context.Context, docID strin

return expectedSize, rev
}

// TestRevCacheOnDemandImport verifies that an on-demand import (triggered by a
// rev fetch after an out-of-band SDK write) does not place the document back
// into the revision cache, while concurrent readers keep the deliberately tiny
// cache churning.
func TestRevCacheOnDemandImport(t *testing.T) {
	base.SkipImportTestsIfNotEnabled(t)

	// Tiny single-shard cache (2 items) so the extra docs below evict doc1.
	dbcOptions := DatabaseContextOptions{
		RevisionCacheOptions: &RevisionCacheOptions{
			MaxItemCount: 2,
			ShardCount:   1,
		},
	}
	db, ctx := SetupTestDBWithOptions(t, dbcOptions)
	defer db.Close(ctx)
	collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
	docID := "doc1"
	revID, _, err := collection.Put(ctx, docID, Body{"ver": "1"})
	require.NoError(t, err)

	// Cancelling this context stops the churn goroutines started below.
	ctx, testCtxCancel := context.WithCancel(ctx)
	defer testCtxCancel()

	// Start background readers that repeatedly fetch two other docs, keeping
	// the two-item cache full so doc1's revision cannot remain cached.
	// Note: docID/revID here deliberately shadow the outer variables, giving
	// each goroutine its own per-iteration copies to capture.
	for i := 0; i < 2; i++ {
		docID := fmt.Sprintf("extraDoc%d", i)
		revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"})
		require.NoError(t, err)
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				default:
					_, err = db.revisionCache.Get(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck
				}
			}
		}()
	}
	// Update doc1 directly in the bucket (bypassing Sync Gateway) so the next
	// read triggers an on-demand import; rev 1 no longer exists server-side.
	err = collection.dataStore.Set(docID, 0, nil, []byte(`{"ver": "2"}`))
	require.NoError(t, err)
	// The import must not repopulate the rev cache, so fetching the stale rev
	// goes to the bucket and fails with a "missing" error.
	rev, err := collection.getRev(ctx, docID, revID, 0, nil)
	require.Error(t, err)
	require.ErrorContains(t, err, "missing")
	// returns empty doc rev
	assert.Equal(t, "", rev.DocID)
}

// TestRevCacheOnDemandMemoryEviction is the memory-bounded variant of
// TestRevCacheOnDemandImport: the cache is capped by MaxBytes rather than by
// item count, and an on-demand import still must not repopulate the cache.
func TestRevCacheOnDemandMemoryEviction(t *testing.T) {
	base.SkipImportTestsIfNotEnabled(t)

	// Item count is generous (20); MaxBytes is the operative limit here.
	dbcOptions := DatabaseContextOptions{
		RevisionCacheOptions: &RevisionCacheOptions{
			MaxItemCount: 20,
			ShardCount:   1,
			MaxBytes:     112, // equivalent to max size 2 items
		},
	}
	db, ctx := SetupTestDBWithOptions(t, dbcOptions)
	defer db.Close(ctx)
	collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
	docID := "doc1"
	revID, _, err := collection.Put(ctx, docID, Body{"ver": "1"})
	require.NoError(t, err)

	// Cancelling this context stops the churn goroutines started below.
	ctx, testCtxCancel := context.WithCancel(ctx)
	defer testCtxCancel()

	// Background readers keep two other docs hot so doc1 is evicted by the
	// byte limit. docID/revID shadow the outer variables per iteration.
	for i := 0; i < 2; i++ {
		docID := fmt.Sprintf("extraDoc%d", i)
		revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"})
		require.NoError(t, err)
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				default:
					_, err = db.revisionCache.Get(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck
				}
			}
		}()
	}
	// SDK write invalidates rev 1 on the server and arms the on-demand import.
	err = collection.dataStore.Set(docID, 0, nil, []byte(`{"ver": "2"}`))
	require.NoError(t, err)
	// The stale rev must miss the cache (import did not repopulate it) and
	// fail against the bucket.
	rev, err := collection.getRev(ctx, docID, revID, 0, nil)
	require.Error(t, err)
	require.ErrorContains(t, err, "missing")
	// returns empty doc rev
	assert.Equal(t, "", rev.DocID)

}

// TestRevCacheOnDemandImportNoCache checks that an on-demand import triggered
// by GetDocument leaves the revision cache untouched: the previously cached
// (now stale) revision remains, and the freshly imported revision is not added.
func TestRevCacheOnDemandImportNoCache(t *testing.T) {
	base.SkipImportTestsIfNotEnabled(t)

	db, ctx := setupTestDB(t)
	defer db.Close(ctx)
	collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

	const docID = "doc1"
	rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar"})
	require.NoError(t, err)

	// Writing through Sync Gateway populates the rev cache with rev 1.
	_, found := collection.revisionCache.Peek(ctx, docID, rev1)
	require.True(t, found)

	// Mutate the document directly in the bucket, making rev 1 stale.
	require.NoError(t, collection.dataStore.Set(docID, 0, nil, []byte(`{"foo": "baz"}`)))

	// This read performs the on-demand import and must observe the new body.
	doc, err := collection.GetDocument(ctx, docID, DocUnmarshalSync)
	require.NoError(t, err)
	require.Equal(t, Body{"foo": "baz"}, doc.Body(ctx))

	// rev1 still exists in cache but not on server
	_, found = collection.revisionCache.Peek(ctx, docID, rev1)
	require.True(t, found)

	// rev2 is not in cache but is on server
	_, found = collection.revisionCache.Peek(ctx, docID, doc.CurrentRev)
	require.False(t, found)
}
5 changes: 4 additions & 1 deletion rest/importtest/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,14 +1602,17 @@ func TestImportRevisionCopy(t *testing.T) {
_, err := dataStore.Add(key, 0, docBody)
assert.NoError(t, err, "Unable to insert doc TestImportDelete")

// 2. Trigger import via SG retrieval
// 2. Trigger import via SG retrieval, this will not populate the rev cache.
response := rt.SendAdminRequest("GET", "/{{.keyspace}}/_raw/"+key, "")
assert.Equal(t, 200, response.Code)
var rawInsertResponse rest.RawResponse
err = base.JSONUnmarshal(response.Body.Bytes(), &rawInsertResponse)
assert.NoError(t, err, "Unable to unmarshal raw response")
rev1id := rawInsertResponse.Sync.Rev

// Populate rev cache by getting the doc again
rt.GetDoc(key)

// 3. Update via SDK
updatedBody := make(map[string]interface{})
updatedBody["test"] = "TestImportRevisionCopyModified"
Expand Down

0 comments on commit 3c0abf2

Please sign in to comment.