diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java index e12c764de3..521b4f4514 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifier.java @@ -16,61 +16,11 @@ package dev.knative.eventing.kafka.broker.core.oidc; import io.vertx.core.Future; -import io.vertx.core.Vertx; import io.vertx.core.http.HttpServerRequest; import org.jose4j.jwt.JwtClaims; -import org.jose4j.jwt.consumer.InvalidJwtException; -import org.jose4j.jwt.consumer.JwtConsumer; -import org.jose4j.jwt.consumer.JwtConsumerBuilder; -import org.jose4j.jwt.consumer.JwtContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class TokenVerifier { +public interface TokenVerifier { + Future verify(String token, String expectedAudience); - private static final Logger logger = LoggerFactory.getLogger(TokenVerifier.class); - - private final Vertx vertx; - - private final OIDCDiscoveryConfig oidcDiscoveryConfig; - - public TokenVerifier(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) { - this.vertx = vertx; - this.oidcDiscoveryConfig = oidcDiscoveryConfig; - } - - public Future verify(String token, String expectedAudience) { - return this.vertx.executeBlocking(promise -> { - // execute blocking, as jose .process() is blocking - - JwtConsumer jwtConsumer = new JwtConsumerBuilder() - .setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver()) - .setExpectedAudience(expectedAudience) - .setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer()) - .build(); - - try { - JwtContext jwtContext = jwtConsumer.process(token); - - promise.complete(jwtContext.getJwtClaims()); - } catch (InvalidJwtException e) { - promise.fail(e); - } - }); - } - - public Future verify(HttpServerRequest request, String expectedAudience) { - String authHeader = request.getHeader("Authorization"); - if (authHeader == null || authHeader.isEmpty()) { - return Future.failedFuture("Request didn't contain Authorization header"); // change to exception - } - - if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) { - return Future.failedFuture("Authorization header didn't contain Bearer token"); // change to exception - } - - String token = authHeader.substring("Bearer ".length()); - - return verify(token, expectedAudience); - } + Future verify(HttpServerRequest request, String expectedAudience); } diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java new file mode 100644 index 0000000000..d490128f9e --- /dev/null +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java @@ -0,0 +1,77 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.core.oidc; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerRequest; +import org.jose4j.jwt.JwtClaims; +import org.jose4j.jwt.consumer.InvalidJwtException; +import org.jose4j.jwt.consumer.JwtConsumer; +import org.jose4j.jwt.consumer.JwtConsumerBuilder; +import org.jose4j.jwt.consumer.JwtContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TokenVerifierImpl implements TokenVerifier { + + private static final Logger logger = LoggerFactory.getLogger(TokenVerifierImpl.class); + + private final Vertx vertx; + + private final OIDCDiscoveryConfig oidcDiscoveryConfig; + + public TokenVerifierImpl(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) { + this.vertx = vertx; + this.oidcDiscoveryConfig = oidcDiscoveryConfig; + } + + public Future verify(String token, String expectedAudience) { + return this.vertx.executeBlocking(promise -> { + // execute blocking, as jose .process() is blocking + + JwtConsumer jwtConsumer = new JwtConsumerBuilder() + .setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver()) + .setExpectedAudience(expectedAudience) + .setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer()) + .build(); + + try { + JwtContext jwtContext = jwtConsumer.process(token); + + promise.complete(jwtContext.getJwtClaims()); + } catch (InvalidJwtException e) { + promise.fail(e); + } + }); + } + + public Future verify(final HttpServerRequest request, String expectedAudience) { + String authHeader = request.getHeader("Authorization"); + if (authHeader == null || authHeader.isEmpty()) { + return Future.failedFuture("Request didn't contain Authorization header"); + } + + if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) { + return Future.failedFuture("Authorization header didn't contain Bearer token"); + } + + String token = authHeader.substring("Bearer ".length()); + + request.pause(); + return verify(token, expectedAudience).onSuccess(v -> request.resume()); + } +} diff --git a/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/Main.java b/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/Main.java index ae325bfc90..b218b3767f 100644 --- a/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/Main.java +++ b/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/Main.java @@ -16,9 +16,10 @@ package dev.knative.eventing.kafka.broker.receiverloom; import java.io.IOException; +import java.util.concurrent.ExecutionException; public class Main { - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new LoomProducerFactory<>()); } } diff --git a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java b/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java index 1c0dce4f4f..5365e96414 100644 --- a/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java +++ b/data-plane/receiver-vertx/src/main/java/dev/knative/eventing/kafka/broker/receiververtx/Main.java @@ -16,9 +16,10 @@ package dev.knative.eventing.kafka.broker.receiververtx; import java.io.IOException; +import java.util.concurrent.ExecutionException; public class Main { - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new VertxProducerFactory<>()); } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java index 116067adc6..149191540d 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressProducer.java @@ -50,4 +50,9 @@ default Future send(ProducerRecord record) { * @return the resource associated with this producer. */ DataPlaneContract.Reference getReference(); + + /** + * @return the OIDC audience for the ingress. + */ + String getAudience(); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java index 3926d2e3c2..1f3dafa51d 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/IngressProducerReconcilableStore.java @@ -266,6 +266,7 @@ private static class IngressProducerImpl implements IngressProducer { private final String host; private final Properties producerProperties; private final DataPlaneContract.Reference reference; + private final String audience; IngressProducerImpl( final ReactiveKafkaProducer producer, @@ -276,6 +277,7 @@ private static class IngressProducerImpl implements IngressProducer { this.producer = producer; this.topic = resource.getTopics(0); this.reference = resource.getReference(); + this.audience = resource.getIngress().getAudience(); this.path = path; this.host = host; this.producerProperties = producerProperties; @@ -291,6 +293,11 @@ public String getTopic() { return topic; } + @Override + public String getAudience() { + return audience; + } + @Override public DataPlaneContract.Reference getReference() { return reference; diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 115383e954..24e3415a7a 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -22,11 +22,15 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; import dev.knative.eventing.kafka.broker.core.reconciler.ResourcesReconciler; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.RequestContext; +import dev.knative.eventing.kafka.broker.receiver.impl.handler.AuthenticationHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; @@ -83,12 +87,13 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler ingressProducerStoreFactory; private final IngressRequestHandler ingressRequestHandler; private final ReceiverEnv env; + private final OIDCDiscoveryConfig oidcDiscoveryConfig; + private AuthenticationHandler authenticationHandler; private HttpServer httpServer; private HttpServer httpsServer; private MessageConsumer messageConsumer; private IngressProducerReconcilableStore ingressProducerStore; - private FileWatcher secretWatcher; public ReceiverVerticle( @@ -97,7 +102,8 @@ public ReceiverVerticle( final HttpServerOptions httpsServerOptions, final Function ingressProducerStoreFactory, final IngressRequestHandler ingressRequestHandler, - final String secretVolumePath) { + final String secretVolumePath, + final OIDCDiscoveryConfig oidcDiscoveryConfig) { Objects.requireNonNull(env); Objects.requireNonNull(httpServerOptions); @@ -114,6 +120,7 @@ public ReceiverVerticle( this.secretVolume = new File(secretVolumePath); this.tlsKeyFile = new File(secretVolumePath + "/tls.key"); this.tlsCrtFile = new File(secretVolumePath + "/tls.crt"); + this.oidcDiscoveryConfig = oidcDiscoveryConfig; } public HttpServerOptions getHttpsServerOptions() { @@ -127,6 +134,9 @@ public void start(final Promise startPromise) { .watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler)) .buildAndListen(vertx); + TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig); + this.authenticationHandler = new AuthenticationHandler(tokenVerifier); + this.httpServer = vertx.createHttpServer(this.httpServerOptions); // check whether the secret volume is mounted @@ -200,10 +210,7 @@ public void stop(Promise stopPromise) throws Exception { } @Override - public void handle(HttpServerRequest request) { - - final var requestContext = new RequestContext(request); - + public void handle(final HttpServerRequest request) { // Look up for the ingress producer IngressProducer producer = this.ingressProducerStore.resolve(request.host(), request.path()); if (producer == null) { @@ -224,8 +231,11 @@ public void handle(HttpServerRequest request) { return; } - // Invoke the ingress request handler - this.ingressRequestHandler.handle(requestContext, producer); + this.authenticationHandler.handle(request, producer, req -> { + // Invoke the ingress request handler + final var requestContext = new RequestContext(req); + this.ingressRequestHandler.handle(requestContext, producer); + }); } public void updateServerConfig() { diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java new file mode 100644 index 0000000000..a1283ac275 --- /dev/null +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.handler; + +import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; + +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; +import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpServerRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handler checking that the provided request contained a valid JWT. + */ +public class AuthenticationHandler { + + private static final Logger logger = LoggerFactory.getLogger(AuthenticationHandler.class); + private final TokenVerifier tokenVerifier; + + public AuthenticationHandler(final TokenVerifier tokenVerifier) { + this.tokenVerifier = tokenVerifier; + } + + public void handle( + final HttpServerRequest request, final IngressProducer ingressInfo, final Handler next) { + if (ingressInfo.getAudience().isEmpty()) { + logger.debug("No audience for ingress set. Continue without authentication check..."); + next.handle(request); + return; + } + + tokenVerifier + .verify(request, ingressInfo.getAudience()) + .onFailure(e -> { + logger.debug("Failed to verify authentication of request: {}", keyValue("error", e.getMessage())); + request.response() + .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) + .end(); + }) + .onSuccess(jwtClaims -> { + logger.debug("Request contained valid JWT. Continuing..."); + next.handle(request); + }); + } +} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index 8638931f45..af04178fc8 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -22,6 +22,7 @@ import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig; import dev.knative.eventing.kafka.broker.core.utils.Configurations; @@ -39,6 +40,7 @@ import java.io.File; import java.io.IOException; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.kafka.clients.producer.ProducerConfig; @@ -60,7 +62,7 @@ public class Main { * @param args command line arguments. */ public static void start(final String[] args, final ReactiveProducerFactory kafkaProducerFactory) - throws IOException { + throws IOException, ExecutionException, InterruptedException { ReceiverEnv env = new ReceiverEnv(System::getenv); OpenTelemetrySdk openTelemetry = @@ -97,6 +99,12 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk httpsServerOptions.setPort(env.getIngressTLSPort()); httpsServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE); + // Setup OIDC discovery config + OIDCDiscoveryConfig oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx) + .toCompletionStage() + .toCompletableFuture() + .get(); + // Configure the verticle to deploy and the deployment options final Supplier receiverVerticleFactory = new ReceiverVerticleFactory( env, @@ -104,7 +112,8 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk Metrics.getRegistry(), httpServerOptions, httpsServerOptions, - kafkaProducerFactory); + kafkaProducerFactory, + oidcDiscoveryConfig); DeploymentOptions deploymentOptions = new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()); diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index 1647694589..72438f31ae 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.receiver.main; import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; @@ -39,6 +40,7 @@ class ReceiverVerticleFactory implements Supplier { private final String secretVolumePath = "/etc/receiver-tls-secret"; private final IngressRequestHandler ingressRequestHandler; + private final OIDCDiscoveryConfig oidcDiscoveryConfig; private ReactiveProducerFactory kafkaProducerFactory; @@ -48,16 +50,16 @@ class ReceiverVerticleFactory implements Supplier { final MeterRegistry metricsRegistry, final HttpServerOptions httpServerOptions, final HttpServerOptions httpsServerOptions, - final ReactiveProducerFactory kafkaProducerFactory) { - { - this.env = env; - this.producerConfigs = producerConfigs; - this.httpServerOptions = httpServerOptions; - this.httpsServerOptions = httpsServerOptions; - this.ingressRequestHandler = - new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), metricsRegistry); - this.kafkaProducerFactory = kafkaProducerFactory; - } + final ReactiveProducerFactory kafkaProducerFactory, + final OIDCDiscoveryConfig oidcDiscoveryConfig) { + this.env = env; + this.producerConfigs = producerConfigs; + this.httpServerOptions = httpServerOptions; + this.httpsServerOptions = httpsServerOptions; + this.ingressRequestHandler = + new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), metricsRegistry); + this.kafkaProducerFactory = kafkaProducerFactory; + this.oidcDiscoveryConfig = oidcDiscoveryConfig; } @Override @@ -71,6 +73,7 @@ public Verticle get() { producerConfigs, properties -> kafkaProducerFactory.create(v, properties)), this.ingressRequestHandler, - secretVolumePath); + secretVolumePath, + oidcDiscoveryConfig); } } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java index eb37cfecca..0d1684ed5f 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java @@ -149,7 +149,8 @@ public void setUpHTTP(final Vertx vertx, final VertxTestContext testContext) { httpsServerOptions, v -> store, new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), registry), - SECRET_VOLUME_PATH); + SECRET_VOLUME_PATH, + null); vertx.deployVerticle(verticle, testContext.succeeding(ar -> testContext.completeNow())); // Connect to the logger in ReceiverVerticle diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java index 950d3bd314..bcc41cdde8 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java @@ -132,7 +132,8 @@ public void setup() throws ExecutionException, InterruptedException { httpsServerOptions, v -> store, new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), Metrics.getRegistry()), - SECRET_VOLUME_PATH); + SECRET_VOLUME_PATH, + null); vertx.deployVerticle(verticle).toCompletionStage().toCompletableFuture().get(); } diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java new file mode 100644 index 0000000000..494f9b3814 --- /dev/null +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java @@ -0,0 +1,138 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.knative.eventing.kafka.broker.receiver.impl.handler; + +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; + +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; +import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier; +import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import io.cloudevents.CloudEvent; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import org.jose4j.jwt.JwtClaims; +import org.junit.jupiter.api.Test; + +public class AuthenticationHandlerTest { + @Test + public void shouldReturnUnauthorizedWhenJWTValidationFails() { + final HttpServerRequest request = mock(HttpServerRequest.class); + final var response = mockResponse(request, HttpResponseStatus.UNAUTHORIZED.code()); + + TokenVerifier tokenVerifier = new TokenVerifier() { + @Override + public Future verify(String token, String expectedAudience) { + return Future.failedFuture("JWT validation failed"); + } + + @Override + public Future verify(HttpServerRequest request, String expectedAudience) { + return Future.failedFuture("JWT validation failed"); + } + }; + + final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + + authHandler.handle( + request, + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return null; + } + + @Override + public String getTopic() { + return null; + } + + @Override + public DataPlaneContract.Reference getReference() { + return null; + } + + @Override + public String getAudience() { + return "some-required-audience"; + } + }, + mock(Handler.class)); + + verify(response, times(1)).setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()); + verify(response, times(1)).end(); + } + + @Test + public void shouldContinueWithRequestWhenJWTSucceeds() { + final HttpServerRequest request = mock(HttpServerRequest.class); + final var next = mock(Handler.class); // mockHandler(request); + + TokenVerifier tokenVerifier = new TokenVerifier() { + @Override + public Future verify(String token, String expectedAudience) { + return Future.succeededFuture(new JwtClaims()); + } + + @Override + public Future verify(HttpServerRequest request, String expectedAudience) { + return Future.succeededFuture(new JwtClaims()); + } + }; + + final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + + authHandler.handle( + request, + new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return null; + } + + @Override + public String getTopic() { + return null; + } + + @Override + public DataPlaneContract.Reference getReference() { + return null; + } + + @Override + public String getAudience() { + return "some-required-audience"; + } + }, + next); + + verify(next, times(1)).handle(request); + } + + private static HttpServerResponse mockResponse(final HttpServerRequest request, final int statusCode) { + final var response = mock(HttpServerResponse.class); + when(response.setStatusCode(statusCode)).thenReturn(response); + when(request.response()).thenReturn(response); + + return response; + } +} diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java index d95cc20d4c..462326bc71 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java @@ -98,6 +98,11 @@ public String getTopic() { public DataPlaneContract.Reference getReference() { return DataPlaneContract.Reference.newBuilder().build(); } + + @Override + public String getAudience() { + return ""; + } }); verifySetStatusCodeAndTerminateResponse(statusCode, response); @@ -129,6 +134,11 @@ public String getTopic() { public DataPlaneContract.Reference getReference() { return DataPlaneContract.Reference.newBuilder().build(); } + + @Override + public String getAudience() { + return ""; + } }); verifySetStatusCodeAndTerminateResponse(IngressRequestHandlerImpl.MAPPER_FAILED, response); diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java index d18cad6157..970ca0d0f0 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; +import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig; import dev.knative.eventing.kafka.broker.receiver.MockReactiveProducerFactory; import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.http.HttpServerOptions; @@ -42,7 +43,8 @@ public void shouldCreateMultipleReceiverVerticleInstances() { mock(MeterRegistry.class), mock(HttpServerOptions.class), mock(HttpServerOptions.class), - mock(MockReactiveProducerFactory.class)); + mock(MockReactiveProducerFactory.class), + mock(OIDCDiscoveryConfig.class)); assertThat(supplier.get()).isNotSameAs(supplier.get()); } diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index e8d1841e7c..02ce2b8b43 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -392,7 +392,8 @@ private ReceiverVerticle setUpReceiver(final Vertx vertx, final VertxTestContext AuthProvider.noAuth(), producerConfigs(), properties -> getReactiveProducerFactory() .create(v, properties)), new IngressRequestHandlerImpl(StrictRequestToRecordMapper.getInstance(), Metrics.getRegistry()), - SECRET_VOLUME_PATH); + SECRET_VOLUME_PATH, + null); final CountDownLatch latch = new CountDownLatch(1); vertx.deployVerticle(verticle, context.succeeding(h -> latch.countDown())); diff --git a/test/channel-reconciler-tests.sh b/test/channel-reconciler-tests.sh index 03a1942a78..2bd58ca226 100755 --- a/test/channel-reconciler-tests.sh +++ b/test/channel-reconciler-tests.sh @@ -3,3 +3,9 @@ source $(dirname $0)/e2e-common.sh go_test_e2e -tags=e2e,cloudevents -timeout=1h ./test/e2e_new_channel/... || fail_test "E2E (new - KafkaChannel) suite failed" + +echo "Running E2E Channel Reconciler Tests with OIDC authentication enabled" + +kubectl apply -Rf "$(dirname "$0")/config-oidc-authentication" + +go_test_e2e -timeout=1h ./test/e2e_new_channel/... -run OIDC || fail_test diff --git a/test/config-oidc-authentication/features.yaml b/test/config-oidc-authentication/features.yaml new file mode 100644 index 0000000000..ae227d1072 --- /dev/null +++ b/test/config-oidc-authentication/features.yaml @@ -0,0 +1,31 @@ +# Copyright 2021 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-features + namespace: knative-eventing + labels: + knative.dev/config-propagation: original + knative.dev/config-category: eventing +data: + kreference-group: "disabled" + delivery-retryafter: "disabled" + delivery-timeout: "enabled" + kreference-mapping: "disabled" + new-trigger-filters: "enabled" + transport-encryption: "strict" + eventtype-auto-create: "disabled" + authentication-oidc: "enabled" diff --git a/test/e2e_new_channel/kafka_channel_test.go b/test/e2e_new_channel/kafka_channel_test.go index 5471791b66..9a93f46f83 100644 --- a/test/e2e_new_channel/kafka_channel_test.go +++ b/test/e2e_new_channel/kafka_channel_test.go @@ -28,6 +28,7 @@ import ( "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" @@ -83,13 +84,13 @@ func TestKafkaChannelOIDC(t *testing.T) { knative.WithLoggingConfig, knative.WithTracingConfig, k8s.WithEventListener, - environment.WithPollTimings(3*time.Second, 120*time.Second), + environment.WithPollTimings(2*time.Second, 12*time.Minute), environment.Managed(t), + eventshub.WithTLS(t), ) name := feature.MakeRandomK8sName("kafkaChannel") env.Prerequisite(ctx, t, channel.ImplGoesReady(name)) - env.Test(ctx, t, oidc.AddressableHasAudiencePopulated(kafkachannelresource.GVR(), kafkachannelresource.GVK().Kind, name, env.Namespace())) - // when the KafkaChannel supports all the OIDC features, we can do `TestKafkaChannelOIDC = rekt.TestChannelImplSupportsOIDC` too + env.TestSet(ctx, t, oidc.AddressableOIDCConformance(kafkachannelresource.GVR(), "KafkaChannel", name, env.Namespace())) } diff --git a/test/reconciler-tests.sh b/test/reconciler-tests.sh index 57fbab16ef..c94b04ff17 100755 --- a/test/reconciler-tests.sh +++ b/test/reconciler-tests.sh @@ -54,6 +54,12 @@ kubectl apply -Rf "$(dirname "$0")/config-transport-encryption" go_test_e2e -timeout=1h ./test/e2e_new -run TLS || fail_test +echo "Running E2E Reconciler Tests with OIDC authentication enabled" + +kubectl apply -Rf "$(dirname "$0")/config-oidc-authentication" + +go_test_e2e -timeout=1h ./test/e2e_new -run OIDC || fail_test + if ! ${LOCAL_DEVELOPMENT}; then go_test_e2e -tags=sacura -timeout=40m ./test/e2e/... || fail_test "E2E (sacura) suite failed" fi