Skip to content

Commit

Permalink
Merge "[FAB-1777] Refactor orderer multichain package"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Jan 23, 2017
2 parents d903c9c + 2cdafd0 commit b71ecc7
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 84 deletions.
76 changes: 24 additions & 52 deletions orderer/multichain/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,49 +85,37 @@ type ChainSupport interface {

broadcast.Support
ConsenterSupport

// ConfigTxManager returns the corresponding configtx.Manager for this chain
ConfigTxManager() configtx.Manager
}

type chainSupport struct {
chain Chain
cutter blockcutter.Receiver
configManager configtx.Manager
policyManager policies.Manager
sharedConfigManager sharedconfig.Manager
ledger ordererledger.ReadWriter
filters *filter.RuleSet
signer crypto.LocalSigner
lastConfiguration uint64
lastConfigSeq uint64
*ledgerResources
chain Chain
cutter blockcutter.Receiver
filters *filter.RuleSet
signer crypto.LocalSigner
lastConfiguration uint64
lastConfigSeq uint64
}

func newChainSupport(
filters *filter.RuleSet,
configManager configtx.Manager,
policyManager policies.Manager,
backing ordererledger.ReadWriter,
sharedConfigManager sharedconfig.Manager,
ledgerResources *ledgerResources,
consenters map[string]Consenter,
signer crypto.LocalSigner,
) *chainSupport {

cutter := blockcutter.NewReceiverImpl(sharedConfigManager, filters)
consenterType := sharedConfigManager.ConsensusType()
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
consenterType := ledgerResources.SharedConfig().ConsensusType()
consenter, ok := consenters[consenterType]
if !ok {
logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
}

cs := &chainSupport{
configManager: configManager,
policyManager: policyManager,
sharedConfigManager: sharedConfigManager,
cutter: cutter,
filters: filters,
ledger: backing,
signer: signer,
ledgerResources: ledgerResources,
cutter: cutter,
filters: filters,
signer: signer,
}

var err error
Expand All @@ -142,32 +130,32 @@ func newChainSupport(

cs.chain, err = consenter.HandleChain(cs, metadata)
if err != nil {
logger.Fatalf("Error creating consenter for chain %x: %s", configManager.ChainID(), err)
logger.Fatalf("Error creating consenter for chain %x: %s", ledgerResources.ChainID(), err)
}

return cs
}

// createStandardFilters creates the set of filters for a normal (non-system) chain
func createStandardFilters(configManager configtx.Manager, policyManager policies.Manager, sharedConfig sharedconfig.Manager) *filter.RuleSet {
func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(sharedConfig.BatchSize().AbsoluteMaxBytes),
sigfilter.New(sharedConfig.IngressPolicyNames, policyManager),
configtx.NewFilter(configManager),
sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes),
sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()),
configtx.NewFilter(ledgerResources),
filter.AcceptRule,
})

}

// createSystemChainFilters creates the set of filters for the ordering system chain
func createSystemChainFilters(ml *multiLedger, configManager configtx.Manager, policyManager policies.Manager, sharedConfig sharedconfig.Manager) *filter.RuleSet {
func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(sharedConfig.BatchSize().AbsoluteMaxBytes),
sigfilter.New(sharedConfig.IngressPolicyNames, policyManager),
sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes),
sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()),
newSystemChainFilter(ml),
configtx.NewFilter(configManager),
configtx.NewFilter(ledgerResources),
filter.AcceptRule,
})
}
Expand All @@ -184,22 +172,6 @@ func (cs *chainSupport) Sign(message []byte) ([]byte, error) {
return cs.signer.Sign(message)
}

func (cs *chainSupport) SharedConfig() sharedconfig.Manager {
return cs.sharedConfigManager
}

func (cs *chainSupport) ConfigTxManager() configtx.Manager {
return cs.configManager
}

func (cs *chainSupport) ChainID() string {
return cs.configManager.ChainID()
}

func (cs *chainSupport) PolicyManager() policies.Manager {
return cs.policyManager
}

func (cs *chainSupport) Filters() *filter.RuleSet {
return cs.filters
}
Expand Down Expand Up @@ -243,7 +215,7 @@ func (cs *chainSupport) addBlockSignature(block *cb.Block) {
}

func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
configSeq := cs.configManager.Sequence()
configSeq := cs.Sequence()
if configSeq > cs.lastConfigSeq {
cs.lastConfiguration = block.Header.Number
cs.lastConfigSeq = configSeq
Expand Down
6 changes: 3 additions & 3 deletions orderer/multichain/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (mc *mockCommitter) Commit() {
func TestCommitConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}}
txs := []*cb.Envelope{makeNormalTx("foo", 0), makeNormalTx("bar", 1)}
committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}}
block := cs.CreateNextBlock(txs)
Expand All @@ -90,7 +90,7 @@ func TestCommitConfig(t *testing.T) {
func TestWriteBlockSignatures(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}}

if utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, nil), cb.BlockMetadataIndex_SIGNATURES) == nil {
t.Fatalf("Block should have block signature")
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestWriteBlockOrdererMetadata(t *testing.T) {
func TestWriteLastConfiguration(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledger: ml, configManager: cm, signer: &xxxCryptoHelper{}}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: &xxxCryptoHelper{}}

expected := uint64(0)

Expand Down
66 changes: 40 additions & 26 deletions orderer/multichain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"

"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/sharedconfig"
ordererledger "github.com/hyperledger/fabric/orderer/ledger"
cb "github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -60,6 +59,20 @@ type Manager interface {
ProposeChain(env *cb.Envelope) cb.Status
}

type configResources struct {
configtx.Manager
sharedConfig sharedconfig.Manager
}

func (cr *configResources) SharedConfig() sharedconfig.Manager {
return cr.sharedConfig
}

