Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved UpdateAt and Fetch #24

Merged
merged 2 commits into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,28 @@ func (c *Collection) InsertWithTTL(obj Object, ttl time.Duration) (index uint32)
return
}

// UpdateAt updates a specific row/column combination and sets the value. It is also
// possible to update during the query, which is much more convenient to use.
func (c *Collection) UpdateAt(idx uint32, columnName string, value interface{}) {
c.Query(func(txn *Txn) error {
if cursor, err := txn.cursorFor(columnName); err == nil {
cursor.idx = idx
cursor.Set(value)
}
return nil
// UpdateAt updates a specific row by initiating a separate transaction for the update.
func (c *Collection) UpdateAt(idx uint32, columnName string, fn func(v Cursor) error) error {
return c.Query(func(txn *Txn) error {
return txn.UpdateAt(idx, columnName, fn)
})
}

// SelectAt performs a selection on a specific row specified by its index.
func (c *Collection) SelectAt(idx uint32, fn func(v Selector)) bool {
c.lock.RLock()
contains := c.fill.Contains(idx)
c.lock.RUnlock()

// If it's empty or over the sequence, not found
if idx >= uint32(len(c.fill))<<6 || !contains {
return false
}

fn(Selector{idx: idx, col: c})
return true
}

// DeleteAt attempts to delete an item at the specified index for this collection. If the item
// exists, it marks at as deleted and returns true, otherwise it returns false.
func (c *Collection) DeleteAt(idx uint32) (deleted bool) {
Expand Down Expand Up @@ -231,23 +241,6 @@ func (c *Collection) DropIndex(indexName string) error {
return nil
}

// Fetch retrieves an object by its handle and returns a Selector for it.
func (c *Collection) Fetch(idx uint32) (Selector, bool) {
c.lock.RLock()
contains := c.fill.Contains(idx)
c.lock.RUnlock()

// If it's empty or over the sequence, not found
if idx >= uint32(len(c.fill))<<6 || !contains {
return Selector{}, false
}

return Selector{
idx: idx,
col: c,
}, true
}

// Query creates a transaction which allows for filtering and iteration over the
// columns in this collection. It also allows for individual rows to be modified or
// deleted during iteration (range), but the actual operations will be queued and
Expand Down
84 changes: 47 additions & 37 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import (

/*
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkCollection/insert-8 1861 582483 ns/op 1249 B/op 1 allocs/op
BenchmarkCollection/fetch-8 30763866 38.66 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 1906 618875 ns/op 102 B/op 0 allocs/op
BenchmarkCollection/count-8 748754 1416 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 16807 71049 ns/op 7 B/op 0 allocs/op
BenchmarkCollection/update-at-8 3753175 330.0 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 1156 994670 ns/op 4133 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 8459896 146.6 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 2460322 478.9 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/insert-8 2167 578821 ns/op 1223 B/op 1 allocs/op
BenchmarkCollection/select-at-8 42703713 27.72 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 2032 598751 ns/op 49 B/op 0 allocs/op
BenchmarkCollection/count-8 800036 1498 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 16833 70556 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-at-8 3689354 323.6 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 1198 1003934 ns/op 4004 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 8071692 145.7 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 2328974 494.7 ns/op 0 B/op 0 allocs/op
*/
func BenchmarkCollection(b *testing.B) {
b.Run("insert", func(b *testing.B) {
Expand All @@ -58,14 +58,14 @@ func BenchmarkCollection(b *testing.B) {

amount := 100000
players := loadPlayers(amount)
b.Run("fetch", func(b *testing.B) {
b.Run("select-at", func(b *testing.B) {
name := ""
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
if s, ok := players.Fetch(20); ok {
name = s.StringAt("name")
}
players.SelectAt(20, func(v Selector) {
name = v.StringAt("name")
})
}
assert.NotEmpty(b, name)
})
Expand Down Expand Up @@ -118,7 +118,10 @@ func BenchmarkCollection(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
players.UpdateAt(20, "balance", 1.0)
players.UpdateAt(20, "balance", func(v Cursor) error {
v.Set(1.0)
return nil
})
}
})

Expand Down Expand Up @@ -214,14 +217,17 @@ func runReplication(t *testing.T, updates, inserts, concurrency int) {

// Randomly update a column
offset := uint32(rand.Int31n(int32(inserts - 1)))
switch rand.Int31n(3) {
case 0:
primary.UpdateAt(offset, "float64", math.Round(rand.Float64()*1000)/100)
case 1:
primary.UpdateAt(offset, "int32", rand.Int31n(100000))
case 2:
primary.UpdateAt(offset, "string", fmt.Sprintf("hi %v", rand.Int31n(10)))
}
primary.UpdateAt(offset, "float64", func(v Cursor) error {
switch rand.Int31n(3) {
case 0:
v.SetFloat64(math.Round(rand.Float64()*1000) / 100)
case 1:
v.SetInt32At("int32", rand.Int31n(100000))
case 2:
v.SetStringAt("string", fmt.Sprintf("hi %v", rand.Int31n(10)))
}
return nil
})

// Randomly delete an item
if rand.Int31n(5) == 0 {
Expand Down Expand Up @@ -250,13 +256,13 @@ func runReplication(t *testing.T, updates, inserts, concurrency int) {
return txn.Range("float64", func(v Cursor) {
v1, v2 := v.FloatAt("float64"), v.IntAt("int32")
if v1 != 0 {
clone, ok := replica.Fetch(v.idx)
clone, ok := txn.ReadAt(v.idx)
assert.True(t, ok)
assert.Equal(t, v.FloatAt("float64"), clone.FloatAt("float64"))
}

if v2 != 0 {
clone, ok := replica.Fetch(v.idx)
clone, ok := txn.ReadAt(v.idx)
assert.True(t, ok)
assert.Equal(t, v.IntAt("int32"), clone.IntAt("int32"))
}
Expand Down Expand Up @@ -288,30 +294,34 @@ func TestCollection(t *testing.T) {
}))

{ // Find the object by its index
v, ok := col.Fetch(idx)
assert.True(t, ok)
assert.Equal(t, "Roman", v.StringAt("name"))
assert.True(t, col.SelectAt(idx, func(v Selector) {
assert.Equal(t, "Roman", v.StringAt("name"))
}))
}

{ // Remove the object
col.DeleteAt(idx)
_, ok := col.Fetch(idx)
assert.False(t, ok)
assert.False(t, col.SelectAt(idx, func(v Selector) {
assert.Fail(t, "unreachable")
}))
}

{ // Add a new one, should replace
idx := col.Insert(obj)
v, ok := col.Fetch(idx)
assert.True(t, ok)
assert.Equal(t, "Roman", v.StringAt("name"))
assert.True(t, col.SelectAt(idx, func(v Selector) {
assert.Equal(t, "Roman", v.StringAt("name"))
}))
}

{ // Update the wallet
col.UpdateAt(idx, "wallet", float64(1000))
v, ok := col.Fetch(idx)
assert.True(t, ok)
assert.Equal(t, int64(1000), v.IntAt("wallet"))
assert.Equal(t, true, v.BoolAt("rich"))
col.UpdateAt(idx, "wallet", func(v Cursor) error {
v.SetFloat64(1000)
return nil
})
assert.True(t, col.SelectAt(idx, func(v Selector) {
assert.Equal(t, int64(1000), v.IntAt("wallet"))
assert.Equal(t, true, v.BoolAt("rich"))
}))
}

{ // Drop the colun
Expand Down
69 changes: 35 additions & 34 deletions examples/bench/README.md
Original file line number Diff line number Diff line change
@@ -1,43 +1,44 @@
# Concurrency Benchmark

This is an example benchmark with various workloads (90% read / 10% write, etc) on a collection of 1 million elements with different goroutine pools. In this example we're combining two types of transactions:
* Read transactions that query a random index and iterate over the results over a single column.
* Read transactions that update a random element (point-read).
* Write transactions that update a random element (point-write).

Note that the goal of this benchmark is to validate concurrency, not throughput this represents the current "best" case scenario when the updates are random and do less likely to incur contention. Reads, however quite often would hit the same chunks as only the index itself is randomized.

```
90%-10% 1 procs 249,208,310 read/s 117 write/s
90%-10% 8 procs 1,692,667,386 read/s 738 write/s
90%-10% 16 procs 1,509,926,215 read/s 635 write/s
90%-10% 32 procs 1,489,456,934 read/s 660 write/s
90%-10% 64 procs 1,533,053,898 read/s 666 write/s
90%-10% 128 procs 1,495,078,423 read/s 654 write/s
90%-10% 256 procs 1,443,437,689 read/s 656 write/s
90%-10% 512 procs 1,464,321,958 read/s 704 write/s
90%-10% 1024 procs 1,495,877,020 read/s 635 write/s
90%-10% 2048 procs 1,413,233,904 read/s 658 write/s
90%-10% 4096 procs 1,376,644,077 read/s 743 write/s
50%-50% 1 procs 236,528,133 read/s 861 write/s
50%-50% 8 procs 1,589,501,618 read/s 6,335 write/s
50%-50% 16 procs 1,607,166,585 read/s 6,484 write/s
50%-50% 32 procs 1,575,200,925 read/s 6,438 write/s
50%-50% 64 procs 1,432,978,587 read/s 5,808 write/s
50%-50% 128 procs 1,181,986,760 read/s 4,606 write/s
50%-50% 256 procs 1,529,174,062 read/s 6,180 write/s
50%-50% 512 procs 1,472,102,974 read/s 5,961 write/s
50%-50% 1024 procs 1,399,040,792 read/s 6,066 write/s
50%-50% 2048 procs 1,295,570,830 read/s 5,919 write/s
50%-50% 4096 procs 1,181,556,697 read/s 5,871 write/s
10%-90% 1 procs 199,670,671 read/s 7,119 write/s
10%-90% 8 procs 1,224,172,050 read/s 44,464 write/s
10%-90% 16 procs 1,317,755,536 read/s 46,451 write/s
10%-90% 32 procs 1,429,807,620 read/s 51,758 write/s
10%-90% 64 procs 1,413,067,976 read/s 51,304 write/s
10%-90% 128 procs 1,302,410,992 read/s 46,375 write/s
10%-90% 256 procs 1,223,553,655 read/s 45,110 write/s
10%-90% 512 procs 1,120,740,609 read/s 42,799 write/s
10%-90% 1024 procs 1,071,064,037 read/s 41,519 write/s
10%-90% 2048 procs 1,044,805,034 read/s 42,868 write/s
10%-90% 4096 procs 877,312,822 read/s 42,910 write/s
WORK PROCS READS WRITES
90%-10% 1 procs 51,642 txn/s 5,884 txn/s
90%-10% 8 procs 195,201 txn/s 21,803 txn/s
90%-10% 16 procs 311,078 txn/s 34,519 txn/s
90%-10% 32 procs 370,100 txn/s 41,225 txn/s
90%-10% 64 procs 374,964 txn/s 41,582 txn/s
90%-10% 128 procs 347,933 txn/s 38,589 txn/s
90%-10% 256 procs 337,840 txn/s 37,329 txn/s
90%-10% 512 procs 342,272 txn/s 37,692 txn/s
90%-10% 1024 procs 339,367 txn/s 37,049 txn/s
90%-10% 2048 procs 327,060 txn/s 35,568 txn/s
90%-10% 4096 procs 314,160 txn/s 32,818 txn/s
50%-50% 1 procs 28,944 txn/s 29,054 txn/s
50%-50% 8 procs 59,487 txn/s 59,342 txn/s
50%-50% 16 procs 70,271 txn/s 70,276 txn/s
50%-50% 32 procs 70,067 txn/s 69,796 txn/s
50%-50% 64 procs 61,443 txn/s 61,559 txn/s
50%-50% 128 procs 54,985 txn/s 54,760 txn/s
50%-50% 256 procs 53,684 txn/s 53,465 txn/s
50%-50% 512 procs 62,488 txn/s 61,967 txn/s
50%-50% 1024 procs 69,211 txn/s 68,090 txn/s
50%-50% 2048 procs 74,262 txn/s 73,639 txn/s
50%-50% 4096 procs 77,700 txn/s 75,452 txn/s
10%-90% 1 procs 4,811 txn/s 43,825 txn/s
10%-90% 8 procs 8,585 txn/s 77,136 txn/s
10%-90% 16 procs 8,582 txn/s 77,260 txn/s
10%-90% 32 procs 8,866 txn/s 79,127 txn/s
10%-90% 64 procs 8,090 txn/s 73,265 txn/s
10%-90% 128 procs 7,412 txn/s 67,985 txn/s
10%-90% 256 procs 6,473 txn/s 58,903 txn/s
10%-90% 512 procs 6,916 txn/s 61,835 txn/s
10%-90% 1024 procs 7,989 txn/s 71,794 txn/s
10%-90% 2048 procs 8,930 txn/s 78,657 txn/s
10%-90% 4096 procs 9,231 txn/s 81,465 txn/s
```
31 changes: 14 additions & 17 deletions examples/bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func main() {
createCollection(players, amount)

// Iterate over various workloads
fmt.Printf(" WORK PROCS READS WRITES\n")
for _, w := range []int{10, 50, 90} {

// Iterate over various concurrency levels
Expand All @@ -55,23 +56,19 @@ func main() {
defer wg.Done()
offset := uint32(rand.Uint32n(uint32(amount - 1)))

// Given our write probabiliy, randomly update an offset
// Given our write probabiliy, randomly read/write at an offset
if rand.Uint32n(100) < uint32(w) {
players.UpdateAt(offset, "balance", 0.0)
players.UpdateAt(offset, "balance", func(v column.Cursor) error {
v.SetFloat64(0)
return nil
})
atomic.AddInt64(&writes, 1)
return nil, nil
}

// Otherwise, randomly read something
players.Query(func(txn *column.Txn) error {
var count int64
txn.With(races[rand.Uint32n(4)]).Range("balance", func(v column.Cursor) {
count++
} else {
players.SelectAt(offset, func(v column.Selector) {
_ = v.FloatAt("balance") // Read
})
atomic.AddInt64(&reads, count)
return nil
})

atomic.AddInt64(&reads, 1)
}
return nil, nil
})
}
Expand All @@ -82,9 +79,9 @@ func main() {

wg.Wait()
pool.Cancel()
fmt.Printf("%v%%-%v%% %4v procs %20v %15v\n", 100-w, w, n,
humanize.Comma(readsPerSec)+" read/s",
humanize.Comma(writesPerSec)+" write/s",
fmt.Printf("%v%%-%v%% %4v procs %15v %15v\n", 100-w, w, n,
humanize.Comma(readsPerSec)+" txn/s",
humanize.Comma(writesPerSec)+" txn/s",
)
}

Expand Down
11 changes: 11 additions & 0 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,17 @@ func (txn *Txn) Count() int {
return int(txn.index.Count())
}

// UpdateAt creates a cursor to a specific element that can be read or updated.
func (txn *Txn) UpdateAt(index uint32, columnName string, fn func(v Cursor) error) error {
cursor, err := txn.cursorFor(columnName)
if err != nil {
return err
}

cursor.idx = index
return fn(cursor)
}

// ReadAt returns a selector for a specified index together with a boolean value that indicates
// whether an element is present at the specified index or not.
func (txn *Txn) ReadAt(index uint32) (Selector, bool) {
Expand Down
27 changes: 27 additions & 0 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,30 @@ func TestUninitializedSet(t *testing.T) {
})
}))
}

func TestUpdateAt(t *testing.T) {
c := NewCollection()
c.CreateColumn("col1", ForString())
index := c.Insert(map[string]interface{}{
"col1": "hello",
})

assert.NoError(t, c.UpdateAt(index, "col1", func(v Cursor) error {
v.Set("hi")
return nil
}))

assert.True(t, c.SelectAt(index, func(v Selector) {
assert.Equal(t, "hi", v.StringAt("col1"))
}))
}

func TestUpdateAtInvalid(t *testing.T) {
c := NewCollection()
c.CreateColumn("col1", ForString())

assert.Error(t, c.UpdateAt(0, "col2", func(v Cursor) error {
v.SetString("hi")
return nil
}))
}