diff --git a/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml b/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml index f14c64d2ec..ba1d7ea832 100644 --- a/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml +++ b/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml @@ -155,8 +155,28 @@ spec: required: - url properties: + name: + type: string url: type: string + CACerts: + type: string + audience: + type: string + addresses: + description: Kafka Sink is Addressable. It exposes the endpoints as URIs to get events delivered into the Kafka topic. + type: array + items: + type: object + properties: + name: + type: string + url: + type: string + CACerts: + type: string + audience: + type: string annotations: description: Annotations is additional Status fields for the Resource to save some additional State as well as convey more information to the user. This is roughly akin to Annotations on any k8s resource, just the reconciler conveying richer information outwards. type: object diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index fd09240de8..3b1d69e2f5 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -93,7 +93,7 @@ type Reconciler struct { ServiceLister corelisters.ServiceLister SubscriptionLister messaginglisters.SubscriptionLister - Prober prober.Prober + Prober prober.NewProber IngressHost string @@ -316,7 +316,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta } httpAddress := receiver.ChannelHTTPAddress(channelHttpHost) - httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channelService, caCerts) + httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts) // Permissive mode: // - status.address http address with path-based routing // - status.addresses: @@ -334,7 +334,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta return err } - httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channelService, caCerts) + httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts) addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} addressableStatus.Address = &httpsAddress } else { @@ -343,9 +343,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - address := addressableStatus.Address.URL.URL() - proberAddressable := prober.Addressable{ - Address: address, + proberAddressable := prober.NewAddressable{ + AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), Name: channel.GetName(), @@ -426,9 +425,12 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1 // See (under discussions KIPs, unlikely to be accepted as they are): // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update - address := receiver.Address(r.IngressHost, channel) - proberAddressable := prober.Addressable{ - Address: address, + address := receiver.HTTPAddress(r.IngressHost, channel) + proberAddressable := prober.NewAddressable{ + AddressStatus: &duckv1.AddressStatus{ + Address: &address, + Addresses: []duckv1.Addressable{address}, + }, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), Name: channel.GetName(), @@ -667,6 +669,7 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin Ingress: &contract.Ingress{ Host: receiver.Host(channel.GetNamespace(), channel.GetName()), EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate), + Path: receiver.Path(channel.GetNamespace(), channel.GetName()), }, BootstrapServers: config.GetBootstrapServers(), Reference: &contract.Reference{ diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 559403e4ba..1496768e61 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -140,7 +140,7 @@ func TestReconcileKind(t *testing.T) { NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -157,7 +157,7 @@ func TestReconcileKind(t *testing.T) { }, WantErr: true, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusReady), + testProber: probertesting.MockNewProber(prober.StatusReady), }, }, { @@ -188,6 +188,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -262,6 +263,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, EgressConfig: &contract.EgressConfig{ @@ -338,6 +340,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -377,7 +380,7 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -408,6 +411,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -447,7 +451,7 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusUnknown), + testProber: probertesting.MockNewProber(prober.StatusUnknown), }, }, { @@ -482,6 +486,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{}, @@ -554,6 +559,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -631,6 +637,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -713,6 +720,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -795,6 +803,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -883,6 +892,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -1173,6 +1183,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -1270,6 +1281,7 @@ func TestReconcileKind(t *testing.T) { }, }, Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -1376,6 +1388,7 @@ func TestReconcileKind(t *testing.T) { }, }, Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -1479,6 +1492,7 @@ func TestReconcileKind(t *testing.T) { }, }, Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -1557,6 +1571,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -1626,6 +1641,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -1738,6 +1754,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -1823,6 +1840,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, EgressConfig: &contract.EgressConfig{ @@ -1864,7 +1882,7 @@ func TestReconcileKind(t *testing.T) { WithChannelAddresses([]duckv1.Addressable{ { Name: pointer.String("https"), - URL: httpsURL(ChannelServiceName, ChannelNamespace), + URL: httpsURL(ChannelName, ChannelNamespace), CACerts: pointer.String(testCaCerts), }, { @@ -1925,6 +1943,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, EgressConfig: &contract.EgressConfig{ @@ -1966,13 +1985,13 @@ func TestReconcileKind(t *testing.T) { WithChannelAddresses([]duckv1.Addressable{ { Name: pointer.String("https"), - URL: httpsURL(ChannelServiceName, ChannelNamespace), + URL: httpsURL(ChannelName, ChannelNamespace), CACerts: pointer.String(testCaCerts), }, }), WithChannelAddress(duckv1.Addressable{ Name: pointer.String("https"), - URL: httpsURL(ChannelServiceName, ChannelNamespace), + URL: httpsURL(ChannelName, ChannelNamespace), CACerts: pointer.String(testCaCerts), }), WithChannelAddessable(), @@ -2014,6 +2033,7 @@ func TestFinalizeKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -2048,7 +2068,7 @@ func TestFinalizeKind(t *testing.T) { }, SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, } @@ -2058,9 +2078,9 @@ func TestFinalizeKind(t *testing.T) { func useTable(t *testing.T, table TableTest, env config.Env) { table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler { - proberMock := probertesting.MockProber(prober.StatusReady) + proberMock := probertesting.MockNewProber(prober.StatusReady) if p, ok := row.OtherTestData[testProber]; ok { - proberMock = p.(prober.Prober) + proberMock = p.(prober.NewProber) } var featureFlags *apisconfig.KafkaFeatureFlags diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index d03944e145..9da73a4e2c 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -18,13 +18,13 @@ package channel import ( "context" - "net/http" "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/feature" subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" @@ -81,6 +81,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf logger := logging.FromContext(ctx) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(watcher) + _, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx) if err != nil { logger.Fatal("Failed to get or create data plane config map", @@ -89,10 +92,25 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf ) } - impl := kafkachannelreconciler.NewImpl(ctx, reconciler) + features := feature.FromContext(ctx) + caCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) { + // We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed + logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err)) + } + + impl := kafkachannelreconciler.NewImpl(ctx, reconciler, + func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) IPsLister := prober.IdentityIPsLister() - reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) + reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts) + if err != nil { + logger.Fatal("Failed to create prober", zap.Error(err)) + } channelInformer := kafkachannelinformer.Get(ctx) @@ -115,6 +133,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf Handler: controller.HandleAll(globalResync), }) + rotateCACerts := func(obj interface{}) { + newCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) { + // We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed + logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err)) + } + reconciler.Prober.RotateRootCaCerts(&newCerts) + globalResync(obj) + } + configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.ContractConfigMapName), Handler: cache.ResourceEventHandlerFuncs{ @@ -128,7 +156,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged)) secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName), - Handler: controller.HandleAll(globalResync), + Handler: controller.HandleAll(rotateCACerts), }) configmapinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll( diff --git a/control-plane/pkg/reconciler/channel/controller_test.go b/control-plane/pkg/reconciler/channel/controller_test.go index 297c9b352e..5325324b9a 100644 --- a/control-plane/pkg/reconciler/channel/controller_test.go +++ b/control-plane/pkg/reconciler/channel/controller_test.go @@ -86,8 +86,12 @@ func TestNewController(t *testing.T) { configmap.NewStaticWatcher(&corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: apisconfig.FlagsConfigName, + }}, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", }, }), + configs, ) if controller == nil { diff --git a/control-plane/pkg/reconciler/channel/resources/service.go b/control-plane/pkg/reconciler/channel/resources/service.go index 6b9d1410e2..002b2080e9 100644 --- a/control-plane/pkg/reconciler/channel/resources/service.go +++ b/control-plane/pkg/reconciler/channel/resources/service.go @@ -28,8 +28,12 @@ import ( ) const ( - portName = "http" - portNumber = 80 + portName = "http" + portNumber = 80 + + tlsPortName = "https" + tlsPortNumber = 443 + MessagingRoleLabel = "messaging.knative.dev/role" MessagingRole = "kafka-channel" @@ -86,6 +90,11 @@ func MakeK8sService(kc *v1beta1.KafkaChannel, opts ...ServiceOption) (*corev1.Se Protocol: corev1.ProtocolTCP, Port: portNumber, }, + { + Name: tlsPortName, + Protocol: corev1.ProtocolTCP, + Port: tlsPortNumber, + }, }, }, } diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index 0df8e3fc0e..3247443b17 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -103,7 +103,7 @@ type Reconciler struct { ServiceLister corelisters.ServiceLister SubscriptionLister messaginglisters.SubscriptionLister - Prober prober.Prober + Prober prober.NewProber IngressHost string @@ -313,10 +313,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - address := addressableStatus.Address.URL.URL() - - proberAddressable := prober.Addressable{ - Address: address, + proberAddressable := prober.NewAddressable{ + AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), Name: channel.GetName(), @@ -420,9 +418,12 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1 // See (under discussions KIPs, unlikely to be accepted as they are): // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update - address := receiver.Address(r.IngressHost, channel) - proberAddressable := prober.Addressable{ - Address: address, + address := receiver.HTTPAddress(r.IngressHost, channel) + proberAddressable := prober.NewAddressable{ + AddressStatus: &duckv1.AddressStatus{ + Address: &address, + Addresses: []duckv1.Addressable{address}, + }, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), Name: channel.GetName(), diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index 7676b5871d..90076641a3 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -135,7 +135,7 @@ func TestReconcileKind(t *testing.T) { NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -152,7 +152,7 @@ func TestReconcileKind(t *testing.T) { }, WantErr: true, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusReady), + testProber: probertesting.MockNewProber(prober.StatusReady), }, }, { @@ -186,7 +186,7 @@ func TestReconcileKind(t *testing.T) { }), }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -371,7 +371,7 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -424,7 +424,7 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusUnknown), + testProber: probertesting.MockNewProber(prober.StatusUnknown), }, }, { @@ -1907,9 +1907,9 @@ func TestReconcileKind(t *testing.T) { } table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler { - proberMock := probertesting.MockProber(prober.StatusReady) + proberMock := probertesting.MockNewProber(prober.StatusReady) if p, ok := row.OtherTestData[testProber]; ok { - proberMock = p.(prober.Prober) + proberMock = p.(prober.NewProber) } var featureFlags *apisconfig.KafkaFeatureFlags diff --git a/control-plane/pkg/reconciler/channel/v2/controllerv2.go b/control-plane/pkg/reconciler/channel/v2/controllerv2.go index ee8c2c9cc3..e0b6fda664 100644 --- a/control-plane/pkg/reconciler/channel/v2/controllerv2.go +++ b/control-plane/pkg/reconciler/channel/v2/controllerv2.go @@ -18,12 +18,12 @@ package v2 import ( "context" - "net/http" "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/feature" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" "knative.dev/pkg/configmap" "knative.dev/pkg/logging" @@ -90,6 +90,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf ) } + features := feature.FromContext(ctx) + caCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) { + logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err)) + } + impl := kafkachannelreconciler.NewImpl(ctx, reconciler) kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { @@ -99,15 +105,27 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf kafkaConfigStore.WatchConfigs(watcher) IPsLister := prober.IdentityIPsLister() - reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) + reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts) + if err != nil { + logger.Fatal("Failed to create prober", zap.Error(err)) + } + rotateCACerts := func(obj interface{}) { + newCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) { + // We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed + logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err)) + } + reconciler.Prober.RotateRootCaCerts(&newCerts) + consumergroup.Enqueue("kafkachannel", impl.EnqueueKey) + } reconciler.Tracker = impl.Tracker secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged)) secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName), - Handler: controller.HandleAll(consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)), + Handler: controller.HandleAll(rotateCACerts), }) reconciler.Tracker = impl.Tracker diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index 026f6220d9..b59b6c1875 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -393,6 +393,11 @@ func NewPerChannelService(env *config.Env) *corev1.Service { Protocol: corev1.ProtocolTCP, Port: 80, }, + { + Name: "https", + Protocol: corev1.ProtocolTCP, + Port: 443, + }, }, }, } diff --git a/test/e2e_new/channel_eventing_tls_test.go b/test/e2e_new/channel_eventing_tls_test.go new file mode 100644 index 0000000000..4e0440f4a3 --- /dev/null +++ b/test/e2e_new/channel_eventing_tls_test.go @@ -0,0 +1,48 @@ +//go:build e2e +// +build e2e + +/* + * Copyright 2023 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e_new + +import ( + "testing" + "time" + + "knative.dev/eventing-kafka-broker/test/rekt/features" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestChannelTLSCARotation(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + eventshub.WithTLS(t), + environment.WithPollTimings(5*time.Second, 4*time.Minute), + ) + + env.Test(ctx, t, features.RotateChannelTLSCertificates()) +} diff --git a/test/rekt/features/channel_tls.go b/test/rekt/features/channel_tls.go new file mode 100644 index 0000000000..d8cb4f7514 --- /dev/null +++ b/test/rekt/features/channel_tls.go @@ -0,0 +1,105 @@ +/* + * Copyright 2023 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package features + +import ( + "context" + "time" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "k8s.io/apimachinery/pkg/types" + "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkachannel" + "knative.dev/eventing/test/rekt/features/featureflags" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/eventing/test/rekt/resources/subscription" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/resources/service" + "knative.dev/reconciler-test/resources/certificate" +) + +func RotateChannelTLSCertificates() *feature.Feature { + // + ingressCertificateName := "kafka-channel-ingress-server-tls" + ingressSecretName := "kafka-channel-ingress-server-tls" + + channelName := feature.MakeRandomK8sName("channel") + subscriptionName := feature.MakeRandomK8sName("subscription") + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + + f := feature.NewFeatureNamed("Rotate Kafka Channel TLS certificate") + + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("Rotate ingress certificate", certificate.Rotate(certificate.RotateCertificate{ + Certificate: types.NamespacedName{ + Namespace: system.Namespace(), + Name: ingressCertificateName, + }, + })) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS)) + f.Setup("install channel", kafkachannel.Install(channelName, + kafkachannel.WithNumPartitions("3"), + kafkachannel.WithReplicationFactor("1"), + kafkachannel.WithRetentionDuration("P1D"), + )) + f.Setup("channel is ready", kafkachannel.IsReady(channelName)) + + f.Setup("install subscription", func(ctx context.Context, t feature.T) { + d := service.AsDestinationRef(sink) + subscription.Install(subscriptionName, + subscription.WithChannel(&duckv1.KReference{ + Kind: "KafkaChannel", + Name: channelName, + APIVersion: kafkachannel.GVR().GroupVersion().String(), + }), + subscription.WithSubscriberFromDestination(d))(ctx, t) + }) + + f.Setup("subscription is ready", subscription.IsReady(subscriptionName)) + + f.Setup("Channel has HTTPS address", kafkachannel.ValidateAddress(channelName, addressable.AssertHTTPSAddress)) + + event := cetest.FullEvent() + event.SetID(uuid.New().String()) + + f.Requirement("install source", eventshub.Install(source, + eventshub.StartSenderToResourceTLS(kafkachannel.GVR(), channelName, nil), + eventshub.InputEvent(event), + // Send multiple events so that we take into account that the certificate rotation might + // be detected by the server after some time. + eventshub.SendMultipleEvents(100, 3*time.Second), + )) + + f.Assert("Event sent", assert.OnStore(source). + MatchSentEvent(cetest.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("Source match updated peer certificate", assert.OnStore(source). + MatchPeerCertificatesReceived(assert.MatchPeerCertificatesFromSecret(system.Namespace(), ingressSecretName, "tls.crt")). + AtLeast(1), + ) + + return f +} diff --git a/test/rekt/resources/kafkachannel/kafkachannel.go b/test/rekt/resources/kafkachannel/kafkachannel.go index 5ec67d4537..2a5db33c5e 100644 --- a/test/rekt/resources/kafkachannel/kafkachannel.go +++ b/test/rekt/resources/kafkachannel/kafkachannel.go @@ -21,6 +21,9 @@ import ( "embed" "time" + "knative.dev/eventing/test/rekt/resources/addressable" + duckv1 "knative.dev/pkg/apis/duck/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" @@ -34,6 +37,17 @@ func GVR() schema.GroupVersionResource { return schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1beta1", Resource: "kafkachannels"} } +func GVK() schema.GroupVersionKind { + return schema.ParseGroupKind(EnvCfg.ChannelGK).WithVersion(EnvCfg.ChannelV) +} + +var EnvCfg EnvConfig + +type EnvConfig struct { + ChannelGK string `envconfig:"CHANNEL_GROUP_KIND" default:"KafkaChannel.messaging.knative.dev" required:"true"` + ChannelV string `envconfig:"CHANNEL_VERSION" default:"v1beta1" required:"true"` +} + // Install will create a KafkaChannel resource, using the latest version, augmented with the config fn options. func Install(name string, opts ...manifest.CfgFn) feature.StepFn { cfg := map[string]interface{}{ @@ -84,3 +98,43 @@ func WithRetentionDuration(retentionDuration string) manifest.CfgFn { cfg["retentionDuration"] = retentionDuration } } + +// AsRef returns a KRef for a Channel without namespace. +func AsRef(name string) *duckv1.KReference { + return &duckv1.KReference{ + Kind: EnvCfg.ChannelGK, + APIVersion: EnvCfg.ChannelV, + Name: name, + } +} + +// AsRef returns a KRef for a Channel without namespace. +func AsDestinationRef(name string) *duckv1.Destination { + return &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: EnvCfg.ChannelGK, + APIVersion: EnvCfg.ChannelV, + Name: name, + }, + } +} + +// Address returns a Channel's address. +func Address(ctx context.Context, name string, timings ...time.Duration) (*duckv1.Addressable, error) { + return addressable.Address(ctx, GVR(), name, timings...) +} + +// ValidateAddress validates the address retured by Address +func ValidateAddress(name string, validate addressable.ValidateAddress, timings ...time.Duration) feature.StepFn { + return func(ctx context.Context, t feature.T) { + addr, err := Address(ctx, name, timings...) + if err != nil { + t.Error(err) + return + } + if err := validate(addr); err != nil { + t.Error(err) + return + } + } +}