From 2cdafd0ce11d36c0481317cf55de9625773977e3 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Thu, 19 Jan 2017 23:55:30 -0500 Subject: [PATCH] [FAB-1777] Refactor orderer multichain package https://jira.hyperledger.org/browse/FAB-1777 The orderer multichain package has become a little unwieldy as more and more configuration based handlers have been added to it. This changeset consolidates these many parameters into embedded structures to alleviate this problem in preparation for adding the chain config handler. Change-Id: I9b999d9e52dfa38d3aa42a9379892530357d4b14 Signed-off-by: Jason Yellick --- orderer/multichain/chainsupport.go | 76 ++++++++----------------- orderer/multichain/chainsupport_test.go | 6 +- orderer/multichain/manager.go | 66 ++++++++++++--------- orderer/multichain/systemchain.go | 6 +- 4 files changed, 70 insertions(+), 84 deletions(-) diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go index 29fdf9d94df..714add1a7f8 100644 --- a/orderer/multichain/chainsupport.go +++ b/orderer/multichain/chainsupport.go @@ -82,80 +82,68 @@ type ChainSupport interface { broadcast.Support ConsenterSupport - - // ConfigTxManager returns the corresponding configtx.Manager for this chain - ConfigTxManager() configtx.Manager } type chainSupport struct { - chain Chain - cutter blockcutter.Receiver - configManager configtx.Manager - policyManager policies.Manager - sharedConfigManager sharedconfig.Manager - ledger ordererledger.ReadWriter - filters *filter.RuleSet - signer crypto.LocalSigner - lastConfiguration uint64 - lastConfigSeq uint64 + *ledgerResources + chain Chain + cutter blockcutter.Receiver + filters *filter.RuleSet + signer crypto.LocalSigner + lastConfiguration uint64 + lastConfigSeq uint64 } func newChainSupport( filters *filter.RuleSet, - configManager configtx.Manager, - policyManager policies.Manager, - backing ordererledger.ReadWriter, - sharedConfigManager sharedconfig.Manager, + ledgerResources *ledgerResources, consenters map[string]Consenter, signer crypto.LocalSigner, ) *chainSupport { - cutter := blockcutter.NewReceiverImpl(sharedConfigManager, filters) - consenterType := sharedConfigManager.ConsensusType() + cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters) + consenterType := ledgerResources.SharedConfig().ConsensusType() consenter, ok := consenters[consenterType] if !ok { logger.Fatalf("Error retrieving consenter of type: %s", consenterType) } cs := &chainSupport{ - configManager: configManager, - policyManager: policyManager, - sharedConfigManager: sharedConfigManager, - cutter: cutter, - filters: filters, - ledger: backing, - signer: signer, + ledgerResources: ledgerResources, + cutter: cutter, + filters: filters, + signer: signer, } var err error cs.chain, err = consenter.HandleChain(cs) if err != nil { - logger.Fatalf("Error creating consenter for chain %x: %s", configManager.ChainID(), err) + logger.Fatalf("Error creating consenter for chain %x: %s", ledgerResources.ChainID(), err) } return cs } // createStandardFilters creates the set of filters for a normal (non-system) chain -func createStandardFilters(configManager configtx.Manager, policyManager policies.Manager, sharedConfig sharedconfig.Manager) *filter.RuleSet { +func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet { return filter.NewRuleSet([]filter.Rule{ filter.EmptyRejectRule, - sizefilter.MaxBytesRule(sharedConfig.BatchSize().AbsoluteMaxBytes), - sigfilter.New(sharedConfig.IngressPolicyNames, policyManager), - configtx.NewFilter(configManager), + sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes), + sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()), + configtx.NewFilter(ledgerResources), filter.AcceptRule, }) } // createSystemChainFilters creates the set of filters for the ordering system chain -func createSystemChainFilters(ml *multiLedger, configManager configtx.Manager, policyManager policies.Manager, sharedConfig sharedconfig.Manager) *filter.RuleSet { +func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources) *filter.RuleSet { return filter.NewRuleSet([]filter.Rule{ filter.EmptyRejectRule, - sizefilter.MaxBytesRule(sharedConfig.BatchSize().AbsoluteMaxBytes), - sigfilter.New(sharedConfig.IngressPolicyNames, policyManager), + sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes), + sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()), newSystemChainFilter(ml), - configtx.NewFilter(configManager), + configtx.NewFilter(ledgerResources), filter.AcceptRule, }) } @@ -172,22 +160,6 @@ func (cs *chainSupport) Sign(message []byte) ([]byte, error) { return cs.signer.Sign(message) } -func (cs *chainSupport) SharedConfig() sharedconfig.Manager { - return cs.sharedConfigManager -} - -func (cs *chainSupport) ConfigTxManager() configtx.Manager { - return cs.configManager -} - -func (cs *chainSupport) ChainID() string { - return cs.configManager.ChainID() -} - -func (cs *chainSupport) PolicyManager() policies.Manager { - return cs.policyManager -} - func (cs *chainSupport) Filters() *filter.RuleSet { return cs.filters } @@ -231,7 +203,7 @@ func (cs *chainSupport) addBlockSignature(block *cb.Block) { } func (cs *chainSupport) addLastConfigSignature(block *cb.Block) { - configSeq := cs.configManager.Sequence() + configSeq := cs.Sequence() if configSeq > cs.lastConfigSeq { cs.lastConfiguration = block.Header.Number cs.lastConfigSeq = configSeq diff --git a/orderer/multichain/chainsupport_test.go b/orderer/multichain/chainsupport_test.go index 4d79e7cb38b..57d4fb6c080 100644 --- a/orderer/multichain/chainsupport_test.go +++ b/orderer/multichain/chainsupport_test.go @@ -64,7 +64,7 @@ func (mc *mockCommitter) Commit() { func TestCommitConfig(t *testing.T) { ml := &mockLedgerReadWriter{} cm := &mockconfigtx.Manager{} - cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}} + cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}} txs := []*cb.Envelope{makeNormalTx("foo", 0), makeNormalTx("bar", 1)} committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}} block := cs.CreateNextBlock(txs) @@ -89,7 +89,7 @@ func TestCommitConfig(t *testing.T) { func TestWriteBlockSignatures(t *testing.T) { ml := &mockLedgerReadWriter{} cm := &mockconfigtx.Manager{} - cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}} + cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}} blockMetadata := func(block *cb.Block) *cb.Metadata { metadata, err := utils.GetMetadataFromBlock(block, cb.BlockMetadataIndex_SIGNATURES) @@ -107,7 +107,7 @@ func TestWriteBlockSignatures(t *testing.T) { func TestWriteLastConfiguration(t *testing.T) { ml := &mockLedgerReadWriter{} cm := &mockconfigtx.Manager{} - cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}} + cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}} lastConfig := func(block *cb.Block) uint64 { index, err := utils.GetLastConfigurationIndexFromBlock(block) diff --git a/orderer/multichain/manager.go b/orderer/multichain/manager.go index 24d08067bc4..d127d8b5431 100644 --- a/orderer/multichain/manager.go +++ b/orderer/multichain/manager.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/hyperledger/fabric/common/configtx" - "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/sharedconfig" ordererledger "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" @@ -60,6 +59,20 @@ type Manager interface { ProposeChain(env *cb.Envelope) cb.Status } +type configResources struct { + configtx.Manager + sharedConfig sharedconfig.Manager +} + +func (cr *configResources) SharedConfig() sharedconfig.Manager { + return cr.sharedConfig +} + +type ledgerResources struct { + *configResources + ledger ordererledger.ReadWriter +} + type multiLedger struct { chains map[string]*chainSupport consenters map[string]Consenter @@ -101,19 +114,16 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C if configTx == nil { logger.Fatalf("Could not find configuration transaction for chain %s", chainID) } - configManager, policyManager, backingLedger, sharedConfigManager := ml.newResources(configTx) - chainID := configManager.ChainID() + ledgerResources := ml.newLedgerResources(configTx) + chainID := ledgerResources.ChainID() - if sharedConfigManager.ChainCreationPolicyNames() != nil { + if ledgerResources.SharedConfig().ChainCreationPolicyNames() != nil { if ml.sysChain != nil { logger.Fatalf("There appear to be two system chains %s and %s", ml.sysChain.support.ChainID(), chainID) } logger.Debugf("Starting with system chain: %x", chainID) - chain := newChainSupport(createSystemChainFilters(ml, configManager, policyManager, sharedConfigManager), - configManager, - policyManager, - backingLedger, - sharedConfigManager, + chain := newChainSupport(createSystemChainFilters(ml, ledgerResources), + ledgerResources, consenters, signer) ml.chains[string(chainID)] = chain @@ -122,11 +132,8 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C defer chain.start() } else { logger.Debugf("Starting chain: %x", chainID) - chain := newChainSupport(createStandardFilters(configManager, policyManager, sharedConfigManager), - configManager, - policyManager, - backingLedger, - sharedConfigManager, + chain := newChainSupport(createStandardFilters(ledgerResources), + ledgerResources, consenters, signer) ml.chains[string(chainID)] = chain @@ -151,19 +158,23 @@ func (ml *multiLedger) GetChain(chainID string) (ChainSupport, bool) { return cs, ok } -func newConfigTxManagerAndHandlers(configEnvelope *cb.ConfigurationEnvelope) (configtx.Manager, policies.Manager, sharedconfig.Manager, error) { - initializer := configtx.NewInitializer() +func newConfigResources(configEnvelope *cb.ConfigurationEnvelope) (*configResources, error) { sharedConfigManager := sharedconfig.NewManagerImpl() + initializer := configtx.NewInitializer() initializer.Handlers()[cb.ConfigurationItem_Orderer] = sharedConfigManager + configManager, err := configtx.NewManagerImpl(configEnvelope, initializer) if err != nil { - return nil, nil, nil, fmt.Errorf("Error unpacking configuration transaction: %s", err) + return nil, fmt.Errorf("Error unpacking configuration transaction: %s", err) } - return configManager, initializer.PolicyManager(), sharedConfigManager, nil + return &configResources{ + Manager: configManager, + sharedConfig: sharedConfigManager, + }, nil } -func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, ordererledger.ReadWriter, sharedconfig.Manager) { +func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResources { payload := &cb.Payload{} err := proto.Unmarshal(configTx.Payload, payload) if err != nil { @@ -176,20 +187,23 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err) } - configManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope) + configResources, err := newConfigResources(configEnvelope) if err != nil { logger.Fatalf("Error creating configtx manager and handlers: %s", err) } - chainID := configManager.ChainID() + chainID := configResources.ChainID() ledger, err := ml.ledgerFactory.GetOrCreate(chainID) if err != nil { logger.Fatalf("Error getting ledger for %s", chainID) } - return configManager, policyManager, ledger, sharedConfigManager + return &ledgerResources{ + configResources: configResources, + ledger: ledger, + } } func (ml *multiLedger) systemChain() *systemChain { @@ -197,8 +211,8 @@ func (ml *multiLedger) systemChain() *systemChain { } func (ml *multiLedger) newChain(configtx *cb.Envelope) { - configManager, policyManager, backingLedger, sharedConfig := ml.newResources(configtx) - backingLedger.Append(ordererledger.CreateNextBlock(backingLedger, []*cb.Envelope{configtx})) + ledgerResources := ml.newLedgerResources(configtx) + ledgerResources.ledger.Append(ordererledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx})) // Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is newChains := make(map[string]*chainSupport) @@ -206,8 +220,8 @@ func (ml *multiLedger) newChain(configtx *cb.Envelope) { newChains[key] = value } - cs := newChainSupport(createStandardFilters(configManager, policyManager, sharedConfig), configManager, policyManager, backingLedger, sharedConfig, ml.consenters, ml.signer) - chainID := configManager.ChainID() + cs := newChainSupport(createStandardFilters(ledgerResources), ledgerResources, ml.consenters, ml.signer) + chainID := ledgerResources.ChainID() logger.Debugf("Created and starting new chain %s", chainID) diff --git a/orderer/multichain/systemchain.go b/orderer/multichain/systemchain.go index e885d37b5cf..2289e4b1027 100644 --- a/orderer/multichain/systemchain.go +++ b/orderer/multichain/systemchain.go @@ -203,7 +203,7 @@ func (sc *systemChain) authorize(configEnvelope *cb.ConfigurationEnvelope) cb.St return cb.Status_SUCCESS } -func (sc *systemChain) inspect(configTxManager configtx.Manager, policyManager policies.Manager, sharedConfigManager sharedconfig.Manager) cb.Status { +func (sc *systemChain) inspect(configResources *configResources) cb.Status { // XXX decide what it is that we will require to be the same in the new configuration, and what will be allowed to be different // Are all keys allowed? etc. @@ -241,12 +241,12 @@ func (sc *systemChain) authorizeAndInspect(configTx *cb.Envelope) cb.Status { return status } - configTxManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope) + configResources, err := newConfigResources(configEnvelope) if err != nil { logger.Debugf("Failed to create config manager and handlers: %s", err) return cb.Status_BAD_REQUEST } // Make sure that the configuration does not modify any of the orderer - return sc.inspect(configTxManager, policyManager, sharedConfigManager) + return sc.inspect(configResources) }