Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-v0.26] Properly handle events without the data field #39

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.tracing.ConsumerTracer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

/**
Expand All @@ -47,6 +49,8 @@ public class RecordDispatcherImpl implements RecordDispatcher {

private static final Logger logger = LoggerFactory.getLogger(RecordDispatcherImpl.class);

private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

private final Filter filter;
private final Function<KafkaConsumerRecord<String, CloudEvent>, Future<Void>> subscriberSender;
private final Function<KafkaConsumerRecord<String, CloudEvent>, Future<Void>> dlsSender;
Expand Down Expand Up @@ -92,8 +96,6 @@ public RecordDispatcherImpl(
*/
@Override
public Future<Void> dispatch(KafkaConsumerRecord<String, CloudEvent> record) {
Promise<Void> promise = Promise.promise();

/*
That's pretty much what happens here:

Expand All @@ -116,9 +118,23 @@ public Future<Void> dispatch(KafkaConsumerRecord<String, CloudEvent> record) {
+->end<--+
*/

onRecordReceived(record, promise);

return promise.future();
try {
Promise<Void> promise = Promise.promise();
onRecordReceived(maybeDeserializeValueFromHeaders(record), promise);
return promise.future();
} catch (final Exception ex) {
// This is a fatal exception that shouldn't happen in normal cases.
//
// It might happen if folks send bad records to a topic that is
// managed by our system.
//
// So discard record if we can't deal with the record, so that we can
// make progress in the partition.
logError("Exception occurred, discarding the record", record, ex);
recordDispatcherListener.recordReceived(record);
recordDispatcherListener.recordDiscarded(record);
return Future.failedFuture(ex);
}
}

private void onRecordReceived(final KafkaConsumerRecord<String, CloudEvent> record, Promise<Void> finalProm) {
Expand Down Expand Up @@ -192,6 +208,23 @@ private void onDeadLetterSinkFailure(final KafkaConsumerRecord<String, CloudEven
finalProm.complete();
}

private static KafkaConsumerRecord<String, CloudEvent> maybeDeserializeValueFromHeaders(KafkaConsumerRecord<String, CloudEvent> record) {
if (record.value() != null) {
return record;
}
// A valid CloudEvent in the CE binary protocol binding of Kafka
// might be composed by only Headers.
//
// KafkaConsumer doesn't call the deserializer if the value
// is null.
//
// That means that we get a record with a null value and some CE
// headers even though the record is a valid CloudEvent.
logDebug("Value is null", record);
final var value = cloudEventDeserializer.deserialize(record.record().topic(), record.record().headers(), null);
return new KafkaConsumerRecordImpl<>(KafkaConsumerRecordUtils.copyRecordAssigningValue(record.record(), value));
}

private static Function<KafkaConsumerRecord<String, CloudEvent>, Future<Void>> composeSenderAndSinkHandler(
CloudEventSender sender, ResponseHandler sinkHandler, String senderType) {
return rec -> sender.send(rec.value())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import io.cloudevents.CloudEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class KafkaConsumerRecordUtils {

private KafkaConsumerRecordUtils() {
}

public static <T> ConsumerRecord<T, CloudEvent> copyRecordAssigningValue(final ConsumerRecord<T, CloudEvent> record,
final CloudEvent value) {
return new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
value,
record.headers(),
record.leaderEpoch()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -117,35 +118,40 @@ public static void setUp(final Vertx vertx, final VertxTestContext context) thro

/*
1: event sent by the source to the Broker
2: event sent by the service in the response
2: event sent by the trigger 1 in the response
3: event sent by the trigger 2 in the response
2
+----------------------+
| |
| +-----+-----+
| 1 | |
| +---------->+ Trigger 1 |
v | | |
v | 3 | |
+------------+ +-------------+ +-------+----+----+ +-----------+
| | 1 | | 2 | |
| HTTPClient +--------->+ Receiver | +--------+ Dispatcher |
| | | | | | |
+------------+ +------+------+ | +--------+---+----+ +-----------+
| | ^ | | |
| | ^ | 3 | |
| v | +---------->+ Trigger 2 |
1 | +--------+--------+ | 2 | |
| | | 1 | +-----------+
+----->+ Kafka +--------+
| | 2 +-----------+
+-----------------+ | |
+-----------------+ 3 | |
| Trigger 3 |
| |
+-----------+




*/
@Test
@Timeout(timeUnit = TimeUnit.MINUTES, value = 1)
public void execute(final Vertx vertx, final VertxTestContext context) {
public void execute(final Vertx vertx, final VertxTestContext context) throws InterruptedException {

final var checkpoints = context.checkpoint(3);
final var checkpoints = context.checkpoint(4);

// event sent by the source to the Broker (see 1 in diagram)
final var expectedRequestEvent = CloudEventBuilder.v1()
Expand All @@ -158,7 +164,7 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
.build();

// event sent in the response by the Callable service (see 2 in diagram)
final var expectedResponseEvent = CloudEventBuilder.v03()
final var expectedResponseEventService2 = CloudEventBuilder.v03()
.withId(UUID.randomUUID().toString())
.withDataSchema(URI.create("/api/data-schema-ce-2"))
.withSubject("subject-ce-2")
Expand All @@ -167,6 +173,20 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
.withType(TYPE_CE_2)
.build();

// event sent in the response by the Callable service 2 (see 3 in diagram)
final var expectedResponseEventService1 = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withDataSchema(URI.create("/api/data-schema-ce-3"))
.withSource(URI.create("/api/rossi"))
.withSubject("subject-ce-3")
.withType(TYPE_CE_1)
.build();

final var service1ExpectedEventsIterator = List.of(
expectedRequestEvent,
expectedResponseEventService1
).iterator();

final var resource = DataPlaneContract.Resource.newBuilder()
.addTopics(TOPIC)
.setIngress(DataPlaneContract.Ingress.newBuilder().setPath(format("/%s/%s", BROKER_NAMESPACE, BROKER_NAME)))
Expand Down Expand Up @@ -207,9 +227,13 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS)
.accept(DataPlaneContract.Contract.newBuilder().addResources(resource).build());

await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertThat(vertx.deploymentIDs())
await()
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(vertx.deploymentIDs())
.hasSize(resource.getEgressesCount() + NUM_RESOURCES + NUM_SYSTEM_VERTICLES));

Thread.sleep(2000); // Give consumers time to start

// start service
vertx.createHttpServer()
.exceptionHandler(context::failNow)
Expand All @@ -221,22 +245,29 @@ public void execute(final Vertx vertx, final VertxTestContext context) {

// service 1 receives event sent by the HTTPClient
if (request.path().equals(PATH_SERVICE_1)) {
final var expectedEvent = service1ExpectedEventsIterator.next();
context.verify(() -> {
assertThat(event).isEqualTo(expectedRequestEvent);
assertThat(event).isEqualTo(expectedEvent);
checkpoints.flag(); // 2
});

// write event to the response, the event will be handled by service 2
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEvent);
if (service1ExpectedEventsIterator.hasNext()) {
// write event to the response, the event will be handled by service 2
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEventService2);
}
}

// service 2 receives event in the response
if (request.path().equals(PATH_SERVICE_2)) {
context.verify(() -> {
assertThat(event).isEqualTo(expectedResponseEvent);
assertThat(event).isEqualTo(expectedResponseEventService2);
checkpoints.flag(); // 3
});

// write event to the response, the event will be handled by service 2
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEventService1);
}

if (request.path().equals(PATH_SERVICE_3)) {
Expand Down