enhancing failed events with knativeerror* extensions before sending to dls
Egor Lepa committed Jul 4, 2022
1 parent decd959 commit ff6e94c
Showing 1 changed file with 48 additions and 8 deletions.
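
For context, this change enriches an event that a subscriber rejected with three CloudEvents extension attributes: knativeerrordest (the destination URL), knativeerrorcode (the HTTP status code), and knativeerrordata (the response body, truncated to 1024 characters), before handing the event to the dead-letter sink (dls). Below is a minimal standalone sketch of that enrichment built only on the cloudevents-core builder API; the names KnativeErrorEnrichmentSketch, enrich, failedEvent, destination, and body are illustrative, not part of the commit.

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

final class KnativeErrorEnrichmentSketch {

  // Copy the failed event and attach the knativeerror* extensions.
  static CloudEvent enrich(CloudEvent failedEvent, String destination, int statusCode, String body) {
    String data = body;
    if (data != null && data.length() > 1024) {
      data = data.substring(0, 1024); // cap the captured response body, as the commit does
    }
    CloudEventBuilder builder = CloudEventBuilder.v1(failedEvent)
      .withExtension("knativeerrordest", destination)
      .withExtension("knativeerrorcode", statusCode);
    if (data != null) {
      builder = builder.withExtension("knativeerrordata", data);
    }
    return builder.build();
  }
}

Note that the sketch skips knativeerrordata when the body is null, since the builder's withExtension is documented to take a non-null value; the diff below guards against a null body only at the truncation check.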
@@ -17,15 +17,12 @@

 import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
 import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
-import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender;
-import dev.knative.eventing.kafka.broker.dispatcher.Filter;
-import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
-import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
-import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler;
+import dev.knative.eventing.kafka.broker.dispatcher.*;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
 import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Tag;
 import io.micrometer.core.instrument.Tags;
@@ -37,6 +34,7 @@
 import io.vertx.kafka.client.common.tracing.ConsumerTracer;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
 import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -257,9 +255,51 @@ private void onSubscriberFailure(final Throwable failure,
     incrementEventCount(response, recordContext);
     recordDispatchLatency(response, recordContext);

-    dlsSender.apply(recordContext.getRecord())
-      .onSuccess(v -> onDeadLetterSinkSuccess(recordContext, finalProm))
-      .onFailure(ex -> onDeadLetterSinkFailure(recordContext, ex, finalProm));
+    ConsumerRecordContext transformedRecordContext = errorTransform(recordContext, response);
+
+    dlsSender.apply(transformedRecordContext.getRecord())
+      .onSuccess(v -> onDeadLetterSinkSuccess(transformedRecordContext, finalProm))
+      .onFailure(ex -> onDeadLetterSinkFailure(transformedRecordContext, ex, finalProm));
   }
+
+  private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordContext, @Nullable final HttpResponse<?> response) {
+    if (response == null) {
+      return recordContext;
+    }
+
+    String url = resourceContext.getEgress().getDestination();
+    int code = response.statusCode();
+    String data = response.bodyAsString();
+    if (data != null && data.length() > 1024) { // bodyAsString() can return null; cap the captured body at 1024 chars
+      data = data.substring(0, 1024);
+    }
+
+    CloudEvent cloudEvent = recordContext.getRecord().value();
+
+    CloudEvent transformedCloudEvent = CloudEventBuilder.v1(cloudEvent)
+      .withExtension("knativeerrordest", url) // destination that rejected the event
+      .withExtension("knativeerrorcode", code) // HTTP status code of the failed delivery
+      .withExtension("knativeerrordata", data) // truncated response body
+      .build();
+
+    // Rebuild the consumer record so that its value is the enriched event.
+    ConsumerRecord<Object, CloudEvent> cr = new ConsumerRecord<>(
+      recordContext.getRecord().record().topic(),
+      recordContext.getRecord().record().partition(),
+      recordContext.getRecord().record().offset(),
+      recordContext.getRecord().record().timestamp(),
+      recordContext.getRecord().record().timestampType(),
+      null, // checksum (deprecated, unused)
+      recordContext.getRecord().record().serializedKeySize(),
+      recordContext.getRecord().record().serializedValueSize(),
+      recordContext.getRecord().record().key(),
+      transformedCloudEvent,
+      recordContext.getRecord().record().headers(),
+      recordContext.getRecord().record().leaderEpoch()
+    );
+
+    KafkaConsumerRecordImpl<Object, CloudEvent> kcr = new KafkaConsumerRecordImpl<>(cr);
+
+    return new ConsumerRecordContext(kcr);
+  }

   private void onDeadLetterSinkSuccess(final ConsumerRecordContext recordContext,
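On the consuming side, a dead-letter sink can read these attributes back from the delivered event. A hypothetical sketch (DeadLetterInspector and logFailure are illustrative names, not from this repository; any CloudEvents binding that yields an io.cloudevents.CloudEvent works):

import io.cloudevents.CloudEvent;

final class DeadLetterInspector {

  // Extensions come back as Object; they were set as String/Integer values above.
  static void logFailure(CloudEvent event) {
    Object dest = event.getExtension("knativeerrordest");
    Object code = event.getExtension("knativeerrorcode");
    Object data = event.getExtension("knativeerrordata");
    System.out.printf("event %s failed delivery to %s (HTTP %s): %s%n",
        event.getId(), dest, code, data);
  }
}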
