From 325d8a8e1681c0e525a1e0bde485f8ba94918b58 Mon Sep 17 00:00:00 2001 From: Ayush Kataria <31301636+ayushKataria@users.noreply.github.com> Date: Fri, 14 Oct 2022 04:30:50 +0530 Subject: [PATCH] =?UTF-8?q?Use=20ReplicationFailedException=20instead=20of?= =?UTF-8?q?=20OpensearchException=20in=20Repl=E2=80=A6=20(#4725)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Use ReplicationFailedException instead of OpensearchException in ReplicationTarget Signed-off-by: Ayush Kataria * CHANGELOG.md updated Signed-off-by: Ayush Kataria * test fixes Signed-off-by: Ayush Kataria * spotless fix Signed-off-by: Ayush Kataria * spotless fix Signed-off-by: Ayush Kataria * fixes for failing test as suggested in PR comments Signed-off-by: Ayush Kataria Signed-off-by: Ayush Kataria --- CHANGELOG.md | 1 + .../indices/recovery/RecoveryFailedException.java | 4 ++-- .../indices/recovery/RecoveryListener.java | 4 ++-- .../opensearch/indices/recovery/RecoveryTarget.java | 8 ++++---- .../replication/SegmentReplicationTarget.java | 9 +++------ .../SegmentReplicationTargetService.java | 13 +++++++------ .../replication/common/ReplicationCollection.java | 8 +++----- .../common/ReplicationFailedException.java | 12 ++++++++++++ .../replication/common/ReplicationListener.java | 4 +--- .../replication/common/ReplicationTarget.java | 7 +++---- .../shard/SegmentReplicationIndexShardTests.java | 5 ++--- .../opensearch/indices/recovery/RecoveryTests.java | 4 ++-- .../SegmentReplicationTargetServiceTests.java | 5 +++-- .../replication/SegmentReplicationTargetTests.java | 12 ++++++------ .../recovery/ReplicationCollectionTests.java | 5 +++-- .../opensearch/index/shard/IndexShardTestCase.java | 10 +++++++--- 16 files changed, 61 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a92a6e5d63a19..b27403096b7ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Dependencies - Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704)) ### Changed +- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725)) ### Deprecated ### Removed ### Fixed diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryFailedException.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryFailedException.java index f2665e8bbb1a7..12393ab12c95d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryFailedException.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryFailedException.java @@ -32,11 +32,11 @@ package org.opensearch.indices.recovery; -import org.opensearch.OpenSearchException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationFailedException; import java.io.IOException; @@ -45,7 +45,7 @@ * * @opensearch.internal */ -public class RecoveryFailedException extends OpenSearchException { +public class RecoveryFailedException extends ReplicationFailedException { public RecoveryFailedException(StartRecoveryRequest request, Throwable cause) { this(request, null, cause); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java index b93c054ffa4bf..c8c2fbfc896eb 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java @@ -8,9 +8,9 @@ package org.opensearch.indices.recovery; -import org.opensearch.OpenSearchException; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.indices.cluster.IndicesClusterStateService; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; @@ -49,7 +49,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 62aad1ce263ad..f03cbb650fccf 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -37,7 +37,6 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.opensearch.Assertions; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.cluster.node.DiscoveryNode; @@ -56,10 +55,11 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTarget; -import org.opensearch.indices.replication.common.ReplicationListener; -import org.opensearch.indices.replication.common.ReplicationCollection; import java.io.IOException; import java.nio.channels.FileChannel; @@ -135,7 +135,7 @@ public String description() { } @Override - public void notifyListener(OpenSearchException e, boolean sendShardFailure) { + public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) { listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index babb4baffea50..aadcb577f6174 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -18,7 +18,6 @@ import org.apache.lucene.store.ByteBuffersIndexInput; import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; @@ -105,16 +104,14 @@ public String description() { } @Override - public void notifyListener(OpenSearchException e, boolean sendShardFailure) { + public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) { // Cancellations still are passed to our SegmentReplicationListner as failures, if we have failed because of cancellation // update the stage. final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class); if (cancelledException != null) { state.setStage(SegmentReplicationState.Stage.CANCELLED); - listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure); - } else { - listener.onFailure(state(), e, sendShardFailure); } + listener.onFailure(state(), e, sendShardFailure); } @Override @@ -150,7 +147,7 @@ public void startReplication(ActionListener listener) { // SegmentReplicationSource does not share CancellableThreads. final CancellableThreads.ExecutionCancelledException executionCancelledException = new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]"); - notifyListener(executionCancelledException, false); + notifyListener(new ReplicationFailedException("Segment replication failed", executionCancelledException), false); throw executionCancelledException; }); state.setStage(SegmentReplicationState.Stage.REPLICATING); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 8fc53ccd3bc08..b633f0fa3b9a0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Nullable; @@ -27,6 +26,7 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.tasks.Task; @@ -196,7 +196,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { logger.trace( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication failed, timing data: {}", @@ -249,13 +249,13 @@ default void onDone(ReplicationState state) { } @Override - default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + default void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { onReplicationFailure((SegmentReplicationState) state, e, sendShardFailure); } void onReplicationDone(SegmentReplicationState state); - void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure); + void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure); } /** @@ -293,13 +293,14 @@ public void onFailure(Exception e) { Throwable cause = ExceptionsHelper.unwrapCause(e); if (cause instanceof CancellableThreads.ExecutionCancelledException) { if (onGoingReplications.getTarget(replicationId) != null) { + IndexShard indexShard = onGoingReplications.getTarget(replicationId).indexShard(); // if the target still exists in our collection, the primary initiated the cancellation, fail the replication // but do not fail the shard. Cancellations initiated by this node from Index events will be removed with // onGoingReplications.cancel and not appear in the collection when this listener resolves. - onGoingReplications.fail(replicationId, (CancellableThreads.ExecutionCancelledException) cause, false); + onGoingReplications.fail(replicationId, new ReplicationFailedException(indexShard, cause), false); } } else { - onGoingReplications.fail(replicationId, new OpenSearchException("Segment Replication failed", e), true); + onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), true); } } }); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index 20600856c9444..e918ac0a79691 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -34,8 +34,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchException; -import org.opensearch.OpenSearchTimeoutException; import org.opensearch.common.concurrent.AutoCloseableRefCounted; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -134,7 +132,7 @@ public T reset(final long id, final TimeValue activityTimeout) { } catch (Exception e) { // fail shard to be safe assert oldTarget != null; - oldTarget.notifyListener(new OpenSearchException("Unable to reset target", e), true); + oldTarget.notifyListener(new ReplicationFailedException("Unable to reset target", e), true); return null; } } @@ -187,7 +185,7 @@ public boolean cancel(long id, String reason) { * @param e exception with reason for the failure * @param sendShardFailure true a shard failed message should be sent to the master */ - public void fail(long id, OpenSearchException e, boolean sendShardFailure) { + public void fail(long id, ReplicationFailedException e, boolean sendShardFailure) { T removed = onGoingTargetEvents.remove(id); if (removed != null) { logger.trace("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure); @@ -299,7 +297,7 @@ protected void doRun() throws Exception { String message = "no activity after [" + checkInterval + "]"; fail( id, - new OpenSearchTimeoutException(message), + new ReplicationFailedException(message), true // to be safe, we don't know what go stuck ); return; diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java index afdd0ce466f9b..23ad4d0e096b5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java @@ -38,4 +38,16 @@ public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, T public ReplicationFailedException(StreamInput in) throws IOException { super(in); } + + public ReplicationFailedException(Exception e) { + super(e); + } + + public ReplicationFailedException(String msg) { + super(msg); + } + + public ReplicationFailedException(String msg, Throwable cause) { + super(msg, cause); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java index 0666f475d496a..1d3c48d084f60 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java @@ -8,8 +8,6 @@ package org.opensearch.indices.replication.common; -import org.opensearch.OpenSearchException; - /** * Interface for listeners that run when there's a change in {@link ReplicationState} * @@ -19,5 +17,5 @@ public interface ReplicationListener { void onDone(ReplicationState state); - void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure); + void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 501ff46eeb2ff..44da7f0f065ae 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ChannelActionListener; import org.opensearch.common.CheckedFunction; @@ -78,7 +77,7 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } - public abstract void notifyListener(OpenSearchException e, boolean sendShardFailure); + public abstract void notifyListener(ReplicationFailedException e, boolean sendShardFailure); public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) { super(name); @@ -170,7 +169,7 @@ public void cancel(String reason) { * @param e exception that encapsulates the failure * @param sendShardFailure indicates whether to notify the master of the shard failure */ - public void fail(OpenSearchException e, boolean sendShardFailure) { + public void fail(ReplicationFailedException e, boolean sendShardFailure) { if (finished.compareAndSet(false, true)) { try { notifyListener(e, sendShardFailure); @@ -187,7 +186,7 @@ public void fail(OpenSearchException e, boolean sendShardFailure) { protected void ensureRefCount() { if (refCount() <= 0) { - throw new OpenSearchException( + throw new ReplicationFailedException( "ReplicationTarget is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls" ); } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 8562e6de53332..ed1f5a9b06dc1 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -11,7 +11,6 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; @@ -45,6 +44,7 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -790,8 +790,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { - assertTrue(e instanceof CancellableThreads.ExecutionCancelledException); + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { assertFalse(sendShardFailure); assertEquals(SegmentReplicationState.Stage.CANCELLED, state.getStage()); latch.countDown(); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index cc5100fba9010..3d04f808bc30c 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -41,7 +41,6 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.bulk.BulkShardRequest; @@ -70,6 +69,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationType; @@ -471,7 +471,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated")); } })) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 7437cb22e44d1..9a9c5b989dea9 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -22,6 +22,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; @@ -104,7 +105,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { logger.error("Unexpected error", e); Assert.fail("Test should succeed"); } @@ -149,7 +150,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { // failures leave state object in last entered stage. assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); assertEquals(expectedError, e.getCause()); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index f8341573770a6..002f745fabda2 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -27,7 +27,6 @@ import org.junit.Assert; import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -40,6 +39,7 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.StoreTests; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.DummyShardLock; import org.opensearch.test.IndexSettingsModule; @@ -199,7 +199,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } @@ -242,7 +242,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } @@ -287,7 +287,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause()); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } @@ -332,7 +332,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause()); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } @@ -374,7 +374,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assert (e instanceof IllegalStateException); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } diff --git a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java index 1789dd3b2a288..75ac1075e8ee0 100644 --- a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java @@ -39,6 +39,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.recovery.RecoveryState; @@ -59,7 +60,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { } }; @@ -93,7 +94,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { failed.set(true); latch.countDown(); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index f01a189e974bf..0ee8cc1fcbe0c 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -40,7 +40,6 @@ import org.junit.Assert; import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -125,6 +124,7 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; @@ -189,7 +189,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { throw new AssertionError(e); } }; @@ -1324,7 +1324,11 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { logger.error("Unexpected replication failure in test", e); Assert.fail("test replication should not fail: " + e); }