Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3271 allow in memory buckets to persist #16

Merged
merged 5 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: 1.21.3
cache: false
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.52.0
args: --timeout 3m # this is slow only in github actions
version: v1.55.0

test-race:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

# config file for golangci-lint

run:
timeout: 3m

linters:
enable:
#- bodyclose # checks whether HTTP response body is closed successfully
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ Rosmar supports:

## 1. Building and Using It

Rosmar requires an updated version of sg-bucket -- this is on the `feature/walrus-xattrs` branch. Rosmar's `go.mod` file points to the appropriate commit.

To use Rosmar in Sync Gateway, check out the latter's `feature/walrus_xattrs` branch, in which Walrus has been replaced with Rosmar.

To run SG normally with a Rosmar bucket, use a non-persistent SG config file like this one:

```json
Expand Down Expand Up @@ -126,6 +122,10 @@ The special URL `rosmar:/?mode=memory` opens an ephemeral in-memory database. Do

> Note: The directory contains the SQLite database file `rosmar.sqlite` plus SQLite side files. But its contents should be considered opaque.

### Bucket Persistence

For in-memory buckets, closing a bucket does not delete it from memory, mirroring how Couchbase Server would not delete the bucket. To delete the bucket from memory, call `Bucket.CloseAndDelete`.

### Metadata Purging

Rosmar does not purge tombstones (deleted documents) automatically; call the method `Bucket.PurgeTombstones()`.
Expand Down
82 changes: 0 additions & 82 deletions bucket-registry.go

This file was deleted.

108 changes: 68 additions & 40 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
package rosmar

import (
"context"
"database/sql"
_ "embed"
"errors"
"fmt"
"io/fs"
"net/url"
"os"
"path"
"path/filepath"
"runtime"
"strings"
Expand All @@ -32,15 +32,17 @@ import (
// Rosmar implementation of a collection-aware bucket.
// Implements sgbucket interfaces BucketStore, DynamicDataStoreBucket, DeletableStore, MutationFeedStore2.
type Bucket struct {
url string // Filesystem path or other URL
name string // Bucket name
collections collectionsMap // Collections, indexed by DataStoreName
mutex sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expTimer *time.Timer // Schedules expiration of docs
nextExp uint32 // Timestamp when expTimer will run (0 if never)
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
url string // Filesystem path or other URL
name string // Bucket name
collections collectionsMap // Collections, indexed by DataStoreName
collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed
mutex *sync.Mutex // mutex for synchronized access to Bucket
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead)
expTimer *time.Timer // Schedules expiration of docs
nextExp *uint32 // Timestamp when expTimer will run (0 if never)
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
}

type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
Expand Down Expand Up @@ -76,33 +78,41 @@ const (
ReOpenExisting // Open an existing bucket, or fail if none exists.
)

// OpenBucketFromPath opens a bucket from a filesystem path. See OpenBucket for details.
//
// The path is converted to a URL string with uriFromPath, and that string is
// passed to OpenBucket together with mode. The bucket and error returned by
// OpenBucket are returned unchanged.
//
// NOTE(review): the parameter name `path` shadows the imported `path` package
// inside this function.
func OpenBucketFromPath(path string, mode OpenMode) (*Bucket, error) {
return OpenBucket(uriFromPath(path), mode)
}

// Creates a new bucket, or opens an existing one.
//
// The URL should have the scheme 'rosmar' or 'file' and a filesystem path.
// The path represents a directory; the SQLite database files will be created inside it.
// Alternatively, the `InMemoryURL` can be given, to create an in-memory bucket with no file.
func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err error) {
traceEnter("OpenBucket", "%q, %d", urlStr, mode)
defer func() { traceExit("OpenBucket", err, "ok") }()

u, err := encodeDBURL(urlStr)
if err != nil {
return nil, err
}
urlStr = u.String()

bucket := getCachedBucket(bucketName)
if bucket != nil {
if mode == CreateNew {
return nil, fs.ErrExist
}
if urlStr != bucket.url {
return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", bucketName, bucket.url, urlStr)
}
registerBucket(bucket)
return bucket, nil

}

query := u.Query()
inMemory := query.Get("mode") == "memory"
var bucketName string
if inMemory {
if mode == ReOpenExisting {
return nil, fs.ErrNotExist
}
bucketName = "memory"
u = u.JoinPath(bucketName)
} else {
dir := u.Path
if runtime.GOOS == "windows" {
Expand All @@ -124,7 +134,6 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {

query.Set("mode", ifelse(mode == ReOpenExisting, "rw", "rwc"))
u = u.JoinPath(kDBFilename)
bucketName = path.Base(dir)
}

// See https://github.com/mattn/go-sqlite3#connection-string
Expand Down Expand Up @@ -160,12 +169,20 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
db.SetMaxOpenConns(ifelse(inMemory, 1, kMaxOpenConnections))

bucket = &Bucket{
url: urlStr,
sqliteDB: db,
collections: make(map[sgbucket.DataStoreNameImpl]*Collection),
inMemory: inMemory,
serial: serial,
url: urlStr,
sqliteDB: db,
collections: make(map[sgbucket.DataStoreNameImpl]*Collection),
collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed),
mutex: &sync.Mutex{},
nextExp: new(uint32),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with this usage - what's the advantage of setting new(uint32) instead of leaving nil?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just initializing an int pointer to 0; is there a more canonical way without writing a function?

inMemory: inMemory,
serial: serial,
}
defer func() {
if err != nil {
_ = bucket.CloseAndDelete(context.TODO())
}
}()

// Initialize the schema if necessary:
var vers int
Expand All @@ -180,6 +197,10 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
} else {
bucket.scheduleExpiration()
}
err = bucket.setName(bucketName)
if err != nil {
return nil, err
}

registerBucket(bucket)
return bucket, err
Expand All @@ -190,18 +211,14 @@ func OpenBucket(urlStr string, mode OpenMode) (bucket *Bucket, err error) {
// bucket name. The bucket will be opened in a subdirectory of the directory URL.
func OpenBucketIn(dirUrlStr string, bucketName string, mode OpenMode) (*Bucket, error) {
if isInMemoryURL(dirUrlStr) {
bucket, err := OpenBucket(dirUrlStr, mode)
if err == nil {
err = bucket.SetName(bucketName)
}
return bucket, err
return OpenBucket(dirUrlStr, bucketName, mode)
}

u, err := parseDBFileURL(dirUrlStr)
if err != nil {
return nil, err
}
return OpenBucket(u.JoinPath(bucketName).String(), mode)
return OpenBucket(u.JoinPath(bucketName).String(), bucketName, mode)
}

// Deletes the bucket at the given URL, i.e. the filesystem directory at its path, if it exists.
Expand All @@ -221,10 +238,6 @@ func DeleteBucketAt(urlStr string) (err error) {
return nil
}

if len(bucketsAtURL(u.String())) > 0 {
return fmt.Errorf("there is a Bucket open at that URL")
}

// For safety's sake, don't delete just any directory. Ensure it contains a db file:
dir := u.Path
if runtime.GOOS == "windows" {
Expand Down Expand Up @@ -288,8 +301,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) {
sql.Named("COLL", sgbucket.DefaultCollection),
)
if err != nil {
_ = bucket.CloseAndDelete()
panic("Rosmar SQL schema is invalid: " + err.Error())
return fmt.Errorf("Rosmar SQL schema is invalid: %w", err)
}
bucket.name = bucketName
return
Expand All @@ -306,11 +318,10 @@ func (bucket *Bucket) db() queryable {

// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call unless bucket.mutex is locked by the caller.
func (bucket *Bucket) _db() queryable {
if db := bucket.sqliteDB; db != nil {
return db
} else {
if bucket.closed {
return closedDB{}
}
return bucket.sqliteDB
}

// Runs a function within a SQLite transaction.
Expand All @@ -322,7 +333,7 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()

if bucket.sqliteDB == nil {
if bucket.closed {
return ErrBucketClosed
}

Expand Down Expand Up @@ -360,6 +371,23 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error {
return remapError(err)
}

// copy returns a new *Bucket that refers to the same underlying state as b.
//
// Shared with the receiver (same pointer or map value): the mutex, the SQLite
// handle, the expiration timer, the nextExp pointer and the collectionFeeds
// map. Copied by value: url, name, serial and inMemory.
//
// Not carried over: the collections map, which starts out empty in the copy,
// and the closed flag, which is left at its zero value (false).
func (b *Bucket) copy() *Bucket {
	return &Bucket{
		// Plain values duplicated from the receiver.
		url:      b.url,
		name:     b.name,
		serial:   b.serial,
		inMemory: b.inMemory,

		// State shared with the receiver.
		mutex:           b.mutex,
		sqliteDB:        b.sqliteDB,
		expTimer:        b.expTimer,
		nextExp:         b.nextExp,
		collectionFeeds: b.collectionFeeds,

		// Each copy gets its own, initially empty, collections map.
		collections: make(collectionsMap),
	}
}

// uriFromPath converts a file path to a rosmar URI. On Windows, these need to have forward slashes and drive letters will have an extra /, such as rosmar://c:/foo/bar.
func uriFromPath(path string) string {
uri := "rosmar://"
Expand Down
Loading