test: Benchmark transaction iteration #289

Merged on Mar 10, 2022 (6 commits)
65 changes: 65 additions & 0 deletions bench/storage/txn_get_test.go
@@ -0,0 +1,65 @@
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
	"context"
	"fmt"
	"testing"
)

func Benchmark_Storage_Simple_Txn_Read_Sync_1_1(b *testing.B) {
	for _, vsz := range valueSize {
		b.Run(fmt.Sprintf("ValueSize:%04d", vsz), func(b *testing.B) {
			ctx := context.Background()
			err := runStorageBenchTxnGet(b, ctx, vsz, 1, 1, true)
			if err != nil {
				b.Fatal(err)
			}
		})
	}
}

func Benchmark_Storage_Simple_Txn_Read_Sync_2_2(b *testing.B) {
	for _, vsz := range valueSize {
		b.Run(fmt.Sprintf("ValueSize:%04d", vsz), func(b *testing.B) {
			ctx := context.Background()
			err := runStorageBenchTxnGet(b, ctx, vsz, 2, 2, true)
			if err != nil {
				b.Fatal(err)
			}
		})
	}
}

func Benchmark_Storage_Simple_Txn_Read_Sync_10_10(b *testing.B) {
	for _, vsz := range valueSize {
		b.Run(fmt.Sprintf("ValueSize:%04d", vsz), func(b *testing.B) {
			ctx := context.Background()
			err := runStorageBenchTxnGet(b, ctx, vsz, 10, 10, true)
			if err != nil {
				b.Fatal(err)
			}
		})
	}
}

func Benchmark_Storage_Simple_Txn_Read_Sync_100_100(b *testing.B) {
	for _, vsz := range valueSize {
		b.Run(fmt.Sprintf("ValueSize:%04d", vsz), func(b *testing.B) {
			ctx := context.Background()
			err := runStorageBenchTxnGet(b, ctx, vsz, 100, 100, true)
			if err != nil {
				b.Fatal(err)
			}
		})
	}
}
65 changes: 65 additions & 0 deletions bench/storage/txn_iterator_test.go
@@ -0,0 +1,65 @@
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
	"context"
	"fmt"
	"testing"
)

func Benchmark_Storage_Simple_Txn_Iterator_Sync_1_1_1(b *testing.B) {
	for _, vsz := range valueSize {
		b.Run(fmt.Sprintf("ValueSize:%04d", vsz), func(b *testing.B) {
			ctx := context.Background()
			err := runStorageBenchTxnIterator(b, ctx, vsz, 1, 1, 1, true)
			if err != nil {
				b.Fatal(err)
			}
		})
	}
}

func Benchmark_Storage_Simple_Txn_Iterator_Sync_2_1_2(b *testing.B) {
	for _, vsz := range valueSize {
		b.Run(fmt.Sprintf("ValueSize:%04d", vsz), func(b *testing.B) {
			ctx := context.Background()
			err := runStorageBenchTxnIterator(b, ctx, vsz, 2, 1, 2, true)
			if err != nil {
				b.Fatal(err)
			}
		})
	}
}

func Benchmark_Storage_Simple_Txn_Iterator_Sync_10_1_10(b *testing.B) {
	for _, vsz := range valueSize {
		b.Run(fmt.Sprintf("ValueSize:%04d", vsz), func(b *testing.B) {
			ctx := context.Background()
			err := runStorageBenchTxnIterator(b, ctx, vsz, 10, 1, 10, true)
			if err != nil {
				b.Fatal(err)
			}
		})
	}
}

func Benchmark_Storage_Simple_Txn_Iterator_Sync_100_1_100(b *testing.B) {
	for _, vsz := range valueSize {
		b.Run(fmt.Sprintf("ValueSize:%04d", vsz), func(b *testing.B) {
			ctx := context.Background()
			err := runStorageBenchTxnIterator(b, ctx, vsz, 100, 1, 100, true)
			if err != nil {
				b.Fatal(err)
			}
		})
	}
}
139 changes: 138 additions & 1 deletion bench/storage/utils.go
@@ -13,11 +13,14 @@ package storage
import (
	"context"
	"math/rand"
	"sort"
	"testing"

	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/query"

	benchutils "github.com/sourcenetwork/defradb/bench"
	"github.com/sourcenetwork/defradb/db"
)

