Skip to content

Commit

Permalink
test: procedures in stress tool
Browse files Browse the repository at this point in the history
Combine act+proc setup and running.

Add a "no error" flag `ne` so the actions and procedures that
intentionally generate errors are not created.
  • Loading branch information
jchappelow committed Jun 21, 2024
1 parent 79a1e9d commit 6502e30
Show file tree
Hide file tree
Showing 15 changed files with 991 additions and 232 deletions.
2 changes: 1 addition & 1 deletion core/types/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// Client defines methods are used to talk to a Kwil provider.
type Client interface {
// DEPRECATED: Use Call instead.
// CallAction. Deprecated: Use Call instead.
CallAction(ctx context.Context, dbid string, action string, inputs []any) (*Records, error)
Call(ctx context.Context, dbid string, procedure string, inputs []any) (*Records, error)
ChainID() string
Expand Down
6 changes: 5 additions & 1 deletion core/utils/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// UnmarshalMapWithoutFloat unmarshals a JSON byte slice into a slice of maps.
// It will try to convert all return values into ints, but will keep them as strings if it fails.
// It ensures they aren't returned as floats, which is important for maintaining consistency
// with Kwil's decimal types. All returned types will be string or int64.
// with Kwil's decimal types. All returned types will be string, int64, or a []any.
func UnmarshalMapWithoutFloat(b []byte) ([]map[string]any, error) {
d := json.NewDecoder(strings.NewReader(string(b)))
d.UseNumber()
Expand Down Expand Up @@ -59,6 +59,10 @@ func convertJsonNumbers(val any) any {
return val.String()
}
return i
case string:
return val
case int64:
return val
default:
// in case we are unmarshalling something crazy like a double nested slice,
// we reflect on the value and recursively call convertJsonNumbers if it's a slice.
Expand Down
2 changes: 1 addition & 1 deletion extensions/precompiles/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type DeploymentContext struct {
Schema *types.Schema
}

// ProcedureContext is the context for a procedure execution.
// ProcedureContext is the context for a procedure and action execution.
type ProcedureContext struct {
// Ctx is the context of the current execution.
Ctx context.Context
Expand Down
10 changes: 5 additions & 5 deletions internal/engine/execution/procedure.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func prepareActions(schema *types.Schema) ([]*preparedAction, error) {
if action.IsOwnerOnly() {
instructions = append(instructions, instructionFunc(func(scope *precompiles.ProcedureContext, global *GlobalContext, db sql.DB) error {
if !bytes.Equal(scope.Signer, owner) {
return fmt.Errorf("cannot call owner procedure, not owner")
return fmt.Errorf("cannot call owner action, not owner")
}

return nil
Expand Down Expand Up @@ -141,7 +141,7 @@ func prepareActions(schema *types.Schema) ([]*preparedAction, error) {
}
}
if calledAction == nil {
return nil, fmt.Errorf(`procedure "%s" not found`, stmt.Action)
return nil, fmt.Errorf(`action "%s" not found`, stmt.Action)
}

// we leave the namespace and receivers empty, since action calls can only
Expand Down Expand Up @@ -169,7 +169,7 @@ func prepareActions(schema *types.Schema) ([]*preparedAction, error) {
// Call executes an action.
func (p *preparedAction) call(scope *precompiles.ProcedureContext, global *GlobalContext, db sql.DB, inputs []any) error {
if len(inputs) != len(p.parameters) {
return fmt.Errorf(`%w: procedure "%s" requires %d arguments, but %d were provided`, ErrIncorrectNumberOfArguments, p.name, len(p.parameters), len(inputs))
return fmt.Errorf(`%w: action "%s" requires %d arguments, but %d were provided`, ErrIncorrectNumberOfArguments, p.name, len(p.parameters), len(inputs))
}

for i, param := range p.parameters {
Expand Down Expand Up @@ -262,7 +262,7 @@ func (e *callMethod) execute(scope *precompiles.ProcedureContext, global *Global
if e.Namespace == "" {
procedure, ok := dataset.actions[e.Method]
if !ok {
return fmt.Errorf(`procedure "%s" not found`, e.Method)
return fmt.Errorf(`action "%s" not found`, e.Method)
}

err = procedure.call(newScope, global, db, inputs)
Expand All @@ -286,7 +286,7 @@ func (e *callMethod) execute(scope *precompiles.ProcedureContext, global *Global
scope.Result = newScope.Result

if len(e.Receivers) > len(results) {
return fmt.Errorf(`%w: procedure "%s" returned %d values, but only %d receivers were specified`, ErrIncorrectNumberOfArguments, e.Method, len(results), len(e.Receivers))
return fmt.Errorf(`%w: action "%s" returned %d values, but only %d receivers were specified`, ErrIncorrectNumberOfArguments, e.Method, len(results), len(e.Receivers))
}

// Make the result available to either subsequent instructions or as the FinalResult.
Expand Down
140 changes: 22 additions & 118 deletions test/stress/db.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package main

import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"time"

"github.com/kwilteam/kwil-db/core/types"
clientType "github.com/kwilteam/kwil-db/core/types/client"
"github.com/kwilteam/kwil-db/core/types/transactions"
"github.com/kwilteam/kwil-db/core/utils"
Expand All @@ -19,11 +18,12 @@ import (
// testScheme.

type asyncResp struct {
err error
res *transactions.TransactionResult
err error
res *transactions.TransactionResult
expectFail bool
}

func (ar *asyncResp) Error() error {
func (ar *asyncResp) error() error {
if ar.err != nil {
return ar.err
}
Expand All @@ -33,6 +33,19 @@ func (ar *asyncResp) Error() error {
}
return nil
}
func (ar *asyncResp) Error() error {
err := ar.error()
if err != nil {
if ar.expectFail {
err = errors.Join(err, ErrExpected)
}
return err
}
if ar.expectFail {
return errors.New("UNEXPECTEDLY succeeded when it should have failed")
}
return nil
}

func (h *harness) dropDB(ctx context.Context, dbid string) error {
var txHash transactions.TxHash
Expand All @@ -57,15 +70,11 @@ func (h *harness) dropDB(ctx context.Context, dbid string) error {
return nil
}

func (h *harness) deployDBAsync(ctx context.Context) (string, <-chan asyncResp, error) {
schema, err := loadTestSchema()
if err != nil {
return "", nil, err
}
func (h *harness) deployDBAsync(ctx context.Context, schema *types.Schema) (string, <-chan asyncResp, error) {
schema.Name = random.String(12)

var txHash transactions.TxHash
err = h.underNonceLock(ctx, func(nonce int64) error {
err := h.underNonceLock(ctx, func(nonce int64) error {
var err error
txHash, err = h.DeployDatabase(ctx, schema, clientType.WithNonce(nonce))
return err
Expand All @@ -78,7 +87,6 @@ func (h *harness) deployDBAsync(ctx context.Context) (string, <-chan asyncResp,
// fmt.Println("deployDBAsync", dbid)
promise := make(chan asyncResp, 1)
go func() {
// time.Sleep(500 * time.Millisecond) // lame, see executeAction notes
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
resp, err := h.WaitTx(ctx, txHash, txPollInterval)
Expand All @@ -95,8 +103,8 @@ func (h *harness) deployDBAsync(ctx context.Context) (string, <-chan asyncResp,
return dbid, promise, nil
}

func (h *harness) deployDB(ctx context.Context) (string, error) {
dbid, promise, err := h.deployDBAsync(ctx)
func (h *harness) deployDB(ctx context.Context, schema *types.Schema) (string, error) {
dbid, promise, err := h.deployDBAsync(ctx, schema)
if err != nil {
return "", err
}
Expand All @@ -110,107 +118,3 @@ func (h *harness) deployDB(ctx context.Context) (string, error) {
}
return dbid, nil
}

func (h *harness) getOrCreateUser(ctx context.Context, dbid string) (int, string, error) {
const (
meUser = "me"
molAge int = 42
)
// fmt.Println("dbid", dbid)

recs, err := h.CallAction(ctx, dbid, actListUsers, nil)
if err != nil {
return 0, "", fmt.Errorf("%s: %w", actListUsers, err)
}
h.printRecs(ctx, recs)
recs.Reset()

var userID int
var userName string
for recs.Next() {
rec := recs.Record()
uid, user, wallet := rec["id"].(int), rec["username"].(string), rec["wallet"].([]byte)
if bytes.Equal(wallet, h.acctID) {
userName = user
userID = uid
break
}
}
if userName == "" {
userName, userID = meUser, int(random.New().Int63())
err := h.executeAction(ctx, dbid, actCreateUser, [][]any{{userID, meUser, molAge}})
if err != nil {
return 0, "", fmt.Errorf("%s: %w", actCreateUser, err)
}
h.logger.Info(fmt.Sprintf("Added me to users table: %d / %v", userID, userName))
} else {
h.logger.Info(fmt.Sprintf("Found me in list_users: %d / %v", userID, userName))
}

return userID, userName, nil
}

func (h *harness) nextPostID(ctx context.Context, dbid string, userID int) (int, error) {
recs, err := h.CallAction(ctx, dbid, actGetUserPosts, []any{userID}) // tuples for the Schema.Actions[i].Inputs
if err != nil {
return 0, fmt.Errorf("get_user_posts_by_userid: %w", err)
}
h.printRecs(ctx, recs)
var nextPostID int
for recs.Next() {
rec := recs.Record()
if postID := rec["id"].(int); postID >= nextPostID {
nextPostID = postID + 1
}
}
return nextPostID, nil
}

// createPost is the synchronous version of createPostAsync. It's unused
// presently, but this whole thing is a playground, so it remains for now.
/* xxx
func (h *harness) createPost(ctx context.Context, dbid string, postID int, title, content string) error {
err := h.executeAction(ctx, dbid, actCreatePost, [][]any{{postID, title, content}})
if err == nil {
h.printf("Created post: %d / %v (len %d)", postID, title, len(content))
}
return err
}
*/

func (h *harness) createPostAsync(ctx context.Context, dbid string, postID int, title, content string) (<-chan asyncResp, error) {
args := [][]any{{postID, title, content}}
// Randomly fail execution. TODO: make frequency flag, like execFailRate,
// but we really don't need it to succeed, only be mined. The failures
// ensure expected nonce and balance updates regardless.
if rand.Intn(6) == 0 {
if rand.Intn(2) == 0 {
// kwild.abci: "msg":"failed to execute transaction","error":"ERROR: invalid input syntax for type bigint: \"not integer\" (SQLSTATE 22P02)"
args = [][]any{{"not integer", title, content}} // id not integer (SQL exec error)
} else {
// kwild.abci: "msg":"failed to execute transaction","error":"incorrect number of arguments: procedure \"create_post\" requires 3 arguments, but 2 were provided"
args = [][]any{{postID, title}} // too few args (engine procedure call error)
}
}
txHash, err := h.executeActionAsync(ctx, dbid, actCreatePost, args)
if err != nil {
return nil, err
}

promise := make(chan asyncResp, 1)
go func() {
// time.Sleep(500 * time.Millisecond) // lame, see executeAction notes
t0 := time.Now()
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
resp, err := h.WaitTx(ctx, txHash, txPollInterval)
if err != nil {
err = errors.Join(err, h.recoverNonce(ctx))
promise <- asyncResp{err: err}
return
}
h.printf("Created post: %d / %v (len %d), waited %v", postID, title, len(content), time.Since(t0))
promise <- asyncResp{res: &resp.TxResult}
}()
return promise, nil
}
Loading

0 comments on commit 6502e30

Please sign in to comment.