Skip to content

Commit

Permalink
[Utility][Persistence][Savepoints/Rollbacks] Refactor UtilityContext …
Browse files Browse the repository at this point in the history
…into UtilityUnitOfWork (Issue #563) (#577)

## Description

This PR creates the logical abstraction that would represent the "Unit
Of Work" (inspired by [Martin Fowler's work in his "Patterns of
Enterprise Application
Architecture"](https://martinfowler.com/eaaCatalog/unitOfWork.html))

Essentially what previously was known as `UtilityContext` becomes
`LeaderUtilityUnitOfWork` or `ReplicaUtilityUnitOfWork` depending on the
fact that the current node is the Leader or not (in Consensus).

It touches many files because it moves everything that is interacting
with `Persistence` into the `utility/unit_of_work` folder.
This is to facilitate subsequent refactorings (see related PRs under
#562) into more modular components.

This PR also sets the ground up for the completion of #508 

## Issue

Fixes #563 

## Type of change

Please mark the relevant option(s):

- [x] New feature, functionality or library
- [ ] Bug fix
- [ ] Code health or cleanup
- [ ] Major breaking change
- [ ] Documentation
- [ ] Other <!-- add details here if it a different type of change -->

## List of changes

- Refactored `UtilityContext` to be a polymorphic `UtilityOfWork`
- Introduced `LeaderUtilityUnitOfWork` and `ReplicaUtilityUnitOfWork`
that expose the methods that are going be to used depending on the
"role" of the node in the consensus process.
- Moved `utilityContext` related code under `utility/unit_of_work` and
renamed all references accordingly

## Testing

- [x] `make develop_test`
- [x]
[LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md)
w/ all of the steps outlined in the `README`

<!-- REMOVE this comment block after following the instructions
 If you added additional tests or infrastructure, describe it here.
 Bonus points for images and videos or gifs.
-->

## Required Checklist

- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have tested my changes using the available tooling
- [x] I have updated the corresponding CHANGELOG

### If Applicable Checklist

- [x] I have updated the corresponding README(s); local and/or global
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added, or updated,
[mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding
README(s)
- [ ] I have added, or updated, documentation and
[mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*`
if I updated `shared/*`README(s)

---------

Signed-off-by: Alessandro De Blasis <alex@deblasis.net>
  • Loading branch information
deblasis authored Mar 26, 2023
1 parent 70c52e4 commit 19bf4d3
Show file tree
Hide file tree
Showing 48 changed files with 2,191 additions and 1,959 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/changelog-verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
const result = `<!-- validate_changelogs_review -->The changelog validation failed with the following output:
\`\`\`
${{ needs.validate.outputs.all }}
${{ fromJSON(needs.validate.outputs.all) }}
\`\`\`
Please update the relevant CHANGELOG.md files and ensure they follow the correct format.`;
Expand Down
33 changes: 14 additions & 19 deletions consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func (m *consensusModule) commitBlock(block *coreTypes.Block) error {
// Commit the context
if err := m.utilityContext.Commit(block.BlockHeader.QuorumCertificate); err != nil {
if err := m.utilityUnitOfWork.Commit(block.BlockHeader.QuorumCertificate); err != nil {
return err
}

Expand All @@ -22,12 +22,7 @@ func (m *consensusModule) commitBlock(block *coreTypes.Block) error {
}).
Msg("🧱🧱🧱 Committing block 🧱🧱🧱")

// Release the context
if err := m.utilityContext.Release(); err != nil {
m.logger.Warn().Err(err).Msg("Error releasing utility context")
}

m.utilityContext = nil
m.utilityUnitOfWork = nil

return nil
}
Expand Down Expand Up @@ -71,29 +66,29 @@ func (m *consensusModule) isValidMessageBlock(msg *typesCons.HotstuffMessage) (b
return true, nil
}

// Creates a new Utility context and clears/nullifies any previous contexts if they exist
func (m *consensusModule) refreshUtilityContext() error {
// Catch-all structure to release the previous utility context if it wasn't properly cleaned up.
// Ideally, this should not be called.
if m.utilityContext != nil {
m.logger.Warn().Msg(typesCons.NilUtilityContextWarning)
if err := m.utilityContext.Release(); err != nil {
m.logger.Warn().Err(err).Msg("Error releasing utility context")
// Creates a new Utility Unit Of Work and clears/nullifies any previous UOW if they exist
func (m *consensusModule) refreshUtilityUnitOfWork() error {
// Catch-all structure to release the previous utility UOW if it wasn't properly cleaned up.
// IMPROVE: This should not be called if the UOW is properly managed in the entire lifecycle
if m.utilityUnitOfWork != nil {
m.logger.Warn().Msg(typesCons.NilUtilityUOWWarning)
if err := m.utilityUnitOfWork.Release(); err != nil {
m.logger.Warn().Err(err).Msg("Error releasing utility unit of work")
}
m.utilityContext = nil
m.utilityUnitOfWork = nil
}

// Only one write context can exist at a time, and the utility context needs to instantiate
// Only one write context can exist at a time, and the utility unit of work needs to instantiate
// a new one to modify the state.
if err := m.GetBus().GetPersistenceModule().ReleaseWriteContext(); err != nil {
m.logger.Warn().Err(err).Msg("Error releasing persistence write context")
}

utilityContext, err := m.GetBus().GetUtilityModule().NewContext(int64(m.height))
utilityUOW, err := m.GetBus().GetUtilityModule().NewUnitOfWork(int64(m.height))
if err != nil {
return err
}

m.utilityContext = utilityContext
m.utilityUnitOfWork = utilityUOW
return nil
}
4 changes: 2 additions & 2 deletions consensus/debugging.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func (m *consensusModule) SetBlock(block *coreTypes.Block) {
m.block = block
}

func (m *consensusModule) SetUtilityContext(utilityContext modules.UtilityContext) {
m.utilityContext = utilityContext
func (m *consensusModule) SetUtilityUnitOfWork(utilityUnitOfWork modules.UtilityUnitOfWork) {
m.utilityUnitOfWork = utilityUnitOfWork
}

func (m *consensusModule) HandleDebugMessage(debugMessage *messaging.DebugMessage) error {
Expand Down
8 changes: 8 additions & 0 deletions consensus/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.39] - 2023-03-26

- Refactored `utilityContext` into `utilityUnitOfWork`
- Added `utilityUnitOfWorkFactory` to create `utilityUnitOfWork` instances depending on the fact that the current node is `Leader` or `Replica`
- Renamed `prepareAndApplyBlock` to `prepareBlock`
- Centralized `applyBlock` logic
- Updated tests

## [0.0.0.38] - 2023-03-25

- Add quorum certificate to the block before committing to persistence
Expand Down
8 changes: 4 additions & 4 deletions consensus/e2e_tests/pacemaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ func TestPacemakerCatchupSameStepDifferentRounds(t *testing.T) {

// Set all nodes to the same STEP and HEIGHT BUT different ROUNDS
for _, pocketNode := range pocketNodes {
// Update height, step, leaderId, and utility context via setters exposed with the debug interface
// Update height, step, leaderId, and utility unit of work via setters exposed with the debug interface
consensusModImpl := GetConsensusModImpl(pocketNode)
consensusModImpl.MethodByName("SetHeight").Call([]reflect.Value{reflect.ValueOf(testHeight)})
consensusModImpl.MethodByName("SetStep").Call([]reflect.Value{reflect.ValueOf(testStep)})

// utilityContext is only set on new rounds, which is skipped in this test
utilityContext, err := pocketNode.GetBus().GetUtilityModule().NewContext(int64(testHeight))
// utilityUnitOfWork is only set on new rounds, which is skipped in this test
utilityUnitOfWork, err := pocketNode.GetBus().GetUtilityModule().NewUnitOfWork(int64(testHeight))
require.NoError(t, err)
consensusModImpl.MethodByName("SetUtilityContext").Call([]reflect.Value{reflect.ValueOf(utilityContext)})
consensusModImpl.MethodByName("SetUtilityUnitOfWork").Call([]reflect.Value{reflect.ValueOf(utilityUnitOfWork)})
}

// Set the leader to be in the highest round.
Expand Down
67 changes: 49 additions & 18 deletions consensus/e2e_tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
mocksPer "github.com/pokt-network/pocket/persistence/types/mocks"
"github.com/pokt-network/pocket/runtime"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/runtime/defaults"
"github.com/pokt-network/pocket/runtime/genesis"
"github.com/pokt-network/pocket/runtime/test_artifacts"
"github.com/pokt-network/pocket/shared"
Expand All @@ -40,9 +41,10 @@ func TestMain(m *testing.M) {
const (
numValidators = 4
stateHash = "42"
maxTxBytes = 90000
)

var maxTxBytes = defaults.DefaultConsensusMaxMempoolBytes

type IdToNodeMapping map[typesCons.NodeId]*shared.Node

/*** Node Generation Helpers ***/
Expand All @@ -55,7 +57,7 @@ func GenerateNodeRuntimeMgrs(_ *testing.T, validatorCount int, clockMgr clock.Cl
for i, config := range cfgs {
config.Consensus = &configs.ConsensusConfig{
PrivateKey: config.PrivateKey,
MaxMempoolBytes: 500000000,
MaxMempoolBytes: maxTxBytes,
PacemakerConfig: &configs.PacemakerConfig{
TimeoutMsec: 5000,
Manual: false,
Expand Down Expand Up @@ -102,14 +104,16 @@ func CreateTestConsensusPocketNode(
persistenceMock := basePersistenceMock(t, eventsChannel, bus)
bus.RegisterModule(persistenceMock)

_, err := consensus.Create(bus)
consensusMod, err := consensus.Create(bus)
require.NoError(t, err)
consensusModule, ok := consensusMod.(modules.ConsensusModule)
require.True(t, ok)

runtimeMgr := (bus).GetRuntimeMgr()
// TODO(olshansky): At the moment we are using the same base mocks for all the tests,
// but note that they will need to be customized on a per test basis.
p2pMock := baseP2PMock(t, eventsChannel)
utilityMock := baseUtilityMock(t, eventsChannel, runtimeMgr.GetGenesis())
utilityMock := baseUtilityMock(t, eventsChannel, runtimeMgr.GetGenesis(), consensusModule)
telemetryMock := baseTelemetryMock(t, eventsChannel)
loggerMock := baseLoggerMock(t, eventsChannel)
rpcMock := baseRpcMock(t, eventsChannel)
Expand Down Expand Up @@ -429,44 +433,71 @@ func baseP2PMock(t *testing.T, eventsChannel modules.EventsChannel) *mockModules
}

// Creates a utility module mock with mock implementations of some basic functionality
func baseUtilityMock(t *testing.T, _ modules.EventsChannel, genesisState *genesis.GenesisState) *mockModules.MockUtilityModule {
func baseUtilityMock(t *testing.T, _ modules.EventsChannel, genesisState *genesis.GenesisState, consensusMod modules.ConsensusModule) *mockModules.MockUtilityModule {
ctrl := gomock.NewController(t)
utilityMock := mockModules.NewMockUtilityModule(ctrl)
utilityContextMock := baseUtilityContextMock(t, genesisState)

utilityMock.EXPECT().Start().Return(nil).AnyTimes()
utilityMock.EXPECT().SetBus(gomock.Any()).Return().AnyTimes()
utilityMock.EXPECT().
NewContext(gomock.Any()).
Return(utilityContextMock, nil).
NewUnitOfWork(gomock.Any()).
DoAndReturn(
// mimicking the behavior of the utility module's NewUnitOfWork method
func(height int64) (modules.UtilityUnitOfWork, error) {
if consensusMod.IsLeader() {
return baseLeaderUtilityUnitOfWorkMock(t, genesisState), nil
}
return baseReplicaUtilityUnitOfWorkMock(t, genesisState), nil
}).
MaxTimes(4)
utilityMock.EXPECT().GetModuleName().Return(modules.UtilityModuleName).AnyTimes()

return utilityMock
}

func baseUtilityContextMock(t *testing.T, genesisState *genesis.GenesisState) *mockModules.MockUtilityContext {
func baseLeaderUtilityUnitOfWorkMock(t *testing.T, genesisState *genesis.GenesisState) *mockModules.MockLeaderUtilityUnitOfWork {
ctrl := gomock.NewController(t)
utilityContextMock := mockModules.NewMockUtilityContext(ctrl)
utilityLeaderUnitOfWorkMock := mockModules.NewMockLeaderUtilityUnitOfWork(ctrl)

persistenceContextMock := mockModules.NewMockPersistenceRWContext(ctrl)
persistenceContextMock.EXPECT().GetAllValidators(gomock.Any()).Return(genesisState.GetValidators(), nil).AnyTimes()
persistenceContextMock.EXPECT().GetBlockHash(gomock.Any()).Return("", nil).AnyTimes()

utilityContextMock.EXPECT().
utilityLeaderUnitOfWorkMock.EXPECT().
CreateAndApplyProposalBlock(gomock.Any(), maxTxBytes).
Return(stateHash, make([][]byte, 0), nil).
AnyTimes()
utilityContextMock.EXPECT().
utilityLeaderUnitOfWorkMock.EXPECT().
ApplyBlock().
Return(stateHash, make([][]byte, 0), nil).
AnyTimes()
utilityLeaderUnitOfWorkMock.EXPECT().SetProposalBlock(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
utilityLeaderUnitOfWorkMock.EXPECT().Commit(gomock.Any()).Return(nil).AnyTimes()
utilityLeaderUnitOfWorkMock.EXPECT().Release().Return(nil).AnyTimes()

persistenceContextMock.EXPECT().Release().Return(nil).AnyTimes()

return utilityLeaderUnitOfWorkMock
}

func baseReplicaUtilityUnitOfWorkMock(t *testing.T, genesisState *genesis.GenesisState) *mockModules.MockReplicaUtilityUnitOfWork {
ctrl := gomock.NewController(t)
utilityReplicaUnitOfWorkMock := mockModules.NewMockReplicaUtilityUnitOfWork(ctrl)

persistenceContextMock := mockModules.NewMockPersistenceRWContext(ctrl)
persistenceContextMock.EXPECT().GetAllValidators(gomock.Any()).Return(genesisState.GetValidators(), nil).AnyTimes()
persistenceContextMock.EXPECT().GetBlockHash(gomock.Any()).Return("", nil).AnyTimes()

utilityReplicaUnitOfWorkMock.EXPECT().
ApplyBlock().
Return(stateHash, nil).
Return(stateHash, make([][]byte, 0), nil).
AnyTimes()
utilityContextMock.EXPECT().SetProposalBlock(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
utilityContextMock.EXPECT().Commit(gomock.Any()).Return(nil).AnyTimes()
utilityContextMock.EXPECT().Release().Return(nil).AnyTimes()
utilityReplicaUnitOfWorkMock.EXPECT().SetProposalBlock(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
utilityReplicaUnitOfWorkMock.EXPECT().Commit(gomock.Any()).Return(nil).AnyTimes()
utilityReplicaUnitOfWorkMock.EXPECT().Release().Return(nil).AnyTimes()

persistenceContextMock.EXPECT().Release().Return(nil).AnyTimes()

return utilityContextMock
return utilityReplicaUnitOfWorkMock
}

func baseTelemetryMock(t *testing.T, _ modules.EventsChannel) *mockModules.MockTelemetryModule {
Expand Down
1 change: 0 additions & 1 deletion consensus/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ func (m *consensusModule) broadcastToValidators(msg *typesCons.HotstuffMessage)

/*** Persistence Helpers ***/

// TECHDEBT(#388): Integrate this with the `persistence` module or a real mempool.
func (m *consensusModule) clearMessagesPool() {
for _, step := range HotstuffSteps {
m.hotstuffMempool[step].Clear()
Expand Down
44 changes: 23 additions & 21 deletions consensus/hotstuff_leader.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package consensus

import (
"errors"

consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
typesCons "github.com/pokt-network/pocket/consensus/types"
"github.com/pokt-network/pocket/shared/codec"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
"github.com/pokt-network/pocket/shared/modules"
)

// CONSOLIDATE: Last/Prev & AppHash/StateHash/BlockHash
Expand Down Expand Up @@ -48,9 +51,9 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
},
).Msg("📬 Received enough 📬 votes")

// Clear the previous utility context, if it exists, and create a new one
if err := m.refreshUtilityContext(); err != nil {
m.logger.Error().Err(err).Msg("Could not refresh utility context")
// Clear the previous utility unitOfWork, if it exists, and create a new one
if err := m.refreshUtilityUnitOfWork(); err != nil {
m.logger.Error().Err(err).Msg("Could not refresh utility unitOfWork")
return
}

Expand All @@ -62,24 +65,25 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
// TODO: Add test to make sure same block is not applied twice if round is interrupted after being 'Applied'.
// TODO: Add more unit tests for these checks...
if m.shouldPrepareNewBlock(highPrepareQC) {
block, err := m.prepareAndApplyBlock(highPrepareQC)
block, err := m.prepareBlock(highPrepareQC)
if err != nil {
m.logger.Error().Err(err).Msg(typesCons.ErrPrepareBlock.Error())
m.paceMaker.InterruptRound("failed to prepare & apply block")
m.paceMaker.InterruptRound("failed to prepare new block")
return
}
m.block = block
} else {
// Leader acts like a replica if `prepareQC` is not `nil`
// TODO: Do we need to call `validateProposal` here similar to how replicas does it
if err := m.applyBlock(highPrepareQC.Block); err != nil {
m.logger.Error().Err(err).Msg(typesCons.ErrApplyBlock.Error())
m.paceMaker.InterruptRound("failed to apply block")
return
}
m.block = highPrepareQC.Block
}

if err := m.applyBlock(m.block); err != nil {
m.logger.Error().Err(err).Msg(typesCons.ErrApplyBlock.Error())
m.paceMaker.InterruptRound("failed to apply block")
return
}

m.step = Prepare
m.hotstuffMempool[NewRound].Clear()

Expand Down Expand Up @@ -362,11 +366,6 @@ func (m *consensusModule) validateMessageSignature(msg *typesCons.HotstuffMessag
address, valAddrToIdMap[address], msg, pubKey)
}

// TODO(#388): Utilize the shared mempool implementation for consensus messages.
//
// It doesn't actually work because SizeOf returns the size of the map pointer,
// and does not recursively determine the size of all the underlying elements
// Add proper tests and implementation once the mempool is implemented.
func (m *consensusModule) indexHotstuffMessage(msg *typesCons.HotstuffMessage) error {
if m.consCfg.MaxMempoolBytes < uint64(m.hotstuffMempool[typesCons.HotstuffStep(msg.Type)].TotalMsgBytes()) {
m.logger.Error().Err(typesCons.ErrConsensusMempoolFull).Msg(typesCons.DisregardHotstuffMessage)
Expand All @@ -384,17 +383,20 @@ func (m *consensusModule) indexHotstuffMessage(msg *typesCons.HotstuffMessage) e

// This is a helper function intended to be called by a leader/validator during a view change
// to prepare a new block that is applied to the new underlying context.
// TODO: Split this into atomic & functional `prepareBlock` and `applyBlock` methods
func (m *consensusModule) prepareAndApplyBlock(qc *typesCons.QuorumCertificate) (*coreTypes.Block, error) {
func (m *consensusModule) prepareBlock(qc *typesCons.QuorumCertificate) (*coreTypes.Block, error) {
if m.isReplica() {
return nil, typesCons.ErrReplicaPrepareBlock
}

// TECHDEBT: Retrieve this from consensus consensus config
maxTxBytes := 90000
maxTxBytes := m.consCfg.MaxMempoolBytes

leaderUOW, ok := m.utilityUnitOfWork.(modules.LeaderUtilityUnitOfWork)
if !ok {
return nil, errors.New("invalid utility unitOfWork, should be of type LeaderUtilityUnitOfWork")
}

// Reap the mempool for transactions to be applied in this block
stateHash, txs, err := m.utilityContext.CreateAndApplyProposalBlock(m.privateKey.Address(), maxTxBytes)
stateHash, txs, err := leaderUOW.CreateAndApplyProposalBlock(m.privateKey.Address(), maxTxBytes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -429,7 +431,7 @@ func (m *consensusModule) prepareAndApplyBlock(qc *typesCons.QuorumCertificate)
}

// Set the proposal block in the persistence context
if err := m.utilityContext.SetProposalBlock(blockHeader.StateHash, blockHeader.ProposerAddress, block.Transactions); err != nil {
if err := m.utilityUnitOfWork.SetProposalBlock(blockHeader.StateHash, blockHeader.ProposerAddress, block.Transactions); err != nil {
return nil, err
}

Expand Down
10 changes: 5 additions & 5 deletions consensus/hotstuff_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func (handler *HotstuffReplicaMessageHandler) HandleNewRoundMessage(m *consensus
return
}

// Clear the previous utility context, if it exists, and create a new one
if err := m.refreshUtilityContext(); err != nil {
m.logger.Error().Err(err).Msg("Could not refresh utility context")
// Clear the previous utility unitOfWork, if it exists, and create a new one
if err := m.refreshUtilityUnitOfWork(); err != nil {
m.logger.Error().Err(err).Msg("Could not refresh utility unitOfWork")
return
}

Expand Down Expand Up @@ -238,12 +238,12 @@ func (m *consensusModule) validateProposal(msg *typesCons.HotstuffMessage) error
func (m *consensusModule) applyBlock(block *coreTypes.Block) error {
blockHeader := block.BlockHeader
// Set the proposal block in the persistence context
if err := m.utilityContext.SetProposalBlock(blockHeader.StateHash, blockHeader.ProposerAddress, block.Transactions); err != nil {
if err := m.utilityUnitOfWork.SetProposalBlock(blockHeader.StateHash, blockHeader.ProposerAddress, block.Transactions); err != nil {
return err
}

// Apply all the transactions in the block and get the stateHash
stateHash, err := m.utilityContext.ApplyBlock()
stateHash, _, err := m.utilityUnitOfWork.ApplyBlock()
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 19bf4d3

Please sign in to comment.