Skip to content

Commit

Permalink
[FAB-6167] use go-logging for sarama logging
Browse files Browse the repository at this point in the history
- sarama kafka client library will log to a go-logging
  logger with id: orderer/consensus/kafka/sarama.
- the logger can be enabled via Kafka.Verbose config or
  by explicitly setting to DEBUG in the log specification
  string.

Change-Id: Ieb91ef06a7d7b8587b711439d26e116d12260dd9
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Sep 15, 2017
1 parent 40e41a5 commit d06c012
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 29 deletions.
7 changes: 1 addition & 6 deletions orderer/common/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package server
import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
_ "net/http/pprof" // This is essentially the main package for the orderer
Expand All @@ -32,7 +31,6 @@ import (
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"

"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/common/localmsp"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/orderer/common/performance"
Expand Down Expand Up @@ -100,9 +98,6 @@ func Start(cmd string, conf *config.TopLevel) {
func initializeLoggingLevel(conf *config.TopLevel) {
flogging.InitBackend(flogging.SetFormat(conf.General.LogFormat), os.Stderr)
flogging.InitFromSpec(conf.General.LogLevel)
if conf.Kafka.Verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
}
}

// Start the profiling service if enabled.
Expand Down Expand Up @@ -227,7 +222,7 @@ func initializeMultichannelRegistrar(conf *config.TopLevel, signer crypto.LocalS

consenters := make(map[string]consensus.Consenter)
consenters["solo"] = solo.New()
consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version)
consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version, conf.Kafka.Verbose)

return multichannel.NewRegistrar(lf, consenters, signer)
}
3 changes: 0 additions & 3 deletions orderer/common/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/localmsp"
Expand All @@ -40,11 +39,9 @@ func TestInitializeLoggingLevel(t *testing.T) {
// global log level setting in tests of this package (for example,
// the benchmark-related ones) that would occur otherwise.
General: config.General{LogLevel: "foo=debug"},
Kafka: config.Kafka{Verbose: true},
},
)
assert.Equal(t, flogging.GetModuleLevel("foo"), "DEBUG")
assert.NotNil(t, sarama.Logger)
}

func TestInitializeProfilingService(t *testing.T) {
Expand Down
14 changes: 4 additions & 10 deletions orderer/consensus/kafka/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,17 @@ package kafka

import (
"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/common/flogging"
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
logging "github.com/op/go-logging"
)

const pkgLogID = "orderer/consensus/kafka"

var logger *logging.Logger

func init() {
logger = flogging.MustGetLogger(pkgLogID)
}

// New creates a Kafka-based consenter. Called by orderer's main.go.
func New(tlsConfig localconfig.TLS, retryOptions localconfig.Retry, kafkaVersion sarama.KafkaVersion) consensus.Consenter {
func New(tlsConfig localconfig.TLS, retryOptions localconfig.Retry, kafkaVersion sarama.KafkaVersion, verbose bool) consensus.Consenter {
if verbose {
logging.SetLevel(logging.DEBUG, saramaLogID)
}
brokerConfig := newBrokerConfig(tlsConfig, retryOptions, kafkaVersion, defaultPartition)
return &consenterImpl{
brokerConfigVal: brokerConfig,
Expand Down
14 changes: 4 additions & 10 deletions orderer/consensus/kafka/consenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ package kafka

import (
"fmt"
"log"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -54,15 +52,15 @@ func init() {
mockLocalConfig = newMockLocalConfig(false, mockRetryOptions, false)
mockBrokerConfig = newMockBrokerConfig(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, defaultPartition)
mockConsenter = newMockConsenter(mockBrokerConfig, mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version)
setupTestLogging("ERROR", mockLocalConfig.Kafka.Verbose)
setupTestLogging("ERROR")
}

func TestNew(t *testing.T) {
_ = consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version))
_ = consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, mockLocalConfig.Kafka.Verbose))
}

func TestHandleChain(t *testing.T) {
consenter := consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version))
consenter := consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, mockLocalConfig.Kafka.Verbose))

oldestOffset := int64(0)
newestOffset := int64(5)
Expand Down Expand Up @@ -154,15 +152,11 @@ func newMockLocalConfig(enableTLS bool, retryOptions localconfig.Retry, verboseL
}
}

func setupTestLogging(logLevel string, verbose bool) {
func setupTestLogging(logLevel string) {
// This call allows us to (a) get the logging backend initialization that
// takes place in the `flogging` package, and (b) adjust the verbosity of
// the logs when running tests on this package.
flogging.SetModuleLevel(pkgLogID, logLevel)

if verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
}
}

func tamperBytes(original []byte) []byte {
Expand Down
56 changes: 56 additions & 0 deletions orderer/consensus/kafka/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package kafka

import (
"fmt"

"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/common/flogging"
logging "github.com/op/go-logging"
)

const (
pkgLogID = "orderer/consensus/kafka"
saramaLogID = pkgLogID + "/sarama"
)

var logger *logging.Logger

// init initializes the package logger
func init() {
logger = flogging.MustGetLogger(pkgLogID)
}

// init initializes the samara logger
func init() {
loggingProvider := flogging.MustGetLogger(saramaLogID)
loggingProvider.ExtraCalldepth = 3
sarama.Logger = &saramaLoggerImpl{
logger: loggingProvider,
}
}

type saramaLoggerImpl struct {
logger *logging.Logger
}

func (l saramaLoggerImpl) Print(args ...interface{}) {
l.print(fmt.Sprint(args...))
}

func (l saramaLoggerImpl) Printf(format string, args ...interface{}) {
l.print(fmt.Sprintf(format, args...))
}

func (l saramaLoggerImpl) Println(args ...interface{}) {
l.print(fmt.Sprintln(args...))
}

func (l saramaLoggerImpl) print(message string) {
l.logger.Debug(message)
}
18 changes: 18 additions & 0 deletions orderer/consensus/kafka/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package kafka

import (
"testing"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
)

func TestLoggerInit(t *testing.T) {
assert.IsType(t, &saramaLoggerImpl{}, sarama.Logger, "Sarama logger not properly initialized")
}

0 comments on commit d06c012

Please sign in to comment.