Skip to content

Commit

Permalink
[seiv2] Merge main (#217)
Browse files Browse the repository at this point in the history
* reformat logs to use simple concatenation with separators (#207)

* Use write-lock in (*TxPriorityQueue).ReapMax funcs (#209)

ReapMaxBytesMaxGas and ReapMaxTxs funcs in TxPriorityQueue claim
> Transactions returned are not removed from the mempool transaction
> store or indexes.

However, they use a priority queue to accomplish the claim
> Transaction are retrieved in priority order.

This is accomplished by popping all items out of the whole heap, and
then pushing then back in sequentially. A copy of the heap cannot be
obtained otherwise. Both of the mentioned functions use a read-lock
(RLock) when doing this. This results in a potential scenario where
multiple executions of the ReapMax can be started in parallel, and
both would be popping items out of the priority queue.

In practice, this can be abused by executing the `unconfirmed_txs` RPC
call repeatedly. Based on our observations, running it multiple times
per millisecond results in multiple threads picking it up at the same
time. Such a scenario can be obtained via the WebSocket interface, and
spamming `unconfirmed_txs` calls there. The behavior that happens is a
`Panic in WSJSONRPC handler` when a queue item unexpectedly disappears
for `mempool.(*TxPriorityQueue).Swap`.
(`runtime error: index out of range [0] with length 0`)

This can additionally lead to a `CONSENSUS FAILURE!!!` if the race
condition occurs for `internal/consensus.(*State).finalizeCommit`
when it tries to do `mempool.(*TxPriorityQueue).RemoveTx`, but
the ReapMax has already removed all elements from the underlying
heap. (`runtime error: index out of range [-1]`)

This commit switches the lock type to a write-lock (Lock) to ensure
no parallel modifications take place. This commit additionally updates
the tests to allow parallel execution of the func calls in testing,
as to prevent regressions (in case someone wants to downgrade the locks
without considering the implications from the underlying heap usage).

* Fix root dir for tendermint reindex command (#210)

* Replay events during restart to avoid tx missing (#211)

---------

Co-authored-by: Denys S <150304777+dssei@users.noreply.github.com>
Co-authored-by: Valters Jansons <sigv@users.noreply.github.com>
Co-authored-by: Yiming Zang <50607998+yzang2019@users.noreply.github.com>
  • Loading branch information
4 people authored Mar 27, 2024
1 parent fbc1bc0 commit 400fe74
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 29 deletions.
3 changes: 3 additions & 0 deletions cmd/tendermint/commands/reindex_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/internal/state/indexer/sink/kv"
"github.com/tendermint/tendermint/internal/state/indexer/sink/psql"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/cli"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/rpc/coretypes"
Expand Down Expand Up @@ -52,6 +53,8 @@ either or both arguments.
tendermint reindex-event --start-height 2 --end-height 10
`,
RunE: func(cmd *cobra.Command, args []string) error {
home, err := cmd.Flags().GetString(cli.HomeFlag)
conf.RootDir = home
bs, ss, err := loadStateAndBlockStore(conf)
if err != nil {
return fmt.Errorf("%s: %w", reindexFailed, err)
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func DefaultBaseConfig() BaseConfig {
FilterPeers: false,
DBBackend: "goleveldb",
DBPath: "data",
RootDir: "/root/.sei",
}
}

Expand Down
22 changes: 21 additions & 1 deletion internal/consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ func (h *Handshaker) ReplayBlocks(
return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, false)

} else if appBlockHeight == storeBlockHeight {
// We're good!
// We're good! But we need to reindex events
err := h.replayEvents(appBlockHeight)
if err != nil {
return nil, err
}
if err := checkAppHashEqualsOneFromState(appHash, state); err != nil {
return nil, err
}
Expand Down Expand Up @@ -550,6 +554,22 @@ func (h *Handshaker) replayBlock(
return state, nil
}

// replayEvents will be called during restart to avoid tx missing to be indexed
func (h *Handshaker) replayEvents(height int64) error {
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
res, err := h.stateStore.LoadFinalizeBlockResponses(height)
if err != nil {
return err
}
validatorUpdates, err := types.PB2TM.ValidatorUpdates(res.ValidatorUpdates)
if err != nil {
return err
}
sm.FireEvents(h.logger, h.eventBus, block, meta.BlockID, res, validatorUpdates)
return nil
}

func checkAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) error {
if !bytes.Equal(appHash, block.AppHash) {
return fmt.Errorf(`block.AppHash does not match AppHash after replay. Got '%X', expected '%X'.
Expand Down
19 changes: 9 additions & 10 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package mempool
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1036,17 +1036,16 @@ func (txmp *TxMempool) GetPeerFailedCheckTxCount(nodeID types.NodeID) uint64 {

// AppendCheckTxErr wraps error message into an ABCIMessageLogs json string
func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string {
var logs []map[string]interface{}
json.Unmarshal([]byte(existingLogs), &logs)
var builder strings.Builder

// Append the new ABCIMessageLog to the slice
logs = append(logs, map[string]interface{}{
"log": log,
})
builder.WriteString(existingLogs)
// If there are already logs, append the new log with a separator
if builder.Len() > 0 {
builder.WriteString("; ")
}
builder.WriteString(log)

// Marshal the updated slice back into a JSON string
jsonData, _ := json.Marshal(logs)
return string(jsonData)
return builder.String()
}

func (txmp *TxMempool) handlePendingTransactions() {
Expand Down
22 changes: 7 additions & 15 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mempool
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -947,24 +946,17 @@ func TestAppendCheckTxErr(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(t, client, 500)
existingData := `[{"log":"existing error log"}]`
existingLogData := "existing error log"
newLogData := "sample error log"

// Append new error
result := txmp.AppendCheckTxErr(existingData, "sample error msg")
actualResult := txmp.AppendCheckTxErr(existingLogData, newLogData)
expectedResult := fmt.Sprintf("%s; %s", existingLogData, newLogData)

// Unmarshal the result
var data []map[string]interface{}
err := json.Unmarshal([]byte(result), &data)
require.NoError(t, err)
require.Equal(t, len(data), 2)
require.Equal(t, data[1]["log"], "sample error msg")
require.Equal(t, expectedResult, actualResult)

// Append new error to empty log
result = txmp.AppendCheckTxErr("", "sample error msg")
actualResult = txmp.AppendCheckTxErr("", newLogData)

// Unmarshal the result
err = json.Unmarshal([]byte(result), &data)
require.NoError(t, err)
require.Equal(t, len(data), 1)
require.Equal(t, data[0]["log"], "sample error msg")
require.Equal(t, newLogData, actualResult)
}
6 changes: 3 additions & 3 deletions internal/state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (blockExec *BlockExecutor) ApplyBlock(

// Events are fired after everything else.
// NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates)
FireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates)

return state, nil
}
Expand Down Expand Up @@ -687,7 +687,7 @@ func (state State) Update(
// Fire NewBlock, NewBlockHeader.
// Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(
func FireEvents(
logger log.Logger,
eventBus types.BlockEventPublisher,
block *types.Block,
Expand Down Expand Up @@ -811,7 +811,7 @@ func ExecCommitBlock(
}

blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: bps.Header()}
fireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
FireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
}

// Commit block
Expand Down

0 comments on commit 400fe74

Please sign in to comment.