Skip to content

Commit

Permalink
Add Reconciler
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Sep 1, 2020
1 parent e14bba0 commit 31a6be9
Show file tree
Hide file tree
Showing 29 changed files with 619 additions and 263 deletions.
3 changes: 2 additions & 1 deletion control-plane/cmd/kafka-broker-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/sharedmain"

brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger"
)
Expand All @@ -34,7 +35,7 @@ const (
)

func main() {
brokerEnvConfigs := broker.EnvConfigs{}
brokerEnvConfigs := brokerbase.EnvConfigs{}

if err := envconfig.Process("", &brokerEnvConfigs); err != nil {
log.Fatal("cannot process environment variables", err)
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/apis/eventing/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package eventing
import "k8s.io/apimachinery/pkg/runtime/schema"

const (
// GroupName defines the group of KafkaSink objects.
GroupName = "eventing.knative.dev"
)

var (
// KafkaSinkResource represents a Knative Kafka Sink.
// KafkaSinksResource represents a Knative Kafka Sink.
KafkaSinksResource = schema.GroupResource{
Group: GroupName,
Resource: "kafkasinks",
Expand Down
1 change: 0 additions & 1 deletion control-plane/pkg/apis/eventing/v1alpha1/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/


// +k8s:deepcopy-gen=package
// +groupName=eventing.knative.dev
package v1alpha1
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package v1alpha1

import "context"

// SetDefaults sets KafkaSink defaults.
func (ks *KafkaSink) SetDefaults(ctx context.Context) {
ks.Spec.SetDefaults(ctx)
}

// SetDefaults sets KafkaSinkSpec defaults.
func (kss *KafkaSinkSpec) SetDefaults(ctx context.Context) {
defaultMode := ModeStructured

Expand Down
17 changes: 17 additions & 0 deletions control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,31 @@ const (
ConditionAddressable apis.ConditionType = "Addressable"
ConditionTopicReady apis.ConditionType = "TopicReady"
ConditionConfigMapUpdated apis.ConditionType = "ConfigMapUpdated"
ConditionConfigParsed apis.ConditionType = "ConfigParsed"
)

var conditionSet = apis.NewLivingConditionSet(
ConditionAddressable,
ConditionTopicReady,
ConditionConfigMapUpdated,
ConditionConfigParsed,
)

func (ks *KafkaSink) GetConditionSet() apis.ConditionSet {
return conditionSet
}

func (ks *KafkaSinkStatus) GetConditionSet() apis.ConditionSet {
return conditionSet
}

// SetAddress makes this Kafka Sink addressable by setting the URI. It also
// sets the ConditionAddressable to true.
func (ks *KafkaSinkStatus) SetAddress(url *apis.URL) {
ks.Address.URL = url
if url != nil {
ks.GetConditionSet().Manage(ks).MarkTrue(ConditionAddressable)
} else {
ks.GetConditionSet().Manage(ks).MarkFalse(ConditionAddressable, "nil URL", "URL is nil")
}
}
7 changes: 6 additions & 1 deletion control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
)

const (
ModeBinary = "binary"
// CloudEvents binary content mode.
ModeBinary = "binary"
// CloudEvents structured content mode.
ModeStructured = "structured"
)

Expand Down Expand Up @@ -58,6 +60,7 @@ var _ apis.Defaultable = (*KafkaSink)(nil)
var _ runtime.Object = (*KafkaSink)(nil)
var _ duckv1.KRShaped = (*KafkaSink)(nil)

// KafkaSinkSpec defines the desired state of the Kafka Sink.
type KafkaSinkSpec struct {
Topic string `json:"topic"`

Expand All @@ -71,6 +74,7 @@ type KafkaSinkSpec struct {
ContentMode *string `json:"contentMode,omitempty"`
}

// KafkaSinkStatus represents the current state of the KafkaSink.
type KafkaSinkStatus struct {
// inherits duck/v1 Status, which currently provides:
// * ObservedGeneration - the 'Generation' of the Kafka Sink that was last processed by the controller.
Expand All @@ -82,6 +86,7 @@ type KafkaSinkStatus struct {

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// KafkaSinkList defines a list of Kafka Sink.
type KafkaSinkList struct {
metav1.TypeMeta `json:",inline"`
// +optional
Expand Down
84 changes: 84 additions & 0 deletions control-plane/pkg/reconciler/base/broker/broker_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package broker

import (
"fmt"

"github.com/Shopify/sarama"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// PathFromObject returns an HTTP request path given a generic object.
func PathFromObject(obj metav1.Object) string {
return Path(obj.GetNamespace(), obj.GetName())
}

// Path returns an HTTP request path given namespace and name of an object.
func Path(namespace, name string) string {
return fmt.Sprintf("/%s/%s", namespace, name)
}

// Topic returns a topic name given a topic prefix and a generic object.
func Topic(prefix string, obj metav1.Object) string {
return fmt.Sprintf("%s%s-%s", prefix, obj.GetNamespace(), obj.GetName())
}

// CreateTopic creates a topic with name 'topic' following the TopicConfig configuration passed as parameter.
//
// It returns the topic name or an error.
//
// If the topic already exists, it will return no errors.
func CreateTopic(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error) {

topicDetail := &sarama.TopicDetail{
NumPartitions: config.TopicDetail.NumPartitions,
ReplicationFactor: config.TopicDetail.ReplicationFactor,
}

logger.Debug("create topic",
zap.String("topic", topic),
zap.Int16("replicationFactor", topicDetail.ReplicationFactor),
zap.Int32("numPartitions", topicDetail.NumPartitions),
)

createTopicError := admin.CreateTopic(topic, topicDetail, false)
if err, ok := createTopicError.(*sarama.TopicError); ok && err.Err == sarama.ErrTopicAlreadyExists {
return topic, nil
}

return topic, createTopicError
}

// NewClusterAdminFunc creates new sarama.ClusterAdmin.
type NewClusterAdminFunc func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error)

// GetClusterAdmin create a new sarama.ClusterAdmin.
//
// The caller is responsible for closing the sarama.ClusterAdmin.
func GetClusterAdmin(adminFunc NewClusterAdminFunc, bootstrapServers []string) (sarama.ClusterAdmin, error) {
config := sarama.NewConfig()
config.Version = sarama.MaxVersion

kafkaClusterAdmin, err := adminFunc(bootstrapServers, config)
if err != nil {
return nil, fmt.Errorf("failed to create cluster admin: %w", err)
}

return kafkaClusterAdmin, nil
}
47 changes: 47 additions & 0 deletions control-plane/pkg/reconciler/base/broker/broker_base_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package broker

import "testing"

func TestPath(t *testing.T) {
type args struct {
namespace string
name string
}
tests := []struct {
name string
args args
want string
}{
{
name: "namespace/name",
args: args{
namespace: "broker-namespace",
name: "broker-name",
},
want: "/broker-namespace/broker-name",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := Path(tt.args.namespace, tt.args.name); got != tt.want {
t.Errorf("Path() = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit 31a6be9

Please sign in to comment.