Knative error extensions #2374

Merged: 15 commits, Aug 23, 2022
RecordDispatcherImpl.java

@@ -26,6 +26,7 @@
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
@@ -37,11 +38,14 @@
import io.vertx.kafka.client.common.tracing.ConsumerTracer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -68,6 +72,11 @@ public class RecordDispatcherImpl implements RecordDispatcher {
// Invalid cloud event records that are discarded by dispatch may not have a record type. So we set Tag as below.
private static final Tag INVALID_EVENT_TYPE_TAG = Tag.of(Metrics.Tags.EVENT_TYPE, "InvalidCloudEvent");

private static final String KN_ERROR_DEST_EXT_NAME = "knativeerrordest";
private static final String KN_ERROR_CODE_EXT_NAME = "knativeerrorcode";
private static final String KN_ERROR_DATA_EXT_NAME = "knativeerrordata";
private static final int KN_ERROR_DATA_MAX_BYTES = 1024;

private final Filter filter;
private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<HttpResponse<?>>> subscriberSender;
private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<HttpResponse<?>>> dlsSender;
@@ -254,13 +263,70 @@ private void onSubscriberFailure(final Throwable failure,
final ConsumerRecordContext recordContext,
final Promise<Void> finalProm) {

final var response = getResponse(failure);
var response = getResponse(failure);
incrementEventCount(response, recordContext);
recordDispatchLatency(response, recordContext);

dlsSender.apply(recordContext.getRecord())
.onSuccess(v -> onDeadLetterSinkSuccess(recordContext, finalProm))
.onFailure(ex -> onDeadLetterSinkFailure(recordContext, ex, finalProm));
if (response == null && failure instanceof ResponseFailureException) {
response = ((ResponseFailureException) failure).getResponse();
}

// enhance event with extension attributes prior to forwarding to the dead letter sink
final var transformedRecordContext = errorTransform(recordContext, response);

dlsSender.apply(transformedRecordContext.getRecord())
.onSuccess(v -> onDeadLetterSinkSuccess(transformedRecordContext, finalProm))
.onFailure(ex -> onDeadLetterSinkFailure(transformedRecordContext, ex, finalProm));
}

private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordContext, @Nullable final HttpResponse<?> response) {
final var destination = resourceContext.getEgress().getDestination();
if (response == null) {
Contributor:

When a sink returns something like this:

	log.Print("Got request, return a bad one")
	w.WriteHeader(http.StatusUnprocessableEntity)
	w.Write([]byte("Bad error"))

The response here is actually null.

Contributor:

I'm seeing something like this:

{"@timestamp":"2022-08-17T15:55:01.886Z","@version":"1","message":"Failed to handle subscriber response topic=knative-broker-default-my-demo-kafka-broker partition=7 offset=0","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl","thread_name":"vert.x-worker-thread-16","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalStateException: Unable to decode response: unknown encoding and non empty response\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.BaseResponseHandler.handle(BaseResponseHandler.java:67)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl.lambda$composeSenderAndSinkHandler$8(RecordDispatcherImpl.java:369)\n\tat io.vertx.core.impl.future.Composition.onSuccess(Composition.java:38)\n\tat io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:262)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)\n\tat io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)\n\tat io.vertx.core.impl.future.SucceededFuture.addListener(SucceededFuture.java:88)\n\tat io.vertx.core.impl.future.Composition.onSuccess(Composition.java:43)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)\n\tat io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.lambda$send$5(WebClientCloudEventSender.java:175)\n\tat io.vertx.core.impl.future.FutureImpl$1.onSuccess(FutureImpl.java:91)\n\tat io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:262)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)\n\tat io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)\n\tat io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)\n\tat io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)\n\tat io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)\n\tat io.vertx.ext.web.client.impl.HttpContext.handleDispatchResponse(HttpContext.java:400)\n\tat io.vertx.ext.web.client.impl.HttpContext.execute(HttpContext.java:387)\n\tat io.vertx.ext.web.client.impl.HttpContext.next(HttpContext.java:365)\n\tat io.vertx.ext.web.client.impl.HttpContext.fire(HttpContext.java:332)\n\tat io.vertx.ext.web.client.impl.HttpContext.dispatchResponse(HttpContext.java:294)\n\tat io.vertx.ext.web.client.impl.HttpContext.lambda$null$8(HttpContext.java:550)\n\tat io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)\n\tat io.vertx.core.impl.WorkerContext.lambda$run$1(WorkerContext.java:83)\n\tat io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: io.cloudevents.rw.CloudEventRWException: Could not parse. Unknown encoding. 
Invalid content type or spec version\n\tat io.cloudevents.rw.CloudEventRWException.newUnknownEncodingException(CloudEventRWException.java:201)\n\tat io.cloudevents.core.message.impl.MessageUtils.parseStructuredOrBinaryMessage(MessageUtils.java:80)\n\tat io.cloudevents.http.vertx.VertxMessageFactory.createReader(VertxMessageFactory.java:46)\n\tat io.cloudevents.http.vertx.VertxMessageFactory.createReader(VertxMessageFactory.java:89)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.BaseResponseHandler.handle(BaseResponseHandler.java:51)\n\t... 34 common frames omitted\n","topic":"knative-broker-default-my-demo-kafka-broker","partition":7,"offset":0}

Member:

To get the response from a failure we can use this logic:

if (failure instanceof ResponseFailureException) {
  response = ((ResponseFailureException) failure).getResponse();
}

Contributor:

OK, I will go that route. Thanks!

Contributor:

It's actually an IllegalStateException. The log statement above contains this snippet:

java.lang.IllegalStateException: Unable to decode response: unknown encoding and non empty response\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.BaseResponseHandler.handl......

Contributor:

After we have the fix #2521 in (and I added the cast to ResponseFailureException), I am getting:

☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: demo
  source: /my/curl/command
  id: 4711
  datacontenttype: application/json
Extensions,
  knativeerrorcode: 422
  knativeerrordata: Bad error
  knativeerrordest: http://error-sender.default.svc.cluster.local
  smartevent: super-duper-event-extension
Data,
  {
    "message": "Hallo World"
  }


on the dead letter sink.

/cc @pierDipi
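For readers wiring up a dead letter sink consumer, here is a minimal sketch of how these extensions could be read back off a dead-lettered event (illustrative helper, assuming only the CloudEvents Java SDK; not part of this PR):

import io.cloudevents.CloudEvent;

// Illustrative: summarize the knativeerror* extensions on a dead-lettered event.
// getExtension returns null when an attribute is absent, so each one is checked.
final class KnErrorSummary {
    static String describe(final CloudEvent event) {
        final Object dest = event.getExtension("knativeerrordest"); // original delivery target
        final Object code = event.getExtension("knativeerrorcode"); // HTTP status code, as a string
        final Object data = event.getExtension("knativeerrordata"); // response body, capped at 1024
        return "delivery to " + dest + " failed"
            + (code != null ? " with status " + code : "")
            + (data != null ? ", body: " + data : "");
    }
}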

Contributor:

@egorlepa I will PR a diff to your PR, then we can get this landed!

// if response is null we still want to add destination
return addExtensions(recordContext, Map.of(KN_ERROR_DEST_EXT_NAME, destination));
}

final var extensions = new HashMap<String, String>();
extensions.put(KN_ERROR_DEST_EXT_NAME, destination);
extensions.put(KN_ERROR_CODE_EXT_NAME, String.valueOf(response.statusCode()));

var data = response.bodyAsString();
if (data != null) {
if (data.length() > KN_ERROR_DATA_MAX_BYTES) {
data = data.substring(0, KN_ERROR_DATA_MAX_BYTES);
}
extensions.put(KN_ERROR_DATA_EXT_NAME, data);
}

return addExtensions(recordContext, extensions);
}
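One caveat on the truncation above: String.length() and substring() count UTF-16 code units rather than bytes, so a non-ASCII body can still exceed 1024 bytes after truncation. A byte-accurate variant (hypothetical helper, not part of this PR) might look like:

import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical byte-accurate truncation: encode to UTF-8, cut at the byte
// limit, and decode back, dropping any partially-cut trailing character.
private static String truncateUtf8(final String s, final int maxBytes) {
    final byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    if (bytes.length <= maxBytes) {
        return s;
    }
    final var decoder = StandardCharsets.UTF_8.newDecoder()
        .onMalformedInput(CodingErrorAction.IGNORE);
    try {
        return decoder.decode(ByteBuffer.wrap(bytes, 0, maxBytes)).toString();
    } catch (CharacterCodingException e) {
        return s.substring(0, Math.min(s.length(), maxBytes)); // conservative fallback
    }
}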

// creates a new instance of ConsumerRecordContext with added extension attributes for underlying CloudEvent
private ConsumerRecordContext addExtensions(final ConsumerRecordContext recordContext, final Map<String, String> extensions) {
final var cloudEvent = recordContext.getRecord().value();

final var builder = CloudEventBuilder.v1(cloudEvent);
extensions.forEach(builder::withExtension);
final var transformedCloudEvent = builder.build();

final var cr = new ConsumerRecord<>(
recordContext.getRecord().record().topic(),
recordContext.getRecord().record().partition(),
recordContext.getRecord().record().offset(),
recordContext.getRecord().record().timestamp(),
recordContext.getRecord().record().timestampType(),
null,
recordContext.getRecord().record().serializedKeySize(),
recordContext.getRecord().record().serializedValueSize(),
recordContext.getRecord().record().key(),
transformedCloudEvent,
recordContext.getRecord().record().headers(),
recordContext.getRecord().record().leaderEpoch()
);

final var kcr = new KafkaConsumerRecordImpl<>(cr);

return new ConsumerRecordContext(kcr);
}
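The record is rebuilt wholesale above because both ConsumerRecord and CloudEvent are immutable. CloudEventBuilder.v1(event) copies the event's attributes, data, and existing extensions, so only the new extensions need to be set. A standalone sketch of that copy-and-extend pattern (illustrative values, not part of this PR):

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;

public class EnrichExample {
    public static void main(String[] args) {
        // Build a source event, then derive an enriched copy with error extensions.
        final CloudEvent original = CloudEventBuilder.v1()
            .withId("4711")
            .withType("demo")
            .withSource(URI.create("/my/curl/command"))
            .build();

        final CloudEvent enriched = CloudEventBuilder.v1(original) // copies attributes, data, extensions
            .withExtension("knativeerrordest", "http://example.invalid/sink") // illustrative value
            .withExtension("knativeerrorcode", "422")
            .build();

        System.out.println(enriched.getExtension("knativeerrorcode")); // prints 422
    }
}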

private void onDeadLetterSinkSuccess(final ConsumerRecordContext recordContext,
RecordDispatcherTest.java

@@ -31,6 +31,10 @@
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpVersion;
import io.vertx.ext.web.client.impl.HttpResponseImpl;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -41,27 +45,23 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@ExtendWith(VertxExtension.class)
public class RecordDispatcherTest {

private static final ResourceContext resourceContext = new ResourceContext(
DataPlaneContract.Resource.newBuilder().build(),
DataPlaneContract.Egress.newBuilder().build()
DataPlaneContract.Egress.newBuilder().setDestination("testdest").build()
);

static {
@@ -184,7 +184,7 @@ final var record = record();
assertTrue(subscriberSenderSendCalled.get());
assertTrue(dlsSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).successfullySentToDeadLetterSink(record);
verify(receiver, times(1)).successfullySentToDeadLetterSink(any());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).recordDiscarded(any());
@@ -228,7 +228,7 @@ final var record = record();
assertTrue(subscriberSenderSendCalled.get());
assertTrue(dlsSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).failedToSendToDeadLetterSink(eq(record), any());
verify(receiver, times(1)).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).successfullySentToDeadLetterSink(any());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).recordDiscarded(any());
@@ -239,6 +239,140 @@ final var record = record();
assertNoDiscardedEventCount();
}

@Test
public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDls() {

final var subscriberSenderSendCalled = new AtomicBoolean(false);
final var dlsSenderSendCalled = new AtomicBoolean(false);
final RecordDispatcherListener receiver = offsetManagerMock();

int errorCode = 422;
String errorBody = "{ \"message\": \"bad bad things happened\" }";

final var dispatcherHandler = new RecordDispatcherImpl(
resourceContext,
value -> true,
new CloudEventSenderMock(
record -> {
subscriberSenderSendCalled.set(true);
return Future.failedFuture(new ResponseFailureException(makeHttpResponse(errorCode, errorBody), ""));
}
),
new CloudEventSenderMock(
record -> {
dlsSenderSendCalled.set(true);
return Future.succeededFuture();
}
), new ResponseHandlerMock(),
receiver,
null,
registry
);
final var record = record();
dispatcherHandler.dispatch(record);

ArgumentCaptor<KafkaConsumerRecord<Object, CloudEvent>> captor = ArgumentCaptor.forClass(KafkaConsumerRecord.class);

assertTrue(subscriberSenderSendCalled.get());
assertTrue(dlsSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).successfullySentToDeadLetterSink(captor.capture());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).recordDiscarded(any());

KafkaConsumerRecord<Object, CloudEvent> failedRecord = captor.getValue();
assertEquals(record.topic(), failedRecord.topic());
assertEquals(record.partition(), failedRecord.partition());
assertEquals(record.offset(), failedRecord.offset());
assertEquals(record.key(), failedRecord.key());
assertEquals(record.value().getId(), failedRecord.value().getId());
assertEquals(record.value().getAttributeNames(), failedRecord.value().getAttributeNames());
assertEquals(record.value().getData(), failedRecord.value().getData());
assertEquals("testdest", failedRecord.value().getExtension("knativeerrordest"));
assertEquals(String.valueOf(errorCode), failedRecord.value().getExtension("knativeerrorcode"));
assertEquals(errorBody, failedRecord.value().getExtension("knativeerrordata"));

assertEventDispatchLatency();
assertEventProcessingLatency();
assertEventCount();
assertNoDiscardedEventCount();
}

@Test
public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDlsBodyTooLarge() {

final var subscriberSenderSendCalled = new AtomicBoolean(false);
final var dlsSenderSendCalled = new AtomicBoolean(false);
final RecordDispatcherListener receiver = offsetManagerMock();

int errorCode = 422;
String errorBody = "A".repeat(1024);
String errorBodyTooLarge = errorBody + "QWERTY";

final var dispatcherHandler = new RecordDispatcherImpl(
resourceContext,
value -> true,
new CloudEventSenderMock(
record -> {
subscriberSenderSendCalled.set(true);
return Future.failedFuture(new ResponseFailureException(makeHttpResponse(errorCode, errorBodyTooLarge), ""));
}
),
new CloudEventSenderMock(
record -> {
dlsSenderSendCalled.set(true);
return Future.succeededFuture();
}
), new ResponseHandlerMock(),
receiver,
null,
registry
);
final var record = record();
dispatcherHandler.dispatch(record);

ArgumentCaptor<KafkaConsumerRecord<Object, CloudEvent>> captor = ArgumentCaptor.forClass(KafkaConsumerRecord.class);

assertTrue(subscriberSenderSendCalled.get());
assertTrue(dlsSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).successfullySentToDeadLetterSink(captor.capture());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).recordDiscarded(any());

KafkaConsumerRecord<Object, CloudEvent> failedRecord = captor.getValue();
assertEquals(record.topic(), failedRecord.topic());
assertEquals(record.partition(), failedRecord.partition());
assertEquals(record.offset(), failedRecord.offset());
assertEquals(record.key(), failedRecord.key());
assertEquals(record.value().getId(), failedRecord.value().getId());
assertEquals(record.value().getAttributeNames(), failedRecord.value().getAttributeNames());
assertEquals(record.value().getData(), failedRecord.value().getData());
assertEquals("testdest", failedRecord.value().getExtension("knativeerrordest"));
assertEquals(String.valueOf(errorCode), failedRecord.value().getExtension("knativeerrorcode"));
assertEquals(errorBody, failedRecord.value().getExtension("knativeerrordata"));

assertEventDispatchLatency();
assertEventProcessingLatency();
assertEventCount();
assertNoDiscardedEventCount();
}

private HttpResponseImpl<Buffer> makeHttpResponse(int statusCode, String body) {
return new HttpResponseImpl<Buffer>(
HttpVersion.HTTP_2,
statusCode,
"",
MultiMap.caseInsensitiveMultiMap(),
MultiMap.caseInsensitiveMultiMap(),
Collections.emptyList(),
Buffer.buffer(body, "UTF-8"),
Collections.emptyList()
);
}
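A short note on the helper above: it fabricates a Vert.x HttpResponseImpl directly, so the tests can hand RecordDispatcherImpl a subscriber "response" without a real HTTP round trip. Inside a mocked subscriber sender (as in the two tests above), usage boils down to:

// Simulate a subscriber failing with a 422 and a body; the dispatcher then
// extracts the status and body into the knativeerror* extensions.
return Future.failedFuture(new ResponseFailureException(makeHttpResponse(422, "Bad error"), ""));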

@Test
public void shouldCallFailedToSendToDeadLetterSinkIfValueMatchesAndSubscriberSenderFailsAndNoDeadLetterSinkSender() {
final var subscriberSenderSendCalled = new AtomicBoolean(false);
@@ -265,7 +399,7 @@ final var record = record();

assertTrue(subscriberSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).failedToSendToDeadLetterSink(eq(record), any());
verify(receiver, times(1)).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).successfullySentToDeadLetterSink(any());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).recordDiscarded(any());