diff --git a/orderer/kafka/broadcast.go b/orderer/kafka/broadcast.go index b45a4834c39..c32996f22bd 100644 --- a/orderer/kafka/broadcast.go +++ b/orderer/kafka/broadcast.go @@ -46,25 +46,17 @@ type broadcasterImpl struct { prevHash []byte } -type broadcastSessionResponder struct { - queue chan *ab.BroadcastResponse -} - func newBroadcaster(conf *config.TopLevel) Broadcaster { genesisBlock, _ := static.New().GenesisBlock() - return &broadcasterImpl{ + + b := &broadcasterImpl{ producer: newProducer(conf), config: conf, batchChan: make(chan *cb.Envelope, conf.General.BatchSize), messages: genesisBlock.GetData().Data, nextNumber: 0, } -} -// Broadcast receives ordering requests by clients and sends back an -// acknowledgement for each received message in order, indicating -// success or type of failure -func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error { b.once.Do(func() { // Send the genesis block to create the topic // otherwise consumers will throw an exception. @@ -72,6 +64,14 @@ func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) e // Spawn the goroutine that cuts blocks go b.cutBlock(b.config.General.BatchTimeout, b.config.General.BatchSize) }) + + return b +} + +// Broadcast receives ordering requests by clients and sends back an +// acknowledgement for each received message in order, indicating +// success or type of failure +func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error { return b.recvRequests(stream) } diff --git a/orderer/kafka/broadcast_mock_test.go b/orderer/kafka/broadcast_mock_test.go index 6063581e5f6..acf698b9362 100644 --- a/orderer/kafka/broadcast_mock_test.go +++ b/orderer/kafka/broadcast_mock_test.go @@ -17,19 +17,47 @@ limitations under the License. package kafka import ( + "fmt" "testing" + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" "github.com/hyperledger/fabric/orderer/config" cb "github.com/hyperledger/fabric/protos/common" ) func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk chan []byte) Broadcaster { + genesisBlock, _ := static.New().GenesisBlock() + wait := make(chan struct{}) + mb := &broadcasterImpl{ producer: mockNewProducer(t, conf, seek, disk), config: conf, batchChan: make(chan *cb.Envelope, conf.General.BatchSize), - messages: [][]byte{[]byte("checkpoint")}, + messages: genesisBlock.GetData().Data, nextNumber: uint64(seek), } + + go func() { + rxBlockBytes := <-disk + rxBlock := &cb.Block{} + if err := proto.Unmarshal(rxBlockBytes, rxBlock); err != nil { + panic(err) + } + if !proto.Equal(rxBlock.GetData(), genesisBlock.GetData()) { + panic(fmt.Errorf("Broadcaster not functioning as expected")) + } + close(wait) + }() + + mb.once.Do(func() { + // Send the genesis block to create the topic + // otherwise consumers will throw an exception. + mb.sendBlock() + // Spawn the goroutine that cuts blocks + go mb.cutBlock(mb.config.General.BatchTimeout, mb.config.General.BatchSize) + }) + <-wait + return mb } diff --git a/orderer/kafka/broadcast_test.go b/orderer/kafka/broadcast_test.go index 133c8b8c41f..86140ba8919 100644 --- a/orderer/kafka/broadcast_test.go +++ b/orderer/kafka/broadcast_test.go @@ -17,8 +17,8 @@ limitations under the License. package kafka import ( - "bytes" "strconv" + "sync" "testing" "time" @@ -26,37 +26,6 @@ import ( cb "github.com/hyperledger/fabric/protos/common" ) -func TestBroadcastInit(t *testing.T) { - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, oldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - for { - select { - case in := <-disk: - block := new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if !(bytes.Equal(block.Data.Data[0], []byte("checkpoint"))) { - t.Fatal("Expected first block to be a checkpoint") - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received the initialization block by now") - } - } -} - func TestBroadcastResponse(t *testing.T) { disk := make(chan []byte) @@ -70,8 +39,6 @@ func TestBroadcastResponse(t *testing.T) { } }() - <-disk // We tested the checkpoint block in a previous test, so we can ignore it now - // Send a message to the orderer go func() { mbs.incoming <- &cb.Envelope{Payload: []byte("single message")} @@ -103,8 +70,6 @@ func TestBroadcastBatch(t *testing.T) { } }() - <-disk // We tested the checkpoint block in a previous test, so we can ignore it now - // Pump a batch's worth of messages into the system go func() { for i := 0; i < int(testConf.General.BatchSize); i++ { @@ -158,8 +123,6 @@ func TestBroadcastBatch(t *testing.T) { } }() - <-disk // We tested the checkpoint block in a previous test, so we can ignore it now - // Force the response queue to overflow by blocking the broadcast stream's Send() method mbs.closed = true defer func() { mbs.closed = false }() @@ -201,8 +164,6 @@ func TestBroadcastIncompleteBatch(t *testing.T) { } }() - <-disk // We tested the checkpoint block in a previous test, so we can ignore it now - // Pump less than batchSize messages into the system go func() { for i := 0; i < messageCount; i++ { @@ -239,6 +200,8 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) { t.Skip("Skipping test as it requires a batchsize > 1") } + var once sync.Once + messageCount := int(testConf.General.BatchSize) - 1 disk := make(chan []byte) @@ -254,8 +217,6 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) { }() for i := 0; i < 2; i++ { - <-disk // Checkpoint block in first pass, first incomplete block in second pass -- both tested elsewhere - // Pump less than batchSize messages into the system go func() { for i := 0; i < messageCount; i++ { @@ -268,6 +229,10 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) { for i := 0; i < messageCount; i++ { <-mbs.outgoing } + + once.Do(func() { + <-disk // First incomplete block, tested elsewhere + }) } for { @@ -301,8 +266,6 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) { } }() - <-disk // We tested the checkpoint block in a previous test, so we can ignore it now - // Pump a batch's worth of messages into the system go func() { for i := 0; i < int(testConf.General.BatchSize); i++ {