Skip to content

Commit

Permalink
[FAB-2479] Log consumer errors
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2479

This changeset introduces the `listenForErrors` goroutine. It listens
for and logs any errors that happen during the orderer's consumption of
a Kafka partition (read: chain).

Change-Id: Ia706049ba8c4d36dcfd9c5968b97588670f6046a
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Feb 25, 2017
1 parent f4448b9 commit 4aa759b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 19 deletions.
13 changes: 11 additions & 2 deletions orderer/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/hyperledger/fabric/orderer/localconfig"
)

// Consumer allows the caller to receive a stream of blobs from the Kafka cluster for a specific partition.
// Consumer allows the caller to receive a stream of blobs
// from the Kafka cluster for a specific partition.
type Consumer interface {
Recv() <-chan *sarama.ConsumerMessage
Errors() <-chan *sarama.ConsumerError
Closeable
}

Expand All @@ -49,11 +51,18 @@ func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.
return c, nil
}

// Recv returns a channel with blobs received from the Kafka cluster for a partition.
// Recv returns a channel with blobs received
// from the Kafka cluster for a partition.
func (c *consumerImpl) Recv() <-chan *sarama.ConsumerMessage {
return c.partition.Messages()
}

// Errors returns a channel with errors occuring during
// the consumption of a partition from the Kafka cluster.
func (c *consumerImpl) Errors() <-chan *sarama.ConsumerError {
return c.partition.Errors()
}

// Close shuts down the partition consumer.
// Invoked by the session deliverer's Close method, which is itself called
// during the processSeek function, between disabling and enabling the push.
Expand Down
4 changes: 4 additions & 0 deletions orderer/kafka/consumer_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (mc *mockConsumerImpl) Recv() <-chan *sarama.ConsumerMessage {
return nil
}

func (mc *mockConsumerImpl) Errors() <-chan *sarama.ConsumerError {
return nil
}

func (mc *mockConsumerImpl) Close() error {
if err := mc.chainPartitionManager.Close(); err != nil {
return err
Expand Down
10 changes: 10 additions & 0 deletions orderer/kafka/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,21 @@ func (ch *chainImpl) Start() {
}
ch.consumer = consumer
close(ch.setupChan)
go ch.listenForErrors()

// 3. Set the loop the keep up to date with the chain.
go ch.loop()
}

func (ch *chainImpl) listenForErrors() {
select {
case <-ch.exitChan:
return
case err := <-ch.consumer.Errors():
logger.Error(err)
}
}

// Halt frees the resources which were allocated for this Chain.
// Implements the multichain.Chain interface.
func (ch *chainImpl) Halt() {
Expand Down
30 changes: 14 additions & 16 deletions orderer/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,25 @@ import (
ab "github.com/hyperledger/fabric/protos/orderer"
)

// TODO Set the returned config file to more appropriate
// defaults as we're getting closer to a stable release
func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int32, tlsConfig config.TLS) *sarama.Config {
brokerConfig := sarama.NewConfig()

brokerConfig.Version = kafkaVersion
// Set the level of acknowledgement reliability needed from the broker.
// WaitForAll means that the partition leader will wait till all ISRs
// got the message before sending back an ACK to the sender.
brokerConfig.Producer.RequiredAcks = sarama.WaitForAll
// A partitioner is actually not needed the way we do things now,
// but we're adding it now to allow for flexibility in the future.
brokerConfig.Producer.Partitioner = newStaticPartitioner(chosenStaticPartition)
// Set equivalent of kafka producer config max.request.bytes to the deafult
// value of a Kafka broker's socket.request.max.bytes property (100 MiB).
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
brokerConfig.Consumer.Return.Errors = true

brokerConfig.Net.TLS.Enable = tlsConfig.Enabled

if brokerConfig.Net.TLS.Enable {
// create public/private key pair structure
keyPair, err := tls.X509KeyPair([]byte(tlsConfig.Certificate), []byte(tlsConfig.PrivateKey))
if err != nil {
panic(fmt.Errorf("Unable to decode public/private key pair. Error: %v", err))
}

// create root CA pool
rootCAs := x509.NewCertPool()
for _, certificate := range tlsConfig.RootCAs {
if !rootCAs.AppendCertsFromPEM([]byte(certificate)) {
panic(fmt.Errorf("Unable to decode certificate. Error: %v", err))
}
}

brokerConfig.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{keyPair},
RootCAs: rootCAs,
Expand All @@ -69,6 +54,19 @@ func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int
}
}

// Set the level of acknowledgement reliability needed from the broker.
// WaitForAll means that the partition leader will wait till all ISRs
// got the message before sending back an ACK to the sender.
brokerConfig.Producer.RequiredAcks = sarama.WaitForAll
// A partitioner is actually not needed the way we do things now,
// but we're adding it now to allow for flexibility in the future.
brokerConfig.Producer.Partitioner = newStaticPartitioner(chosenStaticPartition)
// Set equivalent of Kafka producer config max.request.bytes to the default
// value of a Kafka broker's socket.request.max.bytes property (100 MiB).
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)

brokerConfig.Version = kafkaVersion

return brokerConfig
}

Expand Down
1 change: 0 additions & 1 deletion orderer/orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ General:
ClientAuthEnabled: false
ClientRootCAs:


# Log Level: The level at which to log. This accepts logging specifications
# per fabric/docs/Setup/logging-control.md
LogLevel: info
Expand Down

0 comments on commit 4aa759b

Please sign in to comment.