Skip to content

Commit a775ec0

Browse files
committed
Added new ephem-msk mode
1 parent 0fee57e commit a775ec0

File tree

12 files changed

+596
-261
lines changed

12 files changed

+596
-261
lines changed

apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ type MetricsConfig struct {
151151
}
152152

153153
// KafkaMode details the mode of operation of the Clowder Kafka Provider
154-
// +kubebuilder:validation:Enum=managed-ephem;managed;operator;app-interface;local;none
154+
// +kubebuilder:validation:Enum=ephem-msk;managed;operator;app-interface;local;none
155155
type KafkaMode string
156156

157157
// KafkaClusterConfig defines options related to the Kafka cluster managed/monitored by Clowder
@@ -252,6 +252,12 @@ type KafkaConfig struct {
252252
// Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode.
253253
ManagedPrefix string `json:"managedPrefix,omitempty"`
254254

255+
// Namespace that kafkaTopics should be written to for (*_msk_*) mode.
256+
TopicNamespace string `json:"topicNamespace,omitempty"`
257+
258+
// Cluster annotation identifier for (*_msk_*) mode.
259+
ClusterAnnotation string `json:"clusterAnnotation,omitempty"`
260+
255261
// (Deprecated) Defines the cluster name to be used by the Kafka Provider this will
256262
// be used in some modes to locate the Kafka instance.
257263
ClusterName string `json:"clusterName,omitempty"`

build/kube_setup.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ function install_keda_operator {
286286
fi
287287

288288
echo "*** Applying keda-operator manifest ..."
289-
${KUBECTL_CMD} apply -f https://github.com/kedacore/keda/releases/download/v2.10.1/keda-2.10.1.yaml
289+
${KUBECTL_CMD} apply -f https://github.com/kedacore/keda/releases/download/v2.12.0/keda-2.12.0.yaml --server-side
290290

291291
echo "*** Will wait for keda-operator to come up in background"
292292
${KUBECTL_CMD} rollout status deployment/$DEPLOYMENT -n $OPERATOR_NS | sed "s/^/[keda-operator] /" &

config/crd/bases/cloud.redhat.com_clowdenvironments.yaml

+8-1
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,9 @@ spec:
258258
description: Version. If unset, default is '2.5.0'
259259
type: string
260260
type: object
261+
clusterAnnotation:
262+
description: Cluster annotation identifier for (*_msk_*) mode.
263+
type: string
261264
clusterName:
262265
description: (Deprecated) Defines the cluster name to be used
263266
by the Kafka Provider this will be used in some modes to
@@ -342,7 +345,7 @@ spec:
342345
a small instance of Kafka is created in the desired cluster
343346
namespace and configured to auto-create topics.'
344347
enum:
345-
- managed-ephem
348+
- ephem-msk
346349
- managed
347350
- operator
348351
- app-interface
@@ -362,6 +365,10 @@ spec:
362365
suffix:
363366
description: (Deprecated) (Unused)
364367
type: string
368+
topicNamespace:
369+
description: Namespace that kafkaTopics should be written
370+
to for (*_msk_*) mode.
371+
type: string
365372
required:
366373
- mode
367374
type: object

controllers/cloud.redhat.com/providers/kafka/managed.go

+2-79
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,12 @@ package kafka
22

33
import (
44
"fmt"
5-
"strconv"
65

76
crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
87
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config"
9-
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
108
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
11-
"github.com/RedHatInsights/rhc-osdk-utils/utils"
129

1310
core "k8s.io/api/core/v1"
14-
"k8s.io/apimachinery/pkg/types"
1511
)
1612

1713
type managedKafkaProvider struct {
@@ -36,12 +32,12 @@ func (k *managedKafkaProvider) Provide(app *crd.ClowdApp) error {
3632
var secret *core.Secret
3733
var broker config.BrokerConfig
3834

39-
secret, err = k.getSecret()
35+
secret, err = getSecret(k)
4036
if err != nil {
4137
return err
4238
}
4339

44-
broker, err = k.getBrokerConfig(secret)
40+
broker, err = getBrokerConfig(k, secret)
4541
if err != nil {
4642
return err
4743
}
@@ -68,52 +64,6 @@ func (k *managedKafkaProvider) appendTopic(topic crd.KafkaTopicSpec, kafkaConfig
6864
)
6965
}
7066

71-
func (k *managedKafkaProvider) destructureSecret(secret *core.Secret) (int, string, string, string, string, string, error) {
72-
port, err := strconv.ParseUint(string(secret.Data["port"]), 10, 16)
73-
if err != nil {
74-
return 0, "", "", "", "", "", err
75-
}
76-
password := string(secret.Data["password"])
77-
username := string(secret.Data["username"])
78-
hostname := string(secret.Data["hostname"])
79-
cacert := ""
80-
if val, ok := secret.Data["cacert"]; ok {
81-
cacert = string(val)
82-
}
83-
saslMechanism := "PLAIN"
84-
if val, ok := secret.Data["saslMechanism"]; ok {
85-
saslMechanism = string(val)
86-
}
87-
return int(port), password, username, hostname, cacert, saslMechanism, nil
88-
}
89-
90-
func (k *managedKafkaProvider) getBrokerConfig(secret *core.Secret) (config.BrokerConfig, error) {
91-
broker := config.BrokerConfig{}
92-
93-
port, password, username, hostname, cacert, saslMechanism, err := k.destructureSecret(secret)
94-
if err != nil {
95-
return broker, err
96-
}
97-
98-
saslType := config.BrokerConfigAuthtypeSasl
99-
100-
broker.Hostname = hostname
101-
broker.Port = &port
102-
broker.Authtype = &saslType
103-
if cacert != "" {
104-
broker.Cacert = &cacert
105-
}
106-
broker.Sasl = &config.KafkaSASLConfig{
107-
Password: &password,
108-
Username: &username,
109-
SecurityProtocol: utils.StringPtr("SASL_SSL"),
110-
SaslMechanism: utils.StringPtr(saslMechanism),
111-
}
112-
broker.SecurityProtocol = utils.StringPtr("SASL_SSL")
113-
114-
return broker, nil
115-
}
116-
11767
func (k *managedKafkaProvider) getKafkaConfig(broker config.BrokerConfig, app *crd.ClowdApp) *config.KafkaConfig {
11868
kafkaConfig := &config.KafkaConfig{}
11969
kafkaConfig.Brokers = []config.BrokerConfig{broker}
@@ -126,30 +76,3 @@ func (k *managedKafkaProvider) getKafkaConfig(broker config.BrokerConfig, app *c
12676
return kafkaConfig
12777

12878
}
129-
130-
func (k *managedKafkaProvider) getSecret() (*core.Secret, error) {
131-
secretRef, err := k.getSecretRef()
132-
if err != nil {
133-
return nil, err
134-
}
135-
136-
secret := &core.Secret{}
137-
138-
if err = k.Client.Get(k.Ctx, secretRef, secret); err != nil {
139-
return nil, err
140-
}
141-
142-
return secret, nil
143-
}
144-
145-
func (k *managedKafkaProvider) getSecretRef() (types.NamespacedName, error) {
146-
secretRef := types.NamespacedName{
147-
Name: k.Env.Spec.Providers.Kafka.ManagedSecretRef.Name,
148-
Namespace: k.Env.Spec.Providers.Kafka.ManagedSecretRef.Namespace,
149-
}
150-
nullName := types.NamespacedName{}
151-
if secretRef == nullName {
152-
return nullName, errors.NewClowderError("no secret ref defined for managed Kafka")
153-
}
154-
return secretRef, nil
155-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package kafka
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
8+
strimzi "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2"
9+
10+
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/clowderconfig"
11+
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config"
12+
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
13+
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
14+
core "k8s.io/api/core/v1"
15+
16+
rc "github.com/RedHatInsights/rhc-osdk-utils/resourceCache"
17+
)
18+
19+
// KafkaTopic is the resource ident for a KafkaTopic object.
20+
var MSKKafkaTopic = rc.NewSingleResourceIdent(ProvName, "msk_kafka_topic", &strimzi.KafkaTopic{}, rc.ResourceOptions{WriteNow: true})
21+
22+
// MSKKafkaConnect is the resource ident for a KafkaConnect object.
23+
var MSKKafkaConnect = rc.NewSingleResourceIdent(ProvName, "msk_kafka_connect", &strimzi.KafkaConnect{}, rc.ResourceOptions{WriteNow: true})
24+
25+
type mskProvider struct {
26+
providers.Provider
27+
}
28+
29+
// NewStrimzi returns a new strimzi provider object.
30+
func NewMSK(p *providers.Provider) (providers.ClowderProvider, error) {
31+
p.Cache.AddPossibleGVKFromIdent(
32+
CyndiPipeline,
33+
CyndiAppSecret,
34+
CyndiHostInventoryAppSecret,
35+
CyndiConfigMap,
36+
MSKKafkaTopic,
37+
MSKKafkaConnect,
38+
)
39+
return &mskProvider{Provider: *p}, nil
40+
}
41+
42+
func (s *mskProvider) EnvProvide() error {
43+
return s.configureBrokers()
44+
}
45+
46+
func (s *mskProvider) Provide(app *crd.ClowdApp) error {
47+
if len(app.Spec.KafkaTopics) == 0 {
48+
return nil
49+
}
50+
51+
s.processTopics(app, s.Config.Kafka)
52+
53+
if app.Spec.Cyndi.Enabled {
54+
err := createCyndiPipeline(s, app, getConnectNamespace(s.Env), getConnectClusterName(s.Env))
55+
if err != nil {
56+
return err
57+
}
58+
}
59+
60+
return nil
61+
}
62+
63+
func (s *mskProvider) getBootstrapServersString() string {
64+
strArray := []string{}
65+
for _, bc := range s.Config.Kafka.Brokers {
66+
if bc.Port != nil {
67+
strArray = append(strArray, fmt.Sprintf("%s:%d", bc.Hostname, *bc.Port))
68+
} else {
69+
strArray = append(strArray, bc.Hostname)
70+
}
71+
}
72+
return strings.Join(strArray, ",")
73+
}
74+
75+
func (s *mskProvider) getKafkaConfig(broker config.BrokerConfig) *config.KafkaConfig {
76+
kafkaConfig := &config.KafkaConfig{}
77+
kafkaConfig.Brokers = []config.BrokerConfig{broker}
78+
kafkaConfig.Topics = []config.TopicConfig{}
79+
80+
return kafkaConfig
81+
82+
}
83+
84+
func (s *mskProvider) configureListeners() error {
85+
var err error
86+
var secret *core.Secret
87+
var broker config.BrokerConfig
88+
89+
secret, err = getSecret(s)
90+
if err != nil {
91+
return err
92+
}
93+
94+
broker, err = getBrokerConfig(s, secret)
95+
if err != nil {
96+
return err
97+
}
98+
99+
s.Config.Kafka = s.getKafkaConfig(broker)
100+
101+
return nil
102+
}
103+
104+
func (s *mskProvider) configureBrokers() error {
105+
// Look up Kafka cluster's listeners and configure s.Config.Brokers
106+
// (we need to know the bootstrap server addresses before provisioning KafkaConnect)
107+
if err := s.configureListeners(); err != nil {
108+
clowdErr := errors.Wrap("unable to determine kafka broker addresses", err)
109+
clowdErr.Requeue = true
110+
return clowdErr
111+
}
112+
113+
if err := configureKafkaConnectCluster(s); err != nil {
114+
return errors.Wrap("failed to provision kafka connect cluster", err)
115+
}
116+
117+
return nil
118+
}
119+
120+
func (s *mskProvider) processTopics(app *crd.ClowdApp, c *config.KafkaConfig) error {
121+
return processTopics(s, app, c)
122+
}
123+
124+
func (s *mskProvider) getConnectClusterUserName() string {
125+
return fmt.Sprintf("%s-connect", s.Env.Name)
126+
}
127+
128+
func (s *mskProvider) KafkaTopicName(topic crd.KafkaTopicSpec, namespace string) string {
129+
if clowderconfig.LoadedConfig.Features.UseComplexStrimziTopicNames {
130+
return fmt.Sprintf("%s-%s-%s", topic.TopicName, s.Env.Name, namespace)
131+
}
132+
return topic.TopicName
133+
}
134+
135+
func (s *mskProvider) KafkaName() string {
136+
return s.Env.Spec.Providers.Kafka.ClusterAnnotation
137+
}
138+
139+
func (s *mskProvider) KafkaNamespace() string {
140+
if s.Env.Spec.Providers.Kafka.TopicNamespace == "" {
141+
return s.Env.Status.TargetNamespace
142+
}
143+
return s.Env.Spec.Providers.Kafka.TopicNamespace
144+
}

0 commit comments

Comments
 (0)