Skip to content

Commit

Permalink
storage/disk: react to "txn too big" errors (#3880)
Browse files Browse the repository at this point in the history
Fixes ##3879.

Signed-off-by: Gasc Florian <florian.gasc@gmail.com>
  • Loading branch information
floriangasc authored Oct 20, 2021
1 parent 68d3bc3 commit 528836a
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 7 deletions.
47 changes: 40 additions & 7 deletions storage/disk/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,58 @@ type update struct {
delete bool
}

func (txn *transaction) Write(_ context.Context, op storage.PatchOp, path storage.Path, value interface{}) error {
// errTxnTooBigErrorHandler checks if the passed error was caused by a transaction
// that was _too big_. If so, it attempts to commit the transaction and opens a new one.
// See https://dgraph.io/docs/badger/get-started/#read-write-transactions
func errTxnTooBigErrorHandler(txn *transaction, err error) error {
errSetCommit := txn.underlying.Commit()
if errSetCommit != nil {
return wrapError(errSetCommit)
}
txn.underlying = txn.db.db.NewTransaction(true)
return nil
}

// !!!  infinite recursion only if infinite txn too big error occurred
func deleteWithErrTxnTooBigErrorHandling(txn *transaction, u *update) error {
err := txn.underlying.Delete(u.key)
if err == badger.ErrTxnTooBig {
if txnErr := errTxnTooBigErrorHandler(txn, err); txnErr != nil {
return txnErr
}
return deleteWithErrTxnTooBigErrorHandling(txn, u)
}
return wrapError(err)
}

// !!! infinite recursion only if infinite txn too big error occurred
func setWithErrTxnTooBigErrorHandling(txn *transaction, u *update) error {
err := txn.underlying.Set(u.key, u.value)
if err == badger.ErrTxnTooBig {
if txnErr := errTxnTooBigErrorHandler(txn, err); txnErr != nil {
return txnErr
}
return setWithErrTxnTooBigErrorHandling(txn, u)
}
return wrapError(err)
}

func (txn *transaction) Write(_ context.Context, op storage.PatchOp, path storage.Path, value interface{}) error {
updates, err := txn.partitionWrite(op, path, value)
if err != nil {
return err
}

for _, u := range updates {
if u.delete {
if err := txn.underlying.Delete(u.key); err != nil {
return wrapError(err)
if err := deleteWithErrTxnTooBigErrorHandling(txn, &u); err != nil {
return err
}
} else {
if err := txn.underlying.Set(u.key, u.value); err != nil {
return wrapError(err)
if err := setWithErrTxnTooBigErrorHandling(txn, &u); err != nil {
return err
}
}
}

return nil
}

Expand Down
119 changes: 119 additions & 0 deletions storage/disk/txn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package disk

import (
"context"
"math/rand"
"strconv"
"strings"
"testing"

"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/util"
"github.com/open-policy-agent/opa/util/test"
)

func randomString(n int) string {
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")

s := make([]rune, n)
for i := range s {
s[i] = letters[rand.Intn(len(letters))]
}
return string(s)
}

func fixture(nbKeys int) interface{} {
i := 1
var keyValues = []string{}
for i <= nbKeys {
keyValues = append(keyValues, "\""+strconv.Itoa(i)+randomString(4)+"\": \""+randomString(3)+"\"")
i++
}
jsonBytes := []byte(`{"foo":{` + strings.Join(keyValues, ",") + `}}`)
return util.MustUnmarshalJSON(jsonBytes)
}

func TestSetTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore(t *testing.T) {
test.WithTempFS(map[string]string{}, func(dir string) {
ctx := context.Background()
s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo"),
}})
if err != nil {
t.Fatal(err)
}

nbKeys := 140_000 // !!! 135_000 it's ok, but 140_000 not
jsonFixture := fixture(nbKeys)
errTxn := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error {

errTxnWrite := s.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/"), jsonFixture)
if errTxnWrite != nil {
t.Fatal(errTxnWrite)
}
return nil
})
if errTxn != nil {
t.Fatal(errTxn)
}

result, errRead := storage.ReadOne(ctx, s, storage.MustParsePath("/foo"))
if errRead != nil {
t.Fatal(errRead)
}
actualNbKeys := len(result.(map[string]interface{}))
if nbKeys != actualNbKeys {
t.Fatalf("Expected %d keys, read %d", nbKeys, actualNbKeys)
}
})

}

func TestDeleteTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore(t *testing.T) {
test.WithTempFS(map[string]string{}, func(dir string) {
ctx := context.Background()
s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo"),
}})
if err != nil {
t.Fatal(err)
}
nbKeys := 200_000
jsonFixture := fixture(nbKeys)
errTxn := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error {

errTxnWrite := s.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/"), jsonFixture)
if errTxnWrite != nil {
t.Fatal(errTxnWrite)
}
return nil
})
if errTxn != nil {
t.Fatal(errTxn)
}

errTxnD := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error {
errTxnWrite := s.Write(ctx, txn, storage.RemoveOp, storage.MustParsePath("/foo"), jsonFixture)
if errTxnWrite != nil {
t.Fatal(errTxnWrite)
}
return nil
})
if errTxnD != nil {
t.Fatal(errTxnD)
}

results, errRead := storage.ReadOne(ctx, s, storage.MustParsePath("/foo"))
if !storage.IsNotFound(errRead) {
t.Fatal(errRead)
}
if results != nil {
t.Fatalf("Unexpected results %v", results)
}
})

}

0 comments on commit 528836a

Please sign in to comment.