Skip to content

Commit

Permalink
feat(logstream): introduce commit wait task for append batch
Browse files Browse the repository at this point in the history
This commit introduces a commit wait task that represents an entire append
batch, rather than individual log entries. This is a crucial step towards
implementing atomic append operations.

Previously, a separate commit wait task was created for each log entry in an
append batch. This approach made it difficult to handle the batch atomically, as
commit wait tasks were processed individually.

With this change, a single commit wait task is created for the entire batch.
This allows the committer to process the batch atomically, ensuring that either
all log entries in the batch are committed or none are.

This change also brings a slight performance improvement, as the committer now
needs to process fewer tasks. However, no specific benchmarks have been
performed to measure the exact gain.

The client API does not yet support atomic append operations, and partial
success/failure is still allowed. This will be addressed in a future update.

This change is a major step towards resolving #843, which aims to implement
atomic append operations.
  • Loading branch information
ijsong committed Feb 21, 2025
1 parent c312a6c commit 30645f0
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 144 deletions.
5 changes: 2 additions & 3 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext
apc.wwg = st.wwg

st.wb = lse.stg.NewWriteBatch()
st.cwts = newListQueue()
for i := 0; i < len(dataBatch); i++ {
logEntrySize := int64(len(dataBatch[i]))
apc.totalBytes += logEntrySize
Expand All @@ -205,17 +204,17 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext
lse.lsm.LogRPCServerLogEntrySize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, logEntrySize)
}
awg := newAppendWaitGroup(st.wwg)
st.cwts.PushFront(newCommitWaitTask(awg))
apc.awgs = append(apc.awgs, awg)
}
st.cwt = newCommitWaitTask(apc.awgs, len(apc.awgs))
st.awgs = apc.awgs
}

