Skip to content

Commit

Permalink
Add set topic configs endpoint (#1065)
Browse files Browse the repository at this point in the history
* proto: update protos for SetTopicConfig

* chore: compile protos

* backend: implement SetTopicConfigurations endpoint
  • Loading branch information
weeco authored Feb 2, 2024
1 parent 6b73b38 commit e724c8d
Show file tree
Hide file tree
Showing 16 changed files with 754 additions and 376 deletions.
258 changes: 258 additions & 0 deletions backend/pkg/api/connect/integration/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,3 +855,261 @@ func (s *APISuite) TestUpdateTopicConfiguration() {
assert.Error(err)
})
}

func (s *APISuite) TestSetTopicConfiguration() {
t := s.T()
require := require.New(t)
assert := assert.New(t)

t.Run("set topic configuration of a valid topic (connect-go)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

// 1. Create new topic
topicName := "console-integration-test-update-topic-config-valid-connect-go"
topicConfigs := map[string]*string{
"cleanup.policy": kmsg.StringPtr("delete"),
"retention.bytes": kmsg.StringPtr("1000"),
"compression.type": kmsg.StringPtr("snappy"),
}
_, err := s.kafkaAdminClient.CreateTopic(ctx, 1, 1, topicConfigs, topicName)
require.NoError(err)

defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
_, err := s.kafkaAdminClient.DeleteTopics(ctx, topicName)
assert.NoError(err)
}()

// 2. Update two topic configs where one shall be removed and another set to a different value
client := v1alpha1connect.NewTopicServiceClient(http.DefaultClient, s.httpAddress())
setConfigReq := &v1alpha1.SetTopicConfigurationsRequest{
TopicName: topicName,
Configurations: []*v1alpha1.SetTopicConfigurationsRequest_SetConfiguration{
{
Key: "cleanup.policy",
Value: kmsg.StringPtr("delete"),
},
{
Key: "compression.type",
Value: kmsg.StringPtr("producer"),
},
},
}
response, err := client.SetTopicConfigurations(ctx, connect.NewRequest(setConfigReq))
require.NoError(err)
require.NotNil(response.Msg.Configurations)
assert.GreaterOrEqual(len(response.Msg.Configurations), 10) // We expect at least 10 config props to be returned

// 3. Compare the returned config values against our expectations
var cleanupPolicyConfig *v1alpha1.Topic_Configuration
var compressionTypeConfig *v1alpha1.Topic_Configuration
var retentionBytesConfig *v1alpha1.Topic_Configuration
for _, config := range response.Msg.Configurations {
switch config.Name {
case "cleanup.policy":
cleanupPolicyConfig = config
case "compression.type":
compressionTypeConfig = config
case "retention.bytes":
retentionBytesConfig = config
}
}
require.NotNil(cleanupPolicyConfig)
require.NotNil(compressionTypeConfig)
require.NotNil(retentionBytesConfig)

assert.Equal("delete", *cleanupPolicyConfig.Value)
assert.Equal(v1alpha1.ConfigSource_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG.String(), cleanupPolicyConfig.Source.String())

assert.Equal(kmsg.StringPtr("producer"), compressionTypeConfig.Value)
assert.Equal(v1alpha1.ConfigSource_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG.String(), compressionTypeConfig.Source.String())

assert.Equal(v1alpha1.ConfigSource_CONFIG_SOURCE_DEFAULT_CONFIG.String(), retentionBytesConfig.Source.String())
})

t.Run("set topic configuration of a valid topic (http)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

// 1. Create new topic
topicName := "console-integration-test-update-topic-config-valid-http"
topicConfigs := map[string]*string{
"cleanup.policy": kmsg.StringPtr("delete"),
"retention.bytes": kmsg.StringPtr("1000"),
"compression.type": kmsg.StringPtr("snappy"),
}
_, err := s.kafkaAdminClient.CreateTopic(ctx, 1, 1, topicConfigs, topicName)
require.NoError(err)

defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
_, err := s.kafkaAdminClient.DeleteTopics(ctx, topicName)
assert.NoError(err)
}()

