Skip to content

Commit

Permalink
CBG-3271 allow in memory buckets to persist
Browse files Browse the repository at this point in the history
- in memory buckets will now live for the lifecycle of the program until
  Bucket.CloseAndDelete is called. This facilitates bucket closing without
  removing the data, which is a common use in Sync Gateway tests that
  use persistent config to update database configuration.
- When a bucket is first created, a copy is stored in the bucket
  registry, and this is never closed until:
  	- in memory: CloseAndDelete is called
	- on disk: Close is called to bring refcount of buckets to 0
- force bucket name to be defined, and do not let multiple copies of a
  persistent bucket to be opened if they are different paths. Note, this
  is a blunt instrument, and it is indiscriminate to path manipulations.
  It does path matching on lexography, not normalized paths. The idea is
  to be safer. Sync Gateway is not architected to support multiple
  buckets with the same name that do not have the same backing data.

Implementation:

The global state of a bucket is representative of two things:

- *sql.DB represnting the underlying sqlite connection
- dcpFeeds for each connection that exists

The sql.DB connection can be opened multiple times on the same path, and
this was the original implementation of rosmar. However, it can't be
opened multiple times for in memory files except via cache=shared query
parameter to sqlite3_open. This ended up causing behavior that I didn't
understand, and is not typically a supported in sqlite, since multiple
databases are managed with a WAL when on disk.

DCP feeds in rosmar work by having pushing events on a CUD operation to a
queue, which can be read by any running feeds. Instead of having
separate feeds for each copy of Bucket and publishing them via
`bucketsAtUrl`, we now only have a single canonical set of bucket feeds.
Moved this field from a Collection to Bucket.

Whether a bucket is open or closed is controlled by Bucket._db() that is
called by any CRUD operations. A Collection has a pointer to its parent
bucket. Each Bucket opened now will create a Collection dynamically, but
these share pointers to the cached in memory versions.
  • Loading branch information
torcolvin committed Nov 8, 2023
1 parent 6d4a3e8 commit a42502b
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 155 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ Rosmar supports:

## 1. Building and Using It

Rosmar requires an updated version of sg-bucket -- this is on the `feature/walrus-xattrs` branch. Rosmar's `go.mod` file points to the appropriate commit.

To use Rosmar in Sync Gateway, check out the latter's `feature/walrus_xattrs` branch, in which Walrus has been replaced with Rosmar.

To run SG normally with a Rosmar bucket, use a non-persistent SG config file like this one:

