Skip to content

Commit

Permalink
refactor(logstream): consolidate append wait groups into a single object
Browse files Browse the repository at this point in the history
This PR reduces append wait groups to a single object for each append batch. The
append wait groups are part of the following structs:

- appendContext
- commitWaitTask
- sequenceTask

Previously, multiple append wait groups existed because each log entry in a
batch had its own wait group. With the changes for issue #843, a single append
wait group now corresponds to an entire batch. This PR consolidates the list of
append wait groups into a single append wait group.

TODO: We are still on the road to resolving #843. The rest of the work mostly
involves cleaning up the code base.
  • Loading branch information
ijsong committed Feb 21, 2025
1 parent 48dd48a commit a05f7e3
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 92 deletions.
54 changes: 20 additions & 34 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,23 @@ func (at *AppendTask) WaitForCompletion(ctx context.Context) ([]snpb.AppendResul
go func() {
defer close(done)

// TODO(jun): We will reduce appendWaitGroups to a single object for
// each append batch.
for i, awg := range at.apc.awgs {
cerr := awg.wait(cctx)
if cerr != nil {
if err == nil {
err = cerr
}
if cctx.Err() == nil {
// Since it's neither canceled nor timed out, we can
// release awg.
at.apc.awgs[i].release()
}
continue
err = at.apc.awg.wait(cctx)
if err != nil {
if cctx.Err() == nil {
// Since it's neither canceled nor timed out, we can release
// awg.
at.apc.awg.release()
}
return
}

for i := range at.apc.awg.batchLen {
res[i].Meta.TopicID = at.lse.tpid
res[i].Meta.LogStreamID = at.lse.lsid
res[i].Meta.GLSN = at.apc.awgs[i].glsn
res[i].Meta.LLSN = at.apc.awgs[i].llsn
at.apc.awgs[i].release()
res[i].Meta.GLSN = at.apc.awg.beginLSN.GLSN + types.GLSN(i)
res[i].Meta.LLSN = at.apc.awg.beginLSN.LLSN + types.LLSN(i)
}
at.apc.awg.release()
}()

select {
Expand All @@ -116,15 +112,9 @@ func appendTaskDeferredFunc(at *AppendTask) {
}

type appendContext struct {
st *sequenceTask
wwg *writeWaitGroup
// NOTE(jun): awgs represents a collection of wait groups corresponding to
// each log entry in the batch. While storage typically writes log entries
// in a batch simultaneously, the commit operation, although expected to
// handle all entries in a batch, is not strictly enforced to do so.
// Therefore, we should maintain awgs until we can guarantee batch-level
// atomic commits.
awgs []*appendWaitGroup
st *sequenceTask
wwg *writeWaitGroup
awg *appendWaitGroup
totalBytes int64
}

Expand All @@ -150,10 +140,6 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append
return snerrors.ErrNotPrimary
}

appendTask.apc = appendContext{
awgs: make([]*appendWaitGroup, 0, dataBatchLen),
}

var preparationDuration time.Duration
defer func() {
if lse.lsm != nil {
Expand Down Expand Up @@ -204,11 +190,11 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext
// TODO: Set the correct status code.
lse.lsm.LogRPCServerLogEntrySize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, logEntrySize)
}
awg := newAppendWaitGroup(st.wwg)
apc.awgs = append(apc.awgs, awg)
}
st.cwt = newCommitWaitTask(apc.awgs, len(apc.awgs))
st.awgs = apc.awgs
awg := newAppendWaitGroup(st.wwg, len(dataBatch))
apc.awg = awg
st.cwt = newCommitWaitTask(awg, len(dataBatch))
st.awg = awg
}

func (lse *Executor) sendSequenceTask(ctx context.Context, st *sequenceTask) {
Expand Down
23 changes: 14 additions & 9 deletions internal/storagenode/logstream/append_waitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

var writeWaitGroupPool = sync.Pool{
Expand Down Expand Up @@ -51,37 +52,41 @@ var appendWaitGroupPool = &sync.Pool{
}

type appendWaitGroup struct {
glsn types.GLSN
llsn types.LLSN
// beginLSN is the first log sequence number of the batch.
beginLSN varlogpb.LogSequenceNumber
// batchLen is the number of log entries in the batch.
batchLen int
wwg *writeWaitGroup
cwg sync.WaitGroup
commitErr error
}

func newAppendWaitGroup(wwg *writeWaitGroup) *appendWaitGroup {
func newAppendWaitGroup(wwg *writeWaitGroup, batchLen int) *appendWaitGroup {
awg := appendWaitGroupPool.Get().(*appendWaitGroup)
awg.batchLen = batchLen
awg.wwg = wwg
awg.cwg.Add(1)
return awg
}

func (awg *appendWaitGroup) release() {
awg.glsn = types.InvalidGLSN
awg.llsn = types.InvalidLLSN
awg.beginLSN.GLSN = types.InvalidGLSN
awg.beginLSN.LLSN = types.InvalidLLSN
awg.batchLen = 0
awg.wwg = nil
awg.cwg = sync.WaitGroup{}
appendWaitGroupPool.Put(awg)
}

func (awg *appendWaitGroup) setGLSN(glsn types.GLSN) {
func (awg *appendWaitGroup) setBeginGLSN(glsn types.GLSN) {
if awg != nil {
awg.glsn = glsn
awg.beginLSN.GLSN = glsn
}
}

func (awg *appendWaitGroup) setLLSN(llsn types.LLSN) {
func (awg *appendWaitGroup) setBeginLLSN(llsn types.LLSN) {
if awg != nil {
awg.llsn = llsn
awg.beginLSN.LLSN = llsn
}
}

Expand Down
22 changes: 13 additions & 9 deletions internal/storagenode/logstream/commit_wait_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

func TestCommitWaitQueue(t *testing.T) {
Expand All @@ -27,8 +28,11 @@ func TestCommitWaitQueue(t *testing.T) {

for i := 0; i < n; i++ {
assert.Equal(t, i, cwq.size())
cwt := newCommitWaitTask([]*appendWaitGroup{
{llsn: types.LLSN(i + 1)},
cwt := newCommitWaitTask(&appendWaitGroup{
beginLSN: varlogpb.LogSequenceNumber{
LLSN: types.LLSN(i + 1),
},
batchLen: 1,
}, 1)
err := cwq.push(cwt)
assert.NoError(t, err)
Expand All @@ -39,16 +43,16 @@ func TestCommitWaitQueue(t *testing.T) {
for i := 0; i < n; i++ {
assert.True(t, iter.valid())
cwt := iter.task()
assert.Len(t, cwt.awgs, 1)
assert.Equal(t, types.LLSN(i+1), cwt.awgs[0].llsn)
assert.Equal(t, 1, cwt.awg.batchLen)
assert.Equal(t, types.LLSN(i+1), cwt.awg.beginLSN.LLSN)
valid := iter.next()
assert.Equal(t, i < n-1, valid)
}

for i := 0; i < n; i++ {
cwt := cwq.pop()
assert.Len(t, cwt.awgs, 1)
assert.Equal(t, types.LLSN(i+1), cwt.awgs[0].llsn)
assert.Equal(t, 1, cwt.awg.batchLen)
assert.Equal(t, types.LLSN(i+1), cwt.awg.beginLSN.LLSN)
cwt.release()
}
assert.Nil(t, cwq.pop())
Expand All @@ -68,9 +72,9 @@ func TestCommitWaitQueueConcurrentPushPop(t *testing.T) {
defer wg.Done()
for i := 0; i < numRepeat; i++ {
for j := 0; j < cwtsLength; j++ {
awg := newAppendWaitGroup(nil)
awg.llsn = types.LLSN(cwtsLength*i + j)
cwt := newCommitWaitTask([]*appendWaitGroup{awg}, 1)
awg := newAppendWaitGroup(nil, 1)
awg.setBeginGLSN(types.GLSN(cwtsLength*i + j))
cwt := newCommitWaitTask(awg, 1)
err := cwq.push(cwt)
assert.NoError(t, err)
}
Expand Down
30 changes: 12 additions & 18 deletions internal/storagenode/logstream/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,9 @@ func (cm *committer) commitInternal(cc storage.CommitContext) (err error) {
return err
}

if len(cwt.awgs) > 0 {
cwt.awgs[i].setGLSN(glsn)
if i == 0 && cwt.awg != nil {
// set GLSN of the first log entry in the batch
cwt.awg.beginLSN.GLSN = glsn
}

err = cb.Set(llsn, glsn)
Expand Down Expand Up @@ -325,9 +326,7 @@ func (cm *committer) commitInternal(cc storage.CommitContext) (err error) {
})

for _, cwt := range committedTasks {
for _, awg := range cwt.awgs {
awg.commitDone(nil)
}
cwt.awg.commitDone(nil)
cwt.release()
}

Expand All @@ -352,9 +351,7 @@ func (cm *committer) drainCommitWaitQ(cause error) {
if cwt == nil {
continue
}
for _, awg := range cwt.awgs {
awg.commitDone(cause)
}
cwt.awg.commitDone(cause)
cwt.release()
inflight := cm.inflightCommitWait.Add(-1)
if ce := cm.logger.Check(zap.DebugLevel, "discard a commit wait task"); ce != nil {
Expand Down Expand Up @@ -420,21 +417,18 @@ var commitWaitTaskPool = sync.Pool{
}

type commitWaitTask struct {
// In the primary replica's commit operation, the size and the length of
// awgs are the same. However, in the backup replica's commit operation,
// the length of args is 0, and size indicates the number of log entries to
// In the primary replica's commit operation, the size and the awg.batchLen
// are the same. However, in the backup replica's commit operation,
// the awg.batchLen is 0, and size indicates the number of log entries to
// be committed. When committing log entries during the bootstrap's
// recovery process, the length of args is 0, and the size is 1.
//
// TODO(jun): Next refactoring phase, we will reduce awgs into a single
// awg, that is, a single awg for an append batch.
awgs []*appendWaitGroup
// recovery process, the awg.batchLen is 0, and the size is 1.
awg *appendWaitGroup
size int // the number of log entries to be committed
}

func newCommitWaitTask(awgs []*appendWaitGroup, size int) *commitWaitTask {
func newCommitWaitTask(awg *appendWaitGroup, size int) *commitWaitTask {
cwt := commitWaitTaskPool.Get().(*commitWaitTask)
cwt.awgs = awgs
cwt.awg = awg
cwt.size = size
return cwt
}
Expand Down
13 changes: 8 additions & 5 deletions internal/storagenode/logstream/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/zap"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

func TestCommitter_InvalidConfig(t *testing.T) {
Expand Down Expand Up @@ -61,9 +62,11 @@ func TestCommitter_ShouldNotAcceptTasksWhileNotAppendable(t *testing.T) {
})

cwt := &commitWaitTask{
awgs: []*appendWaitGroup{{
llsn: types.MinLLSN,
}},
awg: &appendWaitGroup{
beginLSN: varlogpb.LogSequenceNumber{
LLSN: types.MinLLSN,
},
},
size: 1,
}

Expand Down Expand Up @@ -144,8 +147,8 @@ func TestCommitter_DrainCommitWaitQ(t *testing.T) {
cm.lse = lse
cm.logger = zap.NewNop()

awg := newAppendWaitGroup(newWriteWaitGroup())
cwt := newCommitWaitTask([]*appendWaitGroup{awg}, 1)
awg := newAppendWaitGroup(newWriteWaitGroup(), 1)
cwt := newCommitWaitTask(awg, 1)
err := cm.sendCommitWaitTask(context.Background(), cwt, false /*ignoreSealing*/)
assert.NoError(t, err)

Expand Down
16 changes: 8 additions & 8 deletions internal/storagenode/logstream/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)

startTime = time.Now()

for dataIdx := 0; dataIdx < len(st.awgs); dataIdx++ {
for dataIdx := 0; dataIdx < len(st.dataBatch); dataIdx++ {
sq.llsn++
st.awgs[dataIdx].setLLSN(sq.llsn)
if dataIdx == 0 {
st.awg.setBeginLLSN(sq.llsn)
}
if ce := sq.logger.Check(zap.DebugLevel, "sequencer: issued llsn"); ce != nil {
ce.Write(zap.Uint64("llsn", uint64(sq.llsn)))
}
Expand Down Expand Up @@ -205,10 +207,8 @@ func (sq *sequencer) waitForDrainage(cause error, forceDrain bool) {
case <-timer.C:
timer.Reset(tick)
case st := <-sq.queue:
for i := 0; i < len(st.awgs); i++ {
st.awgs[i].writeDone(cause)
st.awgs[i].commitDone(nil)
}
st.awg.writeDone(cause)
st.awg.commitDone(nil)
sq.inflight.Add(-1)
}
}
Expand Down Expand Up @@ -249,7 +249,7 @@ var sequenceTaskPool = sync.Pool{

type sequenceTask struct {
wwg *writeWaitGroup
awgs []*appendWaitGroup
awg *appendWaitGroup
wb *storage.WriteBatch
dataBatch [][]byte
cwt *commitWaitTask
Expand All @@ -263,7 +263,7 @@ func newSequenceTask() *sequenceTask {

func (st *sequenceTask) release() {
st.wwg = nil
st.awgs = nil
st.awg = nil
st.wb = nil
st.dataBatch = nil
st.cwt = nil
Expand Down
6 changes: 3 additions & 3 deletions internal/storagenode/logstream/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func testSequenceTask(stg *storage.Storage) *sequenceTask {
st := newSequenceTask()

st.wwg = newWriteWaitGroup()
awg := newAppendWaitGroup(st.wwg)
st.awgs = append(st.awgs, awg)
awg := newAppendWaitGroup(st.wwg, 1)
st.awg = awg

st.wb = stg.NewWriteBatch()
st.dataBatch = [][]byte{nil}

st.cwt = newCommitWaitTask([]*appendWaitGroup{awg}, 1)
st.cwt = newCommitWaitTask(awg, 1)

st.rts = &replicateTaskSlice{}

Expand Down
9 changes: 4 additions & 5 deletions internal/storagenode/logstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"go.uber.org/zap"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/runner"
"github.com/kakao/varlog/pkg/verrors"
)
Expand Down Expand Up @@ -85,7 +86,7 @@ func (w *writer) writeLoop(ctx context.Context) {
func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) {
startTime := time.Now()
var err error
cnt := len(st.awgs)
cnt := len(st.dataBatch)
defer func() {
st.wwg.done(err)
_ = st.wb.Close()
Expand All @@ -106,7 +107,7 @@ func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) {
return
}

oldLLSN, newLLSN := st.awgs[0].llsn, st.awgs[cnt-1].llsn+1
oldLLSN, newLLSN := st.awg.beginLSN.LLSN, st.awg.beginLSN.LLSN+types.LLSN(cnt)
if !w.lse.lsc.uncommittedLLSNEnd.CompareAndSwap(oldLLSN, newLLSN) {
// NOTE: If this panic occurs, it may be very subtle.
// We can't simply guarantee whether unexpected LLSNs are
Expand Down Expand Up @@ -149,9 +150,7 @@ func (w *writer) waitForDrainage(cause error, forceDrain bool) {
case <-timer.C:
timer.Reset(tick)
case st := <-w.queue:
for i := 0; i < len(st.awgs); i++ {
st.awgs[i].writeDone(cause)
}
st.awg.writeDone(cause)
st.release()
w.inflight.Add(-1)
}
Expand Down
Loading

0 comments on commit a05f7e3

Please sign in to comment.