Skip to content

Commit

Permalink
feat: same-block execution (#418)
Browse files Browse the repository at this point in the history
* refactor: decouple dash parameters from a MakeBlock function to a Block.SetDashParams method

* feat: add node's pro-rtx-hash into a context

* feat: add proTxHash into context for consensus/reactor_test.go

* feat: some modifications for PoC same-block execution

* feat: change an updated struct from round state on uncommitted-state

* feat: support next-validators as for next-block execution mode

* feat: support next-validators for same-block execution mode

* feat: some modifications for next-core-lock

* chore: same block execution state/state_test.go (WIP)

* test(state): fix TestManyValidatorChangesSaveLoad()

* fix(state): panic on empty consensus params load

* chore(state): remove state.NextValidators

* chore(proto/abci): same block execution protobuf changes

* refactor(types): centralize generation of ResultsHash

* refactor(state): consolidate redundant code in block execution

* chore(state) execution fix for new data structures

* test(state): adapt TestProposerPriorityProposerAlternates for same block exec

* test(state): same block exec TestFourAddFourMinusOneGenesisValidators

* test(state): same block exec TestStoreLoadValidatorsIncrementsProposerPriority

* test(state): same block exec for TestManyValidatorChangesSaveLoad

* fix(state): same block exec TestConsensusParamsChangesSaveLoad

* test(state): fix TestProcessProposal

* chore(state): correct handling of ApplyBlock etc

* test(mempool): builds but fails

* refactor(state): UncommittedState now holds final values instead of updates

This was needed for clean implementation of NextValidatorsHash

* refactor(state): rename UncommittedChanges to Changeset

* chore: same block exec - block

* chore(state): state ID - start of work

* chore: SBE - block fields (WIP)

* chore: SBE chainlocks and proposed app version (WIP)

* test(state): SBE green (execution|state|validation)_test

* chore(abci): improve validation of prepare/process proposal responses

* chore(state): apphash defaults to array containing zeros

* test(state): TestStoreLoadValidators fixed for SBE

* chore(state): AppHash defaults to slice of 0s instead of nil

* chore(state): fix invalid state.LastStateID

* test(state): SBE TestPruneStates

* test(state): fix rollback_test

* fix(abci): correct handling of AppHash in Application

* test(store): update for SBE

* chore(types): rename stateID.LastAppHash to AppHash

* refactor(statesync): state provider returns AppHash as HexBytes

* test(statesync): fix for SBE

* test(consensus): show test logs on console

* test(consensus): fix byzantine test

* test(blocksync): update for SBE

* fix(state): events fired twice

* fix(consensus): proposal contains invalid chainlock height

* test(consensus): correct chainlock in initial block in SBE

* Update abci/example/kvstore/kvstore.go

Co-authored-by: lklimek <842586+lklimek@users.noreply.github.com>

* Update abci/example/kvstore/kvstore.go

Co-authored-by: lklimek <842586+lklimek@users.noreply.github.com>

* fix(state): double commits during replay

* test(consensus): fix replay handshake* tests (WIP)

* fix: evidence package

* fix: validator_conn_executor_test.go

* refactor: executing kvstore

* feat!(kvstore): same-block-execution KV store (#447)

* feat!(kvstore): same-block-execution KV store

* fix(kvstore): dstIter not closed correctly

* fix(kvstore): AppHash at genesis should be zero

* fix(kvstore): SBE support for persistent KV store (#452)

* test(proxy): fix test of Info()

* fix: some fixes and refactoring for stabilizing same-block execution (#451)

* fix: some fixes and refactoring for stabilizing same-block execution

* refactor: merge kvstore.App with kvstore.Application and remove kvstore_v2.go

* fix: kvstore_test.go

* fix: TestValUpdates

* fix: TestValUpdates

* fix: TestKVStore

* fix: rpc/client tests

* fix: make stable some tests affected by kvstore refactoring

* fix: update uncommitted state if a validator out of quorum set and a node has proposal block for the height

* Update internal/consensus/replay_test.go

Co-authored-by: lklimek <842586+lklimek@users.noreply.github.com>

* refactor: changes according to PR feedback

* fix: blocking wal_test.go because of default context

* fix: code style issues

* fix: abci-cli test

* fix: typo in struct name CurentRoundState => CurrentRoundState

* fix: a few data race issues

* fix: TestHandshakeReplayNone

* fix: ValidatorUpdate.MarshalJSON

* fix: ValidatorUpdate.UnmarshalJSON

* chore(e2e): prototype e2e test app for same-block execution

* fix(consensus): state is nil on genesis

* fix(state): invalid stateID height on genesis

* chore: improve logging

* Update abci/tests/server/client.go

Co-authored-by: lklimek <842586+lklimek@users.noreply.github.com>

* Update abci/tests/server/client.go

Co-authored-by: lklimek <842586+lklimek@users.noreply.github.com>

* Update internal/proxy/client_test.go

Co-authored-by: lklimek <842586+lklimek@users.noreply.github.com>

* refactor: changes according to PR feedback

Co-authored-by: lklimek <842586+lklimek@users.noreply.github.com>

* fix: stabilize codebase after merge / resolve conflicts

Co-authored-by: Lukasz Klimek <842586+lklimek@users.noreply.github.com>
  • Loading branch information
shotonoff and lklimek authored Sep 9, 2022
1 parent 5a2320f commit 4628714
Show file tree
Hide file tree
Showing 124 changed files with 5,414 additions and 4,193 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ proto-check-breaking: check-proto-deps
###############################################################################

build_abci:
@go build -mod=readonly -i ./abci/cmd/...
@go build -mod=readonly ./abci/cmd/...
.PHONY: build_abci

install_abci:
Expand Down
131 changes: 85 additions & 46 deletions abci/cmd/abci-cli/abci-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

Expand Down Expand Up @@ -127,6 +128,7 @@ func addCommands(cmd *cobra.Command, logger log.Logger) {
cmd.AddCommand(consoleCmd)
cmd.AddCommand(echoCmd)
cmd.AddCommand(infoCmd)
cmd.AddCommand(prepareBlockCmd)
cmd.AddCommand(finalizeBlockCmd)
cmd.AddCommand(checkTxCmd)
cmd.AddCommand(commitCmd)
Expand Down Expand Up @@ -193,6 +195,14 @@ var infoCmd = &cobra.Command{
RunE: cmdInfo,
}

var prepareBlockCmd = &cobra.Command{
Use: "prepare_proposal",
Short: "deliver a block of transactions to the application and get app-hash of a new state",
Long: "deliver a block of transactions to the application and get app-hash of a new state",
Args: cobra.MinimumNArgs(2),
RunE: cmdPrepareProposal,
}

var finalizeBlockCmd = &cobra.Command{
Use: "finalize_block",
Short: "deliver a block of transactions to the application",
Expand Down Expand Up @@ -322,23 +332,31 @@ func cmdTest(cmd *cobra.Command, args []string) error {
func() error { return servertest.InitChain(ctx, client) },
func() error { return servertest.Commit(ctx, client) },
func() error {
return servertest.FinalizeBlock(ctx, client, [][]byte{
return servertest.ProcessProposal(ctx, client, types.ResponseProcessProposal_ACCEPT, [][]byte{
[]byte("abc"),
}, []uint32{
code.CodeTypeBadNonce,
}, nil, nil)
},
func() error {
return servertest.FinalizeBlock(ctx, client, [][]byte{[]byte("abc")})
},
func() error { return servertest.Commit(ctx, client) },
func() error {
return servertest.FinalizeBlock(ctx, client, [][]byte{
return servertest.ProcessProposal(ctx, client, types.ResponseProcessProposal_ACCEPT, [][]byte{
{0x00},
}, []uint32{
code.CodeTypeOK,
}, nil, []byte{0, 0, 0, 0, 0, 0, 0, 1})
},
func() error { return servertest.Commit(ctx, client) },
func() error {
return servertest.FinalizeBlock(ctx, client, [][]byte{
return servertest.FinalizeBlock(ctx, client, [][]byte{{0x00}})
},
func() error {
return servertest.Commit(ctx, client)
},
func() error {
return servertest.ProcessProposal(ctx, client, types.ResponseProcessProposal_ACCEPT, [][]byte{
{0x00},
{0x01},
{0x00, 0x02},
Expand Down Expand Up @@ -366,10 +384,14 @@ func cmdTest(cmd *cobra.Command, args []string) error {
}, nil)
},
func() error {
return servertest.ProcessProposal(ctx, client, [][]byte{
return servertest.ProcessProposal(ctx, client, types.ResponseProcessProposal_ACCEPT, [][]byte{
{0x01},
}, types.ResponseProcessProposal_ACCEPT)
}, []uint32{}, nil, nil)
},
func() error {
return servertest.FinalizeBlock(ctx, client, nil)
},
func() error { return servertest.Commit(ctx, client) },
})
}

Expand Down Expand Up @@ -462,6 +484,10 @@ func muxOnCommands(cmd *cobra.Command, pArgs []string) error {
return cmdCheckTx(cmd, actualArgs)
case "commit":
return cmdCommit(cmd, actualArgs)
case "prepare_proposal":
return cmdPrepareProposal(cmd, actualArgs)
case "process_proposal":
return cmdProcessProposal(cmd, actualArgs)
case "finalize_block":
return cmdFinalizeBlock(cmd, actualArgs)
case "echo":
Expand All @@ -470,10 +496,6 @@ func muxOnCommands(cmd *cobra.Command, pArgs []string) error {
return cmdInfo(cmd, actualArgs)
case "query":
return cmdQuery(cmd, actualArgs)
case "prepare_proposal":
return cmdPrepareProposal(cmd, actualArgs)
case "process_proposal":
return cmdProcessProposal(cmd, actualArgs)
default:
return cmdUnimplemented(cmd, pArgs)
}
Expand Down Expand Up @@ -537,31 +559,36 @@ const codeBad uint32 = 10

// Append new txs to application
func cmdFinalizeBlock(cmd *cobra.Command, args []string) error {
txs := make([][]byte, len(args))
for i, arg := range args {
if len(args) == 0 {
printResponse(cmd, args, response{
Code: codeBad,
Log: "Must provide at least one transaction",
})
return nil
}
height, err := strconv.Atoi(args[0])
if err != nil {
return err
}
appHash, err := hex.DecodeString(args[1])
if err != nil {
return err
}
txs := make([][]byte, len(args)-2)
for i, arg := range args[2:] {
txBytes, err := stringOrHexToBytes(arg)
if err != nil {
return err
}
txs[i] = txBytes
}
res, err := client.FinalizeBlock(cmd.Context(), &types.RequestFinalizeBlock{Txs: txs})
res, err := client.FinalizeBlock(cmd.Context(), &types.RequestFinalizeBlock{Height: int64(height), Txs: txs, AppHash: appHash})
if err != nil {
return err
}
resps := make([]response, 0, len(res.TxResults)+1)
for _, tx := range res.TxResults {
resps = append(resps, response{
Code: tx.Code,
Data: tx.Data,
Info: tx.Info,
Log: tx.Log,
})
}
resps = append(resps, response{
Data: res.AppHash,
printResponse(cmd, args, response{
Data: []byte(res.String()),
})
printResponse(cmd, args, resps...)
return nil
}

Expand Down Expand Up @@ -648,25 +675,23 @@ func inTxArray(txByteArray [][]byte, tx []byte) bool {
}

func cmdPrepareProposal(cmd *cobra.Command, args []string) error {
txsBytesArray := make([][]byte, len(args))

for i, arg := range args {
txBytes, err := stringOrHexToBytes(arg)
if err != nil {
return err
}
txsBytesArray[i] = txBytes
height, txsBytesArray, err := processProposalArgs(args)
if err != nil {
panic(err)
}

res, err := client.PrepareProposal(cmd.Context(), &types.RequestPrepareProposal{
Txs: txsBytesArray,
Height: height,
Txs: txsBytesArray,
// kvstore has to have this parameter in order not to reject a tx as the default value is 0
MaxTxBytes: 65536,
})
if err != nil {
return err
}
resps := make([]response, 0, len(res.TxResults)+1)
resps = append(resps, response{
Data: []byte(fmt.Sprintf(`{"appHash":"%X"}`, res.AppHash)),
})
for _, tx := range res.TxRecords {
existingTx := inTxArray(txsBytesArray, tx.Tx)
if tx.Action == types.TxRecord_UNKNOWN ||
Expand All @@ -689,24 +714,19 @@ func cmdPrepareProposal(cmd *cobra.Command, args []string) error {
}

func cmdProcessProposal(cmd *cobra.Command, args []string) error {
txsBytesArray := make([][]byte, len(args))

for i, arg := range args {
txBytes, err := stringOrHexToBytes(arg)
if err != nil {
return err
}
txsBytesArray[i] = txBytes
height, txsBytesArray, err := processProposalArgs(args)
if err != nil {
panic(err)
}

res, err := client.ProcessProposal(cmd.Context(), &types.RequestProcessProposal{
Txs: txsBytesArray,
Height: height,
Txs: txsBytesArray,
})
if err != nil {
return err
}

printResponse(cmd, args, response{
Data: []byte(fmt.Sprintf(`{"appHash":"%X"}`, res.AppHash)),
Status: int32(res.Status),
})
return nil
Expand Down Expand Up @@ -808,3 +828,22 @@ func stringOrHexToBytes(s string) ([]byte, error) {

return []byte(s[1 : len(s)-1]), nil
}

func processProposalArgs(args []string) (int64, [][]byte, error) {
if len(args) == 0 {
return 0, nil, errors.New("must provide a block height and at least one transaction")
}
height, err := strconv.Atoi(args[0])
if err != nil {
return 0, nil, err
}
txsBytesArray := make([][]byte, len(args)-1)
for i, arg := range args[1:] {
txBytes, err := stringOrHexToBytes(arg)
if err != nil {
return 0, nil, err
}
txsBytesArray[i] = txBytes
}
return int64(height), txsBytesArray, nil
}
72 changes: 61 additions & 11 deletions abci/example/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"encoding/binary"
"fmt"

tmtypes "github.com/tendermint/tendermint/types"

"github.com/tendermint/tendermint/abci/example/code"
"github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto"
tmtypes "github.com/tendermint/tendermint/types"
)

type Application struct {
Expand All @@ -20,12 +20,23 @@ type Application struct {
HasCoreChainLocks bool
CurrentCoreChainLockHeight uint32
CoreChainLockStep int32

lastHeight int64
lastCoreChainLock tmtypes.CoreChainLock
lastTxResults []*types.ExecTxResult
}

func NewApplication(serial bool) *Application {
return &Application{serial: serial, CoreChainLockStep: 1}
}

func (app *Application) InitCoreChainLock(initCoreChainHeight uint32, step int32) {
app.CoreChainLockStep = step
app.HasCoreChainLocks = true
app.CurrentCoreChainLockHeight = initCoreChainHeight
app.lastCoreChainLock = tmtypes.NewMockChainLock(app.CurrentCoreChainLockHeight)
}

func (app *Application) Info(_ context.Context, _ *types.RequestInfo) (*types.ResponseInfo, error) {
return &types.ResponseInfo{Data: fmt.Sprintf("{\"hashes\":%v,\"txs\":%v}", app.hashCount, app.txCount)}, nil
}
Expand Down Expand Up @@ -72,12 +83,48 @@ func (app *Application) Query(_ context.Context, reqQuery *types.RequestQuery) (
}
}

func (app *Application) PrepareProposal(_ context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
app.handleRequest(req.Height, req.Txs)
resp := types.ResponsePrepareProposal{
AppHash: make([]byte, tmcrypto.DefaultAppHashSize),
CoreChainLockUpdate: app.lastCoreChainLock.ToProto(),
TxResults: app.lastTxResults,
}
return &resp, nil
}

func (app *Application) ProcessProposal(_ context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
app.handleRequest(req.Height, req.Txs)
resp := types.ResponseProcessProposal{
AppHash: make([]byte, tmcrypto.DefaultAppHashSize),
Status: types.ResponseProcessProposal_ACCEPT,
CoreChainLockUpdate: app.lastCoreChainLock.ToProto(),
TxResults: app.lastTxResults,
}
return &resp, nil
}

func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
var resp types.ResponseFinalizeBlock
for _, tx := range req.Txs {
app.handleRequest(req.Height, req.Txs)
resp := types.ResponseFinalizeBlock{}
app.updateCoreChainLock()
return &resp, nil
}

func (app *Application) handleRequest(height int64, txs [][]byte) {
if app.lastHeight == height {
return
}
app.lastHeight = height
app.lastTxResults = app.handleTxs(txs)
}

func (app *Application) handleTxs(txs [][]byte) []*types.ExecTxResult {
var txResults []*types.ExecTxResult
for _, tx := range txs {
if app.serial {
if len(tx) > 8 {
resp.TxResults = append(resp.TxResults, &types.ExecTxResult{
txResults = append(txResults, &types.ExecTxResult{
Code: code.CodeTypeEncodingError,
Log: fmt.Sprintf("Max tx size is 8 bytes, got %d", len(tx)),
})
Expand All @@ -86,18 +133,21 @@ func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinal
copy(tx8[len(tx8)-len(tx):], tx)
txValue := binary.BigEndian.Uint64(tx8)
if txValue != uint64(app.txCount) {
resp.TxResults = append(resp.TxResults, &types.ExecTxResult{
txResults = append(txResults, &types.ExecTxResult{
Code: code.CodeTypeBadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.txCount, txValue),
})
}
}
app.txCount++
}
if app.HasCoreChainLocks {
app.CurrentCoreChainLockHeight = app.CurrentCoreChainLockHeight + uint32(app.CoreChainLockStep)
coreChainLock := tmtypes.NewMockChainLock(app.CurrentCoreChainLockHeight)
resp.NextCoreChainLockUpdate = coreChainLock.ToProto()
return txResults
}

func (app *Application) updateCoreChainLock() {
if !app.HasCoreChainLocks {
return
}
return &resp, nil
app.CurrentCoreChainLockHeight += uint32(app.CoreChainLockStep)
app.lastCoreChainLock = tmtypes.NewMockChainLock(app.CurrentCoreChainLockHeight)
}
Loading

0 comments on commit 4628714

Please sign in to comment.