```json
Expand Down Expand Up @@ -126,6 +122,10 @@ The special URL `rosmar:/?mode=memory` opens an ephemeral in-memory database. Do

> Note: The directory contains the SQLite database file `rosmar.sqlite` plus SQLite side files. But its contents should be considered opaque.

### Bucket Persistence

For in memory buckets, closing a bucket does not delete the bucket from memory, representing how Couchbase Server would not delete the bucket. In order to delete the bucket from memory, call `Bucket.CloseAndDelete`.

### Metadata Purging

Rosmar does not purge tombstones (deleted documents) automatically; call the method `Bucket.PurgeTombstones()`.
Expand Down
131 changes: 97 additions & 34 deletions bucket-registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
package rosmar

import (
"context"
"slices"
"sync"

"golang.org/x/exp/slices"
)

// The bucket registry tracks all open Buckets (except in-memory ones) by their URL.
Expand All @@ -29,54 +29,117 @@ import (
// `realpath` or `fcntl(F_GETPATH)`) and use the canonical path as the key.
// Unfortunately Go doesn't seem to have an API for that.

var bucketRegistry = map[string][]*Bucket{} // Maps URL to slice of Buckets at that URL
var bucketRegistryMutex sync.Mutex // Thread-safe access to bucketRegistry
type bucketRegistry struct {
byURL map[string][]*Bucket
inMemoryRef map[string]*Bucket
lock sync.Mutex
}

// Adds a newly opened Bucket to the registry.
func registerBucket(bucket *Bucket) {
var cluster *bucketRegistry // global cluster registry
func init() {
cluster = &bucketRegistry{
byURL: make(map[string][]*Bucket),
inMemoryRef: make(map[string]*Bucket),
}
}

// registryBucket adds a newly opened Bucket to the registry.
func (r *bucketRegistry) registerBucket(bucket *Bucket) {
url := bucket.url
if isInMemoryURL(url) {
return
name := bucket.GetName()
debug("registerBucket %v %s at %s", bucket, name, url)
r.lock.Lock()
defer r.lock.Unlock()
_, ok := r.inMemoryRef[name]
if !ok {
b := bucket.copy()
r.inMemoryRef[name] = b
}
debug("registerBucket %v at %s", bucket, url)
bucketRegistryMutex.Lock()
bucketRegistry[url] = append(bucketRegistry[url], bucket)
bucketRegistryMutex.Unlock()
r.byURL[url] = append(r.byURL[url], bucket)
}

func (r *bucketRegistry) getInMemoryBucket(name string) *Bucket {
r.lock.Lock()
defer r.lock.Unlock()
return r.inMemoryRef[name]
}

// Removes a Bucket from the registry. Must be called before closing.
func unregisterBucket(bucket *Bucket) {
func (r *bucketRegistry) unregisterBucket(bucket *Bucket) {
url := bucket.url
if isInMemoryURL(url) {
return
}
debug("UNregisterBucket %v at %s", bucket, url)
bucketRegistryMutex.Lock()
defer bucketRegistryMutex.Unlock()
name := bucket.name
debug("UNregisterBucket %v at %s", bucket, name, url)
r.lock.Lock()
defer r.lock.Unlock()

buckets := bucketRegistry[url]
buckets := r.byURL[url]
i := slices.Index(buckets, bucket)
if i < 0 {
warn("unregisterBucket couldn't find %v", bucket)
return
}
if len(buckets) == 1 {
delete(bucketRegistry, url)
} else {
// Copy the slice before mutating, in case a client is iterating it:
buckets = slices.Clone(buckets)
buckets[i] = nil // remove ptr that might be left in the underlying array, for gc
buckets = slices.Delete(buckets, i, i+1)
bucketRegistry[url] = buckets
delete(r.byURL, url)
if !bucket.inMemory {
bucket._closeAllInstances()
delete(r.inMemoryRef, name)
}
return
}
// Copy the slice before mutating, in case a client is iterating it:
buckets = slices.Clone(buckets)
buckets[i] = nil // remove ptr that might be left in the underlying array, for gc
buckets = slices.Delete(buckets, i, i+1)
r.byURL[url] = buckets
return
}

// Returns the array of Bucket instances open on a given URL.
func bucketsAtURL(url string) (buckets []*Bucket) {
if !isInMemoryURL(url) {
bucketRegistryMutex.Lock()
buckets = bucketRegistry[url]
bucketRegistryMutex.Unlock()
// Delete bucket from the registry and disk. Closes all existing buckets.
func (r *bucketRegistry) deleteBucket(ctx context.Context, bucket *Bucket) error {
url := bucket.url
name := bucket.name
r.lock.Lock()
defer r.lock.Unlock()

_, ok := r.inMemoryRef[name]
if ok {
delete(r.inMemoryRef, name)
}
return
delete(r.byURL, url)
return DeleteBucketAt(url)
}

func (r *bucketRegistry) getBucketNames() []string {
r.lock.Lock()
defer r.lock.Unlock()

names := make([]string, 0, len(r.inMemoryRef))
for name := range r.inMemoryRef {
names = append(names, name)
}
return names
}

// getInMemoryBucket returns an instance of a bucket. If there are other copies of this bucket already in memory, it will return this version. If this is an in memory bucket, the bucket will not be removed until deleteBucket is called.
func getInMemoryBucket(name string) *Bucket {
return cluster.getInMemoryBucket(name)
}

func registerBucket(bucket *Bucket) {
cluster.registerBucket(bucket)
}

// eeregisterBucket removes a Bucket from the registry. Must be called before closing.
func unregisterBucket(bucket *Bucket) {
cluster.unregisterBucket(bucket)
}

// deleteBucket will delete a bucket from the registry and from disk.
func deleteBucket(ctx context.Context, bucket *Bucket) error {
return cluster.deleteBucket(ctx, bucket)
}

// getBucketNames returns a list of all bucket names.
func getBucketNames() []string {
return cluster.getBucketNames()
}
Loading

0 comments on commit a42502b

Please sign in to comment.