Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3271 make sure expiration works on a closed bucket #17

Merged
merged 7 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ type Bucket struct {
name string // Bucket name
collections collectionsMap // Collections, indexed by DataStoreName
collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed
mutex *sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expTimer *time.Timer // Schedules expiration of docs
nextExp *uint32 // Timestamp when expTimer will run (0 if never)
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
mutex *sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expManager *expiryManager // expiration manager for bucket
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
}

type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
Expand Down Expand Up @@ -174,10 +173,10 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
collections: make(map[sgbucket.DataStoreNameImpl]*Collection),
collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed),
mutex: &sync.Mutex{},
nextExp: new(uint32),
inMemory: inMemory,
serial: serial,
}
bucket.expManager = newExpirationManager(bucket.doExpiration)
defer func() {
if err != nil {
_ = bucket.CloseAndDelete(context.TODO())
Expand All @@ -203,7 +202,7 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
}

registerBucket(bucket)
return bucket, err
return bucket.copy(), err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens in the case of concurrent OpenBucket requests for a bucket that doesn't already exist in the registry? It doesn't look like we're doing any locking, so in the case that registerBucket finds an existing bucket in the registry at line 205 (i.e. this instance of OpenBucket lost the race), I think we should be returning a copy of that bucket, and not the newly created one.

}

// Creates or re-opens a bucket, like OpenBucket.
Expand Down Expand Up @@ -307,7 +306,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) {
return
}

