Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get block from redis #2933

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libs/tendermint/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type State struct {

preBlockTaskChan chan *preBlockTask
taskResultChan chan *preBlockTaskRes

blockCtx *BlockContext
}

// preBlockSignal
Expand Down Expand Up @@ -208,7 +210,9 @@ func NewState(
vcHeight: make(map[int64]string),
taskResultChan: make(chan *preBlockTaskRes, 1),
preBlockTaskChan: make(chan *preBlockTask, 1),
blockCtx: newBlockContext(),
}
cs.blockCtx.init()
// set function defaults (may be overwritten before calling Start)
cs.decideProposal = cs.defaultDecideProposal
cs.doPrevote = cs.defaultDoPrevote
Expand Down
41 changes: 41 additions & 0 deletions libs/tendermint/consensus/consensus_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package consensus

import (
"os"
"time"

"github.com/okex/exchain/libs/tendermint/delta"
"github.com/okex/exchain/libs/tendermint/delta/redis-cgi"
"github.com/okex/exchain/libs/tendermint/libs/log"
"github.com/okex/exchain/libs/tendermint/types"
"github.com/spf13/viper"
)

type BlockContext struct {
deltaBroker delta.DeltaBroker
enableBlockRedis bool
logger log.Logger
}

func newBlockContext() *BlockContext {
bc := &BlockContext{
enableBlockRedis: types.DownloadDelta,
logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "consensus"),
}
return bc
}