func runStorageBenchGet(b *testing.B, ctx context.Context, valueSize, objCount, opCount int, doSync bool) error {
@@ -36,7 +39,8 @@ func runStorageBenchGet(b *testing.B, ctx context.Context, valueSize, objCount,
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < opCount; j++ {
-			key := ds.NewKey(keys[rand.Int31n(int32(len(keys)))])
+			positionInInterval := getSampledIndex(len(keys), opCount, j)
+			key := ds.NewKey(keys[positionInInterval])
			_, err := db.Get(ctx, key)
			if err != nil {
				return err
@@ -48,6 +52,97 @@
	return nil
}

func runStorageBenchTxnGet(b *testing.B, ctx context.Context, valueSize, objCount, opCount int, doSync bool) error {
	db, err := benchutils.NewTestDB(ctx, b)

	if err != nil {
		return err
	}
	defer db.Root().Close() //nolint

	keys, err := backfillBenchmarkTxn(ctx, db, objCount, valueSize)
	if err != nil {
		return err
	}

	txn, err := db.NewTxn(ctx, false)
	if err != nil {
		return err
	}
	defer txn.Discard(ctx)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < opCount; j++ {
			positionInInterval := getSampledIndex(len(keys), opCount, j)
			key := ds.NewKey(keys[positionInInterval])
			_, err := txn.Rootstore().Get(ctx, key)
			if err != nil {
				return err
			}
		}
	}
	b.StopTimer()

	return nil
}

func runStorageBenchTxnIterator(b *testing.B, ctx context.Context, valueSize, objCount, opCount, pointCount int, doSync bool) error {
	db, err := benchutils.NewTestDB(ctx, b)

	if err != nil {
		return err
	}
	defer db.Root().Close() //nolint

	keys, err := backfillBenchmarkTxn(ctx, db, objCount, valueSize)
	if err != nil {
		return err
	}

	txn, err := db.NewTxn(ctx, false)
	if err != nil {
		return err
	}
	defer txn.Discard(ctx)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < opCount; j++ {
			iterator, err := txn.Rootstore().GetIterator(query.Query{})
			if err != nil {
				return err
			}
			for k := 0; k < pointCount; k++ {
				positionInInterval := getSampledIndex(len(keys), pointCount, k)
				startKey := ds.NewKey(keys[positionInInterval])

				result, err := iterator.IteratePrefix(ctx, startKey, startKey)
				if err != nil {
					return err
				}
				for {
					_, hasNextItem := result.NextSync()
					if !hasNextItem {
						break
					}
				}
				err = result.Close()
				if err != nil {
					return err
				}
			}
			err = iterator.Close()
			if err != nil {
				return err
			}
		}
	}
	b.StopTimer()
	txn.Discard(ctx)
	return nil
}

func runStorageBenchPut(b *testing.B, ctx context.Context, valueSize, objCount, opCount int, doSync bool) error {
	db, err := benchutils.NewTestStorage(ctx, b)
	if err != nil {
@@ -155,5 +250,47 @@ func backfillBenchmarkStorageDB(ctx context.Context, db ds.Batching, objCount in
		}
	}

	sort.Strings(keys)
	return keys, batch.Commit(ctx)
}

func backfillBenchmarkTxn(ctx context.Context, db *db.DB, objCount int, valueSize int) ([]string, error) {
	txn, err := db.NewTxn(ctx, false)
	if err != nil {
		return nil, err
	}
	defer txn.Discard(ctx)

	keys := make([]string, objCount)
	for i := 0; i < objCount; i++ {
		keyBuf := make([]byte, 32)
		value := make([]byte, valueSize)
		if _, err := rand.Read(value); err != nil {
			return nil, err
		}
		if _, err := rand.Read(keyBuf); err != nil {
			return nil, err
		}
		key := ds.NewKey(string(keyBuf))
		keys[i] = string(keyBuf)

		if err := txn.Rootstore().Put(ctx, key, value); err != nil {
			return nil, err
		}
	}

	sort.Strings(keys)
	return keys, txn.Commit(ctx)
}

func getSampledIndex(populationSize int, sampleSize int, i int) int {
	if sampleSize >= populationSize {
		if i == 0 {
			return 0
		}
		return (populationSize - 1) / i
	}

	pointsPerInterval := populationSize / sampleSize
	return (i * pointsPerInterval) + rand.Intn(pointsPerInterval)
}

Review thread on getSampledIndex:

Member:
What's the goal of this sampled index func, instead of the original random selector? From what I can tell, it's just splitting the keyspace into sample-size chunks, then randomly selecting from each chunk.

AndrewSisley (Contributor, Author) commented on Mar 10, 2022:
Yes. The big difference is that this keeps the keys ordered (as well as uniformly distributed), which the iterator relies on. It is debatable whether the sort should be included in the iterator benches (and omitted from the Get); I chose to leave it out and do it this way.

AndrewSisley (Contributor, Author):
I forgot that the keys were generated in a random order. I have chosen to standardize this (and sort the keys), although the iterator actually handled it well (just a performance hit; I thought it would fail).
14 changes: 3 additions & 11 deletions datastores/badger/v3/iterator.go
@@ -83,8 +83,8 @@ func (iterator *BadgerIterator) next() {
}

func (iterator *BadgerIterator) IteratePrefix(ctx context.Context, startPrefix ds.Key, endPrefix ds.Key) (dsq.Results, error) {
-	formattedStartPrefix := asFormattedString(startPrefix)
-	formattedEndPrefix := asFormattedString(endPrefix)
+	formattedStartPrefix := startPrefix.String()
+	formattedEndPrefix := endPrefix.String()

	iterator.resultsBuilder = dsq.NewResultBuilder(iterator.query)

Expand Down Expand Up @@ -118,18 +118,10 @@ func (iterator *BadgerIterator) IteratePrefix(ctx context.Context, startPrefix d
	return iterator.resultsBuilder.Results(), nil
}

-func asFormattedString(prefix ds.Key) string {
-	prefixString := prefix.String()
-	if prefixString != "/" {
-		prefixString = prefixString + "/"
-	}
-	return prefixString
-}

type itemKeyValidator = func(key string, startPrefix string, endPrefix string) bool

func isValidAscending(key string, startPrefix string, endPrefix string) bool {
-	return key >= startPrefix && key < endPrefix
+	return key >= startPrefix && key <= endPrefix
}

func isValidDescending(key string, startPrefix string, endPrefix string) bool {
3 changes: 3 additions & 0 deletions query/graphql/planner/planner.go
@@ -411,6 +411,9 @@ func (p *Planner) queryDocs(ctx context.Context, query *parser.Query) ([]map[str
	}

	if !next {
		if err2 := (plan.Close()); err2 != nil {
			log.ErrorE(ctx, "Error closing plan node", err2)
		}
		return []map[string]interface{}{}, nil
}

Expand Down
20 changes: 15 additions & 5 deletions store/txn.go
@@ -31,16 +31,26 @@ type txn struct {
var _ core.Txn = (*txn)(nil)

func NewTxnFrom(ctx context.Context, rootstore ds.Batching, readonly bool) (core.Txn, error) {
-	var rootTxn ds.Txn
-	var err error
-	var isBatch bool
	// check if our datastore natively supports iterable transaction, transactions or batching
	if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok {
-		rootTxn, err = iterableTxnStore.NewIterableTransaction(ctx, readonly)
+		rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly)
		if err != nil {
			return nil, err
		}
-	} else if txnStore, ok := rootstore.(ds.TxnDatastore); ok {
+		multistore := MultiStoreFrom(rootTxn)
+		return &txn{
+			rootTxn,
+			multistore,
+			false,
+			[]func(){},
+			[]func(){},
+		}, nil
+	}

+	var rootTxn ds.Txn
+	var err error
+	var isBatch bool
+	if txnStore, ok := rootstore.(ds.TxnDatastore); ok {
		rootTxn, err = txnStore.NewTransaction(ctx, readonly)
		if err != nil {
			return nil, err