-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CBG-3271 make sure expiration works on a closed bucket #17
Changes from 6 commits
0db83cb
ed11962
5539ee5
5090510
d5e2377
515e69f
30107c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,13 +36,12 @@ type Bucket struct { | |
name string // Bucket name | ||
collections collectionsMap // Collections, indexed by DataStoreName | ||
collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed | ||
mutex *sync.Mutex // mutex for synchronized access to Bucket | ||
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) | ||
expTimer *time.Timer // Schedules expiration of docs | ||
nextExp *uint32 // Timestamp when expTimer will run (0 if never) | ||
serial uint32 // Serial number for logging | ||
inMemory bool // True if it's an in-memory database | ||
closed bool // represents state when it is closed | ||
mutex *sync.Mutex // mutex for synchronized access to Bucket | ||
sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) | ||
expManager *expiryManager // expiration manager for bucket | ||
serial uint32 // Serial number for logging | ||
inMemory bool // True if it's an in-memory database | ||
closed bool // represents state when it is closed | ||
} | ||
|
||
type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection | ||
|
@@ -87,6 +86,7 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err | |
traceEnter("OpenBucket", "%q, %d", urlStr, mode) | ||
defer func() { traceExit("OpenBucket", err, "ok") }() | ||
|
||
ctx := context.TODO() | ||
u, err := encodeDBURL(urlStr) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -95,13 +95,18 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err | |
|
||
bucket := getCachedBucket(bucketName) | ||
if bucket != nil { | ||
defer func() { | ||
if err != nil { | ||
bucket.Close(ctx) | ||
} | ||
}() | ||
|
||
if mode == CreateNew { | ||
return nil, fs.ErrExist | ||
} | ||
if urlStr != bucket.url { | ||
return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", bucketName, bucket.url, urlStr) | ||
} | ||
registerBucket(bucket) | ||
return bucket, nil | ||
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the handling on line 117 to not support ReOpenExisting for inMemory buckets still make sense with the bucket registry/cluster? |
||
|
@@ -174,13 +179,13 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err | |
collections: make(map[sgbucket.DataStoreNameImpl]*Collection), | ||
collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed), | ||
mutex: &sync.Mutex{}, | ||
nextExp: new(uint32), | ||
inMemory: inMemory, | ||
serial: serial, | ||
} | ||
bucket.expManager = newExpirationManager(bucket.doExpiration) | ||
defer func() { | ||
if err != nil { | ||
_ = bucket.CloseAndDelete(context.TODO()) | ||
_ = bucket.CloseAndDelete(ctx) | ||
} | ||
}() | ||
|
||
|
@@ -194,16 +199,23 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err | |
if err = bucket.initializeSchema(bucketName); err != nil { | ||
return nil, err | ||
} | ||
} else { | ||
bucket.scheduleExpiration() | ||
} | ||
err = bucket.setName(bucketName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
registerBucket(bucket) | ||
return bucket, err | ||
exists, bucketCopy := registerBucket(bucket) | ||
// someone else beat registered the bucket in the registry, that's OK we'll close ours | ||
if exists { | ||
bucket.Close(ctx) | ||
} | ||
// only schedule expiration if bucket is not new. This doesn't need to be locked because only one bucket will execute this code. | ||
if vers != 0 { | ||
bucket._scheduleExpiration() | ||
} | ||
|
||
return bucketCopy, err | ||
} | ||
|
||
// Creates or re-opens a bucket, like OpenBucket. | ||
|
@@ -307,7 +319,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) { | |
return | ||
} | ||
|
||
// Returns the database handle as a `queryable` interface value. | ||
// db returns the database handle as a `queryable` interface value. | ||
// If the bucket has been closed, it returns a special `closedDB` value that will return | ||
// ErrBucketClosed from any call. | ||
func (bucket *Bucket) db() queryable { | ||
|
@@ -316,7 +328,7 @@ func (bucket *Bucket) db() queryable { | |
return bucket._db() | ||
} | ||
|
||
// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller. | ||
// db returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller. | ||
func (bucket *Bucket) _db() queryable { | ||
if bucket.closed { | ||
return closedDB{} | ||
|
@@ -380,8 +392,7 @@ func (b *Bucket) copy() *Bucket { | |
collections: make(collectionsMap), | ||
mutex: b.mutex, | ||
sqliteDB: b.sqliteDB, | ||
expTimer: b.expTimer, | ||
nextExp: b.nextExp, | ||
expManager: b.expManager, | ||
serial: b.serial, | ||
inMemory: b.inMemory, | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -35,18 +35,26 @@ func init() { | |||||
} | ||||||
} | ||||||
|
||||||
// registryBucket adds a newly opened Bucket to the registry. | ||||||
func (r *bucketRegistry) registerBucket(bucket *Bucket) { | ||||||
// registryBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
func (r *bucketRegistry) registerBucket(bucket *Bucket) (bool, *Bucket) { | ||||||
name := bucket.GetName() | ||||||
debug("registerBucket %v %s at %s", bucket, name, bucket.url) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logging is done in _registerBucket, looks like you could remove it here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||||||
r.lock.Lock() | ||||||
defer r.lock.Unlock() | ||||||
return r._registerBucket(bucket) | ||||||
} | ||||||
|
||||||
// _registryBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use. | ||||||
torcolvin marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
func (r *bucketRegistry) _registerBucket(bucket *Bucket) (bool, *Bucket) { | ||||||
name := bucket.GetName() | ||||||
debug("_registerBucket %v %s at %s", bucket, name, bucket.url) | ||||||
|
||||||
_, ok := r.buckets[name] | ||||||
if !ok { | ||||||
b := bucket.copy() | ||||||
r.buckets[name] = b | ||||||
r.buckets[name] = bucket | ||||||
} | ||||||
r.bucketCount[name] += 1 | ||||||
return ok, r.buckets[name].copy() | ||||||
} | ||||||
|
||||||
// getCachedBucket returns a bucket from the registry if it exists. | ||||||
|
@@ -57,7 +65,9 @@ func (r *bucketRegistry) getCachedBucket(name string) *Bucket { | |||||
if bucket == nil { | ||||||
return nil | ||||||
} | ||||||
return bucket.copy() | ||||||
// return a copy of the bucket | ||||||
_, bucket = r._registerBucket(bucket) | ||||||
torcolvin marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
return bucket | ||||||
} | ||||||
|
||||||
// unregisterBucket removes a Bucket from the registry. Must be called before closing. | ||||||
|
@@ -116,8 +126,13 @@ func getCachedBucket(name string) *Bucket { | |||||
return cluster.getCachedBucket(name) | ||||||
} | ||||||
|
||||||
// registryBucket adds a newly opened Bucket to the registry. | ||||||
func registerBucket(bucket *Bucket) { | ||||||
// registryBucket adds a copy of a Bucket to the registry. Returns true if the bucket already exists. | ||||||
func registerBucket(bucket *Bucket) (bool, *Bucket) { | ||||||
return cluster.registerBucket(bucket) | ||||||
} | ||||||
|
||||||
// registryNewBucket adds a newly opened Bucket to the registry. | ||||||
func registerNewBucket(bucket *Bucket) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't look like it's needed any longer? |
||||||
cluster.registerBucket(bucket) | ||||||
} | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think we should pass OpenMode to getCachedBucket to avoid some of the open-then-close we're doing?