func (bc *BlockContext) init() {
// todo use flag
if true {
url := viper.GetString(types.FlagRedisUrl)
auth := viper.GetString(types.FlagRedisAuth)
expire := time.Duration(viper.GetInt(types.FlagRedisExpire)) * time.Second
dbNum := viper.GetInt(types.FlagRedisDB)
if dbNum < 0 || dbNum > 15 {
panic("redis-db only support 0~15")
}
bc.deltaBroker = redis_cgi.NewRedisClient(url, auth, expire, dbNum, bc.logger)
bc.logger.Info("Init redis broker", "url", url)
}
}
53 changes: 50 additions & 3 deletions libs/tendermint/consensus/consensus_main_routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func (cs *State) handleAVCProposal(proposal *types.Proposal) {
if !bytes.Equal(proposal.BlockID.PartsHeader.Hash, res.blockParts.Header().Hash) || proposal.Height != res.block.Height {
return
}
pi := ProposalBlockMessage{proposal, res.block}
cs.blockCtx.deltaBroker.Pub(pi.Marshal())
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
for i := 0; i < res.blockParts.Total(); i++ {
part := res.blockParts.GetPart(i)
Expand Down Expand Up @@ -164,6 +166,25 @@ func (cs *State) handleMsg(mi msgInfo) (added bool) {
if added, err = cs.setProposal(msg.Proposal); added {
cs.handleAVCProposal(msg.Proposal)
}
case *ProposalBlockMessage:
// not handle this msg if
// 1.the Height is not equal
// 2.has received the proposal and the proposal is not equal
// 3.has received the block
if cs.Height != msg.Block.Height || cs.Height != msg.Proposal.Height ||
(cs.Proposal != nil && !bytes.Equal(cs.Proposal.Signature, msg.Proposal.Signature)) ||
cs.ProposalBlock != nil {
return
}
if cs.Proposal == nil {
if add, _ := cs.defaultSetProposal(msg.Proposal); !add {
return
}
}
cs.ProposalBlock = msg.Block
cs.trc.Pin("recvBlock")
cs.finishReceiveBlock(msg.Block.Height)
cs.Logger.Info("GetBlockRedis", "height", msg.Proposal.Height, "time", tmtime.Now())
case *BlockPartMessage:
// if avc and has 2/3 votes, it can use the blockPartsHeader from votes
if cs.HasVC && cs.ProposalBlockParts == nil && cs.Round == 0 {
Expand All @@ -188,10 +209,19 @@ func (cs *State) handleMsg(mi msgInfo) (added bool) {
// RoundState with the updated copy or by emitting RoundState events in
// more places for routines depending on it to listen for.

cs.mtx.Unlock()
cs.mtx.Lock()
if added && cs.ProposalBlockParts.IsComplete() {
cs.handleCompleteProposal(msg.Height)
cs.trc.Pin("lastPart")
cs.bt.onRecvBlock(msg.Height)
cs.bt.totalParts = cs.ProposalBlockParts.Total()
cs.Logger.Info("GetBlockP2P", "height", msg.Height, "time", tmtime.Now())

if cs.ProposalBlock == nil {
err = cs.unmarshalBlock()
if err != nil {
return
}
cs.finishReceiveBlock(msg.Height)
}
}

if added {
Expand Down Expand Up @@ -307,6 +337,23 @@ func (cs *State) scheduleRound0(rs *cstypes.RoundState) {
cs.scheduleTimeout(sleepDuration, rs.Height, 0, cstypes.RoundStepNewHeight)
}

//
func (cs *State) finishReceiveBlock(height int64) {
cs.mtx.Unlock()
cs.mtx.Lock()

if cs.prerunTx {
cs.blockExec.NotifyPrerun(cs.ProposalBlock)
}
if !cs.ProposalBlockParts.IsComplete() {
cs.ProposalBlockParts = cs.ProposalBlock.MakePartSet(types.BlockPartSizeBytes)
}
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent())
cs.handleCompleteProposal(height)
}

// requestForProposer FireEvent to broadcast ProposeRequestMessage
func (cs *State) requestForProposer(prMsg ProposeRequestMessage) {
if signature, err := cs.privValidator.SignBytes(prMsg.SignBytes()); err == nil {
Expand Down
21 changes: 3 additions & 18 deletions libs/tendermint/consensus/consensus_propose.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ func (cs *State) defaultDecideProposal(height int64, round int) {
proposal := types.NewProposal(height, round, cs.ValidRound, propBlockID)
proposal.HasVC = cs.HasVC
if err := cs.privValidator.SignProposal(cs.state.ChainID, proposal); err == nil {
pi := ProposalBlockMessage{proposal, block}
cs.blockCtx.deltaBroker.Pub(pi.Marshal())

// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
Expand Down Expand Up @@ -343,24 +345,7 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
return
}
automation.AddBlockTimeOut(height, round)
added, err = cs.addBlockPart(height, round, part, peerID)

if added && cs.ProposalBlockParts.IsComplete() {
err = cs.unmarshalBlock()
if err != nil {
return
}
cs.trc.Pin("lastPart")
cs.bt.onRecvBlock(height)
cs.bt.totalParts = cs.ProposalBlockParts.Total()
if cs.prerunTx {
cs.blockExec.NotifyPrerun(cs.ProposalBlock)
}
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent())
}
return
return cs.addBlockPart(height, round, part, peerID)
}

func (cs *State) handleCompleteProposal(height int64) {
Expand Down
49 changes: 49 additions & 0 deletions libs/tendermint/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (conR *Reactor) OnStart() error {
}

go conR.updateRoundStateRoutine()
go conR.getBlockRoutine()

return nil
}
Expand Down Expand Up @@ -1099,6 +1100,18 @@ OUTER_LOOP:
}
}

func (conR *Reactor) getBlockRoutine() {
ctx := conR.conS.blockCtx
subChan := ctx.deltaBroker.SubChannel().Channel()
for msg := range subChan {
pbm := &ProposalBlockMessage{}
if err := pbm.Unmarshal([]byte(msg.Payload)); err == nil {
conR.Logger.Debug("Block from Redis:", "chan", msg.Channel, "height", pbm.Proposal.Height)
conR.conS.peerMsgQueue <- msgInfo{pbm, ""}
}
}
}

func (conR *Reactor) peerStatsRoutine() {
conR.resetSwitchToFastSyncTimer()

Expand Down Expand Up @@ -1712,6 +1725,7 @@ func RegisterMessages(cdc *amino.Codec) {
cdc.RegisterConcrete(&ProposeRequestMessage{}, "tendermint/ProposeRequestMessage", nil)
cdc.RegisterConcrete(&ProposeResponseMessage{}, "tendermint/ProposeResponseMessage", nil)
cdc.RegisterConcrete(&ViewChangeMessage{}, "tendermint/ChangeValidator", nil)
cdc.RegisterConcrete(&ProposalBlockMessage{}, "tendermint/Block", nil)
}

func decodeMsg(bz []byte) (msg Message, err error) {
Expand Down Expand Up @@ -1885,6 +1899,41 @@ func (m *BlockPartMessage) String() string {

//-------------------------------------

// BlockMessage is a whole block
type ProposalBlockMessage struct {
Proposal *types.Proposal
Block *types.Block
}

// ValidateBasic performs basic validation.
func (m *ProposalBlockMessage) ValidateBasic() error {
if m.Proposal.Height < 0 {
return errors.New("negative Height")
}
if m.Proposal.Round < 0 {
return errors.New("negative Round")
}
if err := m.Block.ValidateBasic(); err != nil {
return fmt.Errorf("wrong Block: %v", err)
}
return nil
}

// String returns a string representation.
func (m *ProposalBlockMessage) String() string {
return fmt.Sprintf("[Block H:%v R:%v B:%v]", m.Proposal.Height, m.Proposal.Round, m.Block)
}

func (m *ProposalBlockMessage) Marshal() []byte {
return cdc.MustMarshalBinaryBare(m)
}

func (m *ProposalBlockMessage) Unmarshal(bs []byte) error {
return cdc.UnmarshalBinaryBare(bs, m)
}

//-------------------------------------

// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
Vote *types.Vote
Expand Down
8 changes: 8 additions & 0 deletions libs/tendermint/delta/delta.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package delta

import "github.com/go-redis/redis/v8"

type DeltaBroker interface {
GetLocker() bool
ReleaseLocker()
ResetMostRecentHeightAfterUpload(height int64, upload func(int64) bool) (bool, int64, error)
SetDeltas(height int64, bytes []byte) error
GetDeltas(height int64) ([]byte, error, int64)
SetBlock(height int64, round int, bytes []byte) error
GetBlock(height int64, round int) ([]byte, error)
SetBlockParts(height int64, round int, bytes []byte) error
GetBlockParts(height int64, round int) ([]byte, error)
Pub(msg []byte)
SubChannel() *redis.PubSub
}
43 changes: 37 additions & 6 deletions libs/tendermint/delta/redis-cgi/cgi.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,19 @@ func (r *RedisClient) ResetMostRecentHeightAfterUpload(targetHeight int64, uploa
return res, mrh, err
}

func (r *RedisClient) SetBlock(height int64, bytes []byte) error {
func (r *RedisClient) SetBlock(height int64, round int, bytes []byte) error {
if len(bytes) == 0 {
return fmt.Errorf("block is empty")
}
req := r.rdb.SetNX(context.Background(), genBlockKey(height), bytes, r.ttl)
req := r.rdb.SetNX(context.Background(), genBlockKey(height, round), bytes, r.ttl)
return req.Err()
}

func (r *RedisClient) SetBlockParts(height int64, round int, bytes []byte) error {
if len(bytes) == 0 {
return fmt.Errorf("block is empty")
}
req := r.rdb.SetNX(context.Background(), genBlockPartsKey(height, round), bytes, r.ttl)
return req.Err()
}

Expand All @@ -106,8 +114,19 @@ func (r *RedisClient) SetDeltas(height int64, bytes []byte) error {
return req.Err()
}

func (r *RedisClient) GetBlock(height int64) ([]byte, error) {
bytes, err := r.rdb.Get(context.Background(), genBlockKey(height)).Bytes()
func (r *RedisClient) GetBlock(height int64, round int) ([]byte, error) {
bytes, err := r.rdb.Get(context.Background(), genBlockKey(height, round)).Bytes()
if err == redis.Nil {
return nil, fmt.Errorf("get empty block")
}
if err != nil {
return nil, err
}
return bytes, nil
}

func (r *RedisClient) GetBlockParts(height int64, round int) ([]byte, error) {
bytes, err := r.rdb.Get(context.Background(), genBlockPartsKey(height, round)).Bytes()
if err == redis.Nil {
return nil, fmt.Errorf("get empty block")
}
Expand Down Expand Up @@ -137,8 +156,20 @@ func (r *RedisClient) getMostRecentHeight() (mrh int64) {
return
}

func genBlockKey(height int64) string {
return fmt.Sprintf("BH:%d", height)
func (r *RedisClient) Pub(msg []byte) {
r.rdb.Publish(context.Background(), "block", msg)
}

func (r *RedisClient) SubChannel() *redis.PubSub {
return r.rdb.Subscribe(context.Background(), "block")
}

func genBlockKey(height int64, round int) string {
return fmt.Sprintf("BH-%d:%d", height, round)
}

func genBlockPartsKey(height int64, round int) string {
return fmt.Sprintf("BPH-%d:%d", height, round)
}

func genDeltaKey(height int64) string {
Expand Down