Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E tests for channel: TLS key pair rotation #3406

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f5ac537
Save work progress
Leo6Leo Oct 17, 2023
26758d9
Expose the TLS port
Leo6Leo Oct 17, 2023
faa9c76
Adding the logger to see what is happening
Leo6Leo Oct 17, 2023
4c63fd3
Java - Adding the debugging information
Leo6Leo Oct 18, 2023
006ec33
Merge branch 'main' into channel-rekt-keypair-rotation
Leo6Leo Oct 18, 2023
10678c9
Adding the path to the contract
Leo6Leo Oct 20, 2023
3280c52
Comment out the certificate rotation test portion
Leo6Leo Oct 20, 2023
85ed59a
Resolve the source certificate not found issue
Leo6Leo Oct 20, 2023
dc9f1f6
Fix the issue in the test
Leo6Leo Oct 20, 2023
aaaf376
Update control-plane/pkg/prober/prober.go
Leo6Leo Oct 20, 2023
9fff9f7
Update control-plane/pkg/reconciler/channel/channel.go
Leo6Leo Oct 20, 2023
b086371
Update control-plane/pkg/reconciler/channel/channel.go
Leo6Leo Oct 20, 2023
add2b7c
Update control-plane/pkg/reconciler/channel/resources/service.go
Leo6Leo Oct 20, 2023
5ff4cca
Update data-plane/receiver/src/main/java/dev/knative/eventing/kafka/b…
Leo6Leo Oct 20, 2023
855cfbf
Fix the inconsistent varable name
Leo6Leo Oct 20, 2023
cb94ca4
Merge branch 'main' into channel-rekt-keypair-rotation
Leo6Leo Oct 20, 2023
bf7a082
Fix the failed build issue
Leo6Leo Oct 20, 2023
b4dd5ef
Remove the logger
Leo6Leo Oct 20, 2023
becbf40
Run formatting
Leo6Leo Oct 20, 2023
18982a1
Update data-plane/receiver/src/main/java/dev/knative/eventing/kafka/b…
Leo6Leo Oct 20, 2023
228d0dc
Remove the logger
Leo6Leo Oct 20, 2023
316ca1c
Code gen
Leo6Leo Oct 20, 2023
25f0a30
Merge branch 'main' into channel-rekt-keypair-rotation
Leo6Leo Oct 23, 2023
1d22a88
Update control-plane/pkg/reconciler/channel/channel.go
Leo6Leo Oct 23, 2023
a9f57d0
Remove the uncessary code
Leo6Leo Oct 23, 2023
71a7b69
Fix the failing reconciler tests due to the missing newly added filed…
Leo6Leo Oct 23, 2023
feb7c5a
Merge remote-tracking branch 'origin/channel-rekt-keypair-rotation' i…
Leo6Leo Oct 23, 2023
92f4efb
Format fix
Leo6Leo Oct 23, 2023
a26cb6e
Merge branch 'main' into channel-rekt-keypair-rotation
Leo6Leo Oct 23, 2023
0784406
Merge main branch
Leo6Leo Oct 25, 2023
8c94e7d
Merge branch 'main' into channel-rekt-keypair-rotation
Leo6Leo Oct 26, 2023
35a647a
Instead of using channel service name, we directly use channel name f…
Leo6Leo Oct 26, 2023
939a589
Instead of using channel service name, we directly use channel name f…
Leo6Leo Oct 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions control-plane/pkg/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ func probe(ctx context.Context, client httpClient, logger *zap.Logger, address s

if response.StatusCode != http.StatusOK {
logger.Info("Resource not ready", zap.Int("statusCode", response.StatusCode))
logger.Info("[haha] Response body", zap.Any("response", response.Body))
logger.Info("[haha] Response header", zap.Any("response", response.Header))
logger.Info("[haha] Response status", zap.Any("response", response.Status))
logger.Info("[haha] Response proto", zap.Any("response", response.Proto))
logger.Info("[haha] Response proto major", zap.Any("response", response.ProtoMajor))
logger.Info("[haha] Response proto minor", zap.Any("response", response.ProtoMinor))
logger.Info("[haha] Response transfer encoding", zap.Any("response", response.Request))
logger.Info("[haha] Response transfer encoding", zap.Any("response", response.ContentLength))

Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
return StatusNotReady
}

Expand Down
7 changes: 6 additions & 1 deletion control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,12 +697,17 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
}

