Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3627 implement hlc for each bucket #23

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Bucket struct {
serial uint32 // Serial number for logging
inMemory bool // True if it's an in-memory database
closed bool // represents state when it is closed
hlc *hybridLogicalClock
}

type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection
Expand Down Expand Up @@ -195,6 +196,8 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err
return nil, err
}

bucket.hlc = NewHybridLogicalClock(bucket.getLastTimestamp())

exists, bucketCopy := registerBucket(bucket)
// someone else beat registered the bucket in the registry, that's OK we'll close ours
if exists {
Expand Down Expand Up @@ -385,6 +388,7 @@ func (b *Bucket) copy() *Bucket {
expManager: b.expManager,
serial: b.serial,
inMemory: b.inMemory,
hlc: b.hlc,
}
return r
}
Expand Down
22 changes: 14 additions & 8 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,14 @@ func (c *Collection) getLastCas(q queryable) (cas CAS, err error) {
return
}

// getLastTimestamp returns the last timestamp assigned to any doc in bucket. Returns 0 in the case of no cas values assigned.
func (bucket *Bucket) getLastTimestamp() timestamp {
var lastTimestamp timestamp
row := bucket.db().QueryRow("SELECT lastCas FROM bucket")
_ = scan(row, &lastTimestamp)
return timestamp(lastTimestamp)
}

