diff --git a/pkg/varlogtest/admin.go b/pkg/varlogtest/admin.go index de7aa4cf9..04d55d5ff 100644 --- a/pkg/varlogtest/admin.go +++ b/pkg/varlogtest/admin.go @@ -197,32 +197,48 @@ func (c *testAdmin) GetLogStream(ctx context.Context, tpid types.TopicID, lsid t } func (c *testAdmin) ListLogStreams(ctx context.Context, tpid types.TopicID, opts ...varlog.AdminCallOption) ([]varlogpb.LogStreamDescriptor, error) { - panic("not implemented") + _, lsds, err := c.listLogStreamsInternal(tpid) + if err != nil { + return nil, err + } + return lsds, nil } func (c *testAdmin) DescribeTopic(ctx context.Context, topicID types.TopicID, opts ...varlog.AdminCallOption) (*admpb.DescribeTopicResponse, error) { - if err := c.lock(); err != nil { + td, lsds, err := c.listLogStreamsInternal(topicID) + if err != nil { return nil, err } - defer c.unlock() + return &admpb.DescribeTopicResponse{ + Topic: td, + LogStreams: lsds, + }, nil +} - topicDesc, ok := c.vt.topics[topicID] - if !ok || topicDesc.Status.Deleted() { - return nil, errors.New("no such topic") +func (c *testAdmin) listLogStreamsInternal(tpid types.TopicID) (td varlogpb.TopicDescriptor, lsds []varlogpb.LogStreamDescriptor, err error) { + err = c.lock() + if err != nil { + return } + defer c.unlock() - rsp := &admpb.DescribeTopicResponse{ - Topic: *proto.Clone(&topicDesc).(*varlogpb.TopicDescriptor), - LogStreams: make([]varlogpb.LogStreamDescriptor, len(topicDesc.LogStreams)), + td, ok := c.vt.topics[tpid] + if !ok || td.Status.Deleted() { + err = errors.New("no such topic") + return } - for i, lsID := range topicDesc.LogStreams { - lsDesc, ok := c.vt.logStreams[lsID] + td = *proto.Clone(&td).(*varlogpb.TopicDescriptor) + + lsds = make([]varlogpb.LogStreamDescriptor, len(td.LogStreams)) + for i, lsid := range td.LogStreams { + lsd, ok := c.vt.logStreams[lsid] if !ok { - panic(errors.Errorf("inconsistency: no logstream %d in topic %d", lsID, topicID)) + panic(errors.Errorf("inconsistency: no logstream %d in topic %d", lsid, tpid)) } - rsp.LogStreams[i] = *proto.Clone(&lsDesc).(*varlogpb.LogStreamDescriptor) + lsds[i] = *proto.Clone(&lsd).(*varlogpb.LogStreamDescriptor) } - return rsp, nil + + return td, lsds, nil } func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logStreamReplicas []*varlogpb.ReplicaDescriptor, opts ...varlog.AdminCallOption) (*varlogpb.LogStreamDescriptor, error) { diff --git a/pkg/varlogtest/varlogtest_test.go b/pkg/varlogtest/varlogtest_test.go index 9cb838421..543fe9e4e 100644 --- a/pkg/varlogtest/varlogtest_test.go +++ b/pkg/varlogtest/varlogtest_test.go @@ -362,7 +362,7 @@ func TestVarlogTest(t *testing.T) { ) // No topic 1 - _, err = adm.DescribeTopic(context.Background(), types.TopicID(1)) + _, err = adm.ListLogStreams(context.Background(), types.TopicID(1)) require.Error(t, err) // Add topics @@ -375,10 +375,9 @@ func TestVarlogTest(t *testing.T) { topicIDs = append(topicIDs, topicDesc.TopicID) topicLogStreamsMap[topicDesc.TopicID] = topicDesc.LogStreams - rsp, err := adm.DescribeTopic(context.Background(), topicDesc.TopicID) + lsds, err := adm.ListLogStreams(context.Background(), topicDesc.TopicID) require.NoError(t, err) - require.Equal(t, *topicDesc, rsp.Topic) - require.Empty(t, rsp.LogStreams) + require.Empty(t, lsds) } // Append logs, but no log stream @@ -423,17 +422,11 @@ func TestVarlogTest(t *testing.T) { } require.Len(t, snIDSet, replicationFactor) - rsp, err := adm.DescribeTopic(context.Background(), tpID) + lsds, err := adm.ListLogStreams(context.Background(), tpID) require.NoError(t, err) - require.Contains(t, rsp.Topic.LogStreams, lsDesc.LogStreamID) - require.Condition(t, func() bool { - for _, lsID := range rsp.Topic.LogStreams { - if lsID == lsDesc.LogStreamID { - return true - } - } - return false - }) + require.True(t, slices.ContainsFunc(lsds, func(lsd varlogpb.LogStreamDescriptor) bool { + return lsd.LogStreamID == lsDesc.LogStreamID + })) logStreamIDs = append(logStreamIDs, lsDesc.LogStreamID) topicLogStreamsMap[tpID] = append(topicLogStreamsMap[tpID], lsDesc.LogStreamID)