Skip to content

Commit

Permalink
Merge "[FAB-1161] Push genesis block upon orderer init"
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Nov 29, 2016
2 parents 2c22539 + c1e6fb4 commit af0cd3e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 55 deletions.
20 changes: 10 additions & 10 deletions orderer/kafka/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,32 @@ type broadcasterImpl struct {
prevHash []byte
}

type broadcastSessionResponder struct {
queue chan *ab.BroadcastResponse
}

func newBroadcaster(conf *config.TopLevel) Broadcaster {
genesisBlock, _ := static.New().GenesisBlock()
return &broadcasterImpl{

b := &broadcasterImpl{
producer: newProducer(conf),
config: conf,
batchChan: make(chan *cb.Envelope, conf.General.BatchSize),
messages: genesisBlock.GetData().Data,
nextNumber: 0,
}
}

// Broadcast receives ordering requests by clients and sends back an
// acknowledgement for each received message in order, indicating
// success or type of failure
func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error {
b.once.Do(func() {
// Send the genesis block to create the topic
// otherwise consumers will throw an exception.
b.sendBlock()
// Spawn the goroutine that cuts blocks
go b.cutBlock(b.config.General.BatchTimeout, b.config.General.BatchSize)
})

return b
}

// Broadcast receives ordering requests by clients and sends back an
// acknowledgement for each received message in order, indicating
// success or type of failure
func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error {
return b.recvRequests(stream)
}

Expand Down
30 changes: 29 additions & 1 deletion orderer/kafka/broadcast_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,47 @@ limitations under the License.
package kafka

import (
"fmt"
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/common/bootstrap/static"
"github.com/hyperledger/fabric/orderer/config"
cb "github.com/hyperledger/fabric/protos/common"
)

func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk chan []byte) Broadcaster {
genesisBlock, _ := static.New().GenesisBlock()
wait := make(chan struct{})

mb := &broadcasterImpl{
producer: mockNewProducer(t, conf, seek, disk),
config: conf,
batchChan: make(chan *cb.Envelope, conf.General.BatchSize),
messages: [][]byte{[]byte("checkpoint")},
messages: genesisBlock.GetData().Data,
nextNumber: uint64(seek),
}

go func() {
rxBlockBytes := <-disk
rxBlock := &cb.Block{}
if err := proto.Unmarshal(rxBlockBytes, rxBlock); err != nil {
panic(err)
}
if !proto.Equal(rxBlock.GetData(), genesisBlock.GetData()) {
panic(fmt.Errorf("Broadcaster not functioning as expected"))
}
close(wait)
}()

mb.once.Do(func() {
// Send the genesis block to create the topic
// otherwise consumers will throw an exception.
mb.sendBlock()
// Spawn the goroutine that cuts blocks
go mb.cutBlock(mb.config.General.BatchTimeout, mb.config.General.BatchSize)
})
<-wait

return mb
}
51 changes: 7 additions & 44 deletions orderer/kafka/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,15 @@ limitations under the License.
package kafka

import (
"bytes"
"strconv"
"sync"
"testing"
"time"

"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric/protos/common"
)

func TestBroadcastInit(t *testing.T) {
disk := make(chan []byte)

mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
go func() {
if err := mb.Broadcast(mbs); err != nil {
t.Fatal("Broadcast error:", err)
}
}()

for {
select {
case in := <-disk:
block := new(cb.Block)
err := proto.Unmarshal(in, block)
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if !(bytes.Equal(block.Data.Data[0], []byte("checkpoint"))) {
t.Fatal("Expected first block to be a checkpoint")
}
return
case <-time.After(500 * time.Millisecond):
t.Fatal("Should have received the initialization block by now")
}
}
}

func TestBroadcastResponse(t *testing.T) {
disk := make(chan []byte)

Expand All @@ -70,8 +39,6 @@ func TestBroadcastResponse(t *testing.T) {
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Send a message to the orderer
go func() {
mbs.incoming <- &cb.Envelope{Payload: []byte("single message")}
Expand Down Expand Up @@ -103,8 +70,6 @@ func TestBroadcastBatch(t *testing.T) {
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Pump a batch's worth of messages into the system
go func() {
for i := 0; i < int(testConf.General.BatchSize); i++ {
Expand Down Expand Up @@ -158,8 +123,6 @@ func TestBroadcastBatch(t *testing.T) {
}
}()
<-disk // We tested the checkpoint block in a previous test, so we can ignore it now
// Force the response queue to overflow by blocking the broadcast stream's Send() method
mbs.closed = true
defer func() { mbs.closed = false }()
Expand Down Expand Up @@ -201,8 +164,6 @@ func TestBroadcastIncompleteBatch(t *testing.T) {
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Pump less than batchSize messages into the system
go func() {
for i := 0; i < messageCount; i++ {
Expand Down Expand Up @@ -239,6 +200,8 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
t.Skip("Skipping test as it requires a batchsize > 1")
}

var once sync.Once

messageCount := int(testConf.General.BatchSize) - 1

disk := make(chan []byte)
Expand All @@ -254,8 +217,6 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
}()

for i := 0; i < 2; i++ {
<-disk // Checkpoint block in first pass, first incomplete block in second pass -- both tested elsewhere

// Pump less than batchSize messages into the system
go func() {
for i := 0; i < messageCount; i++ {
Expand All @@ -268,6 +229,10 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
for i := 0; i < messageCount; i++ {
<-mbs.outgoing
}

once.Do(func() {
<-disk // First incomplete block, tested elsewhere
})
}

for {
Expand Down Expand Up @@ -301,8 +266,6 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) {
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Pump a batch's worth of messages into the system
go func() {
for i := 0; i < int(testConf.General.BatchSize); i++ {
Expand Down

0 comments on commit af0cd3e

Please sign in to comment.