Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3271 make sure expiration works on a closed bucket #17

Merged
merged 7 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 27 additions & 26 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ type Bucket struct {
name string // Bucket name
collections collectionsMap // Collections, indexed by DataStoreName
collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed
mutex *sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expTimer *time.Timer // Schedules expiration of docs
nextExp *uint32 // Timestamp when expTimer will run (0 if never)
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
mutex *sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expManager *expiryManager // expiration manager for bucket
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
}

type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
Expand Down Expand Up @@ -87,23 +86,19 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
traceEnter("OpenBucket", "%q, %d", urlStr, mode)
defer func() { traceExit("OpenBucket", err, "ok") }()

ctx := context.TODO()
u, err := encodeDBURL(urlStr)
if err != nil {
return nil, err
}
urlStr = u.String()

bucket := getCachedBucket(bucketName)
bucket, err := getCachedBucket(bucketName, urlStr, mode)
if err != nil {
return nil, err
}
if bucket != nil {
if mode == CreateNew {
return nil, fs.ErrExist
}
if urlStr != bucket.url {
return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", bucketName, bucket.url, urlStr)
}
registerBucket(bucket)
return bucket, nil

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the handling on line 117 to not support ReOpenExisting for inMemory buckets still make sense with the bucket registry/cluster?


query := u.Query()
Expand Down Expand Up @@ -174,13 +169,13 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
collections: make(map[sgbucket.DataStoreNameImpl]*Collection),
collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed),
mutex: &sync.Mutex{},
nextExp: new(uint32),
inMemory: inMemory,
serial: serial,
}
bucket.expManager = newExpirationManager(bucket.doExpiration)
defer func() {
if err != nil {
_ = bucket.CloseAndDelete(context.TODO())
_ = bucket.CloseAndDelete(ctx)
}
}()

Expand All @@ -194,16 +189,23 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
if err = bucket.initializeSchema(bucketName); err != nil {
return nil, err
}
} else {
bucket.scheduleExpiration()
}
err = bucket.setName(bucketName)
if err != nil {
return nil, err
}

registerBucket(bucket)
return bucket, err
exists, bucketCopy := registerBucket(bucket)
// Someone else beat us to registering the bucket in the registry; that's OK, we'll close ours.
if exists {
bucket.Close(ctx)
}
// only schedule expiration if bucket is not new. This doesn't need to be locked because only one bucket will execute this code.
if vers != 0 {
bucket._scheduleExpiration()
}

return bucketCopy, err
}

// Creates or re-opens a bucket, like OpenBucket.
Expand Down Expand Up @@ -307,7 +309,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) {
return
}

