Skip to content

Commit

Permalink
1) Removed unused worker (#4512)
Browse files Browse the repository at this point in the history
2) Proper error checking
3) Tests for gas 30m
  • Loading branch information
Frozen authored Sep 28, 2023
1 parent 018c336 commit d8f1225
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 83 deletions.
11 changes: 5 additions & 6 deletions api/service/legacysync/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
Expand Down Expand Up @@ -932,7 +931,7 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain
}

// generateNewState will construct most recent state from downloaded blocks
func (ss *StateSync) generateNewState(bc core.BlockChain, worker *worker.Worker) error {
func (ss *StateSync) generateNewState(bc core.BlockChain) error {
// update blocks created before node start sync
parentHash := bc.CurrentBlock().Hash()

Expand Down Expand Up @@ -995,7 +994,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain, worker *worker.Worker)
}

// ProcessStateSync processes state sync from the blocks received but not yet processed so far
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain, worker *worker.Worker) error {
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain) error {
// Gets consensus hashes.
if err := ss.getConsensusHashes(startHash, size); err != nil {
return errors.Wrap(err, "getConsensusHashes")
Expand All @@ -1005,7 +1004,7 @@ func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.Blo
if ss.stateSyncTaskQueue.Len() > 0 {
ss.downloadBlocks(bc)
}
return ss.generateNewState(bc, worker)
return ss.generateNewState(bc)
}

func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
Expand Down Expand Up @@ -1076,7 +1075,7 @@ func (ss *StateSync) GetMaxPeerHeight() (uint64, error) {
}

// SyncLoop will keep syncing with peers until catches up
func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
func (ss *StateSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
utils.Logger().Info().Msgf("legacy sync is executing ...")
if !isBeacon {
ss.RegisterNodeInfo()
Expand Down Expand Up @@ -1110,7 +1109,7 @@ func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco
if size > SyncLoopBatchSize {
size = SyncLoopBatchSize
}
err := ss.ProcessStateSync(startHash[:], size, bc, worker)
err := ss.ProcessStateSync(startHash[:], size, bc)
if err != nil {
utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
Expand Down
7 changes: 3 additions & 4 deletions api/service/stagedsync/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"

Expand Down Expand Up @@ -163,7 +162,7 @@ func initDB(ctx context.Context, db kv.RwDB) error {
}

// SyncLoop will keep syncing with peers until catches up
func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
func (s *StagedSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {

utils.Logger().Info().
Uint64("current height", bc.CurrentBlock().NumberU64()).
Expand Down Expand Up @@ -204,7 +203,7 @@ func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco
}
startTime := time.Now()

if err := s.runSyncCycle(bc, worker, isBeacon, consensus, maxPeersHeight); err != nil {
if err := s.runSyncCycle(bc, isBeacon, consensus, maxPeersHeight); err != nil {
utils.Logger().Error().
Err(err).
Bool("isBeacon", isBeacon).
Expand Down Expand Up @@ -266,7 +265,7 @@ func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco
}

// runSyncCycle will run one cycle of staged syncing
func (s *StagedSync) runSyncCycle(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, maxPeersHeight uint64) error {
func (s *StagedSync) runSyncCycle(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, maxPeersHeight uint64) error {
canRunCycleInOneTransaction := s.MaxBlocksPerSyncCycle > 0 && s.MaxBlocksPerSyncCycle <= s.MaxMemSyncCycleSize
var tx kv.RwTx
if canRunCycleInOneTransaction {
Expand Down
2 changes: 1 addition & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func ApplyStakingTransaction(
vmenv := vm.NewEVM(context, statedb, config, cfg)

// Apply the transaction to the current state (included in the env)
gas, err = ApplyStakingMessage(vmenv, msg, gp, bc)
gas, err = ApplyStakingMessage(vmenv, msg, gp)
if err != nil {
return nil, 0, err
}
Expand Down
10 changes: 4 additions & 6 deletions core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ type StateTransition struct {
data []byte
state vm.StateDB
evm *vm.EVM
bc ChainContext
}

// Message represents a message sent to a contract.
Expand Down Expand Up @@ -131,7 +130,7 @@ func (result *ExecutionResult) Revert() []byte {
}

// NewStateTransition initialises and returns a new state transition object.
func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext) *StateTransition {
func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool) *StateTransition {
return &StateTransition{
gp: gp,
evm: evm,
Expand All @@ -140,7 +139,6 @@ func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext)
value: msg.Value(),
data: msg.Data(),
state: evm.StateDB,
bc: bc,
}
}

Expand All @@ -152,12 +150,12 @@ func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext)
// indicates a core error meaning that the message would always fail for that particular
// state and would never be accepted within a block.
func ApplyMessage(evm *vm.EVM, msg Message, gp *GasPool) (ExecutionResult, error) {
return NewStateTransition(evm, msg, gp, nil).TransitionDb()
return NewStateTransition(evm, msg, gp).TransitionDb()
}

// ApplyStakingMessage computes the new state for staking message
func ApplyStakingMessage(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext) (uint64, error) {
return NewStateTransition(evm, msg, gp, bc).StakingTransitionDb()
func ApplyStakingMessage(evm *vm.EVM, msg Message, gp *GasPool) (uint64, error) {
return NewStateTransition(evm, msg, gp).StakingTransitionDb()
}

// to returns the recipient of the message.
Expand Down
4 changes: 2 additions & 2 deletions core/state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func testApplyStakingMessage(test applyStakingMessageTest, t *testing.T) {
vmenv := vm.NewEVM(ctx, db, params.TestChainConfig, vm.Config{})

// run the staking tx
_, err := ApplyStakingMessage(vmenv, msg, gp, chain)
_, err := ApplyStakingMessage(vmenv, msg, gp)
if err != nil {
if test.expectedError == nil {
t.Errorf(fmt.Sprintf("Got error %v but expected none", err))
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestCollectGasRounding(t *testing.T) {

vmenv := vm.NewEVM(ctx, db, params.TestChainConfig, vm.Config{})
gasPool := new(GasPool).AddGas(math.MaxUint64)
st := NewStateTransition(vmenv, msg, gasPool, nil)
st := NewStateTransition(vmenv, msg, gasPool)
// buy gas to set initial gas to 5: gasLimit * gasPrice
if err := st.buyGas(); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type ISync interface {
AddLastMileBlock(block *types.Block)
GetActivePeerNumber() int
CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error
SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration)
SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration)
IsSynchronized() bool
IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool)
AddNewBlock(peerHash []byte, block *types.Block)
Expand Down
17 changes: 9 additions & 8 deletions node/node_newblock.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package node

import (
"errors"
"sort"
"strings"
"time"

"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/pkg/errors"

staking "github.com/harmony-one/harmony/staking/types"

Expand Down Expand Up @@ -116,16 +116,18 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
utils.AnalysisStart("ProposeNewBlock", nowEpoch, blockNow)
defer utils.AnalysisEnd("ProposeNewBlock", nowEpoch, blockNow)

node.Worker.UpdateCurrent()

header := node.Worker.GetCurrentHeader()
// Update worker's current header and
// state data in preparation to propose/process new transactions
leaderKey := node.Consensus.GetLeaderPubKey()
env, err := node.Worker.UpdateCurrent()
if err != nil {
return nil, errors.Wrap(err, "failed to update worker")
}

var (
header = env.CurrentHeader()
leaderKey = node.Consensus.GetLeaderPubKey()
coinbase = node.GetAddressForBLSKey(leaderKey.Object, header.Epoch())
beneficiary = coinbase
err error
)

// After staking, all coinbase will be the address of bls pub key
Expand All @@ -134,8 +136,7 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
coinbase.SetBytes(blsPubKeyBytes[:])
}

emptyAddr := common.Address{}
if coinbase == emptyAddr {
if coinbase == (common.Address{}) {
return nil, errors.New("[ProposeNewBlock] Failed setting coinbase")
}

Expand Down
13 changes: 6 additions & 7 deletions node/node_syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
)
Expand Down Expand Up @@ -269,7 +268,7 @@ func (node *Node) doBeaconSyncing() {
}

// DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func (node *Node) DoSyncing(bc core.BlockChain, worker *worker.Worker, willJoinConsensus bool) {
func (node *Node) DoSyncing(bc core.BlockChain, willJoinConsensus bool) {
if node.NodeConfig.IsOffline {
return
}
Expand All @@ -280,15 +279,15 @@ func (node *Node) DoSyncing(bc core.BlockChain, worker *worker.Worker, willJoinC
for {
select {
case <-ticker.C:
node.doSync(bc, worker, willJoinConsensus)
node.doSync(bc, willJoinConsensus)
case <-node.Consensus.BlockNumLowChan:
node.doSync(bc, worker, willJoinConsensus)
node.doSync(bc, willJoinConsensus)
}
}
}

// doSync keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinConsensus bool) {
func (node *Node) doSync(bc core.BlockChain, willJoinConsensus bool) {

syncInstance := node.SyncInstance()
if syncInstance.GetActivePeerNumber() < legacysync.NumPeersLowBound {
Expand Down Expand Up @@ -317,7 +316,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons
node.Consensus.BlocksNotSynchronized()
}
isBeacon := bc.ShardID() == shard.BeaconChainShardID
syncInstance.SyncLoop(bc, worker, isBeacon, node.Consensus, legacysync.LoopMinTime)
syncInstance.SyncLoop(bc, isBeacon, node.Consensus, legacysync.LoopMinTime)
if willJoinConsensus {
node.IsSynchronized.Set()
node.Consensus.BlocksSynchronized()
Expand Down Expand Up @@ -388,7 +387,7 @@ func (node *Node) supportSyncing() {
utils.Logger().Debug().Msg("[SYNC] initialized state for staged sync")
}

go node.DoSyncing(node.Blockchain(), node.Worker, joinConsensus)
go node.DoSyncing(node.Blockchain(), joinConsensus)
}

// InitSyncingServer starts downloader server.
Expand Down
7 changes: 7 additions & 0 deletions node/worker/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package worker

import "github.com/harmony-one/harmony/block"

type Environment interface {
CurrentHeader() *block.Header
}
Loading

0 comments on commit d8f1225

Please sign in to comment.