Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed point read latch #26

Merged
merged 1 commit into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,18 @@ func (c *Collection) UpdateAt(idx uint32, columnName string, fn func(v Cursor) e
})
}

// SelectAt performs a selection on a specific row specified by its index.
// SelectAt performs a selection on a specific row specified by its index. It returns
// a boolean value indicating whether an element is present at the index or not.
func (c *Collection) SelectAt(idx uint32, fn func(v Selector)) bool {
c.lock.RLock()
contains := c.fill.Contains(idx)
c.lock.RUnlock()

// If it's empty or over the sequence, not found
if idx >= uint32(len(c.fill))<<6 || !contains {
chunk := uint(idx >> chunkShift)
if idx >= uint32(len(c.fill))<<6 || !c.fill.Contains(idx) {
return false
}

// Lock the chunk which we are about to read and call the selector delegate
c.slock.RLock(chunk)
fn(Selector{idx: idx, col: c})
c.slock.RUnlock(chunk)
return true
}

Expand Down
66 changes: 57 additions & 9 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,15 @@ func runReplication(t *testing.T, updates, inserts, concurrency int) {
return txn.Range("float64", func(v Cursor) {
v1, v2 := v.FloatAt("float64"), v.IntAt("int32")
if v1 != 0 {
clone, ok := txn.ReadAt(v.idx)
assert.True(t, ok)
assert.Equal(t, v.FloatAt("float64"), clone.FloatAt("float64"))
assert.True(t, txn.SelectAt(v.idx, func(s Selector) {
assert.Equal(t, v.FloatAt("float64"), s.FloatAt("float64"))
}))
}

if v2 != 0 {
clone, ok := txn.ReadAt(v.idx)
assert.True(t, ok)
assert.Equal(t, v.IntAt("int32"), clone.IntAt("int32"))
assert.True(t, txn.SelectAt(v.idx, func(s Selector) {
assert.Equal(t, v.IntAt("int32"), s.IntAt("int32"))
}))
}
})
})
Expand Down Expand Up @@ -341,9 +341,9 @@ func TestInsertObject(t *testing.T) {

assert.Equal(t, 2, col.Count())
assert.NoError(t, col.Query(func(txn *Txn) error {
selector, ok := txn.ReadAt(0)
assert.True(t, ok)
assert.Equal(t, "A", selector.StringAt("name"))
assert.True(t, txn.SelectAt(0, func(v Selector) {
assert.Equal(t, "A", v.StringAt("name"))
}))
return nil
}))
}
Expand Down Expand Up @@ -491,6 +491,54 @@ func TestInsertParallel(t *testing.T) {
}))
}

func TestConcurrentPointReads(t *testing.T) {
obj := Object{
"name": "Roman",
"age": 35,
"wallet": 50.99,
"health": 100,
"mana": 200,
}

col := NewCollection()
col.CreateColumnsOf(obj)
for i := 0; i < 1000; i++ {
col.Insert(obj)
}

var ops int64
var wg sync.WaitGroup
wg.Add(2)

// Reader
go func() {
for i := 0; i < 10000; i++ {
col.SelectAt(99, func(v Selector) {
_ = v.StringAt("name")
})
atomic.AddInt64(&ops, 1)
runtime.Gosched()
}
wg.Done()
}()

// Writer
go func() {
for i := 0; i < 10000; i++ {
col.UpdateAt(99, "name", func(v Cursor) error {
v.SetString("test")
return nil
})
atomic.AddInt64(&ops, 1)
runtime.Gosched()
}
wg.Done()
}()

wg.Wait()
assert.Equal(t, 20000, int(atomic.LoadInt64(&ops)))
}