// 2. Update two topic configs where one shall be removed and another set to a different value
type setTopicConfigRequest struct {
Key string `json:"key"`
Value *string `json:"value"`
}
type setTopicConfigResponse struct {
ConfigSynonyms []any `json:"config_synonyms"`
Documentation string `json:"documentation"`
IsReadOnly bool `json:"is_read_only"`
IsSensitive bool `json:"is_sensitive"`
Name string `json:"name"`
Source string `json:"source"`
Type string `json:"type"`
Value *string `json:"value"`
}

var httpRes []setTopicConfigResponse
httpReq := []setTopicConfigRequest{
{
Key: "cleanup.policy",
Value: kmsg.StringPtr("delete"),
},
{
Key: "compression.type",
Value: kmsg.StringPtr("producer"),
},
}

var errResponse string
err = requests.
URL(s.httpAddress() + fmt.Sprintf("/v1alpha1/topics/%v/configurations", topicName)).
BodyJSON(&httpReq).
ToJSON(&httpRes).
Put().
AddValidator(requests.ValidatorHandler(
requests.CheckStatus(http.StatusOK),
requests.ToString(&errResponse),
)).
Fetch(ctx)
assert.Empty(errResponse)
require.NoError(err)
require.NotNil(httpRes)
assert.GreaterOrEqual(len(httpRes), 10) // We expect at least 10 config props to be returned

// 3. Compare the returned config values against our expectations
var cleanupPolicyConfig *setTopicConfigResponse
var compressionTypeConfig *setTopicConfigResponse
var retentionBytesConfig *setTopicConfigResponse
for _, config := range httpRes {
copiedConfig := config
switch config.Name {
case "cleanup.policy":
cleanupPolicyConfig = &copiedConfig
case "compression.type":
compressionTypeConfig = &copiedConfig
case "retention.bytes":
retentionBytesConfig = &copiedConfig
}
}
require.NotNil(cleanupPolicyConfig)
require.NotNil(compressionTypeConfig)
require.NotNil(retentionBytesConfig)

assert.Equal("delete", *cleanupPolicyConfig.Value)
assert.Equal(v1alpha1.ConfigSource_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG.String(), cleanupPolicyConfig.Source)

assert.Equal("producer", *compressionTypeConfig.Value)
assert.Equal(v1alpha1.ConfigSource_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG.String(), compressionTypeConfig.Source)

assert.Equal(v1alpha1.ConfigSource_CONFIG_SOURCE_DEFAULT_CONFIG.String(), retentionBytesConfig.Source)
})

t.Run("set topic configuration with an invalid request (connect-go)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

// 1. Create new topic
topicName := "console-integration-test-update-topic-config-invalid-connect-go"
topicConfigs := map[string]*string{
"cleanup.policy": kmsg.StringPtr("delete"),
"retention.bytes": kmsg.StringPtr("1000"),
"compression.type": kmsg.StringPtr("snappy"),
}
_, err := s.kafkaAdminClient.CreateTopic(ctx, 1, 1, topicConfigs, topicName)
require.NoError(err)

defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
_, err := s.kafkaAdminClient.DeleteTopics(ctx, topicName)
assert.NoError(err)
}()

// 2. Send alter config request with invalid config key
client := v1alpha1connect.NewTopicServiceClient(http.DefaultClient, s.httpAddress())
setConfigReq := &v1alpha1.SetTopicConfigurationsRequest{
TopicName: topicName,
Configurations: []*v1alpha1.SetTopicConfigurationsRequest_SetConfiguration{
{
Key: "key-doesnt-exist",
Value: kmsg.StringPtr("delete"),
},
},
}
response, err := client.SetTopicConfigurations(ctx, connect.NewRequest(setConfigReq))
assert.Error(err)
assert.Nil(response)
assert.Equal(connect.CodeInternal.String(), connect.CodeOf(err).String())
})

t.Run("set topic configuration for a non existent topic (connect-go)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

// 2. Send alter config request with invalid config key
client := v1alpha1connect.NewTopicServiceClient(http.DefaultClient, s.httpAddress())
setConfigReq := &v1alpha1.SetTopicConfigurationsRequest{
TopicName: "topic-does-not-exist",
Configurations: []*v1alpha1.SetTopicConfigurationsRequest_SetConfiguration{
{
Key: "cleanup.policy",
Value: kmsg.StringPtr("delete"),
},
},
}
response, err := client.SetTopicConfigurations(ctx, connect.NewRequest(setConfigReq))
assert.Error(err)
assert.Nil(response)
assert.Equal(connect.CodeNotFound.String(), connect.CodeOf(err).String())
})

