diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index cca75d5..673b27e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -61,11 +61,14 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
+      - uses: actions/setup-go@v4
+        with:
+          go-version: 1.21.3
+          cache: false
       - name: golangci-lint
        uses: golangci/golangci-lint-action@v3
        with:
-          version: v1.52.0
-          args: --timeout 3m # this is slow only in github actions
+          version: v1.55.0
 
   test-race:
     runs-on: ubuntu-latest
diff --git a/.golangci.yml b/.golangci.yml
index eb36e91..4adb74f 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -8,6 +8,9 @@
 # config file for golangci-lint
 
+run:
+  timeout: 3m
+
 linters:
   enable:
     #- bodyclose # checks whether HTTP response body is closed successfully
diff --git a/README.md b/README.md
index 81f68bf..c315324 100644
--- a/README.md
+++ b/README.md
@@ -21,10 +21,6 @@ Rosmar supports:
 
 ## 1. Building and Using It
 
-Rosmar requires an updated version of sg-bucket -- this is on the `feature/walrus-xattrs` branch. Rosmar's `go.mod` file points to the appropriate commit.
-
-To use Rosmar in Sync Gateway, check out the latter's `feature/walrus_xattrs` branch, in which Walrus has been replaced with Rosmar.
-
 To run SG normally with a Rosmar bucket, use a non-persistent SG config file like this one:
 
 ```json
@@ -126,6 +122,10 @@ The special URL `rosmar:/?mode=memory` opens an ephemeral in-memory database. Do
 
 > Note: The directory contains the SQLite database file `rosmar.sqlite` plus SQLite side files. But its contents should be considered opaque.
 
+### Bucket Persistence
+
+For in-memory buckets, closing a bucket does not delete its data from memory, matching Couchbase Server's behavior, where closing a connection does not delete the bucket. To delete an in-memory bucket, call `Bucket.CloseAndDelete`.
+
 ### Metadata Purging
 
 Rosmar does not purge tombstones (deleted documents) automatically; call the method `Bucket.PurgeTombstones()`.
diff --git a/bucket-registry.go b/bucket-registry.go
deleted file mode 100644
index ec692d4..0000000
--- a/bucket-registry.go
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2023-Present Couchbase, Inc.
-//
-// Use of this software is governed by the Business Source License included
-// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
-// in that file, in accordance with the Business Source License, use of this
-// software will be governed by the Apache License, Version 2.0, included in
-// the file licenses/APL2.txt.
-
-package rosmar
-
-import (
-	"sync"
-
-	"golang.org/x/exp/slices"
-)
-
-// The bucket registry tracks all open Buckets (except in-memory ones) by their URL.
-// This is used by feeds, to forward events from the Bucket that created them to other Buckets on
-// the same file that should also be notified.
-//
-// The registry is not bulletproof; it can be fooled into thinking Buckets on the same file aren't,
-// if their URLs aren't identical. This can happen if:
-//
-// * one of the file paths goes through (different) symlinks
-// * the paths are capitalized differently, on a case-preserving filesystem like macOS or Windows
-// * there's some kind of character-encoding difference that the filesystem ignores
-//
-// The way to fix this is to ask the filesystem to canonicalize the path (e.g. by calling
-// `realpath` or `fcntl(F_GETPATH)`) and use the canonical path as the key.
-// Unfortunately Go doesn't seem to have an API for that.
-
-var bucketRegistry = map[string][]*Bucket{} // Maps URL to slice of Buckets at that URL
-var bucketRegistryMutex sync.Mutex          // Thread-safe access to bucketRegistry
-
-// Adds a newly opened Bucket to the registry.
-func registerBucket(bucket *Bucket) {
-	url := bucket.url
-	if isInMemoryURL(url) {
-		return
-	}
-	debug("registerBucket %v at %s", bucket, url)
-	bucketRegistryMutex.Lock()
-	bucketRegistry[url] = append(bucketRegistry[url], bucket)
-	bucketRegistryMutex.Unlock()
-}
-
-// Removes a Bucket from the registry. Must be called before closing.
-func unregisterBucket(bucket *Bucket) {
-	url := bucket.url
-	if isInMemoryURL(url) {
-		return
-	}
-	debug("UNregisterBucket %v at %s", bucket, url)
-	bucketRegistryMutex.Lock()
-	defer bucketRegistryMutex.Unlock()
-
-	buckets := bucketRegistry[url]
-	i := slices.Index(buckets, bucket)
-	if i < 0 {
-		warn("unregisterBucket couldn't find %v", bucket)
-		return
-	}
-	if len(buckets) == 1 {
-		delete(bucketRegistry, url)
-	} else {
-		// Copy the slice before mutating, in case a client is iterating it:
-		buckets = slices.Clone(buckets)
-		buckets[i] = nil // remove ptr that might be left in the underlying array, for gc
-		buckets = slices.Delete(buckets, i, i+1)
-		bucketRegistry[url] = buckets
-	}
-}
-
-// Returns the array of Bucket instances open on a given URL.
-func bucketsAtURL(url string) (buckets []*Bucket) {
-	if !isInMemoryURL(url) {
-		bucketRegistryMutex.Lock()
-		buckets = bucketRegistry[url]
-		bucketRegistryMutex.Unlock()
-	}
-	return
-}
diff --git a/bucket.go b/bucket.go
index 0a59868..151cf79 100644
--- a/bucket.go
+++ b/bucket.go
@@ -9,6 +9,7 @@ package rosmar
 
 import (
+	"context"
 	"database/sql"
 	_ "embed"
 	"errors"
@@ -16,7 +17,6 @@ import (
 	"io/fs"
 	"net/url"
 	"os"
-	"path"
 	"path/filepath"
 	"runtime"
 	"strings"
@@ -32,15 +32,17 @@ import (
 // Rosmar implementation of a collection-aware bucket.
 // Implements sgbucket interfaces BucketStore, DynamicDataStoreBucket, DeletableStore, MutationFeedStore2.
 type Bucket struct {
-	url         string         // Filesystem path or other URL
-	name        string         // Bucket name
-	collections collectionsMap // Collections, indexed by DataStoreName
-	mutex       sync.Mutex     // mutex for synchronized access to Bucket
-	sqliteDB    *sql.DB        // SQLite database handle (do not access; call db() instead)
-	expTimer    *time.Timer    // Schedules expiration of docs
-	nextExp     uint32         // Timestamp when expTimer will run (0 if never)
-	serial      uint32         // Serial number for logging
-	inMemory    bool           // True if it's an in-memory database
+	url             string         // Filesystem path or other URL
+	name            string         // Bucket name
+	collections     collectionsMap // Collections, indexed by DataStoreName
+	collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed
+	mutex           *sync.Mutex    // mutex for synchronized access to Bucket
+	sqliteDB        *sql.DB        // SQLite database handle (do not access; call db() instead)
+	expTimer        *time.Timer    // Schedules expiration of docs
+	nextExp         *uint32        // Timestamp when expTimer will run (0 if never)
+	serial          uint32         // Serial number for logging
+	inMemory        bool           // True if it's an in-memory database
+	closed          bool           // True once this Bucket handle has been closed
 }
 
 type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
 
@@ -76,33 +78,41 @@ const (
 	ReOpenExisting // Open an existing bucket, or fail if none exists.
 )
 
-// OpenBucketFromPath opens a bucket from a filesystem path. See OpenBucket for details.
-func OpenBucketFromPath(path string, mode OpenMode) (*Bucket, error) {
-	return OpenBucket(uriFromPath(path), mode)
-}
-
 // Creates a new bucket, or opens an existing one.
 //
 // The URL should have the scheme 'rosmar' or 'file' and a filesystem path.
 // The path represents a directory; the SQLite database files will be created inside it.
 // Alternatively, the `InMemoryURL` can be given, to create an in-memory bucket with no file.
-func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
+func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err error) {
 	traceEnter("OpenBucket", "%q, %d", urlStr, mode)
 	defer func() { traceExit("OpenBucket", err, "ok") }()
+
 	u, err := encodeDBURL(urlStr)
 	if err != nil {
 		return nil, err
 	}
 	urlStr = u.String()
 
+	bucket := getCachedBucket(bucketName)
+	if bucket != nil {
+		if mode == CreateNew {
+			return nil, fs.ErrExist
+		}
+		if urlStr != bucket.url {
+			return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", bucketName, bucket.url, urlStr)
+		}
+		registerBucket(bucket)
+		return bucket, nil
+	}
+
 	query := u.Query()
 	inMemory := query.Get("mode") == "memory"
-	var bucketName string
 	if inMemory {
 		if mode == ReOpenExisting {
 			return nil, fs.ErrNotExist
 		}
-		bucketName = "memory"
+		u = u.JoinPath(bucketName)
 	} else {
 		dir := u.Path
 		if runtime.GOOS == "windows" {
@@ -124,7 +134,6 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
 
 		query.Set("mode", ifelse(mode == ReOpenExisting, "rw", "rwc"))
 		u = u.JoinPath(kDBFilename)
-		bucketName = path.Base(dir)
 	}
 
 	// See https://github.com/mattn/go-sqlite3#connection-string
@@ -160,12 +169,20 @@
 	db.SetMaxOpenConns(ifelse(inMemory, 1, kMaxOpenConnections))
 
 	bucket = &Bucket{
-		url:         urlStr,
-		sqliteDB:    db,
-		collections: make(map[sgbucket.DataStoreNameImpl]*Collection),
-		inMemory:    inMemory,
-		serial:      serial,
+		url:             urlStr,
+		sqliteDB:        db,
+		collections:     make(map[sgbucket.DataStoreNameImpl]*Collection),
+		collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed),
+		mutex:           &sync.Mutex{},
+		nextExp:         new(uint32),
+		inMemory:        inMemory,
+		serial:          serial,
 	}
+	defer func() {
+		if err != nil {
+			_ = bucket.CloseAndDelete(context.TODO())
+		}
+	}()
 
 	// Initialize the schema if necessary:
 	var vers int
@@ -180,6 +197,10 @@
 	} else {
 		bucket.scheduleExpiration()
 	}
+	err = bucket.setName(bucketName)
+	if err != nil {
+		return nil, err
+	}
 
 	registerBucket(bucket)
 	return bucket, err
@@ -190,18 +211,14 @@
 // bucket name. The bucket will be opened in a subdirectory of the directory URL.
 func OpenBucketIn(dirUrlStr string, bucketName string, mode OpenMode) (*Bucket, error) {
 	if isInMemoryURL(dirUrlStr) {
-		bucket, err := OpenBucket(dirUrlStr, mode)
-		if err == nil {
-			err = bucket.SetName(bucketName)
-		}
-		return bucket, err
+		return OpenBucket(dirUrlStr, bucketName, mode)
 	}
 
 	u, err := parseDBFileURL(dirUrlStr)
 	if err != nil {
 		return nil, err
 	}
-	return OpenBucket(u.JoinPath(bucketName).String(), mode)
+	return OpenBucket(u.JoinPath(bucketName).String(), bucketName, mode)
 }
 
 // Deletes the bucket at the given URL, i.e. the filesystem directory at its path, if it exists.
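
Since `OpenBucket` now takes the bucket name explicitly instead of deriving it from the last path component, a call looks roughly like the sketch below (the path, name, and error handling are illustrative, not taken from the diff):

```go
package main

import (
	"context"
	"log"

	"github.com/couchbaselabs/rosmar"
)

func main() {
	// Open or create a persistent bucket named "travel" backed by the directory
	// /tmp/travel; reopening the same name later returns a copy of the bucket
	// cached in the registry.
	bucket, err := rosmar.OpenBucket("rosmar:///tmp/travel", "travel", rosmar.CreateOrOpen)
	if err != nil {
		log.Fatal(err)
	}
	defer bucket.Close(context.TODO())
}
```
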
@@ -221,10 +238,6 @@ func DeleteBucketAt(urlStr string) (err error) {
 		return nil
 	}
 
-	if len(bucketsAtURL(u.String())) > 0 {
-		return fmt.Errorf("there is a Bucket open at that URL")
-	}
-
 	// For safety's sake, don't delete just any directory. Ensure it contains a db file:
 	dir := u.Path
 	if runtime.GOOS == "windows" {
@@ -288,8 +301,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) {
 		sql.Named("COLL", sgbucket.DefaultCollection),
 	)
 	if err != nil {
-		_ = bucket.CloseAndDelete()
-		panic("Rosmar SQL schema is invalid: " + err.Error())
+		return fmt.Errorf("Rosmar SQL schema is invalid: %w", err)
 	}
 	bucket.name = bucketName
 	return
@@ -306,11 +318,10 @@
 // Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking; it is not safe to call unless the caller holds bucket.mutex.
 func (bucket *Bucket) _db() queryable {
-	if db := bucket.sqliteDB; db != nil {
-		return db
-	} else {
+	if bucket.closed {
 		return closedDB{}
 	}
+	return bucket.sqliteDB
 }
 
 // Runs a function within a SQLite transaction.
@@ -322,7 +333,7 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error {
 	bucket.mutex.Lock()
 	defer bucket.mutex.Unlock()
 
-	if bucket.sqliteDB == nil {
+	if bucket.closed {
 		return ErrBucketClosed
 	}
 
@@ -360,6 +371,23 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error {
 	return remapError(err)
 }
 
+// copy returns a new Bucket handle that shares the same underlying structures
+// (SQLite handle, mutex, feeds, and expiry state) as the receiver.
+func (b *Bucket) copy() *Bucket {
+	r := &Bucket{
+		url:             b.url,
+		name:            b.name,
+		collectionFeeds: b.collectionFeeds,
+		collections:     make(collectionsMap),
+		mutex:           b.mutex,
+		sqliteDB:        b.sqliteDB,
+		expTimer:        b.expTimer,
+		nextExp:         b.nextExp,
+		serial:          b.serial,
+		inMemory:        b.inMemory,
+	}
+	return r
+}
+
 // uriFromPath converts a file path to a rosmar URI. On windows, these need to have forward slashes and drive letters will have an extra /, such as rosmar://c:/foo/bar.
 func uriFromPath(path string) string {
 	uri := "rosmar://"
diff --git a/bucket_api.go b/bucket_api.go
index 79ba186..3a26d5a 100644
--- a/bucket_api.go
+++ b/bucket_api.go
@@ -31,7 +31,7 @@ func (bucket *Bucket) GetURL() string { return bucket.url }
 func (bucket *Bucket) GetName() string { return bucket.name }
 
 // Renames the bucket. This doesn't affect its URL, only the value returned by GetName.
-func (bucket *Bucket) SetName(name string) error {
+func (bucket *Bucket) setName(name string) error {
 	info("Bucket %s is now named %q", bucket, name)
 	_, err := bucket.db().Exec(`UPDATE bucket SET name=?1`, name)
 	if err == nil {
@@ -57,6 +57,11 @@ func (bucket *Bucket) Close(_ context.Context) {
 	bucket.mutex.Lock()
 	defer bucket.mutex.Unlock()
 
+	bucket.closed = true
+}
+
+// _closeSqliteDB closes the underlying sqlite database and shuts down dcpFeeds. The caller must hold the bucket's mutex.
+func (bucket *Bucket) _closeSqliteDB() {
 	if bucket.expTimer != nil {
 		bucket.expTimer.Stop()
 	}
@@ -71,16 +76,11 @@
 }
 
 // Closes a bucket and deletes its directory and files (unless it's in-memory.)
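 // Unlike Close, this also removes the bucket from the registry; for an in-memory
 // bucket, CloseAndDelete is the only way to delete its data.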
-func (bucket *Bucket) CloseAndDelete() (err error) {
-	bucket.Close(context.TODO())
-
+func (bucket *Bucket) CloseAndDelete(ctx context.Context) (err error) {
 	bucket.mutex.Lock()
 	defer bucket.mutex.Unlock()
 
-	if bucket.url != "" {
-		err = DeleteBucketAt(bucket.url)
-		bucket.url = ""
-	}
-	return err
+	bucket._closeSqliteDB()
+	return deleteBucket(ctx, bucket)
 }
 
 func (bucket *Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
@@ -349,8 +349,8 @@ func (bucket *Bucket) scheduleExpirationAtOrBefore(exp uint32) {
 	if exp > 0 {
 		bucket.mutex.Lock()
 		defer bucket.mutex.Unlock()
-		if exp < bucket.nextExp || bucket.nextExp == 0 {
-			bucket.nextExp = exp
+		if exp < *bucket.nextExp || *bucket.nextExp == 0 {
+			*bucket.nextExp = exp
 			dur := expDuration(exp)
 			if dur < 0 {
 				dur = 0
@@ -367,7 +367,7 @@
 
 func (bucket *Bucket) doExpiration() {
 	bucket.mutex.Lock()
-	bucket.nextExp = 0
+	*bucket.nextExp = 0
 	bucket.mutex.Unlock()
 
 	debug("EXP: Running scheduled expiration...")
diff --git a/bucket_registry.go b/bucket_registry.go
new file mode 100644
index 0000000..32cae80
--- /dev/null
+++ b/bucket_registry.go
@@ -0,0 +1,137 @@
+// Copyright 2023-Present Couchbase, Inc.
+//
+// Use of this software is governed by the Business Source License included
+// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
+// in that file, in accordance with the Business Source License, use of this
+// software will be governed by the Apache License, Version 2.0, included in
+// the file licenses/APL2.txt.
+
+package rosmar
+
+import (
+	"context"
+	"sync"
+)
+
+// The bucket registry tracks all open Buckets and refcounts them. It represents a cluster of
+// buckets, one per bucket name. When OpenBucket is called, a Bucket instance is added to
+// bucketRegistry as the canonical bucket object. This object is not removed from the registry
+// until:
+//
+//   - In-memory bucket: the bucket is deleted only when CloseAndDelete is called on one of its
+//     Bucket instances.
+//   - On-disk bucket: the bucket is removed from the registry once there are no open copies of
+//     it in memory. Unlike an in-memory bucket, it stays persisted on disk and can be reopened.
+//
+// Any Bucket returned by OpenBucket is a copy of the canonical bucket object; it shares pointers
+// to all mutable objects and gets copies of immutable objects. The only difference from the
+// canonical bucket is the `closed` state, which records when that copy is no longer writeable.
+// Sharing the data structures allows a single DCP producer and expiry framework per bucket.
+
+// bucketRegistry tracks all open buckets
+type bucketRegistry struct {
+	bucketCount map[string]uint    // stores a reference count of open buckets
+	buckets     map[string]*Bucket // stores a reference to each open bucket
+	lock        sync.Mutex
+}
+
+var cluster *bucketRegistry // global cluster registry
+func init() {
+	cluster = &bucketRegistry{
+		bucketCount: make(map[string]uint),
+		buckets:     make(map[string]*Bucket),
+	}
+}
+
+// registerBucket adds a newly opened Bucket to the registry.
+func (r *bucketRegistry) registerBucket(bucket *Bucket) {
+	name := bucket.GetName()
+	debug("registerBucket %v %s at %s", bucket, name, bucket.url)
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	_, ok := r.buckets[name]
+	if !ok {
+		b := bucket.copy()
+		r.buckets[name] = b
+	}
+	r.bucketCount[name] += 1
+}
+
+// getCachedBucket returns a copy of the canonical bucket with the given name, or nil if none is registered.
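+//
+// The returned copy shares the canonical bucket's SQLite handle, mutex, and feeds, but
+// carries its own `closed` flag. A sketch of the intended use (illustrative, not a
+// verbatim excerpt):
+//
+//	if cached := cluster.getCachedBucket("db"); cached != nil {
+//		cached.Close(context.TODO()) // closes only this copy
+//	}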
+func (r *bucketRegistry) getCachedBucket(name string) *Bucket {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	bucket := r.buckets[name]
+	if bucket == nil {
+		return nil
+	}
+	return bucket.copy()
+}
+
+// unregisterBucket removes a Bucket from the registry. Must be called before closing.
+func (r *bucketRegistry) unregisterBucket(bucket *Bucket) {
+	name := bucket.name
+	debug("UNregisterBucket %v %s at %s", bucket, name, bucket.url)
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	bucketCount := r.bucketCount[name]
+	if bucketCount == 0 {
+		warn("unregisterBucket couldn't find %v", bucket)
+		return
+	}
+	if bucketCount == 1 {
+		delete(r.bucketCount, name)
+		// Keep an in-memory bucket's SQLite db open and registered, since closing it
+		// would discard the data; an on-disk bucket can safely be closed and reopened.
+		if !bucket.inMemory {
+			bucket._closeSqliteDB()
+			delete(r.buckets, name)
+		}
+		return
+	}
+	r.bucketCount[name] -= 1
+}
+
+// deleteBucket deletes a bucket from the registry and disk. Closes all existing buckets of the same name.
+func (r *bucketRegistry) deleteBucket(ctx context.Context, bucket *Bucket) error {
+	name := bucket.name
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	delete(r.buckets, name) // no-op if the bucket was already removed
+	delete(r.bucketCount, name)
+	return DeleteBucketAt(bucket.url)
+}
+
+// getBucketNames returns a list of all bucket names in the bucketRegistry.
+func (r *bucketRegistry) getBucketNames() []string {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	names := make([]string, 0, len(r.buckets))
+	for name := range r.buckets {
+		names = append(names, name)
+	}
+	return names
+}
+
+// getCachedBucket returns a copy of an already-open bucket of the given name, if one exists in
+// the registry. An in-memory bucket stays registered until deleteBucket is called on it.
+func getCachedBucket(name string) *Bucket {
+	return cluster.getCachedBucket(name)
+}
+
+// registerBucket adds a newly opened Bucket to the registry.
+func registerBucket(bucket *Bucket) {
+	cluster.registerBucket(bucket)
+}
+
+// unregisterBucket removes a Bucket from the registry. Must be called before closing.
+func unregisterBucket(bucket *Bucket) {
+	cluster.unregisterBucket(bucket)
+}
+
+// deleteBucket will delete a bucket from the registry and from disk.
+func deleteBucket(ctx context.Context, bucket *Bucket) error {
+	return cluster.deleteBucket(ctx, bucket)
+}
+
+// GetBucketNames returns a list of all bucket names.
+func GetBucketNames() []string {
+	return cluster.getBucketNames()
+}
diff --git a/bucket_registry_test.go b/bucket_registry_test.go
new file mode 100644
index 0000000..578879d
--- /dev/null
+++ b/bucket_registry_test.go
@@ -0,0 +1,83 @@
+// Copyright 2023-Present Couchbase, Inc.
+//
+// Use of this software is governed by the Business Source License included
+// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
+// in that file, in accordance with the Business Source License, use of this
+// software will be governed by the Apache License, Version 2.0, included in
+// the file licenses/APL2.txt.
+
+package rosmar
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestReuseInMemoryBucket(t *testing.T) {
+	ensureNoLeaks(t)
+	bucketName := strings.ToLower(t.Name())
+	bucket1, err := OpenBucket(InMemoryURL, bucketName, CreateOrOpen)
+	require.NoError(t, err)
+	key := "foo"
+	body := []byte("bar")
+	require.NoError(t, bucket1.DefaultDataStore().Set(key, 0, nil, body))
+	require.False(t, bucket1.closed)
+	bucket1.Close(testCtx(t))
+	defer func() {
+		assert.NoError(t, bucket1.CloseAndDelete(testCtx(t)))
+		assert.Len(t, GetBucketNames(), 0)
+	}()
+
+	require.True(t, bucket1.closed)
+	require.Equal(t, []string{bucketName}, GetBucketNames())
+
+	bucket2, err := OpenBucket(InMemoryURL, bucketName, CreateOrOpen)
+	require.NoError(t, err)
+	require.False(t, bucket2.closed)
+	require.Equal(t, []string{bucketName}, GetBucketNames())
+	var bucket2Body []byte
+	_, err = bucket2.DefaultDataStore().Get(key, &bucket2Body)
+	require.NoError(t, err)
+	require.Equal(t, body, bucket2Body)
+}
+
+func TestBucketRegistryRefCountPersistentBucket(t *testing.T) {
+	ensureNoLeaks(t)
+	bucketName := strings.ToLower(t.Name())
+	bucket, err := OpenBucket(uriFromPath(t.TempDir()+"/"+bucketName), bucketName, CreateOrOpen)
+	require.NoError(t, err)
+	require.Equal(t, []string{bucketName}, GetBucketNames())
+	bucket.Close(testCtx(t))
+	require.Len(t, GetBucketNames(), 0)
+}
+
+func TestDuplicateBucketNamesDifferentPath(t *testing.T) {
+	ensureNoLeaks(t)
+	bucketName := strings.ToLower(t.Name())
+	path1 := uriFromPath(t.TempDir() + "/" + bucketName + "1")
+	path2 := uriFromPath(t.TempDir() + "/" + bucketName + "2")
+
+	bucket1, err := OpenBucket(path1, bucketName, CreateOrOpen)
+	require.NoError(t, err)
+	require.Equal(t, []string{bucketName}, GetBucketNames())
+
+	// A second bucket with the same name but a different path is rejected:
+	_, err = OpenBucket(path2, bucketName, CreateOrOpen)
+	require.ErrorContains(t, err, "already exists")
+	require.Equal(t, []string{bucketName}, GetBucketNames())
+
+	bucket1.Close(testCtx(t))
+	require.Len(t, GetBucketNames(), 0)
+
+	// Now that bucket1 is closed, opening a bucket at path2 should succeed:
+	bucket2, err := OpenBucket(path2, bucketName, CreateOrOpen)
+	require.NoError(t, err)
+	defer bucket2.Close(testCtx(t))
+	require.Equal(t, []string{bucketName}, GetBucketNames())
+}
diff --git a/bucket_test.go b/bucket_test.go
index 9c421c6..4d53e3e 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -12,10 +12,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"io/fs"
 	"log"
 	"os"
 	"runtime"
+	"strings"
 	"testing"
 	"time"
@@ -44,10 +44,10 @@ func makeTestBucket(t *testing.T) *Bucket {
 	LoggingCallback = func(level LogLevel, fmt string, args ...any) {
 		t.Logf(logLevelNamesPrint[level]+fmt, args...)
} - bucket, err := OpenBucketFromPath(testBucketPath(t), CreateNew) + bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), strings.ToLower(t.Name()), CreateNew) require.NoError(t, err) t.Cleanup(func() { - bucket.Close(testCtx(t)) + assert.NoError(t, bucket.CloseAndDelete(testCtx(t))) }) return bucket @@ -63,23 +63,30 @@ func requireAddRaw(t *testing.T, c sgbucket.DataStore, key string, exp Exp, valu require.True(t, added, "Doc was not added") } +func bucketCount(name string) uint { + cluster.lock.Lock() + defer cluster.lock.Unlock() + return cluster.bucketCount[name] +} + func TestNewBucket(t *testing.T) { + ensureNoLeaks(t) bucket := makeTestBucket(t) - assert.Equal(t, testBucketDirName, bucket.GetName()) + bucketName := strings.ToLower(t.Name()) + assert.Equal(t, bucketName, bucket.GetName()) assert.Contains(t, bucket.GetURL(), testBucketDirName) - url := bucket.url - buckets := bucketsAtURL(url) - assert.Contains(t, buckets, bucket) + require.Equal(t, uint(1), bucketCount(bucketName)) - bucket.Close(testCtx(t)) - assert.Empty(t, bucketsAtURL(url)) + require.NoError(t, bucket.CloseAndDelete(testCtx(t))) + require.Equal(t, uint(0), bucketCount(bucketName)) } func TestGetMissingBucket(t *testing.T) { + ensureNoLeaks(t) path := uriFromPath(testBucketPath(t)) require.NoError(t, DeleteBucketAt(path)) - bucket, err := OpenBucket(path, ReOpenExisting) + bucket, err := OpenBucket(path, strings.ToLower(t.Name()), ReOpenExisting) if runtime.GOOS == "windows" { assert.ErrorContains(t, err, "unable to open database file: The system cannot find the path specified") } else { @@ -89,9 +96,13 @@ func TestGetMissingBucket(t *testing.T) { } func TestCallClosedBucket(t *testing.T) { + ensureNoLeaks(t) bucket := makeTestBucket(t) c := bucket.DefaultDataStore() bucket.Close(testCtx(t)) + defer func() { + assert.NoError(t, bucket.CloseAndDelete(testCtx(t))) + }() _, err := bucket.ListDataStores() assert.ErrorContains(t, err, "bucket has been closed") _, _, err = c.GetRaw("foo") @@ -99,62 +110,73 @@ func TestCallClosedBucket(t *testing.T) { } func TestNewBucketInMemory(t *testing.T) { + ensureNoLeaks(t) assert.NoError(t, DeleteBucketAt(InMemoryURL)) - modes := []OpenMode{CreateNew, CreateOrOpen} - for _, mode := range modes { - bucket, err := OpenBucket(InMemoryURL, mode) - require.NoError(t, err) - require.NotNil(t, bucket) + testCases := []struct { + name string + mode OpenMode + }{ + { + name: "CreateNew", + mode: CreateNew, + }, + { + name: "CreateOrOpen", + mode: CreateOrOpen, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + bucket, err := OpenBucket(InMemoryURL, strings.ToLower(t.Name()), testCase.mode) + require.NoError(t, err) + require.NotNil(t, bucket) - assert.Empty(t, bucketsAtURL(bucket.url)) + require.Equal(t, uint(1), bucketCount(bucket.GetName())) - err = bucket.CloseAndDelete() - assert.NoError(t, err) - } + err = bucket.CloseAndDelete(testCtx(t)) + assert.NoError(t, err) - _, err := OpenBucket(InMemoryURL, ReOpenExisting) - assert.Error(t, err) - assert.Equal(t, err, fs.ErrNotExist) + assert.Empty(t, bucketCount(bucket.GetName())) + }) + } } var defaultCollection = dsName("_default", "_default") func TestTwoBucketsOneURL(t *testing.T) { + ensureNoLeaks(t) bucket1 := makeTestBucket(t) url := bucket1.url - bucket2, err := OpenBucket(url, CreateNew) + bucketName := strings.ToLower(t.Name()) + bucket2, err := OpenBucket(url, bucketName, CreateNew) require.ErrorContains(t, err, "already exists") require.Nil(t, bucket2) - bucket2, err = 
OpenBucket(url, ReOpenExisting) + bucket2, err = OpenBucket(url, bucketName, ReOpenExisting) require.NoError(t, err) t.Cleanup(func() { - bucket2.Close(testCtx(t)) + assert.NoError(t, bucket2.CloseAndDelete(testCtx(t))) }) - buckets := bucketsAtURL(url) - assert.Len(t, buckets, 2) - assert.Contains(t, buckets, bucket1) - assert.Contains(t, buckets, bucket2) + require.Equal(t, uint(2), bucketCount(bucketName)) bucket1.Close(testCtx(t)) - buckets = bucketsAtURL(url) - assert.Len(t, buckets, 1) - assert.Contains(t, buckets, bucket2) + require.Equal(t, uint(1), bucketCount(bucketName)) err = DeleteBucketAt(url) - assert.ErrorContains(t, err, "there is a Bucket open at that URL") + require.Error(t, err) - bucket2.Close(testCtx(t)) - assert.Empty(t, bucketsAtURL(url)) + require.NoError(t, bucket2.CloseAndDelete(testCtx(t))) + assert.Empty(t, bucketCount(bucketName)) err = DeleteBucketAt(url) assert.NoError(t, err) } func TestDefaultCollection(t *testing.T) { + ensureNoLeaks(t) bucket := makeTestBucket(t) // Initially one collection: @@ -164,10 +186,11 @@ func TestDefaultCollection(t *testing.T) { coll := bucket.DefaultDataStore() assert.NotNil(t, coll) - assert.Equal(t, "RosmarTest._default._default", coll.GetName()) + assert.Equal(t, strings.ToLower(t.Name())+"._default._default", coll.GetName()) } func TestCreateCollection(t *testing.T) { + ensureNoLeaks(t) bucket := makeTestBucket(t) collName := dsName("_default", "foo") @@ -177,7 +200,7 @@ func TestCreateCollection(t *testing.T) { coll, err := bucket.NamedDataStore(collName) assert.NoError(t, err) assert.NotNil(t, coll) - assert.Equal(t, "RosmarTest._default.foo", coll.GetName()) + assert.Equal(t, strings.ToLower(t.Name())+"._default.foo", coll.GetName()) colls, err := bucket.ListDataStores() assert.NoError(t, err) @@ -187,6 +210,7 @@ func TestCreateCollection(t *testing.T) { //////// MULTI-COLLECTION: func TestMultiCollectionBucket(t *testing.T) { + ensureNoLeaks(t) ensureNoLeakedFeeds(t) huddle := makeTestBucket(t) @@ -261,8 +285,8 @@ func TestGetPersistentMultiCollectionBucket(t *testing.T) { huddle.Close(testCtx(t)) // Reopen persisted collection bucket - loadedHuddle, loadedErr := OpenBucket(huddleURL, ReOpenExisting) - assert.NoError(t, loadedErr) + loadedHuddle, loadedErr := OpenBucket(huddleURL, strings.ToLower(t.Name()), ReOpenExisting) + require.NoError(t, loadedErr) // validate contents var loadedValue interface{} @@ -291,8 +315,8 @@ func TestGetPersistentMultiCollectionBucket(t *testing.T) { loadedHuddle.Close(testCtx(t)) // Reopen persisted collection bucket again to ensure dropped collection is not present - reloadedHuddle, reloadedErr := OpenBucket(huddleURL, ReOpenExisting) - assert.NoError(t, reloadedErr) + reloadedHuddle, reloadedErr := OpenBucket(huddleURL, strings.ToLower(t.Name()), ReOpenExisting) + require.NoError(t, reloadedErr) // reopen dropped collection, verify that previous data is not present var reloadedValue interface{} @@ -308,14 +332,14 @@ func TestGetPersistentMultiCollectionBucket(t *testing.T) { assert.Equal(t, "c2_value", reloadedValue) // Close and Delete the bucket, should delete underlying collections - require.NoError(t, reloadedHuddle.CloseAndDelete()) + require.NoError(t, reloadedHuddle.CloseAndDelete(testCtx(t))) // Attempt to reopen deleted bucket - _, err = OpenBucket(huddleURL, ReOpenExisting) + _, err = OpenBucket(huddleURL, strings.ToLower(t.Name()), ReOpenExisting) assert.Error(t, err) // Create new bucket at same path: - postDeleteHuddle, err := OpenBucket(huddleURL, CreateNew) + 
postDeleteHuddle, err := OpenBucket(huddleURL, strings.ToLower(t.Name()), CreateNew) require.NoError(t, err) var postDeleteValue interface{} postDeleteC2, err := postDeleteHuddle.NamedDataStore(dsName("scope1", "collection2")) @@ -323,10 +347,11 @@ func TestGetPersistentMultiCollectionBucket(t *testing.T) { _, err = postDeleteC2.Get("doc1", &postDeleteValue) require.Error(t, err) require.True(t, errors.As(err, &sgbucket.MissingError{})) - require.NoError(t, postDeleteHuddle.CloseAndDelete()) + require.NoError(t, postDeleteHuddle.CloseAndDelete(testCtx(t))) } func TestExpiration(t *testing.T) { + ensureNoLeaks(t) bucket := makeTestBucket(t) c := bucket.DefaultDataStore() @@ -379,6 +404,7 @@ func TestExpiration(t *testing.T) { } func TestUriFromPathWindows(t *testing.T) { + ensureNoLeaks(t) if runtime.GOOS != "windows" { t.Skip("This test is only for windows") } @@ -416,6 +442,7 @@ func TestUriFromPathWindows(t *testing.T) { } func TestUriFromPathNonWindows(t *testing.T) { + ensureNoLeaks(t) if runtime.GOOS == "windows" { t.Skip("This test is only for non-windows") } diff --git a/collection.go b/collection.go index daf6178..a2a776b 100644 --- a/collection.go +++ b/collection.go @@ -30,7 +30,6 @@ type Collection struct { bucket *Bucket id CollectionID // Row ID in collections table; public ID + 1 mutex sync.Mutex - feeds []*dcpFeed viewCache map[viewKey]*rosmarView } diff --git a/collection_test.go b/collection_test.go index 824d6c1..311f63f 100644 --- a/collection_test.go +++ b/collection_test.go @@ -24,7 +24,7 @@ import ( ) func TestDeleteThenAdd(t *testing.T) { - ensureNoLeakedFeeds(t) + ensureNoLeaks(t) coll := makeTestBucket(t).DefaultDataStore() var value interface{} @@ -41,7 +41,7 @@ func TestDeleteThenAdd(t *testing.T) { } func TestIncr(t *testing.T) { - ensureNoLeakedFeeds(t) + ensureNoLeaks(t) coll := makeTestBucket(t).DefaultDataStore() count, err := coll.Incr("count1", 1, 100, 0) assert.NoError(t, err, "Incr") @@ -62,7 +62,7 @@ func TestIncr(t *testing.T) { // Spawns 1000 goroutines that 'simultaneously' use Incr to increment the same counter by 1. 
func TestIncrAtomic(t *testing.T) {
-	ensureNoLeakedFeeds(t)
+	ensureNoLeaks(t)
 	coll := makeTestBucket(t).DefaultDataStore()
 	var waiters sync.WaitGroup
 	numIncrements := 5
@@ -82,7 +82,7 @@ func TestIncrAtomic(t *testing.T) {
 }
 
 func TestAppend(t *testing.T) {
-	ensureNoLeakedFeeds(t)
+	ensureNoLeaks(t)
 	coll := makeTestBucket(t).DefaultDataStore()
 
 	exists, err := coll.Exists("key")
@@ -105,7 +105,7 @@ func TestAppend(t *testing.T) {
 }
 
 func TestGets(t *testing.T) {
-	ensureNoLeakedFeeds(t)
+	ensureNoLeaks(t)
 
 	coll := makeTestBucket(t).DefaultDataStore()
 
@@ -181,7 +181,7 @@ func TestEvalSubdocPaths(t *testing.T) {
 }
 
 func initSubDocTest(t *testing.T) sgbucket.DataStore {
-	ensureNoLeakedFeeds(t)
+	ensureNoLeaks(t)
 
 	coll := makeTestBucket(t).DefaultDataStore()
 	require.True(t, coll.IsSupported(sgbucket.BucketStoreFeatureSubdocOperations))
@@ -260,7 +260,7 @@ func TestInsertSubDoc(t *testing.T) {
 }
 
 func TestWriteCas(t *testing.T) {
-	ensureNoLeakedFeeds(t)
+	ensureNoLeaks(t)
 
 	coll := makeTestBucket(t).DefaultDataStore()
 
@@ -336,7 +336,7 @@ func TestWriteCas(t *testing.T) {
 }
 
 func TestRemove(t *testing.T) {
-	ensureNoLeakedFeeds(t)
+	ensureNoLeaks(t)
 
 	coll := makeTestBucket(t).DefaultDataStore()
 
@@ -454,6 +454,11 @@ func addToCollection(t *testing.T, coll sgbucket.DataStore, key string, exp uint
 	require.True(t, added, "Expected doc to be added")
 }
 
+func ensureNoLeaks(t *testing.T) {
+	t.Cleanup(func() { assert.Len(t, GetBucketNames(), 0) })
+	ensureNoLeakedFeeds(t)
+}
+
 func ensureNoLeakedFeeds(t *testing.T) {
 	if !assert.Equal(t, int32(0), atomic.LoadInt32(&activeFeedCount), "Previous test left unclosed Tap/DCP feeds") {
 		return
diff --git a/feeds.go b/feeds.go
index efde4e5..d5799f4 100644
--- a/feeds.go
+++ b/feeds.go
@@ -134,9 +134,9 @@ func (c *Collection) StartDCPFeed(
 		feed.events.push(nil) // push an eof
 	} else {
 		// Register the feed with the collection for future notifications:
-		c.mutex.Lock()
-		c.feeds = append(c.feeds, feed)
-		c.mutex.Unlock()
+		c.bucket.mutex.Lock()
+		c.bucket.collectionFeeds[c.DataStoreNameImpl] = append(c.bucket.collectionFeeds[c.DataStoreNameImpl], feed)
+		c.bucket.mutex.Unlock()
 	}
 	go feed.run()
 	return nil
@@ -170,20 +170,12 @@ func (c *Collection) postNewEvent(e *event) {
 	c.postEvent(feedEvent)
 	c.bucket.scheduleExpirationAtOrBefore(e.exp)
 
-	// Tell collections of other buckets on the same db file to post the event too:
-	for _, otherBucket := range bucketsAtURL(c.bucket.url) {
-		if otherBucket != c.bucket {
-			if otherCollection := otherBucket.getOpenCollectionByID(c.id); otherCollection != nil {
-				otherCollection.postEvent(feedEvent)
-			}
-		}
-	}
 }
 
 func (c *Collection) postEvent(event *sgbucket.FeedEvent) {
-	c.mutex.Lock()
-	feeds := c.feeds
-	c.mutex.Unlock()
+	c.bucket.mutex.Lock()
+	feeds := c.bucket.collectionFeeds[c.DataStoreNameImpl]
+	c.bucket.mutex.Unlock()
 
 	for _, feed := range feeds {
 		if feed != nil {
@@ -200,10 +192,10 @@
 
 // stops all feeds. Caller MUST hold the bucket's lock.
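 // Feed registrations live in bucket.collectionFeeds, keyed by the collection's
 // DataStoreName, so every copy of the Bucket (see Bucket.copy) stops the same feeds.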
func (c *Collection) _stopFeeds() {
-	for _, feed := range c.feeds {
+	for _, feed := range c.bucket.collectionFeeds[c.DataStoreNameImpl] {
 		feed.close()
 	}
-	c.feeds = nil
+	c.bucket.collectionFeeds[c.DataStoreNameImpl] = nil
 }
 
 //////// DCPFEED:
diff --git a/feeds_test.go b/feeds_test.go
index d9a10fc..e5f7e7e 100644
--- a/feeds_test.go
+++ b/feeds_test.go
@@ -11,6 +11,7 @@ package rosmar
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -74,7 +75,7 @@ func TestMutations(t *testing.T) {
 	e.TimeReceived = time.Time{}
 	assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpDeletion, Key: []byte("eskimo"), Cas: 7, DataType: sgbucket.FeedDataTypeRaw}, e)
 
-	bucket.Close(testCtx(t))
+	require.NoError(t, bucket.CloseAndDelete(testCtx(t)))
 
 	_, ok := <-doneChan
 	assert.False(t, ok)
@@ -190,7 +191,7 @@ func TestCrossBucketEvents(t *testing.T) {
 	addToCollection(t, c, "charlie", 0, "C")
 
 	// Open a 2nd bucket on the same file, to receive events:
-	bucket2, err := OpenBucket(bucket.url, ReOpenExisting)
+	bucket2, err := OpenBucket(bucket.url, strings.ToLower(t.Name()), ReOpenExisting)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		bucket2.Close(testCtx(t))
@@ -212,7 +213,7 @@ func TestCrossBucketEvents(t *testing.T) {
 	readExpectedEventsDEF(t, events2, 4)
 
 	bucket.Close(testCtx(t))
-	bucket2.Close(testCtx(t))
+	require.NoError(t, bucket2.CloseAndDelete(testCtx(t)))
 
 	_, ok := <-doneChan
 	assert.False(t, ok)
diff --git a/go.mod b/go.mod
index 16e5df7..3cd62aa 100644
--- a/go.mod
+++ b/go.mod
@@ -3,11 +3,10 @@ module github.com/couchbaselabs/rosmar
 go 1.19
 
 require (
-	github.com/couchbase/sg-bucket v0.0.0-20231003103030-627c70e18148
+	github.com/couchbase/sg-bucket v0.0.0-20231116231254-16c1ad8b2483
 	github.com/google/uuid v1.3.0
 	github.com/mattn/go-sqlite3 v1.14.17
 	github.com/stretchr/testify v1.8.4
-	golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b
 )
 
 require (
diff --git a/go.sum b/go.sum
index 24f42ea..ff923df 100644
--- a/go.sum
+++ b/go.sum
@@ -1,7 +1,7 @@
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
-github.com/couchbase/sg-bucket v0.0.0-20231003103030-627c70e18148 h1:9E3u0yA+be219iLLOjuYgagOfM7UqtZ0YIhMXysJVKs=
-github.com/couchbase/sg-bucket v0.0.0-20231003103030-627c70e18148/go.mod h1:hy6J0RXx/Ry+5EiI8VVMetsVfBXQq5/djQLbvfRau0k=
+github.com/couchbase/sg-bucket v0.0.0-20231116231254-16c1ad8b2483 h1:K6y82On0A3coA+GwW+HGKIwpCpca6ZSvTAJwwTmzCrg=
+github.com/couchbase/sg-bucket v0.0.0-20231116231254-16c1ad8b2483/go.mod h1:hy6J0RXx/Ry+5EiI8VVMetsVfBXQq5/djQLbvfRau0k=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -17,8 +17,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
-golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI=
-golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
 golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
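
Taken together, the registry changes let an in-memory bucket survive `Close` and be reopened by name; only `CloseAndDelete` removes it. A standalone sketch of that behavior, mirroring `TestReuseInMemoryBucket` (the bucket name `"db"` and key `"greeting"` are arbitrary):

```go
package main

import (
	"context"
	"fmt"

	"github.com/couchbaselabs/rosmar"
)

func main() {
	ctx := context.TODO()

	// Open an in-memory bucket named "db" and write a document.
	b1, err := rosmar.OpenBucket(rosmar.InMemoryURL, "db", rosmar.CreateOrOpen)
	if err != nil {
		panic(err)
	}
	if err := b1.DefaultDataStore().Set("greeting", 0, nil, []byte("hello")); err != nil {
		panic(err)
	}

	// Close marks this handle closed, but the canonical in-memory bucket stays
	// registered, so reopening by name finds the same data.
	b1.Close(ctx)

	b2, err := rosmar.OpenBucket(rosmar.InMemoryURL, "db", rosmar.CreateOrOpen)
	if err != nil {
		panic(err)
	}
	var raw []byte
	if _, err := b2.DefaultDataStore().Get("greeting", &raw); err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", raw) // prints: hello

	// CloseAndDelete removes the bucket from the registry; for a persistent
	// bucket it would also delete the files on disk.
	if err := b2.CloseAndDelete(ctx); err != nil {
		panic(err)
	}
}
```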