// Returns the database handle as a `queryable` interface value.
// db returns the database handle as a `queryable` interface value.
// If the bucket has been closed, it returns a special `closedDB` value that will return
// ErrBucketClosed from any call.
func (bucket *Bucket) db() queryable {
Expand All @@ -316,7 +318,7 @@ func (bucket *Bucket) db() queryable {
return bucket._db()
}

// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller.
// _db returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. It is not safe to call unless bucket.mutex is locked by the caller.
func (bucket *Bucket) _db() queryable {
if bucket.closed {
return closedDB{}
Expand Down Expand Up @@ -380,8 +382,7 @@ func (b *Bucket) copy() *Bucket {
collections: make(collectionsMap),
mutex: b.mutex,
sqliteDB: b.sqliteDB,
expTimer: b.expTimer,
nextExp: b.nextExp,
expManager: b.expManager,
serial: b.serial,
inMemory: b.inMemory,
}
Expand Down
54 changes: 16 additions & 38 deletions bucket_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"database/sql"
"errors"
"fmt"
"time"

sgbucket "github.com/couchbase/sg-bucket"
)
Expand Down Expand Up @@ -62,9 +61,7 @@ func (bucket *Bucket) Close(_ context.Context) {

// _closeSqliteDB closes the underlying sqlite database and shuts down dcpFeeds. Must have a lock to call this function.
func (bucket *Bucket) _closeSqliteDB() {
if bucket.expTimer != nil {
bucket.expTimer.Stop()
}
bucket.expManager.stop()
for _, c := range bucket.collections {
c.close()
}
Expand Down Expand Up @@ -174,6 +171,7 @@ func (bucket *Bucket) DropDataStore(name sgbucket.DataStoreName) error {
return bucket.dropCollection(sc)
}

// ListDataStores returns a list of the names of all data stores in the bucket.
func (bucket *Bucket) ListDataStores() (result []sgbucket.DataStoreName, err error) {
traceEnter("ListDataStores", "%s", bucket)
defer func() { traceExit("ListDataStores", err, "%v", result) }()
Expand Down Expand Up @@ -309,8 +307,8 @@ func (bucket *Bucket) dropCollection(name sgbucket.DataStoreNameImpl) error {

//////// EXPIRATION (CUSTOM API):

// Returns the earliest expiration time of any document, or 0 if none.
func (bucket *Bucket) NextExpiration() (exp Exp, err error) {
// nextExpiration returns the earliest expiration time of any document, or 0 if none.
func (bucket *Bucket) nextExpiration() (exp Exp, err error) {
var expVal sql.NullInt64
row := bucket.db().QueryRow(`SELECT min(exp) FROM documents WHERE exp > 0`)
err = scan(row, &expVal)
Expand All @@ -320,8 +318,8 @@ func (bucket *Bucket) NextExpiration() (exp Exp, err error) {
return
}

// Immediately deletes all expired documents in this bucket.
func (bucket *Bucket) ExpireDocuments() (int64, error) {
// expireDocuments immediately deletes all expired documents in this bucket.
func (bucket *Bucket) expireDocuments() (int64, error) {
names, err := bucket.ListDataStores()
if err != nil {
return 0, err
Expand All @@ -330,7 +328,7 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) {
for _, name := range names {
if coll, err := bucket.getCollection(name.(sgbucket.DataStoreNameImpl)); err != nil {
return 0, err
} else if n, err := coll.ExpireDocuments(); err != nil {
} else if n, err := coll.expireDocuments(); err != nil {
return 0, err
} else {
count += n
Expand All @@ -339,45 +337,25 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) {
return count, nil
}

func (bucket *Bucket) scheduleExpiration() {
if nextExp, err := bucket.NextExpiration(); err == nil && nextExp > 0 {
bucket.scheduleExpirationAtOrBefore(nextExp)
}
}

func (bucket *Bucket) scheduleExpirationAtOrBefore(exp uint32) {
if exp > 0 {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()
if exp < *bucket.nextExp || *bucket.nextExp == 0 {
bucket.nextExp = &exp
dur := expDuration(exp)
if dur < 0 {
dur = 0
}
debug("EXP: Scheduling in %s", dur)
if bucket.expTimer == nil {
bucket.expTimer = time.AfterFunc(dur, bucket.doExpiration)
} else {
bucket.expTimer.Reset(dur)
}
}
// _scheduleExpiration schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. This requires locking the expiration manager.
func (bucket *Bucket) _scheduleExpiration() {
if nextExp, err := bucket.nextExpiration(); err == nil && nextExp > 0 {
bucket.expManager._scheduleExpirationAtOrBefore(nextExp)
torcolvin marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (bucket *Bucket) doExpiration() {
bucket.mutex.Lock()
bucket.nextExp = func(x uint32) *uint32 { return &x }(0)
bucket.mutex.Unlock()
bucket.expManager._clearNext()

debug("EXP: Running scheduled expiration...")
if n, err := bucket.ExpireDocuments(); err != nil {
logError("Bucket %s error expiring docs: %v", bucket, err)
if n, err := bucket.expireDocuments(); err != nil {
// If there's an error expiring docs, it means there is a programming error of a leaked expiration goroutine.
panic("Error expiring docs: " + err.Error())
} else if n > 0 {
info("Bucket %s expired %d docs", bucket, n)
}

bucket.scheduleExpiration()
bucket._scheduleExpiration()
}

// Completely removes all deleted documents (tombstones).
Expand Down
39 changes: 25 additions & 14 deletions bucket_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ package rosmar

import (
"context"
"fmt"
"io/fs"
"sync"
)

Expand All @@ -35,29 +37,38 @@ func init() {
}
}

// registryBucket adds a newly opened Bucket to the registry.
func (r *bucketRegistry) registerBucket(bucket *Bucket) {
// registerBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use.
func (r *bucketRegistry) registerBucket(bucket *Bucket) (bool, *Bucket) {
name := bucket.GetName()
debug("registerBucket %v %s at %s", bucket, name, bucket.url)
debug("_registerBucket %v %s at %s", bucket, name, bucket.url)
r.lock.Lock()
defer r.lock.Unlock()

_, ok := r.buckets[name]
if !ok {
b := bucket.copy()
r.buckets[name] = b
r.buckets[name] = bucket
}
r.bucketCount[name] += 1
return ok, r.buckets[name].copy()
}

// getCachedBucket returns a bucket from the registry if it exists.
func (r *bucketRegistry) getCachedBucket(name string) *Bucket {
func (r *bucketRegistry) getCachedBucket(name, url string, mode OpenMode) (*Bucket, error) {
r.lock.Lock()
defer r.lock.Unlock()
bucket := r.buckets[name]
if bucket == nil {
return nil
return nil, nil
}
if mode == CreateNew {
return nil, fs.ErrExist
}
if url != bucket.url {
return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", name, bucket.url, url)
}
return bucket.copy()

r.bucketCount[name] += 1
return r.buckets[name].copy(), nil
}

// unregisterBucket removes a Bucket from the registry. Must be called before closing.
Expand Down Expand Up @@ -111,14 +122,14 @@ func (r *bucketRegistry) getBucketNames() []string {
return names
}

// getCachedBucket returns an instance of a bucket. If there are other copies of this bucket already in memory, it will return this version. If this is an in memory bucket, the bucket will not be removed until deleteBucket is called.
func getCachedBucket(name string) *Bucket {
return cluster.getCachedBucket(name)
// getCachedBucket returns an instance of a bucket. If there are other copies of this bucket already in memory, it will return this version. If this is an in-memory bucket, the bucket will not be removed until deleteBucket is called. Returns an error if the bucket cannot be opened, or a nil error and nil bucket if there is no registered bucket.
func getCachedBucket(name, url string, mode OpenMode) (*Bucket, error) {
return cluster.getCachedBucket(name, url, mode)
}

// registryBucket adds a newly opened Bucket to the registry.
func registerBucket(bucket *Bucket) {
cluster.registerBucket(bucket)
// registerBucket adds a copy of a Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use.
func registerBucket(bucket *Bucket) (bool, *Bucket) {
return cluster.registerBucket(bucket)
}

// unregisterBucket removes a Bucket from the registry. Must be called before closing.
Expand Down
28 changes: 23 additions & 5 deletions bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestExpiration(t *testing.T) {
bucket := makeTestBucket(t)
c := bucket.DefaultDataStore()

exp, err := bucket.NextExpiration()
exp, err := bucket.nextExpiration()
require.NoError(t, err)
require.Equal(t, Exp(0), exp)

Expand All @@ -367,17 +367,17 @@ func TestExpiration(t *testing.T) {
requireAddRaw(t, c, "k3", 0, []byte("v3"))
requireAddRaw(t, c, "k4", exp4, []byte("v4"))

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
// Usually this will return exp2, but if this is slow enough that the expiration goroutine runs to expire document k2, it can return exp4.
require.Contains(t, []Exp{exp2, exp4}, exp)

log.Printf("... waiting 1 sec ...")
time.Sleep(1 * time.Second)

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
assert.Equal(t, exp4, exp)
require.Equal(t, int(exp4), int(exp))

_, _, err = c.GetRaw("k1")
assert.NoError(t, err)
Expand All @@ -391,7 +391,7 @@ func TestExpiration(t *testing.T) {
log.Printf("... waiting 2 secs ...")
time.Sleep(2 * time.Second)

exp, err = bucket.NextExpiration()
exp, err = bucket.nextExpiration()
require.NoError(t, err)
assert.Equal(t, uint32(0), exp)

Expand All @@ -403,6 +403,24 @@ func TestExpiration(t *testing.T) {
assert.Equal(t, int64(2), n)
}

// TestExpirationAfterClose opens an in-memory bucket, adds a document that
// expires in one second, closes the bucket, and then sleeps past the expiry
// so that an expiration run against the closed bucket would surface as a panic.
// It is skipped by default because it is slow.
func TestExpirationAfterClose(t *testing.T) {
	t.Skip("Slow test useful for debugging issues with expiration")
	bucket, err := OpenBucket(InMemoryURL, strings.ToLower(t.Name()), CreateNew)
	// Check the error before registering the deferred cleanup: if OpenBucket
	// failed, bucket is nil and the deferred CloseAndDelete would run against a
	// nil bucket, hiding the original failure.
	require.NoError(t, err)
	ctx := testCtx(t)
	defer func() {
		assert.NoError(t, bucket.CloseAndDelete(ctx))
	}()
	c := bucket.DefaultDataStore()

	// set expiry long enough that Close will happen first
	exp := Exp(time.Now().Add(1 * time.Second).Unix())
	requireAddRaw(t, c, "docID", exp, []byte("v1"))
	bucket.Close(ctx)
	// sleep to ensure we won't panic
	time.Sleep(2 * time.Second)
}

func TestUriFromPathWindows(t *testing.T) {
ensureNoLeaks(t)
if runtime.GOOS != "windows" {
Expand Down
Loading