diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go index 29fdf9d94df..714add1a7f8 100644 --- a/orderer/multichain/chainsupport.go +++ b/orderer/multichain/chainsupport.go @@ -82,80 +82,68 @@ type ChainSupport interface { broadcast.Support ConsenterSupport - - // ConfigTxManager returns the corresponding configtx.Manager for this chain - ConfigTxManager() configtx.Manager } type chainSupport struct { - chain Chain - cutter blockcutter.Receiver - configManager configtx.Manager - policyManager policies.Manager - sharedConfigManager sharedconfig.Manager - ledger ordererledger.ReadWriter - filters *filter.RuleSet - signer crypto.LocalSigner - lastConfiguration uint64 - lastConfigSeq uint64 + *ledgerResources + chain Chain + cutter blockcutter.Receiver + filters *filter.RuleSet + signer crypto.LocalSigner + lastConfiguration uint64 + lastConfigSeq uint64 } func newChainSupport( filters *filter.RuleSet, - configManager configtx.Manager, - policyManager policies.Manager, - backing ordererledger.ReadWriter, - sharedConfigManager sharedconfig.Manager, + ledgerResources *ledgerResources, consenters map[string]Consenter, signer crypto.LocalSigner, ) *chainSupport { - cutter := blockcutter.NewReceiverImpl(sharedConfigManager, filters) - consenterType := sharedConfigManager.ConsensusType() + cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters) + consenterType := ledgerResources.SharedConfig().ConsensusType() consenter, ok := consenters[consenterType] if !ok { logger.Fatalf("Error retrieving consenter of type: %s", consenterType) } cs := &chainSupport{ - configManager: configManager, - policyManager: policyManager, - sharedConfigManager: sharedConfigManager, - cutter: cutter, - filters: filters, - ledger: backing, - signer: signer, + ledgerResources: ledgerResources, + cutter: cutter, + filters: filters, + signer: signer, } var err error cs.chain, err = consenter.HandleChain(cs) if err != nil { - logger.Fatalf("Error creating consenter for chain %x: %s", configManager.ChainID(), err) + logger.Fatalf("Error creating consenter for chain %x: %s", ledgerResources.ChainID(), err) } return cs } // createStandardFilters creates the set of filters for a normal (non-system) chain -func createStandardFilters(configManager configtx.Manager, policyManager policies.Manager, sharedConfig sharedconfig.Manager) *filter.RuleSet { +func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet { return filter.NewRuleSet([]filter.Rule{ filter.EmptyRejectRule, - sizefilter.MaxBytesRule(sharedConfig.BatchSize().AbsoluteMaxBytes), - sigfilter.New(sharedConfig.IngressPolicyNames, policyManager), - configtx.NewFilter(configManager), + sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes), + sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()), + configtx.NewFilter(ledgerResources), filter.AcceptRule, }) } // createSystemChainFilters creates the set of filters for the ordering system chain -func createSystemChainFilters(ml *multiLedger, configManager configtx.Manager, policyManager policies.Manager, sharedConfig sharedconfig.Manager) *filter.RuleSet { +func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources) *filter.RuleSet { return filter.NewRuleSet([]filter.Rule{ filter.EmptyRejectRule, - sizefilter.MaxBytesRule(sharedConfig.BatchSize().AbsoluteMaxBytes), - sigfilter.New(sharedConfig.IngressPolicyNames, policyManager), + sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes), + sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()), newSystemChainFilter(ml), - configtx.NewFilter(configManager), + configtx.NewFilter(ledgerResources), filter.AcceptRule, }) } @@ -172,22 +160,6 @@ func (cs *chainSupport) Sign(message []byte) ([]byte, error) { return cs.signer.Sign(message) } -func (cs *chainSupport) SharedConfig() sharedconfig.Manager { - return cs.sharedConfigManager -} - -func (cs *chainSupport) ConfigTxManager() configtx.Manager { - return cs.configManager -} - -func (cs *chainSupport) ChainID() string { - return cs.configManager.ChainID() -} - -func (cs *chainSupport) PolicyManager() policies.Manager { - return cs.policyManager -} - func (cs *chainSupport) Filters() *filter.RuleSet { return cs.filters } @@ -231,7 +203,7 @@ func (cs *chainSupport) addBlockSignature(block *cb.Block) { } func (cs *chainSupport) addLastConfigSignature(block *cb.Block) { - configSeq := cs.configManager.Sequence() + configSeq := cs.Sequence() if configSeq > cs.lastConfigSeq { cs.lastConfiguration = block.Header.Number cs.lastConfigSeq = configSeq diff --git a/orderer/multichain/chainsupport_test.go b/orderer/multichain/chainsupport_test.go index 4d79e7cb38b..57d4fb6c080 100644 --- a/orderer/multichain/chainsupport_test.go +++ b/orderer/multichain/chainsupport_test.go @@ -64,7 +64,7 @@ func (mc *mockCommitter) Commit() { func TestCommitConfig(t *testing.T) { ml := &mockLedgerReadWriter{} cm := &mockconfigtx.Manager{} - cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}} + cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}} txs := []*cb.Envelope{makeNormalTx("foo", 0), makeNormalTx("bar", 1)} committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}} block := cs.CreateNextBlock(txs) @@ -89,7 +89,7 @@ func TestCommitConfig(t *testing.T) { func TestWriteBlockSignatures(t *testing.T) { ml := &mockLedgerReadWriter{} cm := &mockconfigtx.Manager{} - cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}} + cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}} blockMetadata := func(block *cb.Block) *cb.Metadata { metadata, err := utils.GetMetadataFromBlock(block, cb.BlockMetadataIndex_SIGNATURES) @@ -107,7 +107,7 @@ func TestWriteBlockSignatures(t *testing.T) { func TestWriteLastConfiguration(t *testing.T) { ml := &mockLedgerReadWriter{} cm := &mockconfigtx.Manager{} - cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}} + cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}} lastConfig := func(block *cb.Block) uint64 { index, err := utils.GetLastConfigurationIndexFromBlock(block) diff --git a/orderer/multichain/manager.go b/orderer/multichain/manager.go index 24d08067bc4..d127d8b5431 100644 --- a/orderer/multichain/manager.go +++ b/orderer/multichain/manager.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/hyperledger/fabric/common/configtx" - "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/sharedconfig" ordererledger "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" @@ -60,6 +59,20 @@ type Manager interface { ProposeChain(env *cb.Envelope) cb.Status } +type configResources struct { + configtx.Manager + sharedConfig sharedconfig.Manager +} + +func (cr *configResources) SharedConfig() sharedconfig.Manager { + return cr.sharedConfig +} + +type ledgerResources struct { + *configResources + ledger ordererledger.ReadWriter +} + type multiLedger struct { chains map[string]*chainSupport consenters map[string]Consenter @@ -101,19 +114,16 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C if configTx == nil { logger.Fatalf("Could not find configuration transaction for chain %s", chainID) } - configManager, policyManager, backingLedger, sharedConfigManager := ml.newResources(configTx) - chainID := configManager.ChainID() + ledgerResources := ml.newLedgerResources(configTx) + chainID := ledgerResources.ChainID() - if sharedConfigManager.ChainCreationPolicyNames() != nil { + if ledgerResources.SharedConfig().ChainCreationPolicyNames() != nil { if ml.sysChain != nil { logger.Fatalf("There appear to be two system chains %s and %s", ml.sysChain.support.ChainID(), chainID) } logger.Debugf("Starting with system chain: %x", chainID) - chain := newChainSupport(createSystemChainFilters(ml, configManager, policyManager, sharedConfigManager), - configManager, - policyManager, - backingLedger, - sharedConfigManager, + chain := newChainSupport(createSystemChainFilters(ml, ledgerResources), + ledgerResources, consenters, signer) ml.chains[string(chainID)] = chain @@ -122,11 +132,8 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C defer chain.start() } else { logger.Debugf("Starting chain: %x", chainID) - chain := newChainSupport(createStandardFilters(configManager, policyManager, sharedConfigManager), - configManager, - policyManager, - backingLedger, - sharedConfigManager, + chain := newChainSupport(createStandardFilters(ledgerResources), + ledgerResources, consenters, signer) ml.chains[string(chainID)] = chain @@ -151,19 +158,23 @@ func (ml *multiLedger) GetChain(chainID string) (ChainSupport, bool) { return cs, ok } -func newConfigTxManagerAndHandlers(configEnvelope *cb.ConfigurationEnvelope) (configtx.Manager, policies.Manager, sharedconfig.Manager, error) { - initializer := configtx.NewInitializer() +func newConfigResources(configEnvelope *cb.ConfigurationEnvelope) (*configResources, error) { sharedConfigManager := sharedconfig.NewManagerImpl() + initializer := configtx.NewInitializer() initializer.Handlers()[cb.ConfigurationItem_Orderer] = sharedConfigManager + configManager, err := configtx.NewManagerImpl(configEnvelope, initializer) if err != nil { - return nil, nil, nil, fmt.Errorf("Error unpacking configuration transaction: %s", err) + return nil, fmt.Errorf("Error unpacking configuration transaction: %s", err) } - return configManager, initializer.PolicyManager(), sharedConfigManager, nil + return &configResources{ + Manager: configManager, + sharedConfig: sharedConfigManager, + }, nil } -func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, ordererledger.ReadWriter, sharedconfig.Manager) { +func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResources { payload := &cb.Payload{} err := proto.Unmarshal(configTx.Payload, payload) if err != nil { @@ -176,20 +187,23 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err) } - configManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope) + configResources, err := newConfigResources(configEnvelope) if err != nil { logger.Fatalf("Error creating configtx manager and handlers: %s", err) } - chainID := configManager.ChainID() + chainID := configResources.ChainID() ledger, err := ml.ledgerFactory.GetOrCreate(chainID) if err != nil { logger.Fatalf("Error getting ledger for %s", chainID) } - return configManager, policyManager, ledger, sharedConfigManager + return &ledgerResources{ + configResources: configResources, + ledger: ledger, + } } func (ml *multiLedger) systemChain() *systemChain { @@ -197,8 +211,8 @@ func (ml *multiLedger) systemChain() *systemChain { } func (ml *multiLedger) newChain(configtx *cb.Envelope) { - configManager, policyManager, backingLedger, sharedConfig := ml.newResources(configtx) - backingLedger.Append(ordererledger.CreateNextBlock(backingLedger, []*cb.Envelope{configtx})) + ledgerResources := ml.newLedgerResources(configtx) + ledgerResources.ledger.Append(ordererledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx})) // Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is newChains := make(map[string]*chainSupport) @@ -206,8 +220,8 @@ func (ml *multiLedger) newChain(configtx *cb.Envelope) { newChains[key] = value } - cs := newChainSupport(createStandardFilters(configManager, policyManager, sharedConfig), configManager, policyManager, backingLedger, sharedConfig, ml.consenters, ml.signer) - chainID := configManager.ChainID() + cs := newChainSupport(createStandardFilters(ledgerResources), ledgerResources, ml.consenters, ml.signer) + chainID := ledgerResources.ChainID() logger.Debugf("Created and starting new chain %s", chainID) diff --git a/orderer/multichain/systemchain.go b/orderer/multichain/systemchain.go index e885d37b5cf..2289e4b1027 100644 --- a/orderer/multichain/systemchain.go +++ b/orderer/multichain/systemchain.go @@ -203,7 +203,7 @@ func (sc *systemChain) authorize(configEnvelope *cb.ConfigurationEnvelope) cb.St return cb.Status_SUCCESS } -func (sc *systemChain) inspect(configTxManager configtx.Manager, policyManager policies.Manager, sharedConfigManager sharedconfig.Manager) cb.Status { +func (sc *systemChain) inspect(configResources *configResources) cb.Status { // XXX decide what it is that we will require to be the same in the new configuration, and what will be allowed to be different // Are all keys allowed? etc. @@ -241,12 +241,12 @@ func (sc *systemChain) authorizeAndInspect(configTx *cb.Envelope) cb.Status { return status } - configTxManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope) + configResources, err := newConfigResources(configEnvelope) if err != nil { logger.Debugf("Failed to create config manager and handlers: %s", err) return cb.Status_BAD_REQUEST } // Make sure that the configuration does not modify any of the orderer - return sc.inspect(configTxManager, policyManager, sharedConfigManager) + return sc.inspect(configResources) }