Skip to content

Commit

Permalink
CBG-3627 implement hlc
Browse files Browse the repository at this point in the history
- there is more locking than seems necessary here, but I don't think
  this will be in contention, especially because these are inside SQL
  transactions that are happening one at a time
- each bucket maintains its own HLC, as it is stored inside the bucket
  db, and so needs to be re-initialized from the last possible clock
  that was serialized.
  • Loading branch information
torcolvin committed Dec 15, 2023
1 parent 3238bec commit 87c40f1
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 34 deletions.
4 changes: 4 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Bucket struct {
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
hlc *hybridLogicalClock
}

type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
Expand Down Expand Up @@ -195,6 +196,8 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
return nil, err
}

bucket.hlc = NewHybridLogicalClock(bucket.getLastTimestamp())

exists, bucketCopy := registerBucket(bucket)
// someone else registered the bucket first; that's OK, we'll close ours
if exists {
Expand Down Expand Up @@ -385,6 +388,7 @@ func (b *Bucket) copy() *Bucket {
expManager: b.expManager,
serial: b.serial,
inMemory: b.inMemory,
hlc: b.hlc,
}
return r
}
Expand Down
22 changes: 14 additions & 8 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,14 @@ func (c *Collection) getLastCas(q queryable) (cas CAS, err error) {
return
}

// getLastTimestamp returns the last timestamp assigned to any doc in the
// bucket, used to re-seed the bucket's HLC on open. Returns 0 if no CAS
// values have been assigned (e.g. a brand-new bucket).
func (bucket *Bucket) getLastTimestamp() timestamp {
	var lastTimestamp timestamp
	row := bucket.db().QueryRow("SELECT lastCas FROM bucket")
	// A scan error deliberately leaves lastTimestamp at 0, which starts the
	// HLC from the current physical clock.
	_ = scan(row, &lastTimestamp)
	return lastTimestamp
}

