From 02e61e35db73073d566ecd7466585d1ff4c39fb8 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Fri, 20 Sep 2024 10:24:09 -0400 Subject: [PATCH] CBG-4252 Use shared clock for all buckets This is not required for the implementation to match the Couchbase Server spec, but it allows for easier testing of bucket-to-bucket XDCR, since it guarantees that a document of the same name will have a unique CAS regardless of the bucket it is written to. --- bucket.go | 4 +--- bucket_test.go | 6 +++++- collection.go | 2 +- hlc.go | 14 +++++++++++++ hlc_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 76 insertions(+), 5 deletions(-) diff --git a/bucket.go b/bucket.go index 9d39cd4..0ed1edc 100644 --- a/bucket.go +++ b/bucket.go @@ -42,7 +42,6 @@ type Bucket struct { serial uint32 // Serial number for logging inMemory bool // True if it's an in-memory database closed bool // represents state when it is closed - hlc *hybridLogicalClock } type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection @@ -196,7 +195,7 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err return nil, err } - bucket.hlc = NewHybridLogicalClock(bucket.getLastTimestamp()) + hlc.updateLatestTime(bucket.getLastTimestamp()) exists, bucketCopy := registerBucket(bucket) // someone else beat registered the bucket in the registry, that's OK we'll close ours @@ -388,7 +387,6 @@ func (b *Bucket) copy() *Bucket { expManager: b.expManager, serial: b.serial, inMemory: b.inMemory, - hlc: b.hlc, } return r } diff --git a/bucket_test.go b/bucket_test.go index f7844be..28bec01 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -41,10 +41,14 @@ func testBucketPath(t *testing.T) string { } func makeTestBucket(t *testing.T) *Bucket { + return makeTestBucketWithName(t, strings.ToLower(t.Name())) +} + +func makeTestBucketWithName(t *testing.T, name string) *Bucket { LoggingCallback = func(level LogLevel, fmt string, args ...any) { t.Logf(logLevelNamesPrint[level]+fmt, 
args...) } - bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), strings.ToLower(t.Name()), CreateNew) + bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), name, CreateNew) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, bucket.CloseAndDelete(testCtx(t))) diff --git a/collection.go b/collection.go index 49e1366..ce81c42 100644 --- a/collection.go +++ b/collection.go @@ -591,7 +591,7 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) { func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error { var e *event err := c.bucket.inTransaction(func(txn *sql.Tx) error { - newCas := uint64(c.bucket.hlc.Now()) + newCas := uint64(hlc.Now()) var err error e, err = fn(txn, newCas) if err != nil { diff --git a/hlc.go b/hlc.go index e716dbc..d3d2d5d 100644 --- a/hlc.go +++ b/hlc.go @@ -13,6 +13,12 @@ import ( "time" ) +var hlc *hybridLogicalClock + +func init() { + hlc = NewHybridLogicalClock(0) +} + type timestamp uint64 // hybridLogicalClock is a hybrid logical clock implementation for rosmar that produces timestamps that will always be increasing regardless of clock changes. @@ -43,6 +49,14 @@ func NewHybridLogicalClock(lastTime timestamp) *hybridLogicalClock { } } +func (c *hybridLogicalClock) updateLatestTime(lastTime timestamp) { + c.mutex.Lock() + defer c.mutex.Unlock() + if uint64(lastTime) > c.highestTime { + c.highestTime = uint64(lastTime) + } +} + // Now returns the next time represented in nanoseconds. This can be the current timestamp, or if multiple occur in the same nanosecond, an increasing timestamp. 
func (c *hybridLogicalClock) Now() timestamp { c.mutex.Lock() diff --git a/hlc_test.go b/hlc_test.go index 16369de..b75ff30 100644 --- a/hlc_test.go +++ b/hlc_test.go @@ -9,9 +9,11 @@ package rosmar import ( + "fmt" "sync" "testing" + sgbucket "github.com/couchbase/sg-bucket" "github.com/stretchr/testify/require" ) @@ -105,3 +107,56 @@ func TestHLCReverseTime(t *testing.T) { require.Equal(t, timestamp(0x3d0000), hlc.Now()) } + +func TestHLCCrossBucket(t *testing.T) { + goroutines := 10 + documentCount := 10 + + collection1 := makeTestBucketWithName(t, "bucket1").DefaultDataStore() + collection2 := makeTestBucketWithName(t, "bucket2").DefaultDataStore() + + wg := sync.WaitGroup{} + results := make(chan []uint64) + + createDocuments := func(goroutineIdx int, collection sgbucket.DataStore) { + + defer wg.Done() + casValues := make([]uint64, documentCount) + for i := 0; i < documentCount; i++ { + cas, err := collection.WriteCas(fmt.Sprintf("key_%d_%d", goroutineIdx, i), 0, 0, []byte(" World"), sgbucket.AddOnly) + require.NoError(t, err) + casValues[i] = cas + } + results <- casValues + } + for i := 0; i < goroutines; i++ { + for _, collection := range []sgbucket.DataStore{collection1, collection2} { + wg.Add(1) + go createDocuments(i, collection) + } + } + + doneChan := make(chan struct{}) + go func() { + wg.Wait() + doneChan <- struct{}{} + }() + allCas := make([]uint64, 0, goroutines*documentCount) +loop: + for { + select { + case casValues := <-results: + allCas = append(allCas, casValues...) + case <-doneChan: + break loop + } + } + uniqueCas := make(map[uint64]struct{}) + for _, cas := range allCas { + if _, ok := uniqueCas[cas]; ok { + t.Errorf("cas %d is not unique", cas) + } + uniqueCas[cas] = struct{}{} + } + +}