Skip to content

Commit

Permalink
Add list topics endpoint (#1043)
Browse files Browse the repository at this point in the history
* proto: internalize list topics response type

* backend: add GetMetadata method

* chore: compile protos

* backend: implement ListTopics endpoint

* backend: fix lint issue

* chore: compile merged protos
  • Loading branch information
weeco authored Jan 30, 2024
1 parent b56cb2b commit 47ff54b
Show file tree
Hide file tree
Showing 16 changed files with 656 additions and 523 deletions.
28 changes: 28 additions & 0 deletions backend/pkg/api/connect/service/topic/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,31 @@ func (k *kafkaClientMapper) updateTopicConfigsToKafka(req *v1alpha1.UpdateTopicC

return &kafkaReq, nil
}

func (k *kafkaClientMapper) kafkaMetadataToProto(metadata *kmsg.MetadataResponse) *v1alpha1.ListTopicsResponse {
topics := make([]*v1alpha1.ListTopicsResponse_Topic, len(metadata.Topics))
for i, topicMetadata := range metadata.Topics {
topics[i] = k.kafkaTopicMetadataToProto(topicMetadata)
}
return &v1alpha1.ListTopicsResponse{
Topics: topics,
}
}

func (*kafkaClientMapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataResponseTopic) *v1alpha1.ListTopicsResponse_Topic {
// We iterate through all partitions to figure out the replication factor,
// in case we get an error for the first partitions
replicationFactor := -1
for _, partition := range topicMetadata.Partitions {
if len(partition.Replicas) > replicationFactor {
replicationFactor = len(partition.Replicas)
}
}

return &v1alpha1.ListTopicsResponse_Topic{
Name: *topicMetadata.Topic,
IsInternal: topicMetadata.IsInternal,
PartitionCount: int32(len(topicMetadata.Partitions)),
ReplicationFactor: int32(replicationFactor),
}
}
36 changes: 28 additions & 8 deletions backend/pkg/api/connect/service/topic/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
"fmt"
"net/http"
"strconv"
"strings"

"connectrpc.com/connect"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"

apierrors "github.com/redpanda-data/console/backend/pkg/api/connect/errors"
Expand All @@ -39,14 +41,32 @@ type Service struct {
mapper kafkaClientMapper
}

// ListTopics lists all Kafka topics.
func (*Service) ListTopics(context.Context, *connect.Request[v1alpha1.ListTopicsRequest]) (*connect.Response[v1alpha1.ListTopicsResponse], error) {
return nil, apierrors.NewConnectError(
connect.CodeUnimplemented,
errors.New("this endpoint is not yet implemented"),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
apierrors.NewHelp(apierrors.NewHelpLinkConsoleReferenceConfig()),
)
// ListTopics lists all Kafka topics with their most important metadata.
func (s *Service) ListTopics(ctx context.Context, req *connect.Request[v1alpha1.ListTopicsRequest]) (*connect.Response[v1alpha1.ListTopicsResponse], error) {
kafkaReq := kmsg.NewMetadataRequest()
kafkaRes, err := s.consoleSvc.GetMetadata(ctx, &kafkaReq)
if err != nil {
return nil, apierrors.NewConnectError(
connect.CodeInternal,
err,
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_KAFKA_API_ERROR.String(), apierrors.KeyValsFromKafkaError(err)...),
)
}

// Filter topics if a filter is set
if req.Msg.Filter != nil && req.Msg.Filter.NameContains != "" {
filteredTopics := make([]kmsg.MetadataResponseTopic, 0, len(kafkaRes.Topics))
for _, topic := range kafkaRes.Topics {
if strings.Contains(*topic.Topic, req.Msg.Filter.NameContains) {
filteredTopics = append(filteredTopics, topic)
}
}
kafkaRes.Topics = filteredTopics
}

protoResponse := s.mapper.kafkaMetadataToProto(kafkaRes)

return connect.NewResponse(protoResponse), nil
}

// DeleteTopic deletes a Kafka topic.
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/console/broker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type BrokerConfigSynonym struct {

// GetAllBrokerConfigs retrieves broker configs.
func (s *Service) GetAllBrokerConfigs(ctx context.Context) (map[int32]BrokerConfig, error) {
metadata, err := s.kafkaSvc.GetMetadata(ctx, nil)
metadata, err := s.kafkaSvc.GetMetadataTopics(ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to get broker ids: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/console/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *Service) GetClusterInfo(ctx context.Context) (*ClusterInfo, error) {

eg.Go(func() error {
var err error
metadata, err = s.kafkaSvc.GetMetadata(childCtx, nil)
metadata, err = s.kafkaSvc.GetMetadataTopics(childCtx, nil)
if err != nil {
return err
}
Expand Down
21 changes: 21 additions & 0 deletions backend/pkg/console/get_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package console

import (
"context"

"github.com/twmb/franz-go/pkg/kmsg"
)

// GetMetadata proxies the get metadata Kafka request/response.
func (s *Service) GetMetadata(ctx context.Context, kafkaReq *kmsg.MetadataRequest) (*kmsg.MetadataResponse, error) {
return s.kafkaSvc.GetMetadata(ctx, kafkaReq)
}
2 changes: 1 addition & 1 deletion backend/pkg/console/list_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest,

progress.OnPhase("Get Partitions")
// Create array of partitionIDs which shall be consumed (always do that to ensure the requested topic exists at all)
metadata, restErr := s.kafkaSvc.GetSingleMetadata(ctx, listReq.TopicName)
metadata, restErr := s.kafkaSvc.GetSingleTopicMetadata(ctx, listReq.TopicName)
if restErr != nil {
return fmt.Errorf("failed to get partitions: %w", restErr.Err)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/console/list_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type PartitionOffset struct {
// ListOffsets lists partition offsets (either earliest or latest, depending on the timestamp parameter)
// of one or topic names.
func (s *Service) ListOffsets(ctx context.Context, topicNames []string, timestamp int64) ([]TopicOffset, error) {
metadata, err := s.kafkaSvc.GetMetadata(ctx, topicNames)
metadata, err := s.kafkaSvc.GetMetadataTopics(ctx, topicNames)
if err != nil {
return nil, fmt.Errorf("failed to request partition info for topics")
}
Expand Down
2 changes: 2 additions & 0 deletions backend/pkg/console/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type Servicer interface {
DescribeConfigs(ctx context.Context, req *kmsg.DescribeConfigsRequest) (*kmsg.DescribeConfigsResponse, error)
// DeleteTopics proxies the request/response to delete topics via the Kafka API.
DeleteTopics(ctx context.Context, deleteReq *kmsg.DeleteTopicsRequest) (*kmsg.DeleteTopicsResponse, error)
// GetMetadata proxies the request/response to retrieve metadata via the Kafka API.
GetMetadata(ctx context.Context, metadataReq *kmsg.MetadataRequest) (*kmsg.MetadataResponse, error)
// IncrementalAlterConfigsKafka proxies the request/response to incrementally alter configs via the Kafka API.
IncrementalAlterConfigsKafka(ctx context.Context, req *kmsg.IncrementalAlterConfigsRequest) (*kmsg.IncrementalAlterConfigsResponse, error)
}
4 changes: 2 additions & 2 deletions backend/pkg/console/topic_overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type TopicSummary struct {
// GetTopicsOverview returns a TopicSummary for all Kafka Topics
func (s *Service) GetTopicsOverview(ctx context.Context) ([]*TopicSummary, error) {
// 1. Request metadata
metadata, err := s.kafkaSvc.GetMetadata(ctx, nil)
metadata, err := s.kafkaSvc.GetMetadataTopics(ctx, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *Service) GetTopicsOverview(ctx context.Context) ([]*TopicSummary, error
func (s *Service) GetAllTopicNames(ctx context.Context, metadata *kmsg.MetadataResponse) ([]string, error) {
if metadata == nil {
var err error
metadata, err = s.kafkaSvc.GetMetadata(ctx, nil)
metadata, err = s.kafkaSvc.GetMetadataTopics(ctx, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/console/topic_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *Service) GetTopicDetails(ctx context.Context, topicNames []string) ([]T
}

func (s *Service) getTopicPartitionMetadata(ctx context.Context, topicNames []string) (map[string]TopicDetails, *rest.Error) {
metadata, err := s.kafkaSvc.GetMetadata(ctx, topicNames)
metadata, err := s.kafkaSvc.GetMetadataTopics(ctx, topicNames)
if err != nil {
return nil, &rest.Error{
Err: err,
Expand Down
15 changes: 10 additions & 5 deletions backend/pkg/kafka/get_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/twmb/franz-go/pkg/kmsg"
)

// GetMetadata returns some generic information about the brokers in the given cluster
func (s *Service) GetMetadata(ctx context.Context, topics []string) (*kmsg.MetadataResponse, error) {
// GetMetadataTopics returns some generic information about the brokers in the given cluster
func (s *Service) GetMetadataTopics(ctx context.Context, topics []string) (*kmsg.MetadataResponse, error) {
metadataRequestTopics := make([]kmsg.MetadataRequestTopic, len(topics))
for i, topic := range topics {
topicReq := kmsg.NewMetadataRequestTopic()
Expand All @@ -40,9 +40,9 @@ func (s *Service) GetMetadata(ctx context.Context, topics []string) (*kmsg.Metad
return req.RequestWith(ctx, s.KafkaClient)
}

// GetSingleMetadata returns metadata for a single topic.
func (s *Service) GetSingleMetadata(ctx context.Context, topic string) (kmsg.MetadataResponseTopic, *rest.Error) {
metadata, err := s.GetMetadata(ctx, []string{topic})
// GetSingleTopicMetadata returns metadata for a single topic.
func (s *Service) GetSingleTopicMetadata(ctx context.Context, topic string) (kmsg.MetadataResponseTopic, *rest.Error) {
metadata, err := s.GetMetadataTopics(ctx, []string{topic})
if err != nil {
return kmsg.MetadataResponseTopic{}, &rest.Error{
Err: err,
Expand Down Expand Up @@ -80,3 +80,8 @@ func (s *Service) GetSingleMetadata(ctx context.Context, topic string) (kmsg.Met

return topicMetadata, nil
}

// GetMetadata executes the metadata request.
func (s *Service) GetMetadata(ctx context.Context, req *kmsg.MetadataRequest) (*kmsg.MetadataResponse, error) {
return req.RequestWith(ctx, s.KafkaClient)
}
Loading

0 comments on commit 47ff54b

Please sign in to comment.