Skip to content

Commit

Permalink
[FAB-2366] Convert channel creation to common path
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2366

With FAB-2364, a common code path for config updates and channel
creation was introduced, but not hooked into the production code path.

This CR switches broadcast to use this new path, and removes the old
channel creation path from the multichain manager.

Change-Id: I59fc9c5e47b6abd8c6e18f9d8c9dd9cbf9a3c114
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Feb 19, 2017
1 parent 1219131 commit 5455c58
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 363 deletions.
4 changes: 4 additions & 0 deletions common/configtx/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func NewManagerImpl(configEnv *cb.ConfigEnvelope, initializer api.Initializer, c
return nil, fmt.Errorf("Nil config envelope Config")
}

if configEnv.Config.Channel == nil {
return nil, fmt.Errorf("Nil config envelope Config.Channel")
}

if configEnv.Config.Header == nil {
return nil, fmt.Errorf("Nil config envelop Config Header")
}
Expand Down
41 changes: 25 additions & 16 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (

var logger = logging.MustGetLogger("orderer/common/broadcast")

// ConfigUpdateProcessor is used to transform CONFIG_UPDATE transactions which are used to generate other envelope
// message types with preprocessing by the orderer
type ConfigUpdateProcessor interface {
// Process takes in an envelope of type CONFIG_UPDATE and proceses it
// to transform it either into another envelope type
Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error)
}

// Handler defines an interface which handles broadcasts
type Handler interface {
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
Expand All @@ -37,13 +45,10 @@ type Handler interface {

// SupportManager provides a way for the Handler to look up the Support for a chain
type SupportManager interface {
ConfigUpdateProcessor

// GetChain gets the chain support for a given ChannelId
GetChain(chainID string) (Support, bool)

// ProposeChain accepts a configuration transaction for a chain which does not already exists
// The status returned is whether the proposal is accepted for consideration, only after consensus
// occurs will the proposal be committed or rejected
ProposeChain(env *cb.Envelope) cb.Status
}

// Support provides the backing resources needed to support broadcast on a chain
Expand Down Expand Up @@ -84,23 +89,27 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

support, ok := bh.sm.GetChain(payload.Header.ChannelHeader.ChannelId)
if !ok {
// Chain not found, maybe create one?
if payload.Header.ChannelHeader.Type != int32(cb.HeaderType_CONFIG_UPDATE) {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
if payload.Header.ChannelHeader.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
if err != nil {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

logger.Debugf("Proposing new chain")
err = srv.Send(&ab.BroadcastResponse{Status: bh.sm.ProposeChain(msg)})
if err != nil {
return err
err = proto.Unmarshal(msg.Payload, payload)
if payload.Header == nil || payload.Header.ChannelHeader == nil || payload.Header.ChannelHeader.ChannelId == "" {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
continue
}

support, ok := bh.sm.GetChain(payload.Header.ChannelHeader.ChannelId)
if !ok {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Broadcast is filtering message for chain %s", payload.Header.ChannelHeader.ChannelId)
logger.Debugf("Broadcast is filtering message for channel %s", payload.Header.ChannelHeader.ChannelId)
}

// Normal transaction for existing chain
Expand Down
42 changes: 20 additions & 22 deletions orderer/common/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hyperledger/fabric/protos/utils"

logging "github.com/op/go-logging"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -63,24 +64,20 @@ func (m *mockB) Recv() (*cb.Envelope, error) {
}

type mockSupportManager struct {
chains map[string]*mockSupport
chains map[string]*mockSupport
ProcessVal *cb.Envelope
}

func (mm *mockSupportManager) GetChain(chainID string) (Support, bool) {
chain, ok := mm.chains[chainID]
return chain, ok
}

func (mm *mockSupportManager) ProposeChain(configTx *cb.Envelope) cb.Status {
payload := utils.ExtractPayloadOrPanic(configTx)

mm.chains[string(payload.Header.ChannelHeader.ChannelId)] = &mockSupport{
filters: filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
filter.AcceptRule,
}),
func (mm *mockSupportManager) Process(configTx *cb.Envelope) (*cb.Envelope, error) {
if mm.ProcessVal == nil {
return nil, fmt.Errorf("Nil result implies error")
}
return cb.Status_SUCCESS
return mm.ProcessVal, nil
}

type mockSupport struct {
Expand Down Expand Up @@ -222,8 +219,9 @@ func TestBadChannelId(t *testing.T) {
}
}

func TestNewChannelId(t *testing.T) {
func TestGoodConfigUpdate(t *testing.T) {
mm, _ := getMockSupportManager()
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: &cb.ChannelHeader{ChannelId: systemChain}}})}
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
Expand All @@ -232,17 +230,17 @@ func TestNewChannelId(t *testing.T) {

m.recvChan <- makeConfigMessage(newChannelId)
reply := <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have created a new chain, got %d", reply.Status)
}
assert.Equal(t, cb.Status_SUCCESS, reply.Status, "Should have allowed a good CONFIG_UPDATE")
}

if len(mm.chains) != 2 {
t.Fatalf("Should have created a new chain")
}
func TestBadConfigUpdate(t *testing.T) {
mm, _ := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)

m.recvChan <- makeMessage(newChannelId, []byte("Some bytes"))
reply = <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have successfully sent message to new chain, got %v", reply)
}
m.recvChan <- makeConfigMessage(systemChain)
reply := <-m.sendChan
assert.NotEqual(t, cb.Status_SUCCESS, reply.Status, "Should have rejected CONFIG_UPDATE")
}
7 changes: 4 additions & 3 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,13 @@ func main() {
consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry, conf.Kafka.TLS)
consenters["sbft"] = sbft.New(makeSbftConsensusConfig(conf), makeSbftStackConfig(conf))

manager := multichain.NewManagerImpl(lf, consenters, localmsp.NewSigner())
signer := localmsp.NewSigner()

manager := multichain.NewManagerImpl(lf, consenters, signer)

server := NewServer(
manager,
int(conf.General.QueueSize),
int(conf.General.MaxWindowSize),
signer,
)

ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
Expand Down
5 changes: 4 additions & 1 deletion orderer/multichain/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type ChainSupport interface {

broadcast.Support
ConsenterSupport

// ProposeConfigUpdate applies a CONFIG_UPDATE to an existing config to produce a *cb.ConfigEnvelope
ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
}

type chainSupport struct {
Expand Down Expand Up @@ -155,7 +158,7 @@ func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources)
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes),
sigfilter.New(ledgerResources.SharedConfig().IngressPolicyNames, ledgerResources.PolicyManager()),
newSystemChainFilter(ml),
newSystemChainFilter(ledgerResources, ml),
configtxfilter.NewFilter(ledgerResources),
filter.AcceptRule,
})
Expand Down
35 changes: 13 additions & 22 deletions orderer/multichain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ type Manager interface {
// GetChain retrieves the chain support for a chain (and whether it exists)
GetChain(chainID string) (ChainSupport, bool)

// ProposeChain accepts a config transaction for a chain which does not already exists
// The status returned is whether the proposal is accepted for consideration, only after consensus
// occurs will the proposal be committed or rejected
ProposeChain(env *cb.Envelope) cb.Status
// SystemChannelID returns the channel ID for the system channel
SystemChannelID() string
}

