diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 8c8637c9fe..5484c0d065 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -37,7 +37,10 @@ import java.util.function.Function; import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; +import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeRequestUtil.PROBE_HASH_HEADER_NAME; +import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeRequestUtil.isProbeRequest; import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; /** * This verticle is responsible for implementing the logic of the receiver. @@ -118,10 +121,14 @@ public void handle(HttpServerRequest request) { IngressProducer producer = this.ingressProducerStore.resolve(request.path()); if (producer == null) { request.response().setStatusCode(NOT_FOUND.code()).end(); + logger.warn("Resource not found {}", keyValue("path", request.path())); + return; + } - logger.warn("Resource not found {}", - keyValue("path", request.path()) - ); + if (isProbeRequest(request)) { + request.response() + .putHeader(PROBE_HASH_HEADER_NAME, request.getHeader(PROBE_HASH_HEADER_NAME)) + .setStatusCode(OK.code()).end(); return; } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/MethodNotAllowedHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/MethodNotAllowedHandler.java index 12190dd1df..ff6182bc6a 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/MethodNotAllowedHandler.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/MethodNotAllowedHandler.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; +import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeRequestUtil.*; import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; /** @@ -41,14 +42,11 @@ public MethodNotAllowedHandler(final Handler next) { @Override public void handle(final HttpServerRequest request) { - if (request.method() != HttpMethod.POST) { - request.response().setStatusCode(METHOD_NOT_ALLOWED.code()).end(); - - logger.warn("Only POST method is allowed. Method not allowed: {}", - keyValue("method", request.method()) - ); + if (HttpMethod.POST.equals(request.method()) || isProbeRequest(request)) { + this.next.handle(request); return; } - this.next.handle(request); + request.response().setStatusCode(METHOD_NOT_ALLOWED.code()).end(); + logger.warn("Method not allowed: {}", keyValue("method", request.method())); } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/ProbeRequestUtil.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/ProbeRequestUtil.java new file mode 100644 index 0000000000..f4e5c1a14e --- /dev/null +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/ProbeRequestUtil.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.handler; + +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; + +public class ProbeRequestUtil { + + public static final String PROBE_HEADER_NAME = "K-Network-Probe"; + + public static final String PROBE_HEADER_VALUE = "probe"; + + public static final String PROBE_HASH_HEADER_NAME = "K-Network-Hash"; + + public static boolean isProbeRequest(final HttpServerRequest request) { + final var headers = request.headers(); + return HttpMethod.GET.equals(request.method()) && + headers.contains(PROBE_HEADER_NAME) && + headers.get(PROBE_HEADER_NAME).equals(PROBE_HEADER_VALUE); + } +} diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java index c0c2f35dc5..4f71483d3b 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java @@ -26,6 +26,7 @@ import dev.knative.eventing.kafka.broker.core.security.AuthProvider; import dev.knative.eventing.kafka.broker.core.testing.CloudEventSerializerMock; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; +import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeRequestUtil; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; import io.cloudevents.CloudEvent; import io.cloudevents.core.v1.CloudEventBuilder; @@ -36,6 +37,7 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerOptions; import io.vertx.ext.web.client.HttpRequest; import io.vertx.ext.web.client.HttpResponse; @@ -65,6 +67,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -329,6 +332,32 @@ private static List getValidNonValidEvents() { .addTopics("topic-name-42") .setIngress(DataPlaneContract.Ingress.newBuilder().setPath("/broker-ns/broker-name5")) .build() + ), + new TestCase( + new ProducerRecord<>( + "topic-name-42", + null, + null + ), + "/broker-ns/broker-name5", + probeRequestSender(null), + OK.code(), + DataPlaneContract.Resource.newBuilder() + .setUid("1") + .addTopics("topic-name-42") + .setIngress(DataPlaneContract.Ingress.newBuilder().setPath("/broker-ns/broker-name5")) + .build() + ), + new TestCase( + new ProducerRecord<>( + "topic-name-42", + null, + null + ), + "/broker-ns/broker-name5", + probeRequestSender(null), + NOT_FOUND.code(), + DataPlaneContract.Resource.newBuilder().build() ) ); } @@ -338,6 +367,14 @@ private static Function, Future>> ceReq return request -> VertxMessageFactory.createWriter(request).writeBinary(event); } + private static Function, Future>> probeRequestSender( + final CloudEvent event) { + return request -> request + .method(HttpMethod.GET) + .putHeader(ProbeRequestUtil.PROBE_HEADER_NAME, ProbeRequestUtil.PROBE_HEADER_VALUE) + .send(); + } + static final class TestCase { final ProducerRecord record; diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/ProbeRequestUtilTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/ProbeRequestUtilTest.java new file mode 100644 index 0000000000..4aac0a7d69 --- /dev/null +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/ProbeRequestUtilTest.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.receiver.impl.handler; + +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.impl.headers.HeadersMultiMap; +import org.junit.jupiter.api.Test; + +import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeRequestUtil.PROBE_HEADER_NAME; +import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeRequestUtil.PROBE_HEADER_VALUE; +import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeRequestUtil.isProbeRequest; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ProbeRequestUtilTest { + + @Test + public void shouldBeProbeRequest() { + final var headers = new HeadersMultiMap() + .add(PROBE_HEADER_NAME, PROBE_HEADER_VALUE); + final var request = mock(HttpServerRequest.class); + when(request.method()).thenReturn(HttpMethod.GET); + when(request.headers()).thenReturn(headers); + + assertThat(isProbeRequest(request)).isTrue(); + } + + @Test + public void shouldNotBeProbeRequestWrongValue() { + final var headers = new HeadersMultiMap() + .add(PROBE_HEADER_NAME, PROBE_HEADER_VALUE + "a"); + final var request = mock(HttpServerRequest.class); + when(request.method()).thenReturn(HttpMethod.GET); + when(request.headers()).thenReturn(headers); + + assertThat(isProbeRequest(request)).isFalse(); + } + + @Test + public void shouldNotBeProbeRequestNoProbeHeader() { + final var headers = new HeadersMultiMap(); + final var request = mock(HttpServerRequest.class); + when(request.method()).thenReturn(HttpMethod.GET); + when(request.headers()).thenReturn(headers); + + assertThat(isProbeRequest(request)).isFalse(); + } + + @Test + public void shouldNotBeProbeRequestWrongMethod() { + final var headers = new HeadersMultiMap(); + final var request = mock(HttpServerRequest.class); + when(request.method()).thenReturn(HttpMethod.POST); + when(request.headers()).thenReturn(headers); + + assertThat(isProbeRequest(request)).isFalse(); + } +}