func (r *Reconciler) reconcileChannelService(ctx context.Context, channel *messagingv1beta1.KafkaChannel) (*corev1.Service, error) {
expected, err := resources.MakeK8sService(channel, resources.ExternalService(r.DataPlaneNamespace, NewChannelIngressServiceName))
logger := kafkalogging.CreateReconcileMethodLogger(ctx, channel)
expected, err := resources.MakeK8sService(channel, logger, resources.ExternalService(r.DataPlaneNamespace, NewChannelIngressServiceName))
if err != nil {
return expected, fmt.Errorf("failed to create the channel service object: %w", err)
}

logger.Debug("[hahap] Channel service created", zap.Any("service", expected))
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved

svc, err := r.ServiceLister.Services(channel.Namespace).Get(resources.MakeChannelServiceName(channel.Name))

Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
logger.Debug("[hahap] Channel service got", zap.Any("service", svc))
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
if apierrors.IsNotFound(err) {
_, err = r.KubeClient.CoreV1().Services(channel.Namespace).Create(ctx, expected, metav1.CreateOptions{})
Expand Down
12 changes: 11 additions & 1 deletion control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*

Check failure on line 1 in control-plane/pkg/reconciler/channel/controller.go

View workflow job for this annotation

GitHub Actions / style / Golang / Auto-format and Check

Please run goimports. diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 581313f..46b1121 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -18,6 +18,7 @@ package channel import ( "context" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "github.com/IBM/sarama"
* Copyright 2021 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -18,6 +18,7 @@

import (
"context"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"

"github.com/IBM/sarama"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,6 +82,9 @@

logger := logging.FromContext(ctx)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(watcher)

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
if err != nil {
logger.Fatal("Failed to get or create data plane config map",
Expand All @@ -96,7 +100,13 @@
logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err))
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)
impl := kafkachannelreconciler.NewImpl(ctx, reconciler,
func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
PromoteFilterFunc: kafka.BrokerClassFilter(),
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
}
})
IPsLister := prober.IdentityIPsLister()
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts)
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/channel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ func TestNewController(t *testing.T) {
configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: apisconfig.FlagsConfigName,
}}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
},
}),

