Skip to content

Commit

Permalink
Properly handle events without data field
Browse files Browse the repository at this point in the history
A valid CloudEvent in the CE binary protocol binding of Kafka
might be composed by only Headers.

KafkaConsumer doesn't call the deserializer if the value
is null.

That means that we get a record with a null value even though
the record is a valid CloudEvent.

This patch handles events without the data field properly
by creating the CloudEvent object from record headers, if
the above conditions apply.

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
pierDipi committed Nov 10, 2021
1 parent d24bcb7 commit 8267d11
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
import io.cloudevents.CloudEvent;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.tracing.ConsumerTracer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

/**
Expand All @@ -47,6 +49,8 @@ public class RecordDispatcherImpl implements RecordDispatcher {

private static final Logger logger = LoggerFactory.getLogger(RecordDispatcherImpl.class);

private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

private final Filter filter;
private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> subscriberSender;
private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> dlsSender;
Expand Down Expand Up @@ -116,7 +120,7 @@ public Future<Void> dispatch(KafkaConsumerRecord<Object, CloudEvent> record) {
+->end<--+
*/

onRecordReceived(record, promise);
onRecordReceived(buildCloudEventValueIfMissing(record), promise);

return promise.future();
}
Expand Down Expand Up @@ -192,6 +196,23 @@ private void onDeadLetterSinkFailure(final KafkaConsumerRecord<Object, CloudEven
finalProm.complete();
}

private static KafkaConsumerRecord<Object, CloudEvent> buildCloudEventValueIfMissing(KafkaConsumerRecord<Object, CloudEvent> record) {
// A valid CloudEvent in the CE binary protocol binding of Kafka
// might be composed by only Headers.
//
// KafkaConsumer doesn't call the deserializer if the value
// is null.
//
// That means that we get a record with a null value event though
// the record is a valid CloudEvent.
if (record.value() == null) {
logDebug("Value is null", record);
final var value = cloudEventDeserializer.deserialize(record.record().topic(), record.record().headers(), null);
return new KafkaConsumerRecordImpl<>(KafkaConsumerRecordUtils.copyRecordAssigningValue(record.record(), value));
}
return record;
}

private static Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> composeSenderAndSinkHandler(
CloudEventSender sender, ResponseHandler sinkHandler, String senderType) {
return rec -> sender.send(rec.value())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.stream.Stream;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;
import static dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils.copyRecordAssigningValue;
import static io.cloudevents.kafka.PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION;

/**
Expand Down Expand Up @@ -174,20 +175,7 @@ private ConsumerRecord<Object, CloudEvent> validRecord(final ConsumerRecord<Obje
}

// Copy consumer record and set value to a valid CloudEvent.
return new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
value.build(),
record.headers(),
record.leaderEpoch()
);
return copyRecordAssigningValue(record, value.build());
}

private static void setKey(CloudEventBuilder value, final Object key) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import io.cloudevents.CloudEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class KafkaConsumerRecordUtils {

private KafkaConsumerRecordUtils() {
}

public static <T> ConsumerRecord<T, CloudEvent> copyRecordAssigningValue(final ConsumerRecord<T, CloudEvent> record,
final CloudEvent value) {
return new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
value,
record.headers(),
record.leaderEpoch()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -117,35 +118,40 @@ public static void setUp(final Vertx vertx, final VertxTestContext context) thro

/*
1: event sent by the source to the Broker
2: event sent by the service in the response
2: event sent by the trigger 1 in the response
3: event sent by the trigger 2 in the response
2
+----------------------+
| |
| +-----+-----+
| 1 | |
| +---------->+ Trigger 1 |
v | | |
v | 3 | |
+------------+ +-------------+ +-------+----+----+ +-----------+
| | 1 | | 2 | |
| HTTPClient +--------->+ Receiver | +--------+ Dispatcher |
| | | | | | |
+------------+ +------+------+ | +--------+---+----+ +-----------+
| | ^ | | |
| | ^ | 3 | |
| v | +---------->+ Trigger 2 |
1 | +--------+--------+ | 2 | |
| | | 1 | +-----------+
+----->+ Kafka +--------+
| | 2 +-----------+
+-----------------+ | |
+-----------------+ 3 | |
| Trigger 3 |
| |
+-----------+
*/
@Test
@Timeout(timeUnit = TimeUnit.MINUTES, value = 1)
public void execute(final Vertx vertx, final VertxTestContext context) {
public void execute(final Vertx vertx, final VertxTestContext context) throws InterruptedException {

final var checkpoints = context.checkpoint(3);
final var checkpoints = context.checkpoint(4);

// event sent by the source to the Broker (see 1 in diagram)
final var expectedRequestEvent = CloudEventBuilder.v1()
Expand All @@ -158,7 +164,7 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
.build();

// event sent in the response by the Callable service (see 2 in diagram)
final var expectedResponseEvent = CloudEventBuilder.v03()
final var expectedResponseEventService2 = CloudEventBuilder.v03()
.withId(UUID.randomUUID().toString())
.withDataSchema(URI.create("/api/data-schema-ce-2"))
.withSubject("subject-ce-2")
Expand All @@ -167,6 +173,20 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
.withType(TYPE_CE_2)
.build();

// event sent in the response by the Callable service 2 (see 3 in diagram)
final var expectedResponseEventService1 = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withDataSchema(URI.create("/api/data-schema-ce-3"))
.withSource(URI.create("/api/rossi"))
.withSubject("subject-ce-3")
.withType(TYPE_CE_1)
.build();

final var service1ExpectedEventsIterator = List.of(
expectedRequestEvent,
expectedResponseEventService1
).iterator();

final var resource = DataPlaneContract.Resource.newBuilder()
.addTopics(TOPIC)
.setIngress(DataPlaneContract.Ingress.newBuilder().setPath(format("/%s/%s", BROKER_NAMESPACE, BROKER_NAME)))
Expand Down Expand Up @@ -207,9 +227,13 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS)
.accept(DataPlaneContract.Contract.newBuilder().addResources(resource).build());

await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertThat(vertx.deploymentIDs())
await()
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(vertx.deploymentIDs())
.hasSize(resource.getEgressesCount() + NUM_RESOURCES + NUM_SYSTEM_VERTICLES));

Thread.sleep(2000); // Give consumers time to start

// start service
vertx.createHttpServer()
.exceptionHandler(context::failNow)
Expand All @@ -221,22 +245,29 @@ public void execute(final Vertx vertx, final VertxTestContext context) {

// service 1 receives event sent by the HTTPClient
if (request.path().equals(PATH_SERVICE_1)) {
final var expectedEvent = service1ExpectedEventsIterator.next();
context.verify(() -> {
assertThat(event).isEqualTo(expectedRequestEvent);
assertThat(event).isEqualTo(expectedEvent);
checkpoints.flag(); // 2
});

// write event to the response, the event will be handled by service 2
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEvent);
if (service1ExpectedEventsIterator.hasNext()) {
// write event to the response, the event will be handled by service 2
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEventService2);
}
}

// service 2 receives event in the response
if (request.path().equals(PATH_SERVICE_2)) {
context.verify(() -> {
assertThat(event).isEqualTo(expectedResponseEvent);
assertThat(event).isEqualTo(expectedResponseEventService2);
checkpoints.flag(); // 3
});

// write event to the response, the event will be handled by service 2
VertxMessageFactory.createWriter(request.response())
.writeBinary(expectedResponseEventService1);
}

if (request.path().equals(PATH_SERVICE_3)) {
Expand Down

0 comments on commit 8267d11

Please sign in to comment.