Skip to content

Commit

Permalink
CBG-3271 make sure expiration works on a closed bucket
Browse files Browse the repository at this point in the history
Make sure the following works:

- open bucket
- write doc with expiry of 1 sec
- close bucket
- wait more than 1 sec

The doc should still expire, because another caller can re-open the same bucket (foo), and would
expect the document to have been deleted. This is akin to the metadata purge
interval.

Implementation:

- Abstracted ExpirationManager into a separate struct / file for
  readability.
- ExpirationManager exists on every Bucket, since expiration is set
  based on a CRUD operation, which could occur on any copy of the
  bucket.
- The functions for expiration use a new function _underlyingDB which
  returns a queryable object that is never closed.
  • Loading branch information
torcolvin committed Nov 17, 2023
1 parent adb4806 commit d4682bf
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 76 deletions.
35 changes: 21 additions & 14 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ type Bucket struct {
name string // Bucket name
collections collectionsMap // Collections, indexed by DataStoreName
collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed
mutex *sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expTimer *time.Timer // Schedules expiration of docs
nextExp *uint32 // Timestamp when expTimer will run (0 if never)
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
mutex *sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expManager *expirationManager // expiration manager for bucket
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
}

type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
Expand Down Expand Up @@ -174,10 +173,10 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
collections: make(map[sgbucket.DataStoreNameImpl]*Collection),
collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed),
mutex: &sync.Mutex{},
nextExp: new(uint32),
inMemory: inMemory,
serial: serial,
}
bucket.expManager = newExpirationManager(bucket.doExpiration)
defer func() {
if err != nil {
_ = bucket.CloseAndDelete(context.TODO())
Expand Down Expand Up @@ -307,7 +306,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) {
return
}

// Returns the database handle as a `queryable` interface value.
// db returns the database handle as a `queryable` interface value.
// If the bucket has been closed, it returns a special `closedDB` value that will return
// ErrBucketClosed from any call.
func (bucket *Bucket) db() queryable {
Expand All @@ -316,24 +315,33 @@ func (bucket *Bucket) db() queryable {
return bucket._db()
}

// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller.
// _db returns the database handle as a `queryable` interface value, or a `closedDB` value once the bucket has been marked closed. It is `db()` without the locking, so it is not safe to call unless the caller already holds bucket.mutex.
func (bucket *Bucket) _db() queryable {
	if !bucket.closed {
		return bucket.sqliteDB
	}
	return closedDB{}
}

// _underlyingDB returns the SQLite handle as a `queryable` interface value without consulting bucket.closed. It should only be used for bucket maintenance operations that must run regardless of whether the bucket is open or closed. If the handle itself is nil, an error is logged and a `closedDB` value is returned instead. This is not safe to call unless the caller holds bucket.mutex.
func (bucket *Bucket) _underlyingDB() queryable {
	if handle := bucket.sqliteDB; handle != nil {
		return handle
	}
	logError("bucket.sqliteDB is nil for _underlyingDB call. This function is being called after bucket is closed.")
	return closedDB{}
}

// Runs a function within a SQLite transaction.
func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error {
func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error, checkClosedBucket bucketClosedCheck) error {
// SQLite allows only a single writer, so use a mutex to avoid BUSY and LOCKED errors.
// However, these errors can still occur (somehow?), so we retry if we get one.
// --Update, 25 July 2023: After adding "_txlock=immediate" to the DB options when opening,
// the busy/locked errors have gone away. But there's no harm leaving the retry code in place.
bucket.mutex.Lock()
defer bucket.mutex.Unlock()

if bucket.closed {
if checkClosedBucket == true && bucket.closed {
return ErrBucketClosed
}

Expand Down Expand Up @@ -380,8 +388,7 @@ func (b *Bucket) copy() *Bucket {
collections: make(collectionsMap),
mutex: b.mutex,
sqliteDB: b.sqliteDB,
expTimer: b.expTimer,
nextExp: b.nextExp,
expManager: b.expManager,
serial: b.serial,
inMemory: b.inMemory,
}
Expand Down
72 changes: 31 additions & 41 deletions bucket_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ import (
"database/sql"
"errors"
"fmt"
"time"

sgbucket "github.com/couchbase/sg-bucket"
)

// bucketClosedCheck tells transaction helpers whether the bucket's closed flag should be honored.
// It is a named bool so call sites read as checkBucketClosed / skipCheckBucketClosed rather than a bare true / false.
type bucketClosedCheck bool

const (
// checkBucketClosed makes the operation fail with ErrBucketClosed when the bucket is marked closed.
checkBucketClosed bucketClosedCheck = true
// skipCheckBucketClosed lets the operation proceed even when the bucket is marked closed (used when expiring documents).
skipCheckBucketClosed bucketClosedCheck = false
)

// String returns a short identifier for the bucket of the form "B#<serial>", built from the bucket's serial number.
func (bucket *Bucket) String() string {
return fmt.Sprintf("B#%d", bucket.serial)
}
Expand Down Expand Up @@ -62,9 +68,7 @@ func (bucket *Bucket) Close(_ context.Context) {

// _closeSqliteDB closes the underlying sqlite database and shuts down dcpFeeds. Must have a lock to call this function.
func (bucket *Bucket) _closeSqliteDB() {
if bucket.expTimer != nil {
bucket.expTimer.Stop()
}
bucket.expManager.stop()
for _, c := range bucket.collections {
c.close()
}
Expand Down Expand Up @@ -174,10 +178,16 @@ func (bucket *Bucket) DropDataStore(name sgbucket.DataStoreName) error {
return bucket.dropCollection(sc)
}

// ListDataStores returns a list of the names of all data stores in the bucket.
func (bucket *Bucket) ListDataStores() (result []sgbucket.DataStoreName, err error) {
traceEnter("ListDataStores", "%s", bucket)
defer func() { traceExit("ListDataStores", err, "%v", result) }()
rows, err := bucket.db().Query(`SELECT id, scope, name FROM collections ORDER BY id`)
return bucket.listDataStores(bucket.db())
}

// listDataStores returns a list of the names of all data stores in the bucket, given a specific db handle.
func (bucket *Bucket) listDataStores(db queryable) (result []sgbucket.DataStoreName, err error) {
traceEnter("listDataStores", "%s", bucket)
defer func() { traceExit("listDataStores", err, "%v", result) }()
rows, err := db.Query(`SELECT id, scope, name FROM collections ORDER BY id`)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -309,8 +319,8 @@ func (bucket *Bucket) dropCollection(name sgbucket.DataStoreNameImpl) error {

//////// EXPIRATION (CUSTOM API):

// Returns the earliest expiration time of any document, or 0 if none.
func (bucket *Bucket) NextExpiration() (exp Exp, err error) {
// nextExpiration returns the earliest expiration time of any document, or 0 if none.
func (bucket *Bucket) nextExpiration() (exp Exp, err error) {
var expVal sql.NullInt64
row := bucket.db().QueryRow(`SELECT min(exp) FROM documents WHERE exp > 0`)
err = scan(row, &expVal)
Expand All @@ -320,17 +330,17 @@ func (bucket *Bucket) NextExpiration() (exp Exp, err error) {
return
}

// Immediately deletes all expired documents in this bucket.
func (bucket *Bucket) ExpireDocuments() (int64, error) {
names, err := bucket.ListDataStores()
// expireDocuments immediately deletes all expired documents in this bucket.
func (bucket *Bucket) expireDocuments() (int64, error) {
names, err := bucket.listDataStores(bucket._underlyingDB())
if err != nil {
return 0, err
}
var count int64
for _, name := range names {
if coll, err := bucket.getCollection(name.(sgbucket.DataStoreNameImpl)); err != nil {
return 0, err
} else if n, err := coll.ExpireDocuments(); err != nil {
} else if n, err := coll.expireDocuments(); err != nil {
return 0, err
} else {
count += n
Expand All @@ -339,40 +349,20 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) {
return count, nil
}

// scheduleExpiration schedules the next expiration of documents to occur, from the minimum expiration value in the bucket.
func (bucket *Bucket) scheduleExpiration() {
if nextExp, err := bucket.NextExpiration(); err == nil && nextExp > 0 {
bucket.scheduleExpirationAtOrBefore(nextExp)
}
}

func (bucket *Bucket) scheduleExpirationAtOrBefore(exp uint32) {
if exp > 0 {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()
if exp < *bucket.nextExp || *bucket.nextExp == 0 {
bucket.nextExp = &exp
dur := expDuration(exp)
if dur < 0 {
dur = 0
}
debug("EXP: Scheduling in %s", dur)
if bucket.expTimer == nil {
bucket.expTimer = time.AfterFunc(dur, bucket.doExpiration)
} else {
bucket.expTimer.Reset(dur)
}
}
if nextExp, err := bucket.nextExpiration(); err == nil && nextExp > 0 {
bucket.expManager._scheduleExpirationAtOrBefore(nextExp)
}
}

func (bucket *Bucket) doExpiration() {
bucket.mutex.Lock()
bucket.nextExp = func(x uint32) *uint32 { return &x }(0)
bucket.mutex.Unlock()
bucket.expManager._clearNext()

debug("EXP: Running scheduled expiration...")
if n, err := bucket.ExpireDocuments(); err != nil {
logError("Bucket %s error expiring docs: %v", bucket, err)
if n, err := bucket.expireDocuments(); err != nil {
// If there's an error expiring docs, it means there is a programming error of a leaked expiration goroutine.
panic("Error expiring docs: " + err.Error())
} else if n > 0 {
info("Bucket %s expired %d docs", bucket, n)
}
Expand All @@ -389,7 +379,7 @@ func (bucket *Bucket) PurgeTombstones() (count int64, err error) {
count, err = result.RowsAffected()
}
return err
})
}, true)
traceExit("PurgeTombstones", err, "%d", count)
return
}
Expand Down
25 changes: 20 additions & 5 deletions bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestExpiration(t *testing.T) {
bucket := makeTestBucket(t)
c := bucket.DefaultDataStore()

exp, err := bucket.NextExpiration()
exp, err := bucket.nextExpiration()
require.NoError(t, err)
require.Equal(t, Exp(0), exp)

Expand All @@ -367,17 +367,17 @@ func TestExpiration(t *testing.T) {
requireAddRaw(t, c, "k3", 0, []byte("v3"))
requireAddRaw(t, c, "k4", exp4, []byte("v4"))

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
// Usually this will return exp2, but if this is slow enough that the expiration goroutine runs to expire document k2, it can return exp4.
require.Contains(t, []Exp{exp2, exp4}, exp)

log.Printf("... waiting 1 sec ...")
time.Sleep(1 * time.Second)

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
assert.Equal(t, exp4, exp)
require.Equal(t, int(exp4), int(exp))

_, _, err = c.GetRaw("k1")
assert.NoError(t, err)
Expand All @@ -391,7 +391,7 @@ func TestExpiration(t *testing.T) {
log.Printf("... waiting 2 secs ...")
time.Sleep(2 * time.Second)

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
assert.Equal(t, uint32(0), exp)

Expand All @@ -403,6 +403,21 @@ func TestExpiration(t *testing.T) {
assert.Equal(t, int64(2), n)
}

func TestExpirationAfterClose(t *testing.T) {
bucket, err := OpenBucket(InMemoryURL, strings.ToLower(t.Name()), CreateNew)
ctx := testCtx(t)
defer bucket.CloseAndDelete(ctx)

Check failure on line 409 in bucket_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `bucket.CloseAndDelete` is not checked (errcheck)
require.NoError(t, err)
c := bucket.DefaultDataStore()

// set expiry long enough that Close will happen first
exp := Exp(time.Now().Add(1 * time.Second).Unix())
requireAddRaw(t, c, "docID", exp, []byte("v1"))
bucket.Close(ctx)
// sleep to ensure we won't panic
time.Sleep(2 * time.Second)
}

func TestUriFromPathWindows(t *testing.T) {
ensureNoLeaks(t)
if runtime.GOOS != "windows" {
Expand Down
35 changes: 23 additions & 12 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,22 +357,26 @@ func (c *Collection) WriteCas(key string, flags int, exp Exp, cas CAS, val any,
return
}

// Remove creates a document tombstone. It removes the document's value and user xattrs.
func (c *Collection) Remove(key string, cas CAS) (casOut CAS, err error) {
traceEnter("Remove", "%q, 0x%x", key, cas)
casOut, err = c.remove(key, &cas)
casOut, err = c.remove(key, &cas, checkBucketClosed)
traceExit("Remove", err, "0x%x", casOut)
return
}

// Delete creates a document tombstone. It removes the document's value and user xattrs. Equivalent to Remove without a CAS check.
func (c *Collection) Delete(key string) (err error) {
traceEnter("Delete", "%q", key)
_, err = c.remove(key, nil)
_, err = c.remove(key, nil, checkBucketClosed)
traceExit("Delete", err, "ok")
return err
}

func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) {
err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (e *event, err error) {
// remove creates a document tombstone. It removes the document's value and user xattrs. Passing skipCheckBucketClosed as checkClosed allows removing the document even if the bucket instance is "closed".
func (c *Collection) remove(key string, ifCas *CAS, checkClosed bucketClosedCheck) (casOut CAS, err error) {
fmt.Println("remove", checkClosed)
err = c.withNewCasAndBucketClosedCheck(func(txn *sql.Tx, newCas CAS) (e *event, err error) {
// Get the doc, possibly checking cas:
var cas CAS
var rawXattrs []byte
Expand Down Expand Up @@ -416,7 +420,7 @@ func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) {
}
casOut = newCas
return
})
}, checkClosed)
return
}

Expand Down Expand Up @@ -518,14 +522,14 @@ func (c *Collection) IsSupported(feature sgbucket.BucketStoreFeature) bool {

//////// EXPIRATION

// Immediately deletes all expired documents in this collection.
func (c *Collection) ExpireDocuments() (count int64, err error) {
traceEnter("ExpireDocuments", "")
defer func() { traceExit("ExpireDocuments", err, "%d", count) }()
// expireDocuments immediately deletes all expired documents in this collection.
func (c *Collection) expireDocuments() (count int64, err error) {
traceEnter("_expireDocuments", "")
defer func() { traceExit("_expireDocuments", err, "%d", count) }()

// First find all the expired docs and collect their keys:
exp := nowAsExpiry()
rows, err := c.db().Query(`SELECT key FROM documents
rows, err := c.bucket._underlyingDB().Query(`SELECT key FROM documents
WHERE collection = ?1 AND exp > 0 AND exp <= ?2`, c.id, exp)
if err != nil {
return
Expand All @@ -546,8 +550,10 @@ func (c *Collection) ExpireDocuments() (count int64, err error) {
// will get its own db connection, and if the db only supports one connection (i.e. in-memory)
// having both queries active would deadlock.)
for _, key := range keys {
if c.Delete(key) == nil {
_, err = c.remove(key, nil, skipCheckBucketClosed)
if err == nil {
count++
} else {
}
}
return
Expand Down Expand Up @@ -581,6 +587,11 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) {
// withNewCas runs a function within a SQLite transaction, passing it a new CAS to assign to the
// document being modified. The function returns an event to be posted.
// It always requests the closed-bucket check (checkBucketClosed); callers that must operate on a
// closed bucket use withNewCasAndBucketClosedCheck directly.
func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error {
return c.withNewCasAndBucketClosedCheck(fn, checkBucketClosed)
}

// withNewCasAndBucketClosedCheck runs a function within a SQLite transaction like withNewCas. This allows the caller to bypass the bucket closed status, suitable for functions that need to run on the underlying bucket object.
func (c *Collection) withNewCasAndBucketClosedCheck(fn func(txn *sql.Tx, newCas CAS) (*event, error), checkClosed bucketClosedCheck) error {
var e *event
err := c.bucket.inTransaction(func(txn *sql.Tx) error {
newCas, err := c.bucket.getLastCas(txn)
Expand All @@ -592,7 +603,7 @@ func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)
}
}
return err
})
}, checkClosed)
if err == nil && e != nil {
c.postNewEvent(e)
}
Expand Down
4 changes: 2 additions & 2 deletions designdoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *Collection) PutDDoc(_ context.Context, designDoc string, ddoc *sgbucket
}
c.forgetCachedViews(designDoc)
return nil
})
}, checkBucketClosed)
traceExit("PutDDoc", err, "ok")
return err
}
Expand All @@ -121,7 +121,7 @@ func (c *Collection) DeleteDDoc(designDoc string) error {
}
}
return err
})
}, checkBucketClosed)
traceExit("DeleteDDoc", err, "ok")
return err
}
Expand Down
Loading

0 comments on commit d4682bf

Please sign in to comment.