Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ReplicationFailedException instead of OpensearchException in Repl… #4725

Merged
merged 9 commits into from
Oct 13, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))
- Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677))
- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725))
- Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

package org.opensearch.indices.recovery;

import org.opensearch.OpenSearchException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationFailedException;

import java.io.IOException;

Expand All @@ -45,7 +45,7 @@
*
* @opensearch.internal
*/
public class RecoveryFailedException extends OpenSearchException {
public class RecoveryFailedException extends ReplicationFailedException {

public RecoveryFailedException(StartRecoveryRequest request, Throwable cause) {
this(request, null, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

package org.opensearch.indices.recovery;

import org.opensearch.OpenSearchException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;

Expand Down Expand Up @@ -49,7 +49,7 @@ public void onDone(ReplicationState state) {
}

@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -56,10 +55,11 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTarget;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationCollection;

import java.io.IOException;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -135,7 +135,7 @@ public String description() {
}

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) {
listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.common.UUIDs;
Expand Down Expand Up @@ -105,16 +104,14 @@ public String description() {
}

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) {
// Cancellations still are passed to our SegmentReplicationListner as failures, if we have failed because of cancellation
// update the stage.
final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class);
if (cancelledException != null) {
state.setStage(SegmentReplicationState.Stage.CANCELLED);
listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure);
} else {
listener.onFailure(state(), e, sendShardFailure);
}
listener.onFailure(state(), e, sendShardFailure);
}

@Override
Expand Down Expand Up @@ -150,7 +147,7 @@ public void startReplication(ActionListener<Void> listener) {
// SegmentReplicationSource does not share CancellableThreads.
final CancellableThreads.ExecutionCancelledException executionCancelledException =
new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]");
notifyListener(executionCancelledException, false);
notifyListener(new ReplicationFailedException("Segment replication failed", executionCancelledException), false);
throw executionCancelledException;
});
state.setStage(SegmentReplicationState.Stage.REPLICATING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
Expand All @@ -27,6 +26,7 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -196,7 +196,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
Expand Down Expand Up @@ -249,13 +249,13 @@ default void onDone(ReplicationState state) {
}

@Override
default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
default void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
onReplicationFailure((SegmentReplicationState) state, e, sendShardFailure);
}

void onReplicationDone(SegmentReplicationState state);

void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure);
void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}

/**
Expand Down Expand Up @@ -293,13 +293,14 @@ public void onFailure(Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
if (onGoingReplications.getTarget(replicationId) != null) {
IndexShard indexShard = onGoingReplications.getTarget(replicationId).indexShard();
// if the target still exists in our collection, the primary initiated the cancellation, fail the replication
// but do not fail the shard. Cancellations initiated by this node from Index events will be removed with
// onGoingReplications.cancel and not appear in the collection when this listener resolves.
onGoingReplications.fail(replicationId, (CancellableThreads.ExecutionCancelledException) cause, false);
onGoingReplications.fail(replicationId, new ReplicationFailedException(indexShard, cause), false);
}
} else {
onGoingReplications.fail(replicationId, new OpenSearchException("Segment Replication failed", e), true);
onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), true);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.common.concurrent.AutoCloseableRefCounted;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -134,7 +132,7 @@ public T reset(final long id, final TimeValue activityTimeout) {
} catch (Exception e) {
// fail shard to be safe
assert oldTarget != null;
oldTarget.notifyListener(new OpenSearchException("Unable to reset target", e), true);
oldTarget.notifyListener(new ReplicationFailedException("Unable to reset target", e), true);
return null;
}
}
Expand Down Expand Up @@ -187,7 +185,7 @@ public boolean cancel(long id, String reason) {
* @param e exception with reason for the failure
* @param sendShardFailure true a shard failed message should be sent to the master
*/
public void fail(long id, OpenSearchException e, boolean sendShardFailure) {
public void fail(long id, ReplicationFailedException e, boolean sendShardFailure) {
T removed = onGoingTargetEvents.remove(id);
if (removed != null) {
logger.trace("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure);
Expand Down Expand Up @@ -299,7 +297,7 @@ protected void doRun() throws Exception {
String message = "no activity after [" + checkInterval + "]";
fail(
id,
new OpenSearchTimeoutException(message),
new ReplicationFailedException(message),
true // to be safe, we don't know what go stuck
);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,16 @@ public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, T
public ReplicationFailedException(StreamInput in) throws IOException {
super(in);
}

public ReplicationFailedException(Exception e) {
super(e);
}

public ReplicationFailedException(String msg) {
super(msg);
}

public ReplicationFailedException(String msg, Throwable cause) {
super(msg, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.indices.replication.common;

import org.opensearch.OpenSearchException;

/**
* Interface for listeners that run when there's a change in {@link ReplicationState}
*
Expand All @@ -19,5 +17,5 @@ public interface ReplicationListener {

void onDone(ReplicationState state);

void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure);
void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -78,7 +77,7 @@ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

public abstract void notifyListener(OpenSearchException e, boolean sendShardFailure);
public abstract void notifyListener(ReplicationFailedException e, boolean sendShardFailure);

public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) {
super(name);
Expand Down Expand Up @@ -170,7 +169,7 @@ public void cancel(String reason) {
* @param e exception that encapsulates the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
*/
public void fail(OpenSearchException e, boolean sendShardFailure) {
public void fail(ReplicationFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
notifyListener(e, sendShardFailure);
Expand All @@ -187,7 +186,7 @@ public void fail(OpenSearchException e, boolean sendShardFailure) {

protected void ensureRefCount() {
if (refCount() <= 0) {
throw new OpenSearchException(
throw new ReplicationFailedException(
"ReplicationTarget is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.junit.Assert;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
Expand Down Expand Up @@ -44,6 +43,7 @@
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -670,8 +670,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
assertTrue(e instanceof CancellableThreads.ExecutionCancelledException);
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
assertFalse(sendShardFailure);
assertEquals(SegmentReplicationState.Stage.CANCELLED, state.getStage());
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.bulk.BulkShardRequest;
Expand Down Expand Up @@ -70,6 +69,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -471,7 +471,7 @@ public void onDone(ReplicationState state) {
}

@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated"));
}
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
Expand Down Expand Up @@ -104,7 +105,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.error("Unexpected error", e);
Assert.fail("Test should succeed");
}
Expand Down Expand Up @@ -149,7 +150,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
// failures leave state object in last entered stage.
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
assertEquals(expectedError, e.getCause());
Expand Down
Loading