Receiver reject requests for wrong audience #3675

Merged
@@ -16,61 +16,11 @@
package dev.knative.eventing.kafka.broker.core.oidc;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import org.jose4j.jwt.JwtClaims;
import org.jose4j.jwt.consumer.InvalidJwtException;
import org.jose4j.jwt.consumer.JwtConsumer;
import org.jose4j.jwt.consumer.JwtConsumerBuilder;
import org.jose4j.jwt.consumer.JwtContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TokenVerifier {
public interface TokenVerifier {
Future<JwtClaims> verify(String token, String expectedAudience);

private static final Logger logger = LoggerFactory.getLogger(TokenVerifier.class);

private final Vertx vertx;

private final OIDCDiscoveryConfig oidcDiscoveryConfig;

public TokenVerifier(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) {
this.vertx = vertx;
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
}

public Future<JwtClaims> verify(String token, String expectedAudience) {
return this.vertx.<JwtClaims>executeBlocking(promise -> {
// execute blocking, as jose .process() is blocking

JwtConsumer jwtConsumer = new JwtConsumerBuilder()
.setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver())
.setExpectedAudience(expectedAudience)
.setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer())
.build();

try {
JwtContext jwtContext = jwtConsumer.process(token);

promise.complete(jwtContext.getJwtClaims());
} catch (InvalidJwtException e) {
promise.fail(e);
}
});
}

public Future<JwtClaims> verify(HttpServerRequest request, String expectedAudience) {
String authHeader = request.getHeader("Authorization");
if (authHeader == null || authHeader.isEmpty()) {
return Future.failedFuture("Request didn't contain Authorization header"); // change to exception
}

if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) {
return Future.failedFuture("Authorization header didn't contain Bearer token"); // change to exception
}

String token = authHeader.substring("Bearer ".length());

return verify(token, expectedAudience);
}
Future<JwtClaims> verify(HttpServerRequest request, String expectedAudience);
}
data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/oidc/TokenVerifierImpl.java
@@ -0,0 +1,77 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.oidc;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import org.jose4j.jwt.JwtClaims;
import org.jose4j.jwt.consumer.InvalidJwtException;
import org.jose4j.jwt.consumer.JwtConsumer;
import org.jose4j.jwt.consumer.JwtConsumerBuilder;
import org.jose4j.jwt.consumer.JwtContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TokenVerifierImpl implements TokenVerifier {

private static final Logger logger = LoggerFactory.getLogger(TokenVerifierImpl.class);

private final Vertx vertx;

private final OIDCDiscoveryConfig oidcDiscoveryConfig;

public TokenVerifierImpl(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) {
this.vertx = vertx;
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
}

public Future<JwtClaims> verify(String token, String expectedAudience) {
return this.vertx.<JwtClaims>executeBlocking(promise -> {

// execute blocking, as jose .process() is blocking

JwtConsumer jwtConsumer = new JwtConsumerBuilder()
.setVerificationKeyResolver(this.oidcDiscoveryConfig.getJwksVerificationKeyResolver())
.setExpectedAudience(expectedAudience)
.setExpectedIssuer(this.oidcDiscoveryConfig.getIssuer())
.build();

try {
JwtContext jwtContext = jwtConsumer.process(token);

promise.complete(jwtContext.getJwtClaims());
Member:
Does getting JWT claims from processing the token imply that the token had the correct audience?

Contributor Author:
Yes. jwtConsumer.process(token) fails when the JWT does not contain the audience that was set in line 48. (A short standalone sketch of this behavior follows this file's diff.)

} catch (InvalidJwtException e) {
promise.fail(e);
}
});

}

public Future<JwtClaims> verify(final HttpServerRequest request, String expectedAudience) {
String authHeader = request.getHeader("Authorization");

if (authHeader == null || authHeader.isEmpty()) {
return Future.failedFuture("Request didn't contain Authorization header");

}

if (!authHeader.startsWith("Bearer ") && authHeader.length() <= "Bearer ".length()) {
return Future.failedFuture("Authorization header didn't contain Bearer token");

}

String token = authHeader.substring("Bearer ".length());

request.pause();
return verify(token, expectedAudience).onSuccess(v -> request.resume());

}
}
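
The review exchange above hinges on jose4j behavior: process() checks the token's aud claim against whatever was passed to setExpectedAudience() and throws InvalidJwtException on a mismatch. Below is a minimal, self-contained sketch of that behavior; the issuer URL, audience strings, and shared HMAC key are made up for illustration only, whereas the verifier in this PR resolves keys through the issuer's JWKS via OIDCDiscoveryConfig.

import java.nio.charset.StandardCharsets;

import org.jose4j.jws.AlgorithmIdentifiers;
import org.jose4j.jws.JsonWebSignature;
import org.jose4j.jwt.JwtClaims;
import org.jose4j.jwt.consumer.InvalidJwtException;
import org.jose4j.jwt.consumer.JwtConsumer;
import org.jose4j.jwt.consumer.JwtConsumerBuilder;
import org.jose4j.keys.HmacKey;

public class AudienceMismatchSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical 320-bit shared HMAC key, standing in for the JWKS-resolved keys.
        byte[] key = "0123456789012345678901234567890123456789".getBytes(StandardCharsets.UTF_8);

        // Issue a token whose audience is NOT the one the receiver expects.
        JwtClaims claims = new JwtClaims();
        claims.setIssuer("https://issuer.example.com");
        claims.setAudience("some-other-sink");
        claims.setExpirationTimeMinutesInTheFuture(5);

        JsonWebSignature jws = new JsonWebSignature();
        jws.setPayload(claims.toJson());
        jws.setKey(new HmacKey(key));
        jws.setAlgorithmHeaderValue(AlgorithmIdentifiers.HMAC_SHA256);
        String token = jws.getCompactSerialization();

        // Mirror of the JwtConsumerBuilder configuration in TokenVerifierImpl.verify().
        JwtConsumer jwtConsumer = new JwtConsumerBuilder()
                .setVerificationKey(new HmacKey(key))
                .setExpectedIssuer("https://issuer.example.com")
                .setExpectedAudience("broker-ingress-audience")
                .build();

        try {
            jwtConsumer.process(token);
            System.out.println("token accepted (not expected here)");
        } catch (InvalidJwtException e) {
            // process() fails because "some-other-sink" does not match "broker-ingress-audience".
            System.out.println("token rejected: " + e.getMessage());
        }
    }
}
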
@@ -16,9 +16,10 @@
package dev.knative.eventing.kafka.broker.receiverloom;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class Main {
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new LoomProducerFactory<>());
}
}
@@ -16,9 +16,10 @@
package dev.knative.eventing.kafka.broker.receiververtx;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class Main {
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
dev.knative.eventing.kafka.broker.receiver.main.Main.start(args, new VertxProducerFactory<>());
}
}
@@ -50,4 +50,9 @@ default Future<RecordMetadata> send(ProducerRecord<String, CloudEvent> record) {
* @return the resource associated with this producer.
*/
DataPlaneContract.Reference getReference();

/**
* @return the OIDC audience for the ingress.
*/
String getAudience();
}
@@ -266,6 +266,7 @@ private static class IngressProducerImpl implements IngressProducer {
private final String host;
private final Properties producerProperties;
private final DataPlaneContract.Reference reference;
private final String audience;

IngressProducerImpl(
final ReactiveKafkaProducer<String, CloudEvent> producer,
@@ -276,6 +277,7 @@ private static class IngressProducerImpl implements IngressProducer {
this.producer = producer;
this.topic = resource.getTopics(0);
this.reference = resource.getReference();
this.audience = resource.getIngress().getAudience();
this.path = path;
this.host = host;
this.producerProperties = producerProperties;
@@ -291,6 +293,11 @@ public String getTopic() {
return topic;
}

@Override
public String getAudience() {
return audience;
}

@Override
public DataPlaneContract.Reference getReference() {
return reference;
@@ -22,11 +22,15 @@
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig;
import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier;
import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifierImpl;
import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener;
import dev.knative.eventing.kafka.broker.core.reconciler.ResourcesReconciler;
import dev.knative.eventing.kafka.broker.receiver.IngressProducer;
import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler;
import dev.knative.eventing.kafka.broker.receiver.RequestContext;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.AuthenticationHandler;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler;
import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv;
@@ -83,12 +87,13 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler<HttpSe
private final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory;
private final IngressRequestHandler ingressRequestHandler;
private final ReceiverEnv env;
private final OIDCDiscoveryConfig oidcDiscoveryConfig;

private AuthenticationHandler authenticationHandler;
private HttpServer httpServer;
private HttpServer httpsServer;
private MessageConsumer<Object> messageConsumer;
private IngressProducerReconcilableStore ingressProducerStore;

private FileWatcher secretWatcher;

public ReceiverVerticle(
@@ -97,7 +102,8 @@ public ReceiverVerticle(
final HttpServerOptions httpsServerOptions,
final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory,
final IngressRequestHandler ingressRequestHandler,
final String secretVolumePath) {
final String secretVolumePath,
final OIDCDiscoveryConfig oidcDiscoveryConfig) {

Objects.requireNonNull(env);
Objects.requireNonNull(httpServerOptions);
@@ -114,6 +120,7 @@ public ReceiverVerticle(
this.secretVolume = new File(secretVolumePath);
this.tlsKeyFile = new File(secretVolumePath + "/tls.key");
this.tlsCrtFile = new File(secretVolumePath + "/tls.crt");
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
}

public HttpServerOptions getHttpsServerOptions() {
@@ -127,6 +134,9 @@ public void start(final Promise<Void> startPromise) {
.watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler))
.buildAndListen(vertx);

TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, oidcDiscoveryConfig);
this.authenticationHandler = new AuthenticationHandler(tokenVerifier);

this.httpServer = vertx.createHttpServer(this.httpServerOptions);

// check whether the secret volume is mounted
@@ -200,10 +210,7 @@ public void stop(Promise<Void> stopPromise) throws Exception {
}

@Override
public void handle(HttpServerRequest request) {

final var requestContext = new RequestContext(request);

public void handle(final HttpServerRequest request) {
// Look up for the ingress producer
IngressProducer producer = this.ingressProducerStore.resolve(request.host(), request.path());
if (producer == null) {
@@ -224,8 +231,11 @@ public void handle(HttpServerRequest request) {
return;
}

// Invoke the ingress request handler
this.ingressRequestHandler.handle(requestContext, producer);
this.authenticationHandler.handle(request, producer, req -> {
// Invoke the ingress request handler
final var requestContext = new RequestContext(req);
this.ingressRequestHandler.handle(requestContext, producer);
});
}

public void updateServerConfig() {
@@ -0,0 +1,61 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.receiver.impl.handler;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

import dev.knative.eventing.kafka.broker.core.oidc.TokenVerifier;
import dev.knative.eventing.kafka.broker.receiver.IngressProducer;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Handler checking that the provided request contained a valid JWT.
*/
public class AuthenticationHandler {

private static final Logger logger = LoggerFactory.getLogger(AuthenticationHandler.class);
private final TokenVerifier tokenVerifier;

public AuthenticationHandler(final TokenVerifier tokenVerifier) {
this.tokenVerifier = tokenVerifier;
}

public void handle(
final HttpServerRequest request, final IngressProducer ingressInfo, final Handler<HttpServerRequest> next) {
if (ingressInfo.getAudience().isEmpty()) {
logger.debug("No audience for ingress set. Continue without authentication check...");
next.handle(request);
return;
}

tokenVerifier
.verify(request, ingressInfo.getAudience())
.onFailure(e -> {
logger.debug("Failed to verify authentication of request: {}", keyValue("error", e.getMessage()));
request.response()
.setStatusCode(HttpResponseStatus.UNAUTHORIZED.code())
.end();
})
.onSuccess(jwtClaims -> {
logger.debug("Request contained valid JWT. Continuing...");
next.handle(request);
});
}
}
data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java
@@ -22,6 +22,7 @@
import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.core.oidc.OIDCDiscoveryConfig;
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
@@ -39,6 +40,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -60,7 +62,7 @@
* @param args command line arguments.
*/
public static void start(final String[] args, final ReactiveProducerFactory kafkaProducerFactory)
throws IOException {
throws IOException, ExecutionException, InterruptedException {
ReceiverEnv env = new ReceiverEnv(System::getenv);

OpenTelemetrySdk openTelemetry =
@@ -97,14 +99,21 @@
httpsServerOptions.setPort(env.getIngressTLSPort());
httpsServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE);

// Setup OIDC discovery config
OIDCDiscoveryConfig oidcDiscoveryConfig = OIDCDiscoveryConfig.build(vertx)
.toCompletionStage()
.toCompletableFuture()
.get();

// Configure the verticle to deploy and the deployment options
final Supplier<Verticle> receiverVerticleFactory = new ReceiverVerticleFactory(
env,
producerConfigs,
Metrics.getRegistry(),
httpServerOptions,
httpsServerOptions,
kafkaProducerFactory);
kafkaProducerFactory,
oidcDiscoveryConfig);
DeploymentOptions deploymentOptions =
new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors());

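
For context on the widened throws clauses (ExecutionException, InterruptedException) on the Main entry points: Main.start() now blocks on the OIDC discovery future before deploying the receiver verticle, via toCompletionStage().toCompletableFuture().get(). A rough standalone sketch of that bridge, with a timer-backed promise standing in for the real OIDCDiscoveryConfig.build(vertx) call:

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

public class BlockingStartupSketch {
    public static void main(String[] args) throws Exception {
        Vertx vertx = Vertx.vertx();

        // Stand-in for OIDCDiscoveryConfig.build(vertx): an async step that completes later.
        Promise<String> discovery = Promise.promise();
        vertx.setTimer(100, id -> discovery.complete("discovered-oidc-config"));
        Future<String> asyncConfig = discovery.future();

        // Same bridge as in Main.start(): block the main thread until discovery finishes.
        // get() is what forces ExecutionException/InterruptedException into the signature.
        String config = asyncConfig.toCompletionStage().toCompletableFuture().get();

        System.out.println(config);
        vertx.close();
    }
}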