configs,
)
if controller == nil {
Expand Down
17 changes: 14 additions & 3 deletions control-plane/pkg/reconciler/channel/resources/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package resources
import (
"fmt"

"go.uber.org/zap"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/kmeta"
Expand All @@ -28,8 +30,12 @@ import (
)

const (
portName = "http"
portNumber = 80
portName = "http"
portNumber = 80

TLSportName = "https"
TLSportNumber = 443
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved

MessagingRoleLabel = "messaging.knative.dev/role"
MessagingRole = "kafka-channel"

Expand Down Expand Up @@ -62,7 +68,7 @@ func ExternalService(namespace, service string) ServiceOption {
// MakeK8sService creates a new K8s Service for a Channel resource. It also sets the appropriate
// OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it.
// As well as being garbage collected when the Channel is deleted.
func MakeK8sService(kc *v1beta1.KafkaChannel, opts ...ServiceOption) (*corev1.Service, error) {
func MakeK8sService(kc *v1beta1.KafkaChannel, logger *zap.Logger, opts ...ServiceOption) (*corev1.Service, error) {
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
// Add annotations
svc := &corev1.Service{
TypeMeta: metav1.TypeMeta{
Expand All @@ -86,6 +92,11 @@ func MakeK8sService(kc *v1beta1.KafkaChannel, opts ...ServiceOption) (*corev1.Se
Protocol: corev1.ProtocolTCP,
Port: portNumber,
},
{
Name: TLSportName,
Protocol: corev1.ProtocolTCP,
Port: TLSportNumber,
},
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ public IngressProducerReconcilableStore(
}

public IngressProducer resolve(String host, String path) {
Logger logger = LoggerFactory.getLogger(ProducerHolder.class);
// Ignore the host when there's a path given in the request.
// That means, we support these modes:
// - Request coming to "/path" --> path is used for matching
// - Request coming to "/" --> hostname is used for matching
logger.error("[haha java] print the pathMapper: {}", pathMapper);
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
final var p = pathMapper.get(removeTrailingSlash(path));
if (p != null) {
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ public void stop(Promise<Void> stopPromise) throws Exception {
@Override
public void handle(HttpServerRequest request) {

logger.error("Received request: {}", request.path());
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved

final var requestContext = new RequestContext(request);

// Look up for the ingress producer
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
48 changes: 48 additions & 0 deletions test/e2e_new/channel_eventing_tls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//go:build e2e
// +build e2e

/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package e2e_new

import (
"testing"
"time"

"knative.dev/eventing-kafka-broker/test/rekt/features"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
)

func TestChannelTLSCARotation(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
eventshub.WithTLS(t),
environment.WithPollTimings(5*time.Second, 4*time.Minute),
)

env.Test(ctx, t, features.RotateChannelTLSCertificates())
}
93 changes: 93 additions & 0 deletions test/rekt/features/channel_tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package features

import (
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkachannel"
"knative.dev/eventing/test/rekt/features/featureflags"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/feature"
)

func RotateChannelTLSCertificates() *feature.Feature {
//
//ingressCertificateName := "kafka-channel-ingress-server-tls"
//ingressSecretName := "kafka-channel-ingress-server-tls"

channelName := feature.MakeRandomK8sName("channel")
//subscriptionName := feature.MakeRandomK8sName("sub")
sink := feature.MakeRandomK8sName("sink")
//source := feature.MakeRandomK8sName("source")

f := feature.NewFeatureNamed("Rotate Kafka Channel TLS certificate")

f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled())

//f.Setup("Rotate ingress certificate", certificate.Rotate(certificate.RotateCertificate{
// Certificate: types.NamespacedName{
// Namespace: system.Namespace(),
// Name: ingressCertificateName,
// },
//}))

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS))
f.Setup("install channel", kafkachannel.Install(channelName,
kafkachannel.WithNumPartitions("3"),
kafkachannel.WithReplicationFactor("1"),
kafkachannel.WithRetentionDuration("P1D"),
))
f.Setup("channel is ready", kafkachannel.IsReady(channelName))

//f.Setup("install subscription", func(ctx context.Context, t feature.T) {
// d := service.AsDestinationRef(sink)
// d.CACerts = eventshub.GetCaCerts(ctx)
// subscription.Install(subscriptionName,
// subscription.WithChannel(kafkachannel.AsRef(channelName)),
// subscription.WithSubscriberFromDestination(d))(ctx, t)
//})
//
//f.Setup("subscription is ready", subscription.IsReady(subscriptionName))
//
//f.Setup("Channel has HTTPS address", kafkachannel.ValidateAddress(channelName, addressable.AssertHTTPSAddress))
//
//event := cetest.FullEvent()
//event.SetID(uuid.New().String())
//
//f.Requirement("install source", eventshub.Install(source,
// eventshub.StartSenderToResourceTLS(kafkachannel.GVR(), channelName, nil),
// eventshub.InputEvent(event),
// // Send multiple events so that we take into account that the certificate rotation might
// // be detected by the server after some time.
// eventshub.SendMultipleEvents(100, 3*time.Second),
//))
//
//f.Assert("Event sent", assert.OnStore(source).
// MatchSentEvent(cetest.HasId(event.ID())).
// AtLeast(1),
//)
//f.Assert("Event received", assert.OnStore(sink).
// MatchReceivedEvent(cetest.HasId(event.ID())).
// AtLeast(1),
//)
//f.Assert("Source match updated peer certificate", assert.OnStore(source).
// MatchPeerCertificatesReceived(assert.MatchPeerCertificatesFromSecret(system.Namespace(), ingressSecretName, "tls.crt")).
// AtLeast(1),
//)

return f
}
55 changes: 55 additions & 0 deletions test/rekt/resources/kafkachannel/kafkachannel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*

Check failure on line 1 in test/rekt/resources/kafkachannel/kafkachannel.go

View workflow job for this annotation

GitHub Actions / style / Golang / Auto-format and Check

Please run goimports. diff --git a/test/rekt/resources/kafkachannel/kafkachannel.go b/test/rekt/resources/kafkachannel/kafkachannel.go index 93921d6..294e5e1 100644 --- a/test/rekt/resources/kafkachannel/kafkachannel.go +++ b/test/rekt/resources/kafkachannel/kafkachannel.go @@ -19,9 +19,10 @@ package kafkachannel import ( "context" "embed" + "time" + "knative.dev/eventing/test/rekt/resources/addressable" duckv1 "knative.dev/pkg/apis/duck/v1" - "time" "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/reconciler-test/pkg/feature"
Copyright 2021 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -19,6 +19,8 @@
import (
"context"
"embed"
"knative.dev/eventing/test/rekt/resources/addressable"
duckv1 "knative.dev/pkg/apis/duck/v1"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -34,6 +36,17 @@
return schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1beta1", Resource: "kafkachannels"}
}

func GVK() schema.GroupVersionKind {
return schema.ParseGroupKind(EnvCfg.ChannelGK).WithVersion(EnvCfg.ChannelV)
}

var EnvCfg EnvConfig

type EnvConfig struct {
ChannelGK string `envconfig:"CHANNEL_GROUP_KIND" default:"KafkaChannel.messaging.knative.dev" required:"true"`
ChannelV string `envconfig:"CHANNEL_VERSION" default:"v1beta1" required:"true"`
}

// Install will create a KafkaChannel resource, using the latest version, augmented with the config fn options.
func Install(name string, opts ...manifest.CfgFn) feature.StepFn {
cfg := map[string]interface{}{
Expand Down Expand Up @@ -84,3 +97,45 @@
cfg["retentionDuration"] = retentionDuration
}
}

// AsRef returns a KRef for a Channel without namespace.
func AsRef(name string) *duckv1.KReference {
apiVersion, kind := GVK().ToAPIVersionAndKind()
return &duckv1.KReference{
Kind: kind,
APIVersion: apiVersion,
Name: name,
}
}

// AsRef returns a KRef for a Channel without namespace.
func AsDestinationRef(name string) *duckv1.Destination {
apiVersion, kind := GVK().ToAPIVersionAndKind()
return &duckv1.Destination{
Ref: &duckv1.KReference{
Kind: kind,
APIVersion: apiVersion,
Name: name,
},
}
}

// Address returns a Channel's address.
func Address(ctx context.Context, name string, timings ...time.Duration) (*duckv1.Addressable, error) {
return addressable.Address(ctx, GVR(), name, timings...)
}

// ValidateAddress validates the address retured by Address
func ValidateAddress(name string, validate addressable.ValidateAddress, timings ...time.Duration) feature.StepFn {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might need to look at the change made by @creydr here: c5a0002 for this to work

return func(ctx context.Context, t feature.T) {
addr, err := Address(ctx, name, timings...)
if err != nil {
t.Error(err)
return
}
if err := validate(addr); err != nil {
t.Error(err)
return
}
}
}
Loading