Skip to content

Commit

Permalink
perf(storagenode): wrap replicateTask slice with struct
Browse files Browse the repository at this point in the history
The `replicateTaskSlicePool` pooled slices of `replicateTask`. The function
`releaseReplicateTaskSlice` received borrowed slice and then put it into `replicateTaskSlicePool`.
However, the `sync.(Pool).Put` parameter is an interface, so the slice escaped to the heap
unnecessarily.

Not to use unnecessary heap objects, this PR wraps the `replicateTask` slice with the struct
replicateTaskSlice.

Resolves #75
  • Loading branch information
ijsong committed Apr 13, 2023
1 parent abf2db7 commit bf5fb22
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 18 deletions.
4 changes: 2 additions & 2 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.dataList = batchletData
st.rts = append(st.rts, rt)
st.rts.tasks = append(st.rts.tasks, rt)
}

// write wait group
Expand Down Expand Up @@ -138,7 +138,7 @@ func (lse *Executor) sendSequenceTasks(ctx context.Context, sts []*sequenceTask)
// _ = st.dwb.Close()
_ = st.wb.Close()
releaseCommitWaitTaskList(st.cwts)
releaseReplicateTasks(st.rts)
releaseReplicateTasks(st.rts.tasks)
releaseReplicateTaskSlice(st.rts)
st.release()
}
Expand Down
18 changes: 12 additions & 6 deletions internal/storagenode/logstream/replicate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,23 @@ var (

const defaultLengthOfReplicationTaskSlice = 3

type replicateTaskSlice struct {
tasks []*replicateTask
}

var replicateTaskSlicePool = sync.Pool{
New: func() interface{} {
return make([]*replicateTask, 0, defaultLengthOfReplicationTaskSlice)
return &replicateTaskSlice{
tasks: make([]*replicateTask, 0, defaultLengthOfReplicationTaskSlice),
}
},
}

func newReplicateTaskSlice() []*replicateTask {
return replicateTaskSlicePool.Get().([]*replicateTask)
func newReplicateTaskSlice() *replicateTaskSlice {
return replicateTaskSlicePool.Get().(*replicateTaskSlice)
}

func releaseReplicateTaskSlice(rts []*replicateTask) {
rts = rts[0:0]
replicateTaskSlicePool.Put(rts) //nolint:staticcheck
func releaseReplicateTaskSlice(rts *replicateTaskSlice) {
rts.tasks = rts.tasks[0:0]
replicateTaskSlicePool.Put(rts)
}
16 changes: 8 additions & 8 deletions internal/storagenode/logstream/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)
sq.llsn++
st.awgs[dataIdx].setLLSN(sq.llsn)
sq.logger.Debug("sequencer: issued llsn", zap.Uint64("llsn", uint64(sq.llsn)))
for replicaIdx := 0; replicaIdx < len(st.rts); replicaIdx++ {
for replicaIdx := 0; replicaIdx < len(st.rts.tasks); replicaIdx++ {
// NOTE: Use "append" since the length of st.rts is not enough to use index. Its capacity is enough because it is created to be reused.
st.rts[replicaIdx].llsnList = append(st.rts[replicaIdx].llsnList, sq.llsn)
st.rts.tasks[replicaIdx].llsnList = append(st.rts.tasks[replicaIdx].llsnList, sq.llsn)
}
//nolint:staticcheck
if err := st.wb.Set(sq.llsn, st.dataBatch[dataIdx]); err != nil {
Expand Down Expand Up @@ -141,7 +141,7 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)
st.wwg.done(err)
_ = st.wb.Close()
releaseCommitWaitTaskList(cwts)
releaseReplicateTasks(rts)
releaseReplicateTasks(rts.tasks)
releaseReplicateTaskSlice(rts)
st.release()
return
Expand All @@ -154,24 +154,24 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)
st.wwg.done(err)
_ = st.wb.Close()
releaseCommitWaitTaskList(cwts)
releaseReplicateTasks(rts)
releaseReplicateTasks(rts.tasks)
releaseReplicateTaskSlice(rts)
st.release()
return
}

// send to replicator
ridx := 0
for ridx < len(rts) {
err := sq.lse.rcs.clients[ridx].send(ctx, rts[ridx])
for ridx < len(rts.tasks) {
err := sq.lse.rcs.clients[ridx].send(ctx, rts.tasks[ridx])
if err != nil {
sq.logger.Error("could not send to replicate client", zap.Error(err))
sq.lse.esm.compareAndSwap(executorStateAppendable, executorStateSealing)
break
}
ridx++
}
releaseReplicateTasks(rts[ridx:])
releaseReplicateTasks(rts.tasks[ridx:])
releaseReplicateTaskSlice(rts)
}

Expand Down Expand Up @@ -248,7 +248,7 @@ type sequenceTask struct {
wb *storage.WriteBatch
dataBatch [][]byte
cwts *listQueue
rts []*replicateTask
rts *replicateTaskSlice
}

func newSequenceTask() *sequenceTask {
Expand Down
8 changes: 6 additions & 2 deletions internal/storagenode/logstream/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func testSequenceTask(stg *storage.Storage) *sequenceTask {
st.cwts = newListQueue()
st.cwts.PushFront(newCommitWaitTask(awg))

st.rts = &replicateTaskSlice{}

return st
}

Expand Down Expand Up @@ -192,8 +194,10 @@ func TestSequencer_FailToSendToReplicateClient(t *testing.T) {
}

st := testSequenceTask(stg)
st.rts = []*replicateTask{
{},
st.rts = &replicateTaskSlice{
tasks: []*replicateTask{
{},
},
}
sq.sequenceLoopInternal(context.Background(), st)
assert.Len(t, wr.queue, 1)
Expand Down

0 comments on commit bf5fb22

Please sign in to comment.