From 87c40f153a0eb191e0008c7c070f3434d755cc02 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Fri, 15 Dec 2023 11:50:11 -0500 Subject: [PATCH] CBG-3627 implement hlc - there is more locking than seems necessary here, but I don't think this will be in contention, especially becuase these are inside SQL transactions that are happening one at a time - each bucket maintains it's own HLC, as it is stored inside the bucket db, and so needs to be re-initialized from the last possible clock that was serialized. --- bucket.go | 4 ++ collection.go | 22 ++++++---- collection_test.go | 29 ++++++------- feeds_test.go | 31 ++++++++------ hlc.go | 59 ++++++++++++++++++++++++++ hlc_test.go | 100 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 211 insertions(+), 34 deletions(-) create mode 100644 hlc.go create mode 100644 hlc_test.go diff --git a/bucket.go b/bucket.go index 2e5d90b..9d39cd4 100644 --- a/bucket.go +++ b/bucket.go @@ -42,6 +42,7 @@ type Bucket struct { serial uint32 // Serial number for logging inMemory bool // True if it's an in-memory database closed bool // represents state when it is closed + hlc *hybridLogicalClock } type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection @@ -195,6 +196,8 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err return nil, err } + bucket.hlc = NewHybridLogicalClock(bucket.getLastTimestamp()) + exists, bucketCopy := registerBucket(bucket) // someone else beat registered the bucket in the registry, that's OK we'll close ours if exists { @@ -385,6 +388,7 @@ func (b *Bucket) copy() *Bucket { expManager: b.expManager, serial: b.serial, inMemory: b.inMemory, + hlc: b.hlc, } return r } diff --git a/collection.go b/collection.go index 26f4241..36125a1 100644 --- a/collection.go +++ b/collection.go @@ -583,6 +583,14 @@ func (c *Collection) getLastCas(q queryable) (cas CAS, err error) { return } +// getLastTimestamp returns the last timestamp assigned to any doc in bucket. Returns 0 in the case of no cas values assigned. +func (bucket *Bucket) getLastTimestamp() timestamp { + var lastTimestamp timestamp + row := bucket.db().QueryRow("SELECT lastCas FROM bucket") + _ = scan(row, &lastTimestamp) + return timestamp(lastTimestamp) +} + // Updates the collection's and the bucket's lastCas. func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) { _, err = txn.Exec(`UPDATE bucket SET lastCas=?1`, cas) @@ -597,15 +605,13 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) { func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error { var e *event err := c.bucket.inTransaction(func(txn *sql.Tx) error { - newCas, err := c.bucket.getLastCas(txn) - if err == nil { - newCas++ - e, err = fn(txn, newCas) - if err == nil { - err = c.setLastCas(txn, newCas) - } + newCas := uint64(c.bucket.hlc.Now()) + var err error + e, err = fn(txn, newCas) + if err != nil { + return err } - return err + return c.setLastCas(txn, newCas) }) if err == nil && e != nil { c.postNewEvent(e) diff --git a/collection_test.go b/collection_test.go index a60c126..bc62207 100644 --- a/collection_test.go +++ b/collection_test.go @@ -182,7 +182,7 @@ func TestEvalSubdocPaths(t *testing.T) { assert.Error(t, err) } -func initSubDocTest(t *testing.T) sgbucket.DataStore { +func initSubDocTest(t *testing.T) (CAS, sgbucket.DataStore) { ensureNoLeaks(t) coll := makeTestBucket(t).DefaultDataStore() @@ -199,41 +199,42 @@ func initSubDocTest(t *testing.T) sgbucket.DataStore { var fullDoc map[string]any cas, err := coll.Get("key", &fullDoc) assert.NoError(t, err) - assert.Equal(t, CAS(1), cas) + assert.Greater(t, cas, CAS(0)) - return coll + return cas, coll } func TestWriteSubDoc(t *testing.T) { ctx := testCtx(t) - coll := initSubDocTest(t) + initialCas, coll := initSubDocTest(t) // update json rawJson := []byte(`"was here"`) // test update using incorrect cas value - cas, err := coll.WriteSubDoc(ctx, "key", "rosmar", 10, rawJson) + cas1, err := coll.WriteSubDoc(ctx, "key", "rosmar", 10, rawJson) assert.Error(t, err) + assert.Equal(t, CAS(0), cas1) // test update using correct cas value - cas, err = coll.WriteSubDoc(ctx, "key", "rosmar", cas, rawJson) + cas2, err := coll.WriteSubDoc(ctx, "key", "rosmar", initialCas, rawJson) assert.NoError(t, err) - assert.Equal(t, CAS(2), cas) + assert.Greater(t, cas2, initialCas) var fullDoc map[string]any - cas, err = coll.Get("key", &fullDoc) + cas2Get, err := coll.Get("key", &fullDoc) assert.NoError(t, err) - assert.Equal(t, CAS(2), cas) + assert.Equal(t, cas2, cas2Get) assert.EqualValues(t, map[string]any{"rosmar": "was here"}, fullDoc) // test update using 0 cas value - cas, err = coll.WriteSubDoc(ctx, "key", "rosmar", 0, rawJson) + cas3, err := coll.WriteSubDoc(ctx, "key", "rosmar", 0, rawJson) assert.NoError(t, err) - assert.Equal(t, CAS(3), cas) + assert.Greater(t, cas3, cas2) } func TestInsertSubDoc(t *testing.T) { ctx := testCtx(t) - coll := initSubDocTest(t) + initialCas, coll := initSubDocTest(t) rosmarMap := map[string]any{"foo": "lol", "bar": "baz"} expectedDoc := map[string]any{"rosmar": rosmarMap} @@ -243,13 +244,13 @@ func TestInsertSubDoc(t *testing.T) { assert.Error(t, err) // test update - err = coll.SubdocInsert(ctx, "key", "rosmar.kilroy", CAS(1), "was here") + err = coll.SubdocInsert(ctx, "key", "rosmar.kilroy", initialCas, "was here") assert.NoError(t, err) var fullDoc map[string]any cas, err := coll.Get("key", &fullDoc) assert.NoError(t, err) - assert.Equal(t, CAS(2), cas) + assert.Greater(t, cas, initialCas) rosmarMap["kilroy"] = "was here" assert.EqualValues(t, expectedDoc, fullDoc) diff --git a/feeds_test.go b/feeds_test.go index e5f7e7e..a4219ed 100644 --- a/feeds_test.go +++ b/feeds_test.go @@ -68,12 +68,12 @@ func TestMutations(t *testing.T) { require.NoError(t, err) }() - readExpectedEventsDEF(t, events, 4) + readExpectedEventsDEF(t, events) // Read the mutation of "eskimo": e := <-events e.TimeReceived = time.Time{} - assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpDeletion, Key: []byte("eskimo"), Cas: 7, DataType: sgbucket.FeedDataTypeRaw}, e) + assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpDeletion, Key: []byte("eskimo"), DataType: sgbucket.FeedDataTypeRaw}, e) require.NoError(t, bucket.CloseAndDelete(testCtx(t))) @@ -130,7 +130,7 @@ func TestCheckpoint(t *testing.T) { e := <-events assert.Equal(t, "Checkpoint:myID", string(e.Key)) - readExpectedEventsDEF(t, events, 5) + readExpectedEventsDEF(t, events) event = <-events assert.Equal(t, sgbucket.FeedOpEndBackfill, event.Opcode) @@ -157,28 +157,35 @@ func startFeedWithArgs(t *testing.T, bucket *Bucket, args sgbucket.FeedArguments return events, args.DoneChan } +func assertEventEquals(t *testing.T, expected sgbucket.FeedEvent, actual sgbucket.FeedEvent) { + assert.Equal(t, expected.Opcode, actual.Opcode) + assert.Equal(t, expected.Key, actual.Key) + assert.Equal(t, expected.Value, actual.Value) + assert.Equal(t, expected.DataType, actual.DataType) +} + func readExpectedEventsABC(t *testing.T, events chan sgbucket.FeedEvent) { e := <-events e.TimeReceived = time.Time{} - assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("able"), Value: []byte(`"A"`), Cas: 1, DataType: sgbucket.FeedDataTypeJSON}, e) + assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("able"), Value: []byte(`"A"`), DataType: sgbucket.FeedDataTypeJSON}, e) e = <-events e.TimeReceived = time.Time{} - assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("baker"), Value: []byte(`"B"`), Cas: 2, DataType: sgbucket.FeedDataTypeJSON}, e) + assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("baker"), Value: []byte(`"B"`), DataType: sgbucket.FeedDataTypeJSON}, e) e = <-events e.TimeReceived = time.Time{} - assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("charlie"), Value: []byte(`"C"`), Cas: 3, DataType: sgbucket.FeedDataTypeJSON}, e) + assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("charlie"), Value: []byte(`"C"`), DataType: sgbucket.FeedDataTypeJSON}, e) } -func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent, cas CAS) { +func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent) { e := <-events e.TimeReceived = time.Time{} - assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("delta"), Value: []byte(`"D"`), Cas: cas, DataType: sgbucket.FeedDataTypeJSON}, e) + assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("delta"), Value: []byte(`"D"`), DataType: sgbucket.FeedDataTypeJSON}, e) e = <-events e.TimeReceived = time.Time{} - assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("eskimo"), Value: []byte(`"E"`), Cas: cas + 1, DataType: sgbucket.FeedDataTypeJSON}, e) + assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("eskimo"), Value: []byte(`"E"`), DataType: sgbucket.FeedDataTypeJSON}, e) e = <-events e.TimeReceived = time.Time{} - assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("fahrvergnügen"), Value: []byte(`"F"`), Cas: cas + 2, DataType: sgbucket.FeedDataTypeJSON}, e) + assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("fahrvergnügen"), Value: []byte(`"F"`), DataType: sgbucket.FeedDataTypeJSON}, e) } func TestCrossBucketEvents(t *testing.T) { @@ -209,8 +216,8 @@ func TestCrossBucketEvents(t *testing.T) { require.NoError(t, err) }() - readExpectedEventsDEF(t, events, 4) - readExpectedEventsDEF(t, events2, 4) + readExpectedEventsDEF(t, events) + readExpectedEventsDEF(t, events2) bucket.Close(testCtx(t)) require.NoError(t, bucket2.CloseAndDelete(testCtx(t))) diff --git a/hlc.go b/hlc.go new file mode 100644 index 0000000..221fc91 --- /dev/null +++ b/hlc.go @@ -0,0 +1,59 @@ +// Copyright 2023-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package rosmar + +import ( + "sync" + "time" +) + +type timestamp uint64 + +// hybridLogicalClock is a hybrid logical clock implementation for rosmar that produces timestamps that will always be increasing regardless of clock changes. +type hybridLogicalClock struct { + clock clock + counter timestamp + highestTime timestamp + mutex sync.Mutex +} + +// clock interface is used to abstract the system clock for testing purposes. +type clock interface { + // getTime returns the current time in nanoseconds. + getTime() timestamp +} + +type systemClock struct{} + +// getTime returns the current time in nanoseconds. +func (c *systemClock) getTime() timestamp { + return timestamp(time.Now().UnixNano()) +} + +// NewHybridLogicalClock returns a new HLC from a previously initialized time. +func NewHybridLogicalClock(lastTime timestamp) *hybridLogicalClock { + return &hybridLogicalClock{ + highestTime: lastTime, + clock: &systemClock{}, + } +} + +// Now returns the next time represented in nanoseconds. This can be the current timestamp, or if multiple occur in the same nanosecond, an increasing timestamp. +func (c *hybridLogicalClock) Now() timestamp { + c.mutex.Lock() + defer c.mutex.Unlock() + physicalTime := c.clock.getTime() + if c.highestTime >= physicalTime { + c.counter++ + } else { + c.counter = 0 + c.highestTime = physicalTime + } + return c.highestTime + c.counter +} diff --git a/hlc_test.go b/hlc_test.go new file mode 100644 index 0000000..0597fdc --- /dev/null +++ b/hlc_test.go @@ -0,0 +1,100 @@ +// Copyright 2023-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package rosmar + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHybridLogicalClockNow(t *testing.T) { + clock := hybridLogicalClock{clock: &systemClock{}} + timestamp1 := clock.Now() + timestamp2 := clock.Now() + require.Greater(t, timestamp2, timestamp1) +} + +func generateTimestamps(wg *sync.WaitGroup, clock *hybridLogicalClock, n int, result chan []timestamp) { + defer wg.Done() + timestamps := make([]timestamp, n) + for i := 0; i < n; i++ { + timestamps[i] = clock.Now() + } + result <- timestamps +} + +func TestHLCNowConcurrent(t *testing.T) { + clock := hybridLogicalClock{clock: &systemClock{}} + goroutines := 100 + timestampCount := 100 + + wg := sync.WaitGroup{} + results := make(chan []timestamp) + for i := 0; i < goroutines; i++ { + wg.Add(1) + go generateTimestamps(&wg, &clock, timestampCount, results) + } + + doneChan := make(chan struct{}) + go func() { + wg.Wait() + doneChan <- struct{}{} + }() + allTimestamps := make([]timestamp, 0, goroutines*timestampCount) +loop: + for { + select { + case timestamps := <-results: + allTimestamps = append(allTimestamps, timestamps...) + case <-doneChan: + break loop + } + } + uniqueTimestamps := make(map[timestamp]struct{}) + for _, timestamp := range allTimestamps { + if _, ok := uniqueTimestamps[timestamp]; ok { + t.Errorf("Timestamp %d is not unique", timestamp) + } + uniqueTimestamps[timestamp] = struct{}{} + } +} + +type fakeClock struct { + time timestamp +} + +func (c *fakeClock) getTime() timestamp { + return c.time +} + +func TestHLCReverseTime(t *testing.T) { + clock := &fakeClock{} + hlc := hybridLogicalClock{clock: clock} + require.Equal(t, timestamp(1), hlc.Now()) + require.Equal(t, timestamp(2), hlc.Now()) + + // reverse time no counter + clock.time = timestamp(0) + require.Equal(t, timestamp(3), hlc.Now()) + + // reset time to normal + clock.time = timestamp(6) + require.Equal(t, timestamp(6), hlc.Now()) + + // reverse time again + clock.time = timestamp(1) + require.Equal(t, timestamp(7), hlc.Now()) + + // jump to a value we had previously + clock.time = timestamp(6) + require.Equal(t, int(timestamp(8)), int(hlc.Now())) + require.Equal(t, int(timestamp(9)), int(hlc.Now())) +}