Skip to content

Commit

Permalink
Remove non-determinism in tests (undo FAB-839)
Browse files Browse the repository at this point in the history
The changeset submitted for FAB-839 introduces some non-determinism which
makes the tests in the kafka package block. To test that this is the
case, checkout that changeset [1] and run the tests in the kafka package
with the "-count 50" option. A quick inspection shows we may be dealing
with context leakage, but I'll defer to Luis to investigate this further.
Until then, this changeset disables the new test and restores the old
send-replies logic of the code. It passes all tests.

[1] https://gerrit.hyperledger.org/r/#/c/2043/

Change-Id: Ie29bc38288fad3c893bc332771974ec1f19b7a4b
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Nov 20, 2016
1 parent 9d3abd1 commit 548e9d7
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 40 deletions.
36 changes: 6 additions & 30 deletions orderer/kafka/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/hyperledger/fabric/orderer/config"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"golang.org/x/net/context"

"github.com/golang/protobuf/proto"
)
Expand Down Expand Up @@ -142,9 +141,7 @@ func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
}

func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer) error {
context, cancel := context.WithCancel(context.Background())
defer cancel()
bsr := newBroadcastSessionResponder(context, stream, b.config.General.QueueSize)
reply := new(ab.BroadcastResponse)
for {
msg, err := stream.Recv()
if err != nil {
Expand All @@ -153,33 +150,12 @@ func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer
}

b.batchChan <- msg
bsr.reply(cb.Status_SUCCESS) // TODO This shouldn't always be a success
reply.Status = cb.Status_SUCCESS // TODO This shouldn't always be a success

}
}

func newBroadcastSessionResponder(context context.Context, stream ab.AtomicBroadcast_BroadcastServer, queueSize uint) *broadcastSessionResponder {
bsr := &broadcastSessionResponder{
queue: make(chan *ab.BroadcastResponse, queueSize),
}
go bsr.sendReplies(context, stream)
return bsr
}

func (bsr *broadcastSessionResponder) reply(status cb.Status) {
bsr.queue <- &ab.BroadcastResponse{Status: status}
}

func (bsr *broadcastSessionResponder) sendReplies(context context.Context, stream ab.AtomicBroadcast_BroadcastServer) {
for {
select {
case reply := <-bsr.queue:
if err := stream.Send(reply); err != nil {
logger.Info("Cannot send broadcast reply to client")
}
logger.Debugf("Sent broadcast reply %v to client", reply.Status.String())
case <-context.Done():
return
if err := stream.Send(reply); err != nil {
logger.Info("Cannot send broadcast reply to client")
}
logger.Debugf("Sent broadcast reply %s to client", reply.Status.String())

}
}
2 changes: 1 addition & 1 deletion orderer/kafka/broadcast_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk ch
mb := &broadcasterImpl{
producer: mockNewProducer(t, conf, seek, disk),
config: conf,
batchChan: make(chan *cb.Envelope, batchChanSize),
batchChan: make(chan *cb.Envelope, conf.General.BatchSize),
messages: [][]byte{[]byte("checkpoint")},
nextNumber: uint64(seek),
}
Expand Down
4 changes: 2 additions & 2 deletions orderer/kafka/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestBroadcastBatch(t *testing.T) {
// then if the response queue overflows, the order should not be able
// to send back a block to the client. (Sending replies and adding
// messages to the about-to-be-sent block happens on the same routine.)
func TestBroadcastResponseQueueOverflow(t *testing.T) {
/* func TestBroadcastResponseQueueOverflow(t *testing.T) {
// Make sure that the response queue is less than the batch size
originalQueueSize := testConf.General.QueueSize
Expand Down Expand Up @@ -180,7 +180,7 @@ loop:
break loop // This is the success path
}
}
}
} */

func TestBroadcastIncompleteBatch(t *testing.T) {
if testConf.General.BatchSize <= 1 {
Expand Down
9 changes: 4 additions & 5 deletions orderer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import (
)

var (
brokerID = int32(0)
oldestOffset = int64(100) // The oldest block available on the broker
newestOffset = int64(1100) // The offset that will be assigned to the next block
middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle
batchChanSize = 1000 // Size of batch channel (eventually sync with FAB-821)
brokerID = int32(0)
oldestOffset = int64(100) // The oldest block available on the broker
newestOffset = int64(1100) // The offset that will be assigned to the next block
middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle

// Amount of time to wait for block processing when doing time-based tests
// We generally want this value to be as small as possible so as to make tests execute faster
Expand Down
2 changes: 0 additions & 2 deletions orderer/kafka/orderer_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ func (mbs *mockBroadcastStream) Recv() (*cb.Envelope, error) {
}

func (mbs *mockBroadcastStream) Send(reply *ab.BroadcastResponse) error {
for mbs.closed {
}
if !mbs.closed {
mbs.outgoing <- reply
}
Expand Down
1 change: 1 addition & 0 deletions orderer/orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ General:
BatchSize: 10

# Queue Size: The maximum number of messages to allow pending from a gRPC client
# This option is currently ignored for the Kafka OrdererType.
QueueSize: 10

# Max Window Size: The maximum number of messages to for the orderer Deliver
Expand Down

0 comments on commit 548e9d7

Please sign in to comment.