// loadPlayers loads a list of players from the fixture
func loadPlayers(amount int) *Collection {
out := NewCollection(Options{
Expand Down
39 changes: 21 additions & 18 deletions examples/bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
"hash/crc32"
"math/bits"
"os"
"sync"
"sync/atomic"
Expand All @@ -31,13 +31,13 @@ func main() {
createCollection(players, amount)

// This runs point query benchmarks
runBenchmark("Point Reads/Writes", func(writeTxn bool) (reads int, writes int) {
runBenchmark("Point Reads/Writes", func(v uint32, writeTxn bool) (reads int, writes int) {

// To avoid task granuarity problem, load up a bit more work on each
// of the goroutines, a few hundred reads should be enough to amortize
// the cost of scheduling goroutines, so we can actually test our code.
for i := 0; i < 1000; i++ {
offset := randN(amount - 1)
offset := randN(v, amount-1)
if writeTxn {
players.UpdateAt(offset, "balance", func(v column.Cursor) error {
v.SetFloat64(0)
Expand All @@ -56,7 +56,7 @@ func main() {
}

// runBenchmark runs a benchmark
func runBenchmark(name string, fn func(bool) (int, int)) {
func runBenchmark(name string, fn func(uint32, bool) (int, int)) {
fmt.Printf("Benchmarking %v ...\n", name)
fmt.Printf("%7v\t%6v\t%17v\t%13v\n", "WORK", "PROCS", "READ RATE", "WRITE RATE")
for _, workload := range []int{0, 10, 50, 90, 100} {
Expand All @@ -69,12 +69,12 @@ func runBenchmark(name string, fn func(bool) (int, int)) {
var reads, writes int64
var wg sync.WaitGroup
start := time.Now()
for time.Since(start) < time.Second {
for i := uint32(0); time.Since(start) < time.Second; i++ {
wg.Add(1)
work <- async.NewTask(func(ctx context.Context) (interface{}, error) {
defer wg.Done()

r, w := fn(chanceOf(workload))
r, w := fn(i, chanceOf(i, workload))
atomic.AddInt64(&reads, int64(r))
atomic.AddInt64(&writes, int64(w))
return nil, nil
Expand Down Expand Up @@ -145,21 +145,24 @@ func createCollection(out *column.Collection, amount int) *column.Collection {
return out
}

var epoch uint32

// This random number generator not the most amazing one, but much better
// than using math.Rand for our benchmarks, since it would create a lock
// contention and bias the results.
func randN(n int) uint32 {
v := atomic.AddUint32(&epoch, 1)
return crc32.ChecksumIEEE([]byte{
byte(v >> 24),
byte(v >> 16),
byte(v >> 8),
byte(v),
}) % uint32(n)
func randN(v uint32, n int) uint32 {
return uint32(xxhash(v) % uint64(n))
}

func chanceOf(v uint32, chance int) bool {
return randN(v, 100) < uint32(chance)
}

func chanceOf(chance int) bool {
return randN(100) < uint32(chance)
func xxhash(v uint32) uint64 {
packed := uint64(v) + uint64(v)<<32
x := packed ^ (0x1cad21f72c81017c ^ 0xdb979083e96dd4de)
x ^= bits.RotateLeft64(x, 49) ^ bits.RotateLeft64(x, 24)
x *= 0x9fb21c651e98df25
x ^= (x >> 35) + 4 // len
x *= 0x9fb21c651e98df25
x ^= (x >> 28)
return x
}
15 changes: 4 additions & 11 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,10 @@ func (txn *Txn) UpdateAt(index uint32, columnName string, fn func(v Cursor) erro
return fn(cursor)
}

// ReadAt returns a selector for a specified index together with a boolean value that indicates
// whether an element is present at the specified index or not.
func (txn *Txn) ReadAt(index uint32) (Selector, bool) {
if !txn.index.Contains(index) {
return Selector{}, false
}

return Selector{
idx: index,
txn: txn,
}, true
// SelectAt performs a selection on a specific row specified by its index. It returns
// a boolean value indicating whether an element is present at the index or not.
func (txn *Txn) SelectAt(index uint32, fn func(v Selector)) bool {
return txn.owner.SelectAt(index, fn)
}

// DeleteAt attempts to delete an item at the specified index for this transaction. If the item
Expand Down
7 changes: 2 additions & 5 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,8 @@ func TestIndexInvalid(t *testing.T) {
}))

players.Query(func(txn *Txn) error {
_, ok := txn.ReadAt(999999)
assert.False(t, ok)

_, ok = txn.ReadAt(0)
assert.True(t, ok)
assert.False(t, txn.SelectAt(999999, func(v Selector) {}))
assert.True(t, txn.SelectAt(0, func(v Selector) {}))
return nil
})

Expand Down