From 2fde8a55b7e1ea7d3fc34e56697b90aaafe04298 Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Mon, 4 Jul 2022 14:55:48 +0300 Subject: [PATCH 01/15] enhancing failed events with knativeerror* extensions before sending to dls --- .../dispatcher/impl/RecordDispatcherImpl.java | 56 ++++++++++++++++--- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 9b22c45d5b..ea7119f948 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -17,15 +17,12 @@ import dev.knative.eventing.kafka.broker.core.AsyncCloseable; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; -import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; +import dev.knative.eventing.kafka.broker.dispatcher.*; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils; import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; @@ -37,6 +34,7 @@ import io.vertx.kafka.client.common.tracing.ConsumerTracer; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -258,9 +256,51 @@ private void onSubscriberFailure(final Throwable failure, incrementEventCount(response, recordContext); recordDispatchLatency(response, recordContext); - dlsSender.apply(recordContext.getRecord()) - .onSuccess(v -> onDeadLetterSinkSuccess(recordContext, finalProm)) - .onFailure(ex -> onDeadLetterSinkFailure(recordContext, ex, finalProm)); + ConsumerRecordContext transformedRecordContext = errorTransform(recordContext, response); + + dlsSender.apply(transformedRecordContext.getRecord()) + .onSuccess(v -> onDeadLetterSinkSuccess(transformedRecordContext, finalProm)) + .onFailure(ex -> onDeadLetterSinkFailure(transformedRecordContext, ex, finalProm)); + } + + private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordContext, @Nullable final HttpResponse response) { + if (response == null) { + return recordContext; + } + + String url = resourceContext.getEgress().getDestination(); + int code = response.statusCode(); + String data = response.bodyAsString(); + if (data.length() > 1024) { + data = data.substring(0, 1024); + } + + CloudEvent cloudEvent = recordContext.getRecord().value(); + + CloudEvent transformedCloudEvent = CloudEventBuilder.v1(cloudEvent) + .withExtension("knativeerrordest", url) + .withExtension("knativeerrorcode", code) + .withExtension("knativeerrordata", data) + .build(); + + ConsumerRecord cr = new ConsumerRecord<>( + recordContext.getRecord().record().topic(), + recordContext.getRecord().record().partition(), + recordContext.getRecord().record().offset(), + recordContext.getRecord().record().timestamp(), + recordContext.getRecord().record().timestampType(), + null, + recordContext.getRecord().record().serializedKeySize(), + recordContext.getRecord().record().serializedValueSize(), + recordContext.getRecord().record().key(), + transformedCloudEvent, + recordContext.getRecord().record().headers(), + recordContext.getRecord().record().leaderEpoch() + ); + + KafkaConsumerRecordImpl kcr = new KafkaConsumerRecordImpl<>(cr); + + return new ConsumerRecordContext(kcr); } private void onDeadLetterSinkSuccess(final ConsumerRecordContext recordContext, From c2fb6fab5194a4a50f7d0de4d3b73ab8b38621b0 Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Mon, 4 Jul 2022 19:07:58 +0300 Subject: [PATCH 02/15] enhancing failed events with knativeerror* extensions before sending to dls --- .../dispatcher/impl/RecordDispatcherImpl.java | 37 ++-- .../dispatcher/impl/RecordDispatcherTest.java | 166 ++++++++++++++++-- 2 files changed, 174 insertions(+), 29 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index ea7119f948..6188110a96 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -40,6 +40,8 @@ import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -66,6 +68,11 @@ public class RecordDispatcherImpl implements RecordDispatcher { // Invalid cloud event records that are discarded by dispatch may not have a record type. So we set Tag as below. private static final Tag INVALID_EVENT_TYPE_TAG = Tag.of(Metrics.Tags.EVENT_TYPE, "InvalidCloudEvent"); + private static final String KN_ERROR_DEST_EXT_NAME = "knativeerrordest"; + private static final String KN_ERROR_CODE_EXT_NAME = "knativeerrorcode"; + private static final String KN_ERROR_DATA_EXT_NAME = "knativeerrordata"; + private static final int KN_ERROR_DATA_MAX_BYTES = 1024; + private final Filter filter; private final Function, Future>> subscriberSender; private final Function, Future>> dlsSender; @@ -256,6 +263,7 @@ private void onSubscriberFailure(final Throwable failure, incrementEventCount(response, recordContext); recordDispatchLatency(response, recordContext); + // enhance event with extension attributes prior to forwarding to the dead letter sink ConsumerRecordContext transformedRecordContext = errorTransform(recordContext, response); dlsSender.apply(transformedRecordContext.getRecord()) @@ -264,24 +272,33 @@ private void onSubscriberFailure(final Throwable failure, } private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordContext, @Nullable final HttpResponse response) { + String destination = resourceContext.getEgress().getDestination(); if (response == null) { - return recordContext; + // if response is null we still want to add destination + return addExtensions(recordContext, Map.of(KN_ERROR_DEST_EXT_NAME, destination)); } - String url = resourceContext.getEgress().getDestination(); - int code = response.statusCode(); + Map extensions = new HashMap<>(); + extensions.put(KN_ERROR_DEST_EXT_NAME, destination); + extensions.put(KN_ERROR_CODE_EXT_NAME, String.valueOf(response.statusCode())); String data = response.bodyAsString(); - if (data.length() > 1024) { - data = data.substring(0, 1024); + if (data != null) { + if (data.length() > KN_ERROR_DATA_MAX_BYTES) { + data = data.substring(0, KN_ERROR_DATA_MAX_BYTES); + } + extensions.put(KN_ERROR_DATA_EXT_NAME, data); } + return addExtensions(recordContext, extensions); + } + + // creates a new instance of ConsumerRecordContext with added extension attributes for underlying CloudEvent + private ConsumerRecordContext addExtensions(final ConsumerRecordContext recordContext, final Map extensions) { CloudEvent cloudEvent = recordContext.getRecord().value(); - CloudEvent transformedCloudEvent = CloudEventBuilder.v1(cloudEvent) - .withExtension("knativeerrordest", url) - .withExtension("knativeerrorcode", code) - .withExtension("knativeerrordata", data) - .build(); + CloudEventBuilder builder = CloudEventBuilder.v1(cloudEvent); + extensions.forEach(builder::withExtension); + CloudEvent transformedCloudEvent = builder.build(); ConsumerRecord cr = new ConsumerRecord<>( recordContext.getRecord().record().topic(), diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index 9f549995ca..ea8285f17b 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -18,19 +18,17 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.testing.CoreObjects; -import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; -import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSenderMock; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; -import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; -import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandlerMock; +import dev.knative.eventing.kafka.broker.dispatcher.*; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent; import io.cloudevents.CloudEvent; import io.micrometer.core.instrument.search.MeterNotFoundException; import io.micrometer.prometheus.PrometheusConfig; import io.micrometer.prometheus.PrometheusMeterRegistry; import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpVersion; +import io.vertx.ext.web.client.impl.HttpResponseImpl; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; @@ -41,27 +39,23 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @ExtendWith(VertxExtension.class) public class RecordDispatcherTest { private static final ResourceContext resourceContext = new ResourceContext( DataPlaneContract.Resource.newBuilder().build(), - DataPlaneContract.Egress.newBuilder().build() + DataPlaneContract.Egress.newBuilder().setDestination("testdest").build() ); static { @@ -184,7 +178,7 @@ final var record = record(); assertTrue(subscriberSenderSendCalled.get()); assertTrue(dlsSenderSendCalled.get()); verify(receiver, times(1)).recordReceived(record); - verify(receiver, times(1)).successfullySentToDeadLetterSink(record); + verify(receiver, times(1)).successfullySentToDeadLetterSink(any()); verify(receiver, never()).successfullySentToSubscriber(any()); verify(receiver, never()).failedToSendToDeadLetterSink(any(), any()); verify(receiver, never()).recordDiscarded(any()); @@ -228,7 +222,7 @@ final var record = record(); assertTrue(subscriberSenderSendCalled.get()); assertTrue(dlsSenderSendCalled.get()); verify(receiver, times(1)).recordReceived(record); - verify(receiver, times(1)).failedToSendToDeadLetterSink(eq(record), any()); + verify(receiver, times(1)).failedToSendToDeadLetterSink(any(), any()); verify(receiver, never()).successfullySentToDeadLetterSink(any()); verify(receiver, never()).successfullySentToSubscriber(any()); verify(receiver, never()).recordDiscarded(any()); @@ -239,6 +233,140 @@ final var record = record(); assertNoDiscardedEventCount(); } + @Test + public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDls() { + + final var subscriberSenderSendCalled = new AtomicBoolean(false); + final var dlsSenderSendCalled = new AtomicBoolean(false); + final RecordDispatcherListener receiver = offsetManagerMock(); + + int errorCode = 422; + String errorBody = "{ \"message\": \"bad bad things happened\" }"; + + final var dispatcherHandler = new RecordDispatcherImpl( + resourceContext, + value -> true, + new CloudEventSenderMock( + record -> { + subscriberSenderSendCalled.set(true); + return Future.failedFuture(new ResponseFailureException(makeHttpResponse(errorCode, errorBody), "")); + } + ), + new CloudEventSenderMock( + record -> { + dlsSenderSendCalled.set(true); + return Future.succeededFuture(); + } + ), new ResponseHandlerMock(), + receiver, + null, + registry + ); + final var record = record(); + dispatcherHandler.dispatch(record); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(KafkaConsumerRecord.class); + + assertTrue(subscriberSenderSendCalled.get()); + assertTrue(dlsSenderSendCalled.get()); + verify(receiver, times(1)).recordReceived(record); + verify(receiver, times(1)).successfullySentToDeadLetterSink(captor.capture()); + verify(receiver, never()).successfullySentToSubscriber(any()); + verify(receiver, never()).failedToSendToDeadLetterSink(any(), any()); + verify(receiver, never()).recordDiscarded(any()); + + KafkaConsumerRecord failedRecord = captor.getValue(); + assertEquals(record.topic(), failedRecord.topic()); + assertEquals(record.partition(), failedRecord.partition()); + assertEquals(record.offset(), failedRecord.offset()); + assertEquals(record.key(), failedRecord.key()); + assertEquals(record.value().getId(), failedRecord.value().getId()); + assertEquals(record.value().getAttributeNames(), failedRecord.value().getAttributeNames()); + assertEquals(record.value().getData(), failedRecord.value().getData()); + assertEquals("testdest", failedRecord.value().getExtension("knativeerrordest")); + assertEquals(String.valueOf(errorCode), failedRecord.value().getExtension("knativeerrorcode")); + assertEquals(errorBody, failedRecord.value().getExtension("knativeerrordata")); + + assertEventDispatchLatency(); + assertEventProcessingLatency(); + assertEventCount(); + assertNoDiscardedEventCount(); + } + + @Test + public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDlsBodyTooLarge() { + + final var subscriberSenderSendCalled = new AtomicBoolean(false); + final var dlsSenderSendCalled = new AtomicBoolean(false); + final RecordDispatcherListener receiver = offsetManagerMock(); + + int errorCode = 422; + String errorBody = "A".repeat(1024); + String errorBodyTooLarge = errorBody + "QWERTY"; + + final var dispatcherHandler = new RecordDispatcherImpl( + resourceContext, + value -> true, + new CloudEventSenderMock( + record -> { + subscriberSenderSendCalled.set(true); + return Future.failedFuture(new ResponseFailureException(makeHttpResponse(errorCode, errorBodyTooLarge), "")); + } + ), + new CloudEventSenderMock( + record -> { + dlsSenderSendCalled.set(true); + return Future.succeededFuture(); + } + ), new ResponseHandlerMock(), + receiver, + null, + registry + ); + final var record = record(); + dispatcherHandler.dispatch(record); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(KafkaConsumerRecord.class); + + assertTrue(subscriberSenderSendCalled.get()); + assertTrue(dlsSenderSendCalled.get()); + verify(receiver, times(1)).recordReceived(record); + verify(receiver, times(1)).successfullySentToDeadLetterSink(captor.capture()); + verify(receiver, never()).successfullySentToSubscriber(any()); + verify(receiver, never()).failedToSendToDeadLetterSink(any(), any()); + verify(receiver, never()).recordDiscarded(any()); + + KafkaConsumerRecord failedRecord = captor.getValue(); + assertEquals(record.topic(), failedRecord.topic()); + assertEquals(record.partition(), failedRecord.partition()); + assertEquals(record.offset(), failedRecord.offset()); + assertEquals(record.key(), failedRecord.key()); + assertEquals(record.value().getId(), failedRecord.value().getId()); + assertEquals(record.value().getAttributeNames(), failedRecord.value().getAttributeNames()); + assertEquals(record.value().getData(), failedRecord.value().getData()); + assertEquals("testdest", failedRecord.value().getExtension("knativeerrordest")); + assertEquals(String.valueOf(errorCode), failedRecord.value().getExtension("knativeerrorcode")); + assertEquals(errorBody, failedRecord.value().getExtension("knativeerrordata")); + + assertEventDispatchLatency(); + assertEventProcessingLatency(); + assertEventCount(); + assertNoDiscardedEventCount(); + } + + private HttpResponseImpl makeHttpResponse(int statusCode, String body) { + return new HttpResponseImpl( + HttpVersion.HTTP_2, + statusCode, + "", + MultiMap.caseInsensitiveMultiMap(), + MultiMap.caseInsensitiveMultiMap(), + Collections.emptyList(), + Buffer.buffer(body, "UTF-8"), + Collections.emptyList() + ); + } + @Test public void shouldCallFailedToSendToDeadLetterSinkIfValueMatchesAndSubscriberSenderFailsAndNoDeadLetterSinkSender() { final var subscriberSenderSendCalled = new AtomicBoolean(false); @@ -265,7 +393,7 @@ final var record = record(); assertTrue(subscriberSenderSendCalled.get()); verify(receiver, times(1)).recordReceived(record); - verify(receiver, times(1)).failedToSendToDeadLetterSink(eq(record), any()); + verify(receiver, times(1)).failedToSendToDeadLetterSink(any(), any()); verify(receiver, never()).successfullySentToDeadLetterSink(any()); verify(receiver, never()).successfullySentToSubscriber(any()); verify(receiver, never()).recordDiscarded(any()); From 382bb1015b4627f2d76d531372faecd2cff9b81e Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Mon, 4 Jul 2022 19:21:21 +0300 Subject: [PATCH 03/15] fix star import --- .../broker/dispatcher/impl/RecordDispatcherImpl.java | 6 +++++- .../broker/dispatcher/impl/RecordDispatcherTest.java | 8 +++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 6188110a96..502a972587 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -17,7 +17,11 @@ import dev.knative.eventing.kafka.broker.core.AsyncCloseable; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; -import dev.knative.eventing.kafka.broker.dispatcher.*; +import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; +import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; +import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; +import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index ea8285f17b..c11d828deb 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -18,7 +18,13 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.testing.CoreObjects; -import dev.knative.eventing.kafka.broker.dispatcher.*; +import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; +import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSenderMock; +import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; +import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; +import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; +import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandlerMock; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent; import io.cloudevents.CloudEvent; import io.micrometer.core.instrument.search.MeterNotFoundException; From 95cf5a76a27233f3b63a27882c430c87241a2183 Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Wed, 6 Jul 2022 12:53:14 +0300 Subject: [PATCH 04/15] final vars & e2e draft --- .../dispatcher/impl/RecordDispatcherImpl.java | 18 +- test/e2e_new/dls_extenisons_test.go | 169 ++++++++++++++++++ 2 files changed, 178 insertions(+), 9 deletions(-) create mode 100644 test/e2e_new/dls_extenisons_test.go diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 502a972587..6d7c8c3e3c 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -268,7 +268,7 @@ private void onSubscriberFailure(final Throwable failure, recordDispatchLatency(response, recordContext); // enhance event with extension attributes prior to forwarding to the dead letter sink - ConsumerRecordContext transformedRecordContext = errorTransform(recordContext, response); + final var transformedRecordContext = errorTransform(recordContext, response); dlsSender.apply(transformedRecordContext.getRecord()) .onSuccess(v -> onDeadLetterSinkSuccess(transformedRecordContext, finalProm)) @@ -276,16 +276,16 @@ private void onSubscriberFailure(final Throwable failure, } private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordContext, @Nullable final HttpResponse response) { - String destination = resourceContext.getEgress().getDestination(); + final var destination = resourceContext.getEgress().getDestination(); if (response == null) { // if response is null we still want to add destination return addExtensions(recordContext, Map.of(KN_ERROR_DEST_EXT_NAME, destination)); } - Map extensions = new HashMap<>(); + final var extensions = new HashMap(); extensions.put(KN_ERROR_DEST_EXT_NAME, destination); extensions.put(KN_ERROR_CODE_EXT_NAME, String.valueOf(response.statusCode())); - String data = response.bodyAsString(); + var data = response.bodyAsString(); if (data != null) { if (data.length() > KN_ERROR_DATA_MAX_BYTES) { data = data.substring(0, KN_ERROR_DATA_MAX_BYTES); @@ -298,13 +298,13 @@ private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordC // creates a new instance of ConsumerRecordContext with added extension attributes for underlying CloudEvent private ConsumerRecordContext addExtensions(final ConsumerRecordContext recordContext, final Map extensions) { - CloudEvent cloudEvent = recordContext.getRecord().value(); + final var cloudEvent = recordContext.getRecord().value(); - CloudEventBuilder builder = CloudEventBuilder.v1(cloudEvent); + final var builder = CloudEventBuilder.v1(cloudEvent); extensions.forEach(builder::withExtension); - CloudEvent transformedCloudEvent = builder.build(); + final var transformedCloudEvent = builder.build(); - ConsumerRecord cr = new ConsumerRecord<>( + final var cr = new ConsumerRecord<>( recordContext.getRecord().record().topic(), recordContext.getRecord().record().partition(), recordContext.getRecord().record().offset(), @@ -319,7 +319,7 @@ private ConsumerRecordContext addExtensions(final ConsumerRecordContext recordCo recordContext.getRecord().record().leaderEpoch() ); - KafkaConsumerRecordImpl kcr = new KafkaConsumerRecordImpl<>(cr); + final var kcr = new KafkaConsumerRecordImpl<>(cr); return new ConsumerRecordContext(kcr); } diff --git a/test/e2e_new/dls_extenisons_test.go b/test/e2e_new/dls_extenisons_test.go new file mode 100644 index 0000000000..cb57c59fff --- /dev/null +++ b/test/e2e_new/dls_extenisons_test.go @@ -0,0 +1,169 @@ +//go:build e2e +// +build e2e + +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e_new + +import ( + "context" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/resources/svc" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + cetest "github.com/cloudevents/sdk-go/v2/test" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" + "knative.dev/eventing-kafka-broker/test/e2e_new/single_partition_config" + "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/trigger" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" + + . "knative.dev/reconciler-test/pkg/eventshub/assert" +) + +func TestDeadLetterSinkExtensions(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, SubscriberUnreachable()) + env.Test(ctx, t, SubscriberReturnedErrorNoData()) +} + +func SubscriberUnreachable() *feature.Feature { + f := feature.NewFeature() + + sourceName := feature.MakeRandomK8sName("source") + deadLetterSinkName := feature.MakeRandomK8sName("dls") + triggerName := feature.MakeRandomK8sName("trigger") + brokerName := feature.MakeRandomK8sName("broker") + + ev := cetest.FullEvent() + + f.Setup("install one partition configuration", single_partition_config.Install) + f.Setup("install broker", broker.Install( + brokerName, + broker.WithBrokerClass(kafka.BrokerClass), + broker.WithConfig(single_partition_config.ConfigMapName), + )) + f.Setup("broker is ready", broker.IsReady(brokerName)) + f.Setup("broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("install dead letter sink", eventshub.Install( + deadLetterSinkName, + eventshub.StartReceiver, + )) + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(nil, "fake.svc.cluster.local"), + trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""), + )) + f.Setup("trigger is ready", trigger.IsReady(triggerName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), + eventshub.AddSequence, + eventshub.SendMultipleEvents(1, 0), + )) + + f.Assert("knativeerrordest added", assertEnhancedWithKnativeErrorExtensions( + deadLetterSinkName, + cetest.HasExtension("knativeerrordest", "fake.svc.cluster.local"), + )) + + return f +} + +func SubscriberReturnedErrorNoData() *feature.Feature { + f := feature.NewFeature() + + sourceName := feature.MakeRandomK8sName("source") + sinkName := feature.MakeRandomK8sName("sink") + deadLetterSinkName := feature.MakeRandomK8sName("dls") + triggerName := feature.MakeRandomK8sName("trigger") + brokerName := feature.MakeRandomK8sName("broker") + + ev := cetest.FullEvent() + + f.Setup("install one partition configuration", single_partition_config.Install) + f.Setup("install broker", broker.Install( + brokerName, + broker.WithBrokerClass(kafka.BrokerClass), + broker.WithConfig(single_partition_config.ConfigMapName), + )) + f.Setup("broker is ready", broker.IsReady(brokerName)) + f.Setup("broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + eventshub.DropEventsResponseCode(422), + )) + f.Setup("install dead letter sink", eventshub.Install( + deadLetterSinkName, + eventshub.StartReceiver, + )) + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(svc.AsKReference(sinkName), ""), + trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""), + )) + f.Setup("trigger is ready", trigger.IsReady(triggerName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), + eventshub.AddSequence, + eventshub.SendMultipleEvents(1, 0), + )) + + f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( + deadLetterSinkName, + cetest.HasExtension("knativeerrordest", "testdest"), + cetest.HasExtension("knativeerrorcode", "422"), + cetest.HasExtension("knativeerrordate", "testdata"), + )) + + return f +} + +func assertEnhancedWithKnativeErrorExtensions(sinkName string, matchers ...cetest.EventMatcher) feature.StepFn { + return func(ctx context.Context, t feature.T) { + _ = eventshub.StoreFromContext(ctx, sinkName).AssertExact( + t, + 1, + MatchKind(EventReceived), + MatchEvent(matchers...), + ) + } +} From 31efb726c5ebb693e02ae2604a55dffdacaecbff Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Wed, 6 Jul 2022 14:59:32 +0300 Subject: [PATCH 05/15] e2es --- ...enisons_test.go => dls_extensions_test.go} | 129 ++++++++++++++++-- 1 file changed, 116 insertions(+), 13 deletions(-) rename test/e2e_new/{dls_extenisons_test.go => dls_extensions_test.go} (54%) diff --git a/test/e2e_new/dls_extenisons_test.go b/test/e2e_new/dls_extensions_test.go similarity index 54% rename from test/e2e_new/dls_extenisons_test.go rename to test/e2e_new/dls_extensions_test.go index cb57c59fff..01d6b9ccfa 100644 --- a/test/e2e_new/dls_extenisons_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -21,10 +21,7 @@ package e2e_new import ( "context" - "knative.dev/reconciler-test/pkg/environment" - "knative.dev/reconciler-test/resources/svc" - "testing" - + "fmt" cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" @@ -32,12 +29,14 @@ import ( "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/trigger" "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" + . "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" - - . "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/resources/svc" + "testing" ) func TestDeadLetterSinkExtensions(t *testing.T) { @@ -53,6 +52,8 @@ func TestDeadLetterSinkExtensions(t *testing.T) { env.Test(ctx, t, SubscriberUnreachable()) env.Test(ctx, t, SubscriberReturnedErrorNoData()) + //env.Test(ctx, t, SubscriberReturnedErrorSmallData()) + //env.Test(ctx, t, SubscriberReturnedErrorLargeData()) } func SubscriberUnreachable() *feature.Feature { @@ -81,7 +82,7 @@ func SubscriberUnreachable() *feature.Feature { f.Setup("install trigger", trigger.Install( triggerName, brokerName, - trigger.WithSubscriber(nil, "fake.svc.cluster.local"), + trigger.WithSubscriber(nil, "http://fake.svc.cluster.local"), trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""), )) f.Setup("trigger is ready", trigger.IsReady(triggerName)) @@ -96,7 +97,9 @@ func SubscriberUnreachable() *feature.Feature { f.Assert("knativeerrordest added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, - cetest.HasExtension("knativeerrordest", "fake.svc.cluster.local"), + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("knativeerrordest", "http://fake.svc.cluster.local") + }, )) return f @@ -147,19 +150,119 @@ func SubscriberReturnedErrorNoData() *feature.Feature { eventshub.SendMultipleEvents(1, 0), )) + address, err := svc.Address(context.Background(), deadLetterSinkName) + f.Assert("deadLetterSinkName is addressable", func(ctx context.Context, t feature.T) { + if err != nil || address == nil { + t.Fail() + } + }) + f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, - cetest.HasExtension("knativeerrordest", "testdest"), - cetest.HasExtension("knativeerrorcode", "422"), - cetest.HasExtension("knativeerrordate", "testdata"), + func(ctx context.Context) cetest.EventMatcher { + dlsAddress, err := svc.Address(ctx, deadLetterSinkName) + if err != nil || dlsAddress == nil { + fmt.Println("dead letter sink is not addressable", err) + } + fmt.Println("dead letter sink address: ", dlsAddress.Path) + return cetest.HasExtension("knativeerrordest", dlsAddress.Path) + }, + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("knativeerrorcode", "422") + }, )) return f } -func assertEnhancedWithKnativeErrorExtensions(sinkName string, matchers ...cetest.EventMatcher) feature.StepFn { +//func SubscriberReturnedErrorSmallData() *feature.Feature { +// f := feature.NewFeature() +// +// sourceName := feature.MakeRandomK8sName("source") +// sinkName := feature.MakeRandomK8sName("sink") +// deadLetterSinkName := feature.MakeRandomK8sName("dls") +// triggerName := feature.MakeRandomK8sName("trigger") +// brokerName := feature.MakeRandomK8sName("broker") +// +// ev := cetest.FullEvent() +// +// f.Setup("install one partition configuration", single_partition_config.Install) +// f.Setup("install broker", broker.Install( +// brokerName, +// broker.WithBrokerClass(kafka.BrokerClass), +// broker.WithConfig(single_partition_config.ConfigMapName), +// )) +// f.Setup("broker is ready", broker.IsReady(brokerName)) +// f.Setup("broker is addressable", broker.IsAddressable(brokerName)) +// +// f.Setup("install sink", eventshub.Install( +// sinkName, +// eventshub.StartReceiver, +// eventshub.DropEventsResponseCode(422), +// // todo add small data +// )) +// f.Setup("install dead letter sink", eventshub.Install( +// deadLetterSinkName, +// eventshub.StartReceiver, +// )) +// f.Setup("install trigger", trigger.Install( +// triggerName, +// brokerName, +// trigger.WithSubscriber(svc.AsKReference(sinkName), ""), +// trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""), +// )) +// f.Setup("trigger is ready", trigger.IsReady(triggerName)) +// +// f.Setup("install source", eventshub.Install( +// sourceName, +// eventshub.StartSenderToResource(broker.GVR(), brokerName), +// eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), +// eventshub.AddSequence, +// eventshub.SendMultipleEvents(1, 0), +// )) +// +// address, err := svc.Address(context.Background(), deadLetterSinkName) +// f.Assert("deadLetterSinkName is addressable", func(ctx context.Context, t feature.T) { +// if err != nil || address == nil { +// t.Fail() +// } +// }) +// +// f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( +// deadLetterSinkName, +// func(ctx context.Context) cetest.EventMatcher { +// dlsAddress, err := svc.Address(ctx, deadLetterSinkName) +// if err != nil || dlsAddress == nil { +// fmt.Println("dead letter sink is not addressable", err) +// } +// fmt.Println("dead letter sink address: ", dlsAddress.Path) +// return cetest.HasExtension("knativeerrordest", dlsAddress.Path) +// }, +// func(ctx context.Context) cetest.EventMatcher { +// return cetest.HasExtension("knativeerrorcode", "422") +// }, +// func(ctx context.Context) cetest.EventMatcher { +// return cetest.HasExtension("knativeerrordata", "") // todo verify data +// }, +// )) +// +// return f +//} + +func assertEnhancedWithKnativeErrorExtensions(sinkName string, matcherfns ...func(ctx context.Context) cetest.EventMatcher) feature.StepFn { return func(ctx context.Context, t feature.T) { - _ = eventshub.StoreFromContext(ctx, sinkName).AssertExact( + matchers := make([]cetest.EventMatcher, len(matcherfns)) + for i, fn := range matcherfns { + matchers[i] = fn(ctx) + } + store := eventshub.StoreFromContext(ctx, sinkName) + for _, ei := range store.Collected() { + fmt.Println("actual event extensions:") + for k, v := range ei.Event.Extensions() { + fmt.Println(k, ": ", v) + } + } + _ = store.AssertExact( t, 1, MatchKind(EventReceived), From a50a51fe64dfe7f676ca198d6110d19f2766687e Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Thu, 7 Jul 2022 14:20:48 +0300 Subject: [PATCH 06/15] upgrade reconciler-test package, fix e2es --- test/e2e_new/dls_extensions_test.go | 234 +++++++++++++++++----------- 1 file changed, 141 insertions(+), 93 deletions(-) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index 01d6b9ccfa..e1f369cfa7 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -1,6 +1,3 @@ -//go:build e2e -// +build e2e - /* * Copyright 2021 The Knative Authors * @@ -21,7 +18,6 @@ package e2e_new import ( "context" - "fmt" cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" @@ -36,6 +32,7 @@ import ( "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" "knative.dev/reconciler-test/resources/svc" + "strings" "testing" ) @@ -52,8 +49,8 @@ func TestDeadLetterSinkExtensions(t *testing.T) { env.Test(ctx, t, SubscriberUnreachable()) env.Test(ctx, t, SubscriberReturnedErrorNoData()) - //env.Test(ctx, t, SubscriberReturnedErrorSmallData()) - //env.Test(ctx, t, SubscriberReturnedErrorLargeData()) + env.Test(ctx, t, SubscriberReturnedErrorSmallData()) + env.Test(ctx, t, SubscriberReturnedErrorLargeData()) } func SubscriberUnreachable() *feature.Feature { @@ -150,6 +147,67 @@ func SubscriberReturnedErrorNoData() *feature.Feature { eventshub.SendMultipleEvents(1, 0), )) + f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( + deadLetterSinkName, + func(ctx context.Context) cetest.EventMatcher { + dlsAddress, _ := svc.Address(ctx, deadLetterSinkName) + return cetest.HasExtension("knativeerrordest", dlsAddress.String()) + }, + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("knativeerrorcode", "422") + }, + )) + + return f +} + +func SubscriberReturnedErrorSmallData() *feature.Feature { + f := feature.NewFeature() + + sourceName := feature.MakeRandomK8sName("source") + sinkName := feature.MakeRandomK8sName("sink") + deadLetterSinkName := feature.MakeRandomK8sName("dls") + triggerName := feature.MakeRandomK8sName("trigger") + brokerName := feature.MakeRandomK8sName("broker") + + ev := cetest.FullEvent() + + f.Setup("install one partition configuration", single_partition_config.Install) + f.Setup("install broker", broker.Install( + brokerName, + broker.WithBrokerClass(kafka.BrokerClass), + broker.WithConfig(single_partition_config.ConfigMapName), + )) + f.Setup("broker is ready", broker.IsReady(brokerName)) + f.Setup("broker is addressable", broker.IsAddressable(brokerName)) + + errorData := `{ "message": "catastrophic failure" }` + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + eventshub.DropEventsResponseCode(422), + eventshub.DropEventsResponseBody(errorData), + )) + f.Setup("install dead letter sink", eventshub.Install( + deadLetterSinkName, + eventshub.StartReceiver, + )) + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(svc.AsKReference(sinkName), ""), + trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""), + )) + f.Setup("trigger is ready", trigger.IsReady(triggerName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), + eventshub.AddSequence, + eventshub.SendMultipleEvents(1, 0), + )) + address, err := svc.Address(context.Background(), deadLetterSinkName) f.Assert("deadLetterSinkName is addressable", func(ctx context.Context, t feature.T) { if err != nil || address == nil { @@ -160,94 +218,91 @@ func SubscriberReturnedErrorNoData() *feature.Feature { f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, func(ctx context.Context) cetest.EventMatcher { - dlsAddress, err := svc.Address(ctx, deadLetterSinkName) - if err != nil || dlsAddress == nil { - fmt.Println("dead letter sink is not addressable", err) - } - fmt.Println("dead letter sink address: ", dlsAddress.Path) - return cetest.HasExtension("knativeerrordest", dlsAddress.Path) + dlsAddress, _ := svc.Address(ctx, deadLetterSinkName) + return cetest.HasExtension("knativeerrordest", dlsAddress.String()) }, func(ctx context.Context) cetest.EventMatcher { return cetest.HasExtension("knativeerrorcode", "422") }, + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("knativeerrordata", errorData) + }, )) return f } -//func SubscriberReturnedErrorSmallData() *feature.Feature { -// f := feature.NewFeature() -// -// sourceName := feature.MakeRandomK8sName("source") -// sinkName := feature.MakeRandomK8sName("sink") -// deadLetterSinkName := feature.MakeRandomK8sName("dls") -// triggerName := feature.MakeRandomK8sName("trigger") -// brokerName := feature.MakeRandomK8sName("broker") -// -// ev := cetest.FullEvent() -// -// f.Setup("install one partition configuration", single_partition_config.Install) -// f.Setup("install broker", broker.Install( -// brokerName, -// broker.WithBrokerClass(kafka.BrokerClass), -// broker.WithConfig(single_partition_config.ConfigMapName), -// )) -// f.Setup("broker is ready", broker.IsReady(brokerName)) -// f.Setup("broker is addressable", broker.IsAddressable(brokerName)) -// -// f.Setup("install sink", eventshub.Install( -// sinkName, -// eventshub.StartReceiver, -// eventshub.DropEventsResponseCode(422), -// // todo add small data -// )) -// f.Setup("install dead letter sink", eventshub.Install( -// deadLetterSinkName, -// eventshub.StartReceiver, -// )) -// f.Setup("install trigger", trigger.Install( -// triggerName, -// brokerName, -// trigger.WithSubscriber(svc.AsKReference(sinkName), ""), -// trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""), -// )) -// f.Setup("trigger is ready", trigger.IsReady(triggerName)) -// -// f.Setup("install source", eventshub.Install( -// sourceName, -// eventshub.StartSenderToResource(broker.GVR(), brokerName), -// eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), -// eventshub.AddSequence, -// eventshub.SendMultipleEvents(1, 0), -// )) -// -// address, err := svc.Address(context.Background(), deadLetterSinkName) -// f.Assert("deadLetterSinkName is addressable", func(ctx context.Context, t feature.T) { -// if err != nil || address == nil { -// t.Fail() -// } -// }) -// -// f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( -// deadLetterSinkName, -// func(ctx context.Context) cetest.EventMatcher { -// dlsAddress, err := svc.Address(ctx, deadLetterSinkName) -// if err != nil || dlsAddress == nil { -// fmt.Println("dead letter sink is not addressable", err) -// } -// fmt.Println("dead letter sink address: ", dlsAddress.Path) -// return cetest.HasExtension("knativeerrordest", dlsAddress.Path) -// }, -// func(ctx context.Context) cetest.EventMatcher { -// return cetest.HasExtension("knativeerrorcode", "422") -// }, -// func(ctx context.Context) cetest.EventMatcher { -// return cetest.HasExtension("knativeerrordata", "") // todo verify data -// }, -// )) -// -// return f -//} +func SubscriberReturnedErrorLargeData() *feature.Feature { + f := feature.NewFeature() + + sourceName := feature.MakeRandomK8sName("source") + sinkName := feature.MakeRandomK8sName("sink") + deadLetterSinkName := feature.MakeRandomK8sName("dls") + triggerName := feature.MakeRandomK8sName("trigger") + brokerName := feature.MakeRandomK8sName("broker") + + ev := cetest.FullEvent() + + f.Setup("install one partition configuration", single_partition_config.Install) + f.Setup("install broker", broker.Install( + brokerName, + broker.WithBrokerClass(kafka.BrokerClass), + broker.WithConfig(single_partition_config.ConfigMapName), + )) + f.Setup("broker is ready", broker.IsReady(brokerName)) + f.Setup("broker is addressable", broker.IsAddressable(brokerName)) + + errorDataTruncated := strings.Repeat("X", 1024) + errorData := errorDataTruncated + "YYYYYYY" + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + eventshub.DropEventsResponseCode(422), + eventshub.DropEventsResponseBody(errorData), + )) + f.Setup("install dead letter sink", eventshub.Install( + deadLetterSinkName, + eventshub.StartReceiver, + )) + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(svc.AsKReference(sinkName), ""), + trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""), + )) + f.Setup("trigger is ready", trigger.IsReady(triggerName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), + eventshub.AddSequence, + eventshub.SendMultipleEvents(1, 0), + )) + + address, err := svc.Address(context.Background(), deadLetterSinkName) + f.Assert("deadLetterSinkName is addressable", func(ctx context.Context, t feature.T) { + if err != nil || address == nil { + t.Fail() + } + }) + + f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( + deadLetterSinkName, + func(ctx context.Context) cetest.EventMatcher { + dlsAddress, _ := svc.Address(ctx, deadLetterSinkName) + return cetest.HasExtension("knativeerrordest", dlsAddress.String()) + }, + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("knativeerrorcode", "422") + }, + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("knativeerrordata", errorDataTruncated) + }, + )) + + return f +} func assertEnhancedWithKnativeErrorExtensions(sinkName string, matcherfns ...func(ctx context.Context) cetest.EventMatcher) feature.StepFn { return func(ctx context.Context, t feature.T) { @@ -255,14 +310,7 @@ func assertEnhancedWithKnativeErrorExtensions(sinkName string, matcherfns ...fun for i, fn := range matcherfns { matchers[i] = fn(ctx) } - store := eventshub.StoreFromContext(ctx, sinkName) - for _, ei := range store.Collected() { - fmt.Println("actual event extensions:") - for k, v := range ei.Event.Extensions() { - fmt.Println(k, ": ", v) - } - } - _ = store.AssertExact( + _ = eventshub.StoreFromContext(ctx, sinkName).AssertExact( t, 1, MatchKind(EventReceived), From 8b330b3094e498ec87a2e84082a5a30689fffa0c Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Thu, 7 Jul 2022 14:21:31 +0300 Subject: [PATCH 07/15] fix e2es --- test/e2e_new/dls_extensions_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index e1f369cfa7..0ccafa1b47 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -1,3 +1,6 @@ +//go:build e2e +// +build e2e + /* * Copyright 2021 The Knative Authors * From 9d159c1617eb38d2affcbdd018306c811a1f2abb Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Thu, 7 Jul 2022 17:46:33 +0300 Subject: [PATCH 08/15] fix e2es --- test/e2e_new/dls_extensions_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index 0ccafa1b47..1893cb636d 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -218,7 +218,7 @@ func SubscriberReturnedErrorSmallData() *feature.Feature { } }) - f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( + f.Assert("knativeerrordest, knativeerrorcode, knativeerrordata added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, func(ctx context.Context) cetest.EventMatcher { dlsAddress, _ := svc.Address(ctx, deadLetterSinkName) @@ -290,7 +290,7 @@ func SubscriberReturnedErrorLargeData() *feature.Feature { } }) - f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( + f.Assert("knativeerrordest, knativeerrorcode, truncated knativeerrordata added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, func(ctx context.Context) cetest.EventMatcher { dlsAddress, _ := svc.Address(ctx, deadLetterSinkName) From 108ae5f31983d3d34a6011a38026e93b77b6bf79 Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Thu, 7 Jul 2022 19:00:33 +0300 Subject: [PATCH 09/15] fix e2es --- test/e2e_new/dls_extensions_test.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index 1893cb636d..2e5869174d 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -211,13 +211,6 @@ func SubscriberReturnedErrorSmallData() *feature.Feature { eventshub.SendMultipleEvents(1, 0), )) - address, err := svc.Address(context.Background(), deadLetterSinkName) - f.Assert("deadLetterSinkName is addressable", func(ctx context.Context, t feature.T) { - if err != nil || address == nil { - t.Fail() - } - }) - f.Assert("knativeerrordest, knativeerrorcode, knativeerrordata added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, func(ctx context.Context) cetest.EventMatcher { @@ -283,13 +276,6 @@ func SubscriberReturnedErrorLargeData() *feature.Feature { eventshub.SendMultipleEvents(1, 0), )) - address, err := svc.Address(context.Background(), deadLetterSinkName) - f.Assert("deadLetterSinkName is addressable", func(ctx context.Context, t feature.T) { - if err != nil || address == nil { - t.Fail() - } - }) - f.Assert("knativeerrordest, knativeerrorcode, truncated knativeerrordata added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, func(ctx context.Context) cetest.EventMatcher { From e8cac0dcbead7208a0c2545c5b81923a62d1e327 Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Fri, 8 Jul 2022 03:27:07 +0300 Subject: [PATCH 10/15] fix e2es --- test/e2e_new/dls_extensions_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index 2e5869174d..c297a67239 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -87,7 +87,7 @@ func SubscriberUnreachable() *feature.Feature { )) f.Setup("trigger is ready", trigger.IsReady(triggerName)) - f.Setup("install source", eventshub.Install( + f.Requirement("install source", eventshub.Install( sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), @@ -142,7 +142,7 @@ func SubscriberReturnedErrorNoData() *feature.Feature { )) f.Setup("trigger is ready", trigger.IsReady(triggerName)) - f.Setup("install source", eventshub.Install( + f.Requirement("install source", eventshub.Install( sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), @@ -203,7 +203,7 @@ func SubscriberReturnedErrorSmallData() *feature.Feature { )) f.Setup("trigger is ready", trigger.IsReady(triggerName)) - f.Setup("install source", eventshub.Install( + f.Requirement("install source", eventshub.Install( sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), @@ -268,7 +268,7 @@ func SubscriberReturnedErrorLargeData() *feature.Feature { )) f.Setup("trigger is ready", trigger.IsReady(triggerName)) - f.Setup("install source", eventshub.Install( + f.Requirement("install source", eventshub.Install( sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), From ff520106bf400fe25ae24575792480f47632a1e5 Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Fri, 8 Jul 2022 10:22:12 +0300 Subject: [PATCH 11/15] fix eventshub.SendMultipleEvents call --- test/e2e_new/dls_extensions_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index c297a67239..1fceb74da2 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -92,7 +92,7 @@ func SubscriberUnreachable() *feature.Feature { eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), eventshub.AddSequence, - eventshub.SendMultipleEvents(1, 0), + eventshub.SendMultipleEvents(1, 100*time.Millisecond), )) f.Assert("knativeerrordest added", assertEnhancedWithKnativeErrorExtensions( @@ -147,7 +147,7 @@ func SubscriberReturnedErrorNoData() *feature.Feature { eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), eventshub.AddSequence, - eventshub.SendMultipleEvents(1, 0), + eventshub.SendMultipleEvents(1, 100*time.Millisecond), )) f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( @@ -208,7 +208,7 @@ func SubscriberReturnedErrorSmallData() *feature.Feature { eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), eventshub.AddSequence, - eventshub.SendMultipleEvents(1, 0), + eventshub.SendMultipleEvents(1, 100*time.Millisecond), )) f.Assert("knativeerrordest, knativeerrorcode, knativeerrordata added", assertEnhancedWithKnativeErrorExtensions( @@ -273,7 +273,7 @@ func SubscriberReturnedErrorLargeData() *feature.Feature { eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), eventshub.AddSequence, - eventshub.SendMultipleEvents(1, 0), + eventshub.SendMultipleEvents(1, 100*time.Millisecond), )) f.Assert("knativeerrordest, knativeerrorcode, truncated knativeerrordata added", assertEnhancedWithKnativeErrorExtensions( From f016654657881816066b1b224c09fe9e33deb588 Mon Sep 17 00:00:00 2001 From: Egor Lepa Date: Fri, 8 Jul 2022 13:00:19 +0300 Subject: [PATCH 12/15] fix time import --- test/e2e_new/dls_extensions_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index 1fceb74da2..9f3924001e 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -37,6 +37,7 @@ import ( "knative.dev/reconciler-test/resources/svc" "strings" "testing" + "time" ) func TestDeadLetterSinkExtensions(t *testing.T) { From b1006a926e07b8db38637d29a723aed3a4e6e46e Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Fri, 19 Aug 2022 16:06:10 +0200 Subject: [PATCH 13/15] More import fixes Signed-off-by: Matthias Wessendorf --- test/e2e_new/dls_extensions_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index 9f3924001e..94f78609e5 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -21,6 +21,10 @@ package e2e_new import ( "context" + "strings" + "testing" + "time" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" @@ -35,9 +39,6 @@ import ( "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" "knative.dev/reconciler-test/resources/svc" - "strings" - "testing" - "time" ) func TestDeadLetterSinkExtensions(t *testing.T) { From 963b681ad9937c6ed80231818e37c455907a58b3 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Fri, 19 Aug 2022 16:19:40 +0200 Subject: [PATCH 14/15] Populating response from ResponseFailureException Signed-off-by: Matthias Wessendorf --- .../kafka/broker/dispatcher/impl/RecordDispatcherImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 6d7c8c3e3c..3a955e2338 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -263,10 +263,14 @@ private void onSubscriberFailure(final Throwable failure, final ConsumerRecordContext recordContext, final Promise finalProm) { - final var response = getResponse(failure); + var response = getResponse(failure); incrementEventCount(response, recordContext); recordDispatchLatency(response, recordContext); + if (response == null && failure instanceof ResponseFailureException) { + response = ((ResponseFailureException) failure).getResponse(); + } + // enhance event with extension attributes prior to forwarding to the dead letter sink final var transformedRecordContext = errorTransform(recordContext, response); @@ -285,6 +289,7 @@ private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordC final var extensions = new HashMap(); extensions.put(KN_ERROR_DEST_EXT_NAME, destination); extensions.put(KN_ERROR_CODE_EXT_NAME, String.valueOf(response.statusCode())); + var data = response.bodyAsString(); if (data != null) { if (data.length() > KN_ERROR_DATA_MAX_BYTES) { From 01ca092533c8cd37b1bf72e15ce6402e8f0aac0a Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Fri, 19 Aug 2022 16:20:36 +0200 Subject: [PATCH 15/15] Fixing tests Signed-off-by: Matthias Wessendorf --- test/e2e_new/dls_extensions_test.go | 34 ++++++++++++++--------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index 94f78609e5..1dd14bc990 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -122,20 +122,23 @@ func SubscriberReturnedErrorNoData() *feature.Feature { f.Setup("install broker", broker.Install( brokerName, broker.WithBrokerClass(kafka.BrokerClass), - broker.WithConfig(single_partition_config.ConfigMapName), )) + f.Setup("broker is ready", broker.IsReady(brokerName)) f.Setup("broker is addressable", broker.IsAddressable(brokerName)) f.Setup("install sink", eventshub.Install( sinkName, eventshub.StartReceiver, - eventshub.DropEventsResponseCode(422), + eventshub.DropFirstN(1), + eventshub.DropEventsResponseCode(422), // retry error )) + f.Setup("install dead letter sink", eventshub.Install( deadLetterSinkName, eventshub.StartReceiver, )) + f.Setup("install trigger", trigger.Install( triggerName, brokerName, @@ -147,17 +150,16 @@ func SubscriberReturnedErrorNoData() *feature.Feature { f.Requirement("install source", eventshub.Install( sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), - eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), - eventshub.AddSequence, - eventshub.SendMultipleEvents(1, 100*time.Millisecond), + eventshub.InputEvent(ev), )) f.Assert("knativeerrordest & knativeerrorcode added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, func(ctx context.Context) cetest.EventMatcher { - dlsAddress, _ := svc.Address(ctx, deadLetterSinkName) - return cetest.HasExtension("knativeerrordest", dlsAddress.String()) + sinkAddress, _ := svc.Address(ctx, sinkName) + return cetest.HasExtension("knativeerrordest", sinkAddress.String()) }, + func(ctx context.Context) cetest.EventMatcher { return cetest.HasExtension("knativeerrorcode", "422") }, @@ -190,6 +192,7 @@ func SubscriberReturnedErrorSmallData() *feature.Feature { f.Setup("install sink", eventshub.Install( sinkName, eventshub.StartReceiver, + eventshub.DropFirstN(1), eventshub.DropEventsResponseCode(422), eventshub.DropEventsResponseBody(errorData), )) @@ -208,16 +211,14 @@ func SubscriberReturnedErrorSmallData() *feature.Feature { f.Requirement("install source", eventshub.Install( sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), - eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), - eventshub.AddSequence, - eventshub.SendMultipleEvents(1, 100*time.Millisecond), + eventshub.InputEvent(ev), )) f.Assert("knativeerrordest, knativeerrorcode, knativeerrordata added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, func(ctx context.Context) cetest.EventMatcher { - dlsAddress, _ := svc.Address(ctx, deadLetterSinkName) - return cetest.HasExtension("knativeerrordest", dlsAddress.String()) + sinkAddress, _ := svc.Address(ctx, sinkName) + return cetest.HasExtension("knativeerrordest", sinkAddress.String()) }, func(ctx context.Context) cetest.EventMatcher { return cetest.HasExtension("knativeerrorcode", "422") @@ -255,6 +256,7 @@ func SubscriberReturnedErrorLargeData() *feature.Feature { f.Setup("install sink", eventshub.Install( sinkName, eventshub.StartReceiver, + eventshub.DropFirstN(1), eventshub.DropEventsResponseCode(422), eventshub.DropEventsResponseBody(errorData), )) @@ -273,16 +275,14 @@ func SubscriberReturnedErrorLargeData() *feature.Feature { f.Requirement("install source", eventshub.Install( sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), - eventshub.InputEventWithEncoding(ev, cloudevents.EncodingBinary), - eventshub.AddSequence, - eventshub.SendMultipleEvents(1, 100*time.Millisecond), + eventshub.InputEvent(ev), )) f.Assert("knativeerrordest, knativeerrorcode, truncated knativeerrordata added", assertEnhancedWithKnativeErrorExtensions( deadLetterSinkName, func(ctx context.Context) cetest.EventMatcher { - dlsAddress, _ := svc.Address(ctx, deadLetterSinkName) - return cetest.HasExtension("knativeerrordest", dlsAddress.String()) + sinkAddress, _ := svc.Address(ctx, sinkName) + return cetest.HasExtension("knativeerrordest", sinkAddress.String()) }, func(ctx context.Context) cetest.EventMatcher { return cetest.HasExtension("knativeerrorcode", "422")