Skip to content

Commit

Permalink
perf(logstream): remove llsnList from replicateTask
Browse files Browse the repository at this point in the history
Removed the llsnList from the replicateTask to improve performance and simplify
the code. The beginLLSN field is now used instead. This change eliminates the
need to fill llsnList, resulting in a slight performance improvement.
  • Loading branch information
ijsong committed Feb 21, 2025
1 parent d694a78 commit 0edb558
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 52 deletions.
2 changes: 1 addition & 1 deletion internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext
// replicate tasks
st.rts = newReplicateTaskSlice()
for i := 0; i < numBackups; i++ {
rt := newReplicateTask(len(dataBatch))
rt := newReplicateTask()
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.dataList = dataBatch
Expand Down
5 changes: 1 addition & 4 deletions internal/storagenode/logstream/replicate_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,9 @@ func (rc *replicateClient) sendLoop(ctx context.Context) {

// sendLoopInternal sends a replicate task to the backup replica.
func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask, req *snpb.ReplicateRequest) error {
// Remove maxAppendSubBatchSize, since rt already has batched data.
startTime := time.Now()
req.Data = rt.dataList
if len(rt.llsnList) > 0 {
req.BeginLLSN = rt.llsnList[0]
}
req.BeginLLSN = rt.beginLLSN
rt.release()
err := rc.streamClient.Send(req)
inflight := rc.inflight.Add(-1)
Expand Down
10 changes: 5 additions & 5 deletions internal/storagenode/logstream/replicate_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ func TestReplicateClientRPCError(t *testing.T) {
rc.queue = make(chan *replicateTask, 1)
rc.streamClient = mockStreamClient

rt := newReplicateTask(1)
rt := newReplicateTask()
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.llsnList = append(rt.llsnList, 1)
rt.beginLLSN = 1
rt.dataList = [][]byte{nil}

err := rc.send(context.Background(), rt)
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestReplicateClientDrain(t *testing.T) {
rc.queue = make(chan *replicateTask, numTasks)

for i := 0; i < numTasks; i++ {
rt := newReplicateTask(1)
rt := newReplicateTask()
err := rc.send(context.Background(), rt)
assert.NoError(t, err)
}
Expand Down Expand Up @@ -206,10 +206,10 @@ func TestReplicateClient(t *testing.T) {
assert.NoError(t, err)
defer rc.stop()

rt := newReplicateTask(1)
rt := newReplicateTask()
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.llsnList = append(rt.llsnList, 1)
rt.beginLLSN = 1
rt.dataList = [][]byte{nil}
err = rc.send(context.Background(), rt)
assert.NoError(t, err)
Expand Down
36 changes: 11 additions & 25 deletions internal/storagenode/logstream/replicate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,20 @@ import (

// replicateTask is a task struct including a list of LLSNs and bytes of data.
type replicateTask struct {
tpid types.TopicID
lsid types.LogStreamID
llsnList []types.LLSN
dataList [][]byte
tpid types.TopicID
lsid types.LogStreamID
beginLLSN types.LLSN
dataList [][]byte
}

// newReplicateTask returns a new replicateTask. The capacity of the returned
// replicateTask's llsnList is equal to or greater than the argument size, and
// its length is zero.
// Since (snpb.ReplicateRequest).LLSN is deprecated, (*replicateTask).llsnList
// will be deprecated soon. Until that, newReplicateTask simplifies the pool
// management of replicateTask.
func newReplicateTask(size int) *replicateTask {
return defaultReplicateTaskPool.get(size)
// newReplicateTask returns a new replicateTask.
func newReplicateTask() *replicateTask {
return defaultReplicateTaskPool.get()
}

// release relreases the task to the pool.
func (rt *replicateTask) release() {
rt.tpid = 0
rt.lsid = 0
rt.llsnList = rt.llsnList[0:0]
rt.dataList = nil
*rt = replicateTask{}
defaultReplicateTaskPool.put(rt)
}

Expand All @@ -47,18 +39,12 @@ type replicateTaskPool struct {

var defaultReplicateTaskPool replicateTaskPool

func (p *replicateTaskPool) get(size int) *replicateTask {
func (p *replicateTaskPool) get() *replicateTask {
rt, ok := p.pool.Get().(*replicateTask)
if ok && cap(rt.llsnList) >= size {
rt.llsnList = rt.llsnList[0:0]
return rt
}
if ok {
p.pool.Put(rt)
}
return &replicateTask{
llsnList: make([]types.LLSN, 0, size),
return rt
}
return &replicateTask{}
}

func (p *replicateTaskPool) put(rt *replicateTask) {
Expand Down
16 changes: 2 additions & 14 deletions internal/storagenode/logstream/replicate_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,13 @@ package logstream

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestReplicateTaskPools(t *testing.T) {
const repeatCount = 1000
lengthList := []int{
1 << 4,
1 << 6,
1 << 8,
1 << 10,
}

for range repeatCount {
for _, length := range lengthList {
rt2 := newReplicateTask(length)
assert.Empty(t, rt2.llsnList)
assert.GreaterOrEqual(t, cap(rt2.llsnList), length)
rt2.release()
}
rt2 := newReplicateTask()
rt2.release()
}
}
8 changes: 5 additions & 3 deletions internal/storagenode/logstream/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)
if ce := sq.logger.Check(zap.DebugLevel, "sequencer: issued llsn"); ce != nil {
ce.Write(zap.Uint64("llsn", uint64(sq.llsn)))
}
for replicaIdx := 0; replicaIdx < len(st.rts.tasks); replicaIdx++ {
// NOTE: Use "append" since the length of st.rts is not enough to use index. Its capacity is enough because it is created to be reused.
st.rts.tasks[replicaIdx].llsnList = append(st.rts.tasks[replicaIdx].llsnList, sq.llsn)
// NOTE: If we guarantee len(st.awgs) is positive, it can be moved onto for loop.
if dataIdx == 0 {
for replicaIdx := 0; replicaIdx < len(st.rts.tasks); replicaIdx++ {
st.rts.tasks[replicaIdx].beginLLSN = sq.llsn
}
}
//nolint:staticcheck
if err := st.wb.Set(sq.llsn, st.dataBatch[dataIdx]); err != nil {
Expand Down

0 comments on commit 0edb558

Please sign in to comment.