From 2838c63f496a4731fb66733c4f772c9f1b248cc1 Mon Sep 17 00:00:00 2001 From: Kai-Sern Lim Date: Mon, 24 Feb 2025 12:46:32 -0800 Subject: [PATCH] =?UTF-8?q?addressed=20minor=20review=20comments.=20?= =?UTF-8?q?=F0=9F=A6=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../davinci/kafka/consumer/StoreIngestionTaskTest.java | 3 --- .../com/linkedin/venice/kafka/protocol/enums/MessageType.java | 3 ++- .../java/com/linkedin/venice/kafka/validation/Segment.java | 3 ++- .../java/com/linkedin/venice/memory/InstanceSizeEstimator.java | 2 +- .../resources/avro/GlobalRtDivState/v1/GlobalRtDivState.avsc | 2 +- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 7e406ccd09c..55d11ff1a1c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -106,7 +106,6 @@ import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig; import com.linkedin.davinci.transformer.TestStringRecordTransformer; -import com.linkedin.davinci.utils.ChunkAssembler; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; @@ -367,7 +366,6 @@ public static Object[][] sortedInputAndAAConfigProvider() { private AggVersionedDIVStats mockVersionedDIVStats; private AggVersionedIngestionStats mockVersionedStorageIngestionStats; private StoreIngestionTask storeIngestionTaskUnderTest; - private ChunkAssembler divChunkAssembler; private ExecutorService taskPollingService; private StoreBufferService storeBufferService; private AggKafkaConsumerService aggKafkaConsumerService; @@ -553,7 +551,6 @@ public void methodSetUp() throws Exception { mockRemoteKafkaConsumer = mock(PubSubConsumerAdapter.class); kafkaUrlToRecordsThrottler = new HashMap<>(); kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler); - divChunkAssembler = mock(ChunkAssembler.class); mockTopicManager = mock(TopicManager.class); mockTopicManagerRepository = mock(TopicManagerRepository.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java index 3097f10a81a..b3fe08e7317 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java @@ -22,6 +22,7 @@ public enum MessageType implements VeniceEnumValue { PUT(0, Constants.PUT_KEY_HEADER_BYTE), DELETE(1, Constants.PUT_KEY_HEADER_BYTE), CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE), + // GLOBAL_RT_DIV is the same as PUT, but contains a DIV object rather than user data GLOBAL_RT_DIV(4, Constants.GLOBAL_RT_DIV_KEY_HEADER_BYTE); private static final List TYPES = EnumUtils.getEnumValuesList(MessageType.class); @@ -64,7 +65,7 @@ public byte getKeyHeaderByte() { public Object getNewInstance() { switch (valueOf(value)) { case PUT: - case GLOBAL_RT_DIV: + case GLOBAL_RT_DIV: // GLOBAL_RT_DIV is the same as PUT, but contains a DIV object rather than user data return new Put(); case DELETE: return new Delete(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java index 911d721de7b..bd363acbf8a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java @@ -291,7 +291,8 @@ public synchronized boolean addToCheckSum(KafkaKey key, KafkaMessageEnvelope mes + controlMessage.getControlMessageType()); } case PUT: - case GLOBAL_RT_DIV: + case GLOBAL_RT_DIV: // GLOBAL_RT_DIV is the same as PUT, but contains a DIV object rather than user data + // TODO: revisit to see if the GLOBAL_RT_DIV message is needed as part of the checksum updateCheckSum(messageEnvelope.getMessageType()); updateCheckSum(key.getKey()); Put putPayload = (Put) messageEnvelope.getPayloadUnion(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java b/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java index 2e5fd10fb44..43aa7c03faa 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java @@ -159,7 +159,7 @@ public static int getSize(KafkaMessageEnvelope kme) { int size = KME_PARTIAL_CLASS_OVERHEAD; switch (MessageType.valueOf(kme)) { case PUT: - case GLOBAL_RT_DIV: + case GLOBAL_RT_DIV: // GLOBAL_RT_DIV is the same as PUT, but contains a DIV object rather than user data size += getSize((Put) kme.payloadUnion); break; case DELETE: diff --git a/internal/venice-common/src/main/resources/avro/GlobalRtDivState/v1/GlobalRtDivState.avsc b/internal/venice-common/src/main/resources/avro/GlobalRtDivState/v1/GlobalRtDivState.avsc index 1b8a7fa4ef3..c6c39286f23 100644 --- a/internal/venice-common/src/main/resources/avro/GlobalRtDivState/v1/GlobalRtDivState.avsc +++ b/internal/venice-common/src/main/resources/avro/GlobalRtDivState/v1/GlobalRtDivState.avsc @@ -6,7 +6,7 @@ "fields": [ { "name": "srcUrl", - "doc": "Upstream Kafka bootstrap server url.", + "doc": "Upstream broker bootstrap server url.", "type": [ "null", "string"