Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Fix mismatch between hybrid version partition count and real-time partition count #1338

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) {
private boolean isLocalVersionTopicPartitionFullyConsumed(PartitionConsumptionState pcs) {
long localVTOff = pcs.getLatestProcessedLocalVersionTopicOffset();
long localVTEndOffset = getTopicPartitionEndOffSet(localKafkaServer, versionTopic, pcs.getPartition());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect there is a bug here in this function. I need to think through

if (localVTEndOffset == StatsErrorCode.LAG_MEASUREMENT_FAILURE.code) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.pubsub.PubSubConstants.UNKNOWN_LATEST_OFFSET;
import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR;
import static com.linkedin.venice.utils.Utils.getReplicaId;
import static java.util.concurrent.TimeUnit.HOURS;
Expand Down Expand Up @@ -2295,9 +2296,13 @@ protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTop
}
try {
return RetryUtils.executeWithMaxAttemptAndExponentialBackoff(() -> {
LOGGER.info(
"Topic: {} exists: {}",
pubSubTopic,
getTopicManager(kafkaUrl).containsTopicAndAllPartitionsAreOnline(pubSubTopic, partition));
long offset = getTopicManager(kafkaUrl).getLatestOffsetCachedNonBlocking(pubSubTopic, partition);
if (offset == -1) {
throw new VeniceException("Found latest offset -1");
if (offset == UNKNOWN_LATEST_OFFSET) {
throw new VeniceException("Latest offset is unknown. Check if the topic exists.");
}
return offset;
},
Expand Down Expand Up @@ -3045,6 +3050,13 @@ private boolean processControlMessage(
processEndOfIncrementalPush(controlMessage, partitionConsumptionState);
break;
case TOPIC_SWITCH:
TopicSwitch topicSwitch = (TopicSwitch) controlMessage.controlMessageUnion;
LOGGER.info(
"Received {} control message. Replica: {}, Offset: {} NewSource: {}",
type.name(),
partitionConsumptionState.getReplicaId(),
offset,
topicSwitch.getSourceKafkaServers());
checkReadyToServeAfterProcess =
processTopicSwitch(controlMessage, partition, offset, partitionConsumptionState);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package com.linkedin.venice.controllerapi;

import com.linkedin.venice.meta.Version.PushType;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;


public class RequestTopicForPushRequest {
private final String clusterName;
private final String storeName;
private final PushType pushType;
private final String pushJobId;

private boolean sendStartOfPush = false;
private boolean sorted = false; // an inefficient but safe default
private boolean isWriteComputeEnabled = false;
private long rewindTimeInSecondsOverride = -1L;
private boolean deferVersionSwap = false;
private String targetedRegions = null;
private int repushSourceVersion = -1;
private Set<String> partitioners = Collections.emptySet();
private String compressionDictionary = null;
private X509Certificate certificateInRequest = null;
private String sourceGridFabric = null;
private String emergencySourceRegion = null;

public RequestTopicForPushRequest(String clusterName, String storeName, PushType pushType, String pushJobId) {
if (clusterName == null || clusterName.isEmpty()) {
throw new IllegalArgumentException("clusterName is required");
}
if (storeName == null || storeName.isEmpty()) {
throw new IllegalArgumentException("storeName is required");
}
if (pushType == null) {
throw new IllegalArgumentException("pushType is required");
}

if (pushJobId == null || pushJobId.isEmpty()) {
throw new IllegalArgumentException("pushJobId is required");
}

this.clusterName = clusterName;
this.storeName = storeName;
this.pushType = pushType;
this.pushJobId = pushJobId;
}

public static PushType extractPushType(String pushTypeString) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PushType enum itself, we already have an valueOf (int), would it be better to add this method there to have all the valueOf in one place?

And this method is simply catch an illegalArgumentException and throw a new illegalArgumentException with a more informative error message. Catching exception is expensive. So maybe inside the PushType enum, you can pre-build a static map of pushType string, if the inputString does not match any, you can throw an illegalArgumentException.

Just my two-cents.

try {
return PushType.valueOf(pushTypeString);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
pushTypeString + " is an invalid push type. Valid push types are: " + Arrays.toString(PushType.values()));
}
}

public String getClusterName() {
return clusterName;
}

public String getStoreName() {
return storeName;
}

public PushType getPushType() {
return pushType;
}

public String getPushJobId() {
return pushJobId;
}

public boolean isSendStartOfPush() {
return sendStartOfPush;
}

public boolean isSorted() {
return sorted;
}

public boolean isWriteComputeEnabled() {
return isWriteComputeEnabled;
}

public String getSourceGridFabric() {
return sourceGridFabric;
}

public long getRewindTimeInSecondsOverride() {
return rewindTimeInSecondsOverride;
}

public boolean isDeferVersionSwap() {
return deferVersionSwap;
}

public String getTargetedRegions() {
return targetedRegions;
}

public int getRepushSourceVersion() {
return repushSourceVersion;
}

public Set<String> getPartitioners() {
return partitioners;
}

public String getCompressionDictionary() {
return compressionDictionary;
}

public X509Certificate getCertificateInRequest() {
return certificateInRequest;
}

public String getEmergencySourceRegion() {
return emergencySourceRegion;
}

public void setSendStartOfPush(boolean sendStartOfPush) {
this.sendStartOfPush = sendStartOfPush;
}

public void setSorted(boolean sorted) {
this.sorted = sorted;
}

public void setWriteComputeEnabled(boolean writeComputeEnabled) {
isWriteComputeEnabled = writeComputeEnabled;
}

public void setSourceGridFabric(String sourceGridFabric) {
this.sourceGridFabric = sourceGridFabric;
}

public void setRewindTimeInSecondsOverride(long rewindTimeInSecondsOverride) {
this.rewindTimeInSecondsOverride = rewindTimeInSecondsOverride;
}

public void setDeferVersionSwap(boolean deferVersionSwap) {
this.deferVersionSwap = deferVersionSwap;
}

public void setTargetedRegions(String targetedRegions) {
this.targetedRegions = targetedRegions;
}

public void setRepushSourceVersion(int repushSourceVersion) {
this.repushSourceVersion = repushSourceVersion;
}

public void setPartitioners(String commaSeparatedPartitioners) {
if (commaSeparatedPartitioners == null || commaSeparatedPartitioners.isEmpty()) {
return;
}
setPartitioners(new HashSet<>(Arrays.asList(commaSeparatedPartitioners.split(","))));
}

public void setPartitioners(Set<String> partitioners) {
this.partitioners = partitioners;
}

public void setCompressionDictionary(String compressionDictionary) {
this.compressionDictionary = compressionDictionary;
}

public void setCertificateInRequest(X509Certificate certificateInRequest) {
this.certificateInRequest = certificateInRequest;
}

public void setEmergencySourceRegion(String emergencySourceRegion) {
this.emergencySourceRegion = emergencySourceRegion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class PubSubConstants {
* Default value of sleep interval for polling topic deletion status from ZK.
*/
public static final int PUBSUB_TOPIC_DELETION_STATUS_POLL_INTERVAL_MS_DEFAULT_VALUE = 2 * Time.MS_PER_SECOND;
public static final long UNKNOWN_LATEST_OFFSET = -12345;

private static final Duration PUBSUB_OFFSET_API_TIMEOUT_DURATION_DEFAULT_VALUE_DEFAULT = Duration.ofMinutes(1);
private static Duration PUBSUB_OFFSET_API_TIMEOUT_DURATION_DEFAULT_VALUE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ long getLatestOffsetCachedNonBlocking(PubSubTopicPartition pubSubTopicPartition)
if (cachedValue == null) {
cachedValue = latestOffsetCache.get(pubSubTopicPartition);
if (cachedValue == null) {
return -1;
return PubSubConstants.UNKNOWN_LATEST_OFFSET;
}
}
return cachedValue.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ public static long parseLongFromString(String value, String fieldName) {
* any string that are not equal to 'true', We validate the string by our own.
*/
public static boolean parseBooleanFromString(String value, String fieldName) {
if (value == null) {
throw new VeniceHttpException(
HttpStatus.SC_BAD_REQUEST,
fieldName + " must be a boolean, but value is null",
ErrorType.BAD_REQUEST);
}
if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
return Boolean.parseBoolean(value);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.linkedin.venice.controllerapi;

import static com.linkedin.venice.meta.Version.PushType.BATCH;
import static com.linkedin.venice.meta.Version.PushType.STREAM;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class RequestTopicForPushRequestTest {
private RequestTopicForPushRequest request;

@BeforeMethod
public void setUp() {
request = new RequestTopicForPushRequest("clusterA", "storeA", BATCH, "job123");
}

@Test
public void testRequestTopicForPushRequestConstructorArgs() {
assertEquals(request.getClusterName(), "clusterA");
assertEquals(request.getStoreName(), "storeA");
assertEquals(request.getPushType(), BATCH);
assertEquals(request.getPushJobId(), "job123");

// Invalid clusterName
IllegalArgumentException ex1 = Assert.expectThrows(
IllegalArgumentException.class,
() -> new RequestTopicForPushRequest("", "storeA", BATCH, "job123"));
assertEquals(ex1.getMessage(), "clusterName is required");

// Invalid storeName
IllegalArgumentException ex2 = Assert.expectThrows(
IllegalArgumentException.class,
() -> new RequestTopicForPushRequest("clusterA", "", BATCH, "job123"));
assertEquals(ex2.getMessage(), "storeName is required");

// Null pushType
IllegalArgumentException ex3 = Assert.expectThrows(
IllegalArgumentException.class,
() -> new RequestTopicForPushRequest("clusterA", "storeA", null, "job123"));
assertEquals(ex3.getMessage(), "pushType is required");

// Invalid pushJobId
IllegalArgumentException ex4 = Assert.expectThrows(
IllegalArgumentException.class,
() -> new RequestTopicForPushRequest("clusterA", "storeA", BATCH, ""));
assertEquals(ex4.getMessage(), "pushJobId is required");
}

@Test
public void testExtractPushTypeValidAndInvalidValues() {
// Valid cases
assertEquals(RequestTopicForPushRequest.extractPushType("BATCH"), BATCH);
assertEquals(RequestTopicForPushRequest.extractPushType("STREAM"), STREAM);

// Invalid case
IllegalArgumentException ex = Assert
.expectThrows(IllegalArgumentException.class, () -> RequestTopicForPushRequest.extractPushType("INVALID"));
assertTrue(ex.getMessage().contains("INVALID is an invalid push type"));
}

@Test
public void testRequestTopicForPushRequestSettersAndGetters() {
request.setSendStartOfPush(true);
request.setSorted(true);
request.setWriteComputeEnabled(true);
request.setSourceGridFabric("fabricA");
request.setRewindTimeInSecondsOverride(3600);
request.setDeferVersionSwap(true);
request.setTargetedRegions("regionA,regionB");
request.setRepushSourceVersion(42);
request.setPartitioners("partitioner1,partitioner2");
request.setCompressionDictionary("compressionDict");
request.setEmergencySourceRegion("regionX");

assertTrue(request.isSendStartOfPush());
assertTrue(request.isSorted());
assertTrue(request.isWriteComputeEnabled());
assertEquals(request.getSourceGridFabric(), "fabricA");
assertEquals(request.getRewindTimeInSecondsOverride(), 3600);
assertTrue(request.isDeferVersionSwap());
assertEquals(request.getTargetedRegions(), "regionA,regionB");
assertEquals(request.getRepushSourceVersion(), 42);
assertEquals(request.getPartitioners(), new HashSet<>(Arrays.asList("partitioner1", "partitioner2")));
assertEquals(request.getCompressionDictionary(), "compressionDict");
assertEquals(request.getEmergencySourceRegion(), "regionX");
}

@Test
public void testSetPartitionersValidAndEmptyCases() {
// Valid partitioners
request.setPartitioners("partitioner1");
assertEquals(request.getPartitioners(), new HashSet<>(Collections.singletonList("partitioner1")));
request.setPartitioners("partitioner1,partitioner2");
assertEquals(request.getPartitioners(), new HashSet<>(Arrays.asList("partitioner1", "partitioner2")));

// Empty set
request.setPartitioners(Collections.emptySet());
assertEquals(request.getPartitioners(), Collections.emptySet());

// Null and empty string
request.setPartitioners((String) null);
assertEquals(request.getPartitioners(), Collections.emptySet());

request.setPartitioners("");
assertEquals(request.getPartitioners(), Collections.emptySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
import com.linkedin.venice.pubsub.PubSubConstants;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
Expand Down Expand Up @@ -262,7 +263,7 @@ public void testGetTopicLatestOffsets() {
assertEquals(res.get(1), 222L);
assertEquals(
topicMetadataFetcher.getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, 0)),
-1);
PubSubConstants.UNKNOWN_LATEST_OFFSET);

verify(consumerMock, times(3)).partitionsFor(pubSubTopic);
verify(consumerMock, times(1)).endOffsets(eq(offsetsMap.keySet()), any(Duration.class));
Expand Down
Loading
Loading