Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not build OIDC config unless enabled #4021

Merged
35 changes: 23 additions & 12 deletions control-plane/pkg/reconciler/base/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import (
"context"
"fmt"
"strconv"
"time"

"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -67,6 +67,9 @@
// volume generation annotation data plane pods.
VolumeGenerationAnnotationKey = "volumeGeneration"

// config features update time annotation for data plane pods.
ConfigFeaturesUpdatedAnnotationKey = "configFeaturesUpdatedAt"

Protobuf = "protobuf"
Json = "json"
)
Expand Down Expand Up @@ -281,23 +284,31 @@
return nil
}

func (r *Reconciler) UpdateDispatcherPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
func (r *Reconciler) UpdateDispatcherPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.dispatcherSelector())
if errors != nil {
return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.DataPlaneNamespace, errors)
}
return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", volumeGeneration, pods)
return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods)
}

func (r *Reconciler) UpdateReceiverPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
func (r *Reconciler) UpdateReceiverPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector())
if errors != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.DataPlaneNamespace, errors)
}
return r.UpdatePodsAnnotation(ctx, logger, "receiver", volumeGeneration, pods)
return r.UpdatePodsAnnotation(ctx, logger, "receiver", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods)
}

func (r *Reconciler) UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx context.Context, logger *zap.Logger) error {
pods, err := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector())
if err != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %s", r.DataPlaneNamespace, err)

Check warning on line 306 in control-plane/pkg/reconciler/base/reconciler.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/base/reconciler.go#L303-L306

Added lines #L303 - L306 were not covered by tests
}
return r.UpdatePodsAnnotation(ctx, logger, "receiver", ConfigFeaturesUpdatedAnnotationKey, time.Now().String(), pods)

Check warning on line 308 in control-plane/pkg/reconciler/base/reconciler.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/base/reconciler.go#L308

Added line #L308 was not covered by tests
}

func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component string, volumeGeneration uint64, pods []*corev1.Pod) error {
func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component, annotationKey, annotationValue string, pods []*corev1.Pod) error {

var errors error

Expand All @@ -306,7 +317,7 @@
logger.Debug(
"Update "+component+" pod annotation",
zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)),
zap.Uint64("volumeGeneration", volumeGeneration),
zap.String(annotationKey, annotationValue),
)

// do not update cache copy
Expand All @@ -318,15 +329,15 @@
}

// Check whether pod's annotation is the expected one.
if v, ok := annotations[VolumeGenerationAnnotationKey]; ok {
v, err := strconv.ParseUint(v /* base */, 10 /* bitSize */, 64)
if err == nil && v == volumeGeneration {
// Volume generation already matches the expected volume generation number.
if v, ok := annotations[annotationKey]; ok {
if v == annotationValue {
logger.Debug(component + " pod annotation already up to date")

Check warning on line 334 in control-plane/pkg/reconciler/base/reconciler.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/base/reconciler.go#L333-L334

Added lines #L333 - L334 were not covered by tests
// annotation is already correct.
continue
}
}

annotations[VolumeGenerationAnnotationKey] = fmt.Sprint(volumeGeneration)
annotations[annotationKey] = annotationValue
pod.SetAnnotations(annotations)

if _, err := r.KubeClient.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/base/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestUpdateReceiverPodAnnotation(t *testing.T) {
ReceiverLabel: base.SinkReceiverLabel,
}

err := r.UpdateReceiverPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
require.Nil(t, err)
}

Expand All @@ -247,7 +247,7 @@ func TestUpdateDispatcherPodAnnotation(t *testing.T) {
DispatcherLabel: label,
}

err := r.UpdateDispatcherPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
require.Nil(t, err)
}

Expand Down
8 changes: 4 additions & 4 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand All @@ -217,7 +217,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
logger.Debug("Updated receiver pod annotation")

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Broker
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -496,11 +496,11 @@ func (r *Reconciler) deleteResourceFromContractConfigMap(ctx context.Context, lo
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}
// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@
if globalResync != nil {
globalResync(obj)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))

Check warning on line 107 in control-plane/pkg/reconciler/broker/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/controller.go#L107

Added line #L107 was not covered by tests
}
})
featureStore.WatchConfigs(watcher)

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand Down Expand Up @@ -415,7 +415,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@
if globalResync != nil {
globalResync(nil)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))

