Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into piersy/fix-tracing-…
Browse files Browse the repository at this point in the history
…panic
  • Loading branch information
piersy committed Feb 28, 2022
2 parents ce33106 + c78a51f commit 750b710
Show file tree
Hide file tree
Showing 27 changed files with 1,091 additions and 270 deletions.
12 changes: 8 additions & 4 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,9 @@ workflows:
- build-geth
- prepare-system-contracts
- race:
filters:
branches:
only: /master|release.*/
requires:
- build-geth
- prepare-system-contracts
Expand Down Expand Up @@ -542,10 +545,11 @@ workflows:
requires:
- checkout-monorepo
- build-geth
- end-to-end-cip35-eth-compatibility-test:
requires:
- checkout-monorepo
- build-geth
# Flaky!
# - end-to-end-cip35-eth-compatibility-test:
# requires:
# - checkout-monorepo
# - build-geth
- end-to-end-replica-test:
requires:
- checkout-monorepo
Expand Down
56 changes: 24 additions & 32 deletions consensus/istanbul/announce/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type ProxyContext interface {
GetProxiedValidatorEngine() proxy.ProxiedValidatorEngine
}

// Manager is the facade and entry point for the implementation of the announce protocol. It exposes methods to
// start and stop the announce Worker, and to handle announce messages.
type Manager struct {
logger log.Logger

Expand Down Expand Up @@ -139,7 +141,7 @@ func (m *Manager) announceThread() {
m.worker.Run()
}

// This function will handle a queryEnode message.
// HandleQueryEnodeMsg handles a queryEnodeMsg received by the p2p network, according to the announce protocol spec.
func (m *Manager) HandleQueryEnodeMsg(addr common.Address, peer consensus.Peer, payload []byte) error {
logger := m.logger.New("func", "HandleQueryEnodeMsg")

Expand Down Expand Up @@ -176,7 +178,11 @@ func (m *Manager) HandleQueryEnodeMsg(addr common.Address, peer consensus.Peer,
logger = logger.New("msgAddress", msg.Address, "msgVersion", qeData.Version)

// Do some validation checks on the istanbul.QueryEnodeData
if isValid, err := m.validateQueryEnode(msg.Address, msg.QueryEnodeMsg()); !isValid || err != nil {

// Check if the number of rows in the queryEnodePayload is at most 2 times the size of the current validator connection set.
// Note that this is a heuristic of the actual size of validator connection set at the time the validator constructed the announce message.
maxQueries := len(validatorConnSet) * 2
if isValid, err := validateQueryEnode(m.logger, msg.Address, msg.QueryEnodeMsg(), maxQueries); !isValid || err != nil {
logger.Warn("Validation of queryEnode message failed", "isValid", isValid, "err", err)
return err
}
Expand All @@ -195,15 +201,9 @@ func (m *Manager) HandleQueryEnodeMsg(addr common.Address, peer consensus.Peer,
if encEnodeURL.DestAddress != w.Ecdsa.Address {
continue
}
enodeBytes, err := w.Ecdsa.Decrypt(encEnodeURL.EncryptedEnodeURL)
node, err := DecryptAndParseEnodeURL(&w.Ecdsa, encEnodeURL.EncryptedEnodeURL)
if err != nil {
m.logger.Warn("Error decrypting endpoint", "err", err, "encEnodeURL.EncryptedEnodeURL", encEnodeURL.EncryptedEnodeURL)
return err
}
enodeURL := string(enodeBytes)
node, err := enode.ParseV4(enodeURL)
if err != nil {
logger.Warn("Error parsing enodeURL", "enodeUrl", enodeURL)
logger.Error("Can't process encEnodeURL. err", err, "encEnodeURL.EncryptedEnodeURL", encEnodeURL.EncryptedEnodeURL)
return err
}

Expand Down Expand Up @@ -271,30 +271,17 @@ func (m *Manager) answerQueryEnodeMsg(address common.Address, node *enode.Node,
// message. This is to force all validators that send a queryEnode message to
// create as succint message as possible, and prevent any possible network DOS attacks
// via extremely large queryEnode message.
func (m *Manager) validateQueryEnode(msgAddress common.Address, qeData *istanbul.QueryEnodeData) (bool, error) {
logger := m.logger.New("func", "validateQueryEnode", "msg address", msgAddress)
func validateQueryEnode(lg log.Logger, msgAddress common.Address, qeData *istanbul.QueryEnodeData, maxQueries int) (bool, error) {
logger := lg.New("func", "validateQueryEnode", "msg address", msgAddress)

// Check if there are any duplicates in the queryEnode message
var encounteredAddresses = make(map[common.Address]bool)
for _, encEnodeURL := range qeData.EncryptedEnodeURLs {
if encounteredAddresses[encEnodeURL.DestAddress] {
logger.Info("QueryEnode message has duplicate entries", "address", encEnodeURL.DestAddress)
return false, nil
}

encounteredAddresses[encEnodeURL.DestAddress] = true
if has, dupAddress := qeData.HasDuplicates(); has {
logger.Info("QueryEnode message has duplicate entries", "address", dupAddress)
return false, nil
}

// Check if the number of rows in the queryEnodePayload is at most 2 times the size of the current validator connection set.
// Note that this is a heuristic of the actual size of validator connection set at the time the validator constructed the announce message.
validatorConnSet, err := m.network.RetrieveValidatorConnSet()
if err != nil {
return false, err
}

if len(qeData.EncryptedEnodeURLs) > 2*len(validatorConnSet) {
logger.Info("Number of queryEnode message encrypted enodes is more than two times the size of the current validator connection set", "num queryEnode enodes", len(qeData.EncryptedEnodeURLs), "reg/elected val set size", len(validatorConnSet))
return false, err
if len(qeData.EncryptedEnodeURLs) > maxQueries {
logger.Info("Number of queryEnode message encrypted enodes is more max allowed", "num queryEnode enodes", len(qeData.EncryptedEnodeURLs), "max", maxQueries)
return false, nil
}

return true, nil
Expand Down Expand Up @@ -339,6 +326,7 @@ func (m *Manager) SendVersionCertificateTable(peer consensus.Peer) error {
return m.vcGossiper.SendAllFrom(m.state.VersionCertificateTable, peer)
}

// HandleVersionCertificatesMsg handles a versionCertificates received by the p2p network, according to the announce protocol spec.
func (m *Manager) HandleVersionCertificatesMsg(addr common.Address, peer consensus.Peer, payload []byte) error {
logger := m.logger.New("func", "HandleVersionCertificatesMsg")
logger.Trace("Handling version certificates msg")
Expand Down Expand Up @@ -479,7 +467,7 @@ func (m *Manager) StartAnnouncing(onStart func() error) error {
go m.announceThread()

if err := onStart(); err != nil {
m.StopAnnouncing(func() error { return nil })
m.unlockedStopAnnouncing(func() error { return nil })
return err
}

Expand All @@ -494,6 +482,10 @@ func (m *Manager) StopAnnouncing(onStop func() error) error {
return istanbul.ErrStoppedAnnounce
}

return m.unlockedStopAnnouncing(onStop)
}

func (m *Manager) unlockedStopAnnouncing(onStop func() error) error {
m.worker.Stop()
m.announceThreadWg.Wait()

Expand Down
1 change: 1 addition & 0 deletions consensus/istanbul/announce/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package announce

import "github.com/celo-org/celo-blockchain/common"

// Network manages the communication needed for the announce protocol to work.
type Network interface {
// Gossip gossips protocol messages
Gossip(payload []byte, ethMsgCode uint64) error
Expand Down
23 changes: 23 additions & 0 deletions consensus/istanbul/announce/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package announce

import (
"fmt"

"github.com/celo-org/celo-blockchain/consensus/istanbul"
"github.com/celo-org/celo-blockchain/p2p/enode"
)

// DecryptAndParseEnodeURL decrypts an encrypted enodeURL with the given ecdsa key pair, and parses the
// resulting Node.
func DecryptAndParseEnodeURL(ecdsa *istanbul.EcdsaInfo, encEnodeURL []byte) (*enode.Node, error) {
enodeBytes, err := ecdsa.Decrypt(encEnodeURL)
if err != nil {
return nil, fmt.Errorf("Error decrypting endpoint. err=%v. encEnodeURL: '%v'", err, encEnodeURL)
}
enodeURL := string(enodeBytes)
node, err := enode.ParseV4(enodeURL)
if err != nil {
return nil, fmt.Errorf("Error parsing enodeURL. err=%v. enodeURL: '%v'", err, enodeURL)
}
return node, err
}
4 changes: 4 additions & 0 deletions consensus/istanbul/backend/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
"github.com/celo-org/celo-blockchain/rlp"
)

// This file is kept in the backend package since, while actually testing the announce protocol code, it requires
// several dependencies from the istanbul code.

// This test function will test the announce message generator and handler.
// It will also test the gossip query generator and handler.
func TestAnnounceGossipQueryMsg(t *testing.T) {
t.Skip() // Flaky
// Create three backends
numValidators := 3
genesisCfg, nodeKeys := getGenesisAndKeys(numValidators, true)
Expand Down
5 changes: 2 additions & 3 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,9 @@ func (sb *Backend) Verify(proposal istanbul.Proposal) (*istanbulCore.StateProces
}
}

err := sb.VerifyHeader(sb.chain, block.Header(), false)
err := sb.verifyHeaderFromProposal(sb.chain, block.Header())

// ignore errEmptyAggregatedSeal error because we don't have the committed seals yet
if err != nil && err != errEmptyAggregatedSeal {
if err != nil {
if err == consensus.ErrFutureBlock {
return nil, time.Unix(int64(block.Header().Time), 0).Sub(now()), consensus.ErrFutureBlock
} else {
Expand Down
55 changes: 39 additions & 16 deletions consensus/istanbul/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ var (
errInvalidVotingChain = errors.New("invalid voting chain")
// errInvalidAggregatedSeal is returned if the aggregated seal is invalid.
errInvalidAggregatedSeal = errors.New("invalid aggregated seal")
// errInvalidAggregatedSeal is returned if the aggregated seal is missing.
// errEmptyAggregatedSeal is returned if the aggregated seal is missing.
errEmptyAggregatedSeal = errors.New("empty aggregated seal")
// errNonEmptyAggregatedSeal is returned if the aggregated seal is not empty during preprepase proposal phase.
errNonEmptyAggregatedSeal = errors.New("Non empty aggregated seal during preprepare")
// errMismatchTxhashes is returned if the TxHash in header is mismatch.
errMismatchTxhashes = errors.New("mismatch transactions hashes")
// errInvalidValidatorSetDiff is returned if the header contains invalid validator set diff
Expand All @@ -103,14 +105,22 @@ func (sb *Backend) Author(header *types.Header) (common.Address, error) {
// VerifyHeader checks whether a header conforms to the consensus rules of a
// given engine. Verifies the seal regardless of given "seal" argument.
func (sb *Backend) VerifyHeader(chain consensus.ChainHeaderReader, header *types.Header, seal bool) error {
return sb.verifyHeader(chain, header, nil)
return sb.verifyHeader(chain, header, false, nil)
}

// verifyHeaderFromProposal checks whether a header conforms to the consensus rules from the
// preprepare istanbul phase.
func (sb *Backend) verifyHeaderFromProposal(chain consensus.ChainHeaderReader, header *types.Header) error {
return sb.verifyHeader(chain, header, true, nil)
}

// verifyHeader checks whether a header conforms to the consensus rules.The
// caller may optionally pass in a batch of parents (ascending order) to avoid
// looking those up from the database. This is useful for concurrently verifying
// a batch of new headers.
func (sb *Backend) verifyHeader(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error {
// If emptyAggregatedSeal is set, the aggregatedSeal will be checked to be completely empty. Otherwise
// it will be checked as a normal aggregated seal.
func (sb *Backend) verifyHeader(chain consensus.ChainHeaderReader, header *types.Header, emptyAggregatedSeal bool, parents []*types.Header) error {
if header.Number == nil {
return errUnknownBlock
}
Expand All @@ -132,7 +142,7 @@ func (sb *Backend) verifyHeader(chain consensus.ChainHeaderReader, header *types
return errInvalidExtraDataFormat
}

return sb.verifyCascadingFields(chain, header, parents)
return sb.verifyCascadingFields(chain, header, emptyAggregatedSeal, parents)
}

// A sanity check for lightest mode. Checks that the correct epoch block exists for this header
Expand Down Expand Up @@ -160,7 +170,9 @@ func (sb *Backend) checkEpochBlockExists(chain consensus.ChainHeaderReader, head
// rather depend on a batch of previous headers. The caller may optionally pass
// in a batch of parents (ascending order) to avoid looking those up from the
// database. This is useful for concurrently verifying a batch of new headers.
func (sb *Backend) verifyCascadingFields(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error {
// If emptyAggregatedSeal is set, the aggregatedSeal will be checked to be completely empty. Otherwise
// it will be checked as a normal aggregated seal.
func (sb *Backend) verifyCascadingFields(chain consensus.ChainHeaderReader, header *types.Header, emptyAggregatedSeal bool, parents []*types.Header) error {
// The genesis block is the always valid dead-end
number := header.Number.Uint64()
if number == 0 {
Expand Down Expand Up @@ -189,7 +201,7 @@ func (sb *Backend) verifyCascadingFields(chain consensus.ChainHeaderReader, head
return err
}

return sb.verifyAggregatedSeals(chain, header, parents)
return sb.verifyAggregatedSeals(chain, header, emptyAggregatedSeal, parents)
}

// VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers
Expand All @@ -206,7 +218,7 @@ func (sb *Backend) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*t
if errored {
err = consensus.ErrUnknownAncestor
} else {
err = sb.verifyHeader(chain, header, headers[:i])
err = sb.verifyHeader(chain, header, false, headers[:i])
}

if err != nil {
Expand Down Expand Up @@ -252,7 +264,9 @@ func (sb *Backend) verifySigner(chain consensus.ChainHeaderReader, header *types

// verifyAggregatedSeals checks whether the aggregated seal and parent seal in the header is
// signed on by the block's validators and the parent block's validators respectively
func (sb *Backend) verifyAggregatedSeals(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error {
// If emptyAggregatedSeal is set, the aggregatedSeal will be checked to be completely empty. Otherwise
// it will be checked as a normal aggregated seal.
func (sb *Backend) verifyAggregatedSeals(chain consensus.ChainHeaderReader, header *types.Header, emptyAggregatedseal bool, parents []*types.Header) error {
number := header.Number.Uint64()
// We don't need to verify committed seals in the genesis block
if number == 0 {
Expand All @@ -264,20 +278,29 @@ func (sb *Backend) verifyAggregatedSeals(chain consensus.ChainHeaderReader, head
return err
}

// The length of Committed seals should be larger than 0
if len(extra.AggregatedSeal.Signature) == 0 {
return errEmptyAggregatedSeal
}

// Check the signatures on the current header
snap, err := sb.snapshot(chain, number-1, header.ParentHash, parents)
if err != nil {
return err
}
validators := snap.ValSet.Copy()
err = sb.verifyAggregatedSeal(header.Hash(), validators, extra.AggregatedSeal)
if err != nil {
return err

if emptyAggregatedseal {
// The length of Committed seals should be exactly 0 (preprepare proposal check)
if len(extra.AggregatedSeal.Signature) != 0 {
return errNonEmptyAggregatedSeal
}
// Should we also verify that the bitmap and round are nil?
} else {
// The length of Committed seals should be larger than 0
if len(extra.AggregatedSeal.Signature) == 0 {
return errEmptyAggregatedSeal
}

err = sb.verifyAggregatedSeal(header.Hash(), validators, extra.AggregatedSeal)
if err != nil {
return err
}
}

// The genesis block is skipped since it has no parents.
Expand Down
14 changes: 14 additions & 0 deletions consensus/istanbul/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,20 @@ func (qed *QueryEnodeData) String() string {
return fmt.Sprintf("{Version: %v, Timestamp: %v, EncryptedEnodeURLs: %v}", qed.Version, qed.Timestamp, qed.EncryptedEnodeURLs)
}

// HasDuplicates returns true if there are duplicate destination addresses in the query, and the first
// duplicate's destination address.
func (qed *QueryEnodeData) HasDuplicates() (bool, common.Address) {
var encounteredAddresses = make(map[common.Address]bool)
for _, encEnodeURL := range qed.EncryptedEnodeURLs {
if encounteredAddresses[encEnodeURL.DestAddress] {
return true, encEnodeURL.DestAddress
}

encounteredAddresses[encEnodeURL.DestAddress] = true
}
return false, common.Address{}
}

// ==============================================
//
// define the functions that needs to be provided for rlp Encoder/Decoder.
Expand Down
Loading

0 comments on commit 750b710

Please sign in to comment.