Skip to content

Commit

Permalink
CBG-4252 Use shared clock for all buckets
Browse files Browse the repository at this point in the history
This is not required for the implementation like couchbase server spec,
but this allows for easier testing of XDCR bucket to bucket since it
guarantees that a document of the same name will have a unique cas
regardless of the bucket it is written to.
  • Loading branch information
torcolvin committed Sep 23, 2024
1 parent 106c8a4 commit 02e61e3
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 5 deletions.
4 changes: 1 addition & 3 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Bucket struct {
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
hlc *hybridLogicalClock
}

type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
Expand Down Expand Up @@ -196,7 +195,7 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
return nil, err
}

bucket.hlc = NewHybridLogicalClock(bucket.getLastTimestamp())
hlc.updateLatestTime(bucket.getLastTimestamp())

exists, bucketCopy := registerBucket(bucket)
// someone else beat registered the bucket in the registry, that's OK we'll close ours
Expand Down Expand Up @@ -388,7 +387,6 @@ func (b *Bucket) copy() *Bucket {
expManager: b.expManager,
serial: b.serial,
inMemory: b.inMemory,
hlc: b.hlc,
}
return r
}
Expand Down
6 changes: 5 additions & 1 deletion bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ func testBucketPath(t *testing.T) string {
}

func makeTestBucket(t *testing.T) *Bucket {
return makeTestBucketWithName(t, strings.ToLower(t.Name()))
}

func makeTestBucketWithName(t *testing.T, name string) *Bucket {
LoggingCallback = func(level LogLevel, fmt string, args ...any) {
t.Logf(logLevelNamesPrint[level]+fmt, args...)
}
bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), strings.ToLower(t.Name()), CreateNew)
bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), name, CreateNew)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, bucket.CloseAndDelete(testCtx(t)))
Expand Down
2 changes: 1 addition & 1 deletion collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) {
func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error {
var e *event
err := c.bucket.inTransaction(func(txn *sql.Tx) error {
newCas := uint64(c.bucket.hlc.Now())
newCas := uint64(hlc.Now())
var err error
e, err = fn(txn, newCas)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions hlc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"time"
)

var hlc *hybridLogicalClock

func init() {
hlc = NewHybridLogicalClock(0)
}

type timestamp uint64

// hybridLogicalClock is a hybrid logical clock implementation for rosmar that produces timestamps that will always be increasing regardless of clock changes.
Expand Down Expand Up @@ -43,6 +49,14 @@ func NewHybridLogicalClock(lastTime timestamp) *hybridLogicalClock {
}
}

func (c *hybridLogicalClock) updateLatestTime(lastTime timestamp) {
c.mutex.Lock()
defer c.mutex.Unlock()
if uint64(lastTime) > c.highestTime {
c.highestTime = uint64(lastTime)
}
}

// Now returns the next time represented in nanoseconds. This can be the current timestamp, or if multiple occur in the same nanosecond, an increasing timestamp.
func (c *hybridLogicalClock) Now() timestamp {
c.mutex.Lock()
Expand Down
55 changes: 55 additions & 0 deletions hlc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package rosmar

import (
"fmt"
"sync"
"testing"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -105,3 +107,56 @@ func TestHLCReverseTime(t *testing.T) {
require.Equal(t, timestamp(0x3d0000), hlc.Now())

}

func TestHLCCrossBucket(t *testing.T) {
goroutines := 10
documentCount := 10

collection1 := makeTestBucketWithName(t, "bucket1").DefaultDataStore()
collection2 := makeTestBucketWithName(t, "bucket2").DefaultDataStore()

wg := sync.WaitGroup{}
results := make(chan []uint64)

createDocuments := func(goroutineIdx int, collection sgbucket.DataStore) {

defer wg.Done()
casValues := make([]uint64, documentCount)
for i := 0; i < documentCount; i++ {
cas, err := collection.WriteCas(fmt.Sprintf("key_%d_%d", goroutineIdx, i), 0, 0, []byte(" World"), sgbucket.AddOnly)
require.NoError(t, err)
casValues[i] = cas
}
results <- casValues
}
for i := 0; i < goroutines; i++ {
for _, collection := range []sgbucket.DataStore{collection1, collection2} {
wg.Add(1)
go createDocuments(i, collection)
}
}

doneChan := make(chan struct{})
go func() {
wg.Wait()
doneChan <- struct{}{}
}()
allCas := make([]uint64, 0, goroutines*documentCount)
loop:
for {
select {
case casValues := <-results:
allCas = append(allCas, casValues...)
case <-doneChan:
break loop
}
}
uniqueCas := make(map[uint64]struct{})
for _, cas := range allCas {
if _, ok := uniqueCas[cas]; ok {
t.Errorf("cas %d is not unique", cas)
}
uniqueCas[cas] = struct{}{}
}

}

0 comments on commit 02e61e3

Please sign in to comment.