t.Run("set topic configuration for a non existent topic (http)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

// 2. Update two topic configs where one shall be removed and another set to a different value
type setTopicConfigRequest struct {
Key string `json:"key"`
Value *string `json:"value"`
}
httpReq := []setTopicConfigRequest{
{
Key: "cleanup.policy",
Value: kmsg.StringPtr("delete"),
},
}

var errResponse string
err := requests.
URL(s.httpAddress() + fmt.Sprintf("/v1alpha1/topics/%v/configurations", "topic-does-not-exist")).
BodyJSON(&httpReq).
Put().
AddValidator(requests.ValidatorHandler(
requests.CheckStatus(http.StatusOK),
requests.ToString(&errResponse),
)).
Fetch(ctx)
assert.NotEmpty(errResponse)
require.Error(err)
assert.Contains(errResponse, "RESOURCE_NOT_FOUND")
assert.Truef(requests.HasStatusErr(err, http.StatusNotFound), "Status code should be 404")
})
}
23 changes: 23 additions & 0 deletions backend/pkg/api/connect/service/topic/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,26 @@ func (*kafkaClientMapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataR
ReplicationFactor: int32(replicationFactor),
}
}

func (k *kafkaClientMapper) setTopicConfigurationsToKafka(req *v1alpha1.SetTopicConfigurationsRequest) *kmsg.AlterConfigsRequest {
alterConfigResource := kmsg.NewAlterConfigsRequestResource()
alterConfigResource.ResourceType = kmsg.ConfigResourceTypeTopic
alterConfigResource.ResourceName = req.TopicName

for _, config := range req.Configurations {
alterConfigResource.Configs = append(alterConfigResource.Configs, k.setTopicConfigurationsResourceToKafka(config))
}

kafkaReq := kmsg.NewAlterConfigsRequest()
kafkaReq.Resources = []kmsg.AlterConfigsRequestResource{alterConfigResource}

return &kafkaReq
}

func (*kafkaClientMapper) setTopicConfigurationsResourceToKafka(req *v1alpha1.SetTopicConfigurationsRequest_SetConfiguration) kmsg.AlterConfigsRequestResourceConfig {
kafkaReq := kmsg.NewAlterConfigsRequestResourceConfig()
kafkaReq.Name = req.Key
kafkaReq.Value = req.Value

return kafkaReq
}
83 changes: 74 additions & 9 deletions backend/pkg/api/connect/service/topic/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
apierrors "github.com/redpanda-data/console/backend/pkg/api/connect/errors"
"github.com/redpanda-data/console/backend/pkg/config"
"github.com/redpanda-data/console/backend/pkg/console"
commonv1alpha1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/common/v1alpha1"
v1alpha1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha1"
"github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha1/dataplanev1alpha1connect"
)
Expand Down Expand Up @@ -230,16 +229,82 @@ func (s *Service) UpdateTopicConfigurations(ctx context.Context, req *connect.Re
return connect.NewResponse(&v1alpha1.UpdateTopicConfigurationsResponse{Configurations: protoTopicConfigs}), nil
}

