diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go index aff005d7607..3eedf55da15 100644 --- a/orderer/common/broadcast/broadcast.go +++ b/orderer/common/broadcast/broadcast.go @@ -39,7 +39,13 @@ type Handler interface { // SupportManager provides a way for the Handler to look up the Support for a chain type SupportManager interface { + // GetChain gets the chain support for a given ChainID GetChain(chainID string) (Support, bool) + + // ProposeChain accepts a configuration transaction for a chain which does not already exists + // The status returned is whether the proposal is accepted for consideration, only after consensus + // occurs will the proposal be committed or rejected + ProposeChain(env *cb.Envelope) cb.Status } // Support provides the backing resources needed to support broadcast on a chain @@ -125,21 +131,27 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err support, ok := b.bs.sm.GetChain(payload.Header.ChainHeader.ChainID) if !ok { - // XXX Hook in chain creation logic here - panic("Unimplemented") - } - - _, filterErr := support.Filters().Apply(msg) - - if filterErr != nil { - logger.Debugf("Rejecting broadcast message") - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) + // Chain not found, maybe create one? + if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) { + err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND}) + } else { + logger.Debugf("Proposing new chain") + err = srv.Send(&ab.BroadcastResponse{Status: b.bs.sm.ProposeChain(msg)}) + } } else { - select { - case b.queue <- &msgAndSupport{msg: msg, support: support}: - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) - default: - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) + // Normal transaction for existing chain + _, filterErr := support.Filters().Apply(msg) + + if filterErr != nil { + logger.Debugf("Rejecting broadcast message") + err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) + } else { + select { + case b.queue <- &msgAndSupport{msg: msg, support: support}: + err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) + default: + err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) + } } } diff --git a/orderer/common/broadcast/broadcast_test.go b/orderer/common/broadcast/broadcast_test.go index b42df39cbdc..32240382a54 100644 --- a/orderer/common/broadcast/broadcast_test.go +++ b/orderer/common/broadcast/broadcast_test.go @@ -20,10 +20,10 @@ import ( "fmt" "testing" - "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/orderer/common/filter" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" "google.golang.org/grpc" ) @@ -65,6 +65,19 @@ func (mm *mockSupportManager) GetChain(chainID string) (Support, bool) { return chain, ok } +func (mm *mockSupportManager) ProposeChain(configTx *cb.Envelope) cb.Status { + payload := utils.ExtractPayloadOrPanic(configTx) + + mm.chains[string(payload.Header.ChainHeader.ChainID)] = &mockSupport{ + filters: filter.NewRuleSet([]filter.Rule{ + filter.EmptyRejectRule, + filter.AcceptRule, + }), + queue: make(chan *cb.Envelope), + } + return cb.Status_SUCCESS +} + func (mm *mockSupportManager) halt() { for _, chain := range mm.chains { chain.halt() @@ -95,6 +108,21 @@ func (ms *mockSupport) halt() { } } +func makeConfigMessage(chainID string) *cb.Envelope { + payload := &cb.Payload{ + Data: utils.MarshalOrPanic(&cb.ConfigurationEnvelope{}), + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + ChainID: chainID, + Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION), + }, + }, + } + return &cb.Envelope{ + Payload: utils.MarshalOrPanic(payload), + } +} + func makeMessage(chainID string, data []byte) *cb.Envelope { payload := &cb.Payload{ Data: data, @@ -104,14 +132,9 @@ func makeMessage(chainID string, data []byte) *cb.Envelope { }, }, } - data, err := proto.Marshal(payload) - if err != nil { - panic(err) + return &cb.Envelope{ + Payload: utils.MarshalOrPanic(payload), } - env := &cb.Envelope{ - Payload: data, - } - return env } func getMultichainManager() *mockSupportManager { @@ -200,3 +223,44 @@ func TestEmptyEnvelope(t *testing.T) { } } + +func TestBadChainID(t *testing.T) { + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(mm, 2) + m := newMockB() + defer close(m.recvChan) + go bh.Handle(m) + + m.recvChan <- makeMessage("Wrong chain", []byte("Some bytes")) + reply := <-m.sendChan + if reply.Status != cb.Status_NOT_FOUND { + t.Fatalf("Should have rejected message to a chain which does not exist") + } + +} + +func TestNewChainID(t *testing.T) { + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(mm, 2) + m := newMockB() + defer close(m.recvChan) + go bh.Handle(m) + + m.recvChan <- makeConfigMessage("New chain") + reply := <-m.sendChan + if reply.Status != cb.Status_SUCCESS { + t.Fatalf("Should have created a new chain, got %d", reply.Status) + } + + if len(mm.chains) != 2 { + t.Fatalf("Should have created a new chain") + } + + m.recvChan <- makeMessage("New chain", []byte("Some bytes")) + reply = <-m.sendChan + if reply.Status != cb.Status_SUCCESS { + t.Fatalf("Should have successfully sent message to new chain") + } +} diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go index c7bd6e0d424..69df30dad5d 100644 --- a/orderer/multichain/chainsupport.go +++ b/orderer/multichain/chainsupport.go @@ -67,6 +67,12 @@ type ChainSupport interface { broadcast.Support deliver.Support ConsenterSupport + + // ChainID returns the ChainID for this chain support + ChainID() string + + // ConfigTxManager returns the corresponding configtx.Manager for this chain + ConfigTxManager() configtx.Manager } type chainSupport struct { @@ -79,9 +85,16 @@ type chainSupport struct { filters *filter.RuleSet } -func newChainSupport(configManager configtx.Manager, policyManager policies.Manager, backing rawledger.ReadWriter, sharedConfigManager sharedconfig.Manager, consenters map[string]Consenter) *chainSupport { +func newChainSupport( + filters *filter.RuleSet, + configManager configtx.Manager, + policyManager policies.Manager, + backing rawledger.ReadWriter, + sharedConfigManager sharedconfig.Manager, + consenters map[string]Consenter, +) *chainSupport { + batchSize := sharedConfigManager.BatchSize() // XXX this needs to be pushed deeper so that the blockcutter queries it after each write for reconfiguration support - filters := createBroadcastRuleset(configManager) cutter := blockcutter.NewReceiverImpl(batchSize, filters) consenterType := sharedConfigManager.ConsensusType() consenter, ok := consenters[consenterType] @@ -107,12 +120,24 @@ func newChainSupport(configManager configtx.Manager, policyManager policies.Mana return cs } -func createBroadcastRuleset(configManager configtx.Manager) *filter.RuleSet { +// createStandardFilters creates the set of filters for a normal (non-system) chain +func createStandardFilters(configManager configtx.Manager) *filter.RuleSet { return filter.NewRuleSet([]filter.Rule{ filter.EmptyRejectRule, configtx.NewFilter(configManager), filter.AcceptRule, }) + +} + +// createSystemChainFilters creates the set of filters for the ordering system chain +func createSystemChainFilters(ml *multiLedger, configManager configtx.Manager) *filter.RuleSet { + return filter.NewRuleSet([]filter.Rule{ + filter.EmptyRejectRule, + newSystemChainFilter(ml), + configtx.NewFilter(configManager), + filter.AcceptRule, + }) } func (cs *chainSupport) start() { @@ -123,10 +148,14 @@ func (cs *chainSupport) SharedConfig() sharedconfig.Manager { return cs.sharedConfigManager } -func (cs *chainSupport) ConfigManager() configtx.Manager { +func (cs *chainSupport) ConfigTxManager() configtx.Manager { return cs.configManager } +func (cs *chainSupport) ChainID() string { + return cs.configManager.ChainID() +} + func (cs *chainSupport) PolicyManager() policies.Manager { return cs.policyManager } diff --git a/orderer/multichain/manager.go b/orderer/multichain/manager.go index 6d5d3635b92..8fe25c25ec8 100644 --- a/orderer/multichain/manager.go +++ b/orderer/multichain/manager.go @@ -17,7 +17,7 @@ limitations under the License. package multichain import ( - "sync" + "fmt" "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/common/policies" @@ -48,13 +48,18 @@ func init() { type Manager interface { // GetChain retrieves the chain support for a chain (and whether it exists) GetChain(chainID string) (ChainSupport, bool) + + // ProposeChain accepts a configuration transaction for a chain which does not already exists + // The status returned is whether the proposal is accepted for consideration, only after consensus + // occurs will the proposal be committed or rejected + ProposeChain(env *cb.Envelope) cb.Status } type multiLedger struct { chains map[string]*chainSupport consenters map[string]Consenter ledgerFactory rawledger.Factory - mutex sync.Mutex + sysChain *systemChain } // getConfigTx, this should ultimately be done more intelligently, but for now, we search the whole chain for txs and pick the last config one @@ -104,6 +109,7 @@ func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Conse ml := &multiLedger{ chains: make(map[string]*chainSupport), ledgerFactory: ledgerFactory, + consenters: consenters, } existingChains := ledgerFactory.ChainIDs() @@ -118,23 +124,43 @@ func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Conse } configManager, policyManager, backingLedger, sharedConfigManager := ml.newResources(configTx) chainID := configManager.ChainID() - ml.chains[chainID] = newChainSupport(configManager, policyManager, backingLedger, sharedConfigManager, consenters) - } - for _, cs := range ml.chains { - cs.start() + if sharedConfigManager.ChainCreators() != nil { + if ml.sysChain != nil { + logger.Fatalf("There appear to be two system chains %x and %x", ml.sysChain.support.ChainID(), chainID) + } + logger.Debugf("Starting with system chain: %x", chainID) + chain := newChainSupport(createSystemChainFilters(ml, configManager), configManager, policyManager, backingLedger, sharedConfigManager, consenters) + ml.chains[string(chainID)] = chain + ml.sysChain = newSystemChain(chain) + // We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built + defer chain.start() + } else { + logger.Debugf("Starting chain: %x", chainID) + chain := newChainSupport(createStandardFilters(configManager), configManager, policyManager, backingLedger, sharedConfigManager, consenters) + ml.chains[string(chainID)] = chain + chain.start() + } + } return ml } +// ProposeChain accepts a configuration transaction for a chain which does not already exists +// The status returned is whether the proposal is accepted for consideration, only after consensus +// occurs will the proposal be committed or rejected +func (ml *multiLedger) ProposeChain(env *cb.Envelope) cb.Status { + return ml.sysChain.proposeChain(env) +} + // GetChain retrieves the chain support for a chain (and whether it exists) func (ml *multiLedger) GetChain(chainID string) (ChainSupport, bool) { cs, ok := ml.chains[chainID] return cs, ok } -func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, rawledger.ReadWriter, sharedconfig.Manager) { +func newConfigTxManagerAndHandlers(configEnvelope *cb.ConfigurationEnvelope) (configtx.Manager, policies.Manager, sharedconfig.Manager, error) { policyManager := policies.NewManagerImpl(xxxCryptoHelper{}) sharedConfigManager := sharedconfig.NewManagerImpl() configHandlerMap := make(map[cb.ConfigurationItem_ConfigurationType]configtx.Handler) @@ -150,6 +176,15 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po } } + configManager, err := configtx.NewConfigurationManager(configEnvelope, policyManager, configHandlerMap) + if err != nil { + return nil, nil, nil, fmt.Errorf("Error unpacking configuration transaction: %s", err) + } + + return configManager, policyManager, sharedConfigManager, nil +} + +func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, rawledger.ReadWriter, sharedconfig.Manager) { payload := &cb.Payload{} err := proto.Unmarshal(configTx.Payload, payload) if err != nil { @@ -162,9 +197,10 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err) } - configManager, err := configtx.NewConfigurationManager(configEnvelope, policyManager, configHandlerMap) + configManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope) + if err != nil { - logger.Fatalf("Error unpacking configuration transaction: %s", err) + logger.Fatalf("Error creating configtx manager and handlers: %s", err) } chainID := configManager.ChainID() @@ -176,3 +212,28 @@ func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, po return configManager, policyManager, ledger, sharedConfigManager } + +func (ml *multiLedger) systemChain() *systemChain { + return ml.sysChain +} + +func (ml *multiLedger) newChain(configtx *cb.Envelope) { + configManager, policyManager, backingLedger, sharedConfig := ml.newResources(configtx) + backingLedger.Append([]*cb.Envelope{configtx}, nil) + + // Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is + newChains := make(map[string]*chainSupport) + for key, value := range ml.chains { + newChains[key] = value + } + + cs := newChainSupport(createStandardFilters(configManager), configManager, policyManager, backingLedger, sharedConfig, ml.consenters) + chainID := configManager.ChainID() + + logger.Debugf("Created and starting new chain %s", chainID) + + newChains[string(chainID)] = cs + cs.start() + + ml.chains = newChains +} diff --git a/orderer/multichain/manager_test.go b/orderer/multichain/manager_test.go index aab55e1b447..61992ad3a79 100644 --- a/orderer/multichain/manager_test.go +++ b/orderer/multichain/manager_test.go @@ -39,6 +39,7 @@ func init() { } } +// TODO move to util func makeNormalTx(chainID string, i int) *cb.Envelope { payload := &cb.Payload{ Header: &cb.Header{ @@ -54,27 +55,6 @@ func makeNormalTx(chainID string, i int) *cb.Envelope { } } -func makeConfigTx(chainID string, i int) *cb.Envelope { - payload := &cb.Payload{ - Header: &cb.Header{ - ChainHeader: &cb.ChainHeader{ - Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION), - ChainID: chainID, - }, - }, - Data: utils.MarshalOrPanic(&cb.ConfigurationEnvelope{ - Items: []*cb.SignedConfigurationItem{&cb.SignedConfigurationItem{ - ConfigurationItem: utils.MarshalOrPanic(&cb.ConfigurationItem{ - Value: []byte(fmt.Sprintf("%d", i)), - }), - }}, - }), - } - return &cb.Envelope{ - Payload: utils.MarshalOrPanic(payload), - } -} - // Tests for a normal chain which contains 3 config transactions and other normal transactions to make sure the right one returned func TestGetConfigTx(t *testing.T) { _, rl := ramledger.New(10, genesisBlock) @@ -155,3 +135,92 @@ func TestManagerImpl(t *testing.T) { t.Fatalf("Block 1 not produced after timeout") } } + +// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain +func TestNewChain(t *testing.T) { + lf, rl := ramledger.New(10, genesisBlock) + + consenters := make(map[string]Consenter) + consenters[static.DefaultConsensusType] = &mockConsenter{} + + manager := NewManagerImpl(lf, consenters) + + oldGenesisTx := utils.ExtractEnvelopeOrPanic(genesisBlock, 0) + oldGenesisTxPayload := utils.ExtractPayloadOrPanic(oldGenesisTx) + oldConfigEnv := utils.UnmarshalConfigurationEnvelopeOrPanic(oldGenesisTxPayload.Data) + + newChainID := "TestNewChain" + newChainMessage := ab.ChainCreationConfigurationTransaction(static.AcceptAllPolicyKey, newChainID, oldConfigEnv) + + status := manager.ProposeChain(newChainMessage) + + if status != cb.Status_SUCCESS { + t.Fatalf("Error submitting chain creation request") + } + + it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) + select { + case <-it.ReadyChan(): + block, status := it.Next() + if status != cb.Status_SUCCESS { + t.Fatalf("Could not retrieve block") + } + if len(block.Data.Data) != 1 { + t.Fatalf("Should have had only one message in the orderer transaction block") + } + genesisConfigTx := utils.UnmarshalEnvelopeOrPanic(utils.UnmarshalPayloadOrPanic(utils.ExtractEnvelopeOrPanic(block, 0).Payload).Data) + if !reflect.DeepEqual(genesisConfigTx, newChainMessage) { + t.Errorf("Orderer config block contains wrong transaction, expected %v got %v", genesisConfigTx, newChainMessage) + } + case <-time.After(time.Second): + t.Fatalf("Block 1 not produced after timeout in system chain") + } + + chainSupport, ok := manager.GetChain(newChainID) + + if !ok { + t.Fatalf("Should have gotten new chain which was created") + } + + messages := make([]*cb.Envelope, static.DefaultBatchSize) + for i := 0; i < static.DefaultBatchSize; i++ { + messages[i] = makeNormalTx(newChainID, i) + } + + for _, message := range messages { + chainSupport.Enqueue(message) + } + + it, _ = chainSupport.Reader().Iterator(ab.SeekInfo_SPECIFIED, 0) + select { + case <-it.ReadyChan(): + block, status := it.Next() + if status != cb.Status_SUCCESS { + t.Fatalf("Could not retrieve new chain genesis block") + } + if len(block.Data.Data) != 1 { + t.Fatalf("Should have had only one message in the new genesis block") + } + genesisConfigTx := utils.ExtractEnvelopeOrPanic(block, 0) + if !reflect.DeepEqual(genesisConfigTx, newChainMessage) { + t.Errorf("Genesis block contains wrong transaction, expected %v got %v", genesisConfigTx, newChainMessage) + } + case <-time.After(time.Second): + t.Fatalf("Block 1 not produced after timeout in system chain") + } + + select { + case <-it.ReadyChan(): + block, status := it.Next() + if status != cb.Status_SUCCESS { + t.Fatalf("Could not retrieve block on new chain") + } + for i := 0; i < static.DefaultBatchSize; i++ { + if !reflect.DeepEqual(utils.ExtractEnvelopeOrPanic(block, i), messages[i]) { + t.Errorf("Block contents wrong at index %d in new chain", i) + } + } + case <-time.After(time.Second): + t.Fatalf("Block 1 not produced after timeout on new chain") + } +} diff --git a/orderer/multichain/systemchain.go b/orderer/multichain/systemchain.go new file mode 100644 index 00000000000..2cedcc64f16 --- /dev/null +++ b/orderer/multichain/systemchain.go @@ -0,0 +1,255 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "bytes" + + "github.com/hyperledger/fabric/core/util" + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/filter" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/common/sharedconfig" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + + "github.com/golang/protobuf/proto" +) + +// Define some internal interfaces for easier mocking +type chainCreator interface { + newChain(configTx *cb.Envelope) + systemChain() *systemChain +} + +type limitedSupport interface { + ChainID() string + PolicyManager() policies.Manager + SharedConfig() sharedconfig.Manager + Enqueue(env *cb.Envelope) bool +} + +type systemChainCommitter struct { + cc chainCreator + configTx *cb.Envelope +} + +func (scc *systemChainCommitter) Isolated() bool { + return true +} + +func (scc *systemChainCommitter) Commit() { + scc.cc.newChain(scc.configTx) +} + +type systemChainFilter struct { + cc chainCreator +} + +func newSystemChainFilter(cc chainCreator) filter.Rule { + return &systemChainFilter{ + cc: cc, + } +} + +func (scf *systemChainFilter) Apply(env *cb.Envelope) (filter.Action, filter.Committer) { + msgData := &cb.Payload{} + + err := proto.Unmarshal(env.Payload, msgData) + if err != nil { + return filter.Forward, nil + } + + if msgData.Header == nil || msgData.Header.ChainHeader == nil || msgData.Header.ChainHeader.Type != int32(cb.HeaderType_ORDERER_TRANSACTION) { + return filter.Forward, nil + } + + configTx := &cb.Envelope{} + err = proto.Unmarshal(msgData.Data, configTx) + if err != nil { + return filter.Reject, nil + } + + status := scf.cc.systemChain().authorizeAndInspect(configTx) + if status != cb.Status_SUCCESS { + return filter.Reject, nil + } + + return filter.Accept, &systemChainCommitter{ + cc: scf.cc, + configTx: configTx, + } +} + +type systemChain struct { + support limitedSupport +} + +func newSystemChain(support limitedSupport) *systemChain { + return &systemChain{ + support: support, + } +} + +func (sc *systemChain) proposeChain(configTx *cb.Envelope) cb.Status { + status := sc.authorizeAndInspect(configTx) + if status != cb.Status_SUCCESS { + return status + } + + marshaledEnv, err := proto.Marshal(configTx) + if err != nil { + logger.Debugf("Rejecting chain proposal: Error marshaling config: %s", err) + return cb.Status_INTERNAL_SERVER_ERROR + } + + sysPayload := &cb.Payload{ + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + ChainID: sc.support.ChainID(), + Type: int32(cb.HeaderType_ORDERER_TRANSACTION), + }, + }, + Data: marshaledEnv, + } + + marshaledPayload, err := proto.Marshal(sysPayload) + if err != nil { + logger.Debugf("Rejecting chain proposal: Error marshaling payload: %s", err) + return cb.Status_INTERNAL_SERVER_ERROR + } + + sysTran := &cb.Envelope{ + Payload: marshaledPayload, + // XXX Add signature eventually + } + + if !sc.support.Enqueue(sysTran) { + return cb.Status_INTERNAL_SERVER_ERROR + } + + return cb.Status_SUCCESS +} + +func (sc *systemChain) authorize(configEnvelope *cb.ConfigurationEnvelope) cb.Status { + creationConfigItem := &cb.ConfigurationItem{} + err := proto.Unmarshal(configEnvelope.Items[0].ConfigurationItem, creationConfigItem) + if err != nil { + logger.Debugf("Failing to validate chain creation because of unmarshaling error: %s", err) + return cb.Status_BAD_REQUEST + } + + if creationConfigItem.Key != ab.CreationPolicyKey { + logger.Debugf("Failing to validate chain creation because first configuration item was not the CreationPolicy") + return cb.Status_BAD_REQUEST + } + + creationPolicy := &ab.CreationPolicy{} + err = proto.Unmarshal(creationConfigItem.Value, creationPolicy) + if err != nil { + logger.Debugf("Failing to validate chain creation because first configuration item could not unmarshal to a CreationPolicy: %s", err) + return cb.Status_BAD_REQUEST + } + + ok := false + for _, chainCreatorPolicy := range sc.support.SharedConfig().ChainCreators() { + if chainCreatorPolicy == creationPolicy.Policy { + ok = true + break + } + } + + if !ok { + logger.Debugf("Failed to validate chain creation because chain creation policy is not authorized for chain creation") + return cb.Status_FORBIDDEN + } + + policy, ok := sc.support.PolicyManager().GetPolicy(creationPolicy.Policy) + if !ok { + logger.Debugf("Failed to get policy for chain creation despite it being listed as an authorized policy") + return cb.Status_INTERNAL_SERVER_ERROR + } + + // XXX actually do policy signature validation + _ = policy + + var remainingBytes []byte + for i, item := range configEnvelope.Items { + if i == 0 { + // Do not include the creation policy + continue + } + remainingBytes = append(remainingBytes, item.ConfigurationItem...) + } + + configHash := util.ComputeCryptoHash(remainingBytes) + + if !bytes.Equal(configHash, creationPolicy.Digest) { + logger.Debugf("Validly signed chain creation did not contain correct digest for remaining configuration %x vs. %x", configHash, creationPolicy.Digest) + return cb.Status_BAD_REQUEST + } + + return cb.Status_SUCCESS +} + +func (sc *systemChain) inspect(configTxManager configtx.Manager, policyManager policies.Manager, sharedConfigManager sharedconfig.Manager) cb.Status { + // XXX decide what it is that we will require to be the same in the new configuration, and what will be allowed to be different + // Are all keys allowed? etc. + + return cb.Status_SUCCESS +} + +func (sc *systemChain) authorizeAndInspect(configTx *cb.Envelope) cb.Status { + payload := &cb.Payload{} + err := proto.Unmarshal(configTx.Payload, payload) + if err != nil { + logger.Debugf("Rejecting chain proposal: Error unmarshaling envelope payload: %s", err) + return cb.Status_BAD_REQUEST + } + + if payload.Header == nil || payload.Header.ChainHeader == nil || payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) { + logger.Debugf("Rejecting chain proposal: Not a configuration transaction: %s", err) + return cb.Status_BAD_REQUEST + } + + configEnvelope := &cb.ConfigurationEnvelope{} + err = proto.Unmarshal(payload.Data, configEnvelope) + if err != nil { + logger.Debugf("Rejecting chain proposal: Error unmarshalling config envelope from payload: %s", err) + return cb.Status_BAD_REQUEST + } + + if len(configEnvelope.Items) == 0 { + logger.Debugf("Failing to validate chain creation because configuration was empty") + return cb.Status_BAD_REQUEST + } + + // Make sure that the configuration was signed by the appropriate authorized entities + status := sc.authorize(configEnvelope) + if status != cb.Status_SUCCESS { + return status + } + + configTxManager, policyManager, sharedConfigManager, err := newConfigTxManagerAndHandlers(configEnvelope) + if err != nil { + logger.Debugf("Failed to create config manager and handlers: %s", err) + return cb.Status_BAD_REQUEST + } + + // Make sure that the configuration does not modify any of the orderer + return sc.inspect(configTxManager, policyManager, sharedConfigManager) +} diff --git a/orderer/multichain/systemchain_test.go b/orderer/multichain/systemchain_test.go new file mode 100644 index 00000000000..c0cc14cfc99 --- /dev/null +++ b/orderer/multichain/systemchain_test.go @@ -0,0 +1,249 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "reflect" + "testing" + + coreutil "github.com/hyperledger/fabric/core/util" + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" + "github.com/hyperledger/fabric/orderer/common/filter" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/common/sharedconfig" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" +) + +type mockPolicy struct { + err error +} + +func (mp *mockPolicy) Evaluate(header [][]byte, payload []byte, identities [][]byte, signatures [][]byte) error { + return mp.err +} + +type mockPolicyManager struct { + mp *mockPolicy +} + +func (mpm *mockPolicyManager) GetPolicy(id string) (policies.Policy, bool) { + return mpm.mp, mpm.mp != nil +} + +type mockSharedConfig struct { + chainCreators []string +} + +func (msc *mockSharedConfig) ConsensusType() string { + panic("Unimplemented") +} + +func (msc *mockSharedConfig) BatchSize() int { + panic("Unimplemented") +} + +func (msc *mockSharedConfig) ChainCreators() []string { + return msc.chainCreators +} + +type mockSupport struct { + mpm *mockPolicyManager + msc *mockSharedConfig + chainID string + queue []*cb.Envelope +} + +func newMockSupport(chainID string) *mockSupport { + return &mockSupport{ + mpm: &mockPolicyManager{}, + msc: &mockSharedConfig{}, + chainID: chainID, + } +} + +func (ms *mockSupport) Enqueue(msg *cb.Envelope) bool { + ms.queue = append(ms.queue, msg) + return true +} + +func (ms *mockSupport) ChainID() string { + return ms.chainID +} + +func (ms *mockSupport) PolicyManager() policies.Manager { + return ms.mpm +} + +func (ms *mockSupport) SharedConfig() sharedconfig.Manager { + return ms.msc +} + +type mockChainCreator struct { + newChains []*cb.Envelope + ms *mockSupport + sysChain *systemChain +} + +func newMockChainCreator() *mockChainCreator { + mcc := &mockChainCreator{ + ms: newMockSupport(static.TestChainID), + } + mcc.sysChain = newSystemChain(mcc.ms) + return mcc +} + +func (mcc *mockChainCreator) newChain(configTx *cb.Envelope) { + mcc.newChains = append(mcc.newChains, configTx) +} + +func (mcc *mockChainCreator) systemChain() *systemChain { + return mcc.sysChain +} + +func TestGoodProposal(t *testing.T) { + newChainID := "NewChainID" + + mcc := newMockChainCreator() + mcc.ms.msc.chainCreators = []string{static.AcceptAllPolicyKey} + mcc.ms.mpm.mp = &mockPolicy{} + + chainCreateTx := &cb.ConfigurationItem{ + Header: &cb.ChainHeader{ + ChainID: newChainID, + Type: int32(cb.HeaderType_CONFIGURATION_ITEM), + }, + Key: ab.CreationPolicyKey, + Type: cb.ConfigurationItem_Orderer, + Value: utils.MarshalOrPanic(&ab.CreationPolicy{ + Policy: static.AcceptAllPolicyKey, + Digest: coreutil.ComputeCryptoHash([]byte{}), + }), + } + ingressTx := makeConfigTxWithItems(newChainID, chainCreateTx) + status := mcc.sysChain.proposeChain(ingressTx) + if status != cb.Status_SUCCESS { + t.Fatalf("Should have successfully proposed chain") + } + + expected := 1 + if len(mcc.ms.queue) != expected { + t.Fatalf("Expected %d creation txs in the chain, but found %d", expected, len(mcc.ms.queue)) + } + + wrapped := mcc.ms.queue[0] + payload := utils.UnmarshalPayloadOrPanic(wrapped.Payload) + if payload.Header.ChainHeader.Type != int32(cb.HeaderType_ORDERER_TRANSACTION) { + t.Fatalf("Wrapped transaction should be of type ORDERER_TRANSACTION") + } + envelope := utils.UnmarshalEnvelopeOrPanic(payload.Data) + if !reflect.DeepEqual(envelope, ingressTx) { + t.Fatalf("Received different configtx than ingressed into the system") + } + + sysFilter := newSystemChainFilter(mcc) + action, committer := sysFilter.Apply(wrapped) + + if action != filter.Accept { + t.Fatalf("Should have accepted the transaction, as it was already validated") + } + + if !committer.Isolated() { + t.Fatalf("Chain creation transactions should be isolated on commit") + } + + committer.Commit() + expected = 1 + if len(mcc.newChains) != 1 { + t.Fatalf("Proposal should only have created one new chain") + } + + if !reflect.DeepEqual(mcc.newChains[0], ingressTx) { + t.Fatalf("New chain should have been created with ingressTx") + } +} + +func TestProposalWithBadPolicy(t *testing.T) { + newChainID := "NewChainID" + + mcc := newMockChainCreator() + mcc.ms.mpm.mp = &mockPolicy{} + + chainCreateTx := &cb.ConfigurationItem{ + Key: ab.CreationPolicyKey, + Type: cb.ConfigurationItem_Orderer, + Value: utils.MarshalOrPanic(&ab.CreationPolicy{ + Policy: static.AcceptAllPolicyKey, + Digest: coreutil.ComputeCryptoHash([]byte{}), + }), + } + ingressTx := makeConfigTxWithItems(newChainID, chainCreateTx) + + status := mcc.sysChain.proposeChain(ingressTx) + + if status == cb.Status_SUCCESS { + t.Fatalf("Should not have validated the transaction with no authorized chain creation policies") + } +} + +func TestProposalWithMissingPolicy(t *testing.T) { + newChainID := "NewChainID" + + mcc := newMockChainCreator() + mcc.ms.msc.chainCreators = []string{static.AcceptAllPolicyKey} + + chainCreateTx := &cb.ConfigurationItem{ + Key: ab.CreationPolicyKey, + Type: cb.ConfigurationItem_Orderer, + Value: utils.MarshalOrPanic(&ab.CreationPolicy{ + Policy: static.AcceptAllPolicyKey, + Digest: coreutil.ComputeCryptoHash([]byte{}), + }), + } + ingressTx := makeConfigTxWithItems(newChainID, chainCreateTx) + + status := mcc.sysChain.proposeChain(ingressTx) + + if status == cb.Status_SUCCESS { + t.Fatalf("Should not have validated the transaction with missing policy") + } +} + +func TestProposalWithBadDigest(t *testing.T) { + newChainID := "NewChainID" + + mcc := newMockChainCreator() + mcc.ms.mpm.mp = &mockPolicy{} + mcc.ms.msc.chainCreators = []string{static.AcceptAllPolicyKey} + + chainCreateTx := &cb.ConfigurationItem{ + Key: ab.CreationPolicyKey, + Type: cb.ConfigurationItem_Orderer, + Value: utils.MarshalOrPanic(&ab.CreationPolicy{ + Policy: static.AcceptAllPolicyKey, + Digest: coreutil.ComputeCryptoHash([]byte("BAD_DIGEST")), + }), + } + ingressTx := makeConfigTxWithItems(newChainID, chainCreateTx) + + status := mcc.sysChain.proposeChain(ingressTx) + + if status == cb.Status_SUCCESS { + t.Fatalf("Should not have validated the transaction with missing policy") + } +} diff --git a/orderer/multichain/chainsupport_mock_test.go b/orderer/multichain/util_test.go similarity index 66% rename from orderer/multichain/chainsupport_mock_test.go rename to orderer/multichain/util_test.go index e7d2219c3b9..69d615bbf4a 100644 --- a/orderer/multichain/chainsupport_mock_test.go +++ b/orderer/multichain/util_test.go @@ -17,8 +17,11 @@ limitations under the License. package multichain import ( + "fmt" + "github.com/hyperledger/fabric/orderer/common/blockcutter" cb "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/utils" ) type mockConsenter struct { @@ -62,27 +65,40 @@ func (mch *mockChain) Halt() { close(mch.queue) } -type mockConfigtxManager struct { - config *cb.ConfigurationEnvelope +type mockLedgerWriter struct { } -func (mcm *mockConfigtxManager) Apply(configtx *cb.ConfigurationEnvelope) error { - mcm.config = configtx +func (mlw *mockLedgerWriter) Append(blockContents []*cb.Envelope, metadata [][]byte) *cb.Block { + logger.Debugf("Committed block") return nil } -func (mcm *mockConfigtxManager) Validate(configtx *cb.ConfigurationEnvelope) error { - panic("Unimplemented") +func makeConfigTx(chainID string, i int) *cb.Envelope { + return makeConfigTxWithItems(chainID, &cb.ConfigurationItem{ + Value: []byte(fmt.Sprintf("%d", i)), + }) } -func (mcm *mockConfigtxManager) ChainID() string { - panic("Unimplemented") -} - -type mockLedgerWriter struct { -} - -func (mlw *mockLedgerWriter) Append(blockContents []*cb.Envelope, metadata [][]byte) *cb.Block { - logger.Debugf("Committed block") - return nil +func makeConfigTxWithItems(chainID string, items ...*cb.ConfigurationItem) *cb.Envelope { + signedItems := make([]*cb.SignedConfigurationItem, len(items)) + for i, item := range items { + signedItems[i] = &cb.SignedConfigurationItem{ + ConfigurationItem: utils.MarshalOrPanic(item), + } + } + + payload := &cb.Payload{ + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION), + ChainID: chainID, + }, + }, + Data: utils.MarshalOrPanic(&cb.ConfigurationEnvelope{ + Items: signedItems, + }), + } + return &cb.Envelope{ + Payload: utils.MarshalOrPanic(payload), + } } diff --git a/protos/orderer/configuration.go b/protos/orderer/configuration.go new file mode 100644 index 00000000000..0871e9e8282 --- /dev/null +++ b/protos/orderer/configuration.go @@ -0,0 +1,88 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orderer + +import ( + cu "github.com/hyperledger/fabric/core/util" + cb "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/utils" +) + +const CreationPolicyKey = "CreationPolicy" + +// ChainCreationConfiguration creates a new chain creation configuration envelope from +// the supplied creationPolicy, new chainID, and a template configuration envelope +// The template configuration envelope will have the correct chainID set on all items, +// and the first item will be a CreationPolicy which is ready for the signatures as +// required by the policy +func ChainCreationConfiguration(creationPolicy, newChainID string, template *cb.ConfigurationEnvelope) *cb.ConfigurationEnvelope { + newConfigItems := make([]*cb.SignedConfigurationItem, len(template.Items)) + var hashBytes []byte + + for i, item := range template.Items { + configItem := utils.UnmarshalConfigurationItemOrPanic(item.ConfigurationItem) + configItem.Header.ChainID = newChainID + newConfigItems[i] = &cb.SignedConfigurationItem{ + ConfigurationItem: utils.MarshalOrPanic(configItem), + } + hashBytes = append(hashBytes, newConfigItems[i].ConfigurationItem...) + } + + digest := cu.ComputeCryptoHash(hashBytes) + + authorizeItem := &cb.SignedConfigurationItem{ + ConfigurationItem: utils.MarshalOrPanic(&cb.ConfigurationItem{ + Header: &cb.ChainHeader{ + ChainID: newChainID, + Type: int32(cb.HeaderType_CONFIGURATION_ITEM), + }, + Type: cb.ConfigurationItem_Orderer, + Key: CreationPolicyKey, + Value: utils.MarshalOrPanic(&CreationPolicy{ + Policy: creationPolicy, + Digest: digest, + }), + }), + } + + authorizedConfig := append([]*cb.SignedConfigurationItem{authorizeItem}, newConfigItems...) + + return &cb.ConfigurationEnvelope{ + Items: authorizedConfig, + } +} + +// ChainCreationConfigurationTransaction creates a new chain creation configuration transaction +// by invoking ChainCreationConfiguration and embedding the resulting configuration envelope is a +// configuration transaction +func ChainCreationConfigurationTransaction(creationPolicy, newChainID string, template *cb.ConfigurationEnvelope) *cb.Envelope { + configurationEnvelope := ChainCreationConfiguration(creationPolicy, newChainID, template) + + newGenesisTx := &cb.Envelope{ + Payload: utils.MarshalOrPanic(&cb.Payload{ + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION), + ChainID: newChainID, + }, + }, + Data: utils.MarshalOrPanic(configurationEnvelope), + }), + } + + return newGenesisTx +} diff --git a/protos/utils/commonutils.go b/protos/utils/commonutils.go index b83cb932082..c48f553dc7c 100644 --- a/protos/utils/commonutils.go +++ b/protos/utils/commonutils.go @@ -60,6 +60,44 @@ func CreateNonce() ([]byte, error) { return nonce, nil } +// UnmarshalPayloadOrPanic unmarshals bytes to a Payload structure or panics on error +func UnmarshalPayloadOrPanic(encoded []byte) *cb.Payload { + payload, err := UnmarshalPayload(encoded) + if err != nil { + panic(fmt.Errorf("Error unmarshaling data to payload: %s", err)) + } + return payload +} + +// UnmarshalPayload unmarshals bytes to a Payload structure +func UnmarshalPayload(encoded []byte) (*cb.Payload, error) { + payload := &cb.Payload{} + err := proto.Unmarshal(encoded, payload) + if err != nil { + return nil, err + } + return payload, err +} + +// UnmarshalEnvelopeOrPanic unmarshals bytes to an Envelope structure or panics on error +func UnmarshalEnvelopeOrPanic(encoded []byte) *cb.Envelope { + envelope, err := UnmarshalEnvelope(encoded) + if err != nil { + panic(fmt.Errorf("Error unmarshaling data to envelope: %s", err)) + } + return envelope +} + +// UnmarshalEnvelope unmarshals bytes to an Envelope structure +func UnmarshalEnvelope(encoded []byte) (*cb.Envelope, error) { + envelope := &cb.Envelope{} + err := proto.Unmarshal(encoded, envelope) + if err != nil { + return nil, err + } + return envelope, err +} + // ExtractEnvelopeOrPanic retrieves the requested envelope from a given block and unmarshals it -- it panics if either of these operation fail. func ExtractEnvelopeOrPanic(block *cb.Block, index int) *cb.Envelope { envelopeCount := len(block.Data.Data) diff --git a/protos/utils/configtxutils.go b/protos/utils/configtxutils.go index a0d8f8b9f35..9221262589f 100644 --- a/protos/utils/configtxutils.go +++ b/protos/utils/configtxutils.go @@ -156,3 +156,41 @@ func BreakOutBlockToConfigurationEnvelope(block *pb.Block) (*pb.ConfigurationEnv return configEnvelope, envelopeSignatures[0], nil } // BreakOutPayloadToConfigurationEnvelope + +// UnmarshalConfigurationItemOrPanic unmarshals bytes to a ConfigurationItem or panics on error +func UnmarshalConfigurationItemOrPanic(encoded []byte) *pb.ConfigurationItem { + configItem, err := UnmarshalConfigurationItem(encoded) + if err != nil { + panic(fmt.Errorf("Error unmarshaling data to ConfigurationItem: %s", err)) + } + return configItem +} + +// UnmarshalConfigurationItem unmarshals bytes to a ConfigurationItem +func UnmarshalConfigurationItem(encoded []byte) (*pb.ConfigurationItem, error) { + configItem := &pb.ConfigurationItem{} + err := proto.Unmarshal(encoded, configItem) + if err != nil { + return nil, err + } + return configItem, nil +} + +// UnmarshalConfigurationEnvelopeOrPanic unmarshals bytes to a ConfigurationEnvelope or panics on error +func UnmarshalConfigurationEnvelopeOrPanic(encoded []byte) *pb.ConfigurationEnvelope { + configEnvelope, err := UnmarshalConfigurationEnvelope(encoded) + if err != nil { + panic(fmt.Errorf("Error unmarshaling data to ConfigurationEnvelope: %s", err)) + } + return configEnvelope +} + +// UnmarshalConfigurationEnvelope unmarshals bytes to a ConfigurationEnvelope +func UnmarshalConfigurationEnvelope(encoded []byte) (*pb.ConfigurationEnvelope, error) { + configEnvelope := &pb.ConfigurationEnvelope{} + err := proto.Unmarshal(encoded, configEnvelope) + if err != nil { + return nil, err + } + return configEnvelope, nil +}