From 7442b127dc0712ab96a9bf89e9d696250bb60733 Mon Sep 17 00:00:00 2001
From: Luis Sanchez <sanchezl@us.ibm.com>
Date: Wed, 26 Oct 2016 11:55:49 -0400
Subject: [PATCH] Do not block on Broadcast responses

FAB-839

Added per-connection buffered channel of size QueueSize
for responses.

Added unit test TestBroadcastResponseQueueOverflow.

Change-Id: I317d127f74dcc8115201f8c127e83230d9d13a58
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
---
 orderer/kafka/broadcast.go           | 43 ++++++++++++++++++++-----
 orderer/kafka/broadcast_mock_test.go |  2 +-
 orderer/kafka/broadcast_test.go      | 47 ++++++++++++++++++++++++++++
 orderer/kafka/config_test.go         |  9 +++---
 orderer/kafka/orderer_mock_test.go   |  2 ++
 orderer/orderer.yaml                 |  1 -
 6 files changed, 91 insertions(+), 13 deletions(-)

diff --git a/orderer/kafka/broadcast.go b/orderer/kafka/broadcast.go
index f764ef51812..2dccb3d0acb 100644
--- a/orderer/kafka/broadcast.go
+++ b/orderer/kafka/broadcast.go
@@ -21,6 +21,8 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/net/context"
+
 	"github.com/hyperledger/fabric/orderer/config"
 	cb "github.com/hyperledger/fabric/protos/common"
 	ab "github.com/hyperledger/fabric/protos/orderer"
@@ -112,7 +114,7 @@ func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
 		case msg := <-b.batchChan:
 			data, err := proto.Marshal(msg)
 			if err != nil {
-				logger.Fatalf("Error marshaling what should be a valid proto message: %s", err)
+				panic(fmt.Errorf("Error marshaling what should be a valid proto message: %s", err))
 			}
 			b.messages = append(b.messages, data)
 			if len(b.messages) >= int(maxSize) {
@@ -136,7 +138,9 @@ func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
 }
 
 func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer) error {
-	reply := new(ab.BroadcastResponse)
+	context, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	bsr := newBroadcastSessionResponder(context, stream, b.config.General.QueueSize)
 	for {
 		msg, err := stream.Recv()
 		if err != nil {
@@ -145,12 +149,37 @@ func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer
 		}
 
 		b.batchChan <- msg
-		reply.Status = cb.Status_SUCCESS // TODO This shouldn't always be a success
+		bsr.reply(cb.Status_SUCCESS) // TODO This shouldn't always be a success
 
-		if err := stream.Send(reply); err != nil {
-			logger.Info("Cannot send broadcast reply to client")
-			return err
+	}
+}
+
+type broadcastSessionResponder struct {
+	queue chan *ab.BroadcastResponse
+}
+
+func newBroadcastSessionResponder(context context.Context, stream ab.AtomicBroadcast_BroadcastServer, queueSize uint) *broadcastSessionResponder {
+	bsr := &broadcastSessionResponder{
+		queue: make(chan *ab.BroadcastResponse, queueSize),
+	}
+	go bsr.sendReplies(context, stream)
+	return bsr
+}
+
+func (bsr *broadcastSessionResponder) reply(status cb.Status) {
+	bsr.queue <- &ab.BroadcastResponse{Status: status}
+}
+
+func (bsr *broadcastSessionResponder) sendReplies(context context.Context, stream ab.AtomicBroadcast_BroadcastServer) {
+	for {
+		select {
+		case reply := <-bsr.queue:
+			if err := stream.Send(reply); err != nil {
+				logger.Info("Cannot send broadcast reply to client")
+			}
+			logger.Debugf("Sent broadcast reply %v to client", reply.Status.String())
+		case <-context.Done():
+			return
 		}
-		logger.Debugf("Sent broadcast reply %v to client", reply.Status.String())
 	}
 }
diff --git a/orderer/kafka/broadcast_mock_test.go b/orderer/kafka/broadcast_mock_test.go
index 6063581e5f6..7bddf70068f 100644
--- a/orderer/kafka/broadcast_mock_test.go
+++ b/orderer/kafka/broadcast_mock_test.go
@@ -27,7 +27,7 @@ func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk ch
 	mb := &broadcasterImpl{
 		producer:   mockNewProducer(t, conf, seek, disk),
 		config:     conf,
-		batchChan:  make(chan *cb.Envelope, conf.General.BatchSize),
+		batchChan:  make(chan *cb.Envelope, batchChanSize),
 		messages:   [][]byte{[]byte("checkpoint")},
 		nextNumber: uint64(seek),
 	}
diff --git a/orderer/kafka/broadcast_test.go b/orderer/kafka/broadcast_test.go
index b4a41a9ead8..73899f71fa7 100644
--- a/orderer/kafka/broadcast_test.go
+++ b/orderer/kafka/broadcast_test.go
@@ -135,6 +135,53 @@ func TestBroadcastBatch(t *testing.T) {
 	}
 }
 