// Returns the database handle as a `queryable` interface value.
// db returns the database handle as a `queryable` interface value.
// If the bucket has been closed, it returns a special `closedDB` value that will return
// ErrBucketClosed from any call.
func (bucket *Bucket) db() queryable {
Expand All @@ -316,7 +315,7 @@ func (bucket *Bucket) db() queryable {
return bucket._db()
}

// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller.
// _db returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked by the caller.
func (bucket *Bucket) _db() queryable {
if bucket.closed {
return closedDB{}
Expand Down Expand Up @@ -380,8 +379,7 @@ func (b *Bucket) copy() *Bucket {
collections: make(collectionsMap),
mutex: b.mutex,
sqliteDB: b.sqliteDB,
expTimer: b.expTimer,
nextExp: b.nextExp,
expManager: b.expManager,
serial: b.serial,
inMemory: b.inMemory,
}
Expand Down
50 changes: 14 additions & 36 deletions bucket_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"database/sql"
"errors"
"fmt"
"time"

sgbucket "github.com/couchbase/sg-bucket"
)
Expand Down Expand Up @@ -62,9 +61,7 @@ func (bucket *Bucket) Close(_ context.Context) {

// _closeSqliteDB closes the underlying sqlite database and shuts down dcpFeeds. Must have a lock to call this function.
func (bucket *Bucket) _closeSqliteDB() {
if bucket.expTimer != nil {
bucket.expTimer.Stop()
}
bucket.expManager.stop()
for _, c := range bucket.collections {
c.close()
}
Expand Down Expand Up @@ -174,6 +171,7 @@ func (bucket *Bucket) DropDataStore(name sgbucket.DataStoreName) error {
return bucket.dropCollection(sc)
}

// ListDataStores returns a list of the names of all data stores in the bucket.
func (bucket *Bucket) ListDataStores() (result []sgbucket.DataStoreName, err error) {
traceEnter("ListDataStores", "%s", bucket)
defer func() { traceExit("ListDataStores", err, "%v", result) }()
Expand Down Expand Up @@ -309,8 +307,8 @@ func (bucket *Bucket) dropCollection(name sgbucket.DataStoreNameImpl) error {

//////// EXPIRATION (CUSTOM API):

// Returns the earliest expiration time of any document, or 0 if none.
func (bucket *Bucket) NextExpiration() (exp Exp, err error) {
// nextExpiration returns the earliest expiration time of any document, or 0 if none.
func (bucket *Bucket) nextExpiration() (exp Exp, err error) {
var expVal sql.NullInt64
row := bucket.db().QueryRow(`SELECT min(exp) FROM documents WHERE exp > 0`)
err = scan(row, &expVal)
Expand All @@ -320,8 +318,8 @@ func (bucket *Bucket) NextExpiration() (exp Exp, err error) {
return
}

// Immediately deletes all expired documents in this bucket.
func (bucket *Bucket) ExpireDocuments() (int64, error) {
// expireDocuments immediately deletes all expired documents in this bucket.
func (bucket *Bucket) expireDocuments() (int64, error) {
names, err := bucket.ListDataStores()
if err != nil {
return 0, err
Expand All @@ -330,7 +328,7 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) {
for _, name := range names {
if coll, err := bucket.getCollection(name.(sgbucket.DataStoreNameImpl)); err != nil {
return 0, err
} else if n, err := coll.ExpireDocuments(); err != nil {
} else if n, err := coll.expireDocuments(); err != nil {
return 0, err
} else {
count += n
Expand All @@ -339,40 +337,20 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) {
return count, nil
}

// scheduleExpiration schedules the next expiration of documents to occur, from the minimum expiration value in the bucket.
func (bucket *Bucket) scheduleExpiration() {
if nextExp, err := bucket.NextExpiration(); err == nil && nextExp > 0 {
bucket.scheduleExpirationAtOrBefore(nextExp)
}
}

func (bucket *Bucket) scheduleExpirationAtOrBefore(exp uint32) {
if exp > 0 {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()
if exp < *bucket.nextExp || *bucket.nextExp == 0 {
bucket.nextExp = &exp
dur := expDuration(exp)
if dur < 0 {
dur = 0
}
debug("EXP: Scheduling in %s", dur)
if bucket.expTimer == nil {
bucket.expTimer = time.AfterFunc(dur, bucket.doExpiration)
} else {
bucket.expTimer.Reset(dur)
}
}
if nextExp, err := bucket.nextExpiration(); err == nil && nextExp > 0 {
bucket.expManager._scheduleExpirationAtOrBefore(nextExp)
torcolvin marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (bucket *Bucket) doExpiration() {
bucket.mutex.Lock()
bucket.nextExp = func(x uint32) *uint32 { return &x }(0)
bucket.mutex.Unlock()
bucket.expManager._clearNext()

debug("EXP: Running scheduled expiration...")
if n, err := bucket.ExpireDocuments(); err != nil {
logError("Bucket %s error expiring docs: %v", bucket, err)
if n, err := bucket.expireDocuments(); err != nil {
// If there's an error expiring docs, it means there is a programming error of a leaked expiration goroutine.
panic("Error expiring docs: " + err.Error())
} else if n > 0 {
info("Bucket %s expired %d docs", bucket, n)
}
Expand Down
3 changes: 1 addition & 2 deletions bucket_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func (r *bucketRegistry) registerBucket(bucket *Bucket) {
defer r.lock.Unlock()
_, ok := r.buckets[name]
if !ok {
b := bucket.copy()
r.buckets[name] = b
r.buckets[name] = bucket
}
r.bucketCount[name] += 1
}
Expand Down
28 changes: 23 additions & 5 deletions bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestExpiration(t *testing.T) {
bucket := makeTestBucket(t)
c := bucket.DefaultDataStore()

exp, err := bucket.NextExpiration()
exp, err := bucket.nextExpiration()
require.NoError(t, err)
require.Equal(t, Exp(0), exp)

Expand All @@ -367,17 +367,17 @@ func TestExpiration(t *testing.T) {
requireAddRaw(t, c, "k3", 0, []byte("v3"))
requireAddRaw(t, c, "k4", exp4, []byte("v4"))

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
// Usually this will return exp2, but if this is slow enough that the expiration goroutine runs to expire document k2, it can return exp4.
require.Contains(t, []Exp{exp2, exp4}, exp)

log.Printf("... waiting 1 sec ...")
time.Sleep(1 * time.Second)

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
assert.Equal(t, exp4, exp)
require.Equal(t, int(exp4), int(exp))

_, _, err = c.GetRaw("k1")
assert.NoError(t, err)
Expand All @@ -391,7 +391,7 @@ func TestExpiration(t *testing.T) {
log.Printf("... waiting 2 secs ...")
time.Sleep(2 * time.Second)

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
assert.Equal(t, uint32(0), exp)

Expand All @@ -403,6 +403,24 @@ func TestExpiration(t *testing.T) {
assert.Equal(t, int64(2), n)
}

// TestExpirationAfterClose writes a document that expires shortly after the bucket is closed and
// then sleeps past the expiry time, so that an expiration run against the closed bucket (which
// would panic) has a chance to surface.
func TestExpirationAfterClose(t *testing.T) {
	t.Skip("Slow test useful for debugging issues with expiration")
	bucket, err := OpenBucket(InMemoryURL, strings.ToLower(t.Name()), CreateNew)
	// Check the error before registering cleanup: require.NoError aborts the test via FailNow,
	// which still runs deferred functions, and those must not touch a bucket that failed to open.
	require.NoError(t, err)
	ctx := testCtx(t)
	defer func() {
		assert.NoError(t, bucket.CloseAndDelete(ctx))
	}()
	c := bucket.DefaultDataStore()

	// set expiry long enough that Close will happen first
	exp := Exp(time.Now().Add(1 * time.Second).Unix())
	requireAddRaw(t, c, "docID", exp, []byte("v1"))
	bucket.Close(ctx)
	// sleep to ensure we won't panic
	time.Sleep(2 * time.Second)
}

func TestUriFromPathWindows(t *testing.T) {
ensureNoLeaks(t)
if runtime.GOOS != "windows" {
Expand Down
11 changes: 7 additions & 4 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,20 +357,23 @@ func (c *Collection) WriteCas(key string, flags int, exp Exp, cas CAS, val any,
return
}

// Remove creates a document tombstone, removing the document's value and user xattrs.
// The given cas is handed to remove as its ifCas argument. It returns the CAS value and
// error reported by remove, and traces entry and exit of the call.
func (c *Collection) Remove(key string, cas CAS) (casOut CAS, err error) {
	traceEnter("Remove", "%q, 0x%x", key, cas)
	newCas, removeErr := c.remove(key, &cas)
	traceExit("Remove", removeErr, "0x%x", newCas)
	return newCas, removeErr
}

// Delete creates a document tombstone, removing the document's value and user xattrs.
// It behaves like Remove but passes a nil ifCas to remove, so no CAS check is requested.
// Only the error from remove is returned; the resulting CAS is discarded.
func (c *Collection) Delete(key string) (err error) {
	traceEnter("Delete", "%q", key)
	_, removeErr := c.remove(key, nil)
	traceExit("Delete", removeErr, "ok")
	return removeErr
}

// remove creates a document tombstone. It removes the document's value and user xattrs. checkClosed will allow removing the document even the bucket instance is "closed".
func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) {
err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (e *event, err error) {
// Get the doc, possibly checking cas:
Expand Down Expand Up @@ -518,10 +521,10 @@ func (c *Collection) IsSupported(feature sgbucket.BucketStoreFeature) bool {

//////// EXPIRATION

// Immediately deletes all expired documents in this collection.
func (c *Collection) ExpireDocuments() (count int64, err error) {
traceEnter("ExpireDocuments", "")
defer func() { traceExit("ExpireDocuments", err, "%d", count) }()
// expireDocuments immediately deletes all expired documents in this collection.
func (c *Collection) expireDocuments() (count int64, err error) {
traceEnter("_expireDocuments", "")
defer func() { traceExit("_expireDocuments", err, "%d", count) }()

// First find all the expired docs and collect their keys:
exp := nowAsExpiry()
Expand Down
107 changes: 107 additions & 0 deletions expiry_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package rosmar

import (
"sync"
"time"
)

// expiryManager handles expiration for a given bucket. It owns a timer that, when it fires,
// invokes expirationFunc, and it records in nextExp the expiration timestamp the timer is
// currently scheduled for (0 when nothing is scheduled).
type expiryManager struct {
	// mutex synchronizes access to the expiryManager.
	mutex *sync.Mutex
	// timer schedules expiration of docs; nil until an expiration has been scheduled.
	timer *time.Timer
	// nextExp is the timestamp at which timer will run (0 if never).
	nextExp *uint32
	// expirationFunc is the function to call when timer expires.
	expirationFunc func()
}

// newExpirationManager builds an expiryManager whose timer callback will invoke expiractionFunc.
// The returned manager has its own mutex, no timer yet, and a next expiration time of 0.
func newExpirationManager(expiractionFunc func()) *expiryManager {
	manager := new(expiryManager)
	manager.mutex = new(sync.Mutex)
	manager.nextExp = new(uint32)
	manager.expirationFunc = expiractionFunc
	return manager
}

// stop cancels the pending timer, if there is one. It takes the manager's mutex, which
// runExpiry holds for the duration of expirationFunc, so it does not return while an
// expiration run is in progress.
// NOTE(review): a timer that has already fired but whose callback is still waiting for the
// mutex is not cancelled by Stop and will run after stop returns — confirm this is acceptable.
func (e *expiryManager) stop() {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	if e.timer == nil {
		return
	}
	e.timer.Stop()
}

// _getNext reports the currently recorded next expiration time; 0 means no expiration is
// scheduled. It does no locking of its own.
func (e *expiryManager) _getNext() uint32 {
	next := *e.nextExp
	return next
}

// setNext is the locking variant of _setNext: it acquires the manager's mutex, records exp as
// the next expiration time and arms the timer accordingly, then releases the mutex.
func (e *expiryManager) setNext(exp uint32) {
	mu := e.mutex
	mu.Lock()
	defer mu.Unlock()
	e._setNext(exp)
}

// _clearNext resets the recorded next expiration time to 0. It leaves the timer untouched
// and does no locking of its own.
func (e *expiryManager) _clearNext() {
	e.nextExp = new(uint32)
}

// _setNext records exp as the next expiration time and arms the timer to call runExpiry at
// that time. An exp of 0 means "no expiration scheduled": any pending timer is cancelled and
// released. Requires caller to have acquired mutex.
func (e *expiryManager) _setNext(exp uint32) {
	info("_setNext %d", exp)
	e.nextExp = &exp
	if exp == 0 {
		// Cancel the pending timer before dropping the reference. Otherwise it would still
		// fire, and stop() could no longer reach it to cancel it.
		if e.timer != nil {
			e.timer.Stop()
			e.timer = nil
		}
		return
	}
	dur := expDuration(exp)
	if dur < 0 {
		// Clamp a negative delay to zero so the timer fires as soon as possible.
		dur = 0
	}
	debug("EXP: Scheduling in %s", dur)
	if e.timer == nil {
		e.timer = time.AfterFunc(dur, e.runExpiry)
	} else {
		// Reuse the existing timer rather than allocating a new one.
		e.timer.Reset(dur)
	}
}

// scheduleExpirationAtOrBefore asks for an expiration run to be scheduled no later than exp.
// An exp of 0 is ignored. For any other value it acquires the manager's mutex and delegates to
// _scheduleExpirationAtOrBefore.
func (e *expiryManager) scheduleExpirationAtOrBefore(exp uint32) {
	if exp != 0 {
		e.mutex.Lock()
		defer e.mutex.Unlock()
		e._scheduleExpirationAtOrBefore(exp)
	}
}

// _scheduleExpirationAtOrBefore schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. Requires the mutex to be held.
func (e *expiryManager) _scheduleExpirationAtOrBefore(exp uint32) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like we actually need this version (that requires holding the mutex, based on my other comment) - can this just be moved inside scheduleExpirationAtOrBefore?

if exp == 0 {
return
}
currentNextExp := e._getNext()
// if zero will unset the timer.
if currentNextExp == 0 || exp < currentNextExp {
e._setNext(exp)
}
}

// runExpiry is the timer callback. It invokes expirationFunc while holding the manager's mutex,
// so the callback is serialized with the manager's other locking methods. It does not
// reschedule the timer itself.
func (e *expiryManager) runExpiry() {
	mu := e.mutex
	mu.Lock()
	defer mu.Unlock()
	e.expirationFunc()
}
Loading