Skip to content

Commit

Permalink
Make configmap reading reusable (#1551)
Browse files Browse the repository at this point in the history
* Rename `configFromConfigMap` to `TopicConfigFromConfigMap`

* Characterization test for broker_config

* Make topicConfig reading code reusable, do the broker specific topicConfig validation in broker package

* Make topicConfig reading code reusable, do the broker specific topicConfig validation in broker package

* Validate topicConfig from configmap in-place

* Fix IDE method extraction problems

* Fix issues after rebase
  • Loading branch information
aliok authored Nov 29, 2021
1 parent bb1e7de commit e956d93
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 125 deletions.
15 changes: 10 additions & 5 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,12 @@ func (r *Reconciler) topicConfig(logger *zap.Logger, broker *eventing.Broker) (*
return nil, nil, fmt.Errorf("failed to get configmap %s/%s: %w", namespace, broker.Spec.Config.Name, err)
}

brokerConfig, err := configFromConfigMap(logger, cm)
topicConfig, err := kafka.TopicConfigFromConfigMap(logger, cm)
if err != nil {
return nil, cm, err
return nil, cm, fmt.Errorf("unable to build topic config from configmap: %w - ConfigMap data: %v", err, cm.Data)
}

return brokerConfig, cm, nil
return topicConfig, cm, nil
}

func (r *Reconciler) defaultTopicDetail() sarama.TopicDetail {
Expand Down Expand Up @@ -431,8 +431,13 @@ func (r *Reconciler) ConfigMapUpdated(ctx context.Context) func(configMap *corev

return func(configMap *corev1.ConfigMap) {

topicConfig, err := configFromConfigMap(logger, configMap)
topicConfig, err := kafka.TopicConfigFromConfigMap(logger, configMap)
if err != nil {
logger.Error("failed to build broker config from configmap",
zap.String("configMap.Namespace", configMap.Namespace),
zap.String("configMap.Name", configMap.Name),
zap.Any("configMap.Data", configMap.Data),
zap.Error(err))
return
}

Expand Down Expand Up @@ -472,7 +477,7 @@ func (r *Reconciler) getDefaultBootstrapServersOrFail() ([]string, error) {
defer r.bootstrapServersLock.RUnlock()

if len(r.bootstrapServers) == 0 {
return nil, fmt.Errorf("no %s provided", BootstrapServersConfigMapKey)
return nil, fmt.Errorf("no %s provided", kafka.BootstrapServersConfigMapKey)
}

return r.bootstrapServers, nil
Expand Down
66 changes: 0 additions & 66 deletions control-plane/pkg/reconciler/broker/broker_config.go

This file was deleted.

Loading

0 comments on commit e956d93

Please sign in to comment.