Skip to content

Commit 942bf0e

Browse files
committed
feat: add optional logger wherever possible
This commit introduces an optional logger parameter to various structs. This enhancement allows users to provide custom logging implementations.
1 parent 5069fd6 commit 942bf0e

File tree

15 files changed

+339
-170
lines changed

15 files changed

+339
-170
lines changed

internal/log.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,66 @@ func (l LogLevelT) InfoOrAbove() bool {
7777
func (l LogLevelT) DebugOrAbove() bool {
7878
return l >= LogLevelDebug
7979
}
80+
81+
// LoggerWithLevel is a logger interface with leveled logging methods.
82+
//
83+
// This interface can be implemented by custom loggers to provide leveled logging.
84+
type LoggerWithLevel interface {
85+
// Infof logs an info level message
86+
Infof(ctx context.Context, format string, v ...interface{})
87+
88+
// Warnf logs a warning level message
89+
Warnf(ctx context.Context, format string, v ...interface{})
90+
91+
// Debugf logs a debug level message
92+
Debugf(ctx context.Context, format string, v ...interface{})
93+
94+
// Errorf logs an error level message
95+
Errorf(ctx context.Context, format string, v ...interface{})
96+
97+
// Enabled reports whether the given log level is enabled in the logger
98+
Enabled(ctx context.Context, level LogLevelT) bool
99+
}
100+
101+
// legacyLoggerAdapter is a logger that implements LoggerWithLevel interface
102+
// using the global [Logger] and [LogLevel] variables.
103+
type legacyLoggerAdapter struct{}
104+
105+
func (l *legacyLoggerAdapter) Infof(ctx context.Context, format string, v ...interface{}) {
106+
if LogLevel.InfoOrAbove() {
107+
Logger.Printf(ctx, format, v...)
108+
}
109+
}
110+
111+
func (l *legacyLoggerAdapter) Warnf(ctx context.Context, format string, v ...interface{}) {
112+
if LogLevel.WarnOrAbove() {
113+
Logger.Printf(ctx, format, v...)
114+
}
115+
}
116+
117+
func (l *legacyLoggerAdapter) Debugf(ctx context.Context, format string, v ...interface{}) {
118+
if LogLevel.DebugOrAbove() {
119+
Logger.Printf(ctx, format, v...)
120+
}
121+
}
122+
123+
func (l legacyLoggerAdapter) Errorf(ctx context.Context, format string, v ...interface{}) {
124+
Logger.Printf(ctx, format, v...)
125+
}
126+
127+
func (l legacyLoggerAdapter) Enabled(_ context.Context, level LogLevelT) bool {
128+
switch level {
129+
case LogLevelWarn:
130+
return LogLevel.WarnOrAbove()
131+
case LogLevelInfo:
132+
return LogLevel.InfoOrAbove()
133+
case LogLevelDebug:
134+
return LogLevel.DebugOrAbove()
135+
case LogLevelError:
136+
fallthrough
137+
default:
138+
return true
139+
}
140+
}
141+
142+
var LegacyLoggerWithLevel LoggerWithLevel = &legacyLoggerAdapter{}

internal/pool/pool.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ type Options struct {
115115
// DialerRetryTimeout is the backoff duration between retry attempts.
116116
// Default: 100ms
117117
DialerRetryTimeout time.Duration
118+
119+
// Optional logger for connection pool operations.
120+
Logger internal.LoggerWithLevel
118121
}
119122

120123
type lastDialErrorWrap struct {
@@ -223,7 +226,7 @@ func (p *ConnPool) checkMinIdleConns() {
223226
p.idleConnsLen.Add(-1)
224227

225228
p.freeTurn()
226-
internal.Logger.Printf(context.Background(), "addIdleConn panic: %+v", err)
229+
p.logger().Errorf(context.Background(), "addIdleConn panic: %+v", err)
227230
}
228231
}()
229232

@@ -379,7 +382,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
379382
return cn, nil
380383
}
381384

382-
internal.Logger.Printf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", maxRetries, lastErr)
385+
p.logger().Errorf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", maxRetries, lastErr)
383386
// All retries failed - handle error tracking
384387
p.setLastDialError(lastErr)
385388
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
@@ -452,7 +455,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
452455

