diff --git a/bucket.go b/bucket.go index 151cf79..32277e6 100644 --- a/bucket.go +++ b/bucket.go @@ -36,13 +36,12 @@ type Bucket struct { name string // Bucket name collections collectionsMap // Collections, indexed by DataStoreName collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed - mutex *sync.Mutex // mutex for synchronized access to Bucket - sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) - expTimer *time.Timer // Schedules expiration of docs - nextExp *uint32 // Timestamp when expTimer will run (0 if never) - serial uint32 // Serial number for logging - inMemory bool // True if it's an in-memory database - closed bool // represents state when it is closed + mutex *sync.Mutex // mutex for synchronized access to Bucket + sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) + expManager *expirationManager // expiration manager for bucket + serial uint32 // Serial number for logging + inMemory bool // True if it's an in-memory database + closed bool // represents state when it is closed } type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection @@ -174,10 +173,10 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err collections: make(map[sgbucket.DataStoreNameImpl]*Collection), collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed), mutex: &sync.Mutex{}, - nextExp: new(uint32), inMemory: inMemory, serial: serial, } + bucket.expManager = newExpirationManager(bucket.doExpiration) defer func() { if err != nil { _ = bucket.CloseAndDelete(context.TODO()) @@ -307,7 +306,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) { return } -// Returns the database handle as a `queryable` interface value. +// db returns the database handle as a `queryable` interface value. // If the bucket has been closed, it returns a special `closedDB` value that will return // ErrBucketClosed from any call. 
func (bucket *Bucket) db() queryable { @@ -316,7 +315,7 @@ func (bucket *Bucket) db() queryable { return bucket._db() } -// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller. +// _db returns the database handle as a `queryable` interface value, or a `closedDB` value if the bucket has been closed. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked by the caller. func (bucket *Bucket) _db() queryable { if bucket.closed { return closedDB{} @@ -324,8 +323,17 @@ func (bucket *Bucket) _db() queryable { return bucket.sqliteDB } +// _underlyingDB returns the database handle as a `queryable` interface value without checking bucket.closed. This should only be used for bucket maintenance operations that should occur regardless of whether the bucket is open or closed. If bucket.sqliteDB is nil it logs an error and returns a `closedDB` value. This is not safe to call without bucket.mutex being locked by the caller. +func (bucket *Bucket) _underlyingDB() queryable { + if bucket.sqliteDB == nil { + logError("bucket.sqliteDB is nil for _underlyingDB call. This function is being called after bucket is closed.") + return closedDB{} + } + return bucket.sqliteDB +} + // Runs a function within a SQLite transaction. -func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error { +func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error, checkClosedBucket bucketClosedCheck) error { // SQLite allows only a single writer, so use a mutex to avoid BUSY and LOCKED errors. // However, these errors can still occur (somehow?), so we retry if we get one. 
// --Update, 25 July 2023: After adding "_txlock=immediate" to the DB options when opening, @@ -333,7 +341,7 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error { bucket.mutex.Lock() defer bucket.mutex.Unlock() - if bucket.closed { + if checkClosedBucket == true && bucket.closed { return ErrBucketClosed } @@ -380,8 +388,7 @@ func (b *Bucket) copy() *Bucket { collections: make(collectionsMap), mutex: b.mutex, sqliteDB: b.sqliteDB, - expTimer: b.expTimer, - nextExp: b.nextExp, + expManager: b.expManager, serial: b.serial, inMemory: b.inMemory, } diff --git a/bucket_api.go b/bucket_api.go index 3a26d5a..7c2f3a1 100644 --- a/bucket_api.go +++ b/bucket_api.go @@ -13,11 +13,17 @@ import ( "database/sql" "errors" "fmt" - "time" sgbucket "github.com/couchbase/sg-bucket" ) +type bucketClosedCheck bool + +const ( + checkBucketClosed bucketClosedCheck = true + skipCheckBucketClosed bucketClosedCheck = false +) + func (bucket *Bucket) String() string { return fmt.Sprintf("B#%d", bucket.serial) } @@ -62,9 +68,7 @@ func (bucket *Bucket) Close(_ context.Context) { // _closeSqliteDB closes the underlying sqlite database and shuts down dcpFeeds. Must have a lock to call this function. func (bucket *Bucket) _closeSqliteDB() { - if bucket.expTimer != nil { - bucket.expTimer.Stop() - } + bucket.expManager.stop() for _, c := range bucket.collections { c.close() } @@ -174,10 +178,16 @@ func (bucket *Bucket) DropDataStore(name sgbucket.DataStoreName) error { return bucket.dropCollection(sc) } +// ListDataStores returns a list of the names of all data stores in the bucket. 
func (bucket *Bucket) ListDataStores() (result []sgbucket.DataStoreName, err error) { - traceEnter("ListDataStores", "%s", bucket) - defer func() { traceExit("ListDataStores", err, "%v", result) }() - rows, err := bucket.db().Query(`SELECT id, scope, name FROM collections ORDER BY id`) + return bucket.listDataStores(bucket.db()) +} + +// listDataStores returns a list of the names of all data stores in the bucket, given a specific db handle. +func (bucket *Bucket) listDataStores(db queryable) (result []sgbucket.DataStoreName, err error) { + traceEnter("listDataStores", "%s", bucket) + defer func() { traceExit("listDataStores", err, "%v", result) }() + rows, err := db.Query(`SELECT id, scope, name FROM collections ORDER BY id`) if err != nil { return nil, err } @@ -309,8 +319,8 @@ func (bucket *Bucket) dropCollection(name sgbucket.DataStoreNameImpl) error { //////// EXPIRATION (CUSTOM API): -// Returns the earliest expiration time of any document, or 0 if none. -func (bucket *Bucket) NextExpiration() (exp Exp, err error) { +// nextExpiration returns the earliest expiration time of any document, or 0 if none. +func (bucket *Bucket) nextExpiration() (exp Exp, err error) { var expVal sql.NullInt64 row := bucket.db().QueryRow(`SELECT min(exp) FROM documents WHERE exp > 0`) err = scan(row, &expVal) @@ -320,9 +330,9 @@ func (bucket *Bucket) NextExpiration() (exp Exp, err error) { return } -// Immediately deletes all expired documents in this bucket. -func (bucket *Bucket) ExpireDocuments() (int64, error) { - names, err := bucket.ListDataStores() +// expireDocuments immediately deletes all expired documents in this bucket. 
+func (bucket *Bucket) expireDocuments() (int64, error) { + names, err := bucket.listDataStores(bucket._underlyingDB()) if err != nil { return 0, err } @@ -330,7 +340,7 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) { for _, name := range names { if coll, err := bucket.getCollection(name.(sgbucket.DataStoreNameImpl)); err != nil { return 0, err - } else if n, err := coll.ExpireDocuments(); err != nil { + } else if n, err := coll.expireDocuments(); err != nil { return 0, err } else { count += n @@ -339,40 +349,20 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) { return count, nil } +// scheduleExpiration schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. func (bucket *Bucket) scheduleExpiration() { - if nextExp, err := bucket.NextExpiration(); err == nil && nextExp > 0 { - bucket.scheduleExpirationAtOrBefore(nextExp) - } -} - -func (bucket *Bucket) scheduleExpirationAtOrBefore(exp uint32) { - if exp > 0 { - bucket.mutex.Lock() - defer bucket.mutex.Unlock() - if exp < *bucket.nextExp || *bucket.nextExp == 0 { - bucket.nextExp = &exp - dur := expDuration(exp) - if dur < 0 { - dur = 0 - } - debug("EXP: Scheduling in %s", dur) - if bucket.expTimer == nil { - bucket.expTimer = time.AfterFunc(dur, bucket.doExpiration) - } else { - bucket.expTimer.Reset(dur) - } - } + if nextExp, err := bucket.nextExpiration(); err == nil && nextExp > 0 { + bucket.expManager._scheduleExpirationAtOrBefore(nextExp) } } func (bucket *Bucket) doExpiration() { - bucket.mutex.Lock() - bucket.nextExp = func(x uint32) *uint32 { return &x }(0) - bucket.mutex.Unlock() + bucket.expManager._clearNext() debug("EXP: Running scheduled expiration...") - if n, err := bucket.ExpireDocuments(); err != nil { - logError("Bucket %s error expiring docs: %v", bucket, err) + if n, err := bucket.expireDocuments(); err != nil { + // If there's an error expiring docs, it means there is a programming error of a leaked expiration goroutine. 
+ panic("Error expiring docs: " + err.Error()) } else if n > 0 { info("Bucket %s expired %d docs", bucket, n) } @@ -389,7 +379,7 @@ func (bucket *Bucket) PurgeTombstones() (count int64, err error) { count, err = result.RowsAffected() } return err - }) + }, true) traceExit("PurgeTombstones", err, "%d", count) return } diff --git a/bucket_test.go b/bucket_test.go index 4d53e3e..ed3d46f 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -355,7 +355,7 @@ func TestExpiration(t *testing.T) { bucket := makeTestBucket(t) c := bucket.DefaultDataStore() - exp, err := bucket.NextExpiration() + exp, err := bucket.nextExpiration() require.NoError(t, err) require.Equal(t, Exp(0), exp) @@ -367,7 +367,7 @@ func TestExpiration(t *testing.T) { requireAddRaw(t, c, "k3", 0, []byte("v3")) requireAddRaw(t, c, "k4", exp4, []byte("v4")) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) // Usually this will return exp2, but if this is slow enough that the expiration goroutine runs to expire document k2, it can return exp4. require.Contains(t, []Exp{exp2, exp4}, exp) @@ -375,9 +375,9 @@ func TestExpiration(t *testing.T) { log.Printf("... waiting 1 sec ...") time.Sleep(1 * time.Second) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) - assert.Equal(t, exp4, exp) + require.Equal(t, int(exp4), int(exp)) _, _, err = c.GetRaw("k1") assert.NoError(t, err) @@ -391,7 +391,7 @@ func TestExpiration(t *testing.T) { log.Printf("... 
waiting 2 secs ...") time.Sleep(2 * time.Second) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) assert.Equal(t, uint32(0), exp) @@ -403,6 +403,21 @@ func TestExpiration(t *testing.T) { assert.Equal(t, int64(2), n) } +func TestExpirationAfterClose(t *testing.T) { + bucket, err := OpenBucket(InMemoryURL, strings.ToLower(t.Name()), CreateNew) + ctx := testCtx(t) + defer bucket.CloseAndDelete(ctx) + require.NoError(t, err) + c := bucket.DefaultDataStore() + + // set expiry long enough that Close will happen first + exp := Exp(time.Now().Add(1 * time.Second).Unix()) + requireAddRaw(t, c, "docID", exp, []byte("v1")) + bucket.Close(ctx) + // sleep to ensure we won't panic + time.Sleep(2 * time.Second) +} + func TestUriFromPathWindows(t *testing.T) { ensureNoLeaks(t) if runtime.GOOS != "windows" { diff --git a/collection.go b/collection.go index a2a776b..93c62b9 100644 --- a/collection.go +++ b/collection.go @@ -357,22 +357,26 @@ func (c *Collection) WriteCas(key string, flags int, exp Exp, cas CAS, val any, return } +// Remove creates a document tombstone. It removes the document's value and user xattrs. func (c *Collection) Remove(key string, cas CAS) (casOut CAS, err error) { traceEnter("Remove", "%q, 0x%x", key, cas) - casOut, err = c.remove(key, &cas) + casOut, err = c.remove(key, &cas, checkBucketClosed) traceExit("Remove", err, "0x%x", casOut) return } +// Delete creates a document tombstone. It removes the document's value and user xattrs. Equivalent to Remove without a CAS check. func (c *Collection) Delete(key string) (err error) { traceEnter("Delete", "%q", key) - _, err = c.remove(key, nil) + _, err = c.remove(key, nil, checkBucketClosed) traceExit("Delete", err, "ok") return err } -func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) { - err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (e *event, err error) { +// remove creates a document tombstone. 
It removes the document's value and user xattrs. checkClosed will allow removing the document even the bucket instance is "closed". +func (c *Collection) remove(key string, ifCas *CAS, checkClosed bucketClosedCheck) (casOut CAS, err error) { + fmt.Println("remove", checkClosed) + err = c.withNewCasAndBucketClosedCheck(func(txn *sql.Tx, newCas CAS) (e *event, err error) { // Get the doc, possibly checking cas: var cas CAS var rawXattrs []byte @@ -416,7 +420,7 @@ func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) { } casOut = newCas return - }) + }, checkClosed) return } @@ -518,14 +522,14 @@ func (c *Collection) IsSupported(feature sgbucket.BucketStoreFeature) bool { //////// EXPIRATION -// Immediately deletes all expired documents in this collection. -func (c *Collection) ExpireDocuments() (count int64, err error) { - traceEnter("ExpireDocuments", "") - defer func() { traceExit("ExpireDocuments", err, "%d", count) }() +// expireDocuments immediately deletes all expired documents in this collection and returns how many were removed. It queries through bucket._underlyingDB(), so it does not consult the bucket's closed flag. NOTE(review): remove() currently calls fmt.Println on every invocation, so each expired key prints a line to stdout; that looks like leftover debug output - confirm and drop it. +func (c *Collection) expireDocuments() (count int64, err error) { + traceEnter("_expireDocuments", "") + defer func() { traceExit("_expireDocuments", err, "%d", count) }() // First find all the expired docs and collect their keys: exp := nowAsExpiry() - rows, err := c.db().Query(`SELECT key FROM documents + rows, err := c.bucket._underlyingDB().Query(`SELECT key FROM documents WHERE collection = ?1 AND exp > 0 AND exp <= ?2`, c.id, exp) if err != nil { return @@ -546,8 +550,10 @@ func (c *Collection) ExpireDocuments() (count int64, err error) { // will get its own db connection, and if the db only supports one connection (i.e. in-memory) // having both queries active would deadlock.) 
for _, key := range keys { - if c.Delete(key) == nil { + _, err = c.remove(key, nil, skipCheckBucketClosed) + if err == nil { count++ + } else { } } return @@ -581,6 +587,11 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) { // Runs a function within a SQLite transaction, passing it a new CAS to assign to the // document being modified. The function returns an event to be posted. func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error { + return c.withNewCasAndBucketClosedCheck(fn, checkBucketClosed) +} + +// withNewCasAndBucketClosedCheck runs a function within a SQLite transaction like withNewCas. This allows the caller to bypass the bucket closed status, suitable for functions that need to run on the underlying bucket object. +func (c *Collection) withNewCasAndBucketClosedCheck(fn func(txn *sql.Tx, newCas CAS) (*event, error), checkClosed bucketClosedCheck) error { var e *event err := c.bucket.inTransaction(func(txn *sql.Tx) error { newCas, err := c.bucket.getLastCas(txn) @@ -592,7 +603,7 @@ func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error) } } return err - }) + }, checkClosed) if err == nil && e != nil { c.postNewEvent(e) } diff --git a/designdoc.go b/designdoc.go index eca21e8..7914ac6 100644 --- a/designdoc.go +++ b/designdoc.go @@ -103,7 +103,7 @@ func (c *Collection) PutDDoc(_ context.Context, designDoc string, ddoc *sgbucket } c.forgetCachedViews(designDoc) return nil - }) + }, checkBucketClosed) traceExit("PutDDoc", err, "ok") return err } @@ -121,7 +121,7 @@ func (c *Collection) DeleteDDoc(designDoc string) error { } } return err - }) + }, checkBucketClosed) traceExit("DeleteDDoc", err, "ok") return err } diff --git a/expiration_manager.go b/expiration_manager.go new file mode 100644 index 0000000..32bb18f --- /dev/null +++ b/expiration_manager.go @@ -0,0 +1,104 @@ +// Copyright 2023-Present Couchbase, Inc. 
+// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package rosmar + +import ( + "sync" + "time" +) + +// expirationManager handles expiration for a given bucket. It stores a timer which will call expirationFunc to delete documents, and nextExp, the timestamp at which that timer is due to run (0 when nothing is scheduled). +type expirationManager struct { + mutex *sync.Mutex // mutex for synchronized access to expirationManager + timer *time.Timer // Schedules expiration of docs + nextExp *uint32 // Timestamp when timer will run (0 if never) + expirationFunc func() // Function to call when timer expires +} + +func newExpirationManager(expiractionFunc func()) *expirationManager { + var nextExp uint32 + return &expirationManager{ + mutex: &sync.Mutex{}, + nextExp: &nextExp, + expirationFunc: expiractionFunc, + } +} + +// stop stops the timer, if one exists. Because it takes the mutex, it blocks until a runExpiry call that already holds the mutex has finished; it does not wait for a timer callback that has fired but not yet acquired the mutex. +func (e *expirationManager) stop() { + e.mutex.Lock() + defer e.mutex.Unlock() + if e.timer != nil { + e.timer.Stop() + } +} + +// _getNext returns the next expiration time, 0 if there is no scheduled expiration. +func (e *expirationManager) _getNext() uint32 { + return *e.nextExp +} + +// setNext sets the next expiration time and schedules an expiration to occur after that time. +func (e *expirationManager) setNext(exp uint32) { + e.mutex.Lock() + defer e.mutex.Unlock() + e._setNext(exp) +} + +// _clearNext clears the next expiration time. +func (e *expirationManager) _clearNext() { + var exp uint32 + e.nextExp = &exp +} + +// _setNext sets the next expiration time and schedules an expiration to occur after that time; a zero exp drops the timer reference without stopping it. Requires caller to have acquired mutex. NOTE(review): the info("_setNext ", exp) call in the body passes an argument with no format verb, while info is used printf-style elsewhere in this change - looks like leftover debug logging; confirm and remove. 
+func (e *expirationManager) _setNext(exp uint32) { + info("_setNext ", exp) + e.nextExp = &exp + if exp == 0 { + e.timer = nil + return + } + dur := expDuration(exp) + if dur < 0 { + dur = 0 + } + debug("EXP: Scheduling in %s", dur) + if e.timer == nil { + e.timer = time.AfterFunc(dur, e.runExpiry) + } else { + e.timer.Reset(dur) + } +} + +// scheduleExpirationAtOrBefore schedules an expiration run for exp if nothing is scheduled or exp is earlier than the currently scheduled time; a zero exp is ignored. It acquires the mutex itself. +func (e *expirationManager) scheduleExpirationAtOrBefore(exp uint32) { + e.mutex.Lock() + defer e.mutex.Unlock() + e._scheduleExpirationAtOrBefore(exp) +} + +// _scheduleExpirationAtOrBefore schedules an expiration run for exp if nothing is scheduled or exp is earlier than the currently scheduled time; a zero exp is ignored. Requires the mutex to be held. NOTE(review): Bucket.scheduleExpiration and Collection.postNewEvent call this directly without taking e.mutex - verify that is intended. +func (e *expirationManager) _scheduleExpirationAtOrBefore(exp uint32) { + if exp == 0 { + return + } + currentNextExp := e._getNext() + // Only (re)schedule when nothing is pending or exp is sooner than the pending time. + if currentNextExp == 0 || exp < currentNextExp { + e._setNext(exp) + } +} + +// runExpiry is called when the timer expires. It calls expirationFunc while holding the mutex; it does not reschedule the timer itself. NOTE(review): stop() takes this same mutex and is called from _closeSqliteDB, which is documented as requiring the bucket lock, while expirationFunc ends up in inTransaction, which takes the bucket mutex - confirm this lock ordering cannot deadlock. +func (e *expirationManager) runExpiry() { + e.mutex.Lock() + defer e.mutex.Unlock() + e.expirationFunc() +} diff --git a/feeds.go b/feeds.go index d5799f4..5c52947 100644 --- a/feeds.go +++ b/feeds.go @@ -168,7 +168,7 @@ func (c *Collection) postNewEvent(e *event) { feedEvent := e.asFeedEvent() c.postEvent(feedEvent) - c.bucket.scheduleExpirationAtOrBefore(e.exp) + c.bucket.expManager._scheduleExpirationAtOrBefore(e.exp) /* // Tell collections of other buckets on the same db file to post the event too: diff --git a/views.go b/views.go index 3e779ab..f18a2e6 100644 --- a/views.go +++ b/views.go @@ -306,7 +306,7 @@ func (c *Collection) updateView(ctx context.Context, designDoc string, viewName view.lastCas = latestCas } return err - }) + }, checkBucketClosed) return view, err }