type configResources struct {
Expand All @@ -58,11 +56,11 @@ type ledgerResources struct {
}

type multiLedger struct {
chains map[string]*chainSupport
consenters map[string]Consenter
ledgerFactory ordererledger.Factory
sysChain *systemChain
signer crypto.LocalSigner
chains map[string]*chainSupport
consenters map[string]Consenter
ledgerFactory ordererledger.Factory
signer crypto.LocalSigner
systemChannelID string
}

func getConfigTx(reader ordererledger.Reader) *cb.Envelope {
Expand Down Expand Up @@ -102,16 +100,16 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C
chainID := ledgerResources.ChainID()

if ledgerResources.SharedConfig().ChainCreationPolicyNames() != nil {
if ml.sysChain != nil {
logger.Fatalf("There appear to be two system chains %s and %s", ml.sysChain.support.ChainID(), chainID)
if ml.systemChannelID != "" {
logger.Fatalf("There appear to be two system chains %s and %s", ml.systemChannelID, chainID)
}
chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
ledgerResources,
consenters,
signer)
logger.Infof("Starting with system channel: %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
ml.chains[string(chainID)] = chain
ml.sysChain = newSystemChain(chain)
ml.systemChannelID = chainID
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
defer chain.start()
} else {
Expand All @@ -126,18 +124,15 @@ func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]C

}

if ml.sysChain == nil {
if ml.systemChannelID == "" {
logger.Panicf("No system chain found")
}

return ml
}

// ProposeChain accepts a config transaction for a chain which does not already exists
// The status returned is whether the proposal is accepted for consideration, only after consensus
// occurs will the proposal be committed or rejected
func (ml *multiLedger) ProposeChain(env *cb.Envelope) cb.Status {
return ml.sysChain.proposeChain(env)
func (ml *multiLedger) SystemChannelID() string {
return ml.systemChannelID
}

// GetChain retrieves the chain support for a chain (and whether it exists)
Expand Down Expand Up @@ -190,10 +185,6 @@ func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResource
}
}

func (ml *multiLedger) systemChain() *systemChain {
return ml.sysChain
}

func (ml *multiLedger) newChain(configtx *cb.Envelope) {
ledgerResources := ml.newLedgerResources(configtx)
ledgerResources.ledger.Append(ordererledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx}))
Expand Down
43 changes: 12 additions & 31 deletions orderer/multichain/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/hyperledger/fabric/common/configtx"
configtxtest "github.com/hyperledger/fabric/common/configtx/test"
genesisconfig "github.com/hyperledger/fabric/common/configtx/tool/localconfig"
"github.com/hyperledger/fabric/common/configtx/tool/provisional"
mockcrypto "github.com/hyperledger/fabric/common/mocks/crypto"
Expand All @@ -31,7 +32,6 @@ import (
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"

"github.com/hyperledger/fabric/msp"
logging "github.com/op/go-logging"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -223,23 +223,18 @@ func TestNewChain(t *testing.T) {

manager := NewManagerImpl(lf, consenters, mockCrypto())

generator := provisional.New(conf)
channelTemplate := generator.ChannelTemplate()

signer, err := msp.NewNoopMsp().GetDefaultSigningIdentity()
assert.NoError(t, err)

newChainID := "TestNewChain"
newChainMessage, err := configtx.MakeChainCreationTransaction(provisional.AcceptAllPolicyKey, newChainID, signer, channelTemplate)

configEnv, err := configtx.NewChainCreationTemplate(provisional.AcceptAllPolicyKey, configtxtest.CompositeTemplate()).Envelope(newChainID)
if err != nil {
t.Fatalf("Error producing config transaction: %s", err)
t.Fatalf("Error constructing configtx")
}
ingressTx := makeConfigTxFromConfigUpdateEnvelope(newChainID, configEnv)
wrapped := wrapConfigTx(ingressTx)

status := manager.ProposeChain(newChainMessage)

if status != cb.Status_SUCCESS {
t.Fatalf("Error submitting chain creation request")
}
chainSupport, ok := manager.GetChain(manager.SystemChannelID())
assert.True(t, ok, "Could not find system channel")
chainSupport.Enqueue(wrapped)

it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
select {
Expand All @@ -251,21 +246,13 @@ func TestNewChain(t *testing.T) {
if len(block.Data.Data) != 1 {
t.Fatalf("Should have had only one message in the orderer transaction block")
}
configEnv, err := configtx.UnmarshalConfigEnvelope(utils.UnmarshalPayloadOrPanic(
utils.UnmarshalEnvelopeOrPanic(utils.UnmarshalPayloadOrPanic(utils.ExtractEnvelopeOrPanic(block, 0).Payload).Data).Payload).Data)

if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(configEnv.LastUpdate, newChainMessage) {
t.Errorf("Orderer config block contains wrong transaction, expected %v got %v", configEnv.LastUpdate, newChainMessage)
}
assert.Equal(t, wrapped, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]), "Orderer config block contains wrong transaction")
case <-time.After(time.Second):
t.Fatalf("Block 1 not produced after timeout in system chain")
}

chainSupport, ok := manager.GetChain(newChainID)
chainSupport, ok = manager.GetChain(newChainID)

if !ok {
t.Fatalf("Should have gotten new chain which was created")
Expand All @@ -290,14 +277,8 @@ func TestNewChain(t *testing.T) {
if len(block.Data.Data) != 1 {
t.Fatalf("Should have had only one message in the new genesis block")
}
configEnv, err := configtx.ConfigEnvelopeFromBlock(block)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(configEnv.LastUpdate, newChainMessage) {
t.Errorf("Genesis block contains wrong transaction, expected %v got %v", configEnv.LastUpdate, newChainMessage)
}
assert.Equal(t, ingressTx, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]), "Genesis block contains wrong transaction")
case <-time.After(time.Second):
t.Fatalf("Block 1 not produced after timeout in system chain")
}
Expand Down
Loading

0 comments on commit 5455c58

Please sign in to comment.