Skip to content

Commit

Permalink
[FAB-1777] Refactor orderer multichain package
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1777

The orderer multichain package has become a little unwieldy as more and
more configuration based handlers have been added to it. This changeset
consolidates these many parameters into embedded structures to alleviate
this problem in preparation for adding the chain config handler.

Change-Id: I9b999d9e52dfa38d3aa42a9379892530357d4b14
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jan 20, 2017
1 parent b3f03b1 commit 2cdafd0
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 84 deletions.
76 changes: 24 additions & 52 deletions orderer/multichain/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,80 +82,68 @@ type ChainSupport interface {

broadcast.Support
ConsenterSupport

// ConfigTxManager returns the corresponding configtx.Manager for this chain
ConfigTxManager() configtx.Manager
}

type chainSupport struct {
chain Chain
cutter blockcutter.Receiver
configManager configtx.Manager
policyManager policies.Manager
sharedConfigManager sharedconfig.Manager
ledger ordererledger.ReadWriter
filters *filter.RuleSet
signer crypto.LocalSigner
lastConfiguration uint64
lastConfigSeq uint64
*ledgerResources
chain Chain
cutter blockcutter.Receiver
filters *filter.RuleSet
signer crypto.LocalSigner
lastConfiguration uint64
lastConfigSeq uint64
}

func newChainSupport(
filters *filter.RuleSet,
configManager configtx.Manager,
policyManager policies.Manager,
backing ordererledger.ReadWriter,
sharedConfigManager sharedconfig.Manager,
ledgerResources *ledgerResources,
consenters map[string]Consenter,
signer crypto.LocalSigner,
) *chainSupport {

cutter := blockcutter.NewReceiverImpl(sharedConfigManager, filters)
consenterType := sharedConfigManager.ConsensusType()
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
consenterType := ledgerResources.SharedConfig().ConsensusType()
consenter, ok := consenters[consenterType]
if !ok {
logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
}

cs := &chainSupport{
configManager: configManager,
policyManager: policyManager,
sharedConfigManager: sharedConfigManager,
cutter: cutter,
filters: filters,
ledger: backing,
signer: signer,
ledgerResources: ledgerResources,
cutter: cutter,
filters: filters,
signer: signer,
}

var err error
cs.chain, err = consenter.HandleChain(cs)
if err != nil {
logger.Fatalf("Error creating consenter for chain %x: %s", configManager.ChainID(), err)
logger.Fatalf("Error creating consenter for chain %x: %s", ledgerResources.ChainID(), err)
}

return cs
}

// createStandardFilters creates the set of filters for a normal (non-system) chain
func createStandardFilters(configManager configtx.Manager, policyManager policies.Manager, sharedConfig sharedconfig.Manager) *filter.RuleSet {
func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(sharedConfig.BatchSize().AbsoluteMaxBytes),
sigfilter.New(sharedConfig.IngressPolicyNames, policyManager),
configtx.NewFilter(configManager),
sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes),
sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()),
configtx.NewFilter(ledgerResources),
filter.AcceptRule,
})

}

// createSystemChainFilters creates the set of filters for the ordering system chain
func createSystemChainFilters(ml *multiLedger, configManager configtx.Manager, policyManager policies.Manager, sharedConfig sharedconfig.Manager) *filter.RuleSet {
func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(sharedConfig.BatchSize().AbsoluteMaxBytes),
sigfilter.New(sharedConfig.IngressPolicyNames, policyManager),
sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes),
sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()),
newSystemChainFilter(ml),
configtx.NewFilter(configManager),
configtx.NewFilter(ledgerResources),
filter.AcceptRule,
})
}
Expand All @@ -172,22 +160,6 @@ func (cs *chainSupport) Sign(message []byte) ([]byte, error) {
return cs.signer.Sign(message)
}

func (cs *chainSupport) SharedConfig() sharedconfig.Manager {
return cs.sharedConfigManager
}

