Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3916 Use DCP feed to purge documents in viewsAndGSIBucketReadier #6817

Merged
merged 1 commit into from
May 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,103 @@ func EmptyPrimaryIndex(ctx context.Context, dataStore sgbucket.DataStore) error
return results.Close()
}

// purgeWithDCPFeed purges all documents seen on a DCP feed with system xattrs, including tombstones which aren't found when emptying the primary index.
func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *base.TestBucketPool) (numCompacted int, err error) {
purgedDocCount := 0
purgeTimeout := 60 * time.Second
purgeBody := Body{"_purged": true}
processedDocCount := 0

var purgeErrors *base.MultiError
collection, err := base.AsCollection(dataStore)
if err != nil {
return 0, fmt.Errorf("dataStore was not a gocb collection: %w", err)
}

var collectionIDs []uint32
if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) {
collectionIDs = append(collectionIDs, collection.GetCollectionID())
}

dcpClientOpts := base.DCPClientOptions{
OneShot: true,
FailOnRollback: false,
CollectionIDs: collectionIDs,
MetadataStoreType: base.DCPMetadataStoreInMemory,
}

purgeCallback := func(event sgbucket.FeedEvent) bool {
var purgeErr error

processedDocCount++
// We only need to purge mutations/deletions
if event.Opcode != sgbucket.FeedOpMutation && event.Opcode != sgbucket.FeedOpDeletion {
return false
}

// If it's a deletion but doesn't have xattrs, ignore it
if event.Opcode == sgbucket.FeedOpDeletion && event.DataType&base.MemcachedDataTypeXattr == 0 {
return false
}

key := string(event.Key)

if base.TestUseXattrs() {
purgeErr = dataStore.DeleteWithXattrs(ctx, key, []string{base.SyncXattrName})
} else {
purgeErr = dataStore.Delete(key)
}
if base.IsKeyNotFoundError(dataStore, purgeErr) {
// If key no longer exists, need to add and remove to trigger removal from view
_, addErr := dataStore.Add(key, 0, purgeBody)
if addErr != nil {
purgeErrors = purgeErrors.Append(addErr)
tbp.Logf(ctx, "Error adding key %s to force deletion. %v", key, addErr)
return false
}

if delErr := dataStore.Delete(key); delErr != nil {
purgeErrors = purgeErrors.Append(delErr)
tbp.Logf(ctx, "Error deleting key %s. %v", key, delErr)
}
purgedDocCount++
} else if purgeErr != nil {
purgeErrors = purgeErrors.Append(purgeErr)
tbp.Logf(ctx, "Error removing key %s (purge). %v", key, purgeErr)
}
return false
}
feedID := "purgeFeed-" + collection.CollectionName()
dcpClient, err := base.NewDCPClient(ctx, feedID, purgeCallback, dcpClientOpts, collection.Bucket)
if err != nil {
return 0, err
}
doneChan, err := dcpClient.Start()
if err != nil {
return 0, fmt.Errorf("error starting purge DCP feed: %w", err)
}
// wait for feed to complete
timeout := time.After(purgeTimeout)
select {
case err := <-doneChan:
if err != nil {
tbp.Logf(ctx, "purgeDCPFeed finished with error: %v", err)
}
case <-timeout:
return 0, fmt.Errorf("timeout waiting for purge DCP feed to complete")
}
closeErr := dcpClient.Close()
if closeErr != nil {
tbp.Logf(ctx, "error closing purge DCP feed: %v", closeErr)
}

tbp.Logf(ctx, "Finished purge DCP feed ... Total docs purged: %d", purgedDocCount)
tbp.Logf(ctx, "Finished purge DCP feed ... Total docs processed: %d", processedDocCount)
return purgedDocCount, purgeErrors.ErrorOrNil()
}

// emptyAllDocsIndex ensures the AllDocs index for the given bucket is empty, including tombstones which aren't found when emptying the primary index.
// nolint:unused
func emptyAllDocsIndex(ctx context.Context, dataStore sgbucket.DataStore, tbp *base.TestBucketPool) (numCompacted int, err error) {
purgedDocCount := 0
purgeBody := Body{"_purged": true}
Expand Down Expand Up @@ -301,7 +397,7 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex
if err != nil {
return err
}
if _, err := emptyAllDocsIndex(ctx, dataStore, tbp); err != nil {
if _, err := purgeWithDCPFeed(ctx, dataStore, tbp); err != nil {
return err
}
if err := EmptyPrimaryIndex(ctx, dataStore); err != nil {
Expand Down
Loading