+// If the capacity of the response queue is less than the batch size,
+// then if the response queue overflows, the order should not be able
+// to send back a block to the client. (Sending replies and adding
+// messages to the about-to-be-sent block happens on the same routine.)
+func TestBroadcastResponseQueueOverflow(t *testing.T) {
+
+	// Make sure that the response queue is less than the batch size
+	originalQueueSize := testConf.General.QueueSize
+	defer func() { testConf.General.QueueSize = originalQueueSize }()
+	testConf.General.QueueSize = testConf.General.BatchSize - 1
+
+	disk := make(chan []byte)
+
+	mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+	defer testClose(t, mb)
+
+	mbs := newMockBroadcastStream(t)
+	go func() {
+		if err := mb.Broadcast(mbs); err != nil {
+			t.Fatal("Broadcast error:", err)
+		}
+	}()
+
+	<-disk // We tested the checkpoint block in a previous test, so we can ignore it now
+
+	// Force the response queue to overflow by blocking the broadcast stream's Send() method
+	mbs.closed = true
+	defer func() { mbs.closed = false }()
+
+	// Pump a batch's worth of messages into the system
+	go func() {
+		for i := 0; i < int(testConf.General.BatchSize); i++ {
+			mbs.incoming <- &cb.Envelope{Payload: []byte("message " + strconv.Itoa(i))}
+		}
+	}()
+
+loop:
+	for {
+		select {
+		case <-mbs.outgoing:
+			t.Fatal("Client shouldn't have received anything from the orderer")
+		case <-time.After(testConf.General.BatchTimeout + timePadding):
+			break loop // This is the success path
+		}
+	}
+}
+
 func TestBroadcastIncompleteBatch(t *testing.T) {
 	if testConf.General.BatchSize <= 1 {
 		t.Skip("Skipping test as it requires a batchsize > 1")
diff --git a/orderer/kafka/config_test.go b/orderer/kafka/config_test.go
index ae4794d1449..1432e466fb1 100644
--- a/orderer/kafka/config_test.go
+++ b/orderer/kafka/config_test.go
@@ -24,10 +24,11 @@ import (
 )
 
 var (
-	brokerID     = int32(0)
-	oldestOffset = int64(100)                            // The oldest block available on the broker
-	newestOffset = int64(1100)                           // The offset that will be assigned to the next block
-	middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle
+	brokerID      = int32(0)
+	oldestOffset  = int64(100)                            // The oldest block available on the broker
+	newestOffset  = int64(1100)                           // The offset that will be assigned to the next block
+	middleOffset  = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle
+	batchChanSize = 1000                                  // Size of batch channel (eventually sync with FAB-821)
 
 	// Amount of time to wait for block processing when doing time-based tests
 	// We generally want this value to be as small as possible so as to make tests execute faster
diff --git a/orderer/kafka/orderer_mock_test.go b/orderer/kafka/orderer_mock_test.go
index 5bb80ce3ef3..4123e27b544 100644
--- a/orderer/kafka/orderer_mock_test.go
+++ b/orderer/kafka/orderer_mock_test.go
@@ -53,6 +53,8 @@ func (mbs *mockBroadcastStream) Recv() (*cb.Envelope, error) {
 }
 
 func (mbs *mockBroadcastStream) Send(reply *ab.BroadcastResponse) error {
+	for mbs.closed {
+	}
 	if !mbs.closed {
 		mbs.outgoing <- reply
 	}
diff --git a/orderer/orderer.yaml b/orderer/orderer.yaml
index 9a1dc77944c..a756eee1bff 100644
--- a/orderer/orderer.yaml
+++ b/orderer/orderer.yaml
@@ -25,7 +25,6 @@ General:
     BatchSize: 10
 
     # Queue Size: The maximum number of messages to allow pending from a gRPC client
-    # When Kafka is chosen as the OrdererType, this option is ignored.
     QueueSize: 10
 
     # Max Window Size: The maximum number of messages to for the orderer Deliver