func (cs *chainSupport) ConfigTxManager() configtx.Manager {
return cs.configManager
}

func (cs *chainSupport) ChainID() string {
return cs.configManager.ChainID()
}

func (cs *chainSupport) PolicyManager() policies.Manager {
return cs.policyManager
}

func (cs *chainSupport) Filters() *filter.RuleSet {
return cs.filters
}
Expand Down Expand Up @@ -231,7 +203,7 @@ func (cs *chainSupport) addBlockSignature(block *cb.Block) {
}

func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
configSeq := cs.configManager.Sequence()
configSeq := cs.Sequence()
if configSeq > cs.lastConfigSeq {
cs.lastConfiguration = block.Header.Number
cs.lastConfigSeq = configSeq
Expand Down
6 changes: 3 additions & 3 deletions orderer/multichain/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (mc *mockCommitter) Commit() {
func TestCommitConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}}
txs := []*cb.Envelope{makeNormalTx("foo", 0), makeNormalTx("bar", 1)}
committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}}
block := cs.CreateNextBlock(txs)
Expand All @@ -89,7 +89,7 @@ func TestCommitConfig(t *testing.T) {
func TestWriteBlockSignatures(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}}

blockMetadata := func(block *cb.Block) *cb.Metadata {
metadata, err := utils.GetMetadataFromBlock(block, cb.BlockMetadataIndex_SIGNATURES)
Expand All @@ -107,7 +107,7 @@ func TestWriteBlockSignatures(t *testing.T) {
func TestWriteLastConfiguration(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}}

lastConfig := func(block *cb.Block) uint64 {
index, err := utils.GetLastConfigurationIndexFromBlock(block)
Expand Down
66 changes: 40 additions & 26 deletions orderer/multichain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"

"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/sharedconfig"
ordererledger "github.com/hyperledger/fabric/orderer/ledger"
cb "github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -60,6 +59,20 @@ type Manager interface {
ProposeChain(env *cb.Envelope) cb.Status
}

type configResources struct {
configtx.Manager
sharedConfig sharedconfig.Manager
}

func (cr *configResources) SharedConfig() sharedconfig.Manager {
return cr.sharedConfig
}

type ledgerResources struct {
*configResources
ledger ordererledger.ReadWriter
}

type multiLedger struct {
chains map[string]*chainSupport
consenters map[string]Consenter
Expand Down Expand Up @@ -101,19 +114,16 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C
if configTx == nil {
logger.Fatalf("Could not find configuration transaction for chain %s", chainID)
}
configManager, policyManager, backingLedger, sharedConfigManager := ml.newResources(configTx)
chainID := configManager.ChainID()
ledgerResources := ml.newLedgerResources(configTx)
chainID := ledgerResources.ChainID()

if sharedConfigManager.ChainCreationPolicyNames() != nil {
if ledgerResources.SharedConfig().ChainCreationPolicyNames() != nil {
if ml.sysChain != nil {
logger.Fatalf("There appear to be two system chains %s and %s", ml.sysChain.support.ChainID(), chainID)
}
logger.Debugf("Starting with system chain: %x", chainID)
chain := newChainSupport(createSystemChainFilters(ml, configManager, policyManager, sharedConfigManager),
configManager,
policyManager,
backingLedger,
sharedConfigManager,
chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
ledgerResources,
consenters,
signer)
ml.chains[string(chainID)] = chain
Expand All @@ -122,11 +132,8 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C
defer chain.start()
} else {
logger.Debugf("Starting chain: %x", chainID)
chain := newChainSupport(createStandardFilters(configManager, policyManager, sharedConfigManager),
configManager,
policyManager,
backingLedger,
sharedConfigManager,
chain := newChainSupport(createStandardFilters(ledgerResources),
ledgerResources,
consenters,
signer)
ml.chains[string(chainID)] = chain
Expand All @@ -151,19 +158,23 @@ func (ml *multiLedger) GetChain(chainID string) (ChainSupport, bool) {
return cs, ok
}

func newConfigTxManagerAndHandlers(configEnvelope *cb.ConfigurationEnvelope) (configtx.Manager, policies.Manager, sharedconfig.Manager, error) {
initializer := configtx.NewInitializer()
func newConfigResources(configEnvelope *cb.ConfigurationEnvelope) (*configResources, error) {
sharedConfigManager := sharedconfig.NewManagerImpl()
initializer := configtx.NewInitializer()
initializer.Handlers()[cb.ConfigurationItem_Orderer] = sharedConfigManager

configManager, err := configtx.NewManagerImpl(configEnvelope, initializer)
if err != nil {
return nil, nil, nil, fmt.Errorf("Error unpacking configuration transaction: %s", err)
return nil, fmt.Errorf("Error unpacking configuration transaction: %s", err)
}

return configManager, initializer.PolicyManager(), sharedConfigManager, nil
return &configResources{
Manager: configManager,
sharedConfig: sharedConfigManager,
}, nil
}

func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, ordererledger.ReadWriter, sharedconfig.Manager) {
func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResources {
payload := &cb.Payload{}
err := proto.Unmarshal(configTx.Payload, payload)
if err != nil {
Expand All @@ -176,38 +187,41 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po
logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err)
}

configManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope)
configResources, err := newConfigResources(configEnvelope)

if err != nil {
logger.Fatalf("Error creating configtx manager and handlers: %s", err)
}

chainID := configManager.ChainID()
chainID := configResources.ChainID()

ledger, err := ml.ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Fatalf("Error getting ledger for %s", chainID)
}

return configManager, policyManager, ledger, sharedConfigManager
return &ledgerResources{
configResources: configResources,
ledger: ledger,
}
}

func (ml *multiLedger) systemChain() *systemChain {
return ml.sysChain
}

func (ml *multiLedger) newChain(configtx *cb.Envelope) {
configManager, policyManager, backingLedger, sharedConfig := ml.newResources(configtx)
backingLedger.Append(ordererledger.CreateNextBlock(backingLedger, []*cb.Envelope{configtx}))
ledgerResources := ml.newLedgerResources(configtx)
ledgerResources.ledger.Append(ordererledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx}))

// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
newChains := make(map[string]*chainSupport)
for key, value := range ml.chains {
newChains[key] = value
}

cs := newChainSupport(createStandardFilters(configManager, policyManager, sharedConfig), configManager, policyManager, backingLedger, sharedConfig, ml.consenters, ml.signer)
chainID := configManager.ChainID()
cs := newChainSupport(createStandardFilters(ledgerResources), ledgerResources, ml.consenters, ml.signer)
chainID := ledgerResources.ChainID()

logger.Debugf("Created and starting new chain %s", chainID)

Expand Down
6 changes: 3 additions & 3 deletions orderer/multichain/systemchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (sc *systemChain) authorize(configEnvelope *cb.ConfigurationEnvelope) cb.St
return cb.Status_SUCCESS
}

func (sc *systemChain) inspect(configTxManager configtx.Manager, policyManager policies.Manager, sharedConfigManager sharedconfig.Manager) cb.Status {
func (sc *systemChain) inspect(configResources *configResources) cb.Status {
// XXX decide what it is that we will require to be the same in the new configuration, and what will be allowed to be different
// Are all keys allowed? etc.

Expand Down Expand Up @@ -241,12 +241,12 @@ func (sc *systemChain) authorizeAndInspect(configTx *cb.Envelope) cb.Status {
return status
}

configTxManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope)
configResources, err := newConfigResources(configEnvelope)
if err != nil {
logger.Debugf("Failed to create config manager and handlers: %s", err)
return cb.Status_BAD_REQUEST
}

// Make sure that the configuration does not modify any of the orderer
return sc.inspect(configTxManager, policyManager, sharedConfigManager)
return sc.inspect(configResources)
}

0 comments on commit 2cdafd0

Please sign in to comment.