Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storagenode): change append rpc from unary to stream #449

Merged
merged 1 commit into from
Jun 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions internal/storagenode/client/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,30 @@ func (c *LogClient) reset(rpcConn *rpc.Conn, _ types.ClusterID, target varlogpb.
// The backup indicates the storage nodes that have backup replicas of that log stream.
// It returns valid GLSN if the append completes successfully.
func (c *LogClient) Append(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, data [][]byte) ([]snpb.AppendResult, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

stream, err := c.rpcClient.Append(ctx)
if err != nil {
return nil, err
}
defer func() {
_ = stream.CloseSend()
}()

req := &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: data,
}
rsp, err := c.rpcClient.Append(ctx, req)
err = stream.Send(req)
if err != nil {
return nil, fmt.Errorf("logclient: %w", verrors.FromStatusError(err))
return nil, err
}

rsp, err := stream.Recv()
if err != nil {
return nil, err
}
return rsp.Results, nil
}
Expand Down
18 changes: 12 additions & 6 deletions internal/storagenode/client/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type storageNode struct {
llsn types.LLSN
logEntries map[types.GLSN][]byte
glsnToLLSN map[types.GLSN]types.LLSN
appendReq *snpb.AppendRequest
mu sync.Mutex
}

Expand All @@ -46,16 +47,19 @@ func newMockStorageNodeServiceClient(ctrl *gomock.Controller, sn *storageNode, t
mockClient := mock.NewMockLogIOClient(ctrl)

// Append
mockClient.EXPECT().Append(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).DoAndReturn(func(ctx context.Context, req *snpb.AppendRequest, opts ...grpc.CallOption) (*snpb.AppendResponse, error) {
mockAppendClient := mock.NewMockLogIO_AppendClient(ctrl)
mockAppendClient.EXPECT().Send(gomock.Any()).DoAndReturn(func(req *snpb.AppendRequest) error {
sn.mu.Lock()
defer sn.mu.Unlock()
sn.appendReq = req
return nil
}).AnyTimes()
mockAppendClient.EXPECT().Recv().DoAndReturn(func() (*snpb.AppendResponse, error) {
sn.mu.Lock()
defer sn.mu.Unlock()

rsp := &snpb.AppendResponse{}
for _, buf := range req.GetPayload() {
for _, buf := range sn.appendReq.GetPayload() {
sn.logEntries[sn.glsn] = buf
sn.glsnToLLSN[sn.glsn] = sn.llsn
rsp.Results = append(rsp.Results, snpb.AppendResult{
Expand All @@ -71,6 +75,8 @@ func newMockStorageNodeServiceClient(ctrl *gomock.Controller, sn *storageNode, t
}
return rsp, nil
}).AnyTimes()
mockAppendClient.EXPECT().CloseSend().Return(nil).AnyTimes()
mockClient.EXPECT().Append(gomock.Any(), gomock.Any()).Return(mockAppendClient, nil).AnyTimes()

// Read
mockClient.EXPECT().Read(
Expand Down
20 changes: 20 additions & 0 deletions internal/storagenode/client/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package client

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

func TestNewLogClient(t *testing.T, rpcClient snpb.LogIOClient, target varlogpb.StorageNode) *LogClient {
require.NotNil(t, rpcClient)
require.False(t, target.StorageNodeID.Invalid())
require.NotEmpty(t, target.Address)
return &LogClient{
rpcClient: rpcClient,
target: target,
}
}
62 changes: 39 additions & 23 deletions internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storagenode
import (
"context"
"errors"
"io"

pbtypes "github.com/gogo/protobuf/types"
"go.uber.org/multierr"
Expand All @@ -22,33 +23,48 @@ type logServer struct {

var _ snpb.LogIOServer = (*logServer)(nil)

func (ls logServer) Append(ctx context.Context, req *snpb.AppendRequest) (*snpb.AppendResponse, error) {
err := snpb.ValidateTopicLogStream(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
func (ls logServer) Append(stream snpb.LogIO_AppendServer) (err error) {
req, rsp := &snpb.AppendRequest{}, &snpb.AppendResponse{}

payload := req.GetPayload()
req.Payload = nil
lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return nil, status.Error(codes.NotFound, "no such log stream")
}
for {
req.Reset()
err = stream.RecvMsg(req)
if err == io.EOF {
return nil
}
if err != nil {
return err
}

res, err := lse.Append(ctx, payload)
if err != nil {
var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.FromContextError(err).Code()
err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return status.Error(codes.NotFound, "no such log stream")
}
res, err := lse.Append(stream.Context(), req.Payload)
if err != nil {
var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.FromContextError(err).Code()
}
return status.Error(code, err.Error())
}

rsp.Results = res
err = stream.Send(rsp)
if err != nil {
return err
}
return nil, status.Error(code, err.Error())
}
return &snpb.AppendResponse{Results: res}, nil
}

func (ls logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
Expand Down
74 changes: 23 additions & 51 deletions internal/storagenode/storagenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,61 +456,45 @@ func TestStorageNode_Append(t *testing.T) {

tcs := []struct {
name string
testf func(t *testing.T, addr string, lc snpb.LogIOClient)
testf func(t *testing.T, addr string, lc *client.LogClient)
}{
{
name: "InvalidTopicID",
testf: func(t *testing.T, _ string, lc snpb.LogIOClient) {
testf: func(t *testing.T, _ string, lc *client.LogClient) {
const invalidTopicID = types.TopicID(0)
_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: invalidTopicID,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(context.Background(), invalidTopicID, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.InvalidArgument, status.Code(err))
},
},
{
name: "InvalidLogStreamID",
testf: func(t *testing.T, _ string, lc snpb.LogIOClient) {
testf: func(t *testing.T, _ string, lc *client.LogClient) {
const invalidLogStreamID = types.LogStreamID(0)
_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: invalidLogStreamID,
Payload: payload,
})
_, err := lc.Append(context.Background(), tpid, invalidLogStreamID, payload)
require.Error(t, err)
require.Equal(t, codes.InvalidArgument, status.Code(err))
},
},
{
name: "NoSuchTopic",
testf: func(t *testing.T, _ string, lc snpb.LogIOClient) {
_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid + 1,
LogStreamID: lsid,
Payload: payload,
})
testf: func(t *testing.T, _ string, lc *client.LogClient) {
_, err := lc.Append(context.Background(), tpid+1, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.NotFound, status.Code(err))
},
},
{
name: "NoSuchLogStream",
testf: func(t *testing.T, _ string, lc snpb.LogIOClient) {
_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid + 1,
Payload: payload,
})
testf: func(t *testing.T, _ string, lc *client.LogClient) {
_, err := lc.Append(context.Background(), tpid, lsid+1, payload)
require.Error(t, err)
require.Equal(t, codes.NotFound, status.Code(err))
},
},
{
name: "NotPrimary",
testf: func(t *testing.T, addr string, lc snpb.LogIOClient) {
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())
Expand Down Expand Up @@ -538,34 +522,26 @@ func TestStorageNode_Append(t *testing.T) {
},
}, addr)

_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(context.Background(), tpid, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.Unavailable, status.Code(err))
},
},
{
name: "Sealed",
testf: func(t *testing.T, addr string, lc snpb.LogIOClient) {
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())

_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(context.Background(), tpid, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.FailedPrecondition, status.Code(err))
},
},
{
name: "DeadlineExceeded",
testf: func(t *testing.T, addr string, lc snpb.LogIOClient) {
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())
Expand All @@ -585,18 +561,14 @@ func TestStorageNode_Append(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
_, err := lc.Append(ctx, &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(ctx, tpid, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.DeadlineExceeded, status.Code(err))
},
},
{
name: "Canceled",
testf: func(t *testing.T, addr string, lc snpb.LogIOClient) {
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())
Expand All @@ -623,11 +595,7 @@ func TestStorageNode_Append(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := lc.Append(ctx, &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(ctx, tpid, lsid, payload)
assert.Error(t, err)
assert.Equal(t, codes.Canceled, status.Code(err))
}()
Expand Down Expand Up @@ -663,8 +631,12 @@ func TestStorageNode_Append(t *testing.T) {
defer func() {
require.NoError(t, rpcConn.Close())
}()
lc := snpb.NewLogIOClient(rpcConn.Conn)

lc := client.TestNewLogClient(t, snpb.NewLogIOClient(rpcConn.Conn),
varlogpb.StorageNode{
StorageNodeID: snid,
Address: addr,
},
)
tc.testf(t, addr, lc)
})
}
Expand Down
Loading