type ledgerResources struct {
*configResources
ledger ordererledger.ReadWriter
}

type multiLedger struct {
chains map[string]*chainSupport
consenters map[string]Consenter
Expand Down Expand Up @@ -101,19 +114,16 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C
if configTx == nil {
logger.Fatalf("Could not find configuration transaction for chain %s", chainID)
}
configManager, policyManager, backingLedger, sharedConfigManager := ml.newResources(configTx)
chainID := configManager.ChainID()
ledgerResources := ml.newLedgerResources(configTx)
chainID := ledgerResources.ChainID()

if sharedConfigManager.ChainCreationPolicyNames() != nil {
if ledgerResources.SharedConfig().ChainCreationPolicyNames() != nil {
if ml.sysChain != nil {
logger.Fatalf("There appear to be two system chains %s and %s", ml.sysChain.support.ChainID(), chainID)
}
logger.Debugf("Starting with system chain: %x", chainID)
chain := newChainSupport(createSystemChainFilters(ml, configManager, policyManager, sharedConfigManager),
configManager,
policyManager,
backingLedger,
sharedConfigManager,
chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
ledgerResources,
consenters,
signer)
ml.chains[string(chainID)] = chain
Expand All @@ -122,11 +132,8 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C
defer chain.start()
} else {
logger.Debugf("Starting chain: %x", chainID)
chain := newChainSupport(createStandardFilters(configManager, policyManager, sharedConfigManager),
configManager,
policyManager,
backingLedger,
sharedConfigManager,
chain := newChainSupport(createStandardFilters(ledgerResources),
ledgerResources,
consenters,
signer)
ml.chains[string(chainID)] = chain
Expand Down Expand Up @@ -155,19 +162,23 @@ func (ml *multiLedger) GetChain(chainID string) (ChainSupport, bool) {
return cs, ok
}

func newConfigTxManagerAndHandlers(configEnvelope *cb.ConfigurationEnvelope) (configtx.Manager, policies.Manager, sharedconfig.Manager, error) {
initializer := configtx.NewInitializer()
func newConfigResources(configEnvelope *cb.ConfigurationEnvelope) (*configResources, error) {
sharedConfigManager := sharedconfig.NewManagerImpl()
initializer := configtx.NewInitializer()
initializer.Handlers()[cb.ConfigurationItem_Orderer] = sharedConfigManager

configManager, err := configtx.NewManagerImpl(configEnvelope, initializer)
if err != nil {
return nil, nil, nil, fmt.Errorf("Error unpacking configuration transaction: %s", err)
return nil, fmt.Errorf("Error unpacking configuration transaction: %s", err)
}

return configManager, initializer.PolicyManager(), sharedConfigManager, nil
return &configResources{
Manager: configManager,
sharedConfig: sharedConfigManager,
}, nil
}

func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, ordererledger.ReadWriter, sharedconfig.Manager) {
func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResources {
payload := &cb.Payload{}
err := proto.Unmarshal(configTx.Payload, payload)
if err != nil {
Expand All @@ -180,38 +191,41 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po
logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err)
}

configManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope)
configResources, err := newConfigResources(configEnvelope)

if err != nil {
logger.Fatalf("Error creating configtx manager and handlers: %s", err)
}

chainID := configManager.ChainID()
chainID := configResources.ChainID()

ledger, err := ml.ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Fatalf("Error getting ledger for %s", chainID)
}

return configManager, policyManager, ledger, sharedConfigManager
return &ledgerResources{
configResources: configResources,
ledger: ledger,
}
}

func (ml *multiLedger) systemChain() *systemChain {
return ml.sysChain
}

func (ml *multiLedger) newChain(configtx *cb.Envelope) {
configManager, policyManager, backingLedger, sharedConfig := ml.newResources(configtx)
backingLedger.Append(ordererledger.CreateNextBlock(backingLedger, []*cb.Envelope{configtx}))
ledgerResources := ml.newLedgerResources(configtx)
ledgerResources.ledger.Append(ordererledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx}))

// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
newChains := make(map[string]*chainSupport)
for key, value := range ml.chains {
newChains[key] = value
}

cs := newChainSupport(createStandardFilters(configManager, policyManager, sharedConfig), configManager, policyManager, backingLedger, sharedConfig, ml.consenters, ml.signer)
chainID := configManager.ChainID()
cs := newChainSupport(createStandardFilters(ledgerResources), ledgerResources, ml.consenters, ml.signer)
chainID := ledgerResources.ChainID()

logger.Debugf("Created and starting new chain %s", chainID)

Expand Down
6 changes: 3 additions & 3 deletions orderer/multichain/systemchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (sc *systemChain) authorize(configEnvelope *cb.ConfigurationEnvelope) cb.St
return cb.Status_SUCCESS
}

func (sc *systemChain) inspect(configTxManager configtx.Manager, policyManager policies.Manager, sharedConfigManager sharedconfig.Manager) cb.Status {
func (sc *systemChain) inspect(configResources *configResources) cb.Status {
// XXX decide what it is that we will require to be the same in the new configuration, and what will be allowed to be different
// Are all keys allowed? etc.

Expand Down Expand Up @@ -241,12 +241,12 @@ func (sc *systemChain) authorizeAndInspect(configTx *cb.Envelope) cb.Status {
return status
}

configTxManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope)
configResources, err := newConfigResources(configEnvelope)
if err != nil {
logger.Debugf("Failed to create config manager and handlers: %s", err)
return cb.Status_BAD_REQUEST
}

// Make sure that the configuration does not modify any of the orderer
return sc.inspect(configTxManager, policyManager, sharedConfigManager)
return sc.inspect(configResources)
}

0 comments on commit b71ecc7

Please sign in to comment.