453456
for {
454457
if attempts >= getAttempts {
455-
internal.Logger.Printf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
458+
p.logger().Errorf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
456459
break
457460
}
458461
attempts++
@@ -479,12 +482,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
479482
if hookManager != nil {
480483
acceptConn, err := hookManager.ProcessOnGet(ctx, cn, false)
481484
if err != nil {
482-
internal.Logger.Printf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
485+
p.logger().Errorf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
483486
_ = p.CloseConn(cn)
484487
continue
485488
}
486489
if !acceptConn {
487-
internal.Logger.Printf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
490+
p.logger().Errorf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
488491
p.Put(ctx, cn)
489492
cn = nil
490493
continue
@@ -509,7 +512,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
509512
// this should not happen with a new connection, but we handle it gracefully
510513
if err != nil || !acceptConn {
511514
// Failed to process connection, discard it
512-
internal.Logger.Printf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
515+
p.logger().Errorf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
513516
_ = p.CloseConn(newcn)
514517
return nil, err
515518
}
@@ -703,7 +706,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
703706

704707
// If we exhausted all attempts without finding a usable connection, return nil
705708
if attempts > 1 && attempts >= maxAttempts && int32(attempts) >= p.poolSize.Load() {
706-
internal.Logger.Printf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
709+
p.logger().Errorf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
707710
return nil, nil
708711
}
709712

@@ -720,7 +723,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
720723
// Peek at the reply type to check if it's a push notification
721724
if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush {
722725
// Not a push notification or error peeking, remove connection
723-
internal.Logger.Printf(ctx, "Conn has unread data (not push notification), removing it")
726+
p.logger().Errorf(ctx, "Conn has unread data (not push notification), removing it")
724727
p.Remove(ctx, cn, err)
725728
}
726729
// It's a push notification, allow pooling (client will handle it)
@@ -733,7 +736,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
733736
if hookManager != nil {
734737
shouldPool, shouldRemove, err = hookManager.ProcessOnPut(ctx, cn)
735738
if err != nil {
736-
internal.Logger.Printf(ctx, "Connection hook error: %v", err)
739+
p.logger().Errorf(ctx, "Connection hook error: %v", err)
737740
p.Remove(ctx, cn, err)
738741
return
739742
}
@@ -835,7 +838,7 @@ func (p *ConnPool) removeConn(cn *Conn) {
835838
// this can be idle conn
836839
for idx, ic := range p.idleConns {
837840
if ic.GetID() == cid {
838-
internal.Logger.Printf(context.Background(), "redis: connection pool: removing idle conn[%d]", cid)
841+
p.logger().Errorf(context.Background(), "redis: connection pool: removing idle conn[%d]", cid)
839842
p.idleConns = append(p.idleConns[:idx], p.idleConns[idx+1:]...)
840843
p.idleConnsLen.Add(-1)
841844
break
@@ -951,7 +954,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
951954
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
952955
// For RESP3 connections with push notifications, we allow some buffered data
953956
// The client will process these notifications before using the connection
954-
internal.Logger.Printf(context.Background(), "push: conn[%d] has buffered data, likely push notifications - will be processed by client", cn.GetID())
957+
p.logger().Infof(context.Background(), "push: conn[%d] has buffered data, likely push notifications - will be processed by client", cn.GetID())
955958
return true // Connection is healthy, client will handle notifications
956959
}
957960
return false // Unexpected data, not push notifications, connection is unhealthy
@@ -961,3 +964,11 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
961964
}
962965
return true
963966
}
967+
968+
func (p *ConnPool) logger() internal.LoggerWithLevel {
969+
if p.cfg.Logger != nil {
970+
return p.cfg.Logger
971+
}
972+
973+
return internal.LegacyLoggerWithLevel
974+
}

logging/logging.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,4 @@ func (l *filterLogger) Printf(ctx context.Context, format string, v ...interface
8989
return
9090
}
9191
}
92+

maintnotifications/circuit_breaker.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,7 @@ func (cb *CircuitBreaker) Execute(fn func() error) error {
102102
if cb.state.CompareAndSwap(int32(CircuitBreakerOpen), int32(CircuitBreakerHalfOpen)) {
103103
cb.requests.Store(0)
104104
cb.successes.Store(0)
105-
if internal.LogLevel.InfoOrAbove() {
106-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerTransitioningToHalfOpen(cb.endpoint))
107-
}
105+
cb.logger().Infof(context.Background(), logs.CircuitBreakerTransitioningToHalfOpen(cb.endpoint))
108106
// Fall through to half-open logic
109107
} else {
110108
return ErrCircuitBreakerOpen
@@ -144,17 +142,13 @@ func (cb *CircuitBreaker) recordFailure() {
144142
case CircuitBreakerClosed:
145143
if failures >= int64(cb.failureThreshold) {
146144
if cb.state.CompareAndSwap(int32(CircuitBreakerClosed), int32(CircuitBreakerOpen)) {
147-
if internal.LogLevel.WarnOrAbove() {
148-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerOpened(cb.endpoint, failures))
149-
}
145+
cb.logger().Warnf(context.Background(), logs.CircuitBreakerOpened(cb.endpoint, failures))
150146
}
151147
}
152148
case CircuitBreakerHalfOpen:
153149
// Any failure in half-open state immediately opens the circuit
154150
if cb.state.CompareAndSwap(int32(CircuitBreakerHalfOpen), int32(CircuitBreakerOpen)) {
155-
if internal.LogLevel.WarnOrAbove() {
156-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerReopened(cb.endpoint))
157-
}
151+
cb.logger().Warnf(context.Background(), logs.CircuitBreakerReopened(cb.endpoint))
158152
}
159153
}
160154
}
@@ -176,9 +170,7 @@ func (cb *CircuitBreaker) recordSuccess() {
176170
if successes >= int64(cb.maxRequests) {
177171
if cb.state.CompareAndSwap(int32(CircuitBreakerHalfOpen), int32(CircuitBreakerClosed)) {
178172
cb.failures.Store(0)
179-
if internal.LogLevel.InfoOrAbove() {
180-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerClosed(cb.endpoint, successes))
181-
}
173+
cb.logger().Infof(context.Background(), logs.CircuitBreakerClosed(cb.endpoint, successes))
182174
}
183175
}
184176
}
@@ -202,6 +194,13 @@ func (cb *CircuitBreaker) GetStats() CircuitBreakerStats {
202194
}
203195
}
204196

