From 0edb558417f678cbfdf1a8c174e514c42db711e3 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Thu, 6 Feb 2025 15:08:19 +0900 Subject: [PATCH] perf(logstream): remove llsnList from replicateTask Removed the llsnList from the replicateTask to improve performance and simplify the code. The beginLLSN field is now used instead. This change eliminates the need to fill llsnList, resulting in a slight performance improvement. --- internal/storagenode/logstream/append.go | 2 +- .../storagenode/logstream/replicate_client.go | 5 +-- .../logstream/replicate_client_test.go | 10 +++--- .../storagenode/logstream/replicate_task.go | 36 ++++++------------- .../logstream/replicate_task_test.go | 16 ++------- internal/storagenode/logstream/sequencer.go | 8 +++-- 6 files changed, 25 insertions(+), 52 deletions(-) diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index e4d7a2599..9fecf42cd 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -238,7 +238,7 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext // replicate tasks st.rts = newReplicateTaskSlice() for i := 0; i < numBackups; i++ { - rt := newReplicateTask(len(dataBatch)) + rt := newReplicateTask() rt.tpid = lse.tpid rt.lsid = lse.lsid rt.dataList = dataBatch diff --git a/internal/storagenode/logstream/replicate_client.go b/internal/storagenode/logstream/replicate_client.go index 6a3ff58e6..1a1069780 100644 --- a/internal/storagenode/logstream/replicate_client.go +++ b/internal/storagenode/logstream/replicate_client.go @@ -128,12 +128,9 @@ func (rc *replicateClient) sendLoop(ctx context.Context) { // sendLoopInternal sends a replicate task to the backup replica. func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask, req *snpb.ReplicateRequest) error { - // Remove maxAppendSubBatchSize, since rt already has batched data. startTime := time.Now() req.Data = rt.dataList - if len(rt.llsnList) > 0 { - req.BeginLLSN = rt.llsnList[0] - } + req.BeginLLSN = rt.beginLLSN rt.release() err := rc.streamClient.Send(req) inflight := rc.inflight.Add(-1) diff --git a/internal/storagenode/logstream/replicate_client_test.go b/internal/storagenode/logstream/replicate_client_test.go index 2c8bf70ce..c518b4ad4 100644 --- a/internal/storagenode/logstream/replicate_client_test.go +++ b/internal/storagenode/logstream/replicate_client_test.go @@ -123,10 +123,10 @@ func TestReplicateClientRPCError(t *testing.T) { rc.queue = make(chan *replicateTask, 1) rc.streamClient = mockStreamClient - rt := newReplicateTask(1) + rt := newReplicateTask() rt.tpid = lse.tpid rt.lsid = lse.lsid - rt.llsnList = append(rt.llsnList, 1) + rt.beginLLSN = 1 rt.dataList = [][]byte{nil} err := rc.send(context.Background(), rt) @@ -156,7 +156,7 @@ func TestReplicateClientDrain(t *testing.T) { rc.queue = make(chan *replicateTask, numTasks) for i := 0; i < numTasks; i++ { - rt := newReplicateTask(1) + rt := newReplicateTask() err := rc.send(context.Background(), rt) assert.NoError(t, err) } @@ -206,10 +206,10 @@ func TestReplicateClient(t *testing.T) { assert.NoError(t, err) defer rc.stop() - rt := newReplicateTask(1) + rt := newReplicateTask() rt.tpid = lse.tpid rt.lsid = lse.lsid - rt.llsnList = append(rt.llsnList, 1) + rt.beginLLSN = 1 rt.dataList = [][]byte{nil} err = rc.send(context.Background(), rt) assert.NoError(t, err) diff --git a/internal/storagenode/logstream/replicate_task.go b/internal/storagenode/logstream/replicate_task.go index a0922bc55..445e1e9f4 100644 --- a/internal/storagenode/logstream/replicate_task.go +++ b/internal/storagenode/logstream/replicate_task.go @@ -8,28 +8,20 @@ import ( // replicateTask is a task struct including a list of LLSNs and bytes of data. type replicateTask struct { - tpid types.TopicID - lsid types.LogStreamID - llsnList []types.LLSN - dataList [][]byte + tpid types.TopicID + lsid types.LogStreamID + beginLLSN types.LLSN + dataList [][]byte } -// newReplicateTask returns a new replicateTask. The capacity of the returned -// replicateTask's llsnList is equal to or greater than the argument size, and -// its length is zero. -// Since (snpb.ReplicateRequest).LLSN is deprecated, (*replicateTask).llsnList -// will be deprecated soon. Until that, newReplicateTask simplifies the pool -// management of replicateTask. -func newReplicateTask(size int) *replicateTask { - return defaultReplicateTaskPool.get(size) +// newReplicateTask returns a new replicateTask. +func newReplicateTask() *replicateTask { + return defaultReplicateTaskPool.get() } // release relreases the task to the pool. func (rt *replicateTask) release() { - rt.tpid = 0 - rt.lsid = 0 - rt.llsnList = rt.llsnList[0:0] - rt.dataList = nil + *rt = replicateTask{} defaultReplicateTaskPool.put(rt) } @@ -47,18 +39,12 @@ type replicateTaskPool struct { var defaultReplicateTaskPool replicateTaskPool -func (p *replicateTaskPool) get(size int) *replicateTask { +func (p *replicateTaskPool) get() *replicateTask { rt, ok := p.pool.Get().(*replicateTask) - if ok && cap(rt.llsnList) >= size { - rt.llsnList = rt.llsnList[0:0] - return rt - } if ok { - p.pool.Put(rt) - } - return &replicateTask{ - llsnList: make([]types.LLSN, 0, size), + return rt } + return &replicateTask{} } func (p *replicateTaskPool) put(rt *replicateTask) { diff --git a/internal/storagenode/logstream/replicate_task_test.go b/internal/storagenode/logstream/replicate_task_test.go index 80a823a5b..066e603f6 100644 --- a/internal/storagenode/logstream/replicate_task_test.go +++ b/internal/storagenode/logstream/replicate_task_test.go @@ -2,25 +2,13 @@ package logstream import ( "testing" - - "github.com/stretchr/testify/assert" ) func TestReplicateTaskPools(t *testing.T) { const repeatCount = 1000 - lengthList := []int{ - 1 << 4, - 1 << 6, - 1 << 8, - 1 << 10, - } for range repeatCount { - for _, length := range lengthList { - rt2 := newReplicateTask(length) - assert.Empty(t, rt2.llsnList) - assert.GreaterOrEqual(t, cap(rt2.llsnList), length) - rt2.release() - } + rt2 := newReplicateTask() + rt2.release() } } diff --git a/internal/storagenode/logstream/sequencer.go b/internal/storagenode/logstream/sequencer.go index 9c2cb6bf4..7e7c9355f 100644 --- a/internal/storagenode/logstream/sequencer.go +++ b/internal/storagenode/logstream/sequencer.go @@ -108,9 +108,11 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) if ce := sq.logger.Check(zap.DebugLevel, "sequencer: issued llsn"); ce != nil { ce.Write(zap.Uint64("llsn", uint64(sq.llsn))) } - for replicaIdx := 0; replicaIdx < len(st.rts.tasks); replicaIdx++ { - // NOTE: Use "append" since the length of st.rts is not enough to use index. Its capacity is enough because it is created to be reused. - st.rts.tasks[replicaIdx].llsnList = append(st.rts.tasks[replicaIdx].llsnList, sq.llsn) + // NOTE: If we guarantee len(st.awgs) is positive, it can be moved onto for loop. + if dataIdx == 0 { + for replicaIdx := 0; replicaIdx < len(st.rts.tasks); replicaIdx++ { + st.rts.tasks[replicaIdx].beginLLSN = sq.llsn + } } //nolint:staticcheck if err := st.wb.Set(sq.llsn, st.dataBatch[dataIdx]); err != nil {