Skip to content

Commit

Permalink
feat(client): add LogStreamAppender
Browse files Browse the repository at this point in the history
This PR adds LogStreamAppender to the client, and it is a client to append asynchronously only a
particular log stream.

Resolves #433.
  • Loading branch information
ijsong committed Jun 1, 2023
1 parent 5bc3f8b commit 05f590e
Show file tree
Hide file tree
Showing 10 changed files with 640 additions and 1 deletion.
4 changes: 4 additions & 0 deletions internal/storagenode/client/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (c *LogClient) Append(ctx context.Context, tpid types.TopicID, lsid types.L
return rsp.Results, nil
}

func (c *LogClient) AppendStream(ctx context.Context) (snpb.LogIO_AppendClient, error) {
return c.rpcClient.Append(ctx)
}

// Subscribe gets log entries continuously from the storage node. It guarantees that LLSNs of log
// entries taken are sequential.
func (c *LogClient) Subscribe(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, begin, end types.GLSN) (<-chan SubscribeResult, error) {
Expand Down
72 changes: 72 additions & 0 deletions pkg/varlog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package varlog
import (
"context"
"time"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

type adminConfig struct {
Expand Down Expand Up @@ -91,3 +94,72 @@ func WithTimeout(timeout time.Duration) AdminCallOption {
cfg.timeout.set = true
})
}

const (
defaultPipelineSize = 2
minPipelineSize = 1
maxPipelineSize = 8
)

type logStreamAppenderConfig struct {
defaultBatchCallback BatchCallback
tpid types.TopicID
lsid types.LogStreamID
pipelineSize int
}

func newLogStreamAppenderConfig(opts []LogStreamAppenderOption) logStreamAppenderConfig {
cfg := logStreamAppenderConfig{
pipelineSize: defaultPipelineSize,
defaultBatchCallback: func([]varlogpb.LogEntryMeta, error) {},
}
for _, opt := range opts {
opt.applyLogStreamAppender(&cfg)
}
cfg.ensureDefault()
return cfg
}

func (cfg *logStreamAppenderConfig) ensureDefault() {
if cfg.pipelineSize < minPipelineSize {
cfg.pipelineSize = minPipelineSize
}
if cfg.pipelineSize > maxPipelineSize {
cfg.pipelineSize = maxPipelineSize
}
}

type funcLogStreamAppenderOption struct {
f func(*logStreamAppenderConfig)
}

func newFuncLogStreamAppenderOption(f func(config *logStreamAppenderConfig)) *funcLogStreamAppenderOption {
return &funcLogStreamAppenderOption{f: f}
}

func (fo *funcLogStreamAppenderOption) applyLogStreamAppender(cfg *logStreamAppenderConfig) {
fo.f(cfg)
}

// LogStreamAppenderOption configures a LogStreamAppender.
type LogStreamAppenderOption interface {
applyLogStreamAppender(config *logStreamAppenderConfig)
}

// WithPipelineSize sets request pipeline size. The default pipeline size is
// two. Any value below one will be set to one, and any above eight will be
// limited to eight.
func WithPipelineSize(pipelineSize int) LogStreamAppenderOption {
return newFuncLogStreamAppenderOption(func(cfg *logStreamAppenderConfig) {
cfg.pipelineSize = pipelineSize
})
}

// WithDefaultBatchCallback sets the default callback function. The default callback
// function can be overridden by the argument callback of the AppendBatch
// method.
func WithDefaultBatchCallback(defaultBatchCallback BatchCallback) LogStreamAppenderOption {
return newFuncLogStreamAppenderOption(func(cfg *logStreamAppenderConfig) {
cfg.defaultBatchCallback = defaultBatchCallback
})
}
5 changes: 5 additions & 0 deletions pkg/varlog/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package varlog

import "errors"

var ErrClosed = errors.New("client: closed")
7 changes: 7 additions & 0 deletions pkg/varlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Log interface {
// replica. If none of the replicas' statuses is either appendable or
// sealed, it returns an error.
PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error)

// NewLogStreamAppender returns a new LogStreamAppender.
NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error)
}

type AppendResult struct {
Expand Down Expand Up @@ -177,6 +180,10 @@ func (v *logImpl) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty
return v.peekLogStream(ctx, tpid, lsid)
}

func (v *logImpl) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error) {
return v.newLogStreamAppender(context.Background(), tpid, lsid, opts...)
}

