Skip to content

Commit

Permalink
Merge branch 'phu/klog'
Browse files Browse the repository at this point in the history
  • Loading branch information
NgoKimPhu committed Dec 29, 2023
2 parents e60177d + 660c1c6 commit d076e15
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 15 deletions.
27 changes: 16 additions & 11 deletions batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"sync/atomic"
"time"

"github.com/KyberNetwork/logger"
"github.com/pkg/errors"

"github.com/KyberNetwork/kutils/klog"
)

//go:generate mockgen -source=batcher.go -destination mocks/mocks.go -package mocks
Expand Down Expand Up @@ -77,7 +78,8 @@ func (c *ChanTask[R]) Result() (R, error) {
func (c *ChanTask[R]) Resolve(ret R, err error) {
select {
case <-c.done:
logger.Errorf("ChanTask.Resolve|called twice, ignored|c.Ret=%v,c.Err=%v|Ret=%v,Err=%v", c.Ret, c.Err, ret, err)
klog.Errorf(c.ctx, "ChanTask.Resolve|called twice, ignored|c.Ret=%v,c.Err=%v|Ret=%v,Err=%v",
c.Ret, c.Err, ret, err)
default:
c.Ret, c.Err = ret, err
close(c.done)
Expand Down Expand Up @@ -143,7 +145,8 @@ func (b *ChanBatcher[T, R]) batchFnWithRecover(tasks []T) {
if p == nil {
return
}
logger.Errorf("ChanBatcher.goBatchFn|recovered from panic: %v\n%s", p, string(debug.Stack()))
klog.Errorf(context.Background(), "ChanBatcher.goBatchFn|recovered from panic: %v\n%s",
p, string(debug.Stack()))
var ret R
err, ok := p.(error)
if ok {
Expand All @@ -166,7 +169,8 @@ func (b *ChanBatcher[T, R]) batchFnWithRecover(tasks []T) {
func (b *ChanBatcher[T, R]) worker() {
defer func() {
if p := recover(); p != nil {
logger.Errorf("ChanBatcher.worker|recovered from panic: %v\n%s", p, string(debug.Stack()))
klog.Errorf(context.Background(), "ChanBatcher.worker|recovered from panic: %v\n%s",
p, string(debug.Stack()))
}
}()
var tasks []T
Expand All @@ -178,29 +182,30 @@ func (b *ChanBatcher[T, R]) worker() {
if len(tasks) == 0 {
break
}
logger.Debugf("ChanBatcher.worker|timer|%d tasks", len(tasks))
klog.Debugf(tasks[0].Ctx(), "ChanBatcher.worker|timer|%d tasks", len(tasks))
go b.batchFnWithRecover(tasks)
tasks = tasks[:0:0]
case task, ok := <-b.taskCh:
ctx := task.Ctx()
if !ok {
logger.Debugf("ChanBatcher.worker|closed|%d tasks", len(tasks))
klog.Debugf(ctx, "ChanBatcher.worker|closed|%d tasks", len(tasks))
if len(tasks) > 0 {
go b.batchFnWithRecover(tasks)
}
return
}
if !task.IsDone() {
select {
case <-task.Ctx().Done():
logger.Infof("ChanBatcher.worker|skip|task=%v", task)
task.Resolve(*new(R), task.Ctx().Err())
case <-ctx.Done():
klog.Infof(ctx, "ChanBatcher.worker|skip|task=%v", task)
task.Resolve(*new(R), ctx.Err())
continue
default:
}
}
duration, batchCount := b.batchCfg()
if len(tasks) == 0 {
logger.Debugf("ChanBatcher.worker|timer start|duration=%s", duration)
klog.Debugf(ctx, "ChanBatcher.worker|timer start|duration=%s", duration)
if !batchTimer.Stop() {
select {
case <-batchTimer.C:
Expand All @@ -211,7 +216,7 @@ func (b *ChanBatcher[T, R]) worker() {
}
tasks = append(tasks, task)
if len(tasks) >= batchCount {
logger.Debugf("ChanBatcher.worker|max|%d tasks", len(tasks))
klog.Debugf(ctx, "ChanBatcher.worker|max|%d tasks", len(tasks))
go b.batchFnWithRecover(tasks)
tasks = tasks[:0:0]
}
Expand Down
5 changes: 3 additions & 2 deletions batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"testing"
"time"

"github.com/KyberNetwork/logger"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/KyberNetwork/kutils/klog"
)

func TestChanBatcher(t *testing.T) {
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestChanBatcher(t *testing.T) {
batcher.Batch(tasks[i])
}
// 1k: 2.561804ms; 1M: 2.62s - average overhead per task = 2.6µs
logger.Warnf("done %d tasks in %v", taskCnt, time.Since(start))
klog.Warnf(ctx, "done %d tasks in %v", taskCnt, time.Since(start))
for i := 0; i < taskCnt; i++ {
ret, err := tasks[i].Result()
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"net/http"
"time"

"github.com/KyberNetwork/kutils/internal/json"

"github.com/go-resty/resty/v2"
"github.com/hashicorp/go-retryablehttp"

"github.com/KyberNetwork/kutils/internal/json"
)

// HttpCfg is the resty http client configs
Expand Down
144 changes: 144 additions & 0 deletions klog/klog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package klog

import (
"context"

"github.com/KyberNetwork/logger"
)

func DefaultLogger() Logger {
return logger.DefaultLogger()
}

type Logger logger.Logger

var log logger.Logger

type Configuration struct {
EnableConsole bool
EnableJSONFormat bool
ConsoleLevel string
EnableFile bool
FileJSONFormat bool
FileLevel string
FileLocation string
}

type LoggerBackend logger.LoggerBackend

const (
// LoggerBackendZap logging using Uber's zap backend
LoggerBackendZap = LoggerBackend(logger.LoggerBackendZap)
// LoggerBackendLogrus logging using logrus backend
LoggerBackendLogrus = LoggerBackend(logger.LoggerBackendLogrus)
)

func InitLogger(config Configuration, backend LoggerBackend) (Logger, error) {
var err error
log, err = logger.InitLogger(logger.Configuration{
EnableConsole: config.EnableConsole,
EnableJSONFormat: config.EnableJSONFormat,
ConsoleLevel: config.ConsoleLevel,
EnableFile: config.EnableFile,
FileJSONFormat: config.FileJSONFormat,
FileLevel: config.FileLevel,
FileLocation: config.FileLocation,
}, logger.LoggerBackend(backend))
return log, err
}

func Log() Logger {
if log == nil {
log = DefaultLogger()
}
return log
}

func NewLogger(config Configuration, backend LoggerBackend) (Logger, error) {
return logger.NewLogger(logger.Configuration{
EnableConsole: config.EnableConsole,
EnableJSONFormat: config.EnableJSONFormat,
ConsoleLevel: config.ConsoleLevel,
EnableFile: config.EnableFile,
FileJSONFormat: config.FileJSONFormat,
FileLevel: config.FileLevel,
FileLocation: config.FileLocation,
}, logger.LoggerBackend(backend))
}

type CtxKeyLogger struct{}

var ctxKeyLogger CtxKeyLogger

func LoggerFromCtx(ctx context.Context) Logger {
if ctx == nil {
return Log()
}
ctxLog, _ := ctx.Value(ctxKeyLogger).(Logger)
if ctxLog != nil {
return ctxLog
}
return Log()
}

func CtxWithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, ctxKeyLogger, log)
}

func Debug(ctx context.Context, msg string) {
LoggerFromCtx(ctx).Debug(msg)
}

func Debugf(ctx context.Context, format string, args ...any) {
LoggerFromCtx(ctx).Debugf(format, args...)
}

func Info(ctx context.Context, msg string) {
LoggerFromCtx(ctx).Info(msg)
}

func Infof(ctx context.Context, format string, args ...any) {
LoggerFromCtx(ctx).Infof(format, args...)
}

func Infoln(ctx context.Context, msg string) {
LoggerFromCtx(ctx).Infoln(msg)
}

func Warn(ctx context.Context, msg string) {
LoggerFromCtx(ctx).Warn(msg)
}

func Warnf(ctx context.Context, format string, args ...any) {
LoggerFromCtx(ctx).Warnf(format, args...)
}

func Error(ctx context.Context, msg string) {
LoggerFromCtx(ctx).Error(msg)
}

func Errorf(ctx context.Context, format string, args ...any) {
LoggerFromCtx(ctx).Errorf(format, args...)
}

func Fatal(ctx context.Context, msg string) {
LoggerFromCtx(ctx).Fatal(msg)
}

func Fatalf(ctx context.Context, format string, args ...any) {
LoggerFromCtx(ctx).Fatalf(format, args...)
}

type Fields logger.Fields

func WithFields(ctx context.Context, keyValues Fields) Logger {
return LoggerFromCtx(ctx).WithFields(logger.Fields(keyValues))
}

func GetDelegate(ctx context.Context) any {
return LoggerFromCtx(ctx).GetDelegate()
}

func SetLogLevel(ctx context.Context, level string) error {
return LoggerFromCtx(ctx).SetLogLevel(level)
}

0 comments on commit d076e15

Please sign in to comment.