From 37250e1670831baa7d472e3c4832c4f21701944c Mon Sep 17 00:00:00 2001 From: Injun Song Date: Thu, 13 Apr 2023 18:48:42 +0900 Subject: [PATCH] perf(storagenode): wrap replicateTask slice with struct The `replicateTaskSlicePool` pooled slices of `replicateTask`. The function `releaseReplicateTaskSlice` received borrowed slice and then put it into `replicateTaskSlicePool`. However, the `sync.(Pool).Put` parameter is an interface, so the slice escaped to the heap unnecessarily. Not to use unnecessary heap objects, this PR wraps the `replicateTask` slice with the struct replicateTaskSlice. Resolves #75 --- internal/storagenode/logstream/append.go | 4 ++-- .../storagenode/logstream/replicate_task.go | 18 ++++++++++++------ internal/storagenode/logstream/sequencer.go | 16 ++++++++-------- .../storagenode/logstream/sequencer_test.go | 8 ++++++-- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index aeb124c76..40ca41aa0 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -102,7 +102,7 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end rt.tpid = lse.tpid rt.lsid = lse.lsid rt.dataList = batchletData - st.rts = append(st.rts, rt) + st.rts.tasks = append(st.rts.tasks, rt) } // write wait group @@ -138,7 +138,7 @@ func (lse *Executor) sendSequenceTasks(ctx context.Context, sts []*sequenceTask) // _ = st.dwb.Close() _ = st.wb.Close() releaseCommitWaitTaskList(st.cwts) - releaseReplicateTasks(st.rts) + releaseReplicateTasks(st.rts.tasks) releaseReplicateTaskSlice(st.rts) st.release() } diff --git a/internal/storagenode/logstream/replicate_task.go b/internal/storagenode/logstream/replicate_task.go index 1bcddc619..f828b2a39 100644 --- a/internal/storagenode/logstream/replicate_task.go +++ b/internal/storagenode/logstream/replicate_task.go @@ -79,17 +79,23 @@ var ( const defaultLengthOfReplicationTaskSlice = 3 +type replicateTaskSlice struct { + tasks []*replicateTask +} + var replicateTaskSlicePool = sync.Pool{ New: func() interface{} { - return make([]*replicateTask, 0, defaultLengthOfReplicationTaskSlice) + return &replicateTaskSlice{ + tasks: make([]*replicateTask, 0, defaultLengthOfReplicationTaskSlice), + } }, } -func newReplicateTaskSlice() []*replicateTask { - return replicateTaskSlicePool.Get().([]*replicateTask) +func newReplicateTaskSlice() *replicateTaskSlice { + return replicateTaskSlicePool.Get().(*replicateTaskSlice) } -func releaseReplicateTaskSlice(rts []*replicateTask) { - rts = rts[0:0] - replicateTaskSlicePool.Put(rts) //nolint:staticcheck +func releaseReplicateTaskSlice(rts *replicateTaskSlice) { + rts.tasks = rts.tasks[0:0] + replicateTaskSlicePool.Put(rts) } diff --git a/internal/storagenode/logstream/sequencer.go b/internal/storagenode/logstream/sequencer.go index 149be111f..b85a04657 100644 --- a/internal/storagenode/logstream/sequencer.go +++ b/internal/storagenode/logstream/sequencer.go @@ -104,9 +104,9 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) sq.llsn++ st.awgs[dataIdx].setLLSN(sq.llsn) sq.logger.Debug("sequencer: issued llsn", zap.Uint64("llsn", uint64(sq.llsn))) - for replicaIdx := 0; replicaIdx < len(st.rts); replicaIdx++ { + for replicaIdx := 0; replicaIdx < len(st.rts.tasks); replicaIdx++ { // NOTE: Use "append" since the length of st.rts is not enough to use index. Its capacity is enough because it is created to be reused. - st.rts[replicaIdx].llsnList = append(st.rts[replicaIdx].llsnList, sq.llsn) + st.rts.tasks[replicaIdx].llsnList = append(st.rts.tasks[replicaIdx].llsnList, sq.llsn) } //nolint:staticcheck if err := st.wb.Set(sq.llsn, st.dataBatch[dataIdx]); err != nil { @@ -141,7 +141,7 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) st.wwg.done(err) _ = st.wb.Close() releaseCommitWaitTaskList(cwts) - releaseReplicateTasks(rts) + releaseReplicateTasks(rts.tasks) releaseReplicateTaskSlice(rts) st.release() return @@ -154,7 +154,7 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) st.wwg.done(err) _ = st.wb.Close() releaseCommitWaitTaskList(cwts) - releaseReplicateTasks(rts) + releaseReplicateTasks(rts.tasks) releaseReplicateTaskSlice(rts) st.release() return @@ -162,8 +162,8 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) // send to replicator ridx := 0 - for ridx < len(rts) { - err := sq.lse.rcs.clients[ridx].send(ctx, rts[ridx]) + for ridx < len(rts.tasks) { + err := sq.lse.rcs.clients[ridx].send(ctx, rts.tasks[ridx]) if err != nil { sq.logger.Error("could not send to replicate client", zap.Error(err)) sq.lse.esm.compareAndSwap(executorStateAppendable, executorStateSealing) @@ -171,7 +171,7 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) } ridx++ } - releaseReplicateTasks(rts[ridx:]) + releaseReplicateTasks(rts.tasks[ridx:]) releaseReplicateTaskSlice(rts) } @@ -248,7 +248,7 @@ type sequenceTask struct { wb *storage.WriteBatch dataBatch [][]byte cwts *listQueue - rts []*replicateTask + rts *replicateTaskSlice } func newSequenceTask() *sequenceTask { diff --git a/internal/storagenode/logstream/sequencer_test.go b/internal/storagenode/logstream/sequencer_test.go index 8ce3928b8..e47c77a48 100644 --- a/internal/storagenode/logstream/sequencer_test.go +++ b/internal/storagenode/logstream/sequencer_test.go @@ -81,6 +81,8 @@ func testSequenceTask(stg *storage.Storage) *sequenceTask { st.cwts = newListQueue() st.cwts.PushFront(newCommitWaitTask(awg)) + st.rts = &replicateTaskSlice{} + return st } @@ -192,8 +194,10 @@ func TestSequencer_FailToSendToReplicateClient(t *testing.T) { } st := testSequenceTask(stg) - st.rts = []*replicateTask{ - {}, + st.rts = &replicateTaskSlice{ + tasks: []*replicateTask{ + {}, + }, } sq.sequenceLoopInternal(context.Background(), st) assert.Len(t, wr.queue, 1)