CBG-3271 allow in memory buckets to persist (#16)
- In-memory buckets now live for the lifetime of the program, until
Bucket.CloseAndDelete is called. This allows a bucket to be closed without
removing its data, which is common in Sync Gateway tests that use
persistent config to update database configuration.
- When a bucket is first created, a copy is stored in the bucket
registry, and this copy is never closed until:
	- in memory: CloseAndDelete is called
	- on disk: Close is called to bring the refcount of buckets to 0
- Force the bucket name to be defined, and do not allow multiple copies of a
persistent bucket to be opened if they have different paths. Note that this
is a blunt instrument and is indiscriminate about path manipulations:
it compares paths lexically, not as normalized paths. The idea is
to be safer. Sync Gateway is not architected to support multiple buckets
with the same name that do not have the same backing data.
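
The lifecycle rules above can be sketched as a small name-keyed registry with a reference count. This is a minimal illustration, not the code in this commit: `registry`, `bucket`, `open`, `close`, `closeAndDelete`, and `has` are hypothetical names, and the URL check is a plain string comparison to mirror the lexical matching described above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// bucket is a hypothetical stand-in for the shared state of a bucket.
type bucket struct {
	name     string
	url      string
	inMemory bool
}

// registry is a hypothetical name-keyed cache with a reference count per name.
type registry struct {
	mu      sync.Mutex
	buckets map[string]*bucket
	refs    map[string]int
}

var errURLMismatch = errors.New("bucket already exists at a different URL")

func newRegistry() *registry {
	return &registry{buckets: map[string]*bucket{}, refs: map[string]int{}}
}

// open returns the cached bucket for name, creating it on first use.
// URLs are compared as plain strings, so "/tmp/d" and "/tmp/./d" do not match.
func (r *registry) open(name, url string, inMemory bool) (*bucket, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if b, ok := r.buckets[name]; ok {
		if b.url != url {
			return nil, fmt.Errorf("%w: %q vs %q", errURLMismatch, b.url, url)
		}
		r.refs[name]++
		return b, nil
	}
	b := &bucket{name: name, url: url, inMemory: inMemory}
	r.buckets[name] = b
	r.refs[name] = 1
	return b, nil
}

// close drops one reference. On-disk buckets leave the registry when the
// count reaches zero; in-memory buckets stay until closeAndDelete is called.
func (r *registry) close(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	b, ok := r.buckets[name]
	if !ok {
		return
	}
	if r.refs[name] > 0 {
		r.refs[name]--
	}
	if r.refs[name] == 0 && !b.inMemory {
		delete(r.buckets, name)
		delete(r.refs, name)
	}
}

// closeAndDelete removes the bucket regardless of its reference count.
func (r *registry) closeAndDelete(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.buckets, name)
	delete(r.refs, name)
}

// has reports whether a bucket with this name is still registered.
func (r *registry) has(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.buckets[name]
	return ok
}

func main() {
	r := newRegistry()
	_, _ = r.open("db1", "rosmar:/?mode=memory", true)
	r.close("db1")
	fmt.Println("in-memory bucket registered after close:", r.has("db1"))
	r.closeAndDelete("db1")
	fmt.Println("registered after closeAndDelete:", r.has("db1"))
}
```

Comparing URLs as strings keeps the check cheap and predictable, at the cost of rejecting two spellings of the same directory.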

Implementation:

The global state of a bucket is representative of two things:

- *sql.DB representing the underlying sqlite connection
- dcpFeeds for each connection that exists

The sql.DB connection can be opened multiple times on the same path, and
this was the original implementation of rosmar. However, it can't be
opened multiple times for in-memory files except via the cache=shared query
parameter to sqlite3_open. This ended up causing behavior that I didn't
understand, and it is not a typically supported configuration in sqlite, since
multiple databases are managed with a WAL when on disk.

DCP feeds in rosmar work by pushing an event to a queue on each CUD
operation; the queue can be read by any running feed. Instead of having
separate feeds for each copy of Bucket and publishing to them via
`bucketsAtURL`, we now have only a single canonical set of bucket feeds.
This field moved from Collection to Bucket. This addresses
https://issues.couchbase.com/browse/CBG-3540
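
A rough sketch of the single-feed-set idea, assuming buffered channels as the queue. The names `sharedFeeds`, `feed`, `event`, `start`, and `publish` are hypothetical and are not rosmar's types; the point is only that every handle publishes into the same map, so a feed started through one handle sees writes made through another.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical mutation record produced by a create/update/delete.
type event struct {
	collection string
	key        string
}

// feed is a hypothetical DCP-style feed backed by a buffered queue.
type feed struct {
	events chan event
}

// sharedFeeds is the single set of feeds, keyed by collection name.
// Every copy of a bucket would hold a pointer to the same value.
type sharedFeeds struct {
	mu    sync.Mutex
	feeds map[string][]*feed
}

func newSharedFeeds() *sharedFeeds {
	return &sharedFeeds{feeds: map[string][]*feed{}}
}

// start registers a new feed for a collection and returns it.
func (s *sharedFeeds) start(collection string) *feed {
	f := &feed{events: make(chan event, 16)}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.feeds[collection] = append(s.feeds[collection], f)
	return f
}

// publish queues an event on every feed registered for its collection.
// The send blocks if a feed's buffer is full; the sketch assumes it is not.
func (s *sharedFeeds) publish(e event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, f := range s.feeds[e.collection] {
		f.events <- e
	}
}

func main() {
	shared := newSharedFeeds()
	// Two handles to the same bucket share one feed set.
	handleA, handleB := shared, shared

	f := handleA.start("_default")
	handleB.publish(event{collection: "_default", key: "doc1"})

	e := <-f.events
	fmt.Println("feed received key:", e.key)
}
```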

Whether a bucket is open or closed is controlled by Bucket._db(), which is
called by every CRUD operation. A Collection has a pointer to its parent
bucket. Each Bucket that is opened now creates a Collection dynamically, but
these share pointers to the cached in-memory versions.
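
The open/closed gate can be illustrated with a per-handle `closed` flag over shared state. This is a simplified sketch under stated assumptions: `handle`, `store`, `set`, `get`, and `close` are hypothetical names, and a map stands in for the shared `*sql.DB`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("bucket has been closed")

// store is a hypothetical stand-in for the shared database connection.
type store struct {
	docs map[string]string
}

// handle is a hypothetical stand-in for one opened bucket.
type handle struct {
	mu     *sync.Mutex // shared by all copies
	db     *store      // shared by all copies
	closed bool        // private to this copy
}

func newHandle() *handle {
	return &handle{mu: &sync.Mutex{}, db: &store{docs: map[string]string{}}}
}

// copy returns a new handle that shares the mutex and store but starts open.
func (h *handle) copy() *handle {
	return &handle{mu: h.mu, db: h.db}
}

// set writes a document unless this handle has been closed.
func (h *handle) set(key, value string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.closed {
		return errClosed
	}
	h.db.docs[key] = value
	return nil
}

// get reads a document unless this handle has been closed.
func (h *handle) get(key string) (string, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.closed {
		return "", errClosed
	}
	return h.db.docs[key], nil
}

// close marks only this handle as closed; the shared store is untouched.
func (h *handle) close() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.closed = true
}

func main() {
	a := newHandle()
	b := a.copy()

	_ = a.set("doc1", "hello")
	a.close()

	_, errA := a.get("doc1")
	valB, errB := b.get("doc1")
	fmt.Println("closed handle error:", errA)
	fmt.Println("other handle value:", valB, "error:", errB)
}
```

Keeping the flag on the handle rather than on the shared state is what lets one copy be closed while the data stays reachable through the others.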

Depends on couchbase/sg-bucket#110
torcolvin authored Nov 16, 2023
1 parent 6d4a3e8 commit adb4806
Showing 15 changed files with 420 additions and 217 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/ci.yml
@@ -61,11 +61,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: 1.21.3
cache: false
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.52.0
args: --timeout 3m # this is slow only in github actions
version: v1.55.0

test-race:
runs-on: ubuntu-latest
3 changes: 3 additions & 0 deletions .golangci.yml
@@ -8,6 +8,9 @@

# config file for golangci-lint

run:
timeout: 3m

linters:
enable:
#- bodyclose # checks whether HTTP response body is closed successfully
8 changes: 4 additions & 4 deletions README.md
@@ -21,10 +21,6 @@ Rosmar supports:

## 1. Building and Using It

Rosmar requires an updated version of sg-bucket -- this is on the `feature/walrus-xattrs` branch. Rosmar's `go.mod` file points to the appropriate commit.

To use Rosmar in Sync Gateway, check out the latter's `feature/walrus_xattrs` branch, in which Walrus has been replaced with Rosmar.

To run SG normally with a Rosmar bucket, use a non-persistent SG config file like this one:

```json
@@ -126,6 +122,10 @@ The special URL `rosmar:/?mode=memory` opens an ephemeral in-memory database. Do

> Note: The directory contains the SQLite database file `rosmar.sqlite` plus SQLite side files. But its contents should be considered opaque.

### Bucket Persistence

For in memory buckets, closing a bucket does not delete the bucket from memory, representing how Couchbase Server would not delete the bucket. In order to delete the bucket from memory, call `Bucket.CloseAndDelete`.

### Metadata Purging

Rosmar does not purge tombstones (deleted documents) automatically; call the method `Bucket.PurgeTombstones()`.
82 changes: 0 additions & 82 deletions bucket-registry.go

This file was deleted.

108 changes: 68 additions & 40 deletions bucket.go
@@ -9,14 +9,14 @@
package rosmar

import (
"context"
"database/sql"
_ "embed"
"errors"
"fmt"
"io/fs"
"net/url"
"os"
"path"
"path/filepath"
"runtime"
"strings"
@@ -32,15 +32,17 @@ import (
// Rosmar implementation of a collection-aware bucket.
// Implements sgbucket interfaces BucketStore, DynamicDataStoreBucket, DeletableStore, MutationFeedStore2.
type Bucket struct {
url string // Filesystem path or other URL
name string // Bucket name
collections collectionsMap // Collections, indexed by DataStoreName
mutex sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expTimer *time.Timer // Schedules expiration of docs
nextExp uint32 // Timestamp when expTimer will run (0 if never)
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
url string // Filesystem path or other URL
name string // Bucket name
collections collectionsMap // Collections, indexed by DataStoreName
collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed
mutex *sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expTimer *time.Timer // Schedules expiration of docs
nextExp *uint32 // Timestamp when expTimer will run (0 if never)
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
}

type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
@@ -76,33 +78,41 @@ const (
ReOpenExisting // Open an existing bucket, or fail if none exists.
)

// OpenBucketFromPath opens a bucket from a filesystem path. See OpenBucket for details.
func OpenBucketFromPath(path string, mode OpenMode) (*Bucket, error) {
return OpenBucket(uriFromPath(path), mode)
}

// Creates a new bucket, or opens an existing one.
//
// The URL should have the scheme 'rosmar' or 'file' and a filesystem path.
// The path represents a directory; the SQLite database files will be created inside it.
// Alternatively, the `InMemoryURL` can be given, to create an in-memory bucket with no file.
func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err error) {
traceEnter("OpenBucket", "%q, %d", urlStr, mode)
defer func() { traceExit("OpenBucket", err, "ok") }()

u, err := encodeDBURL(urlStr)
if err != nil {
return nil, err
}
urlStr = u.String()

bucket := getCachedBucket(bucketName)
if bucket != nil {
if mode == CreateNew {
return nil, fs.ErrExist
}
if urlStr != bucket.url {
return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", bucketName, bucket.url, urlStr)
}
registerBucket(bucket)
return bucket, nil

}

query := u.Query()
inMemory := query.Get("mode") == "memory"
var bucketName string
if inMemory {
if mode == ReOpenExisting {
return nil, fs.ErrNotExist
}
bucketName = "memory"
u = u.JoinPath(bucketName)
} else {
dir := u.Path
if runtime.GOOS == "windows" {
@@ -124,7 +134,6 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {

query.Set("mode", ifelse(mode == ReOpenExisting, "rw", "rwc"))
u = u.JoinPath(kDBFilename)
bucketName = path.Base(dir)
}

// See https://github.com/mattn/go-sqlite3#connection-string
@@ -160,12 +169,20 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
db.SetMaxOpenConns(ifelse(inMemory, 1, kMaxOpenConnections))

bucket = &Bucket{
url: urlStr,
sqliteDB: db,
collections: make(map[sgbucket.DataStoreNameImpl]*Collection),
inMemory: inMemory,
serial: serial,
url: urlStr,
sqliteDB: db,
collections: make(map[sgbucket.DataStoreNameImpl]*Collection),
collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed),
mutex: &sync.Mutex{},
nextExp: new(uint32),
inMemory: inMemory,
serial: serial,
}
defer func() {
if err != nil {
_ = bucket.CloseAndDelete(context.TODO())
}
}()

// Initialize the schema if necessary:
var vers int
@@ -180,6 +197,10 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
} else {
bucket.scheduleExpiration()
}
err = bucket.setName(bucketName)
if err != nil {
return nil, err
}

registerBucket(bucket)
return bucket, err
@@ -190,18 +211,14 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
// bucket name. The bucket will be opened in a subdirectory of the directory URL.
func OpenBucketIn(dirUrlStr string, bucketName string, mode OpenMode) (*Bucket, error) {
if isInMemoryURL(dirUrlStr) {
bucket, err := OpenBucket(dirUrlStr, mode)
if err == nil {
err = bucket.SetName(bucketName)
}
return bucket, err
return OpenBucket(dirUrlStr, bucketName, mode)
}

u, err := parseDBFileURL(dirUrlStr)
if err != nil {
return nil, err
}
return OpenBucket(u.JoinPath(bucketName).String(), mode)
return OpenBucket(u.JoinPath(bucketName).String(), bucketName, mode)
}

// Deletes the bucket at the given URL, i.e. the filesystem directory at its path, if it exists.
@@ -221,10 +238,6 @@ func DeleteBucketAt(urlStr string) (err error) {
return nil
}

if len(bucketsAtURL(u.String())) > 0 {
return fmt.Errorf("there is a Bucket open at that URL")
}

// For safety's sake, don't delete just any directory. Ensure it contains a db file:
dir := u.Path
if runtime.GOOS == "windows" {
@@ -288,8 +301,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) {
sql.Named("COLL", sgbucket.DefaultCollection),
)
if err != nil {
_ = bucket.CloseAndDelete()
panic("Rosmar SQL schema is invalid: " + err.Error())
return fmt.Errorf("Rosmar SQL schema is invalid: %w", err)
}
bucket.name = bucketName
return
@@ -306,11 +318,10 @@ func (bucket *Bucket) db() queryable {

// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked by the caller.
func (bucket *Bucket) _db() queryable {
if db := bucket.sqliteDB; db != nil {
return db
} else {
if bucket.closed {
return closedDB{}
}
return bucket.sqliteDB
}

// Runs a function within a SQLite transaction.
@@ -322,7 +333,7 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()

if bucket.sqliteDB == nil {
if bucket.closed {
return ErrBucketClosed
}

@@ -360,6 +371,23 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error {
return remapError(err)
}

// Make a copy of the bucket, but it holds the same underlying structures.
func (b *Bucket) copy() *Bucket {
r := &Bucket{
url: b.url,
name: b.name,
collectionFeeds: b.collectionFeeds,
collections: make(collectionsMap),
mutex: b.mutex,
sqliteDB: b.sqliteDB,
expTimer: b.expTimer,
nextExp: b.nextExp,
serial: b.serial,
inMemory: b.inMemory,
}
return r
}

// uriFromPath converts a file path to a rosmar URI. On windows, these need to have forward slashes and drive letters will have an extra /, such as rosmar://c:/foo/bar.
func uriFromPath(path string) string {
uri := "rosmar://"