// Updates the collection's and the bucket's lastCas.
func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) {
_, err = txn.Exec(`UPDATE bucket SET lastCas=?1`, cas)
Expand All @@ -597,15 +605,13 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) {
func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error {
var e *event
err := c.bucket.inTransaction(func(txn *sql.Tx) error {
newCas, err := c.bucket.getLastCas(txn)
if err == nil {
newCas++
e, err = fn(txn, newCas)
if err == nil {
err = c.setLastCas(txn, newCas)
}
newCas := uint64(c.bucket.hlc.Now())
var err error
e, err = fn(txn, newCas)
if err != nil {
return err
}
return err
return c.setLastCas(txn, newCas)
})
if err == nil && e != nil {
c.postNewEvent(e)
Expand Down
29 changes: 15 additions & 14 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestEvalSubdocPaths(t *testing.T) {
assert.Error(t, err)
}

func initSubDocTest(t *testing.T) sgbucket.DataStore {
func initSubDocTest(t *testing.T) (CAS, sgbucket.DataStore) {
ensureNoLeaks(t)

coll := makeTestBucket(t).DefaultDataStore()
Expand All @@ -199,41 +199,42 @@ func initSubDocTest(t *testing.T) sgbucket.DataStore {
var fullDoc map[string]any
cas, err := coll.Get("key", &fullDoc)
assert.NoError(t, err)
assert.Equal(t, CAS(1), cas)
assert.Greater(t, cas, CAS(0))

return coll
return cas, coll
}

// TestWriteSubDoc exercises WriteSubDoc with an incorrect CAS, the correct
// CAS, and a zero (skip-check) CAS.
func TestWriteSubDoc(t *testing.T) {
	ctx := testCtx(t)
	initialCas, coll := initSubDocTest(t)

	subdocJSON := []byte(`"was here"`)

	// An incorrect CAS value must be rejected.
	badCas, err := coll.WriteSubDoc(ctx, "key", "rosmar", 10, subdocJSON)
	assert.Error(t, err)
	assert.Equal(t, CAS(0), badCas)

	// The correct CAS value is accepted and yields a newer CAS.
	updateCas, err := coll.WriteSubDoc(ctx, "key", "rosmar", initialCas, subdocJSON)
	assert.NoError(t, err)
	assert.Greater(t, updateCas, initialCas)

	var fullDoc map[string]any
	getCas, err := coll.Get("key", &fullDoc)
	assert.NoError(t, err)
	assert.Equal(t, updateCas, getCas)
	assert.EqualValues(t, map[string]any{"rosmar": "was here"}, fullDoc)

	// A zero CAS value skips the check and always writes.
	finalCas, err := coll.WriteSubDoc(ctx, "key", "rosmar", 0, subdocJSON)
	assert.NoError(t, err)
	assert.Greater(t, finalCas, updateCas)
}

func TestInsertSubDoc(t *testing.T) {
ctx := testCtx(t)
coll := initSubDocTest(t)
initialCas, coll := initSubDocTest(t)

rosmarMap := map[string]any{"foo": "lol", "bar": "baz"}
expectedDoc := map[string]any{"rosmar": rosmarMap}
Expand All @@ -243,13 +244,13 @@ func TestInsertSubDoc(t *testing.T) {
assert.Error(t, err)

// test update
err = coll.SubdocInsert(ctx, "key", "rosmar.kilroy", CAS(1), "was here")
err = coll.SubdocInsert(ctx, "key", "rosmar.kilroy", initialCas, "was here")
assert.NoError(t, err)

var fullDoc map[string]any
cas, err := coll.Get("key", &fullDoc)
assert.NoError(t, err)
assert.Equal(t, CAS(2), cas)
assert.Greater(t, cas, initialCas)

rosmarMap["kilroy"] = "was here"
assert.EqualValues(t, expectedDoc, fullDoc)
Expand Down
31 changes: 19 additions & 12 deletions feeds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func TestMutations(t *testing.T) {
require.NoError(t, err)
}()

readExpectedEventsDEF(t, events, 4)
readExpectedEventsDEF(t, events)

// Read the mutation of "eskimo":
e := <-events
e.TimeReceived = time.Time{}
assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpDeletion, Key: []byte("eskimo"), Cas: 7, DataType: sgbucket.FeedDataTypeRaw}, e)
assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpDeletion, Key: []byte("eskimo"), DataType: sgbucket.FeedDataTypeRaw}, e)

require.NoError(t, bucket.CloseAndDelete(testCtx(t)))

Expand Down Expand Up @@ -130,7 +130,7 @@ func TestCheckpoint(t *testing.T) {
e := <-events
assert.Equal(t, "Checkpoint:myID", string(e.Key))

readExpectedEventsDEF(t, events, 5)
readExpectedEventsDEF(t, events)

event = <-events
assert.Equal(t, sgbucket.FeedOpEndBackfill, event.Opcode)
Expand All @@ -157,28 +157,35 @@ func startFeedWithArgs(t *testing.T, bucket *Bucket, args sgbucket.FeedArguments
return events, args.DoneChan
}

// assertEventEquals compares the fields of a feed event that are
// deterministic under the HLC (Opcode, Key, Value, DataType); Cas and
// TimeReceived are intentionally ignored because they vary per run.
func assertEventEquals(t *testing.T, expected sgbucket.FeedEvent, actual sgbucket.FeedEvent) {
	t.Helper() // report failures at the caller's line, not here
	assert.Equal(t, expected.Opcode, actual.Opcode)
	assert.Equal(t, expected.Key, actual.Key)
	assert.Equal(t, expected.Value, actual.Value)
	assert.Equal(t, expected.DataType, actual.DataType)
}

// readExpectedEventsABC reads the mutations of "able", "baker" and "charlie"
// from the feed and checks their contents. Cas values are timestamp-based
// and therefore not asserted; TimeReceived is ignored by assertEventEquals.
func readExpectedEventsABC(t *testing.T, events chan sgbucket.FeedEvent) {
	t.Helper()
	expected := []struct {
		key   string
		value string
	}{
		{"able", `"A"`},
		{"baker", `"B"`},
		{"charlie", `"C"`},
	}
	for _, exp := range expected {
		e := <-events
		assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte(exp.key), Value: []byte(exp.value), DataType: sgbucket.FeedDataTypeJSON}, e)
	}
}

func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent, cas CAS) {
// readExpectedEventsDEF reads the mutations of "delta", "eskimo" and
// "fahrvergnügen" from the feed and checks their contents. Cas values are
// timestamp-based and therefore not asserted; TimeReceived is ignored by
// assertEventEquals.
func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent) {
	t.Helper()
	expected := []struct {
		key   string
		value string
	}{
		{"delta", `"D"`},
		{"eskimo", `"E"`},
		{"fahrvergnügen", `"F"`},
	}
	for _, exp := range expected {
		e := <-events
		assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte(exp.key), Value: []byte(exp.value), DataType: sgbucket.FeedDataTypeJSON}, e)
	}
}

