From 675d094da8886d92099574ca6353d1f3aca9ba0c Mon Sep 17 00:00:00 2001 From: Kostas Christidis Date: Sun, 11 Dec 2016 03:04:11 -0500 Subject: [PATCH] [FAB-1367] Rebase Kafka on common components https://jira.hyperledger.org/browse/FAB-1367 This is the changeset that ties together all the previous changesets in this series: it rebases the Kafka consenter on the components that are meant to be common across all consensus implementations. The major addition here is the new logic which resides in "main.go" in the kafka package. On top of the existing unit tests for the Kafka consenter's sub-components (producer, consumer, broker), this changeset introduces some basic unit tests for the new logic, identical to the ones for the solo package. More tests are needed however, and we are in the process of writing BDD feature files for that purpose - see https://jira.hyperledger.org/browse/FAB-1335 for more. This changeset also adds the following: 1. A "disk" (a channel for passing messages) to the mock Consumer. The mock Consumer will use that "disk" to fetch the replies that it returns during the Recv() call. This is required for the units tests we introduced, where we want the (mock) consumer to respond with the messages that the (mock) producer posted. The two mock constructs use this disk channel to communicate, simulating a consumer that reads the messages that a producer posted to a partition. 2. A timeout to the mock consumer's Recv() method to prevent blocking in for/select loops. Finally, this changeset makes the following refactoring-related changes: 1. It establishes a common path for both solo and the kafka consenters in the orderer package's "main.go" - the previous version of this file was whipped up together quickly and was essentially a giant if-else construct. As we get closer to a release, some refactoring was in order. 2. It makes all dependencies in the kafka package explicit. In the previous version, we would pass around the entire config object to functions and let them pick what they needed from it. While this made for symmetric and simple-looking function signatures, it allowed for ambiguous and implicit dependencies, and would increase confusion and maintenance burden. This specific edit affects almost all files in the kafka package and makes for a bigger diff than necessary. I had unfortunately began working on this before beginning the 'rebasing on common components' work, and proceeded with both tasks in parallel. (In hindsight, I should have scheduled this a bit better.) Change-Id: I46392b8079ec02e273b16f51a7e8ed514c64748f Signed-off-by: Kostas Christidis --- orderer/kafka/broadcast.go | 159 ----------- orderer/kafka/broadcast_mock_test.go | 64 ----- orderer/kafka/broadcast_test.go | 328 ---------------------- orderer/kafka/broker.go | 88 +++--- orderer/kafka/broker_mock_test.go | 17 +- orderer/kafka/broker_test.go | 67 ++--- orderer/kafka/client_deliver.go | 236 ---------------- orderer/kafka/client_deliver_mock_test.go | 51 ---- orderer/kafka/client_deliver_test.go | 183 ------------ orderer/kafka/config_test.go | 33 --- orderer/kafka/consumer.go | 26 +- orderer/kafka/consumer_mock_test.go | 113 ++++---- orderer/kafka/consumer_test.go | 26 +- orderer/kafka/deliver.go | 64 ----- orderer/kafka/deliver_mock_test.go | 50 ---- orderer/kafka/deliver_test.go | 94 ------- orderer/kafka/main.go | 246 ++++++++++++++++ orderer/kafka/main_test.go | 215 ++++++++++++++ orderer/kafka/orderer.go | 63 ----- orderer/kafka/orderer_mock_test.go | 100 ------- orderer/kafka/producer.go | 35 ++- orderer/kafka/producer_mock_test.go | 71 +++-- orderer/kafka/producer_test.go | 21 +- orderer/kafka/util.go | 30 +- orderer/kafka/util_test.go | 63 +++-- orderer/localconfig/config.go | 20 +- orderer/main.go | 63 +---- orderer/orderer.yaml | 15 +- 28 files changed, 791 insertions(+), 1750 deletions(-) delete mode 100644 orderer/kafka/broadcast.go delete mode 100644 orderer/kafka/broadcast_mock_test.go delete mode 100644 orderer/kafka/broadcast_test.go delete mode 100644 orderer/kafka/client_deliver.go delete mode 100644 orderer/kafka/client_deliver_mock_test.go delete mode 100644 orderer/kafka/client_deliver_test.go delete mode 100644 orderer/kafka/deliver.go delete mode 100644 orderer/kafka/deliver_mock_test.go delete mode 100644 orderer/kafka/deliver_test.go create mode 100644 orderer/kafka/main.go create mode 100644 orderer/kafka/main_test.go delete mode 100644 orderer/kafka/orderer.go delete mode 100644 orderer/kafka/orderer_mock_test.go diff --git a/orderer/kafka/broadcast.go b/orderer/kafka/broadcast.go deleted file mode 100644 index 39354ffab65..00000000000 --- a/orderer/kafka/broadcast.go +++ /dev/null @@ -1,159 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "fmt" - "sync" - "time" - - "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" - - "github.com/golang/protobuf/proto" -) - -// Broadcaster allows the caller to submit messages to the orderer -type Broadcaster interface { - Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error - Closeable -} - -type broadcasterImpl struct { - producer Producer - config *config.TopLevel - once sync.Once - - batchChan chan *cb.Envelope - messages [][]byte - nextNumber uint64 - prevHash []byte -} - -func newBroadcaster(conf *config.TopLevel) Broadcaster { - b := &broadcasterImpl{ - producer: newProducer(conf), - config: conf, - batchChan: make(chan *cb.Envelope, conf.General.BatchSize), - messages: provisional.New(conf).GenesisBlock().GetData().Data, - nextNumber: 0, - } - - b.once.Do(func() { - // Send the genesis block to create the topic - // otherwise consumers will throw an exception. - b.sendBlock() - // Spawn the goroutine that cuts blocks - go b.cutBlock(b.config.General.BatchTimeout, b.config.General.BatchSize) - }) - - return b -} - -// Broadcast receives ordering requests by clients and sends back an -// acknowledgement for each received message in order, indicating -// success or type of failure -func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error { - return b.recvRequests(stream) -} - -// Close shuts down the broadcast side of the orderer -func (b *broadcasterImpl) Close() error { - if b.producer != nil { - return b.producer.Close() - } - return nil -} - -func (b *broadcasterImpl) sendBlock() error { - data := &cb.BlockData{ - Data: b.messages, - } - block := &cb.Block{ - Header: &cb.BlockHeader{ - Number: b.nextNumber, - PreviousHash: b.prevHash, - DataHash: data.Hash(), - }, - Data: data, - } - logger.Debugf("Prepared block %d with %d messages (%+v)", block.Header.Number, len(block.Data.Data), block) - - b.messages = [][]byte{} - b.nextNumber++ - b.prevHash = block.Header.Hash() - - blockBytes, err := proto.Marshal(block) - if err != nil { - logger.Fatalf("Error marshaling block: %s", err) - } - - return b.producer.Send(blockBytes) -} - -func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint32) { - timer := time.NewTimer(period) - - for { - select { - case msg := <-b.batchChan: - data, err := proto.Marshal(msg) - if err != nil { - panic(fmt.Errorf("Error marshaling what should be a valid proto message: %s", err)) - } - b.messages = append(b.messages, data) - if len(b.messages) >= int(maxSize) { - if !timer.Stop() { - <-timer.C - } - timer.Reset(period) - if err := b.sendBlock(); err != nil { - panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err)) - } - } - case <-timer.C: - timer.Reset(period) - if len(b.messages) > 0 { - if err := b.sendBlock(); err != nil { - panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err)) - } - } - } - } -} - -func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer) error { - reply := new(ab.BroadcastResponse) - for { - msg, err := stream.Recv() - if err != nil { - logger.Debug("Can no longer receive requests from client (exited?)") - return err - } - - b.batchChan <- msg - reply.Status = cb.Status_SUCCESS // TODO This shouldn't always be a success - - if err := stream.Send(reply); err != nil { - logger.Info("Cannot send broadcast reply to client") - } - logger.Debugf("Sent broadcast reply %s to client", reply.Status.String()) - - } -} diff --git a/orderer/kafka/broadcast_mock_test.go b/orderer/kafka/broadcast_mock_test.go deleted file mode 100644 index 891f9916425..00000000000 --- a/orderer/kafka/broadcast_mock_test.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "fmt" - "testing" - - "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - - "github.com/golang/protobuf/proto" -) - -func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk chan []byte) Broadcaster { - genesisBlock := provisional.New(conf).GenesisBlock() - wait := make(chan struct{}) - - mb := &broadcasterImpl{ - producer: mockNewProducer(t, conf, seek, disk), - config: conf, - batchChan: make(chan *cb.Envelope, conf.General.BatchSize), - messages: genesisBlock.GetData().Data, - nextNumber: uint64(seek), - } - - go func() { - rxBlockBytes := <-disk - rxBlock := &cb.Block{} - if err := proto.Unmarshal(rxBlockBytes, rxBlock); err != nil { - panic(err) - } - if !proto.Equal(rxBlock.GetData(), genesisBlock.GetData()) { - panic(fmt.Errorf("Broadcaster not functioning as expected")) - } - close(wait) - }() - - mb.once.Do(func() { - // Send the genesis block to create the topic - // otherwise consumers will throw an exception. - mb.sendBlock() - // Spawn the goroutine that cuts blocks - go mb.cutBlock(mb.config.General.BatchTimeout, mb.config.General.BatchSize) - }) - <-wait - - return mb -} diff --git a/orderer/kafka/broadcast_test.go b/orderer/kafka/broadcast_test.go deleted file mode 100644 index 569249c15c5..00000000000 --- a/orderer/kafka/broadcast_test.go +++ /dev/null @@ -1,328 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "strconv" - "sync" - "testing" - "time" - - "github.com/golang/protobuf/proto" - cb "github.com/hyperledger/fabric/protos/common" -) - -func TestBroadcastResponse(t *testing.T) { - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Send a message to the orderer - go func() { - mbs.incoming <- &cb.Envelope{Payload: []byte("single message")} - }() - - for { - select { - case reply := <-mbs.outgoing: - if reply.Status != cb.Status_SUCCESS { - t.Fatal("Client should have received a SUCCESS reply") - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received a broadcast reply by the orderer by now") - } - } -} - -func TestBroadcastBatch(t *testing.T) { - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Pump a batch's worth of messages into the system - go func() { - for i := 0; i < int(testConf.General.BatchSize); i++ { - mbs.incoming <- &cb.Envelope{Payload: []byte("message " + strconv.Itoa(i))} - } - }() - - // Ignore the broadcast replies as they have been tested elsewhere - for i := 0; i < int(testConf.General.BatchSize); i++ { - <-mbs.outgoing - } - - for { - select { - case in := <-disk: - block := new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if len(block.Data.Data) != int(testConf.General.BatchSize) { - t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Data.Data)) - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received a block by now") - } - } -} - -// If the capacity of the response queue is less than the batch size, -// then if the response queue overflows, the order should not be able -// to send back a block to the client. (Sending replies and adding -// messages to the about-to-be-sent block happens on the same routine.) -/* func TestBroadcastResponseQueueOverflow(t *testing.T) { - - // Make sure that the response queue is less than the batch size - originalQueueSize := testConf.General.QueueSize - defer func() { testConf.General.QueueSize = originalQueueSize }() - testConf.General.QueueSize = testConf.General.BatchSize - 1 - - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Force the response queue to overflow by blocking the broadcast stream's Send() method - mbs.closed = true - defer func() { mbs.closed = false }() - - // Pump a batch's worth of messages into the system - go func() { - for i := 0; i < int(testConf.General.BatchSize); i++ { - mbs.incoming <- &cb.Envelope{Payload: []byte("message " + strconv.Itoa(i))} - } - }() - -loop: - for { - select { - case <-mbs.outgoing: - t.Fatal("Client shouldn't have received anything from the orderer") - case <-time.After(testConf.General.BatchTimeout + testTimePadding): - break loop // This is the success path - } - } -} */ - -func TestBroadcastIncompleteBatch(t *testing.T) { - if testConf.General.BatchSize <= 1 { - t.Skip("Skipping test as it requires a batchsize > 1") - } - - messageCount := int(testConf.General.BatchSize) - 1 - - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Pump less than batchSize messages into the system - go func() { - for i := 0; i < messageCount; i++ { - payload, _ := proto.Marshal(&cb.Payload{Data: []byte("message " + strconv.Itoa(i))}) - mbs.incoming <- &cb.Envelope{Payload: payload} - } - }() - - // Ignore the broadcast replies as they have been tested elsewhere - for i := 0; i < messageCount; i++ { - <-mbs.outgoing - } - - for { - select { - case in := <-disk: - block := new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if len(block.Data.Data) != messageCount { - t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data)) - } - return - case <-time.After(testConf.General.BatchTimeout + testTimePadding): - t.Fatal("Should have received a block by now") - } - } -} - -func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) { - if testConf.General.BatchSize <= 1 { - t.Skip("Skipping test as it requires a batchsize > 1") - } - - var once sync.Once - - messageCount := int(testConf.General.BatchSize) - 1 - - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - for i := 0; i < 2; i++ { - // Pump less than batchSize messages into the system - go func() { - for i := 0; i < messageCount; i++ { - payload, _ := proto.Marshal(&cb.Payload{Data: []byte("message " + strconv.Itoa(i))}) - mbs.incoming <- &cb.Envelope{Payload: payload} - } - }() - - // Ignore the broadcast replies as they have been tested elsewhere - for i := 0; i < messageCount; i++ { - <-mbs.outgoing - } - - once.Do(func() { - <-disk // First incomplete block, tested elsewhere - }) - } - - for { - select { - case in := <-disk: - block := new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if len(block.Data.Data) != messageCount { - t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data)) - } - return - case <-time.After(testConf.General.BatchTimeout + testTimePadding): - t.Fatal("Should have received a block by now") - } - } -} - -func TestBroadcastBatchAndQuitEarly(t *testing.T) { - disk := make(chan []byte) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, disk) - defer testClose(t, mb) - - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - // Pump a batch's worth of messages into the system - go func() { - for i := 0; i < int(testConf.General.BatchSize); i++ { - mbs.incoming <- &cb.Envelope{Payload: []byte("message " + strconv.Itoa(i))} - } - }() - - // In contrast to TestBroadcastBatch, do not receive any replies. - // This simulates the case where you quit early (though you would - // most likely still get replies in a real world scenario, as long - // as you don't receive all of them we're on the same page). - for !mbs.CloseOut() { - } - - for { - select { - case in := <-disk: - block := new(cb.Block) - err := proto.Unmarshal(in, block) - if err != nil { - t.Fatal("Expected a block on the broker's disk") - } - if len(block.Data.Data) != int(testConf.General.BatchSize) { - t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Data.Data)) - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received a block by now") - } - } -} - -func TestBroadcastClose(t *testing.T) { - errChan := make(chan error) - - mb := mockNewBroadcaster(t, testConf, testOldestOffset, make(chan []byte)) - mbs := newMockBroadcastStream(t) - go func() { - if err := mb.Broadcast(mbs); err != nil { - t.Fatal("Broadcast error:", err) - } - }() - - go func() { - errChan <- mb.Close() - }() - - for { - select { - case err := <-errChan: - if err != nil { - t.Fatal("Error when closing the broadcaster:", err) - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Broadcaster should have closed its producer by now") - } - } - -} diff --git a/orderer/kafka/broker.go b/orderer/kafka/broker.go index d0030cb4ca9..dcd37524e35 100644 --- a/orderer/kafka/broker.go +++ b/orderer/kafka/broker.go @@ -20,90 +20,88 @@ import ( "fmt" "github.com/Shopify/sarama" - "github.com/hyperledger/fabric/orderer/localconfig" ) -// Broker allows the caller to get info on the orderer's stream +// Broker allows the caller to get info on the cluster's partitions type Broker interface { - GetOffset(req *sarama.OffsetRequest) (int64, error) + GetOffset(cp ChainPartition, req *sarama.OffsetRequest) (int64, error) Closeable } type brokerImpl struct { broker *sarama.Broker - config *config.TopLevel } -func newBroker(conf *config.TopLevel) Broker { +// Connects to the broker that handles all produce and consume +// requests for the given chain (Partition Leader Replica) +func newBroker(brokers []string, cp ChainPartition) (Broker, error) { + var candidateBroker, connectedBroker, leaderBroker *sarama.Broker - // connect to one of the bootstrap servers - var bootstrapServer *sarama.Broker - for _, hostPort := range conf.Kafka.Brokers { - broker := sarama.NewBroker(hostPort) - if err := broker.Open(nil); err != nil { - logger.Warningf("Failed to connect to bootstrap server at %s: %v.", hostPort, err) + // Connect to one of the given brokers + for _, hostPort := range brokers { + candidateBroker = sarama.NewBroker(hostPort) + if err := candidateBroker.Open(nil); err != nil { + logger.Warningf("Failed to connect to broker %s: %s", hostPort, err) continue } - if connected, err := broker.Connected(); !connected { - logger.Warningf("Failed to connect to bootstrap server at %s: %v.", hostPort, err) + if connected, err := candidateBroker.Connected(); !connected { + logger.Warningf("Failed to connect to broker %s: %s", hostPort, err) continue } - bootstrapServer = broker + connectedBroker = candidateBroker break } - if bootstrapServer == nil { - panic(fmt.Errorf("Failed to connect to any of the bootstrap servers (%v) for metadata request.", conf.Kafka.Brokers)) + + if connectedBroker == nil { + return nil, fmt.Errorf("Failed to connect to any of the given brokers (%v) for metadata request", brokers) } - logger.Debugf("Connected to bootstrap server at %s.", bootstrapServer.Addr()) + logger.Debugf("Connected to broker %s", connectedBroker.Addr()) - // get metadata for topic - topic := conf.Kafka.Topic - metadata, err := bootstrapServer.GetMetadata(&sarama.MetadataRequest{Topics: []string{topic}}) + // Get metadata for the topic that corresponds to this chain + metadata, err := connectedBroker.GetMetadata(&sarama.MetadataRequest{Topics: []string{cp.Topic()}}) if err != nil { - panic(fmt.Errorf("GetMetadata failed for topic %s: %v", topic, err)) + return nil, fmt.Errorf("Failed to get metadata for topic %s: %s", cp, err) } - // get leader broker for given topic/partition - var broker *sarama.Broker - partitionID := conf.Kafka.PartitionID - if (partitionID >= 0) && (partitionID < int32(len(metadata.Topics[0].Partitions))) { - leader := metadata.Topics[0].Partitions[partitionID].Leader - logger.Debugf("Leading broker for topic %s/partition %d is broker ID %d", topic, partitionID, leader) - for _, b := range metadata.Brokers { - if b.ID() == leader { - broker = b + // Get the leader broker for this chain partition + if (cp.Partition() >= 0) && (cp.Partition() < int32(len(metadata.Topics[0].Partitions))) { + leaderBrokerID := metadata.Topics[0].Partitions[cp.Partition()].Leader + logger.Debugf("Leading broker for chain %s is broker ID %d", cp, leaderBrokerID) + for _, availableBroker := range metadata.Brokers { + if availableBroker.ID() == leaderBrokerID { + leaderBroker = availableBroker break } } } - if broker == nil { - panic(fmt.Errorf("Can't find leader for topic %s/partition %d", topic, partitionID)) + + if leaderBroker == nil { + return nil, fmt.Errorf("Can't find leader for chain %s", cp) } - // connect to broker - if err := broker.Open(nil); err != nil { - panic(fmt.Errorf("Failed to open Kafka broker: %v", err)) + // Connect to broker + if err := leaderBroker.Open(nil); err != nil { + return nil, fmt.Errorf("Failed to connect ho Kafka broker: %s", err) } - if connected, err := broker.Connected(); !connected { - panic(fmt.Errorf("Failed to open Kafka broker: %v", err)) + if connected, err := leaderBroker.Connected(); !connected { + return nil, fmt.Errorf("Failed to connect to Kafka broker: %s", err) } - return &brokerImpl{ - broker: broker, - config: conf, - } + return &brokerImpl{broker: leaderBroker}, nil } -// GetOffset retrieves the offset number that corresponds to the requested position in the log -func (b *brokerImpl) GetOffset(req *sarama.OffsetRequest) (int64, error) { +// GetOffset retrieves the offset number that corresponds +// to the requested position in the log. +func (b *brokerImpl) GetOffset(cp ChainPartition, req *sarama.OffsetRequest) (int64, error) { resp, err := b.broker.GetAvailableOffsets(req) if err != nil { return int64(-1), err } - return resp.GetBlock(b.config.Kafka.Topic, b.config.Kafka.PartitionID).Offsets[0], nil + return resp.GetBlock(cp.Topic(), cp.Partition()).Offsets[0], nil } -// Close terminates the broker +// Close terminates the broker. +// This is invoked by the session deliverer's getOffset method. func (b *brokerImpl) Close() error { return b.broker.Close() } diff --git a/orderer/kafka/broker_mock_test.go b/orderer/kafka/broker_mock_test.go index bea5ac76640..65c5c59ac88 100644 --- a/orderer/kafka/broker_mock_test.go +++ b/orderer/kafka/broker_mock_test.go @@ -17,10 +17,10 @@ limitations under the License. package kafka import ( + "fmt" "testing" "github.com/Shopify/sarama" - "github.com/hyperledger/fabric/orderer/localconfig" ) type mockBrockerImpl struct { @@ -30,34 +30,33 @@ type mockBrockerImpl struct { handlerMap map[string]sarama.MockResponse } -func mockNewBroker(t *testing.T, conf *config.TopLevel) Broker { +func mockNewBroker(t *testing.T, cp ChainPartition) (Broker, error) { mockBroker := sarama.NewMockBroker(t, testBrokerID) handlerMap := make(map[string]sarama.MockResponse) // The sarama mock package doesn't allow us to return an error // for invalid offset requests, so we return an offset of -1. // Note that the mock offset responses below imply a broker with - // testNewestOffset-1 blocks available. Therefore, if you are using this + // newestOffset-1 blocks available. Therefore, if you are using this // broker as part of a bigger test where you intend to consume blocks, // make sure that the mockConsumer has been initialized accordingly - // (Set the 'seek' parameter to testNewestOffset-1.) + // (Set the 'offset' parameter to newestOffset-1.) handlerMap["OffsetRequest"] = sarama.NewMockOffsetResponse(t). - SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetOldest, testOldestOffset). - SetOffset(conf.Kafka.Topic, conf.Kafka.PartitionID, sarama.OffsetNewest, testNewestOffset) + SetOffset(cp.Topic(), cp.Partition(), sarama.OffsetOldest, testOldestOffset). + SetOffset(cp.Topic(), cp.Partition(), sarama.OffsetNewest, testNewestOffset) mockBroker.SetHandlerByMap(handlerMap) broker := sarama.NewBroker(mockBroker.Addr()) if err := broker.Open(nil); err != nil { - t.Fatal("Cannot connect to mock broker:", err) + return nil, fmt.Errorf("Cannot connect to mock broker: %s", err) } return &mockBrockerImpl{ brokerImpl: brokerImpl{ broker: broker, - config: conf, }, mockBroker: mockBroker, handlerMap: handlerMap, - } + }, nil } func (mb *mockBrockerImpl) Close() error { diff --git a/orderer/kafka/broker_test.go b/orderer/kafka/broker_test.go index 8717ee8b65a..b40316181ec 100644 --- a/orderer/kafka/broker_test.go +++ b/orderer/kafka/broker_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" ) func TestBrokerGetOffset(t *testing.T) { @@ -28,71 +29,55 @@ func TestBrokerGetOffset(t *testing.T) { } func testBrokerGetOffsetFunc(given, expected int64) func(t *testing.T) { + cp := newChainPartition(provisional.TestChainID, rawPartition) return func(t *testing.T) { - mb := mockNewBroker(t, testConf) + mb, _ := mockNewBroker(t, cp) defer testClose(t, mb) - offset, _ := mb.GetOffset(newOffsetReq(mb.(*mockBrockerImpl).config, given)) - if offset != expected { - t.Fatalf("Expected offset %d, got %d instead", expected, offset) + ofs, _ := mb.GetOffset(cp, newOffsetReq(cp, given)) + if ofs != expected { + t.Fatalf("Expected offset %d, got %d instead", expected, ofs) } } } func TestNewBrokerReturnsPartitionLeader(t *testing.T) { - - // sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile) - // SetLogLevel("debug") - - broker1 := sarama.NewMockBroker(t, 1001) - broker2 := sarama.NewMockBroker(t, 1002) - broker3 := sarama.NewMockBroker(t, 1003) - - // shutdown broker1 - broker1.Close() - - // update list of bootstrap brokers in config - originalKafkaBrokers := testConf.Kafka.Brokers + cp := newChainPartition(provisional.TestChainID, rawPartition) + broker1 := sarama.NewMockBroker(t, 1) + broker2 := sarama.NewMockBroker(t, 2) + broker3 := sarama.NewMockBroker(t, 3) defer func() { - testConf.Kafka.Brokers = originalKafkaBrokers + broker2.Close() + broker3.Close() }() - // add broker1, and broker2 to list of bootstrap brokers - // broker1 is 'down' - // broker3 will be discovered via a metadata request - testConf.Kafka.Brokers = []string{broker1.Addr(), broker2.Addr()} - // handy references - topic := testConf.Kafka.Topic - partition := testConf.Kafka.PartitionID + // Use broker1 and broker2 as bootstrap brokers, but shutdown broker1 right away + broker1.Close() - // add expectation that broker2 will return a metadata response that - // identifies broker3 as the topic partition leader + // Add expectation that broker2 will return a metadata response + // that identifies broker3 as the topic partition leader broker2.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(broker1.Addr(), broker1.BrokerID()). SetBroker(broker2.Addr(), broker2.BrokerID()). SetBroker(broker3.Addr(), broker3.BrokerID()). - SetLeader(topic, partition, broker3.BrokerID()), + SetLeader(cp.Topic(), cp.Partition(), broker3.BrokerID()), }) - // add expectation that broker3 respond to an offset request + // Add expectation that broker3 responds to an offset request broker3.SetHandlerByMap(map[string]sarama.MockResponse{ "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset(topic, partition, sarama.OffsetOldest, 0). - SetOffset(topic, partition, sarama.OffsetNewest, 42), + SetOffset(cp.Topic(), cp.Partition(), sarama.OffsetOldest, testOldestOffset). + SetOffset(cp.Topic(), cp.Partition(), sarama.OffsetNewest, testNewestOffset), }) - // get leader for topic partition - broker := newBroker(testConf) + // Get leader for the test chain partition + leaderBroker, _ := newBroker([]string{broker1.Addr(), broker2.Addr()}, cp) - // only broker3 will respond successfully to an offset request + // Only broker3 will respond successfully to an offset request offsetRequest := new(sarama.OffsetRequest) - offsetRequest.AddBlock(topic, partition, -1, 1) - if _, err := broker.GetOffset(offsetRequest); err != nil { - t.Fatal(err) + offsetRequest.AddBlock(cp.Topic(), cp.Partition(), -1, 1) + if _, err := leaderBroker.GetOffset(cp, offsetRequest); err != nil { + t.Fatal("Expected leader broker to respond to request:", err) } - - broker2.Close() - broker3.Close() - } diff --git a/orderer/kafka/client_deliver.go b/orderer/kafka/client_deliver.go deleted file mode 100644 index 3a889c83912..00000000000 --- a/orderer/kafka/client_deliver.go +++ /dev/null @@ -1,236 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "errors" - "fmt" - - "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -type clientDelivererImpl struct { - brokerFunc func(*config.TopLevel) Broker - consumerFunc func(*config.TopLevel, int64) (Consumer, error) // This resets the consumer. - - consumer Consumer - config *config.TopLevel - deadChan chan struct{} - - errChan chan error - updChan chan *ab.DeliverUpdate - tokenChan chan struct{} - lastACK int64 - window int64 -} - -func newClientDeliverer(conf *config.TopLevel, deadChan chan struct{}) Deliverer { - brokerFunc := func(conf *config.TopLevel) Broker { - return newBroker(conf) - } - consumerFunc := func(conf *config.TopLevel, seek int64) (Consumer, error) { - return newConsumer(conf, seek) - } - - return &clientDelivererImpl{ - brokerFunc: brokerFunc, - consumerFunc: consumerFunc, - - config: conf, - deadChan: deadChan, - errChan: make(chan error), - updChan: make(chan *ab.DeliverUpdate), // TODO Size this properly - } -} - -// Deliver receives updates from a client and returns a stream of blocks to them -func (cd *clientDelivererImpl) Deliver(stream ab.AtomicBroadcast_DeliverServer) error { - go cd.recvUpdates(stream) - return cd.sendBlocks(stream) -} - -// Close shuts down the Deliver server assigned by the orderer to a client -func (cd *clientDelivererImpl) Close() error { - if cd.consumer != nil { - return cd.consumer.Close() - } - return nil -} - -func (cd *clientDelivererImpl) recvUpdates(stream ab.AtomicBroadcast_DeliverServer) { - for { - upd, err := stream.Recv() - if err != nil { - cd.errChan <- err - return - } - cd.updChan <- upd - } -} - -func (cd *clientDelivererImpl) sendBlocks(stream ab.AtomicBroadcast_DeliverServer) error { - var err error - var reply *ab.DeliverResponse - var upd *ab.DeliverUpdate - block := new(cb.Block) - for { - select { - case <-cd.deadChan: - logger.Debug("sendBlocks goroutine for client-deliverer received shutdown signal") - return nil - case err = <-cd.errChan: - return err - case upd = <-cd.updChan: - switch t := upd.GetType().(type) { - case *ab.DeliverUpdate_Seek: - err = cd.processSeek(t) - case *ab.DeliverUpdate_Acknowledgement: - err = cd.processACK(t) - } - if err != nil { - var errorStatus cb.Status - // TODO Will need to flesh this out into - // a proper error handling system eventually. - switch err.Error() { - case seekOutOfRangeError: - errorStatus = cb.Status_NOT_FOUND - case ackOutOfRangeError, windowOutOfRangeError: - errorStatus = cb.Status_BAD_REQUEST - default: - errorStatus = cb.Status_SERVICE_UNAVAILABLE - } - reply = new(ab.DeliverResponse) - reply.Type = &ab.DeliverResponse_Error{Error: errorStatus} - if err := stream.Send(reply); err != nil { - return fmt.Errorf("Failed to send error response to the client: %s", err) - } - return fmt.Errorf("Failed to process received update: %s", err) - } - case <-cd.tokenChan: - select { - case data := <-cd.consumer.Recv(): - err := proto.Unmarshal(data.Value, block) - if err != nil { - logger.Info("Failed to unmarshal retrieved block from ordering service:", err) - } - reply = new(ab.DeliverResponse) - reply.Type = &ab.DeliverResponse_Block{Block: block} - err = stream.Send(reply) - if err != nil { - return fmt.Errorf("Failed to send block to the client: %s", err) - } - logger.Debugf("Sent block %v to client (prevHash: %v, messages: %v)\n", - block.Header.Number, block.Header.PreviousHash, block.Data.Data) - default: - // Return the push token if there are no messages - // available from the ordering service. - cd.tokenChan <- struct{}{} - } - } - } -} - -func (cd *clientDelivererImpl) processSeek(msg *ab.DeliverUpdate_Seek) error { - var err error - var seek, window int64 - logger.Debug("Received SEEK message") - - window = int64(msg.Seek.WindowSize) - if window <= 0 || window > int64(cd.config.General.MaxWindowSize) { - return errors.New(windowOutOfRangeError) - } - cd.window = window - logger.Debug("Requested window size set to", cd.window) - - oldestAvailable, err := cd.getOffset(int64(-2)) - if err != nil { - return err - } - newestAvailable, err := cd.getOffset(int64(-1)) - if err != nil { - return err - } - newestAvailable-- // Cause in the case of newest, the library actually gives us the seqNo of the *next* new block - - switch msg.Seek.Start { - case ab.SeekInfo_OLDEST: - seek = oldestAvailable - case ab.SeekInfo_NEWEST: - seek = newestAvailable - case ab.SeekInfo_SPECIFIED: - seek = int64(msg.Seek.SpecifiedNumber) - if !(seek >= oldestAvailable && seek <= newestAvailable) { - return errors.New(seekOutOfRangeError) - } - } - - logger.Debug("Requested seek number set to", seek) - - cd.disablePush() - if err := cd.Close(); err != nil { - return err - } - cd.lastACK = seek - 1 - logger.Debug("Set last ACK for this client's consumer to", cd.lastACK) - - cd.consumer, err = cd.consumerFunc(cd.config, seek) - if err != nil { - return err - } - - cd.enablePush(cd.window) - return nil -} - -func (cd *clientDelivererImpl) getOffset(seek int64) (int64, error) { - broker := cd.brokerFunc(cd.config) - defer broker.Close() - return broker.GetOffset(newOffsetReq(cd.config, seek)) -} - -func (cd *clientDelivererImpl) disablePush() int64 { - // No need to add a lock to ensure these operations happen atomically. - // The caller is the only function that can modify the tokenChan. - remTokens := int64(len(cd.tokenChan)) - cd.tokenChan = nil - logger.Debugf("Pushing blocks to client paused; found %v unused push token(s)", remTokens) - return remTokens -} - -func (cd *clientDelivererImpl) enablePush(newTokenCount int64) { - cd.tokenChan = make(chan struct{}, newTokenCount) - for i := int64(0); i < newTokenCount; i++ { - cd.tokenChan <- struct{}{} - } - logger.Debugf("Pushing blocks to client resumed; %v push token(s) available", newTokenCount) -} - -func (cd *clientDelivererImpl) processACK(msg *ab.DeliverUpdate_Acknowledgement) error { - logger.Debug("Received ACK for block", msg.Acknowledgement.Number) - remTokens := cd.disablePush() - newACK := int64(msg.Acknowledgement.Number) // TODO Optionally mark this offset in Kafka - if (newACK < cd.lastACK) || (newACK > cd.lastACK+cd.window) { - return errors.New(ackOutOfRangeError) - } - newTokenCount := newACK - cd.lastACK + remTokens - cd.lastACK = newACK - cd.enablePush(newTokenCount) - return nil -} diff --git a/orderer/kafka/client_deliver_mock_test.go b/orderer/kafka/client_deliver_mock_test.go deleted file mode 100644 index fc49f777a9f..00000000000 --- a/orderer/kafka/client_deliver_mock_test.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - - "github.com/hyperledger/fabric/orderer/localconfig" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -type mockClientDelivererImpl struct { - clientDelivererImpl - t *testing.T -} - -func mockNewClientDeliverer(t *testing.T, conf *config.TopLevel, deadChan chan struct{}) Deliverer { - mockBrokerFunc := func(conf *config.TopLevel) Broker { - return mockNewBroker(t, conf) - } - mockConsumerFunc := func(conf *config.TopLevel, seek int64) (Consumer, error) { - return mockNewConsumer(t, conf, seek) - } - - return &mockClientDelivererImpl{ - clientDelivererImpl: clientDelivererImpl{ - brokerFunc: mockBrokerFunc, - consumerFunc: mockConsumerFunc, - - config: conf, - deadChan: deadChan, - errChan: make(chan error), - updChan: make(chan *ab.DeliverUpdate), - }, - t: t, - } -} diff --git a/orderer/kafka/client_deliver_test.go b/orderer/kafka/client_deliver_test.go deleted file mode 100644 index 31d5b017985..00000000000 --- a/orderer/kafka/client_deliver_test.go +++ /dev/null @@ -1,183 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - "time" - - ab "github.com/hyperledger/fabric/protos/orderer" -) - -func TestClientDeliverSeekWrong(t *testing.T) { - t.Run("out-of-range-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset)-1, 10)) - t.Run("out-of-range-2", testClientDeliverSeekWrongFunc(uint64(testNewestOffset), 10)) - t.Run("bad-window-1", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), 0)) - t.Run("bad-window-2", testClientDeliverSeekWrongFunc(uint64(testOldestOffset), uint64(testConf.General.MaxWindowSize+1))) -} - -func testClientDeliverSeekWrongFunc(seek, window uint64) func(t *testing.T) { - return func(t *testing.T) { - mds := newMockDeliverStream(t) - - dc := make(chan struct{}) - defer close(dc) // Kill the getBlocks goroutine - - mcd := mockNewClientDeliverer(t, testConf, dc) - defer testClose(t, mcd) - go func() { - if err := mcd.Deliver(mds); err == nil { - t.Fatal("Should have received an error response") - } - }() - - mds.incoming <- testNewSeekMessage("specific", seek, window) - - for { - select { - case msg := <-mds.outgoing: - switch msg.GetType().(type) { - case *ab.DeliverResponse_Error: - return // This is the success path for this test - default: - t.Fatal("Should have received an error response") - } - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have received an error response") - } - } - } -} - -func TestClientDeliverSeek(t *testing.T) { - t.Run("oldest", testClientDeliverSeekFunc("oldest", 0, 10, 10)) - t.Run("in-between", testClientDeliverSeekFunc("specific", uint64(testMiddleOffset), 10, 10)) - t.Run("newest", testClientDeliverSeekFunc("newest", 0, 10, 1)) -} - -func testClientDeliverSeekFunc(label string, seek, window uint64, expected int) func(*testing.T) { - return func(t *testing.T) { - mds := newMockDeliverStream(t) - - dc := make(chan struct{}) - defer close(dc) // Kill the getBlocks goroutine - - mcd := mockNewClientDeliverer(t, testConf, dc) - defer testClose(t, mcd) - go func() { - if err := mcd.Deliver(mds); err != nil { - t.Fatal("Deliver error:", err) - } - }() - - count := 0 - mds.incoming <- testNewSeekMessage(label, seek, window) - for { - select { - case <-mds.outgoing: - count++ - if count > expected { - t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected) - } - case <-time.After(500 * time.Millisecond): - if count != expected { - t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected) - } - return - } - } - } -} - -func TestClientDeliverAckWrong(t *testing.T) { - t.Run("out-of-range-ack-1", testClientDeliverAckWrongFunc(uint64(testMiddleOffset)-2)) - t.Run("out-of-range-ack-2", testClientDeliverAckWrongFunc(uint64(testNewestOffset))) -} - -func testClientDeliverAckWrongFunc(ack uint64) func(t *testing.T) { - return func(t *testing.T) { - mds := newMockDeliverStream(t) - - dc := make(chan struct{}) - defer close(dc) // Kill the getBlocks goroutine - - mcd := mockNewClientDeliverer(t, testConf, dc) - defer testClose(t, mcd) - go func() { - if err := mcd.Deliver(mds); err == nil { - t.Fatal("Should have received an error response") - } - }() - - mds.incoming <- testNewSeekMessage("specific", uint64(testMiddleOffset), 10) - mds.incoming <- testNewAckMessage(ack) - for { - select { - case msg := <-mds.outgoing: - switch msg.GetType().(type) { - case *ab.DeliverResponse_Error: - return // This is the success path for this test - default: - } - case <-time.After(500 * time.Millisecond): - t.Fatal("Should have returned earlier due to wrong ACK") - } - } - } -} - -func TestClientDeliverAck(t *testing.T) { - t.Run("in-between", testClientDeliverAckFunc("specific", uint64(testMiddleOffset), 10, 10, 2*10)) - t.Run("newest", testClientDeliverAckFunc("newest", 0, 10, 1, 1)) -} - -func testClientDeliverAckFunc(label string, seek, window uint64, threshold, expected int) func(t *testing.T) { - return func(t *testing.T) { - mds := newMockDeliverStream(t) - - dc := make(chan struct{}) - defer close(dc) // Kill the getBlocks goroutine - - mcd := mockNewClientDeliverer(t, testConf, dc) - defer testClose(t, mcd) - go func() { - if err := mcd.Deliver(mds); err != nil { - t.Fatal("Deliver error:", err) - } - }() - - mds.incoming <- testNewSeekMessage(label, seek, window) - count := 0 - for { - select { - case msg := <-mds.outgoing: - count++ - if count == threshold { - mds.incoming <- testNewAckMessage(msg.GetBlock().Header.Number) - } - if count > expected { - t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected) - } - case <-time.After(500 * time.Millisecond): - if count != expected { - t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected) - } - return - } - } - } -} diff --git a/orderer/kafka/config_test.go b/orderer/kafka/config_test.go index 3bd460583b8..7b366d3c052 100644 --- a/orderer/kafka/config_test.go +++ b/orderer/kafka/config_test.go @@ -23,7 +23,6 @@ import ( "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/localconfig" cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" ) var ( @@ -70,35 +69,3 @@ func testClose(t *testing.T, x Closeable) { func newTestEnvelope(content string) *cb.Envelope { return &cb.Envelope{Payload: []byte(content)} } - -func testNewSeekMessage(startLabel string, seekNo, windowNo uint64) *ab.DeliverUpdate { - var startVal ab.SeekInfo_StartType - switch startLabel { - case "oldest": - startVal = ab.SeekInfo_OLDEST - case "newest": - startVal = ab.SeekInfo_NEWEST - default: - startVal = ab.SeekInfo_SPECIFIED - - } - return &ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Seek{ - Seek: &ab.SeekInfo{ - Start: startVal, - SpecifiedNumber: seekNo, - WindowSize: windowNo, - }, - }, - } -} - -func testNewAckMessage(ackNo uint64) *ab.DeliverUpdate { - return &ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Acknowledgement{ - Acknowledgement: &ab.Acknowledgement{ - Number: ackNo, - }, - }, - } -} diff --git a/orderer/kafka/consumer.go b/orderer/kafka/consumer.go index f1f3681c3a7..29714e89019 100644 --- a/orderer/kafka/consumer.go +++ b/orderer/kafka/consumer.go @@ -16,12 +16,9 @@ limitations under the License. package kafka -import ( - "github.com/Shopify/sarama" - "github.com/hyperledger/fabric/orderer/localconfig" -) +import "github.com/Shopify/sarama" -// Consumer allows the caller to receive a stream of messages from the orderer +// Consumer allows the caller to receive a stream of blobs from the Kafka cluster for a specific partition. type Consumer interface { Recv() <-chan *sarama.ConsumerMessage Closeable @@ -32,26 +29,31 @@ type consumerImpl struct { partition sarama.PartitionConsumer } -func newConsumer(conf *config.TopLevel, seek int64) (Consumer, error) { - parent, err := sarama.NewConsumer(conf.Kafka.Brokers, newBrokerConfig(conf)) +func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) { + parent, err := sarama.NewConsumer(brokers, newBrokerConfig(kafkaVersion, rawPartition)) if err != nil { return nil, err } - partition, err := parent.ConsumePartition(conf.Kafka.Topic, conf.Kafka.PartitionID, seek) + partition, err := parent.ConsumePartition(cp.Topic(), cp.Partition(), offset) if err != nil { return nil, err } - c := &consumerImpl{parent: parent, partition: partition} - logger.Debug("Created new consumer for client beginning from block", seek) + c := &consumerImpl{ + parent: parent, + partition: partition, + } + logger.Debugf("Created new consumer for session (partition %s, beginning offset %d)", cp, offset) return c, nil } -// Recv returns a channel with messages received from the orderer +// Recv returns a channel with blobs received from the Kafka cluster for a partition. func (c *consumerImpl) Recv() <-chan *sarama.ConsumerMessage { return c.partition.Messages() } -// Close shuts down the partition consumer +// Close shuts down the partition consumer. +// Invoked by the session deliverer's Close method, which is itself called +// during the processSeek function, between disabling and enabling the push. func (c *consumerImpl) Close() error { if err := c.partition.Close(); err != nil { return err diff --git a/orderer/kafka/consumer_mock_test.go b/orderer/kafka/consumer_mock_test.go index 1e1207f2ecc..69855aa9c29 100644 --- a/orderer/kafka/consumer_mock_test.go +++ b/orderer/kafka/consumer_mock_test.go @@ -18,57 +18,70 @@ package kafka import ( "fmt" - "strconv" "testing" + "time" - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" "github.com/Shopify/sarama" "github.com/Shopify/sarama/mocks" - "github.com/golang/protobuf/proto" ) type mockConsumerImpl struct { consumedOffset int64 - parent *mocks.Consumer - partMgr *mocks.PartitionConsumer - partition sarama.PartitionConsumer - topic string - t *testing.T + chainPartition ChainPartition + + parentConsumer *mocks.Consumer + chainPartitionManager *mocks.PartitionConsumer + chainPartitionConsumer sarama.PartitionConsumer + disk chan *ab.KafkaMessage + isSetup chan struct{} + t *testing.T } -func mockNewConsumer(t *testing.T, conf *config.TopLevel, seek int64) (Consumer, error) { +func mockNewConsumer(t *testing.T, cp ChainPartition, offset int64, disk chan *ab.KafkaMessage) (Consumer, error) { var err error - parent := mocks.NewConsumer(t, nil) + parentConsumer := mocks.NewConsumer(t, nil) // NOTE The seek flag seems to be useless here. // The mock partition will have its highWatermarkOffset // initialized to 0 no matter what. I've opened up an issue // in the sarama repo: https://github.com/Shopify/sarama/issues/745 // Until this is resolved, use the testFillWithBlocks() hack below. - partMgr := parent.ExpectConsumePartition(conf.Kafka.Topic, conf.Kafka.PartitionID, seek) - partition, err := parent.ConsumePartition(conf.Kafka.Topic, conf.Kafka.PartitionID, seek) + cpManager := parentConsumer.ExpectConsumePartition(cp.Topic(), cp.Partition(), offset) + cpConsumer, err := parentConsumer.ConsumePartition(cp.Topic(), cp.Partition(), offset) // mockNewConsumer is basically a helper function when testing. // Any errors it generates internally, should result in panic // and not get propagated further; checking its errors in the // calling functions (i.e. the actual tests) increases boilerplate. if err != nil { - t.Fatal("Cannot create partition consumer:", err) + t.Fatal("Cannot create mock partition consumer:", err) } mc := &mockConsumerImpl{ consumedOffset: 0, - parent: parent, - partMgr: partMgr, - partition: partition, - topic: conf.Kafka.Topic, - t: t, + chainPartition: cp, + + parentConsumer: parentConsumer, + chainPartitionManager: cpManager, + chainPartitionConsumer: cpConsumer, + disk: disk, + isSetup: make(chan struct{}), + t: t, + } + // Stop-gap hack until sarama issue #745 is resolved: + if offset >= testOldestOffset && offset <= (testNewestOffset-1) { + mc.testFillWithBlocks(offset - 1) // Prepare the consumer so that the next Recv gives you blob #offset + } else { + err = fmt.Errorf("Out of range offset (seek number) given to consumer: %d", offset) + return mc, err } - // Stop-gap hack until #745 is resolved: - if seek >= testOldestOffset && seek <= (testNewestOffset-1) { - mc.testFillWithBlocks(seek - 1) // Prepare the consumer so that the next Recv gives you block "seek" + + if mc.consumedOffset == offset-1 { + close(mc.isSetup) } else { - err = fmt.Errorf("Out of range seek number given to consumer") + mc.t.Fatal("Mock consumer failed to initialize itself properly") } + return mc, err } @@ -76,42 +89,46 @@ func (mc *mockConsumerImpl) Recv() <-chan *sarama.ConsumerMessage { if mc.consumedOffset >= testNewestOffset-1 { return nil } - mc.consumedOffset++ - mc.partMgr.YieldMessage(testNewConsumerMessage(mc.consumedOffset, mc.topic)) - return mc.partition.Messages() + + // This is useful in cases where we want to <-Recv() in a for/select loop in + // a non-blocking manner. Without the timeout, the Go runtime will always + // execute the body of the Recv() method. If there in no outgoing message + // available, it will block while waiting on mc.disk. All the other cases in + // the original for/select loop then won't be evaluated until we unblock on + // <-mc.disk (which may never happen). + select { + case <-time.After(testTimePadding / 2): + case outgoingMsg := <-mc.disk: + mc.consumedOffset++ + mc.chainPartitionManager.YieldMessage(testNewConsumerMessage(mc.chainPartition, mc.consumedOffset, outgoingMsg)) + return mc.chainPartitionConsumer.Messages() + } + + return nil } func (mc *mockConsumerImpl) Close() error { - if err := mc.partition.Close(); err != nil { + if err := mc.chainPartitionManager.Close(); err != nil { return err } - return mc.parent.Close() + return mc.parentConsumer.Close() } -func (mc *mockConsumerImpl) testFillWithBlocks(seek int64) { - for i := int64(1); i <= seek; i++ { +func (mc *mockConsumerImpl) testFillWithBlocks(offset int64) { + for i := int64(1); i <= offset; i++ { + go func() { + mc.disk <- newRegularMessage(utils.MarshalOrPanic(newTestEnvelope(fmt.Sprintf("consumer fill-in %d", i)))) + }() <-mc.Recv() } + return } -func testNewConsumerMessage(offset int64, topic string) *sarama.ConsumerMessage { - blockData := &cb.BlockData{ - Data: [][]byte{[]byte(strconv.FormatInt(offset, 10))}, - } - block := &cb.Block{ - Header: &cb.BlockHeader{ - Number: uint64(offset), - }, - Data: blockData, - } - - data, err := proto.Marshal(block) - if err != nil { - panic("Error marshaling block") - } - +func testNewConsumerMessage(cp ChainPartition, offset int64, kafkaMessage *ab.KafkaMessage) *sarama.ConsumerMessage { return &sarama.ConsumerMessage{ - Value: sarama.ByteEncoder(data), - Topic: topic, + Value: sarama.ByteEncoder(utils.MarshalOrPanic(kafkaMessage)), + Topic: cp.Topic(), + Partition: cp.Partition(), + Offset: offset, } } diff --git a/orderer/kafka/consumer_test.go b/orderer/kafka/consumer_test.go index 737a5c3bb4d..9cab8731f9c 100644 --- a/orderer/kafka/consumer_test.go +++ b/orderer/kafka/consumer_test.go @@ -16,13 +16,18 @@ limitations under the License. package kafka -import "testing" +import ( + "testing" + + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" + ab "github.com/hyperledger/fabric/protos/orderer" +) func TestConsumerInitWrong(t *testing.T) { cases := []int64{testOldestOffset - 1, testNewestOffset} - for _, seek := range cases { - mc, err := mockNewConsumer(t, testConf, seek) + for _, offset := range cases { + mc, err := mockNewConsumer(t, newChainPartition(provisional.TestChainID, rawPartition), offset, make(chan *ab.KafkaMessage)) testClose(t, mc) if err == nil { t.Fatal("Consumer should have failed with out-of-range error") @@ -37,18 +42,23 @@ func TestConsumerRecv(t *testing.T) { } func testConsumerRecvFunc(given, expected int64) func(t *testing.T) { + disk := make(chan *ab.KafkaMessage) return func(t *testing.T) { - mc, err := mockNewConsumer(t, testConf, given) + cp := newChainPartition(provisional.TestChainID, rawPartition) + mc, err := mockNewConsumer(t, cp, given, disk) if err != nil { testClose(t, mc) - t.Fatalf("Consumer should have proceeded normally: %s", err) + t.Fatal("Consumer should have proceeded normally:", err) } + go func() { + disk <- newRegularMessage([]byte("foo")) + }() msg := <-mc.Recv() - if (msg.Topic != testConf.Kafka.Topic) || - msg.Partition != testConf.Kafka.PartitionID || + if (msg.Topic != cp.Topic()) || + msg.Partition != cp.Partition() || msg.Offset != mc.(*mockConsumerImpl).consumedOffset || msg.Offset != expected { - t.Fatalf("Expected block %d, got %d", expected, msg.Offset) + t.Fatalf("Expected message with offset %d, got %d", expected, msg.Offset) } testClose(t, mc) } diff --git a/orderer/kafka/deliver.go b/orderer/kafka/deliver.go deleted file mode 100644 index 1a8a07aefed..00000000000 --- a/orderer/kafka/deliver.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "sync" - - "github.com/hyperledger/fabric/orderer/localconfig" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -// Deliverer allows the caller to receive blocks from the orderer -type Deliverer interface { - Deliver(stream ab.AtomicBroadcast_DeliverServer) error - Closeable -} - -type delivererImpl struct { - config *config.TopLevel - deadChan chan struct{} - wg sync.WaitGroup -} - -func newDeliverer(conf *config.TopLevel) Deliverer { - return &delivererImpl{ - config: conf, - deadChan: make(chan struct{}), - } -} - -// Deliver receives updates from connected clients and adjusts -// the transmission of ordered messages to them accordingly -func (d *delivererImpl) Deliver(stream ab.AtomicBroadcast_DeliverServer) error { - cd := newClientDeliverer(d.config, d.deadChan) - - d.wg.Add(1) - defer d.wg.Done() - - defer cd.Close() - return cd.Deliver(stream) -} - -// Close shuts down the delivery side of the orderer -func (d *delivererImpl) Close() error { - close(d.deadChan) - // Wait till all the client-deliverer consumers have closed - // Note that their recvReplies goroutines keep on going - d.wg.Wait() - return nil -} diff --git a/orderer/kafka/deliver_mock_test.go b/orderer/kafka/deliver_mock_test.go deleted file mode 100644 index d1751be17a6..00000000000 --- a/orderer/kafka/deliver_mock_test.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - - "github.com/hyperledger/fabric/orderer/localconfig" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -type mockDelivererImpl struct { - delivererImpl - t *testing.T -} - -func mockNewDeliverer(t *testing.T, conf *config.TopLevel) Deliverer { - md := &mockDelivererImpl{ - delivererImpl: delivererImpl{ - config: conf, - deadChan: make(chan struct{}), - }, - t: t, - } - return md -} - -func (md *mockDelivererImpl) Deliver(stream ab.AtomicBroadcast_DeliverServer) error { - mcd := mockNewClientDeliverer(md.t, md.config, md.deadChan) - - md.wg.Add(1) - defer md.wg.Done() - - defer mcd.Close() - return mcd.Deliver(stream) -} diff --git a/orderer/kafka/deliver_test.go b/orderer/kafka/deliver_test.go deleted file mode 100644 index 34b84d437f9..00000000000 --- a/orderer/kafka/deliver_test.go +++ /dev/null @@ -1,94 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - "time" -) - -func TestDeliverMultipleClients(t *testing.T) { - connectedClients := 3 - seekMsgs := []struct { - start string - seek, window uint64 - }{ - {"oldest", 0, 10}, {"newest", 0, 10}, {"specific", uint64(testMiddleOffset), 10}, - } - expected := 21 // 10 + 1 + 10 - - md := mockNewDeliverer(t, testConf) - defer testClose(t, md) - - var mds []*mockDeliverStream - for i := 0; i < connectedClients; i++ { - mds = append(mds, newMockDeliverStream(t)) - go func() { - if err := md.Deliver(mds[i]); err != nil { - t.Fatal("Deliver error:", err) - } - }() - mds[i].incoming <- testNewSeekMessage(seekMsgs[i].start, seekMsgs[i].seek, seekMsgs[i].window) - } - - count := 0 - - for i := 0; i < connectedClients; i++ { - client: - for { - select { - case <-mds[i].outgoing: - count++ - case <-time.After(500 * time.Millisecond): - break client - } - } - } - - if count != expected { - t.Fatalf("Expected %d blocks total delivered to all clients, got %d", expected, count) - } -} - -func TestDeliverClose(t *testing.T) { - errChan := make(chan error) - - md := mockNewDeliverer(t, testConf) - mds := newMockDeliverStream(t) - go func() { - if err := md.Deliver(mds); err != nil { - t.Fatal("Deliver error:", err) - } - }() - - go func() { - errChan <- md.Close() - }() - - for { - select { - case err := <-errChan: - if err != nil { - t.Fatal("Error when closing the deliverer:", err) - } - return - case <-time.After(500 * time.Millisecond): - t.Fatal("Deliverer should have closed all client deliverers by now") - } - } - -} diff --git a/orderer/kafka/main.go b/orderer/kafka/main.go new file mode 100644 index 00000000000..0b4c2e05663 --- /dev/null +++ b/orderer/kafka/main.go @@ -0,0 +1,246 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/orderer/localconfig" + "github.com/hyperledger/fabric/orderer/multichain" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" +) + +// New creates a Kafka-backed consenter. Called by orderer's main.go. +func New(kv sarama.KafkaVersion, ro config.Retry) multichain.Consenter { + return newConsenter(kv, ro, bfValue, pfValue, cfValue) +} + +// New calls here because we need to pass additional arguments to +// the constructor and New() should only read from the config file. +func newConsenter(kv sarama.KafkaVersion, ro config.Retry, bf bfType, pf pfType, cf cfType) multichain.Consenter { + return &consenterImpl{kv, ro, bf, pf, cf} +} + +// bfType defines the signature of the broker constructor. +type bfType func([]string, ChainPartition) (Broker, error) + +// pfType defines the signature of the producer constructor. +type pfType func([]string, sarama.KafkaVersion, config.Retry) Producer + +// cfType defines the signature of the consumer constructor. +type cfType func([]string, sarama.KafkaVersion, ChainPartition, int64) (Consumer, error) + +// bfValue holds the value for the broker constructor that's used in the non-test case. +var bfValue = func(brokers []string, cp ChainPartition) (Broker, error) { + return newBroker(brokers, cp) +} + +// pfValue holds the value for the producer constructor that's used in the non-test case. +var pfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { + return newProducer(brokers, kafkaVersion, retryOptions) +} + +// cfValue holds the value for the consumer constructor that's used in the non-test case. +var cfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) { + return newConsumer(brokers, kafkaVersion, cp, offset) +} + +// consenterImpl holds the implementation of type that satisfies the +// multichain.Consenter and testableConsenter interfaces. The former +// is needed because that is what the HandleChain contract requires. +// The latter is needed for testing. +type consenterImpl struct { + kv sarama.KafkaVersion + ro config.Retry + bf bfType + pf pfType + cf cfType +} + +// HandleChain creates/returns a reference to a Chain for the given set of support resources. +// Implements the multichain.Consenter interface. Called by multichain.newChainSupport(), which +// is itself called by multichain.NewManagerImpl() when ranging over the ledgerFactory's existingChains. +func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport) (multichain.Chain, error) { + return newChain(co, cs), nil +} + +// When testing we need to inject our own broker/producer/consumer. +// Therefore we need to (a) hold a reference to an object that stores +// the broker/producer/consumer constructors, and (b) refer to that +// object via its interface type, so that we can use a different +// implementation when testing. This, in turn, calls for (c) —- the +// definition of an interface (see testableConsenter below) that will +// be satisfied by both the actual and the mock object and will allow +// us to retrieve these constructors. +func newChain(consenter testableConsenter, support multichain.ConsenterSupport) *chainImpl { + return &chainImpl{ + consenter: consenter, + support: support, + partition: newChainPartition(support.ChainID(), rawPartition), + lastProcessed: sarama.OffsetOldest - 1, // TODO This should be retrieved by ConsenterSupport; also see note in loop() below + producer: consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()), + halted: false, // Redundant as the default value for booleans is false but added for readability + exitChan: make(chan struct{}), + haltedChan: make(chan struct{}), + } +} + +// Satisfied by both chainImpl consenterImpl and mockConsenterImpl. +// Defined so as to facilitate testing. +type testableConsenter interface { + kafkaVersion() sarama.KafkaVersion + retryOptions() config.Retry + brokFunc() bfType + prodFunc() pfType + consFunc() cfType +} + +func (co *consenterImpl) kafkaVersion() sarama.KafkaVersion { return co.kv } +func (co *consenterImpl) retryOptions() config.Retry { return co.ro } +func (co *consenterImpl) brokFunc() bfType { return co.bf } +func (co *consenterImpl) prodFunc() pfType { return co.pf } +func (co *consenterImpl) consFunc() cfType { return co.cf } + +type chainImpl struct { + consenter testableConsenter + support multichain.ConsenterSupport + + partition ChainPartition + lastProcessed int64 + + producer Producer + consumer Consumer + + halted bool // For the Enqueue() calls + exitChan chan struct{} // For the Chain's Halt() method + + haltedChan chan struct{} // Hook for testing +} + +// Start allocates the necessary resources for staying up to date with this Chain. +// Implements the multichain.Chain interface. Called by multichain.NewManagerImpl() +// which is invoked when the ordering process is launched, before the call to NewServer(). +func (ch *chainImpl) Start() { + // 1. Post the CONNECT message to prevent panicking that occurs + // when seeking on a partition that hasn't been created yet. + logger.Debug("Posting the CONNECT message...") + if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newConnectMessage())); err != nil { + logger.Criticalf("Couldn't post CONNECT message to %s: %s", ch.partition, err) + close(ch.exitChan) + ch.halted = true + return + } + + // 2. Set up the listener/consumer for this partition. + // TODO When restart support gets added to the common components level, start + // the consumer from lastProcessed. For now, hard-code to oldest available. + consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.partition, ch.lastProcessed+1) + if err != nil { + logger.Criticalf("Cannot retrieve required offset from Kafka cluster for chain %s: %s", ch.partition, err) + close(ch.exitChan) + ch.halted = true + return + } + ch.consumer = consumer + + // 3. Set the loop the keep up to date with the chain. + go ch.loop() +} + +// Halt frees the resources which were allocated for this Chain. +// Implements the multichain.Chain interface. +func (ch *chainImpl) Halt() { + select { + case <-ch.exitChan: + // This construct is useful because it allows Halt() to be + // called multiple times w/o panicking. Recal that a receive + // from a closed channel returns (the zero value) immediately. + default: + close(ch.exitChan) + } +} + +// Enqueue accepts a message and returns true on acceptance, or false on shutdown. +// Implements the multichain.Chain interface. Called by the drainQueue goroutine, +// which is spawned when the broadcast handler's Handle() function is invoked. +func (ch *chainImpl) Enqueue(env *cb.Envelope) bool { + if ch.halted { + return false + } + + logger.Debug("Enqueueing:", env) + if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newRegularMessage(utils.MarshalOrPanic(env)))); err != nil { + logger.Errorf("Couldn't post to %s: %s", ch.partition, err) + return false + } + + return !ch.halted // If ch.halted has been set to true while sending, we should return false +} + +func (ch *chainImpl) loop() { + msg := new(ab.KafkaMessage) + + defer close(ch.haltedChan) + defer ch.producer.Close() + defer func() { ch.halted = true }() + defer ch.consumer.Close() + + // TODO Add support for time-based block cutting + + for { + select { + case in := <-ch.consumer.Recv(): + logger.Debug("Received:", in) + if err := proto.Unmarshal(in.Value, msg); err != nil { + // This shouldn't happen, it should be filtered at ingress + logger.Critical("Unable to unmarshal consumed message:", err) + } + logger.Debug("Unmarshaled to:", msg) + switch msg.Type.(type) { + case *ab.KafkaMessage_Connect, *ab.KafkaMessage_TimeToCut: + logger.Debugf("Ignoring message") + continue + case *ab.KafkaMessage_Regular: + env := new(cb.Envelope) + if err := proto.Unmarshal(msg.GetRegular().Payload, env); err != nil { + // This shouldn't happen, it should be filtered at ingress + logger.Critical("Unable to unmarshal consumed message:", err) + continue + } + batches, committers, ok := ch.support.BlockCutter().Ordered(env) + logger.Debugf("Ordering results: batches: %v, ok: %v", batches, ok) + if ok && len(batches) == 0 { + continue + } + // If !ok, batches == nil, so this will be skipped + for i, batch := range batches { + ch.support.WriteBlock(batch, nil, committers[i]) + } + } + case <-ch.exitChan: // when Halt() is called + logger.Infof("Consenter for chain %s exiting", ch.partition.Topic()) + return + } + } +} + +// Closeable allows the shut down of the calling resource. +type Closeable interface { + Close() error +} diff --git a/orderer/kafka/main_test.go b/orderer/kafka/main_test.go new file mode 100644 index 00000000000..c1b773d0616 --- /dev/null +++ b/orderer/kafka/main_test.go @@ -0,0 +1,215 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kafka + +import ( + "sync" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" + "github.com/hyperledger/fabric/orderer/localconfig" + mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/blockcutter" + mockmultichain "github.com/hyperledger/fabric/orderer/mocks/multichain" + mocksharedconfig "github.com/hyperledger/fabric/orderer/mocks/sharedconfig" + "github.com/hyperledger/fabric/orderer/multichain" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" +) + +var cp = newChainPartition(provisional.TestChainID, rawPartition) + +func newMockSharedConfigManager() *mocksharedconfig.Manager { + return &mocksharedconfig.Manager{KafkaBrokersVal: testConf.Kafka.Brokers} +} + +func syncQueueMessage(msg *cb.Envelope, chain multichain.Chain, bc *mockblockcutter.Receiver) { + chain.Enqueue(msg) + bc.Block <- struct{}{} +} + +type mockConsenterImpl struct { + consenterImpl + prodDisk, consDisk chan *ab.KafkaMessage + consumerSetUp bool + t *testing.T +} + +func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) *mockConsenterImpl { + prodDisk := make(chan *ab.KafkaMessage) + consDisk := make(chan *ab.KafkaMessage) + + mockBfValue := func(brokers []string, cp ChainPartition) (Broker, error) { + return mockNewBroker(t, cp) + } + mockPfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { + return mockNewProducer(t, cp, testOldestOffset, prodDisk) + } + mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) { + return mockNewConsumer(t, cp, offset, consDisk) + } + + return &mockConsenterImpl{ + consenterImpl: consenterImpl{ + kv: kafkaVersion, + ro: retryOptions, + bf: mockBfValue, + pf: mockPfValue, + cf: mockCfValue, + }, + prodDisk: prodDisk, + consDisk: consDisk, + t: t, + } +} + +func TestKafkaConsenterEmptyBatch(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: newMockSharedConfigManager(), + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + wg.Add(1) + go func() { + defer wg.Done() + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + }() + wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + // The message has already been moved to the consumer's disk, + // otherwise syncQueueMessage wouldn't return, so the Wait() + // here is unnecessary but let's be paranoid. + wg.Wait() + + // Stop the loop + ch.Halt() + + select { + case <-cs.Batches: + t.Fatal("Expected no invocations of Append") + case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great) + } +} + +func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: newMockSharedConfigManager(), + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + wg.Add(1) + go func() { + defer wg.Done() + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + }() + wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + wg.Wait() + + cs.BlockCutterVal.IsolatedTx = true + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) + wg.Wait() + + ch.Halt() + + select { + case <-cs.Batches: + case <-time.After(testTimePadding): + t.Fatal("Expected two blocks to be cut but never got the first") + } + + select { + case <-cs.Batches: + case <-time.After(testTimePadding): + t.Fatal("Expected the config type tx to create two blocks, but only got the first") + } + + select { + case <-time.After(testTimePadding): + t.Fatal("Should have exited") + case <-ch.haltedChan: + } +} diff --git a/orderer/kafka/orderer.go b/orderer/kafka/orderer.go deleted file mode 100644 index b67428ee6af..00000000000 --- a/orderer/kafka/orderer.go +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "github.com/hyperledger/fabric/orderer/localconfig" - ab "github.com/hyperledger/fabric/protos/orderer" -) - -// Orderer allows the caller to submit to and receive messages from the orderer -type Orderer interface { - Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error - Deliver(stream ab.AtomicBroadcast_DeliverServer) error - Teardown() error -} - -// Closeable allows the shut down of the calling resource -type Closeable interface { - Close() error -} - -type serverImpl struct { - broadcaster Broadcaster - deliverer Deliverer -} - -// New creates a new orderer -func New(conf *config.TopLevel) Orderer { - return &serverImpl{ - broadcaster: newBroadcaster(conf), - deliverer: newDeliverer(conf), - } -} - -// Broadcast submits messages for ordering -func (s *serverImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error { - return s.broadcaster.Broadcast(stream) -} - -// Deliver returns a stream of ordered messages -func (s *serverImpl) Deliver(stream ab.AtomicBroadcast_DeliverServer) error { - return s.deliverer.Deliver(stream) -} - -// Teardown shuts down the orderer -func (s *serverImpl) Teardown() error { - s.deliverer.Close() - return s.broadcaster.Close() -} diff --git a/orderer/kafka/orderer_mock_test.go b/orderer/kafka/orderer_mock_test.go deleted file mode 100644 index 129cec0caaf..00000000000 --- a/orderer/kafka/orderer_mock_test.go +++ /dev/null @@ -1,100 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "testing" - - "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" - "google.golang.org/grpc" -) - -func mockNew(t *testing.T, conf *config.TopLevel, disk chan []byte) Orderer { - return &serverImpl{ - broadcaster: mockNewBroadcaster(t, conf, testOldestOffset, disk), - deliverer: mockNewDeliverer(t, conf), - } -} - -type mockBroadcastStream struct { - grpc.ServerStream - incoming chan *cb.Envelope - outgoing chan *ab.BroadcastResponse - t *testing.T - closed bool // Set to true if the outgoing channel is closed -} - -func newMockBroadcastStream(t *testing.T) *mockBroadcastStream { - return &mockBroadcastStream{ - incoming: make(chan *cb.Envelope), - outgoing: make(chan *ab.BroadcastResponse), - t: t, - } -} - -func (mbs *mockBroadcastStream) Recv() (*cb.Envelope, error) { - return <-mbs.incoming, nil -} - -func (mbs *mockBroadcastStream) Send(reply *ab.BroadcastResponse) error { - if !mbs.closed { - mbs.outgoing <- reply - } - return nil -} - -func (mbs *mockBroadcastStream) CloseOut() bool { - close(mbs.outgoing) - mbs.closed = true - return mbs.closed -} - -type mockDeliverStream struct { - grpc.ServerStream - incoming chan *ab.DeliverUpdate - outgoing chan *ab.DeliverResponse - t *testing.T - closed bool -} - -func newMockDeliverStream(t *testing.T) *mockDeliverStream { - return &mockDeliverStream{ - incoming: make(chan *ab.DeliverUpdate), - outgoing: make(chan *ab.DeliverResponse), - t: t, - } -} - -func (mds *mockDeliverStream) Recv() (*ab.DeliverUpdate, error) { - return <-mds.incoming, nil - -} - -func (mds *mockDeliverStream) Send(reply *ab.DeliverResponse) error { - if !mds.closed { - mds.outgoing <- reply - } - return nil -} - -func (mds *mockDeliverStream) CloseOut() bool { - close(mds.outgoing) - mds.closed = true - return mds.closed -} diff --git a/orderer/kafka/producer.go b/orderer/kafka/producer.go index 2e46c332d59..3cc3a727c4a 100644 --- a/orderer/kafka/producer.go +++ b/orderer/kafka/producer.go @@ -24,24 +24,23 @@ import ( "github.com/hyperledger/fabric/orderer/localconfig" ) -// Producer allows the caller to send blocks to the orderer +// Producer allows the caller to post blobs to a chain partition on the Kafka cluster. type Producer interface { - Send(payload []byte) error + Send(cp ChainPartition, payload []byte) error Closeable } type producerImpl struct { producer sarama.SyncProducer - topic string } -func newProducer(conf *config.TopLevel) Producer { - brokerConfig := newBrokerConfig(conf) +func newProducer(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { var p sarama.SyncProducer var err error + brokerConfig := newBrokerConfig(kafkaVersion, rawPartition) - repeatTick := time.NewTicker(conf.Kafka.Retry.Period) - panicTick := time.NewTicker(conf.Kafka.Retry.Stop) + repeatTick := time.NewTicker(retryOptions.Period) + panicTick := time.NewTicker(retryOptions.Stop) defer repeatTick.Stop() defer panicTick.Stop() @@ -51,28 +50,34 @@ loop: case <-panicTick.C: panic(fmt.Errorf("Failed to create Kafka producer: %v", err)) case <-repeatTick.C: - logger.Debug("Connecting to Kafka brokers:", conf.Kafka.Brokers) - p, err = sarama.NewSyncProducer(conf.Kafka.Brokers, brokerConfig) + logger.Debug("Connecting to Kafka cluster:", brokers) + p, err = sarama.NewSyncProducer(brokers, brokerConfig) if err == nil { break loop } } } - logger.Debug("Connected to Kafka brokers") - return &producerImpl{producer: p, topic: conf.Kafka.Topic} + logger.Debug("Connected to the Kafka cluster") + return &producerImpl{producer: p} } +// Close shuts down the Producer component of the orderer. func (p *producerImpl) Close() error { return p.producer.Close() } -func (p *producerImpl) Send(payload []byte) error { - _, offset, err := p.producer.SendMessage(newMsg(payload, p.topic)) +// Send posts a blob to a chain partition on the Kafka cluster. +func (p *producerImpl) Send(cp ChainPartition, payload []byte) error { + prt, ofs, err := p.producer.SendMessage(newProducerMessage(cp, payload)) + if prt != cp.Partition() { + // If this happens, something's up with the partitioner + logger.Warningf("Blob destined for partition %d, but posted to %d instead", cp.Partition(), prt) + } if err == nil { - logger.Debugf("Forwarded block %v to ordering service", offset) + logger.Debugf("Forwarded blob with offset number %d to chain partition %s on the Kafka cluster", ofs, cp) } else { - logger.Info("Failed to send to Kafka brokers:", err) + logger.Infof("Failed to send message to chain partition %s on the Kafka cluster: %s", cp, err) } return err } diff --git a/orderer/kafka/producer_mock_test.go b/orderer/kafka/producer_mock_test.go index b567661cb57..d07069092c2 100644 --- a/orderer/kafka/producer_mock_test.go +++ b/orderer/kafka/producer_mock_test.go @@ -21,46 +21,59 @@ import ( "testing" "github.com/Shopify/sarama/mocks" - "github.com/hyperledger/fabric/orderer/localconfig" + "github.com/golang/protobuf/proto" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" ) type mockProducerImpl struct { - config *config.TopLevel producer *mocks.SyncProducer + checker mocks.ValueChecker - checker mocks.ValueChecker - disk chan []byte // This simulates the broker's "disk" where the producer's messages eventually end up + // This simulates the broker's "disk" where the producer's + // blobs for a certain chain partition eventually end up. + disk chan *ab.KafkaMessage producedOffset int64 + isSetup chan struct{} t *testing.T } -func mockNewProducer(t *testing.T, conf *config.TopLevel, seek int64, disk chan []byte) Producer { +// Create a new producer whose next "Send" on ChainPartition gives you blob #offset. +func mockNewProducer(t *testing.T, cp ChainPartition, offset int64, disk chan *ab.KafkaMessage) Producer { mp := &mockProducerImpl{ - config: conf, producer: mocks.NewSyncProducer(t, nil), checker: nil, disk: disk, producedOffset: 0, + isSetup: make(chan struct{}), t: t, } - if seek >= testOldestOffset && seek <= (testNewestOffset-1) { - mp.testFillWithBlocks(seek - 1) // Prepare the producer so that the next Send gives you block "seek" + mp.init(cp, offset) + + if mp.producedOffset == offset-1 { + close(mp.isSetup) } else { - panic(fmt.Errorf("Out of range seek number given to producer")) + mp.t.Fatal("Mock producer failed to initialize itself properly") } + return mp } -func (mp *mockProducerImpl) Send(payload []byte) error { +func (mp *mockProducerImpl) Send(cp ChainPartition, payload []byte) error { mp.producer.ExpectSendMessageWithCheckerFunctionAndSucceed(mp.checker) - mp.producedOffset++ - prt, ofs, err := mp.producer.SendMessage(newMsg(payload, mp.config.Kafka.Topic)) - if err != nil || - prt != mp.config.Kafka.PartitionID || - ofs != mp.producedOffset { - mp.t.Fatal("Producer not functioning as expected") + mp.producedOffset++ // This is the offset that will be assigned to the sent message + _, ofs, err := mp.producer.SendMessage(newProducerMessage(cp, payload)) + // We do NOT check the assigned partition because the mock + // producer always posts to partition 0 no matter what. + // This is a deficiency of the Kafka library that we use. + if err != nil || ofs != mp.producedOffset { + mp.t.Fatal("Mock producer not functioning as expected") } - mp.disk <- payload // Reaches the broker's disk + msg := new(ab.KafkaMessage) + if err := proto.Unmarshal(payload, msg); err != nil { + mp.t.Fatalf("Failed to unmarshal message that reached producer's disk: %s", err) + } + mp.disk <- msg // Reaches the cluster's disk for that chain partition return err } @@ -68,26 +81,38 @@ func (mp *mockProducerImpl) Close() error { return mp.producer.Close() } -func (mp *mockProducerImpl) testFillWithBlocks(seek int64) { - dyingChan := make(chan struct{}) +// Initializes the mock producer by setting up the offsets. +func (mp *mockProducerImpl) init(cp ChainPartition, offset int64) { + if offset >= testOldestOffset && offset <= (testNewestOffset-1) { + // Prepare the producer so that the next Send + // on that chain partition gives you blob #offset. + mp.testFillWithBlocks(cp, offset-1) + } else { + panic(fmt.Errorf("Out of range offset (seek number) given to producer: %d", offset)) + } +} + +func (mp *mockProducerImpl) testFillWithBlocks(cp ChainPartition, offset int64) { + dieChan := make(chan struct{}) deadChan := make(chan struct{}) go func() { // This goroutine is meant to read only the "fill-in" blocks for { select { case <-mp.disk: - case <-dyingChan: + case <-dieChan: close(deadChan) return } } }() - for i := int64(1); i <= seek; i++ { - mp.Send([]byte("fill-in")) + for i := int64(1); i <= offset; i++ { + mp.Send(cp, utils.MarshalOrPanic(newRegularMessage(utils.MarshalOrPanic(newTestEnvelope(fmt.Sprintf("producer fill-in %d", i)))))) } - close(dyingChan) + close(dieChan) <-deadChan + return } diff --git a/orderer/kafka/producer_test.go b/orderer/kafka/producer_test.go index 2ec33cbbd8f..7d649cee660 100644 --- a/orderer/kafka/producer_test.go +++ b/orderer/kafka/producer_test.go @@ -16,9 +16,24 @@ limitations under the License. package kafka -import "testing" +import ( + "testing" -func TestProducer(t *testing.T) { - mp := mockNewProducer(t, testConf, testMiddleOffset, make(chan []byte)) + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" +) + +func TestProducerSend(t *testing.T) { + cp := newChainPartition(provisional.TestChainID, rawPartition) + mp := mockNewProducer(t, cp, testMiddleOffset, make(chan *ab.KafkaMessage)) defer testClose(t, mp) + + go func() { + <-mp.(*mockProducerImpl).disk // Retrieve the message that we'll be sending below + }() + + if err := mp.Send(cp, utils.MarshalOrPanic(newRegularMessage([]byte("foo")))); err != nil { + t.Fatalf("Mock producer was not initialized correctly: %s", err) + } } diff --git a/orderer/kafka/util.go b/orderer/kafka/util.go index ad31b5021f7..a20b6283926 100644 --- a/orderer/kafka/util.go +++ b/orderer/kafka/util.go @@ -17,24 +17,21 @@ limitations under the License. package kafka import ( + "strconv" + "github.com/Shopify/sarama" - "github.com/hyperledger/fabric/orderer/localconfig" ab "github.com/hyperledger/fabric/protos/orderer" ) -const ( - ackOutOfRangeError = "ACK out of range" - seekOutOfRangeError = "Seek out of range" - windowOutOfRangeError = "Window out of range" -) - -func newBrokerConfig(conf *config.TopLevel) *sarama.Config { +// TODO Set the returned config file to more appropriate +// defaults as we're getting closer to a stable release +func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int32) *sarama.Config { brokerConfig := sarama.NewConfig() - brokerConfig.Version = conf.Kafka.Version + brokerConfig.Version = kafkaVersion // A partitioner is actually not needed the way we do things now, // but we're adding it now to allow for flexibility in the future. - brokerConfig.Producer.Partitioner = newStaticPartitioner(conf.Kafka.PartitionID) + brokerConfig.Producer.Partitioner = newStaticPartitioner(chosenStaticPartition) // Set equivalent of kafka producer config max.request.bytes to the deafult // value of a Kafka broker's socket.request.max.bytes property (100 MiB). brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize) @@ -72,20 +69,21 @@ func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage { } } -func newMsg(payload []byte, topic string) *sarama.ProducerMessage { +func newProducerMessage(cp ChainPartition, payload []byte) *sarama.ProducerMessage { return &sarama.ProducerMessage{ - Topic: topic, + Topic: cp.Topic(), + Key: sarama.StringEncoder(strconv.Itoa(int(cp.Partition()))), // TODO Consider writing an IntEncoder? Value: sarama.ByteEncoder(payload), } } -func newOffsetReq(conf *config.TopLevel, seek int64) *sarama.OffsetRequest { +func newOffsetReq(cp ChainPartition, offset int64) *sarama.OffsetRequest { req := &sarama.OffsetRequest{} - // If seek == -1, ask for the for the offset assigned to next new message - // If seek == -2, ask for the earliest available offset + // If offset (seek) == -1, ask for the offset assigned to next new message. + // If offset (seek) == -2, ask for the earliest available offset. // The last parameter in the AddBlock call is needed for God-knows-why reasons. // From the Kafka folks themselves: "We agree that this API is slightly funky." // https://mail-archives.apache.org/mod_mbox/kafka-users/201411.mbox/%3Cc159383825e04129b77253ffd6c448aa@BY2PR02MB505.namprd02.prod.outlook.com%3E - req.AddBlock(conf.Kafka.Topic, conf.Kafka.PartitionID, seek, 1) + req.AddBlock(cp.Topic(), cp.Partition(), offset, 1) return req } diff --git a/orderer/kafka/util_test.go b/orderer/kafka/util_test.go index eddae299888..6a8a45713dc 100644 --- a/orderer/kafka/util_test.go +++ b/orderer/kafka/util_test.go @@ -20,25 +20,31 @@ import ( "testing" "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" ) func TestProducerConfigMessageMaxBytes(t *testing.T) { + cp := newChainPartition(provisional.TestChainID, rawPartition) - topic := testConf.Kafka.Topic - - broker := sarama.NewMockBroker(t, 1000) + broker := sarama.NewMockBroker(t, 1) + defer func() { + broker.Close() + }() broker.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader(topic, 0, broker.BrokerID()), + SetLeader(cp.Topic(), cp.Partition(), broker.BrokerID()), "ProduceRequest": sarama.NewMockProduceResponse(t), }) - config := newBrokerConfig(testConf) + config := newBrokerConfig(testConf.Kafka.Version, rawPartition) producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config) if err != nil { t.Fatal(err) } + defer func() { + producer.Close() + }() testCases := []struct { name string @@ -51,56 +57,53 @@ func TestProducerConfigMessageMaxBytes(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - _, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(make([]byte, tc.size))}) + _, _, err = producer.SendMessage(&sarama.ProducerMessage{ + Topic: cp.Topic(), + Value: sarama.ByteEncoder(make([]byte, tc.size)), + }) if err != tc.err { t.Fatal(err) } }) } - - producer.Close() - broker.Close() } func TestNewBrokerConfig(t *testing.T) { + // Use a partition ID that is not the 'default' (rawPartition) + var differentPartition int32 = 2 + cp = newChainPartition(provisional.TestChainID, differentPartition) - topic := testConf.Kafka.Topic - - // use a partition id that is not the 'default' 0 - var partition int32 = 2 - originalPartitionID := testConf.Kafka.PartitionID + // Setup a mock broker that reports that it has 3 partitions for the topic + broker := sarama.NewMockBroker(t, 1) defer func() { - testConf.Kafka.PartitionID = originalPartitionID + broker.Close() }() - testConf.Kafka.PartitionID = partition - - // setup a mock broker that reports that it has 3 partitions for the topic - broker := sarama.NewMockBroker(t, 1000) broker.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader(topic, 0, broker.BrokerID()). - SetLeader(topic, 1, broker.BrokerID()). - SetLeader(topic, 2, broker.BrokerID()), + SetLeader(cp.Topic(), 0, broker.BrokerID()). + SetLeader(cp.Topic(), 1, broker.BrokerID()). + SetLeader(cp.Topic(), 2, broker.BrokerID()), "ProduceRequest": sarama.NewMockProduceResponse(t), }) - config := newBrokerConfig(testConf) + config := newBrokerConfig(testConf.Kafka.Version, differentPartition) producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config) if err != nil { - t.Fatal(err) + t.Fatal("Failed to create producer:", err) } + defer func() { + producer.Close() + }() for i := 0; i < 10; i++ { - assignedPartition, _, err := producer.SendMessage(&sarama.ProducerMessage{Topic: topic}) + assignedPartition, _, err := producer.SendMessage(&sarama.ProducerMessage{Topic: cp.Topic()}) if err != nil { - t.Fatal(err) + t.Fatal("Failed to send message:", err) } - if assignedPartition != partition { - t.Fatalf("Expected: %v. Actual: %v", partition, assignedPartition) + if assignedPartition != differentPartition { + t.Fatalf("Message wasn't posted to the right partition - expected: %d, got %v", differentPartition, assignedPartition) } } - producer.Close() - broker.Close() } diff --git a/orderer/localconfig/config.go b/orderer/localconfig/config.go index 7be925fe91d..50bd29ec139 100644 --- a/orderer/localconfig/config.go +++ b/orderer/localconfig/config.go @@ -70,12 +70,10 @@ type FileLedger struct { // Kafka contains config for the Kafka orderer type Kafka struct { - Brokers []string // TODO This should be deprecated and this information should be stored in the config block - Topic string - PartitionID int32 - Retry Retry - Verbose bool - Version sarama.KafkaVersion + Brokers []string // TODO This should be deprecated and this information should be stored in the config block + Retry Retry + Verbose bool + Version sarama.KafkaVersion } // Retry contains config for the reconnection attempts to the Kafka brokers @@ -120,14 +118,13 @@ var defaults = TopLevel{ Prefix: "hyperledger-fabric-rawledger", }, Kafka: Kafka{ - Brokers: []string{"127.0.0.1:9092"}, - Topic: "test", - PartitionID: 0, - Version: sarama.V0_9_0_1, + Brokers: []string{"127.0.0.1:9092"}, Retry: Retry{ Period: 3 * time.Second, Stop: 60 * time.Second, }, + Verbose: false, + Version: sarama.V0_9_0_1, }, } @@ -171,9 +168,6 @@ func (c *TopLevel) completeInitialization() { case c.Kafka.Brokers == nil: logger.Infof("Kafka.Brokers unset, setting to %v", defaults.Kafka.Brokers) c.Kafka.Brokers = defaults.Kafka.Brokers - case c.Kafka.Topic == "": - logger.Infof("Kafka.Topic unset, setting to %v", defaults.Kafka.Topic) - c.Kafka.Topic = defaults.Kafka.Topic case c.Kafka.Retry.Period == 0*time.Second: logger.Infof("Kafka.Retry.Period unset, setting to %v", defaults.Kafka.Retry.Period) c.Kafka.Retry.Period = defaults.Kafka.Retry.Period diff --git a/orderer/main.go b/orderer/main.go index d06327f09b7..f7051f8687c 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -24,8 +24,8 @@ import ( "net/http" _ "net/http/pprof" "os" - "os/signal" + "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" "github.com/hyperledger/fabric/orderer/kafka" "github.com/hyperledger/fabric/orderer/localconfig" @@ -37,18 +37,21 @@ import ( cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" - "github.com/Shopify/sarama" "github.com/op/go-logging" "google.golang.org/grpc" ) var logger = logging.MustGetLogger("orderer/main") +func init() { + logging.SetLevel(logging.DEBUG, "") +} + func main() { conf := config.Load() - // Start the profiling service if enabled. The ListenAndServe() - // call does not return unless an error occurs. + // Start the profiling service if enabled. + // The ListenAndServe() call does not return unless an error occurs. if conf.General.Profile.Enabled { go func() { logger.Infof("Starting Go pprof profiling service on %s", conf.General.Profile.Address) @@ -56,21 +59,6 @@ func main() { }() } - switch conf.General.OrdererType { - case "solo": - launchGeneric(conf) - case "kafka": - launchKafka(conf) - default: - panic("Invalid orderer type specified in config") - } -} - -func init() { - logging.SetLevel(logging.DEBUG, "") -} - -func launchGeneric(conf *config.TopLevel) { grpcServer := grpc.NewServer() lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.General.ListenAddress, conf.General.ListenPort)) @@ -100,7 +88,6 @@ func launchGeneric(conf *config.TopLevel) { panic(fmt.Errorf("Error creating temp dir: %s", err)) } } - lf, _ = fileledger.New(location, genesisBlock) case "ram": fallthrough @@ -108,8 +95,13 @@ func launchGeneric(conf *config.TopLevel) { lf, _ = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock) } + if conf.Kafka.Verbose { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile) + } + consenters := make(map[string]multichain.Consenter) consenters["solo"] = solo.New(conf.General.BatchTimeout) + consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry) manager := multichain.NewManagerImpl(lf, consenters) @@ -122,34 +114,3 @@ func launchGeneric(conf *config.TopLevel) { ab.RegisterAtomicBroadcastServer(grpcServer, server) grpcServer.Serve(lis) } - -func launchKafka(conf *config.TopLevel) { - var kafkaVersion = sarama.V0_9_0_1 // TODO Ideally we'd set this in the YAML file but its type makes this impossible - conf.Kafka.Version = kafkaVersion - - if conf.Kafka.Verbose { - sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile) - } - - ordererSrv := kafka.New(conf) - defer ordererSrv.Teardown() - - lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.General.ListenAddress, conf.General.ListenPort)) - if err != nil { - panic(err) - } - rpcSrv := grpc.NewServer() // TODO Add TLS support - ab.RegisterAtomicBroadcastServer(rpcSrv, ordererSrv) - go rpcSrv.Serve(lis) - - // Trap SIGINT to trigger a shutdown - // We must use a buffered channel or risk missing the signal - // if we're not ready to receive when the signal is sent. - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, os.Interrupt) - - for range signalChan { - logger.Info("Server shutting down") - return - } -} diff --git a/orderer/orderer.yaml b/orderer/orderer.yaml index 685b76e95ad..e219ac122ba 100644 --- a/orderer/orderer.yaml +++ b/orderer/orderer.yaml @@ -14,8 +14,7 @@ General: OrdererType: solo # Ledger Type: The ledger type to provide to the orderer (if needed) - # Available types are "ram", "file". When "kafka" is chosen as the - # OrdererType, this option is ignored. + # Available types are "ram", "file". LedgerType: ram # Batch Timeout: The amount of time to wait before creating a batch @@ -25,7 +24,7 @@ General: BatchSize: 10 # Queue Size: The maximum number of messages to allow pending from a gRPC - # client. This option is currently ignored for the Kafka OrdererType. + # client. QueueSize: 10 # Max Window Size: The maximum number of messages to for the orderer Deliver @@ -92,13 +91,6 @@ Kafka: Brokers: - 127.0.0.1:9092 - # Topic: The Kafka topic the orderer writes to/reads from - Topic: test - - # Partition ID: The partition of the Kafka topic the orderer writes to/reads - # from - PartitionID: 0 - # Retry: What to do if none of the Kafka brokers are available Retry: # The producer should attempt to reconnect every @@ -106,5 +98,6 @@ Kafka: # Panic if has elapsed and no connection has been established Stop: 60s - # Verbose: Turn on logging for the Kafka library + # Verbose: Turn on logging for sarama, the client library that we use to + # interact with the Kafka cluster Verbose: false