Skip to content

Commit

Permalink
refactor(storagenode): add LogStreamReplicaMetadata to `/varlog.snp…
Browse files Browse the repository at this point in the history
…b.LogIO` to replace `LogStreamMetadata`
  • Loading branch information
ijsong committed Sep 12, 2022
1 parent c4d0c8d commit 4ff870f
Show file tree
Hide file tree
Showing 13 changed files with 761 additions and 98 deletions.
11 changes: 11 additions & 0 deletions internal/storagenode/client/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ func (c *LogClient) LogStreamMetadata(ctx context.Context, tpid types.TopicID, l
return rsp.GetLogStreamDescriptor(), nil
}

func (c *LogClient) LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) {
rsp, err := c.rpcClient.LogStreamReplicaMetadata(ctx, &snpb.LogStreamReplicaMetadataRequest{
TopicID: tpid,
LogStreamID: lsid,
})
if err != nil {
return snpb.LogStreamReplicaMetadataDescriptor{}, fmt.Errorf("logclient: %w", verrors.FromStatusError(err))
}
return rsp.LogStreamReplica, nil
}

// Target returns connected storage node.
func (c *LogClient) Target() varlogpb.StorageNode {
return c.target
Expand Down
13 changes: 13 additions & 0 deletions internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecated
return &pbtypes.Empty{}, nil
}

func (ls logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) {
lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return nil, errors.New("storage: no such logstream")
}

lsrmd, err := lse.Metadata()
if err != nil {
return nil, err
}
return &snpb.LogStreamReplicaMetadataResponse{LogStreamReplica: lsrmd}, nil
}

func (ls logServer) LogStreamMetadata(_ context.Context, req *snpb.LogStreamMetadataRequest) (*snpb.LogStreamMetadataResponse, error) {
lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
Expand Down
13 changes: 11 additions & 2 deletions pkg/varlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/runner"
"github.com/kakao/varlog/pkg/util/syncutil/atomicutil"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

Expand Down Expand Up @@ -43,9 +44,13 @@ type Log interface {
// It returns an error if the log stream does not exist or fails to
// fetch metadata from all replicas.
//
// FIXME (jun): Change the return type of this method to
// snpb.LogStreamMetadataDescriptor.
// Deprecated: Use LogStreamReplicaMetdata.
LogStreamMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (varlogpb.LogStreamDescriptor, error)

// LogStreamReplicaMetadata returns metadata of log stream replica
// specified by the arguments tpid and lsid. It returns the first
// successful result among all replicas.
LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error)
}

type AppendResult struct {
Expand Down Expand Up @@ -181,6 +186,10 @@ func (v *logImpl) LogStreamMetadata(ctx context.Context, tpid types.TopicID, lsi
return v.logStreamMetadata(ctx, tpid, lsid)
}

func (v *logImpl) LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) {
return v.logStreamReplicaMetadata(ctx, tpid, lsid)
}