// Updates the collection's and the bucket's lastCas.
func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) {
_, err = txn.Exec(`UPDATE bucket SET lastCas=?1`, cas)
Expand All @@ -597,15 +605,13 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) {
func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error {
var e *event
err := c.bucket.inTransaction(func(txn *sql.Tx) error {
newCas, err := c.bucket.getLastCas(txn)
if err == nil {
newCas++
e, err = fn(txn, newCas)
if err == nil {
err = c.setLastCas(txn, newCas)
}
newCas := uint64(c.bucket.hlc.Now())
var err error
e, err = fn(txn, newCas)
if err != nil {
return err
}
return err
return c.setLastCas(txn, newCas)
})
if err == nil && e != nil {
c.postNewEvent(e)
Expand Down
29 changes: 15 additions & 14 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestEvalSubdocPaths(t *testing.T) {
assert.Error(t, err)
}

func initSubDocTest(t *testing.T) sgbucket.DataStore {
func initSubDocTest(t *testing.T) (CAS, sgbucket.DataStore) {
ensureNoLeaks(t)

coll := makeTestBucket(t).DefaultDataStore()
Expand All @@ -199,41 +199,42 @@ func initSubDocTest(t *testing.T) sgbucket.DataStore {
var fullDoc map[string]any
cas, err := coll.Get("key", &fullDoc)
assert.NoError(t, err)
assert.Equal(t, CAS(1), cas)
assert.Greater(t, cas, CAS(0))

return coll
return cas, coll
}

func TestWriteSubDoc(t *testing.T) {
ctx := testCtx(t)
coll := initSubDocTest(t)
initialCas, coll := initSubDocTest(t)

// update json
rawJson := []byte(`"was here"`)
// test update using incorrect cas value
cas, err := coll.WriteSubDoc(ctx, "key", "rosmar", 10, rawJson)
cas1, err := coll.WriteSubDoc(ctx, "key", "rosmar", 10, rawJson)
assert.Error(t, err)
assert.Equal(t, CAS(0), cas1)

// test update using correct cas value
cas, err = coll.WriteSubDoc(ctx, "key", "rosmar", cas, rawJson)
cas2, err := coll.WriteSubDoc(ctx, "key", "rosmar", initialCas, rawJson)
assert.NoError(t, err)
assert.Equal(t, CAS(2), cas)
assert.Greater(t, cas2, initialCas)

var fullDoc map[string]any
cas, err = coll.Get("key", &fullDoc)
cas2Get, err := coll.Get("key", &fullDoc)
assert.NoError(t, err)
assert.Equal(t, CAS(2), cas)
assert.Equal(t, cas2, cas2Get)
assert.EqualValues(t, map[string]any{"rosmar": "was here"}, fullDoc)

// test update using 0 cas value
cas, err = coll.WriteSubDoc(ctx, "key", "rosmar", 0, rawJson)
cas3, err := coll.WriteSubDoc(ctx, "key", "rosmar", 0, rawJson)
assert.NoError(t, err)
assert.Equal(t, CAS(3), cas)
assert.Greater(t, cas3, cas2)
}

func TestInsertSubDoc(t *testing.T) {
ctx := testCtx(t)
coll := initSubDocTest(t)
initialCas, coll := initSubDocTest(t)

rosmarMap := map[string]any{"foo": "lol", "bar": "baz"}
expectedDoc := map[string]any{"rosmar": rosmarMap}
Expand All @@ -243,13 +244,13 @@ func TestInsertSubDoc(t *testing.T) {
assert.Error(t, err)

// test update
err = coll.SubdocInsert(ctx, "key", "rosmar.kilroy", CAS(1), "was here")
err = coll.SubdocInsert(ctx, "key", "rosmar.kilroy", initialCas, "was here")
assert.NoError(t, err)

var fullDoc map[string]any
cas, err := coll.Get("key", &fullDoc)
assert.NoError(t, err)
assert.Equal(t, CAS(2), cas)
assert.Greater(t, cas, initialCas)

rosmarMap["kilroy"] = "was here"
assert.EqualValues(t, expectedDoc, fullDoc)
Expand Down
31 changes: 19 additions & 12 deletions feeds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func TestMutations(t *testing.T) {
require.NoError(t, err)
}()

readExpectedEventsDEF(t, events, 4)
readExpectedEventsDEF(t, events)

// Read the mutation of "eskimo":
e := <-events
e.TimeReceived = time.Time{}
assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpDeletion, Key: []byte("eskimo"), Cas: 7, DataType: sgbucket.FeedDataTypeRaw}, e)
assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpDeletion, Key: []byte("eskimo"), DataType: sgbucket.FeedDataTypeRaw}, e)

require.NoError(t, bucket.CloseAndDelete(testCtx(t)))

Expand Down Expand Up @@ -130,7 +130,7 @@ func TestCheckpoint(t *testing.T) {
e := <-events
assert.Equal(t, "Checkpoint:myID", string(e.Key))

readExpectedEventsDEF(t, events, 5)
readExpectedEventsDEF(t, events)

event = <-events
assert.Equal(t, sgbucket.FeedOpEndBackfill, event.Opcode)
Expand All @@ -157,28 +157,35 @@ func startFeedWithArgs(t *testing.T, bucket *Bucket, args sgbucket.FeedArguments
return events, args.DoneChan
}

func assertEventEquals(t *testing.T, expected sgbucket.FeedEvent, actual sgbucket.FeedEvent) {
assert.Equal(t, expected.Opcode, actual.Opcode)
assert.Equal(t, expected.Key, actual.Key)
assert.Equal(t, expected.Value, actual.Value)
assert.Equal(t, expected.DataType, actual.DataType)
}

func readExpectedEventsABC(t *testing.T, events chan sgbucket.FeedEvent) {
e := <-events
e.TimeReceived = time.Time{}
assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("able"), Value: []byte(`"A"`), Cas: 1, DataType: sgbucket.FeedDataTypeJSON}, e)
assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("able"), Value: []byte(`"A"`), DataType: sgbucket.FeedDataTypeJSON}, e)
e = <-events
e.TimeReceived = time.Time{}
assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("baker"), Value: []byte(`"B"`), Cas: 2, DataType: sgbucket.FeedDataTypeJSON}, e)
assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("baker"), Value: []byte(`"B"`), DataType: sgbucket.FeedDataTypeJSON}, e)
e = <-events
e.TimeReceived = time.Time{}
assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("charlie"), Value: []byte(`"C"`), Cas: 3, DataType: sgbucket.FeedDataTypeJSON}, e)
assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("charlie"), Value: []byte(`"C"`), DataType: sgbucket.FeedDataTypeJSON}, e)
}

func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent, cas CAS) {
func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent) {
e := <-events
e.TimeReceived = time.Time{}
assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("delta"), Value: []byte(`"D"`), Cas: cas, DataType: sgbucket.FeedDataTypeJSON}, e)
assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("delta"), Value: []byte(`"D"`), DataType: sgbucket.FeedDataTypeJSON}, e)
e = <-events
e.TimeReceived = time.Time{}
assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("eskimo"), Value: []byte(`"E"`), Cas: cas + 1, DataType: sgbucket.FeedDataTypeJSON}, e)
assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("eskimo"), Value: []byte(`"E"`), DataType: sgbucket.FeedDataTypeJSON}, e)
e = <-events
e.TimeReceived = time.Time{}
assert.Equal(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("fahrvergnügen"), Value: []byte(`"F"`), Cas: cas + 2, DataType: sgbucket.FeedDataTypeJSON}, e)
assertEventEquals(t, sgbucket.FeedEvent{Opcode: sgbucket.FeedOpMutation, Key: []byte("fahrvergnügen"), Value: []byte(`"F"`), DataType: sgbucket.FeedDataTypeJSON}, e)
}

