Skip to content

Commit

Permalink
perf(storagenode): check log level
Browse files Browse the repository at this point in the history
This PR slightly improves the storage node's performance by checking the log level, making the
logger not make slices in advance.
  • Loading branch information
ijsong committed Apr 13, 2023
1 parent 7c81800 commit c6330cd
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 43 deletions.
38 changes: 24 additions & 14 deletions internal/storagenode/logstream/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err
if err != nil {
inflight = atomic.AddInt64(&cm.inflightCommitWait, -int64(cnt))
}
cm.logger.Debug("send committer commit wait tasks",
zap.Int64("inflight", inflight),
zap.Error(err),
)
if ce := cm.logger.Check(zap.DebugLevel, "send committer commit wait tasks"); ce != nil {
ce.Write(
zap.Int64("inflight", inflight),
zap.Error(err),
)
}
}()

switch cm.lse.esm.load() {
Expand Down Expand Up @@ -143,17 +145,21 @@ func (cm *committer) commitLoopInternal(ctx context.Context, ct *commitTask) {
// TODO: Move these condition expressions to `internal/storagenode/logstream.(*committer).commit` method.
commitVersion, _, _, invalid := cm.lse.lsc.reportCommitBase()
if ct.stale(commitVersion) {
cm.logger.Debug("discard a stale commit message",
zap.Any("replica", commitVersion),
zap.Any("commit", ct.version),
)
if ce := cm.logger.Check(zap.DebugLevel, "discard a stale commit message"); ce != nil {
ce.Write(
zap.Any("replica", commitVersion),
zap.Any("commit", ct.version),
)
}
return
}
if invalid {
// Synchronization should fix this invalid replica status
// caused by the inconsistency between the commit context and
// the last log entry.
cm.logger.Debug("discard a commit message due to invalid replica status")
if ce := cm.logger.Check(zap.DebugLevel, "discard a commit message due to invalid replica status"); ce != nil {
ce.Write()
}
return
}

Expand Down Expand Up @@ -323,10 +329,12 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT

// drainCommitWaitQ drains the commit wait tasks in commitWaitQ.
func (cm *committer) drainCommitWaitQ(cause error) {
cm.logger.Debug("draining commit wait tasks",
zap.Int64("inflight", atomic.LoadInt64(&cm.inflightCommitWait)),
zap.Error(cause),
)
if ce := cm.logger.Check(zap.DebugLevel, "draining commit wait tasks"); ce != nil {
ce.Write(
zap.Int64("inflight", atomic.LoadInt64(&cm.inflightCommitWait)),
zap.Error(cause),
)
}

for atomic.LoadInt64(&cm.inflightCommitWait) > 0 {
cwt := cm.commitWaitQ.pop()
Expand All @@ -336,7 +344,9 @@ func (cm *committer) drainCommitWaitQ(cause error) {
cwt.awg.commitDone(cause)
cwt.release()
inflight := atomic.AddInt64(&cm.inflightCommitWait, -1)
cm.logger.Debug("discard a commit wait task", zap.Int64("inflight", inflight))
if ce := cm.logger.Check(zap.DebugLevel, "discard a commit wait task"); ce != nil {
ce.Write(zap.Int64("inflight", inflight))
}
}
}

Expand Down
18 changes: 12 additions & 6 deletions internal/storagenode/logstream/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,12 @@ func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamRepl
}

func (lse *Executor) resetInternalState(lastCommittedLLSN types.LLSN, discardCommitWaitTasks bool) {
lse.logger.Debug("resetting internal state",
zap.Int64("inflight", atomic.LoadInt64(&lse.inflight)),
zap.Int64("inflight_append", atomic.LoadInt64(&lse.inflightAppend)),
)
if ce := lse.logger.Check(zap.DebugLevel, "resetting internal state"); ce != nil {
ce.Write(
zap.Int64("inflight", atomic.LoadInt64(&lse.inflight)),
zap.Int64("inflight_append", atomic.LoadInt64(&lse.inflightAppend)),
)
}

// close replicClients in replica connector
lse.rcs.close()
Expand Down Expand Up @@ -406,7 +408,9 @@ func (lse *Executor) Report(_ context.Context) (report snpb.LogStreamUncommitRep
}
prevUncommittedLLSNEnd := lse.prevUncommittedLLSNEnd.Load()
if prevUncommittedLLSNEnd != uncommittedLLSNEnd {
lse.logger.Debug("log stream: report", zap.Any("report", report))
if ce := lse.logger.Check(zap.DebugLevel, "log stream: report"); ce != nil {
ce.Write(zap.Any("report", report))
}
lse.prevUncommittedLLSNEnd.Store(uncommittedLLSNEnd)
}

Expand All @@ -433,7 +437,9 @@ func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamComm
}

if types.Version(atomic.LoadUint64(&lse.prevCommitVersion)) != commitResult.Version {
lse.logger.Debug("commit", zap.String("commit_result", commitResult.String()))
if ce := lse.logger.Check(zap.DebugLevel, "commit"); ce != nil {
ce.Write(zap.String("commit_result", commitResult.String()))
}
atomic.StoreUint64(&lse.prevCommitVersion, uint64(commitResult.Version))
}

Expand Down
10 changes: 6 additions & 4 deletions internal/storagenode/logstream/replicate_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ func (rc *replicateClient) send(ctx context.Context, rt *replicateTask) (err err
if err != nil {
inflight = atomic.AddInt64(&rc.inflight, -1)
}
rc.logger.Debug("sent replicate client a task",
zap.Int64("inflight", inflight),
zap.Error(err),
)
if ce := rc.logger.Check(zap.DebugLevel, "sent replicate client a task"); ce != nil {
ce.Write(
zap.Int64("inflight", inflight),
zap.Error(err),
)
}
}()

switch rc.lse.esm.load() {
Expand Down
24 changes: 15 additions & 9 deletions internal/storagenode/logstream/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ func (sq *sequencer) send(ctx context.Context, st *sequenceTask) (err error) {
if err != nil {
inflight = atomic.AddInt64(&sq.inflight, -1)
}
sq.logger.Debug("sent seqeuencer a task",
zap.Int64("inflight", inflight),
zap.Error(err),
)
if ce := sq.logger.Check(zap.DebugLevel, "sent seqeuencer a task"); ce != nil {
ce.Write(
zap.Int64("inflight", inflight),
zap.Error(err),
)
}
}()

switch sq.lse.esm.load() {
Expand Down Expand Up @@ -103,7 +105,9 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)
for dataIdx := 0; dataIdx < len(st.awgs); dataIdx++ {
sq.llsn++
st.awgs[dataIdx].setLLSN(sq.llsn)
sq.logger.Debug("sequencer: issued llsn", zap.Uint64("llsn", uint64(sq.llsn)))
if ce := sq.logger.Check(zap.DebugLevel, "sequencer: issued llsn"); ce != nil {
ce.Write(zap.Uint64("llsn", uint64(sq.llsn)))
}
for replicaIdx := 0; replicaIdx < len(st.rts); replicaIdx++ {
// NOTE: Use "append" since the length of st.rts is not enough to use index. Its capacity is enough because it is created to be reused.
st.rts[replicaIdx].llsnList = append(st.rts[replicaIdx].llsnList, sq.llsn)
Expand Down Expand Up @@ -183,10 +187,12 @@ func (sq *sequencer) waitForDrainage(cause error, forceDrain bool) {
timer := time.NewTimer(tick)
defer timer.Stop()

sq.logger.Debug("draining sequencer tasks",
zap.Int64("inflight", atomic.LoadInt64(&sq.inflight)),
zap.Error(cause),
)
if ce := sq.logger.Check(zap.DebugLevel, "draining sequencer tasks"); ce != nil {
ce.Write(
zap.Int64("inflight", atomic.LoadInt64(&sq.inflight)),
zap.Error(cause),
)
}

for atomic.LoadInt64(&sq.inflight) > 0 {
if !forceDrain {
Expand Down
8 changes: 6 additions & 2 deletions internal/storagenode/logstream/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,9 @@ func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStr
if err != nil {
return err
}
lse.logger.Info("log stream: sync replicate: copy", zap.String("log entry", entry.String()))
if ce := lse.logger.Check(zap.DebugLevel, "log stream: sync replicate: copy"); ce != nil {
ce.Write(zap.String("log entry", entry.String()))
}
uncommittedLLSNBegin = entry.LLSN + 1
uncommittedGLSNBegin = entry.GLSN + 1
lem = &varlogpb.LogEntryMeta{
Expand All @@ -531,7 +533,9 @@ func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStr
if err != nil {
return err
}
lse.logger.Info("log stream: sync replicate: copy", zap.String("commit context", cc.String()))
if ce := lse.logger.Check(zap.DebugLevel, "log stream: sync replicate: copy"); ce != nil {
ce.Write(zap.String("commit context", cc.String()))
}

ver = cc.Version
hwm = cc.HighWatermark
Expand Down
20 changes: 12 additions & 8 deletions internal/storagenode/logstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ func (w *writer) send(ctx context.Context, st *sequenceTask) (err error) {
if err != nil {
inflight = atomic.AddInt64(&w.inflight, -1)
}
w.logger.Debug("sent writer a task",
zap.Int64("inflight", inflight),
zap.Error(err),
)
if ce := w.logger.Check(zap.DebugLevel, "sent writer a task"); ce != nil {
ce.Write(
zap.Int64("inflight", inflight),
zap.Error(err),
)
}
}()

switch w.lse.esm.load() {
Expand Down Expand Up @@ -131,10 +133,12 @@ func (w *writer) waitForDrainage(cause error, forceDrain bool) {
timer := time.NewTimer(tick)
defer timer.Stop()

w.logger.Debug("draining writer tasks",
zap.Int64("inflight", atomic.LoadInt64(&w.inflight)),
zap.Error(cause),
)
if ce := w.logger.Check(zap.DebugLevel, "draining writer tasks"); ce != nil {
ce.Write(
zap.Int64("inflight", atomic.LoadInt64(&w.inflight)),
zap.Error(cause),
)
}

for atomic.LoadInt64(&w.inflight) > 0 {
if !forceDrain {
Expand Down

0 comments on commit c6330cd

Please sign in to comment.