Check warning on line 109 in control-plane/pkg/reconciler/channel/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/channel/controller.go#L109

Added line #L109 was not covered by tests
}
})
featureStore.WatchConfigs(watcher)

Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (r *Reconciler) schedule(ctx context.Context, logger *zap.Logger, c *kafkai
return false, err
}

return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, ct.Generation, []*corev1.Pod{p})
return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, base.VolumeGenerationAnnotationKey, fmt.Sprint(ct.Generation), []*corev1.Pod{p})
}

func (r *Reconciler) commonReconciler(p *corev1.Pod, cmName string) base.Reconciler {
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@
if globalResync != nil {
globalResync(obj)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))

Check warning on line 91 in control-plane/pkg/reconciler/sink/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/sink/controller.go#L91

Added line #L91 was not covered by tests
}
})
featureStore.WatchConfigs(watcher)

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
// receivers haven't got the Sink, so update failures to receiver pods is a hard failure.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down Expand Up @@ -341,7 +341,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) e
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge
}

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Trigger
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -289,7 +289,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger
logger.Debug("Updated data plane config map", zap.String("configmap", r.Env.DataPlaneConfigMapAsString()))

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// The delete trigger will eventually be seen by the data plane pods, so log out the error and move on to the
// next step.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class FileWatcher implements AutoCloseable {
/**
* All args constructor.
*
* @param contractConsumer updates receiver.
* @param triggerFunction is triggered whenever there is a file change.
* @param file file to watch
*/
public FileWatcher(File file, Runnable triggerFunction) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.oidc;

import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import io.vertx.core.Vertx;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OIDCDiscoveryConfigListener implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(OIDCDiscoveryConfigListener.class);
private final String featuresConfigPath;
private final Vertx vertx;
private final FileWatcher configFeaturesWatcher;
private final int timeoutSeconds;
private List<Consumer<OIDCDiscoveryConfig>> callbacks;
private OIDCDiscoveryConfig oidcDiscoveryConfig;

public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int timeoutSeconds) throws IOException {
this.featuresConfigPath = featuresConfigPath;
this.vertx = vertx;
this.timeoutSeconds = timeoutSeconds;

this.buildFeaturesAndOIDCDiscoveryConfig();

this.configFeaturesWatcher =
new FileWatcher(new File(featuresConfigPath + "/" + FeaturesConfig.KEY_AUTHENTICATION_OIDC), () -> {
if (this.oidcDiscoveryConfig == null) {
this.buildFeaturesAndOIDCDiscoveryConfig();
if (this.oidcDiscoveryConfig != null && this.callbacks != null) {
this.callbacks.stream()
.filter(Objects::nonNull)
.forEach(c -> c.accept(this.oidcDiscoveryConfig));
}
}
});

this.configFeaturesWatcher.start();
}

public OIDCDiscoveryConfig getOidcDiscoveryConfig() {
return oidcDiscoveryConfig;
}

public int registerCallback(Consumer<OIDCDiscoveryConfig> callback) {
if (this.callbacks == null) {
this.callbacks = new ArrayList<>();
}

this.callbacks.add(callback);
return this.callbacks.size() - 1;
}

public void deregisterCallback(int callbackId) {
this.callbacks.set(callbackId, null);
}

private void buildOIDCDiscoveryConfig() throws ExecutionException, InterruptedException, TimeoutException {
this.oidcDiscoveryConfig = OIDCDiscoveryConfig.build(this.vertx)
.toCompletionStage()
.toCompletableFuture()
.get(this.timeoutSeconds, TimeUnit.SECONDS);
}

private void buildFeaturesAndOIDCDiscoveryConfig() {
try {
FeaturesConfig featuresConfig = new FeaturesConfig(featuresConfigPath);
if (featuresConfig.isAuthenticationOIDC()) {
try {
this.buildOIDCDiscoveryConfig();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
logger.error("Unable to build OIDC Discover Config even though OIDC authentication is enabled", e);
}
}
} catch (IOException e) {
logger.warn("failed to get feature config, skipping building OIDC Discovery Config", e);
}
}

@Override
public void close() throws Exception {
this.configFeaturesWatcher.close();
}
}
Loading
Loading