[FAB-1359] Drop custom flags for Kafka orderer
https://jira.hyperledger.org/browse/FAB-1359

As we are slowly moving to a setup where the behavior of the orderer is
controlled by the config options captured in the genesis block and the
orderer YAML file (and their overrides via ENV vars), it's time to drop
the flag support that the Kafka orderer provided.

This changeset then:
1. Removes all flags from the Kafka orderer.
2. Adds a "verbose" option to the YAML file to control logging for the
package that we use to interact with the Kafka cluster (sarama); see the
sketch below.

Additionally it:
3. Prefixes all test-related variables with "test" so as to make tests
more legible and remove ambiguity.
4. Updates the test config object to match the keys of the actual config
object.
5. Adds a helper test envelope constructor function, and moves some
test-related functions around in an effort to consolidate files.

Change-Id: Id749d0b88f62a4212854e18b8c469d90fe2f6877
Signed-off-by: Kostas Christidis <kostas@christidis.io>
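
Before the file-by-file diff, here is a minimal sketch of how the new Kafka.Verbose key could be wired up to sarama's package-level logger. The function name, log prefix, and flag set are illustrative assumptions; only the Verbose key, the localconfig package (imported as config), and sarama itself appear in this changeset.

```go
package kafka

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
	"github.com/hyperledger/fabric/orderer/localconfig" // declares package "config"
)

// enableSaramaLogging is a hypothetical helper: when Kafka.Verbose is set in
// the orderer YAML (or via its ENV override), route sarama's logger to
// stdout; otherwise sarama keeps its default silent logger.
func enableSaramaLogging(conf *config.TopLevel) {
	if conf.Kafka.Verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Ldate|log.Ltime)
	}
}
```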
kchristidis committed Dec 13, 2016
1 parent addfd4d commit 95094cd
Showing 19 changed files with 114 additions and 132 deletions.
2 changes: 1 addition & 1 deletion bddtests/docker-compose-orderer-kafka.yml
@@ -12,4 +12,4 @@ orderer0:
 - ORDERER_KAFKA_BROKERS=[kafka0:9092]
 links:
 - kafka0
-command: orderer -loglevel debug -verbose true
+command: orderer
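
With the -loglevel and -verbose flags gone, the equivalent behavior would presumably be toggled through configuration overrides instead of the command line. The fragment below is a hypothetical sketch: the ORDERER_KAFKA_VERBOSE variable name is an assumption derived from the ORDERER_* override convention already used in these compose files, and is not added by this commit.

```yaml
orderer0:
  environment:
    - ORDERER_GENERAL_ORDERERTYPE=kafka
    - ORDERER_KAFKA_BROKERS=[kafka0:9092]
    # Hypothetical override for the new Kafka.Verbose key:
    - ORDERER_KAFKA_VERBOSE=true
  command: orderer
```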
2 changes: 1 addition & 1 deletion bddtests/environments/orderer-1-kafka-1/docker-compose.yml
@@ -16,7 +16,7 @@ services:
 - ORDERER_GENERAL_ORDERERTYPE=kafka
 - ORDERER_KAFKA_BROKERS=[kafka0:9092]
 working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
-command: orderer -loglevel debug -verbose true
+command: orderer
 depends_on:
 - kafka0

2 changes: 1 addition & 1 deletion bddtests/environments/orderer-1-kafka-3/docker-compose.yml
@@ -16,7 +16,7 @@ services:
 - ORDERER_GENERAL_ORDERERTYPE=kafka
 - ORDERER_KAFKA_BROKERS=[kafka0:9092,kafka1:9092,kafka2:9092]
 working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
-command: orderer -loglevel debug -verbose true
+command: orderer
 depends_on:
 - kafka0
 - kafka1
1 change: 0 additions & 1 deletion bddtests/environments/orderer-n-kafka-n/docker-compose.yml
@@ -11,7 +11,6 @@ services:
 depends_on:
 - zookeeper
 - kafka
-command: -loglevel debug -verbose true

 kafka:
 build: ../kafka
20 changes: 10 additions & 10 deletions orderer/kafka/broadcast_test.go
@@ -29,7 +29,7 @@ import (
func TestBroadcastResponse(t *testing.T) {
disk := make(chan []byte)

-mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
@@ -60,7 +60,7 @@ func TestBroadcastResponse(t *testing.T) {
func TestBroadcastBatch(t *testing.T) {
disk := make(chan []byte)

-mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
@@ -113,7 +113,7 @@ func TestBroadcastBatch(t *testing.T) {
disk := make(chan []byte)
-mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
defer testClose(t, mb)
mbs := newMockBroadcastStream(t)
@@ -139,7 +139,7 @@ loop:
select {
case <-mbs.outgoing:
t.Fatal("Client shouldn't have received anything from the orderer")
-case <-time.After(testConf.General.BatchTimeout + timePadding):
+case <-time.After(testConf.General.BatchTimeout + testTimePadding):
break loop // This is the success path
}
}
@@ -154,7 +154,7 @@ func TestBroadcastIncompleteBatch(t *testing.T) {

disk := make(chan []byte)

-mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
@@ -189,7 +189,7 @@ func TestBroadcastIncompleteBatch(t *testing.T) {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data))
}
return
-case <-time.After(testConf.General.BatchTimeout + timePadding):
+case <-time.After(testConf.General.BatchTimeout + testTimePadding):
t.Fatal("Should have received a block by now")
}
}
@@ -206,7 +206,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {

disk := make(chan []byte)

-mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
@@ -247,7 +247,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data))
}
return
-case <-time.After(testConf.General.BatchTimeout + timePadding):
+case <-time.After(testConf.General.BatchTimeout + testTimePadding):
t.Fatal("Should have received a block by now")
}
}
@@ -256,7 +256,7 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
func TestBroadcastBatchAndQuitEarly(t *testing.T) {
disk := make(chan []byte)

-mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
+mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
@@ -301,7 +301,7 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) {
func TestBroadcastClose(t *testing.T) {
errChan := make(chan error)

-mb := mockNewBroadcaster(t, testConf, oldestOffset, make(chan []byte))
+mb := mockNewBroadcaster(t, testConf, testOldestOffset, make(chan []byte))
mbs := newMockBroadcastStream(t)
go func() {
if err := mb.Broadcast(mbs); err != nil {
10 changes: 5 additions & 5 deletions orderer/kafka/broker_mock_test.go
@@ -31,18 +31,18 @@ type mockBrockerImpl struct {
}

func mockNewBroker(t *testing.T, conf *config.TopLevel) Broker {
-mockBroker := sarama.NewMockBroker(t, brokerID)
+mockBroker := sarama.NewMockBroker(t, testBrokerID)
handlerMap := make(map[string]sarama.MockResponse)
// The sarama mock package doesn't allow us to return an error
// for invalid offset requests, so we return an offset of -1.
// Note that the mock offset responses below imply a broker with
-// newestOffset-1 blocks available. Therefore, if you are using this
+// testNewestOffset-1 blocks available. Therefore, if you are using this
// broker as part of a bigger test where you intend to consume blocks,
// make sure that the mockConsumer has been initialized accordingly
-// (Set the 'seek' parameter to newestOffset-1.)
+// (Set the 'seek' parameter to testNewestOffset-1.)
handlerMap["OffsetRequest"] = sarama.NewMockOffsetResponse(t).
-SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetOldest, oldestOffset).
-SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetNewest, newestOffset)
+SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetOldest, testOldestOffset).
+SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetNewest, testNewestOffset)
mockBroker.SetHandlerByMap(handlerMap)

broker := sarama.NewBroker(mockBroker.Addr())
4 changes: 2 additions & 2 deletions orderer/kafka/broker_test.go
@@ -23,8 +23,8 @@ import (
)

func TestBrokerGetOffset(t *testing.T) {
-t.Run("oldest", testBrokerGetOffsetFunc(sarama.OffsetOldest, oldestOffset))
-t.Run("newest", testBrokerGetOffsetFunc(sarama.OffsetNewest, newestOffset))
+t.Run("oldest", testBrokerGetOffsetFunc(sarama.OffsetOldest, testOldestOffset))
+t.Run("newest", testBrokerGetOffsetFunc(sarama.OffsetNewest, testNewestOffset))
}

func testBrokerGetOffsetFunc(given, expected int64) func(t *testing.T) {
18 changes: 9 additions & 9 deletions orderer/kafka/client_deliver_test.go
@@ -24,10 +24,10 @@ import (
)

func TestClientDeliverSeekWrong(t *testing.T) {
-t.Run("out-of-range-1", testClientDeliverSeekWrongFunc(uint64(oldestOffset)-1, 10))
-t.Run("out-of-range-2", testClientDeliverSeekWrongFunc(uint64(newestOffset), 10))
-t.Run("bad-window-1", testClientDeliverSeekWrongFunc(uint64(oldestOffset), 0))
-t.Run("bad-window-2", testClientDeliverSeekWrongFunc(uint64(oldestOffset), uint64(testConf.General.MaxWindowSize+1)))
+t.Run("out-of-range-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset)-1, 10))
+t.Run("out-of-range-2", testClientDeliverSeekWrongFunc(uint64(testNewestOffset), 10))
+t.Run("bad-window-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), 0))
+t.Run("bad-window-2", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), uint64(testConf.General.MaxWindowSize+1)))
}

func testClientDeliverSeekWrongFunc(seek, window uint64) func(t *testing.T) {
@@ -65,7 +65,7 @@ func testClientDeliverSeekWrongFunc(seek, window uint64) func(t *testing.T) {

func TestClientDeliverSeek(t *testing.T) {
t.Run("oldest", testClientDeliverSeekFunc("oldest", 0, 10, 10))
-t.Run("in-between", testClientDeliverSeekFunc("specific", uint64(middleOffset), 10, 10))
+t.Run("in-between", testClientDeliverSeekFunc("specific", uint64(testMiddleOffset), 10, 10))
t.Run("newest", testClientDeliverSeekFunc("newest", 0, 10, 1))
}

@@ -104,8 +104,8 @@ func testClientDeliverSeekFunc(label string, seek, window uint64, expected int)
}

func TestClientDeliverAckWrong(t *testing.T) {
-t.Run("out-of-range-ack-1", testClientDeliverAckWrongFunc(uint64(middleOffset)-2))
-t.Run("out-of-range-ack-2", testClientDeliverAckWrongFunc(uint64(newestOffset)))
+t.Run("out-of-range-ack-1", testClientDeliverAckWrongFunc(uint64(testMiddleOffset)-2))
+t.Run("out-of-range-ack-2", testClientDeliverAckWrongFunc(uint64(testNewestOffset)))
}

func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) {
@@ -123,7 +123,7 @@ func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) {
}
}()

-mds.incoming <- testNewSeekMessage("specific", uint64(middleOffset), 10)
+mds.incoming <- testNewSeekMessage("specific", uint64(testMiddleOffset), 10)
mds.incoming <- testNewAckMessage(ack)
for {
select {
@@ -141,7 +141,7 @@ func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) {
}

func TestClientDeliverAck(t *testing.T) {
-t.Run("in-between", testClientDeliverAckFunc("specific", uint64(middleOffset), 10, 10, 2*10))
+t.Run("in-between", testClientDeliverAckFunc("specific", uint64(testMiddleOffset), 10, 10, 2*10))
t.Run("newest", testClientDeliverAckFunc("newest", 0, 10, 1, 1))
}

61 changes: 0 additions & 61 deletions orderer/kafka/common_test.go

This file was deleted.

68 changes: 59 additions & 9 deletions orderer/kafka/config_test.go
@@ -17,38 +17,88 @@ limitations under the License.
package kafka

import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/orderer/localconfig"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
)

var (
-brokerID = int32(0)
-oldestOffset = int64(100) // The oldest block available on the broker
-newestOffset = int64(1100) // The offset that will be assigned to the next block
-middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle
+testBrokerID = int32(0)
+testOldestOffset = int64(100) // The oldest block available on the broker
+testNewestOffset = int64(1100) // The offset that will be assigned to the next block
+testMiddleOffset = (testOldestOffset + testNewestOffset - 1) / 2 // Just an offset in the middle

// Amount of time to wait for block processing when doing time-based tests
// We generally want this value to be as small as possible so as to make tests execute faster
// But this may have to be bumped up in slower machines
-timePadding = 200 * time.Millisecond
+testTimePadding = 200 * time.Millisecond
)

var testConf = &config.TopLevel{
General: config.General{
OrdererType: "kafka",
LedgerType: "ram",
BatchTimeout: 500 * time.Millisecond,
BatchSize: 100,
QueueSize: 100,
MaxWindowSize: 100,
ListenAddress: "127.0.0.1",
ListenPort: 7050,
GenesisMethod: "static",
},
Kafka: config.Kafka{
-Brokers: []string{"127.0.0.1:9092"},
-Topic: "test",
-PartitionID: 0,
-Version: sarama.V0_9_0_1,
+Brokers: []string{"127.0.0.1:9092"},
+Retry: config.Retry{
+Period: 3 * time.Second,
+Stop: 60 * time.Second,
+},
+Verbose: false,
+Version: sarama.V0_9_0_1,
},
}

func testClose(t *testing.T, x Closeable) {
if err := x.Close(); err != nil {
t.Fatal("Cannot close mock resource:", err)
}
}

func newTestEnvelope(content string) *cb.Envelope {
return &cb.Envelope{Payload: []byte(content)}
}

func testNewSeekMessage(startLabel string, seekNo, windowNo uint64) *ab.DeliverUpdate {
var startVal ab.SeekInfo_StartType
switch startLabel {
case "oldest":
startVal = ab.SeekInfo_OLDEST
case "newest":
startVal = ab.SeekInfo_NEWEST
default:
startVal = ab.SeekInfo_SPECIFIED

}
return &ab.DeliverUpdate{
Type: &ab.DeliverUpdate_Seek{
Seek: &ab.SeekInfo{
Start: startVal,
SpecifiedNumber: seekNo,
WindowSize: windowNo,
},
},
}
}

func testNewAckMessage(ackNo uint64) *ab.DeliverUpdate {
return &ab.DeliverUpdate{
Type: &ab.DeliverUpdate_Acknowledgement{
Acknowledgement: &ab.Acknowledgement{
Number: ackNo,
},
},
}
}
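
To show how the consolidated test helpers above fit together, here is a hedged sketch of a test that exercises them. The test itself is not part of this commit: it assumes it sits next to config_test.go in package kafka, and that the standard protobuf-generated GetSeek and GetAcknowledgement accessors exist on ab.DeliverUpdate.

```go
package kafka

import "testing"

// TestHelpersSketch is a hypothetical example that combines the helpers
// defined in config_test.go; it is for illustration only.
func TestHelpersSketch(t *testing.T) {
	env := newTestEnvelope("sample payload")
	if len(env.Payload) == 0 {
		t.Fatal("Expected the test envelope to carry a payload")
	}

	seek := testNewSeekMessage("specific", uint64(testMiddleOffset), 10)
	if seek.GetSeek().WindowSize != 10 {
		t.Fatalf("Expected window size 10, got %d", seek.GetSeek().WindowSize)
	}

	ack := testNewAckMessage(uint64(testNewestOffset) - 1)
	if ack.GetAcknowledgement().Number != uint64(testNewestOffset)-1 {
		t.Fatal("Ack number was not set as expected")
	}
}
```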
4 changes: 2 additions & 2 deletions orderer/kafka/consumer_mock_test.go
@@ -64,7 +64,7 @@ func mockNewConsumer(t *testing.T, conf *config.TopLevel, seek int64) (Consumer,
t: t,
}
// Stop-gap hack until #745 is resolved:
-if seek >= oldestOffset && seek <= (newestOffset-1) {
+if seek >= testOldestOffset && seek <= (testNewestOffset-1) {
mc.testFillWithBlocks(seek - 1) // Prepare the consumer so that the next Recv gives you block "seek"
} else {
err = fmt.Errorf("Out of range seek number given to consumer")
@@ -73,7 +73,7 @@ }
}

func (mc *mockConsumerImpl) Recv() <-chan *sarama.ConsumerMessage {
-if mc.consumedOffset >= newestOffset-1 {
+if mc.consumedOffset >= testNewestOffset-1 {
return nil
}
mc.consumedOffset++