Skip to content

Commit

Permalink
enhancing failed events with knativeerror* extensions before sending …
Browse files Browse the repository at this point in the history
…to dls
  • Loading branch information
Egor Lepa committed Jul 4, 2022
1 parent ff6e94c commit f6ff42c
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -65,6 +67,11 @@ public class RecordDispatcherImpl implements RecordDispatcher {
// Invalid cloud event records that are discarded by dispatch may not have a record type. So we set Tag as below.
private static final Tag INVALID_EVENT_TYPE_TAG = Tag.of(Metrics.Tags.EVENT_TYPE, "InvalidCloudEvent");

private static final String KN_ERROR_DEST_EXT_NAME = "knativeerrordest";
private static final String KN_ERROR_CODE_EXT_NAME = "knativeerrorcode";
private static final String KN_ERROR_DATA_EXT_NAME = "knativeerrordata";
private static final int KN_ERROR_DATA_MAX_BYTES = 1024;

private final Filter filter;
private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<HttpResponse<?>>> subscriberSender;
private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<HttpResponse<?>>> dlsSender;
Expand Down Expand Up @@ -255,6 +262,7 @@ private void onSubscriberFailure(final Throwable failure,
incrementEventCount(response, recordContext);
recordDispatchLatency(response, recordContext);

// enhance event with extension attributes prior to forwarding to the dead letter sink
ConsumerRecordContext transformedRecordContext = errorTransform(recordContext, response);

dlsSender.apply(transformedRecordContext.getRecord())
Expand All @@ -263,24 +271,33 @@ private void onSubscriberFailure(final Throwable failure,
}

private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordContext, @Nullable final HttpResponse<?> response) {
String destination = resourceContext.getEgress().getDestination();
if (response == null) {
return recordContext;
// if response is null we still want to add destination
return addExtensions(recordContext, Map.of(KN_ERROR_DEST_EXT_NAME, destination));
}

String url = resourceContext.getEgress().getDestination();
int code = response.statusCode();
Map<String, String> extensions = new HashMap<>();
extensions.put(KN_ERROR_DEST_EXT_NAME, destination);
extensions.put(KN_ERROR_CODE_EXT_NAME, String.valueOf(response.statusCode()));
String data = response.bodyAsString();
if (data.length() > 1024) {
data = data.substring(0, 1024);
if (data != null) {
if (data.length() > KN_ERROR_DATA_MAX_BYTES) {
data = data.substring(0, KN_ERROR_DATA_MAX_BYTES);
}
extensions.put(KN_ERROR_DATA_EXT_NAME, data);
}

return addExtensions(recordContext, extensions);
}

// creates a new instance of ConsumerRecordContext with added extension attributes for underlying CloudEvent
private ConsumerRecordContext addExtensions(final ConsumerRecordContext recordContext, final Map<String, String> extensions) {
CloudEvent cloudEvent = recordContext.getRecord().value();

CloudEvent transformedCloudEvent = CloudEventBuilder.v1(cloudEvent)
.withExtension("knativeerrordest", url)
.withExtension("knativeerrorcode", code)
.withExtension("knativeerrordata", data)
.build();
CloudEventBuilder builder = CloudEventBuilder.v1(cloudEvent);
extensions.forEach(builder::withExtension);
CloudEvent transformedCloudEvent = builder.build();

ConsumerRecord<Object, CloudEvent> cr = new ConsumerRecord<>(
recordContext.getRecord().record().topic(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.core.testing.CoreObjects;
import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender;
import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSenderMock;
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler;
import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandlerMock;
import dev.knative.eventing.kafka.broker.dispatcher.*;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent;
import io.cloudevents.CloudEvent;
import io.micrometer.core.instrument.search.MeterNotFoundException;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpVersion;
import io.vertx.ext.web.client.impl.HttpResponseImpl;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
Expand All @@ -41,27 +39,23 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@ExtendWith(VertxExtension.class)
public class RecordDispatcherTest {

private static final ResourceContext resourceContext = new ResourceContext(
DataPlaneContract.Resource.newBuilder().build(),
DataPlaneContract.Egress.newBuilder().build()
DataPlaneContract.Egress.newBuilder().setDestination("testdest").build()
);

static {
Expand Down Expand Up @@ -184,7 +178,7 @@ final var record = record();
assertTrue(subscriberSenderSendCalled.get());
assertTrue(dlsSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).successfullySentToDeadLetterSink(record);
verify(receiver, times(1)).successfullySentToDeadLetterSink(any());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).recordDiscarded(any());
Expand Down Expand Up @@ -228,7 +222,7 @@ final var record = record();
assertTrue(subscriberSenderSendCalled.get());
assertTrue(dlsSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).failedToSendToDeadLetterSink(eq(record), any());
verify(receiver, times(1)).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).successfullySentToDeadLetterSink(any());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).recordDiscarded(any());
Expand All @@ -239,6 +233,140 @@ final var record = record();
assertNoDiscardedEventCount();
}

@Test
public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDls() {

  // Track which of the two senders were actually invoked during dispatch.
  final var subscriberInvoked = new AtomicBoolean(false);
  final var dlsInvoked = new AtomicBoolean(false);
  final RecordDispatcherListener listener = offsetManagerMock();

  final int failureCode = 422;
  final String failureBody = "{ \"message\": \"bad bad things happened\" }";

  // Subscriber sender always fails with an HTTP response; the DLS sender succeeds.
  final var subscriberSender = new CloudEventSenderMock(
    record -> {
      subscriberInvoked.set(true);
      return Future.failedFuture(new ResponseFailureException(makeHttpResponse(failureCode, failureBody), ""));
    }
  );
  final var deadLetterSender = new CloudEventSenderMock(
    record -> {
      dlsInvoked.set(true);
      return Future.succeededFuture();
    }
  );

  final var dispatcher = new RecordDispatcherImpl(
    resourceContext,
    value -> true,
    subscriberSender,
    deadLetterSender,
    new ResponseHandlerMock(),
    listener,
    null,
    registry
  );

  final var sourceRecord = record();
  dispatcher.dispatch(sourceRecord);

  // Capture the (possibly transformed) record handed to the dead letter sink.
  final ArgumentCaptor<KafkaConsumerRecord<Object, CloudEvent>> dlsRecordCaptor =
    ArgumentCaptor.forClass(KafkaConsumerRecord.class);

  assertTrue(subscriberInvoked.get());
  assertTrue(dlsInvoked.get());
  verify(listener, times(1)).recordReceived(sourceRecord);
  verify(listener, times(1)).successfullySentToDeadLetterSink(dlsRecordCaptor.capture());
  verify(listener, never()).successfullySentToSubscriber(any());
  verify(listener, never()).failedToSendToDeadLetterSink(any(), any());
  verify(listener, never()).recordDiscarded(any());

  // The forwarded record must match the input except for the knativeerror*
  // extension attributes layered onto the CloudEvent.
  final KafkaConsumerRecord<Object, CloudEvent> forwarded = dlsRecordCaptor.getValue();
  assertEquals(sourceRecord.topic(), forwarded.topic());
  assertEquals(sourceRecord.partition(), forwarded.partition());
  assertEquals(sourceRecord.offset(), forwarded.offset());
  assertEquals(sourceRecord.key(), forwarded.key());
  assertEquals(sourceRecord.value().getId(), forwarded.value().getId());
  assertEquals(sourceRecord.value().getAttributeNames(), forwarded.value().getAttributeNames());
  assertEquals(sourceRecord.value().getData(), forwarded.value().getData());
  assertEquals("testdest", forwarded.value().getExtension("knativeerrordest"));
  assertEquals(String.valueOf(failureCode), forwarded.value().getExtension("knativeerrorcode"));
  assertEquals(failureBody, forwarded.value().getExtension("knativeerrordata"));

  assertEventDispatchLatency();
  assertEventProcessingLatency();
  assertEventCount();
  assertNoDiscardedEventCount();
}

@Test
public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDlsBodyTooLarge() {

  // Track which of the two senders were actually invoked during dispatch.
  final var subscriberInvoked = new AtomicBoolean(false);
  final var dlsInvoked = new AtomicBoolean(false);
  final RecordDispatcherListener listener = offsetManagerMock();

  final int failureCode = 422;
  // Exactly 1024 chars survive truncation; the trailing "QWERTY" must be dropped.
  final String expectedTruncatedBody = "A".repeat(1024);
  final String oversizedBody = expectedTruncatedBody + "QWERTY";

  // Subscriber sender always fails with an oversized body; the DLS sender succeeds.
  final var subscriberSender = new CloudEventSenderMock(
    record -> {
      subscriberInvoked.set(true);
      return Future.failedFuture(new ResponseFailureException(makeHttpResponse(failureCode, oversizedBody), ""));
    }
  );
  final var deadLetterSender = new CloudEventSenderMock(
    record -> {
      dlsInvoked.set(true);
      return Future.succeededFuture();
    }
  );

  final var dispatcher = new RecordDispatcherImpl(
    resourceContext,
    value -> true,
    subscriberSender,
    deadLetterSender,
    new ResponseHandlerMock(),
    listener,
    null,
    registry
  );

  final var sourceRecord = record();
  dispatcher.dispatch(sourceRecord);

  // Capture the (possibly transformed) record handed to the dead letter sink.
  final ArgumentCaptor<KafkaConsumerRecord<Object, CloudEvent>> dlsRecordCaptor =
    ArgumentCaptor.forClass(KafkaConsumerRecord.class);

  assertTrue(subscriberInvoked.get());
  assertTrue(dlsInvoked.get());
  verify(listener, times(1)).recordReceived(sourceRecord);
  verify(listener, times(1)).successfullySentToDeadLetterSink(dlsRecordCaptor.capture());
  verify(listener, never()).successfullySentToSubscriber(any());
  verify(listener, never()).failedToSendToDeadLetterSink(any(), any());
  verify(listener, never()).recordDiscarded(any());

  // The forwarded record must match the input except for the knativeerror*
  // extensions; knativeerrordata is truncated to the first 1024 chars.
  final KafkaConsumerRecord<Object, CloudEvent> forwarded = dlsRecordCaptor.getValue();
  assertEquals(sourceRecord.topic(), forwarded.topic());
  assertEquals(sourceRecord.partition(), forwarded.partition());
  assertEquals(sourceRecord.offset(), forwarded.offset());
  assertEquals(sourceRecord.key(), forwarded.key());
  assertEquals(sourceRecord.value().getId(), forwarded.value().getId());
  assertEquals(sourceRecord.value().getAttributeNames(), forwarded.value().getAttributeNames());
  assertEquals(sourceRecord.value().getData(), forwarded.value().getData());
  assertEquals("testdest", forwarded.value().getExtension("knativeerrordest"));
  assertEquals(String.valueOf(failureCode), forwarded.value().getExtension("knativeerrorcode"));
  assertEquals(expectedTruncatedBody, forwarded.value().getExtension("knativeerrordata"));

  assertEventDispatchLatency();
  assertEventProcessingLatency();
  assertEventCount();
  assertNoDiscardedEventCount();
}

/**
 * Builds a minimal Vert.x HTTP/2 response carrying the given status code and
 * UTF-8 body, used to simulate subscriber failures in the tests above.
 */
private HttpResponseImpl<Buffer> makeHttpResponse(int statusCode, String body) {
  final var emptyHeaders = MultiMap.caseInsensitiveMultiMap();
  final var emptyTrailers = MultiMap.caseInsensitiveMultiMap();
  return new HttpResponseImpl<>(
    HttpVersion.HTTP_2,
    statusCode,
    "",
    emptyHeaders,
    emptyTrailers,
    Collections.emptyList(),
    Buffer.buffer(body, "UTF-8"),
    Collections.emptyList());
}

@Test
public void shouldCallFailedToSendToDeadLetterSinkIfValueMatchesAndSubscriberSenderFailsAndNoDeadLetterSinkSender() {
final var subscriberSenderSendCalled = new AtomicBoolean(false);
Expand All @@ -265,7 +393,7 @@ final var record = record();

assertTrue(subscriberSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).failedToSendToDeadLetterSink(eq(record), any());
verify(receiver, times(1)).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).successfullySentToDeadLetterSink(any());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).recordDiscarded(any());
Expand Down

0 comments on commit f6ff42c

Please sign in to comment.