197+
func (cb *CircuitBreaker) logger() internal.LoggerWithLevel {
198+
if cb.config != nil && cb.config.Logger != nil {
199+
return cb.config.Logger
200+
}
201+
return internal.LegacyLoggerWithLevel
202+
}
203+
205204
// CircuitBreakerStats provides statistics about a circuit breaker
206205
type CircuitBreakerStats struct {
207206
Endpoint string
@@ -325,8 +324,8 @@ func (cbm *CircuitBreakerManager) cleanup() {
325324
}
326325

327326
// Log cleanup results
328-
if len(toDelete) > 0 && internal.LogLevel.InfoOrAbove() {
329-
internal.Logger.Printf(context.Background(), logs.CircuitBreakerCleanup(len(toDelete), count))
327+
if len(toDelete) > 0 {
328+
cbm.logger().Infof(context.Background(), logs.CircuitBreakerCleanup(len(toDelete), count))
330329
}
331330

332331
cbm.lastCleanup.Store(now.Unix())
@@ -351,3 +350,10 @@ func (cbm *CircuitBreakerManager) Reset() {
351350
return true
352351
})
353352
}
353+
354+
func (cbm *CircuitBreakerManager) logger() internal.LoggerWithLevel {
355+
if cbm.config != nil && cbm.config.Logger != nil {
356+
return cbm.config.Logger
357+
}
358+
return internal.LegacyLoggerWithLevel
359+
}

maintnotifications/config.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ type Config struct {
128128
// After this many retries, the connection will be removed from the pool.
129129
// Default: 3
130130
MaxHandoffRetries int
131+
132+
// Logger is an optional custom logger for maintenance notifications.
133+
Logger internal.LoggerWithLevel
131134
}
132135

133136
func (c *Config) IsEnabled() bool {
@@ -312,10 +315,9 @@ func (c *Config) ApplyDefaultsWithPoolConfig(poolSize int, maxActiveConns int) *
312315
result.CircuitBreakerMaxRequests = c.CircuitBreakerMaxRequests
313316
}
314317

315-
if internal.LogLevel.DebugOrAbove() {
316-
internal.Logger.Printf(context.Background(), logs.DebugLoggingEnabled())
317-
internal.Logger.Printf(context.Background(), logs.ConfigDebug(result))
318-
}
318+
c.logger().Debugf(context.Background(), logs.DebugLoggingEnabled())
319+
c.logger().Debugf(context.Background(), logs.ConfigDebug(result))
320+
319321
return result
320322
}
321323

@@ -341,6 +343,8 @@ func (c *Config) Clone() *Config {
341343

342344
// Configuration fields
343345
MaxHandoffRetries: c.MaxHandoffRetries,
346+
347+
Logger: c.Logger,
344348
}
345349
}
346350

@@ -365,6 +369,13 @@ func (c *Config) applyWorkerDefaults(poolSize int) {
365369
}
366370
}
367371

372+
func (c *Config) logger() internal.LoggerWithLevel {
373+
if c.Logger != nil {
374+
return c.Logger
375+
}
376+
return internal.LegacyLoggerWithLevel
377+
}
378+
368379
// DetectEndpointType automatically detects the appropriate endpoint type
369380
// based on the connection address and TLS configuration.
370381
//

0 commit comments

Comments
 (0)