Skip to content

Commit

Permalink
feat(varlogtest): support ListLogStreams API (#607)
Browse files Browse the repository at this point in the history
### What this PR does

This PR implements ListLogStreams for the varlogtest package.
  • Loading branch information
ijsong authored Oct 10, 2023
2 parents 8180573 + c0c2e9d commit 1f42351
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 28 deletions.
44 changes: 30 additions & 14 deletions pkg/varlogtest/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,32 +197,48 @@ func (c *testAdmin) GetLogStream(ctx context.Context, tpid types.TopicID, lsid t
}

func (c *testAdmin) ListLogStreams(ctx context.Context, tpid types.TopicID, opts ...varlog.AdminCallOption) ([]varlogpb.LogStreamDescriptor, error) {
panic("not implemented")
_, lsds, err := c.listLogStreamsInternal(tpid)
if err != nil {
return nil, err
}
return lsds, nil
}

func (c *testAdmin) DescribeTopic(ctx context.Context, topicID types.TopicID, opts ...varlog.AdminCallOption) (*admpb.DescribeTopicResponse, error) {
if err := c.lock(); err != nil {
td, lsds, err := c.listLogStreamsInternal(topicID)
if err != nil {
return nil, err
}
defer c.unlock()
return &admpb.DescribeTopicResponse{
Topic: td,
LogStreams: lsds,
}, nil
}

topicDesc, ok := c.vt.topics[topicID]
if !ok || topicDesc.Status.Deleted() {
return nil, errors.New("no such topic")
func (c *testAdmin) listLogStreamsInternal(tpid types.TopicID) (td varlogpb.TopicDescriptor, lsds []varlogpb.LogStreamDescriptor, err error) {
err = c.lock()
if err != nil {
return
}
defer c.unlock()

rsp := &admpb.DescribeTopicResponse{
Topic: *proto.Clone(&topicDesc).(*varlogpb.TopicDescriptor),
LogStreams: make([]varlogpb.LogStreamDescriptor, len(topicDesc.LogStreams)),
td, ok := c.vt.topics[tpid]
if !ok || td.Status.Deleted() {
err = errors.New("no such topic")
return
}
for i, lsID := range topicDesc.LogStreams {
lsDesc, ok := c.vt.logStreams[lsID]
td = *proto.Clone(&td).(*varlogpb.TopicDescriptor)

lsds = make([]varlogpb.LogStreamDescriptor, len(td.LogStreams))
for i, lsid := range td.LogStreams {
lsd, ok := c.vt.logStreams[lsid]
if !ok {
panic(errors.Errorf("inconsistency: no logstream %d in topic %d", lsID, topicID))
panic(errors.Errorf("inconsistency: no logstream %d in topic %d", lsid, tpid))
}
rsp.LogStreams[i] = *proto.Clone(&lsDesc).(*varlogpb.LogStreamDescriptor)
lsds[i] = *proto.Clone(&lsd).(*varlogpb.LogStreamDescriptor)
}
return rsp, nil

return td, lsds, nil
}

func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logStreamReplicas []*varlogpb.ReplicaDescriptor, opts ...varlog.AdminCallOption) (*varlogpb.LogStreamDescriptor, error) {
Expand Down
21 changes: 7 additions & 14 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func TestVarlogTest(t *testing.T) {
)

// No topic 1
_, err = adm.DescribeTopic(context.Background(), types.TopicID(1))
_, err = adm.ListLogStreams(context.Background(), types.TopicID(1))
require.Error(t, err)

// Add topics
Expand All @@ -375,10 +375,9 @@ func TestVarlogTest(t *testing.T) {
topicIDs = append(topicIDs, topicDesc.TopicID)
topicLogStreamsMap[topicDesc.TopicID] = topicDesc.LogStreams

rsp, err := adm.DescribeTopic(context.Background(), topicDesc.TopicID)
lsds, err := adm.ListLogStreams(context.Background(), topicDesc.TopicID)
require.NoError(t, err)
require.Equal(t, *topicDesc, rsp.Topic)
require.Empty(t, rsp.LogStreams)
require.Empty(t, lsds)
}

// Append logs, but no log stream
Expand Down Expand Up @@ -423,17 +422,11 @@ func TestVarlogTest(t *testing.T) {
}
require.Len(t, snIDSet, replicationFactor)

rsp, err := adm.DescribeTopic(context.Background(), tpID)
lsds, err := adm.ListLogStreams(context.Background(), tpID)
require.NoError(t, err)
require.Contains(t, rsp.Topic.LogStreams, lsDesc.LogStreamID)
require.Condition(t, func() bool {
for _, lsID := range rsp.Topic.LogStreams {
if lsID == lsDesc.LogStreamID {
return true
}
}
return false
})
require.True(t, slices.ContainsFunc(lsds, func(lsd varlogpb.LogStreamDescriptor) bool {
return lsd.LogStreamID == lsDesc.LogStreamID
}))

logStreamIDs = append(logStreamIDs, lsDesc.LogStreamID)
topicLogStreamsMap[tpID] = append(topicLogStreamsMap[tpID], lsDesc.LogStreamID)
Expand Down

0 comments on commit 1f42351

Please sign in to comment.