[compat][server][client][test] Global RT DIV improvement (part 2): Chunking support for DIV message

This change mainly focuses on adding chunking support for DIV messages when they are produced to Kafka topics,
since the size of a DIV message can be large. We leverage today's chunking mechanism for regular records and extend
it to support DIV with the following modifications (a short sketch follows the list):

1. All DIV messages are of type {@link MessageType#CONTROL_MESSAGE_DIV} in their KafkaKey, and their corresponding
   KafkaMessageEnvelope uses Put as the payload.
2. Inside the Put payload, the actual message is stored in the putValue field, and the schemaId is set as follows:
   - If the DIV message is non-chunked, the schemaId is set to GLOBAL_DIV_STATE.
   - If the DIV message is a chunk, the schemaId is set to CHUNK.
   - If the DIV message is a chunk manifest, the schemaId is set to CHUNKED_VALUE_MANIFEST.
3. ChunkAssembler is adapted, on the receiver side, to buffer, assemble, and deserialize DIV messages (chunked or non-chunked).
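
Below is a minimal sketch (illustrative only, not code from this commit; the helper name classifyDivPayload is made up) of how a receiver can classify a DIV Put payload by its schemaId, using the AvroProtocolDefinition constants referenced in this change. It assumes GLOBAL_DIV_STATE exposes the current protocol version that is used as the non-chunked schemaId:

    // Hedged sketch: classify a DIV Put payload per the three schemaId cases above.
    static String classifyDivPayload(int schemaId) {
      if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
        return "chunk";
      }
      if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
        return "chunk manifest";
      }
      if (schemaId == AvroProtocolDefinition.GLOBAL_DIV_STATE.getCurrentProtocolVersion()) {
        return "non-chunked DIV state (GLOBAL_DIV_STATE)";
      }
      return "not a DIV payload";
    }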
lluwm committed Dec 5, 2024
1 parent 9a03934 commit 88d1d2d
Showing 15 changed files with 860 additions and 55 deletions.
@@ -1907,6 +1907,20 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
}
}
}

/**
* The leader is consuming from the remote version topic, and if it reads DIV control messages in the remote
* version topic, it should ignore them and not process or apply them to its own DIV state.
*/
if (record.getKey().isDivControlMessage()) {
String msg = String.format(
"Leader for replica: %s received a div control message in remote version topic. Skipping the message.",
partitionConsumptionState.getReplicaId());
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) {
LOGGER.info(msg);
}
return false;
}
}
if (!record.getTopicPartition().getPubSubTopic().equals(currentLeaderTopic)) {
String errorMsg =
@@ -12,6 +12,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.GLOBAL_DIV_STATE;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static com.linkedin.venice.utils.Utils.getReplicaId;
import static java.util.concurrent.TimeUnit.HOURS;
@@ -47,6 +48,7 @@
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
@@ -241,6 +243,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
* flushed to the metadata partition of the storage engine regularly in {@link #syncOffset(String, PartitionConsumptionState)}
*/
private final KafkaDataIntegrityValidator kafkaDataIntegrityValidator;
private final ChunkAssembler divChunkAssembler;

protected final HostLevelIngestionStats hostLevelIngestionStats;
protected final AggVersionedDIVStats versionedDIVStats;
protected final AggVersionedIngestionStats versionedIngestionStats;
@@ -451,6 +455,8 @@ public StoreIngestionTask(
new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion);
this.missingSOPCheckExecutor.execute(() -> waitForStateVersion(kafkaVersionTopic));
this.chunkAssembler = new ChunkAssembler(storeName);
this.divChunkAssembler =
builder.getDivChunkAssembler() != null ? builder.getDivChunkAssembler() : new ChunkAssembler(storeName);
this.cacheBackend = cacheBackend;

if (recordTransformerFunction != null) {
@@ -1125,6 +1131,13 @@ private int handleSingleMessage(
record.getTopicPartition().getPartitionNumber(),
partitionConsumptionStateMap.get(topicPartition.getPartitionNumber()));
}
} else if (record.getKey().isDivControlMessage()) {
// This is a DIV control message; process it and return early.
// TODO: This is a placeholder for the actual implementation.
if (isGlobalRtDivEnabled) {
processDivControlMessage(record);
}
return 0;
}

// This function may modify the original record in KME and it is unsafe to use the payload from KME directly after
@@ -1169,6 +1182,28 @@ private int handleSingleMessage(
return record.getPayloadSize();
}

void processDivControlMessage(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record) {
KafkaKey key = record.getKey();
KafkaMessageEnvelope value = record.getValue();
Put put = (Put) value.getPayloadUnion();

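// Buffer and assemble the (possibly chunked) DIV payload. GLOBAL_DIV_STATE identifies the protocol whose
// serializer is used to deserialize the value once it is fully assembled.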
Object assembledObject = divChunkAssembler.bufferAndAssembleRecord(
record.getTopicPartition(),
put.getSchemaId(),
key.getKey(),
put.getPutValue(),
record.getOffset(),
GLOBAL_DIV_STATE,
put.getSchemaId(),
new NoopCompressor());

// If the assembled object is null, it means that the object is not yet fully assembled, so we can return early.
if (assembledObject == null) {
return;
}
// TODO: Add the code to process the DIV control message here.
}

/**
* This function is in charge of producing the consumer records to the writer buffers maintained by {@link StoreBufferService}.
*
@@ -2374,6 +2409,15 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
return false;
}

/**
* DIV control messages are only produced to and consumed from version topics. It is unexpected to read DIV
* control messages from real-time topics, so skip them and log a warning.
*/
if (record.getKey().isDivControlMessage() && record.getTopic().isRealTime()) {
LOGGER.warn("Skipping control message from real-time topic-partition: {}", record.getTopicPartition());
return false;
}

if (partitionConsumptionState.isEndOfPushReceived() && partitionConsumptionState.isBatchOnly()) {
KafkaKey key = record.getKey();
KafkaMessageEnvelope value = record.getValue();
@@ -3739,6 +3783,11 @@ private void waitReadyToProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
return;
}

// No need to check schema availability for DIV messages, as their schema is already known.
if (record.getKey().isDivControlMessage()) {
return;
}

switch (MessageType.valueOf(kafkaValue)) {
case PUT:
Put put = (Put) kafkaValue.payloadUnion;
@@ -4445,4 +4494,8 @@ void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
protected boolean isDaVinciClient() {
return isDaVinciClient;
}

ChunkAssembler getDivChunkAssembler() {
return this.divChunkAssembler;
}
}
@@ -14,6 +14,7 @@
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.davinci.utils.ChunkAssembler;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
@@ -122,6 +123,8 @@ public static class Builder {
private Runnable runnableForKillIngestionTasksForNonCurrentVersions;
private ExecutorService aaWCWorkLoadProcessingThreadPool;

private ChunkAssembler divChunkAssembler;

private interface Setter {
void apply();
}
@@ -331,5 +334,13 @@ public Builder setAAWCWorkLoadProcessingThreadPool(ExecutorService executorServi
public ExecutorService getAAWCWorkLoadProcessingThreadPool() {
return this.aaWCWorkLoadProcessingThreadPool;
}

public Builder setDivChunkAssembler(ChunkAssembler divChunkAssembler) {
return set(() -> this.divChunkAssembler = divChunkAssembler);
}

public ChunkAssembler getDivChunkAssembler() {
return divChunkAssembler;
}
}
}
@@ -8,10 +8,13 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.RawBytesStoreDeserializerCache;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -84,6 +87,51 @@ public <T> T bufferAndAssembleRecord(
return decompressedAndDeserializedRecord;
}

/**
* This method buffers and assembles chunked records consumed from a Kafka topic. For chunked records, the chunks
* are buffered in memory until all the chunks for a given key have arrived. Once the chunk manifest record is
* received, the chunks are assembled, decompressed, and deserialized from binary back into an object using the
* serializer associated with the provided AvroProtocolDefinition, and the fully assembled record is returned.
*
* The provided AvroProtocolDefinition selects the appropriate protocol serializer for deserialization.
*
* Note that if the passed-in record is a regular (non-chunked) record, it is deserialized and returned directly,
* without being buffered in memory.
*/
public <T> T bufferAndAssembleRecord(
PubSubTopicPartition pubSubTopicPartition,
int schemaId,
byte[] keyBytes,
ByteBuffer valueBytes,
long recordOffset,
AvroProtocolDefinition protocol,
int readerSchemaId,
VeniceCompressor compressor) {
ByteBuffer assembledRecord = bufferAndAssembleRecord(
pubSubTopicPartition,
schemaId,
keyBytes,
valueBytes,
recordOffset,
readerSchemaId,
compressor);
T decompressedAndDeserializedRecord = null;

// The record is a chunk that is not yet fully assembled; return null.
if (assembledRecord == null) {
return decompressedAndDeserializedRecord;
}

try {
decompressedAndDeserializedRecord = decompressAndDeserialize(protocol, compressor, assembledRecord);
} catch (Exception e) {
throw new RuntimeException(e);
}

return decompressedAndDeserializedRecord;
}
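
A minimal caller-side sketch of this new overload (illustrative only; it mirrors StoreIngestionTask.processDivControlMessage above, and the record/key/put/divChunkAssembler variables are assumed to come from a consumed PubSubMessage and the ingestion task):

    Object assembled = divChunkAssembler.bufferAndAssembleRecord(
        record.getTopicPartition(),
        put.getSchemaId(),                        // CHUNK, CHUNKED_VALUE_MANIFEST, or GLOBAL_DIV_STATE version
        key.getKey(),
        put.getPutValue(),
        record.getOffset(),
        AvroProtocolDefinition.GLOBAL_DIV_STATE,  // protocol used to deserialize the assembled bytes
        put.getSchemaId(),
        new NoopCompressor());                    // no decompression needed, as in processDivControlMessage
    if (assembled != null) {
      // Fully assembled and deserialized DIV state, ready to be processed.
    }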

/**
* Buffers and assembles chunks of a record.
*
@@ -105,17 +153,20 @@ public ByteBuffer bufferAndAssembleRecord(
}
// If this is a record chunk, store the chunk and return null for processing this record
if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
// We need to extract the data from valueBytes; otherwise the backing array may contain bytes outside the value region.
inMemoryStorageEngine.put(
pubSubTopicPartition.getPartitionNumber(),
keyBytes,
ValueRecord.create(schemaId, valueBytes.array()).serialize());
ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize());
return null;
} else if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
}

if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
// This is the last value. Store it, and now read it from the in memory store as a fully assembled value
inMemoryStorageEngine.put(
pubSubTopicPartition.getPartitionNumber(),
keyBytes,
ValueRecord.create(schemaId, valueBytes.array()).serialize());
ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize());
try {
assembledRecord = RawBytesChunkingAdapter.INSTANCE.get(
inMemoryStorageEngine,
@@ -137,7 +188,8 @@
LOGGER.warn(
"Encountered error assembling chunked record, this can happen when seeking between chunked records. Skipping offset {} on topic {}",
recordOffset,
pubSubTopicPartition.getPubSubTopic().getName());
pubSubTopicPartition.getPubSubTopic().getName(),
ex);
}
} else {
// this is a fully specified record, no need to buffer and assemble it, just return the valueBytes
@@ -163,7 +215,16 @@ protected <T> T decompressAndDeserialize(
return deserializer.deserialize(compressor.decompress(value));
}

protected <T extends SpecificRecord> T decompressAndDeserialize(
AvroProtocolDefinition protocol,
VeniceCompressor compressor,
ByteBuffer value) throws IOException {
InternalAvroSpecificSerializer<T> deserializer = protocol.getSerializer();
return deserializer
.deserialize(ByteUtils.extractByteArray(compressor.decompress(value)), protocol.getCurrentProtocolVersion());
}

public void clearInMemoryDB() {
inMemoryStorageEngine.drop();
}
}
}