Skip to content

Commit

Permalink
Organize topic name builder function and usages
Browse files Browse the repository at this point in the history
  • Loading branch information
aliok committed Nov 5, 2021
1 parent c108f41 commit 9c1642e
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 27 deletions.
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
return fmt.Errorf("failed to track secret: %w", err)
}

topicName := kafka.Topic(TopicPrefix, broker)
topicName := kafka.BrokerTopic(TopicPrefix, broker)

saramaConfig, err := kafka.GetSaramaConfig(securityOption)
if err != nil {
Expand Down Expand Up @@ -315,7 +315,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker)
}
defer kafkaClusterAdminClient.Close()

topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.Topic(TopicPrefix, broker))
topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.BrokerTopic(TopicPrefix, broker))
if err != nil {
return err
}
Expand Down
13 changes: 3 additions & 10 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -140,7 +139,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
return fmt.Errorf("failed to track secret: %w", err)
}

topicName := topic(TopicPrefix, channel)
topicName := kafka.ChannelTopic(TopicPrefix, channel)

kafkaClusterAdminSaramaConfig, err := kafka.GetSaramaConfig(saramaSecurityOption)
if err != nil {
Expand Down Expand Up @@ -372,7 +371,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
}
defer kafkaClusterAdminClient.Close()

topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topic(TopicPrefix, channel))
topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.ChannelTopic(TopicPrefix, channel))
if err != nil {
return err
}
Expand Down Expand Up @@ -439,7 +438,7 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, channel *messag
return nil
}

topicName := topic(TopicPrefix, channel)
topicName := kafka.ChannelTopic(TopicPrefix, channel)
groupID := consumerGroup(channel, sub)
_, err := offset.InitOffsets(ctx, kafkaClient, kafkaClusterAdmin, []string{topicName}, groupID)
return err
Expand Down Expand Up @@ -537,12 +536,6 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
return resource, nil
}

// topic returns a topic name given a topic prefix and a generic object.
// This function uses a different format than the kafkatopic.Topic function
func topic(prefix string, obj metav1.Object) string {
return fmt.Sprintf("%s.%s.%s", prefix, obj.GetNamespace(), obj.GetName())
}

// consumerGroup returns a consumerGroup name for the given channel and subscription
func consumerGroup(channel *messagingv1beta1.KafkaChannel, sub *v1.SubscriberSpec) string {
return fmt.Sprintf("kafka.%s.%s.%s", channel.Namespace, channel.Name, string(sub.UID))
Expand Down
9 changes: 7 additions & 2 deletions control-plane/pkg/reconciler/kafka/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,16 @@ func BootstrapServersArray(bootstrapServers string) []string {
return bss[:j]
}

// Topic returns a topic name given a topic prefix and a generic object.
func Topic(prefix string, obj metav1.Object) string {
// BrokerTopic returns a topic name given a topic prefix and a Broker.
func BrokerTopic(prefix string, obj metav1.Object) string {
return fmt.Sprintf("%s%s-%s", prefix, obj.GetNamespace(), obj.GetName())
}

// ChannelTopic returns a topic name given a topic prefix and a KafkaChannel.
func ChannelTopic(prefix string, obj metav1.Object) string {
return fmt.Sprintf("%s.%s.%s", prefix, obj.GetNamespace(), obj.GetName())
}

// CreateTopicIfDoesntExist creates a topic with name 'topic' following the TopicConfig configuration passed as parameter.
//
// It returns the topic name or an error.
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/kafka/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestTopic(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := Topic(tt.args.prefix, tt.args.obj); got != tt.want {
if got := BrokerTopic(tt.args.prefix, tt.args.obj); got != tt.want {
t.Errorf("Topic() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestCreateTopicTopicAlreadyExists(t *testing.T) {
Namespace: "bnamespace",
},
}
topic := Topic("", b)
topic := BrokerTopic("", b)
errMsg := "topic already exists"

ca := &kafkatesting.MockKafkaClusterAdmin{
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/testing/objects_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (

func BrokerTopic() string {
broker := NewBroker().(metav1.Object)
return kafka.Topic(TopicPrefix, broker)
return kafka.BrokerTopic(TopicPrefix, broker)
}

// NewBroker creates a new Broker with broker class equals to kafka.BrokerClass.
Expand Down Expand Up @@ -181,7 +181,7 @@ func BrokerConfigMapUpdatedReady(configs *Configs) func(broker *eventing.Broker)
func BrokerTopicReady(broker *eventing.Broker) {
broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrueWithReason(
base.ConditionTopicReady,
fmt.Sprintf("Topic %s created", kafka.Topic(TopicPrefix, broker)),
fmt.Sprintf("Topic %s created", kafka.BrokerTopic(TopicPrefix, broker)),
"",
)
}
Expand Down
8 changes: 1 addition & 7 deletions control-plane/pkg/reconciler/testing/objects_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka"
sinkreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/sink"
)

Expand All @@ -46,8 +45,6 @@ const (
SinkReplicationFactor = 3

SinkNotPresentErrFormat = "failed to describe topics %v: %v"

topicPrefix = "knative-sink-"
)

var (
Expand Down Expand Up @@ -92,10 +89,7 @@ func NewDeletedSink(options ...SinkOption) runtime.Object {
}

func SinkTopic() string {
return kafka.Topic(topicPrefix, &metav1.ObjectMeta{
Name: SinkName,
Namespace: SinkNamespace,
})
return fmt.Sprintf("knative-sink-%s-%s", SinkNamespace, SinkName)
}

func BootstrapServers(bootstrapServers []string) func(sink *eventing.KafkaSink) {
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/broker_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestBrokerTrigger(t *testing.T) {
BootstrapServers: testingpkg.BootstrapServersPlaintext,
ReplicationFactor: defaultReplicationFactor,
NumPartitions: defaultNumPartitions,
Topic: kafka.Topic(broker.TopicPrefix, br),
Topic: kafka.BrokerTopic(broker.TopicPrefix, br),
}

err := kafkatest.VerifyNumPartitionAndReplicationFactor(
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestBrokerWithConfig(t *testing.T) {
BootstrapServers: testingpkg.BootstrapServersPlaintext,
ReplicationFactor: replicationFactor,
NumPartitions: numPartitions,
Topic: kafka.Topic(broker.TopicPrefix, br),
Topic: kafka.BrokerTopic(broker.TopicPrefix, br),
}

err := kafkatest.VerifyNumPartitionAndReplicationFactor(
Expand Down

0 comments on commit 9c1642e

Please sign in to comment.