diff --git a/internal/storagenode/logstream/committer.go b/internal/storagenode/logstream/committer.go index 966889858..d7796ecab 100644 --- a/internal/storagenode/logstream/committer.go +++ b/internal/storagenode/logstream/committer.go @@ -64,10 +64,12 @@ func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err if err != nil { inflight = atomic.AddInt64(&cm.inflightCommitWait, -int64(cnt)) } - cm.logger.Debug("send committer commit wait tasks", - zap.Int64("inflight", inflight), - zap.Error(err), - ) + if ce := cm.logger.Check(zap.DebugLevel, "send committer commit wait tasks"); ce != nil { + ce.Write( + zap.Int64("inflight", inflight), + zap.Error(err), + ) + } }() switch cm.lse.esm.load() { @@ -143,17 +145,21 @@ func (cm *committer) commitLoopInternal(ctx context.Context, ct *commitTask) { // TODO: Move these condition expressions to `internal/storagenode/logstream.(*committer).commit` method. commitVersion, _, _, invalid := cm.lse.lsc.reportCommitBase() if ct.stale(commitVersion) { - cm.logger.Debug("discard a stale commit message", - zap.Any("replica", commitVersion), - zap.Any("commit", ct.version), - ) + if ce := cm.logger.Check(zap.DebugLevel, "discard a stale commit message"); ce != nil { + ce.Write( + zap.Any("replica", commitVersion), + zap.Any("commit", ct.version), + ) + } return } if invalid { // Synchronization should fix this invalid replica status // caused by the inconsistency between the commit context and // the last log entry. - cm.logger.Debug("discard a commit message due to invalid replica status") + if ce := cm.logger.Check(zap.DebugLevel, "discard a commit message due to invalid replica status"); ce != nil { + ce.Write() + } return } @@ -323,10 +329,12 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT // drainCommitWaitQ drains the commit wait tasks in commitWaitQ. func (cm *committer) drainCommitWaitQ(cause error) { - cm.logger.Debug("draining commit wait tasks", - zap.Int64("inflight", atomic.LoadInt64(&cm.inflightCommitWait)), - zap.Error(cause), - ) + if ce := cm.logger.Check(zap.DebugLevel, "draining commit wait tasks"); ce != nil { + ce.Write( + zap.Int64("inflight", atomic.LoadInt64(&cm.inflightCommitWait)), + zap.Error(cause), + ) + } for atomic.LoadInt64(&cm.inflightCommitWait) > 0 { cwt := cm.commitWaitQ.pop() @@ -336,7 +344,9 @@ func (cm *committer) drainCommitWaitQ(cause error) { cwt.awg.commitDone(cause) cwt.release() inflight := atomic.AddInt64(&cm.inflightCommitWait, -1) - cm.logger.Debug("discard a commit wait task", zap.Int64("inflight", inflight)) + if ce := cm.logger.Check(zap.DebugLevel, "discard a commit wait task"); ce != nil { + ce.Write(zap.Int64("inflight", inflight)) + } } } diff --git a/internal/storagenode/logstream/executor.go b/internal/storagenode/logstream/executor.go index 5f3727164..0ccb41c25 100644 --- a/internal/storagenode/logstream/executor.go +++ b/internal/storagenode/logstream/executor.go @@ -344,10 +344,12 @@ func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamRepl } func (lse *Executor) resetInternalState(lastCommittedLLSN types.LLSN, discardCommitWaitTasks bool) { - lse.logger.Debug("resetting internal state", - zap.Int64("inflight", atomic.LoadInt64(&lse.inflight)), - zap.Int64("inflight_append", atomic.LoadInt64(&lse.inflightAppend)), - ) + if ce := lse.logger.Check(zap.DebugLevel, "resetting internal state"); ce != nil { + ce.Write( + zap.Int64("inflight", atomic.LoadInt64(&lse.inflight)), + zap.Int64("inflight_append", atomic.LoadInt64(&lse.inflightAppend)), + ) + } // close replicClients in replica connector lse.rcs.close() @@ -406,7 +408,9 @@ func (lse *Executor) Report(_ context.Context) (report snpb.LogStreamUncommitRep } prevUncommittedLLSNEnd := lse.prevUncommittedLLSNEnd.Load() if prevUncommittedLLSNEnd != uncommittedLLSNEnd { - lse.logger.Debug("log stream: report", zap.Any("report", report)) + if ce := lse.logger.Check(zap.DebugLevel, "log stream: report"); ce != nil { + ce.Write(zap.Any("report", report)) + } lse.prevUncommittedLLSNEnd.Store(uncommittedLLSNEnd) } @@ -433,7 +437,9 @@ func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamComm } if types.Version(atomic.LoadUint64(&lse.prevCommitVersion)) != commitResult.Version { - lse.logger.Debug("commit", zap.String("commit_result", commitResult.String())) + if ce := lse.logger.Check(zap.DebugLevel, "commit"); ce != nil { + ce.Write(zap.String("commit_result", commitResult.String())) + } atomic.StoreUint64(&lse.prevCommitVersion, uint64(commitResult.Version)) } diff --git a/internal/storagenode/logstream/replicate_client.go b/internal/storagenode/logstream/replicate_client.go index 1819d5f32..b5490bd50 100644 --- a/internal/storagenode/logstream/replicate_client.go +++ b/internal/storagenode/logstream/replicate_client.go @@ -74,10 +74,12 @@ func (rc *replicateClient) send(ctx context.Context, rt *replicateTask) (err err if err != nil { inflight = atomic.AddInt64(&rc.inflight, -1) } - rc.logger.Debug("sent replicate client a task", - zap.Int64("inflight", inflight), - zap.Error(err), - ) + if ce := rc.logger.Check(zap.DebugLevel, "sent replicate client a task"); ce != nil { + ce.Write( + zap.Int64("inflight", inflight), + zap.Error(err), + ) + } }() switch rc.lse.esm.load() { diff --git a/internal/storagenode/logstream/sequencer.go b/internal/storagenode/logstream/sequencer.go index 149be111f..c7c6a633c 100644 --- a/internal/storagenode/logstream/sequencer.go +++ b/internal/storagenode/logstream/sequencer.go @@ -47,10 +47,12 @@ func (sq *sequencer) send(ctx context.Context, st *sequenceTask) (err error) { if err != nil { inflight = atomic.AddInt64(&sq.inflight, -1) } - sq.logger.Debug("sent seqeuencer a task", - zap.Int64("inflight", inflight), - zap.Error(err), - ) + if ce := sq.logger.Check(zap.DebugLevel, "sent seqeuencer a task"); ce != nil { + ce.Write( + zap.Int64("inflight", inflight), + zap.Error(err), + ) + } }() switch sq.lse.esm.load() { @@ -103,7 +105,9 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) for dataIdx := 0; dataIdx < len(st.awgs); dataIdx++ { sq.llsn++ st.awgs[dataIdx].setLLSN(sq.llsn) - sq.logger.Debug("sequencer: issued llsn", zap.Uint64("llsn", uint64(sq.llsn))) + if ce := sq.logger.Check(zap.DebugLevel, "sequencer: issued llsn"); ce != nil { + ce.Write(zap.Uint64("llsn", uint64(sq.llsn))) + } for replicaIdx := 0; replicaIdx < len(st.rts); replicaIdx++ { // NOTE: Use "append" since the length of st.rts is not enough to use index. Its capacity is enough because it is created to be reused. st.rts[replicaIdx].llsnList = append(st.rts[replicaIdx].llsnList, sq.llsn) @@ -183,10 +187,12 @@ func (sq *sequencer) waitForDrainage(cause error, forceDrain bool) { timer := time.NewTimer(tick) defer timer.Stop() - sq.logger.Debug("draining sequencer tasks", - zap.Int64("inflight", atomic.LoadInt64(&sq.inflight)), - zap.Error(cause), - ) + if ce := sq.logger.Check(zap.DebugLevel, "draining sequencer tasks"); ce != nil { + ce.Write( + zap.Int64("inflight", atomic.LoadInt64(&sq.inflight)), + zap.Error(cause), + ) + } for atomic.LoadInt64(&sq.inflight) > 0 { if !forceDrain { diff --git a/internal/storagenode/logstream/sync.go b/internal/storagenode/logstream/sync.go index 4a42bcf70..e3b515aba 100644 --- a/internal/storagenode/logstream/sync.go +++ b/internal/storagenode/logstream/sync.go @@ -504,7 +504,9 @@ func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStr if err != nil { return err } - lse.logger.Info("log stream: sync replicate: copy", zap.String("log entry", entry.String())) + if ce := lse.logger.Check(zap.DebugLevel, "log stream: sync replicate: copy"); ce != nil { + ce.Write(zap.String("log entry", entry.String())) + } uncommittedLLSNBegin = entry.LLSN + 1 uncommittedGLSNBegin = entry.GLSN + 1 lem = &varlogpb.LogEntryMeta{ @@ -531,7 +533,9 @@ func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStr if err != nil { return err } - lse.logger.Info("log stream: sync replicate: copy", zap.String("commit context", cc.String())) + if ce := lse.logger.Check(zap.DebugLevel, "log stream: sync replicate: copy"); ce != nil { + ce.Write(zap.String("commit context", cc.String())) + } ver = cc.Version hwm = cc.HighWatermark diff --git a/internal/storagenode/logstream/writer.go b/internal/storagenode/logstream/writer.go index 34a9f8bbd..8161340ab 100644 --- a/internal/storagenode/logstream/writer.go +++ b/internal/storagenode/logstream/writer.go @@ -42,10 +42,12 @@ func (w *writer) send(ctx context.Context, st *sequenceTask) (err error) { if err != nil { inflight = atomic.AddInt64(&w.inflight, -1) } - w.logger.Debug("sent writer a task", - zap.Int64("inflight", inflight), - zap.Error(err), - ) + if ce := w.logger.Check(zap.DebugLevel, "sent writer a task"); ce != nil { + ce.Write( + zap.Int64("inflight", inflight), + zap.Error(err), + ) + } }() switch w.lse.esm.load() { @@ -131,10 +133,12 @@ func (w *writer) waitForDrainage(cause error, forceDrain bool) { timer := time.NewTimer(tick) defer timer.Stop() - w.logger.Debug("draining writer tasks", - zap.Int64("inflight", atomic.LoadInt64(&w.inflight)), - zap.Error(cause), - ) + if ce := w.logger.Check(zap.DebugLevel, "draining writer tasks"); ce != nil { + ce.Write( + zap.Int64("inflight", atomic.LoadInt64(&w.inflight)), + zap.Error(cause), + ) + } for atomic.LoadInt64(&w.inflight) > 0 { if !forceDrain {