From 8a648894e582caf1ae7b56c695754ef03e6b5624 Mon Sep 17 00:00:00 2001 From: Kostas Christidis Date: Sat, 19 Nov 2016 10:29:48 -0500 Subject: [PATCH] [FAB-1092] Illegal genesis message When the kafka package was merged, we hadn't settled on a block format, and when the block format got finalized these changes weren't applied to the kafka package. As an interim solution, since this entire package is being reworked to support multiple chains and the factoring out of common components between the kafka and solo packages, this changeset hooks *directly* into the static bootstrapper to provide a valid genesis block. Change-Id: Ieb542e9293b31dcd3f4c8c53d422c734a49a9886 Signed-off-by: Kostas Christidis --- orderer/kafka/broadcast.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/orderer/kafka/broadcast.go b/orderer/kafka/broadcast.go index 2dccb3d0acb..140033c1138 100644 --- a/orderer/kafka/broadcast.go +++ b/orderer/kafka/broadcast.go @@ -21,11 +21,11 @@ import ( "sync" "time" - "golang.org/x/net/context" - + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" "github.com/hyperledger/fabric/orderer/config" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + "golang.org/x/net/context" "github.com/golang/protobuf/proto" ) @@ -47,12 +47,17 @@ type broadcasterImpl struct { prevHash []byte } +type broadcastSessionResponder struct { + queue chan *ab.BroadcastResponse +} + func newBroadcaster(conf *config.TopLevel) Broadcaster { + genesisBlock, _ := static.New().GenesisBlock() return &broadcasterImpl{ producer: newProducer(conf), config: conf, batchChan: make(chan *cb.Envelope, conf.General.BatchSize), - messages: [][]byte{[]byte("genesis")}, + messages: genesisBlock.GetData().Data, nextNumber: 0, } } @@ -98,7 +103,6 @@ func (b *broadcasterImpl) sendBlock() error { b.prevHash = block.Header.Hash() blockBytes, err := proto.Marshal(block) - if err != nil { logger.Fatalf("Error marshaling block: %s", err) } @@ -154,10 +158,6 @@ func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer } } -type broadcastSessionResponder struct { - queue chan *ab.BroadcastResponse -} - func newBroadcastSessionResponder(context context.Context, stream ab.AtomicBroadcast_BroadcastServer, queueSize uint) *broadcastSessionResponder { bsr := &broadcastSessionResponder{ queue: make(chan *ab.BroadcastResponse, queueSize),