// SetTopicConfiguration applies the given configuration to a topic, which may reset
// SetTopicConfigurations applies the given configuration to a topic, which may reset
// or overwrite existing configurations that are not provided as part of the request.
// If you want to patch certain configurations use UpdateTopicConfiguration instead.
func (*Service) SetTopicConfiguration(context.Context, *connect.Request[v1alpha1.SetTopicConfigurationRequest]) (*connect.Response[v1alpha1.SetTopicConfigurationResponse], error) {
return nil, apierrors.NewConnectError(
connect.CodeUnimplemented,
errors.New("this endpoint is not yet implemented"),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
apierrors.NewHelp(apierrors.NewHelpLinkConsoleReferenceConfig()),
)
func (s *Service) SetTopicConfigurations(ctx context.Context, req *connect.Request[v1alpha1.SetTopicConfigurationsRequest]) (*connect.Response[v1alpha1.SetTopicConfigurationsResponse], error) {
// 1. Map proto request to a Kafka request that can be processed by the Kafka client.
kafkaReq := s.mapper.setTopicConfigurationsToKafka(req.Msg)

// 2. Send incremental alter request and handle errors
alterConfigsRes, err := s.consoleSvc.AlterConfigs(ctx, kafkaReq)
if err != nil {
return nil, apierrors.NewConnectError(
connect.CodeInternal,
err,
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_KAFKA_API_ERROR.String(), apierrors.KeyValsFromKafkaError(err)...),
)
}

if len(alterConfigsRes.Resources) != 1 {
// Should never happen since we only edit configs for one topic, but if it happens we want to err early.
return nil, apierrors.NewConnectError(
connect.CodeInternal,
errors.New("unexpected number of resources in alter configs response"),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_CONSOLE_ERROR.String(), apierrors.KeyVal{
Key: "retrieved_results",
Value: strconv.Itoa(len(alterConfigsRes.Resources)),
}),
)
}

// Check for inner Kafka error
result := alterConfigsRes.Resources[0]
if connectErr := s.handleKafkaTopicError(result.ErrorCode, result.ErrorMessage); connectErr != nil {
return nil, connectErr
}

// 3. Now let's describe the topic config and return the entire topic configuration.
// This is very similar to GetTopicConfigurations, but we handle errors differently
describeKafkaReq := s.mapper.describeTopicConfigsToKafka(&v1alpha1.GetTopicConfigurationsRequest{TopicName: req.Msg.TopicName})
configsRes, err := s.consoleSvc.DescribeConfigs(ctx, &describeKafkaReq)
if err != nil {
return nil, apierrors.NewConnectError(
connect.CodeInternal,
fmt.Errorf("failed to describe topic configs after successfully applying config change: %w", err),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_KAFKA_API_ERROR.String(), apierrors.KeyValsFromKafkaError(err)...),
)
}

if len(configsRes.Resources) != 1 {
// Should never happen since we only describe one topic, but if it happens we want to err early.
return nil, apierrors.NewConnectError(
connect.CodeInternal,
errors.New("failed to describe topic configs after successfully applying config change: unexpected number of resources in describe configs response"),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_CONSOLE_ERROR.String(), apierrors.KeyVal{
Key: "retrieved_resources",
Value: strconv.Itoa(len(configsRes.Resources)),
}),
)
}

// Check for inner Kafka error
describeConfigsResult := configsRes.Resources[0]
if connectErr := s.handleKafkaTopicError(describeConfigsResult.ErrorCode, describeConfigsResult.ErrorMessage); connectErr != nil {
return nil, connectErr
}

// 4. Convert describe topic config response into the proto response
protoTopicConfigs, err := s.mapper.describeTopicConfigsToProto(describeConfigsResult.Configs)
if err != nil {
return nil, apierrors.NewConnectError(
connect.CodeInternal,
err,
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_CONSOLE_ERROR.String()),
)
}

return connect.NewResponse(&v1alpha1.SetTopicConfigurationsResponse{Configurations: protoTopicConfigs}), nil
}

// NewService creates a new user service handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,10 @@ func (s *Service) IncrementalAlterConfigs(ctx context.Context,
func (s *Service) IncrementalAlterConfigsKafka(ctx context.Context, req *kmsg.IncrementalAlterConfigsRequest) (*kmsg.IncrementalAlterConfigsResponse, error) {
return s.kafkaSvc.IncrementalAlterConfigs(ctx, req)
}

// AlterConfigs proxies the request/response to set configs (not incrementally) via the Kafka API. The difference
// between AlterConfigs and IncrementalAlterConfigs is that AlterConfigs sets the entire configuration so that
// all properties that are not set as part of this request will be reset to their default values.
func (s *Service) AlterConfigs(ctx context.Context, req *kmsg.AlterConfigsRequest) (*kmsg.AlterConfigsResponse, error) {
return s.kafkaSvc.AlterConfigs(ctx, req)
}
Loading

0 comments on commit e724c8d

Please sign in to comment.