func (v *logImpl) Close() (err error) {
if v.closed.Load() {
return
Expand Down
20 changes: 20 additions & 0 deletions pkg/varlog/log_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

201 changes: 201 additions & 0 deletions pkg/varlog/log_stream_appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package varlog

import (
"context"
"errors"
"fmt"
"sync"

"github.com/puzpuzpuz/xsync/v2"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

// LogStreamAppender is a client only to be able to append to a particular log
// stream.
type LogStreamAppender interface {
// AppendBatch appends dataBatch to the given log stream asynchronously.
// Users can call this method without being blocked until the pipeline of
// the LogStreamAppender is full. If the pipeline of the LogStreamAppender
// is already full, it may become blocked. However, the process will
// continue once a response is received from the storage node.
// On completion of AppendBatch, the argument callback provided by users
// will be invoked. All callback functions registered to the same
// LogStreamAppender will be called by the same goroutine sequentially.
// Therefore, the callback should be lightweight. If heavy work is
// necessary for the callback, it would be better to use separate worker
// goroutines.
// The only error from the AppendBatch is ErrClosed, which is returned when
// the LogStreamAppender is already closed. It returns nil even if the
// underlying stream is disconnected and notifies errors via callback.
// It is safe to have multiple goroutines calling AppendBatch
// simultaneously, but the order between them is not guaranteed.
AppendBatch(dataBatch [][]byte, callback BatchCallback) error

// Close closes the LogStreamAppender client. Once the client is closed,
// calling AppendBatch will fail immediately. If AppendBatch still waits
// for room of pipeline, Close will be blocked. It also waits for all
// pending callbacks to be called.
// It's important for users to avoid calling Close within the callback
// function, as it may cause indefinite blocking.
Close()
}

// BatchCallback is a callback function to notify the result of
// AppendBatch.
type BatchCallback func([]varlogpb.LogEntryMeta, error)

type cbQueueEntry struct {
cb BatchCallback
err error
}

func newCallbackQueueEntry() *cbQueueEntry {
return callbackQueueEntryPool.Get().(*cbQueueEntry)
}

func (cqe *cbQueueEntry) Release() {
*cqe = cbQueueEntry{}
callbackQueueEntryPool.Put(cqe)
}

var callbackQueueEntryPool = sync.Pool{
New: func() any {
return &cbQueueEntry{}
},
}

type logStreamAppender struct {
logStreamAppenderConfig
stream snpb.LogIO_AppendClient
cancelFunc context.CancelCauseFunc
sema chan struct{}
cbq chan *cbQueueEntry
wg sync.WaitGroup
closed struct {
xsync.RBMutex
value bool
}
}

var _ LogStreamAppender = (*logStreamAppender)(nil)

func (v *logImpl) newLogStreamAppender(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error) {
replicas, ok := v.replicasRetriever.Retrieve(tpid, lsid)
if !ok {
return nil, fmt.Errorf("client: log stream %d of topic %d does not exist", lsid, tpid)
}

snid := replicas[0].StorageNodeID
addr := replicas[0].Address
cl, err := v.logCLManager.GetOrConnect(ctx, snid, addr)
if err != nil {
v.allowlist.Deny(tpid, lsid)
return nil, fmt.Errorf("client: %w", err)
}

ctx, cancelFunc := context.WithCancelCause(ctx)
stream, err := cl.AppendStream(ctx)
if err != nil {
cancelFunc(err)
return nil, fmt.Errorf("client: %w", err)
}

cfg := newLogStreamAppenderConfig(opts)
cfg.tpid = tpid
cfg.lsid = lsid
lsa := &logStreamAppender{
logStreamAppenderConfig: cfg,
stream: stream,
sema: make(chan struct{}, cfg.pipelineSize),
cbq: make(chan *cbQueueEntry, cfg.pipelineSize),
cancelFunc: cancelFunc,
}
lsa.wg.Add(1)
go lsa.recvLoop()
return lsa, nil
}

func (lsa *logStreamAppender) AppendBatch(dataBatch [][]byte, callback BatchCallback) error {
rt := lsa.closed.RLock()
defer lsa.closed.RUnlock(rt)
if lsa.closed.value {
return ErrClosed
}

lsa.sema <- struct{}{}

qe := newCallbackQueueEntry()
qe.cb = callback

err := lsa.stream.Send(&snpb.AppendRequest{
TopicID: lsa.tpid,
LogStreamID: lsa.lsid,
Payload: dataBatch,
})
if err != nil {
_ = lsa.stream.CloseSend()
qe.err = err
}
lsa.cbq <- qe
return nil
}

func (lsa *logStreamAppender) Close() {
lsa.cancelFunc(nil)

lsa.closed.Lock()
defer lsa.closed.Unlock()
if lsa.closed.value {
return
}
lsa.closed.value = true

close(lsa.cbq)
lsa.wg.Wait()
}

func (lsa *logStreamAppender) recvLoop() {
defer lsa.wg.Done()

var err error
var meta []varlogpb.LogEntryMeta
var cb BatchCallback
rsp := &snpb.AppendResponse{}

for qe := range lsa.cbq {
meta = nil
err = qe.err
if err != nil {
goto Call
}

rsp.Reset()
err = lsa.stream.RecvMsg(rsp)
if err != nil {
goto Call
}

meta = make([]varlogpb.LogEntryMeta, len(rsp.Results))
for idx, res := range rsp.Results {
if len(res.Error) == 0 {
meta[idx] = res.Meta
continue
}
err = errors.New(res.Error)
break
}
Call:
if qe.cb != nil {
cb = qe.cb
} else {
cb = lsa.defaultBatchCallback
}
if cb != nil {
cb(meta, err)
}
<-lsa.sema
}
}
1 change: 1 addition & 0 deletions pkg/varlog/log_stream_appender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package varlog
Loading

0 comments on commit 05f590e

Please sign in to comment.