From d06c01294c1b6696a1355d6e67e496f504a9b0f2 Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Thu, 14 Sep 2017 13:56:21 -0400 Subject: [PATCH] [FAB-6167] use go-logging for sarama logging - sarama kafka client library will log to a go-logging logger with id: orderer/consensus/kafka/sarama. - the logger can be enabled via Kafka.Verbose config or by explicitly setting to DEBUG in the log specification string. Change-Id: Ieb91ef06a7d7b8587b711439d26e116d12260dd9 Signed-off-by: Luis Sanchez --- orderer/common/server/main.go | 7 +-- orderer/common/server/main_test.go | 3 -- orderer/consensus/kafka/consenter.go | 14 ++---- orderer/consensus/kafka/consenter_test.go | 14 ++---- orderer/consensus/kafka/logger.go | 56 +++++++++++++++++++++++ orderer/consensus/kafka/logger_test.go | 18 ++++++++ 6 files changed, 83 insertions(+), 29 deletions(-) create mode 100644 orderer/consensus/kafka/logger.go create mode 100644 orderer/consensus/kafka/logger_test.go diff --git a/orderer/common/server/main.go b/orderer/common/server/main.go index 2a58b8e5663..cee02c2e7f9 100644 --- a/orderer/common/server/main.go +++ b/orderer/common/server/main.go @@ -9,7 +9,6 @@ package server import ( "fmt" "io/ioutil" - "log" "net" "net/http" _ "net/http/pprof" // This is essentially the main package for the orderer @@ -32,7 +31,6 @@ import ( ab "github.com/hyperledger/fabric/protos/orderer" "github.com/hyperledger/fabric/protos/utils" - "github.com/Shopify/sarama" "github.com/hyperledger/fabric/common/localmsp" mspmgmt "github.com/hyperledger/fabric/msp/mgmt" "github.com/hyperledger/fabric/orderer/common/performance" @@ -100,9 +98,6 @@ func Start(cmd string, conf *config.TopLevel) { func initializeLoggingLevel(conf *config.TopLevel) { flogging.InitBackend(flogging.SetFormat(conf.General.LogFormat), os.Stderr) flogging.InitFromSpec(conf.General.LogLevel) - if conf.Kafka.Verbose { - sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Ldate|log.Lmicroseconds|log.Lshortfile) - } } // Start the profiling service if enabled. @@ -227,7 +222,7 @@ func initializeMultichannelRegistrar(conf *config.TopLevel, signer crypto.LocalS consenters := make(map[string]consensus.Consenter) consenters["solo"] = solo.New() - consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version) + consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version, conf.Kafka.Verbose) return multichannel.NewRegistrar(lf, consenters, signer) } diff --git a/orderer/common/server/main_test.go b/orderer/common/server/main_test.go index ba7b2f40394..ebec5f2988d 100644 --- a/orderer/common/server/main_test.go +++ b/orderer/common/server/main_test.go @@ -17,7 +17,6 @@ import ( "testing" "time" - "github.com/Shopify/sarama" "github.com/hyperledger/fabric/bccsp/factory" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/localmsp" @@ -40,11 +39,9 @@ func TestInitializeLoggingLevel(t *testing.T) { // global log level setting in tests of this package (for example, // the benchmark-related ones) that would occur otherwise. General: config.General{LogLevel: "foo=debug"}, - Kafka: config.Kafka{Verbose: true}, }, ) assert.Equal(t, flogging.GetModuleLevel("foo"), "DEBUG") - assert.NotNil(t, sarama.Logger) } func TestInitializeProfilingService(t *testing.T) { diff --git a/orderer/consensus/kafka/consenter.go b/orderer/consensus/kafka/consenter.go index a7214a1b5e4..2c8eac74c06 100644 --- a/orderer/consensus/kafka/consenter.go +++ b/orderer/consensus/kafka/consenter.go @@ -8,23 +8,17 @@ package kafka import ( "github.com/Shopify/sarama" - "github.com/hyperledger/fabric/common/flogging" localconfig "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/orderer/consensus" cb "github.com/hyperledger/fabric/protos/common" logging "github.com/op/go-logging" ) -const pkgLogID = "orderer/consensus/kafka" - -var logger *logging.Logger - -func init() { - logger = flogging.MustGetLogger(pkgLogID) -} - // New creates a Kafka-based consenter. Called by orderer's main.go. -func New(tlsConfig localconfig.TLS, retryOptions localconfig.Retry, kafkaVersion sarama.KafkaVersion) consensus.Consenter { +func New(tlsConfig localconfig.TLS, retryOptions localconfig.Retry, kafkaVersion sarama.KafkaVersion, verbose bool) consensus.Consenter { + if verbose { + logging.SetLevel(logging.DEBUG, saramaLogID) + } brokerConfig := newBrokerConfig(tlsConfig, retryOptions, kafkaVersion, defaultPartition) return &consenterImpl{ brokerConfigVal: brokerConfig, diff --git a/orderer/consensus/kafka/consenter_test.go b/orderer/consensus/kafka/consenter_test.go index b41b4f86104..a6d234b2a39 100644 --- a/orderer/consensus/kafka/consenter_test.go +++ b/orderer/consensus/kafka/consenter_test.go @@ -8,8 +8,6 @@ package kafka import ( "fmt" - "log" - "os" "strings" "testing" "time" @@ -54,15 +52,15 @@ func init() { mockLocalConfig = newMockLocalConfig(false, mockRetryOptions, false) mockBrokerConfig = newMockBrokerConfig(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, defaultPartition) mockConsenter = newMockConsenter(mockBrokerConfig, mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version) - setupTestLogging("ERROR", mockLocalConfig.Kafka.Verbose) + setupTestLogging("ERROR") } func TestNew(t *testing.T) { - _ = consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version)) + _ = consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, mockLocalConfig.Kafka.Verbose)) } func TestHandleChain(t *testing.T) { - consenter := consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version)) + consenter := consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, mockLocalConfig.Kafka.Verbose)) oldestOffset := int64(0) newestOffset := int64(5) @@ -154,15 +152,11 @@ func newMockLocalConfig(enableTLS bool, retryOptions localconfig.Retry, verboseL } } -func setupTestLogging(logLevel string, verbose bool) { +func setupTestLogging(logLevel string) { // This call allows us to (a) get the logging backend initialization that // takes place in the `flogging` package, and (b) adjust the verbosity of // the logs when running tests on this package. flogging.SetModuleLevel(pkgLogID, logLevel) - - if verbose { - sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Ldate|log.Lmicroseconds|log.Lshortfile) - } } func tamperBytes(original []byte) []byte { diff --git a/orderer/consensus/kafka/logger.go b/orderer/consensus/kafka/logger.go new file mode 100644 index 00000000000..62123afa245 --- /dev/null +++ b/orderer/consensus/kafka/logger.go @@ -0,0 +1,56 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka + +import ( + "fmt" + + "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/common/flogging" + logging "github.com/op/go-logging" +) + +const ( + pkgLogID = "orderer/consensus/kafka" + saramaLogID = pkgLogID + "/sarama" +) + +var logger *logging.Logger + +// init initializes the package logger +func init() { + logger = flogging.MustGetLogger(pkgLogID) +} + +// init initializes the samara logger +func init() { + loggingProvider := flogging.MustGetLogger(saramaLogID) + loggingProvider.ExtraCalldepth = 3 + sarama.Logger = &saramaLoggerImpl{ + logger: loggingProvider, + } +} + +type saramaLoggerImpl struct { + logger *logging.Logger +} + +func (l saramaLoggerImpl) Print(args ...interface{}) { + l.print(fmt.Sprint(args...)) +} + +func (l saramaLoggerImpl) Printf(format string, args ...interface{}) { + l.print(fmt.Sprintf(format, args...)) +} + +func (l saramaLoggerImpl) Println(args ...interface{}) { + l.print(fmt.Sprintln(args...)) +} + +func (l saramaLoggerImpl) print(message string) { + l.logger.Debug(message) +} diff --git a/orderer/consensus/kafka/logger_test.go b/orderer/consensus/kafka/logger_test.go new file mode 100644 index 00000000000..1283dec2cd5 --- /dev/null +++ b/orderer/consensus/kafka/logger_test.go @@ -0,0 +1,18 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package kafka + +import ( + "testing" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" +) + +func TestLoggerInit(t *testing.T) { + assert.IsType(t, &saramaLoggerImpl{}, sarama.Logger, "Sarama logger not properly initialized") +}