func (lse *Executor) sendSequenceTask(ctx context.Context, st *sequenceTask) {
if err := lse.sq.send(ctx, st); err != nil {
st.wwg.done(err)
_ = st.wb.Close()
releaseCommitWaitTaskList(st.cwts)
st.cwt.release()
releaseReplicateTasks(st.rts.tasks)
releaseReplicateTaskSlice(st.rts)
st.release()
Expand Down
26 changes: 8 additions & 18 deletions internal/storagenode/logstream/commit_wait_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ func (iter commitWaitQueueIterator) valid() bool {
}

type commitWaitQueue struct {
queue *listQueue
mu sync.Mutex
queue *listQueue
numLogEntries int
mu sync.Mutex
}

func newCommitWaitQueue() *commitWaitQueue {
Expand All @@ -46,24 +47,11 @@ func (cwq *commitWaitQueue) push(cwt *commitWaitTask) error {
}
cwq.mu.Lock()
cwq.queue.PushFront(cwt)
cwq.numLogEntries += cwt.size
cwq.mu.Unlock()
return nil
}

// pushList adds a whole list of commit wait tasks to the queue in one
// critical section and then releases the input list.
//
// It panics if cwts is nil or contains no elements. Otherwise it always
// returns nil; the error result is never populated with a failure.
//
// The caller must not use cwts after this call, because it is released
// here.
func (cwq *commitWaitQueue) pushList(cwts *listQueue) error {
if cwts == nil {
panic("log stream: commit wait queue: task is nil")
}
if cwts.Len() == 0 {
panic("log stream: commit wait queue: empty tasks")
}
// Hold the mutex only for the concatenation itself.
cwq.mu.Lock()
// NOTE(review): ConcatFront presumably splices the elements of cwts onto
// the front of cwq.queue, mirroring PushFront in push — confirm against
// listQueue.
cwq.queue.ConcatFront(cwts)
cwq.mu.Unlock()
// The list container is released outside the lock.
cwts.release()
return nil
}

func (cwq *commitWaitQueue) peekIterator() commitWaitQueueIterator {
cwq.mu.Lock()
iter := commitWaitQueueIterator{
Expand All @@ -80,12 +68,14 @@ func (cwq *commitWaitQueue) pop() *commitWaitTask {
if elem := cwq.queue.Back(); elem == nil {
return nil
}
return cwq.queue.RemoveBack().(*commitWaitTask)
cwt := cwq.queue.RemoveBack().(*commitWaitTask)
cwq.numLogEntries -= cwt.size
return cwt
}

func (cwq *commitWaitQueue) size() int {
cwq.mu.Lock()
ret := cwq.queue.Len()
ret := cwq.numLogEntries
cwq.mu.Unlock()
return ret
}
25 changes: 12 additions & 13 deletions internal/storagenode/logstream/commit_wait_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,31 @@ func TestCommitWaitQueue(t *testing.T) {
assert.Nil(t, iter.task())

assert.Panics(t, func() { _ = cwq.push(nil) })
assert.Panics(t, func() { _ = cwq.pushList(nil) })
assert.Panics(t, func() {
cwts := newListQueue()
_ = cwq.pushList(cwts)
})

for i := 0; i < n; i++ {
assert.Equal(t, i, cwq.size())
err := cwq.push(newCommitWaitTask(&appendWaitGroup{llsn: types.LLSN(i + 1)}))
cwt := newCommitWaitTask([]*appendWaitGroup{
{llsn: types.LLSN(i + 1)},
}, 1)
err := cwq.push(cwt)
assert.NoError(t, err)
assert.Equal(t, i+1, cwq.size())
}

iter = cwq.peekIterator()
for i := 0; i < n; i++ {
assert.True(t, iter.valid())
assert.Equal(t, types.LLSN(i+1), iter.task().awg.llsn)
cwt := iter.task()
assert.Len(t, cwt.awgs, 1)
assert.Equal(t, types.LLSN(i+1), cwt.awgs[0].llsn)
valid := iter.next()
assert.Equal(t, i < n-1, valid)
}

for i := 0; i < n; i++ {
cwt := cwq.pop()
assert.Equal(t, types.LLSN(i+1), cwt.awg.llsn)
assert.Len(t, cwt.awgs, 1)
assert.Equal(t, types.LLSN(i+1), cwt.awgs[0].llsn)
cwt.release()
}
assert.Nil(t, cwq.pop())
Expand All @@ -66,15 +67,13 @@ func TestCommitWaitQueueConcurrentPushPop(t *testing.T) {
go func() {
defer wg.Done()
for i := 0; i < numRepeat; i++ {
cwts := newListQueue()
for j := 0; j < cwtsLength; j++ {
awg := newAppendWaitGroup(nil)
awg.llsn = types.LLSN(cwtsLength*i + j)
cwt := newCommitWaitTask(awg)
cwts.PushFront(cwt)
cwt := newCommitWaitTask([]*appendWaitGroup{awg}, 1)
err := cwq.push(cwt)
assert.NoError(t, err)
}
err := cwq.pushList(cwts)
assert.NoError(t, err)
runtime.Gosched()
}
}()
Expand Down
112 changes: 66 additions & 46 deletions internal/storagenode/logstream/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,24 @@ func newCommitter(cfg committerConfig) (*committer, error) {
return cm, nil
}

// sendCommitWaitTask sends a list of commit wait tasks to the committer.
// sendCommitWaitTask sends a commit wait task to the committer.
// The commit wait task is pushed into commitWaitQ in the committer.
// The writer calls this method internally to push commit wait tasks to the committer.
// If the input list of commit wait tasks are nil or empty, it panics.
func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue, ignoreSealing bool) (err error) {
if cwts == nil {
panic("log stream: committer: commit wait task list is nil")
// The writer calls this method internally to push a commit wait task to the committer.
// If the input commit wait task is nil or empty, it panics.
func (cm *committer) sendCommitWaitTask(_ context.Context, cwt *commitWaitTask, ignoreSealing bool) (err error) {
if cwt == nil {
panic("log stream: committer: commit wait task is nil")
}
cnt := cwts.Len()
if cnt == 0 {
panic("log stream: committer: commit wait task list is empty")
if cwt.size == 0 {
panic("log stream: committer: commit wait task is empty")
}

inflight := cm.inflightCommitWait.Add(int64(cnt))
inflight := cm.inflightCommitWait.Add(1)
defer func() {
if err != nil {
inflight = cm.inflightCommitWait.Add(-int64(cnt))
inflight = cm.inflightCommitWait.Add(-1)
}
if ce := cm.logger.Check(zap.DebugLevel, "send committer commit wait tasks"); ce != nil {
if ce := cm.logger.Check(zap.DebugLevel, "send committer commit wait task"); ce != nil {
ce.Write(
zap.Int64("inflight", inflight),
zap.Error(err),
Expand All @@ -86,7 +85,7 @@ func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue, igno
return err
}

_ = cm.commitWaitQ.pushList(cwts)
_ = cm.commitWaitQ.push(cwt)
return nil
}

Expand Down Expand Up @@ -260,40 +259,56 @@ func (cm *committer) commitInternal(cc storage.CommitContext) (err error) {
}()

iter := cm.commitWaitQ.peekIterator()
for i := 0; i < numCommits; i++ {
llsn := cc.CommittedLLSNBegin + types.LLSN(i)
glsn := cc.CommittedGLSNBegin + types.GLSN(i)
numLogEntries := 0
numCWTs := 0
for numLogEntries < numCommits && iter.valid() {
cwt := iter.task()

if uncommttedLLSNBegin+types.LLSN(i) != llsn {
err = errors.New("log stream: committer: llsn mismatch")
return err
}
// invariant: numCommits-numLogEntries >= cwt.size
for i := range cwt.size {
llsn := cc.CommittedLLSNBegin + types.LLSN(numLogEntries)
glsn := cc.CommittedGLSNBegin + types.GLSN(numLogEntries)

// Since the number of tasks in commitWaitQ is inspected above, cwt
// must exist.
cwt := iter.task()
cwt.awg.setGLSN(glsn)
if uncommttedLLSNBegin+types.LLSN(numLogEntries) != llsn {
err = errors.New("log stream: committer: llsn mismatch")
return err
}

err = cb.Set(llsn, glsn)
if err != nil {
return err
if len(cwt.awgs) > 0 {
cwt.awgs[i].setGLSN(glsn)
}

err = cb.Set(llsn, glsn)
if err != nil {
return err
}

numLogEntries++
}

iter.next()
numCWTs++
}
if numLogEntries != numCommits {
cm.logger.Panic("commit corrupted: not matched between commit wait tasks and commit message",
zap.Int("numCommits", numCommits),
zap.Int("numLogEntries", numLogEntries),
zap.Any("uncommittedBegin", uncommittedBegin),
zap.Any("commitContext", cc),
)
}
err = cb.Apply()
if err != nil {
return err
}

committedTasks := make([]*commitWaitTask, 0, numCommits)
for i := 0; i < numCommits; i++ {
committedTasks := make([]*commitWaitTask, 0, numCWTs)
for i := 0; i < numCWTs; i++ {
// NOTE: This cwt should not be nil, because the size of commitWaitQ is inspected
// above.
cwt := cm.commitWaitQ.pop()
committedTasks = append(committedTasks, cwt)
}

if numCommits > 0 {
// only the first commit changes local low watermark
cm.lse.lsc.localLWM.CompareAndSwap(varlogpb.LogSequenceNumber{}, varlogpb.LogSequenceNumber{
Expand All @@ -310,12 +325,14 @@ func (cm *committer) commitInternal(cc storage.CommitContext) (err error) {
})

for _, cwt := range committedTasks {
cwt.awg.commitDone(nil)
for _, awg := range cwt.awgs {
awg.commitDone(nil)
}
cwt.release()
}

if len(committedTasks) > 0 {
cm.inflightCommitWait.Add(int64(-numCommits))
cm.inflightCommitWait.Add(int64(-numCWTs))
}

return nil
Expand All @@ -335,7 +352,9 @@ func (cm *committer) drainCommitWaitQ(cause error) {
if cwt == nil {
continue
}
cwt.awg.commitDone(cause)
for _, awg := range cwt.awgs {
awg.commitDone(cause)
}
cwt.release()
inflight := cm.inflightCommitWait.Add(-1)
if ce := cm.logger.Check(zap.DebugLevel, "discard a commit wait task"); ce != nil {
Expand Down Expand Up @@ -401,29 +420,30 @@ var commitWaitTaskPool = sync.Pool{
}

type commitWaitTask struct {
awg *appendWaitGroup
// In the primary replica's commit operation, the size and the length of
// awgs are the same. However, in the backup replica's commit operation,
// the length of awgs is 0, and size indicates the number of log entries to
// be committed. When committing log entries during the bootstrap's
// recovery process, the length of awgs is 0, and the size is 1.
//
// TODO(jun): In the next refactoring phase, we will reduce awgs into a
// single awg, that is, a single awg for an append batch.
awgs []*appendWaitGroup
size int // the number of log entries to be committed
}

func newCommitWaitTask(awg *appendWaitGroup) *commitWaitTask {
// Backup replica has no awg.
func newCommitWaitTask(awgs []*appendWaitGroup, size int) *commitWaitTask {
cwt := commitWaitTaskPool.Get().(*commitWaitTask)
cwt.awg = awg
cwt.awgs = awgs
cwt.size = size
return cwt
}

func (cwt *commitWaitTask) release() {
cwt.awg = nil
*cwt = commitWaitTask{}
commitWaitTaskPool.Put(cwt)
}

// releaseCommitWaitTaskList empties cwts and releases every commit wait task
// it held. Elements are removed from the back of the list one at a time.
//
// Every element must be a *commitWaitTask; the unchecked type assertion
// panics otherwise. The list container cwts itself is not released here.
func releaseCommitWaitTaskList(cwts *listQueue) {
// Snapshot the length up front, since RemoveBack shrinks the list as the
// loop runs.
cnt := cwts.Len()
for i := 0; i < cnt; i++ {
cwt := cwts.RemoveBack().(*commitWaitTask)
cwt.release()
}
}

var commitTaskPool = sync.Pool{
New: func() interface{} {
return &commitTask{}
Expand Down
27 changes: 15 additions & 12 deletions internal/storagenode/logstream/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/kakao/varlog/pkg/types"
)

func TestCommitter_InvalidConfig(t *testing.T) {
Expand Down Expand Up @@ -49,31 +51,32 @@ func TestCommitter_ShouldNotAcceptTasksWhileNotAppendable(t *testing.T) {
cm.lse = lse
cm.logger = zap.NewNop()

// sendCommitWaitTask
cwts := newListQueue()
defer cwts.release()

lse.esm.store(executorStateAppendable)
assert.Panics(t, func() {
_ = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
_ = cm.sendCommitWaitTask(context.Background(), &commitWaitTask{}, false /*ignoreSealing*/)
})

assert.Panics(t, func() {
_ = cm.sendCommitWaitTask(context.Background(), nil, false /*ignoreSealing*/)
})

cwts.PushFront(&commitWaitTask{})
cwt := &commitWaitTask{
awgs: []*appendWaitGroup{{
llsn: types.MinLLSN,
}},
size: 1,
}

lse.esm.store(executorStateSealing)
err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
err := cm.sendCommitWaitTask(context.Background(), cwt, false /*ignoreSealing*/)
assert.Error(t, err)

lse.esm.store(executorStateSealed)
err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
err = cm.sendCommitWaitTask(context.Background(), cwt, false /*ignoreSealing*/)
assert.Error(t, err)

lse.esm.store(executorStateClosed)
err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
err = cm.sendCommitWaitTask(context.Background(), cwt, false /*ignoreSealing*/)
assert.Error(t, err)

// sendCommitTask
Expand Down Expand Up @@ -141,9 +144,9 @@ func TestCommitter_DrainCommitWaitQ(t *testing.T) {
cm.lse = lse
cm.logger = zap.NewNop()

cwts := newListQueue()
cwts.PushFront(newCommitWaitTask(newAppendWaitGroup(newWriteWaitGroup())))
err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
awg := newAppendWaitGroup(newWriteWaitGroup())
cwt := newCommitWaitTask([]*appendWaitGroup{awg}, 1)
err := cm.sendCommitWaitTask(context.Background(), cwt, false /*ignoreSealing*/)
assert.NoError(t, err)

assert.EqualValues(t, 1, cm.inflightCommitWait.Load())
Expand Down
Loading

0 comments on commit 30645f0

Please sign in to comment.