[da-vinci] DVC batch incremental push status across all partitions on the same node (#1365)

This feature is controlled by the config:
davinci.push.status.check.interval.in.ms

When the config value is non-negative (the default is -1, i.e. disabled),
the feature is enabled, and DVC batches both batch push statuses and
incremental push statuses across all partitions hosted on the node, as
sketched below.
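
For illustration, a minimal sketch of enabling the feature; the config key and its default come from this commit, while the surrounding properties setup and the chosen interval value are assumptions:

import java.util.Properties;

public class EnableBatchedPushStatus {
  public static void main(String[] args) {
    // Hypothetical DaVinci backend config; only the key below is from this commit.
    Properties backendConfig = new Properties();
    // The default is -1 (disabled); any non-negative value enables batching and
    // sets how often the background status-check task wakes up.
    backendConfig.setProperty("davinci.push.status.check.interval.in.ms", "1000");
  }
}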

For each incremental push version on a DVC node, there will be at most
two status event messages (see the sketch after this list):
START_OF_INCREMENTAL_PUSH_RECEIVED - at least one partition on the DVC
                                     node has started ingesting data for this
                                     incremental push version;
END_OF_INCREMENTAL_PUSH_RECEIVED   - all hosted partitions on the DVC node
                                     have finished ingesting data for this
                                     incremental push version.
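
A minimal sketch of that at-most-two-messages contract, using a simplified stand-in type; the real bookkeeping lives in DaVinciPushStatusUpdateTask in the diff below:

// Simplified stand-in: each incremental push version emits START once and
// END once, then goes quiet, no matter how many partitions report afterwards.
class IncrementalPushSignals {
  private boolean startSignalSent;
  private boolean endSignalSent;

  // Returns the event to emit given the current partition states, or null if none is due.
  String nextSignal(boolean allHostedPartitionsFinished) {
    if (!startSignalSent) {
      startSignalSent = true;
      return "START_OF_INCREMENTAL_PUSH_RECEIVED";
    }
    if (!endSignalSent && allHostedPartitionsFinished) {
      endSignalSent = true;
      return "END_OF_INCREMENTAL_PUSH_RECEIVED";
    }
    return null; // at most two signals per incremental push version
  }
}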

The batching feature for incremental pushes reuses the "DaVinciPushStatusUpdateTask"
used for batch push statuses; "DaVinciPushStatusUpdateTask" is a background task that
wakes up regularly, based on the above config value, and checks which status events
it should send. It also no longer shuts itself down after the batch push completes,
since incremental pushes can happen at any time (a sketch of this scheduling
pattern follows).
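
A minimal sketch of that scheduling pattern, with simplified names; the real task also tracks per-partition statuses and writes to the push status store:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PushStatusCheckLoop {
  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

  // checkIntervalMs mirrors davinci.push.status.check.interval.in.ms.
  void start(long checkIntervalMs) {
    // Wakes up at a fixed interval and decides which status events are due;
    // it is deliberately NOT shut down when the batch push completes, because
    // incremental pushes can arrive at any time afterwards.
    scheduler.scheduleAtFixedRate(this::maybeSendBatchingStatus, 0, checkIntervalMs, TimeUnit.MILLISECONDS);
  }

  private void maybeSendBatchingStatus() {
    // Placeholder: inspect partition statuses and emit any due signals.
  }
}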

New version-level key for incremental push status: version + incremental_push_version
(sketched below).
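
A minimal sketch of that keying scheme, with assumed names; in the committed code each store version's task keeps a map from incremental push version to per-partition statuses, which effectively keys statuses by (version, incremental_push_version, partition):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PerVersionStatusTracker {
  // One entry per incremental push version hosted by this store version.
  private final Map<String, Map<Integer, String>> incrementalPushVersionToStatus = new ConcurrentHashMap<>();

  void update(String incrementalPushVersion, int partition, String status) {
    incrementalPushVersionToStatus
        .computeIfAbsent(incrementalPushVersion, k -> new ConcurrentHashMap<>())
        .put(partition, status);
  }
}
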
huangminchn authored Dec 10, 2024
1 parent 891c65a commit 6c7e13f
Showing 8 changed files with 476 additions and 79 deletions.
@@ -625,10 +625,10 @@ protected void reportPushStatus(
     VersionBackend versionBackend = versionByTopicMap.get(kafkaTopic);
     if (versionBackend != null && versionBackend.isReportingPushStatus()) {
       Version version = versionBackend.getVersion();
-      if (writeBatchingPushStatus && !incrementalPushVersion.isPresent()) {
-        // Batching the push statuses from all partitions for batch pushes;
+      if (writeBatchingPushStatus) {
+        // Batching the push statuses from all partitions;
         // VersionBackend will handle the push status update to Venice backend
-        versionBackend.updatePartitionStatus(partition, status);
+        versionBackend.updatePartitionStatus(partition, status, incrementalPushVersion);
       } else {
         pushStatusStoreWriter
             .writePushStatus(version.getStoreName(), version.getNumber(), partition, status, incrementalPushVersion);
@@ -523,9 +523,9 @@ private List<Integer> getPartitions(ComplementSet<Integer> partitions) {
         .collect(Collectors.toList());
   }

-  public void updatePartitionStatus(int partition, ExecutionStatus status) {
+  public void updatePartitionStatus(int partition, ExecutionStatus status, Optional<String> incrementalPushVersion) {
     if (daVinciPushStatusUpdateTask != null) {
-      daVinciPushStatusUpdateTask.updatePartitionStatus(partition, status);
+      daVinciPushStatusUpdateTask.updatePartitionStatus(partition, status, incrementalPushVersion);
     }
   }
 }
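
For illustration, a hedged example of a caller reporting through the new signature; the partition id and the incremental push version literal are made up:

// Usage sketch: batch pushes keep passing Optional.empty(), while incremental
// pushes now flow through the same entry point with their push version attached.
void onPartitionIngestionEvent(VersionBackend versionBackend) {
  versionBackend.updatePartitionStatus(0, ExecutionStatus.COMPLETED, Optional.empty());
  versionBackend
      .updatePartitionStatus(0, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED, Optional.of("inc_push_v1"));
}
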
@@ -5,26 +5,32 @@
 import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter;
 import com.linkedin.venice.utils.DaemonThreadFactory;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;


 /**
  * This is a scheduler for sending batching push status in DaVinci.
  */
 public class DaVinciPushStatusUpdateTask {
   private static final Logger LOGGER = LogManager.getLogger(DaVinciPushStatusUpdateTask.class);
   private final Version version;
   private final PushStatusStoreWriter pushStatusStoreWriter;
   private boolean batchPushStartSignalSent;
   private boolean batchPushEndSignalSent;
   private final long daVinciPushStatusCheckIntervalInMs;
   private final Supplier<Boolean> areAllPartitionFuturesCompletedSuccessfully;
-  private final Map<Integer, ExecutionStatus> partitionStatus = new VeniceConcurrentHashMap<>();
-  // Executor for scheduling tasks
+  private final Map<Integer, ExecutionStatus> batchPushPartitionStatus = new VeniceConcurrentHashMap<>();
+  private final Map<String, IncrementalPushStatus> incrementalPushVersionToStatus = new VeniceConcurrentHashMap<>();
   private final ScheduledExecutorService scheduler =
       Executors.newScheduledThreadPool(1, new DaemonThreadFactory("davinci-push-status-update-task-scheduler"));
@@ -57,75 +63,149 @@ public void batchPushEndSignalSent() {
     this.batchPushEndSignalSent = true;
   }

-  public void updatePartitionStatus(int partition, ExecutionStatus status) {
-    partitionStatus.put(partition, status);
+  public void updatePartitionStatus(int partition, ExecutionStatus status, Optional<String> incrementalPushVersion) {
+    if (incrementalPushVersion.isPresent()) {
+      updateIncrementalPushStatus(incrementalPushVersion.get(), partition, status);
+    } else {
+      batchPushPartitionStatus.put(partition, status);
+    }
   }

+  private void updateIncrementalPushStatus(String incrementalPushVersion, int partition, ExecutionStatus status) {
+    IncrementalPushStatus incrementalPushStatus = incrementalPushVersionToStatus
+        .computeIfAbsent(incrementalPushVersion, k -> new IncrementalPushStatus(incrementalPushVersion));
+
+    if (incrementalPushStatus.endSignalSent) {
+      LOGGER.warn(
+          "Received status update for store version {} partition {} with status {} for incremental push version {} after terminal signal is sent; ignoring it",
+          version.kafkaTopicName(),
+          partition,
+          status,
+          incrementalPushVersion);
+      return;
+    }
+    incrementalPushStatus.partitionStatus.put(partition, status);
+  }
+
   private void maybeSendBatchingStatus() {
     // First, check if the batch push start signal has been sent; if not, send out STARTED status for this version
     // immediately, regardless of whether any partition subscription has happened (this is also needed for delayed
     // version swap for DaVinci during target colo pushes).
     // If STARTED is sent, and COMPLETED is not sent, check if all partitions are in COMPLETED status. If so, send
     // the COMPLETED status.
+    handleBatchPushStatus();
+    handleIncrementalPushStatus();
+  }
+
+  private void handleBatchPushStatus() {
     if (!isBatchPushStartSignalSent()) {
-      pushStatusStoreWriter.writeVersionLevelPushStatus(
-          version.getStoreName(),
-          version.getNumber(),
-          ExecutionStatus.STARTED,
-          getTrackedPartitions());
+      sendPushStatus(ExecutionStatus.STARTED, Optional.empty());
       batchPushStartSignalSent();
-    } else if (!isBatchPushEndSignalSent() && areAllPartitionsOnSameTerminalStatus(ExecutionStatus.COMPLETED)
+    } else if (!isBatchPushEndSignalSent()
+        && areAllPartitionsOnSameTerminalStatus(ExecutionStatus.COMPLETED, Optional.empty())
         && areAllPartitionFuturesCompletedSuccessfully.get()) {
-      pushStatusStoreWriter.writeVersionLevelPushStatus(
-          version.getStoreName(),
-          version.getNumber(),
-          ExecutionStatus.COMPLETED,
-          getTrackedPartitions());
+      sendPushStatus(ExecutionStatus.COMPLETED, Optional.empty());
       batchPushEndSignalSent();
-      // Shutdown the scheduler after sending the final status
-      shutdown();
-    } else if (!isBatchPushEndSignalSent() && isAnyPartitionOnErrorStatus()) {
-      pushStatusStoreWriter.writeVersionLevelPushStatus(
-          version.getStoreName(),
-          version.getNumber(),
-          ExecutionStatus.ERROR,
-          getTrackedPartitions());
+      batchPushPartitionStatus.clear();
+    } else if (!isBatchPushEndSignalSent() && isAnyPartitionOnErrorStatus(Optional.empty())) {
+      sendPushStatus(ExecutionStatus.ERROR, Optional.empty());
       batchPushEndSignalSent();
-      // Shutdown the scheduler after sending the final status
-      shutdown();
+      batchPushPartitionStatus.clear();
     }
   }

-  /**
-   * Get the partition id set that is being tracked
-   */
-  public Set<Integer> getTrackedPartitions() {
-    return partitionStatus.keySet();
-  }
+  private void handleIncrementalPushStatus() {
+    incrementalPushVersionToStatus.forEach((incrementalPushVersion, incrementalPushStatus) -> {
+      if (!incrementalPushStatus.startSignalSent) {
+        sendPushStatus(ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED, Optional.of(incrementalPushVersion));
+        incrementalPushStatus.startSignalSent = true;
+      } else if (!incrementalPushStatus.endSignalSent && areAllPartitionsOnSameTerminalStatus(
+          ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED,
+          Optional.of(incrementalPushVersion))) {
+        sendPushStatus(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED, Optional.of(incrementalPushVersion));
+        incrementalPushStatus.endSignalSent = true;
+        // Clean up the status map for this incremental push version, in case too many of them accumulate and cause OOM
+        incrementalPushStatus.partitionStatus.clear();
+      }
+    });
+  }

-  public boolean areAllPartitionsOnSameTerminalStatus(ExecutionStatus status) {
-    if (partitionStatus.isEmpty()) {
-      return false;
-    }
-    return partitionStatus.values().stream().allMatch(status::equals);
-  }
+  private void sendPushStatus(ExecutionStatus status, Optional<String> incrementalPushVersion) {
+    pushStatusStoreWriter.writeVersionLevelPushStatus(
+        version.getStoreName(),
+        version.getNumber(),
+        status,
+        getTrackedPartitions(incrementalPushVersion),
+        incrementalPushVersion);
+  }
+
+  /**
+   * Get the partition id set that is being tracked for the given (possibly absent) incremental push version.
+   */
+  public Set<Integer> getTrackedPartitions(Optional<String> incrementalPushVersion) {
+    if (incrementalPushVersion.isPresent()) {
+      IncrementalPushStatus incrementalPushStatus = incrementalPushVersionToStatus.get(incrementalPushVersion.get());
+      return (incrementalPushStatus == null) ? Collections.emptySet() : incrementalPushStatus.partitionStatus.keySet();
+    } else {
+      return batchPushPartitionStatus.keySet();
+    }
+  }

-  public boolean isAnyPartitionOnErrorStatus() {
-    return partitionStatus.values().stream().anyMatch(ExecutionStatus.ERROR::equals);
-  }
+  public boolean areAllPartitionsOnSameTerminalStatus(ExecutionStatus status, Optional<String> incrementalPushVersion) {
+    return arePartitionsOnTargetStatus(status, incrementalPushVersion, true);
+  }
+
+  public boolean isAnyPartitionOnErrorStatus(Optional<String> incrementalPushVersion) {
+    return arePartitionsOnTargetStatus(ExecutionStatus.ERROR, incrementalPushVersion, false);
+  }
+
+  private boolean arePartitionsOnTargetStatus(
+      ExecutionStatus status,
+      Optional<String> incrementalPushVersion,
+      boolean allMatch) {
+    Map<Integer, ExecutionStatus> partitionStatus = getPartitionStatus(incrementalPushVersion);
+    if (partitionStatus.isEmpty()) {
+      return false;
+    }
+    return allMatch
+        ? partitionStatus.values().stream().allMatch(status::equals)
+        : partitionStatus.values().stream().anyMatch(status::equals);
+  }
+
+  private Map<Integer, ExecutionStatus> getPartitionStatus(Optional<String> incrementalPushVersion) {
+    if (incrementalPushVersion.isPresent()) {
+      IncrementalPushStatus incrementalPushStatus = incrementalPushVersionToStatus.get(incrementalPushVersion.get());
+      if (incrementalPushStatus == null) {
+        return Collections.emptyMap();
+      }
+      return incrementalPushStatus.partitionStatus;
+    } else {
+      return batchPushPartitionStatus;
+    }
+  }

   public void start() {
     // Schedule a task to check the current status of all hosted partitions periodically
-    scheduler.scheduleAtFixedRate(() -> {
-      maybeSendBatchingStatus();
-    }, 0, daVinciPushStatusCheckIntervalInMs, TimeUnit.MILLISECONDS);
+    scheduler.scheduleAtFixedRate(
+        this::maybeSendBatchingStatus,
+        0,
+        daVinciPushStatusCheckIntervalInMs,
+        TimeUnit.MILLISECONDS);
   }

   // Shutdown the scheduler gracefully
   public void shutdown() {
     scheduler.shutdown();
   }
+
+  private static class IncrementalPushStatus {
+    private final String incrementalPushVersion;
+    private boolean startSignalSent;
+    private boolean endSignalSent;
+    private final Map<Integer, ExecutionStatus> partitionStatus;
+
+    public IncrementalPushStatus(String incrementalPushVersion) {
+      this.incrementalPushVersion = incrementalPushVersion;
+      this.partitionStatus = new ConcurrentHashMap<>();
+      this.startSignalSent = false;
+      this.endSignalSent = false;
+    }
+
+    @Override
+    public String toString() {
+      return "IncrementalPushStatus{" + "incrementalPushVersion='" + incrementalPushVersion + '\''
+          + ", startSignalSent=" + startSignalSent + ", endSignalSent=" + endSignalSent + ", partitionStatus="
+          + partitionStatus + '}';
+    }
+  }
 }
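
For illustration, a hedged end-to-end sketch of driving the task; the constructor shape is inferred from the task's fields and may not match the real signature, and the interval and push version literals are made up:

// Assumed wiring, simplified from VersionBackend: one task per store version.
DaVinciPushStatusUpdateTask createAndDriveTask(
    Version version,
    long daVinciPushStatusCheckIntervalInMs,
    PushStatusStoreWriter pushStatusStoreWriter) {
  DaVinciPushStatusUpdateTask task = new DaVinciPushStatusUpdateTask(
      version,
      daVinciPushStatusCheckIntervalInMs,
      pushStatusStoreWriter,
      () -> true); // stand-in for the partition-futures-completed check
  task.start();
  // Batch push partitions report without an incremental push version...
  task.updatePartitionStatus(0, ExecutionStatus.COMPLETED, Optional.empty());
  // ...while incremental push partitions carry theirs, yielding at most one
  // START_OF_INCREMENTAL_PUSH_RECEIVED and one END_OF_INCREMENTAL_PUSH_RECEIVED per version.
  task.updatePartitionStatus(0, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED, Optional.of("inc_push_v1"));
  return task;
}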