func (v *logImpl) Close() (err error) {
if v.closed.Load() {
return
Expand Down
16 changes: 16 additions & 0 deletions pkg/varlog/log_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/varlog/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,27 @@ func (v *logImpl) logStreamMetadata(ctx context.Context, tpID types.TopicID, lsI
}
return lsd, err
}

func (v *logImpl) logStreamReplicaMetadata(ctx context.Context, tpID types.TopicID, lsID types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) {
replicas, ok := v.replicasRetriever.Retrieve(tpID, lsID)
if !ok {
return snpb.LogStreamReplicaMetadataDescriptor{}, errNoLogStream
}

var err error
for _, replica := range replicas {
cl, cerr := v.logCLManager.GetOrConnect(ctx, replica.StorageNodeID, replica.Address)
if cerr != nil {
err = multierr.Append(err, cerr)
continue
}

lsrmd, cerr := cl.LogStreamReplicaMetadata(ctx, tpID, lsID)
if cerr != nil {
err = multierr.Append(err, cerr)
continue
}
return lsrmd, nil
}
return snpb.LogStreamReplicaMetadataDescriptor{}, err
}
41 changes: 23 additions & 18 deletions pkg/varlogtest/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package varlogtest

import (
"context"
"path/filepath"
"time"

"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"

"github.com/kakao/varlog/internal/storagenode/volume"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/varlog"
"github.com/kakao/varlog/pkg/verrors"
Expand Down Expand Up @@ -88,33 +90,33 @@ func (c *testAdmin) GetStorageNodes(ctx context.Context, opts ...varlog.AdminCal
return ret, nil
}

// FIXME: Argument snid
func (c *testAdmin) AddStorageNode(ctx context.Context, storageNodeID types.StorageNodeID, addr string, opts ...varlog.AdminCallOption) (*vmspb.StorageNodeMetadata, error) {
func (c *testAdmin) AddStorageNode(_ context.Context, snid types.StorageNodeID, addr string, _ ...varlog.AdminCallOption) (*vmspb.StorageNodeMetadata, error) {
if err := c.lock(); err != nil {
return nil, err
}
defer c.unlock()

// NOTE: Use UTC rather than local to use gogoproto's non-nullable stdtime.
now := time.Now().UTC()
if storageNodeID.Invalid() {
storageNodeID = c.vt.generateStorageNodeID()
if snid.Invalid() {
snid = c.vt.generateStorageNodeID()
}

snpath := filepath.Join("/tmp", volume.StorageNodeDirName(c.vt.clusterID, snid))
storageNodeMetaDesc := snpb.StorageNodeMetadataDescriptor{
ClusterID: c.vt.clusterID,
StorageNode: varlogpb.StorageNode{
StorageNodeID: storageNodeID,
StorageNodeID: snid,
Address: addr,
},
Status: varlogpb.StorageNodeStatusRunning,
Storages: []varlogpb.StorageDescriptor{
{Path: "/tmp"},
{Path: snpath},
},
LogStreamReplicas: nil,
StartTime: now,
}
c.vt.storageNodes[storageNodeID] = storageNodeMetaDesc
c.vt.storageNodes[snid] = storageNodeMetaDesc

return &vmspb.StorageNodeMetadata{
StorageNodeMetadataDescriptor: *proto.Clone(&storageNodeMetaDesc).(*snpb.StorageNodeMetadataDescriptor),
Expand Down Expand Up @@ -219,7 +221,7 @@ func (c *testAdmin) DescribeTopic(ctx context.Context, topicID types.TopicID, op
return rsp, nil
}

func (c *testAdmin) AddLogStream(ctx context.Context, topicID types.TopicID, logStreamReplicas []*varlogpb.ReplicaDescriptor, opts ...varlog.AdminCallOption) (*varlogpb.LogStreamDescriptor, error) {
func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logStreamReplicas []*varlogpb.ReplicaDescriptor, opts ...varlog.AdminCallOption) (*varlogpb.LogStreamDescriptor, error) {
if err := c.lock(); err != nil {
return nil, err
}
Expand All @@ -235,35 +237,38 @@ func (c *testAdmin) AddLogStream(ctx context.Context, topicID types.TopicID, log
}

logStreamID := c.vt.generateLogStreamID()
logStreamDesc := varlogpb.LogStreamDescriptor{
lsd := varlogpb.LogStreamDescriptor{
LogStreamID: logStreamID,
TopicID: topicID,
Status: varlogpb.LogStreamStatusRunning,
Replicas: make([]*varlogpb.ReplicaDescriptor, c.vt.replicationFactor),
}

if logStreamReplicas == nil {
snIDs := c.vt.storageNodeIDs()
for i, j := range c.vt.rng.Perm(len(snIDs))[:c.vt.replicationFactor] {
snID := snIDs[j]
logStreamDesc.Replicas[i] = &varlogpb.ReplicaDescriptor{
StorageNodeID: c.vt.storageNodes[snID].StorageNode.StorageNodeID,
StorageNodePath: c.vt.storageNodes[snID].Storages[0].Path,
snids := c.vt.storageNodeIDs()
for i, j := range c.vt.rng.Perm(len(snids))[:c.vt.replicationFactor] {
snid := snids[j]
snpath := c.vt.storageNodes[snid].Storages[0].Path
dataPath := filepath.Join(snpath, volume.LogStreamDirName(topicID, logStreamID))
lsd.Replicas[i] = &varlogpb.ReplicaDescriptor{
StorageNodeID: c.vt.storageNodes[snid].StorageNode.StorageNodeID,
StorageNodePath: c.vt.storageNodes[snid].Storages[0].Path,
DataPath: dataPath,
}
}
} else {
logStreamDesc.Replicas = logStreamReplicas
lsd.Replicas = logStreamReplicas
}

c.vt.logStreams[logStreamID] = logStreamDesc
c.vt.logStreams[logStreamID] = lsd

invalidLogEntry := varlogpb.InvalidLogEntry()
c.vt.localLogEntries[logStreamID] = []*varlogpb.LogEntry{&invalidLogEntry}

topicDesc.LogStreams = append(topicDesc.LogStreams, logStreamID)
c.vt.topics[topicID] = topicDesc

return proto.Clone(&logStreamDesc).(*varlogpb.LogStreamDescriptor), nil
return proto.Clone(&lsd).(*varlogpb.LogStreamDescriptor), nil
}

func (c *testAdmin) UpdateLogStream(ctx context.Context, topicID types.TopicID, logStreamID types.LogStreamID, poppedReplica varlogpb.ReplicaDescriptor, pushedReplica varlogpb.ReplicaDescriptor, opts ...varlog.AdminCallOption) (*varlogpb.LogStreamDescriptor, error) {
Expand Down
60 changes: 60 additions & 0 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package varlogtest
import (
"context"
"io"
"path/filepath"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"

"github.com/kakao/varlog/internal/storagenode/volume"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/varlog"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

Expand Down Expand Up @@ -297,6 +300,63 @@ func (c *testLog) LogStreamMetadata(_ context.Context, topicID types.TopicID, lo
return logStreamDesc, nil
}

func (c *testLog) LogStreamReplicaMetadata(_ context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) {
if err := c.lock(); err != nil {
return snpb.LogStreamReplicaMetadataDescriptor{}, err
}
defer c.unlock()

topicDesc, ok := c.vt.topics[tpid]
if !ok {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.New("no such topic")
}

if !topicDesc.HasLogStream(lsid) {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.New("no such log stream")
}

lsd, ok := c.vt.logStreams[lsid]
if !ok {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.New("no such log stream")
}

snid := lsd.Replicas[0].StorageNodeID
snmd, ok := c.vt.storageNodes[snid]
if !ok {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.New("no such storage node")
}

snpath := snmd.Storages[0].Path
dataPath := filepath.Join(snpath, volume.LogStreamDirName(tpid, lsid))

n := len(c.vt.globalLogEntries[tpid])
lastGLSN := c.vt.globalLogEntries[tpid][n-1].GLSN
head, tail := c.vt.peek(tpid, lsid)
return snpb.LogStreamReplicaMetadataDescriptor{
LogStreamReplica: varlogpb.LogStreamReplica{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
},
TopicLogStream: varlogpb.TopicLogStream{
TopicID: tpid,
LogStreamID: lsid,
},
},
Status: varlogpb.LogStreamStatusRunning,
Version: c.vt.version,
GlobalHighWatermark: lastGLSN,
LocalLowWatermark: varlogpb.LogSequenceNumber{
GLSN: head.GLSN,
LLSN: head.LLSN,
},
LocalHighWatermark: varlogpb.LogSequenceNumber{
GLSN: tail.GLSN,
LLSN: tail.LLSN,
},
Path: dataPath,
}, nil
}

type errSubscriber struct {
err error
}
Expand Down
Loading

0 comments on commit 4ff870f

Please sign in to comment.