Skip to content

Commit

Permalink
addressed minor review comments. 🦓
Browse files Browse the repository at this point in the history
  • Loading branch information
KaiSernLim committed Feb 24, 2025
1 parent edd31be commit 2838c63
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig;
import com.linkedin.davinci.transformer.TestStringRecordTransformer;
import com.linkedin.davinci.utils.ChunkAssembler;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
import com.linkedin.venice.exceptions.VeniceException;
Expand Down Expand Up @@ -367,7 +366,6 @@ public static Object[][] sortedInputAndAAConfigProvider() {
private AggVersionedDIVStats mockVersionedDIVStats;
private AggVersionedIngestionStats mockVersionedStorageIngestionStats;
private StoreIngestionTask storeIngestionTaskUnderTest;
private ChunkAssembler divChunkAssembler;
private ExecutorService taskPollingService;
private StoreBufferService storeBufferService;
private AggKafkaConsumerService aggKafkaConsumerService;
Expand Down Expand Up @@ -553,7 +551,6 @@ public void methodSetUp() throws Exception {
mockRemoteKafkaConsumer = mock(PubSubConsumerAdapter.class);
kafkaUrlToRecordsThrottler = new HashMap<>();
kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler);
divChunkAssembler = mock(ChunkAssembler.class);

mockTopicManager = mock(TopicManager.class);
mockTopicManagerRepository = mock(TopicManagerRepository.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public enum MessageType implements VeniceEnumValue {
PUT(0, Constants.PUT_KEY_HEADER_BYTE), DELETE(1, Constants.PUT_KEY_HEADER_BYTE),
CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE),
// GLOBAL_RT_DIV is the same as PUT, but contains a DIV object rather than user data
GLOBAL_RT_DIV(4, Constants.GLOBAL_RT_DIV_KEY_HEADER_BYTE);

private static final List<MessageType> TYPES = EnumUtils.getEnumValuesList(MessageType.class);
Expand Down Expand Up @@ -64,7 +65,7 @@ public byte getKeyHeaderByte() {
public Object getNewInstance() {
switch (valueOf(value)) {
case PUT:
case GLOBAL_RT_DIV:
case GLOBAL_RT_DIV: // GLOBAL_RT_DIV is the same as PUT, but contains a DIV object rather than user data
return new Put();
case DELETE:
return new Delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ public synchronized boolean addToCheckSum(KafkaKey key, KafkaMessageEnvelope mes
+ controlMessage.getControlMessageType());
}
case PUT:
case GLOBAL_RT_DIV:
case GLOBAL_RT_DIV: // GLOBAL_RT_DIV is the same as PUT, but contains a DIV object rather than user data
// TODO: revisit to see if the GLOBAL_RT_DIV message is needed as part of the checksum
updateCheckSum(messageEnvelope.getMessageType());
updateCheckSum(key.getKey());
Put putPayload = (Put) messageEnvelope.getPayloadUnion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public static int getSize(KafkaMessageEnvelope kme) {
int size = KME_PARTIAL_CLASS_OVERHEAD;
switch (MessageType.valueOf(kme)) {
case PUT:
case GLOBAL_RT_DIV:
case GLOBAL_RT_DIV: // GLOBAL_RT_DIV is the same as PUT, but contains a DIV object rather than user data
size += getSize((Put) kme.payloadUnion);
break;
case DELETE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"fields": [
{
"name": "srcUrl",
"doc": "Upstream Kafka bootstrap server url.",
"doc": "Upstream broker bootstrap server url.",
"type": [
"null",
"string"
Expand Down

0 comments on commit 2838c63

Please sign in to comment.