func TestCrossBucketEvents(t *testing.T) {
Expand Down Expand Up @@ -209,8 +216,8 @@ func TestCrossBucketEvents(t *testing.T) {
require.NoError(t, err)
}()

readExpectedEventsDEF(t, events, 4)
readExpectedEventsDEF(t, events2, 4)
readExpectedEventsDEF(t, events)
readExpectedEventsDEF(t, events2)

bucket.Close(testCtx(t))
require.NoError(t, bucket2.CloseAndDelete(testCtx(t)))
Expand Down
59 changes: 59 additions & 0 deletions hlc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package rosmar

import (
"sync"
"time"
)

type timestamp uint64

// hybridLogicalClock is a hybrid logical clock implementation for rosmar that produces timestamps that will always be increasing regardless of clock changes.
type hybridLogicalClock struct {
clock clock
counter uint16
highestTime uint64
mutex sync.Mutex
}

// clock interface is used to abstract the system clock for testing purposes.
type clock interface {
// getTime returns the current time in nanoseconds.
getTime() uint64
}

type systemClock struct{}

// getTime returns the current time in nanoseconds.
func (c *systemClock) getTime() uint64 {
return uint64(time.Now().UnixNano())
}

// NewHybridLogicalClock returns a new HLC from a previously initialized time.
func NewHybridLogicalClock(lastTime timestamp) *hybridLogicalClock {
return &hybridLogicalClock{
highestTime: uint64(lastTime),
clock: &systemClock{},
}
}

// Now returns the next time represented in nanoseconds. This can be the current timestamp, or if multiple occur in the same nanosecond, an increasing timestamp.
func (c *hybridLogicalClock) Now() timestamp {
c.mutex.Lock()
defer c.mutex.Unlock()
physicalTime := c.clock.getTime() &^ 0xFFFF // round to 48 bits to allow counter at end
if c.highestTime >= physicalTime {
c.counter++
} else {
c.counter = 0
c.highestTime = physicalTime
}
return timestamp(c.highestTime | uint64(c.counter))
torcolvin marked this conversation as resolved.
Show resolved Hide resolved
}
107 changes: 107 additions & 0 deletions hlc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package rosmar

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestHybridLogicalClockNow(t *testing.T) {
clock := hybridLogicalClock{clock: &systemClock{}}
timestamp1 := clock.Now()
timestamp2 := clock.Now()
require.Greater(t, timestamp2, timestamp1)
}

func generateTimestamps(wg *sync.WaitGroup, clock *hybridLogicalClock, n int, result chan []timestamp) {
defer wg.Done()
timestamps := make([]timestamp, n)
for i := 0; i < n; i++ {
timestamps[i] = clock.Now()
}
result <- timestamps
}

func TestHLCNowConcurrent(t *testing.T) {
clock := hybridLogicalClock{clock: &systemClock{}}
goroutines := 100
timestampCount := 100

wg := sync.WaitGroup{}
results := make(chan []timestamp)
for i := 0; i < goroutines; i++ {
wg.Add(1)
go generateTimestamps(&wg, &clock, timestampCount, results)
}

doneChan := make(chan struct{})
go func() {
wg.Wait()
doneChan <- struct{}{}
}()
allTimestamps := make([]timestamp, 0, goroutines*timestampCount)
loop:
for {
select {
case timestamps := <-results:
allTimestamps = append(allTimestamps, timestamps...)
case <-doneChan:
break loop
}
}
uniqueTimestamps := make(map[timestamp]struct{})
for _, timestamp := range allTimestamps {
if _, ok := uniqueTimestamps[timestamp]; ok {
t.Errorf("Timestamp %d is not unique", timestamp)
}
uniqueTimestamps[timestamp] = struct{}{}
}
}

type fakeClock struct {
time uint64
}

func (c *fakeClock) getTime() uint64 {
return c.time
}

func TestHLCReverseTime(t *testing.T) {
clock := &fakeClock{}
hlc := hybridLogicalClock{clock: clock}
startTime := uint64(1000000) // 1 second
clock.time = startTime
require.Equal(t, timestamp(0xf0000), hlc.Now())
require.Equal(t, timestamp(0xf0001), hlc.Now())

// reverse time no counter
clock.time = 0
require.Equal(t, timestamp(0xf0002), hlc.Now())

// reset time to normal
clock.time = startTime
require.Equal(t, timestamp(0xf0003), hlc.Now())

// reverse time again
clock.time = 1
require.Equal(t, timestamp(0xf0004), hlc.Now())

// jump to a value we had previously
clock.time = startTime * 2
require.Equal(t, timestamp(0x1e0000), hlc.Now())
require.Equal(t, timestamp(0x1e0001), hlc.Now())

// continue forward
clock.time *= 2
require.Equal(t, timestamp(0x3d0000), hlc.Now())

}