Skip to content

Commit

Permalink
IGNITE-23906 Minor fixes in WatchProcessor (#4855)
Browse files Browse the repository at this point in the history
* `updateCompactionRevision` and `updateOnlyRevision` used to always trigger the Failure Handler;
* remove inefficient byte array manipulations
  • Loading branch information
sashapolo authored Dec 9, 2024
1 parent 225f19b commit 6dd178c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
import static org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -183,13 +184,8 @@ public CompletableFuture<Void> notifyWatches(List<Entry> updatedEntries, HybridT
long newRevision = updatedEntries.get(0).revision();

List<Entry> filteredUpdatedEntries = updatedEntries.stream()
.filter(entry ->
entry.key().length <= IDEMPOTENT_COMMAND_PREFIX_BYTES.length
||
entry.key().length > IDEMPOTENT_COMMAND_PREFIX_BYTES.length
&& !ByteBuffer.wrap(entry.key(), 0, IDEMPOTENT_COMMAND_PREFIX_BYTES.length)
.equals(ByteBuffer.wrap(IDEMPOTENT_COMMAND_PREFIX_BYTES)))
.collect(Collectors.toList());
.filter(WatchProcessor::isNotIdempotentCacheCommand)
.collect(toList());

// Collect all the events for each watch.
CompletableFuture<List<WatchAndEvents>> watchesAndEventsFuture =
Expand All @@ -212,10 +208,6 @@ public CompletableFuture<Void> notifyWatches(List<Entry> updatedEntries, HybridT

notificationFuture.whenComplete((unused, e) -> {
maybeLogLongProcessing(filteredUpdatedEntries, startTimeNanos);

if (e != null) {
notifyFailureHandlerOnFirstFailureInNotificationChain(e);
}
});

return notificationFuture;
Expand Down Expand Up @@ -259,19 +251,17 @@ private CompletableFuture<Void> notifyWatches(
e = e.getCause();
}

if (!(e instanceof NodeStoppingException)) {
notifyFailureHandlerOnFirstFailureInNotificationChain(e);
}

watchAndEvents.watch.onError(e);

notifyFailureHandlerOnFirstFailureInNotificationChain(e);
}
});
} catch (Throwable throwable) {
watchAndEvents.watch.onError(throwable);

notifyWatchFuture = failedFuture(throwable);

notifyFailureHandlerOnFirstFailureInNotificationChain(throwable);

notifyWatchFuture = failedFuture(throwable);
}

notifyWatchFutures[i] = notifyWatchFuture;
Expand Down Expand Up @@ -360,6 +350,10 @@ public void advanceSafeTime(HybridTimestamp time) {
}

private void notifyFailureHandlerOnFirstFailureInNotificationChain(Throwable e) {
if (e instanceof NodeStoppingException) {
return;
}

if (firedFailureOnChain.compareAndSet(false, true)) {
LOG.info("Notification chain encountered an error, so no notifications will be ever fired for subsequent revisions "
+ "until a restart. Notifying the FailureManager");
Expand Down Expand Up @@ -426,7 +420,11 @@ void updateCompactionRevision(long compactionRevision, HybridTimestamp time) {

watchEventHandlingCallback.onSafeTimeAdvanced(time);
}, watchExecutor)
.whenComplete((ignored, e) -> notifyFailureHandlerOnFirstFailureInNotificationChain(e));
.whenComplete((ignored, e) -> {
if (e != null) {
notifyFailureHandlerOnFirstFailureInNotificationChain(e);
}
});
}

/**
Expand All @@ -440,6 +438,24 @@ void updateOnlyRevision(long newRevision, HybridTimestamp time) {
notificationFuture = notificationFuture
.thenComposeAsync(unused -> notifyUpdateRevisionListeners(newRevision), watchExecutor)
.thenRunAsync(() -> invokeOnRevisionCallback(newRevision, time), watchExecutor)
.whenComplete((ignored, e) -> notifyFailureHandlerOnFirstFailureInNotificationChain(e));
.whenComplete((ignored, e) -> {
if (e != null) {
notifyFailureHandlerOnFirstFailureInNotificationChain(e);
}
});
}

private static boolean isNotIdempotentCacheCommand(Entry entry) {
int prefixLength = IDEMPOTENT_COMMAND_PREFIX_BYTES.length;

//noinspection SimplifiableIfStatement
if (entry.key().length <= prefixLength) {
return true;
}

return !Arrays.equals(
entry.key(), 0, prefixLength,
IDEMPOTENT_COMMAND_PREFIX_BYTES, 0, prefixLength
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.ignite.internal.util.ByteUtils.byteToBoolean;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.internal.util.ByteUtils.toByteArrayList;
import static org.apache.ignite.internal.util.StringUtils.toStringWithoutPrefix;

import java.io.Serializable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -70,7 +71,6 @@
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -375,8 +375,9 @@ void onSnapshotLoad() {
try (Cursor<Entry> cursor = storage.range(keyFrom, keyTo)) {
for (Entry entry : cursor) {
if (!entry.tombstone()) {
CommandId commandId = CommandId.fromString(
ByteUtils.stringFromBytes(entry.key()).substring(IDEMPOTENT_COMMAND_PREFIX.length()));
String commandIdString = toStringWithoutPrefix(entry.key(), IDEMPOTENT_COMMAND_PREFIX_BYTES.length);

CommandId commandId = CommandId.fromString(commandIdString);

Serializable result;
if (entry.value().length == 1) {
Expand Down

0 comments on commit 6dd178c

Please sign in to comment.