Skip to content

Commit

Permalink
Receiver handles ingress probes (#1431)
Browse files Browse the repository at this point in the history
* Receiver handles ingress probes

Probing is done using the knative/networking data-plane contract
(already implemented in Eventing through knative/pkg).
ref issue knative/eventing#3911

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

* Add docs, improve naming, remove start imports

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
pierDipi authored Nov 10, 2021
1 parent d24bcb7 commit da9c0fb
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import java.util.function.Function;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;
import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil.PROBE_HASH_HEADER_NAME;
import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil.isControlPlaneProbeRequest;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

/**
* This verticle is responsible for implementing the logic of the receiver.
Expand Down Expand Up @@ -118,10 +121,14 @@ public void handle(HttpServerRequest request) {
IngressProducer producer = this.ingressProducerStore.resolve(request.path());
if (producer == null) {
request.response().setStatusCode(NOT_FOUND.code()).end();
logger.warn("Resource not found {}", keyValue("path", request.path()));
return;
}

logger.warn("Resource not found {}",
keyValue("path", request.path())
);
if (isControlPlaneProbeRequest(request)) {
request.response()
.putHeader(PROBE_HASH_HEADER_NAME, request.getHeader(PROBE_HASH_HEADER_NAME))
.setStatusCode(OK.code()).end();
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.receiver.impl.handler;

import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;

public final class ControlPlaneProbeRequestUtil {

private ControlPlaneProbeRequestUtil() {
}

// Control plane probing is based on the Knative Networking Data Plane Contract.
// For a description of what these headers are see [1].
//
// [1]: https://github.com/knative/pkg/blob/b558677ab03404ed118ba3e179a7438fe2d8509a/network/network.go#L38-L51
public static final String PROBE_HEADER_NAME = "K-Network-Probe";
public static final String PROBE_HEADER_VALUE = "probe";
public static final String PROBE_HASH_HEADER_NAME = "K-Network-Hash";

/**
* @param request HTTP request
* @return true if the provided request conforms to the Knative Networking Data Plane Contract, false otherwise.
*/
public static boolean isControlPlaneProbeRequest(final HttpServerRequest request) {
final var headers = request.headers();
return HttpMethod.GET.equals(request.method()) &&
headers.contains(PROBE_HEADER_NAME) &&
headers.get(PROBE_HEADER_NAME).equals(PROBE_HEADER_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;
import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil.isControlPlaneProbeRequest;
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;

/**
Expand All @@ -41,14 +42,11 @@ public MethodNotAllowedHandler(final Handler<HttpServerRequest> next) {

@Override
public void handle(final HttpServerRequest request) {
if (request.method() != HttpMethod.POST) {
request.response().setStatusCode(METHOD_NOT_ALLOWED.code()).end();

logger.warn("Only POST method is allowed. Method not allowed: {}",
keyValue("method", request.method())
);
if (HttpMethod.POST.equals(request.method()) || isControlPlaneProbeRequest(request)) {
this.next.handle(request);
return;
}
this.next.handle(request);
request.response().setStatusCode(METHOD_NOT_ALLOWED.code()).end();
logger.warn("Method not allowed: {}", keyValue("method", request.method()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
import dev.knative.eventing.kafka.broker.core.testing.CloudEventSerializerMock;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl;
import dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil;
import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventBuilder;
Expand All @@ -36,6 +37,7 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
Expand Down Expand Up @@ -65,6 +67,7 @@
import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -329,6 +332,32 @@ private static List<TestCase> getValidNonValidEvents() {
.addTopics("topic-name-42")
.setIngress(DataPlaneContract.Ingress.newBuilder().setPath("/broker-ns/broker-name5"))
.build()
),
new TestCase(
new ProducerRecord<>(
"topic-name-42",
null,
null
),
"/broker-ns/broker-name5",
probeRequestSender(null),
OK.code(),
DataPlaneContract.Resource.newBuilder()
.setUid("1")
.addTopics("topic-name-42")
.setIngress(DataPlaneContract.Ingress.newBuilder().setPath("/broker-ns/broker-name5"))
.build()
),
new TestCase(
new ProducerRecord<>(
"topic-name-42",
null,
null
),
"/broker-ns/broker-name5",
probeRequestSender(null),
NOT_FOUND.code(),
DataPlaneContract.Resource.newBuilder().build()
)
);
}
Expand All @@ -338,6 +367,14 @@ private static Function<HttpRequest<Buffer>, Future<HttpResponse<Buffer>>> ceReq
return request -> VertxMessageFactory.createWriter(request).writeBinary(event);
}

private static Function<HttpRequest<Buffer>, Future<HttpResponse<Buffer>>> probeRequestSender(
final CloudEvent event) {
return request -> request
.method(HttpMethod.GET)
.putHeader(ControlPlaneProbeRequestUtil.PROBE_HEADER_NAME, ControlPlaneProbeRequestUtil.PROBE_HEADER_VALUE)
.send();
}

static final class TestCase {

final ProducerRecord<String, CloudEvent> record;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.receiver.impl.handler;

import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import org.junit.jupiter.api.Test;

import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil.PROBE_HEADER_NAME;
import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil.PROBE_HEADER_VALUE;
import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil.isControlPlaneProbeRequest;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ProbeRequestUtilTest {

@Test
public void shouldBeProbeRequest() {
final var headers = new HeadersMultiMap()
.add(PROBE_HEADER_NAME, PROBE_HEADER_VALUE);
final var request = mock(HttpServerRequest.class);
when(request.method()).thenReturn(HttpMethod.GET);
when(request.headers()).thenReturn(headers);

assertThat(isControlPlaneProbeRequest(request)).isTrue();
}

@Test
public void shouldNotBeProbeRequestWrongValue() {
final var headers = new HeadersMultiMap()
.add(PROBE_HEADER_NAME, PROBE_HEADER_VALUE + "a");
final var request = mock(HttpServerRequest.class);
when(request.method()).thenReturn(HttpMethod.GET);
when(request.headers()).thenReturn(headers);

assertThat(isControlPlaneProbeRequest(request)).isFalse();
}

@Test
public void shouldNotBeProbeRequestNoProbeHeader() {
final var headers = new HeadersMultiMap();
final var request = mock(HttpServerRequest.class);
when(request.method()).thenReturn(HttpMethod.GET);
when(request.headers()).thenReturn(headers);

assertThat(isControlPlaneProbeRequest(request)).isFalse();
}

@Test
public void shouldNotBeProbeRequestWrongMethod() {
final var headers = new HeadersMultiMap();
final var request = mock(HttpServerRequest.class);
when(request.method()).thenReturn(HttpMethod.POST);
when(request.headers()).thenReturn(headers);

assertThat(isControlPlaneProbeRequest(request)).isFalse();
}
}

0 comments on commit da9c0fb

Please sign in to comment.