func TestCrossBucketEvents(t *testing.T) {
Expand Down Expand Up @@ -209,8 +216,8 @@ func TestCrossBucketEvents(t *testing.T) {
require.NoError(t, err)
}()

readExpectedEventsDEF(t, events, 4)
readExpectedEventsDEF(t, events2, 4)
readExpectedEventsDEF(t, events)
readExpectedEventsDEF(t, events2)

bucket.Close(testCtx(t))
require.NoError(t, bucket2.CloseAndDelete(testCtx(t)))
Expand Down
59 changes: 59 additions & 0 deletions hlc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package rosmar

import (
"sync"
"time"
)

// timestamp is a hybrid logical clock value, in nanoseconds.
type timestamp uint64

// hybridLogicalClock produces timestamps that are strictly increasing
// regardless of system clock changes (e.g. NTP stepping the clock backwards).
type hybridLogicalClock struct {
	clock       clock      // source of physical time; swappable for tests
	counter     timestamp  // logical offset past highestTime, reset when the physical clock overtakes
	highestTime timestamp  // highest physical time observed so far, in nanoseconds
	mutex       sync.Mutex // guards counter and highestTime
}

// clock interface is used to abstract the system clock for testing purposes.
type clock interface {
	// getTime returns the current time in nanoseconds.
	getTime() timestamp
}

// systemClock reads the real system clock.
type systemClock struct{}

// getTime returns the current wall-clock time in nanoseconds.
func (c *systemClock) getTime() timestamp {
	return timestamp(time.Now().UnixNano())
}

// NewHybridLogicalClock returns a new HLC seeded from a previously
// persisted timestamp, so values never repeat across restarts.
func NewHybridLogicalClock(lastTime timestamp) *hybridLogicalClock {
	return &hybridLogicalClock{
		highestTime: lastTime,
		clock:       &systemClock{},
	}
}

// Now returns the next timestamp in nanoseconds. The result is strictly
// greater than every previously returned value: when the physical clock has
// not advanced past the last returned value (or has gone backwards), a
// logical counter is incremented instead.
func (c *hybridLogicalClock) Now() timestamp {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	physicalTime := c.clock.getTime()
	// Compare against the last *returned* value (highestTime + counter), not
	// just highestTime; otherwise a physical time landing in the interval
	// (highestTime, highestTime+counter] would be handed out even though an
	// equal or higher timestamp was already returned, breaking monotonicity.
	if c.highestTime+c.counter >= physicalTime {
		c.counter++
	} else {
		c.counter = 0
		c.highestTime = physicalTime
	}
	return c.highestTime + c.counter
}
100 changes: 100 additions & 0 deletions hlc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package rosmar

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

// Two successive reads of the HLC must be strictly increasing.
func TestHybridLogicalClockNow(t *testing.T) {
	hlc := hybridLogicalClock{clock: &systemClock{}}
	first := hlc.Now()
	second := hlc.Now()
	require.Less(t, first, second)
}

// generateTimestamps pulls n timestamps off the shared clock, delivers them
// on result, and signals wg when finished (after the send completes).
func generateTimestamps(wg *sync.WaitGroup, clock *hybridLogicalClock, n int, result chan []timestamp) {
	defer wg.Done()
	out := make([]timestamp, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, clock.Now())
	}
	result <- out
}

// TestHLCNowConcurrent verifies that timestamps handed out concurrently from
// many goroutines are globally unique.
func TestHLCNowConcurrent(t *testing.T) {
	clock := hybridLogicalClock{clock: &systemClock{}}
	goroutines := 100
	timestampCount := 100

	// Buffering the channel for one slice per worker removes the need for a
	// dedicated collector goroutine and select loop: generateTimestamps sends
	// before calling wg.Done, so once Wait returns every result is buffered.
	results := make(chan []timestamp, goroutines)
	wg := sync.WaitGroup{}
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go generateTimestamps(&wg, &clock, timestampCount, results)
	}
	wg.Wait()
	close(results)

	uniqueTimestamps := make(map[timestamp]struct{}, goroutines*timestampCount)
	for timestamps := range results {
		for _, ts := range timestamps {
			if _, ok := uniqueTimestamps[ts]; ok {
				t.Errorf("Timestamp %d is not unique", ts)
			}
			uniqueTimestamps[ts] = struct{}{}
		}
	}
}

// fakeClock implements the clock interface with a manually-set time, letting
// tests simulate a clock that stalls, reverses, or jumps.
type fakeClock struct {
	time timestamp
}

// getTime returns the manually-configured time in nanoseconds.
func (f *fakeClock) getTime() timestamp {
	return f.time
}

// TestHLCReverseTime drives the HLC with a fake clock to verify timestamps
// keep increasing even when the physical clock stalls, moves backwards, or
// revisits a value it already produced.
func TestHLCReverseTime(t *testing.T) {
	clock := &fakeClock{}
	hlc := hybridLogicalClock{clock: clock}
	require.Equal(t, timestamp(1), hlc.Now())
	require.Equal(t, timestamp(2), hlc.Now())

	// reverse time no counter
	clock.time = timestamp(0)
	require.Equal(t, timestamp(3), hlc.Now())

	// reset time to normal
	clock.time = timestamp(6)
	require.Equal(t, timestamp(6), hlc.Now())

	// reverse time again
	clock.time = timestamp(1)
	require.Equal(t, timestamp(7), hlc.Now())

	// jump to a value we had previously
	clock.time = timestamp(6)
	require.Equal(t, timestamp(8), hlc.Now())
	require.Equal(t, timestamp(9), hlc.Now())
}

0 comments on commit 87c40f1

Please sign in to comment.