Skip to content

Commit

Permalink
[Segment Replication] Add SegmentReplicationTargetService to orchestr…
Browse files Browse the repository at this point in the history
…ate replication events. (#3439)

* Add SegmentReplicationTargetService to orchestrate replication events.

This change introduces  boilerplate classes for Segment Replication and a target service
to orchestrate replication events.

It also includes two refactors of peer recovery components for reuse.
1. Rename RecoveryFileChunkRequest to FileChunkRequest and extract code to handle throttling into
ReplicationTarget.
2. Extracts a component to execute retryable requests over the transport layer.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Code cleanup.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Make SegmentReplicationTargetService component final so that it can not
be extended by plugins.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 authored and Bukhtawar committed Jun 20, 2022
1 parent 3dd7121 commit 31c5be5
Show file tree
Hide file tree
Showing 21 changed files with 1,017 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.SnapshotState;
Expand Down Expand Up @@ -397,7 +397,7 @@ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionExc
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes;
int i = randomIntBetween(0, req.content().length() - 1);
array[i] = (byte) ~array[i]; // flip one byte in the content
Expand Down Expand Up @@ -474,11 +474,11 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
if (truncate && req.length() > 1) {
BytesRef bytesRef = req.content().toBytesRef();
BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
request = new RecoveryFileChunkRequest(
request = new FileChunkRequest(
req.recoveryId(),
req.requestSeqNo(),
req.shardId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
Expand Down Expand Up @@ -809,7 +809,7 @@ public void sendRequest(
TransportRequestOptions options
) throws IOException {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
FileChunkRequest chunkRequest = (FileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
// corrupting the segments_N files in order to make sure future recovery re-send files
logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -146,7 +146,7 @@ public void testCancelRecoveryAndResume() throws Exception {
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
import java.io.IOException;

/**
* Request for a recovery file chunk
* Request containing a file chunk.
*
* @opensearch.internal
*/
public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {
public final class FileChunkRequest extends RecoveryTransportRequest {
private final boolean lastChunk;
private final long recoveryId;
private final ShardId shardId;
Expand All @@ -58,7 +58,7 @@ public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {

private final int totalTranslogOps;

public RecoveryFileChunkRequest(StreamInput in) throws IOException {
public FileChunkRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
Expand All @@ -75,7 +75,7 @@ public RecoveryFileChunkRequest(StreamInput in) throws IOException {
sourceThrottleTimeInNanos = in.readLong();
}

public RecoveryFileChunkRequest(
public FileChunkRequest(
long recoveryId,
final long requestSeqNo,
ShardId shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,17 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
Expand All @@ -60,7 +57,6 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IllegalIndexShardStateException;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand All @@ -71,7 +67,6 @@
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -148,7 +143,7 @@ public PeerRecoveryTargetService(
transportService.registerRequestHandler(
Actions.FILE_CHUNK,
ThreadPool.Names.GENERIC,
RecoveryFileChunkRequest::new,
FileChunkRequest::new,
new FileChunkTransportRequestHandler()
);
transportService.registerRequestHandler(
Expand Down Expand Up @@ -354,12 +349,13 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.PREPARE_TRANSLOG, request);
if (listener == null) {
return;
}

recoveryRef.get().prepareForTranslogOperations(request.totalTranslogOps(), listener);
recoveryTarget.prepareForTranslogOperations(request.totalTranslogOps(), listener);
}
}
}
Expand All @@ -369,12 +365,13 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FINALIZE, request);
if (listener == null) {
return;
}

recoveryRef.get().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener);
recoveryTarget.finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener);
}
}
}
Expand All @@ -399,8 +396,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin
throws IOException {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(
recoveryRef,
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(
channel,
Actions.TRANSLOG_OPS,
request,
Expand Down Expand Up @@ -484,20 +480,20 @@ class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesIn
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILES_INFO, request);
if (listener == null) {
return;
}

recoveryRef.get()
.receiveFileInfo(
request.phase1FileNames,
request.phase1FileSizes,
request.phase1ExistingFileNames,
request.phase1ExistingFileSizes,
request.totalTranslogOps,
listener
);
recoveryTarget.receiveFileInfo(
request.phase1FileNames,
request.phase1FileSizes,
request.phase1ExistingFileNames,
request.phase1ExistingFileSizes,
request.totalTranslogOps,
listener
);
}
}
}
Expand All @@ -507,90 +503,37 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanF
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.CLEAN_FILES, request);
if (listener == null) {
return;
}

recoveryRef.get()
.cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(), listener);
recoveryTarget.cleanFiles(
request.totalTranslogOps(),
request.getGlobalCheckpoint(),
request.sourceMetaSnapshot(),
listener
);
}
}
}

class FileChunkTransportRequestHandler implements TransportRequestHandler<RecoveryFileChunkRequest> {
class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {

// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();

@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request);
if (listener == null) {
return;
}

final ReplicationLuceneIndex indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}

RateLimiter rateLimiter = recoverySettings.rateLimiter();
if (rateLimiter != null) {
long bytes = bytesSinceLastPause.addAndGet(request.content().length());
if (bytes > rateLimiter.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
long throttleTimeInNanos = rateLimiter.pause(bytes);
indexState.addTargetThrottling(throttleTimeInNanos);
recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
recoveryTarget.writeFileChunk(
request.metadata(),
request.position(),
request.content(),
request.lastChunk(),
request.totalTranslogOps(),
listener
);
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
}
}
}

private ActionListener<Void> createOrFinishListener(
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request
) {
return createOrFinishListener(recoveryRef, channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE);
}

private ActionListener<Void> createOrFinishListener(
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request,
final CheckedFunction<Void, TransportResponse, Exception> responseFn
) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request);
final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn);

final long requestSeqNo = request.requestSeqNo();
final ActionListener<Void> listener;
if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
listener = recoveryTarget.markRequestReceivedAndCreateListener(requestSeqNo, voidListener);
} else {
listener = voidListener;
}

return listener;
}

class RecoveryRunner extends AbstractRunnable {

final long recoveryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public Translog getTranslog() {
return translog;
}

@Override
public ReplicationTimer getTimer() {
return timer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -141,7 +142,7 @@ public String description() {
}

@Override
public void notifyListener(Exception e, boolean sendShardFailure) {
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure);
}

Expand Down
Loading

0 comments on commit 31c5be5

Please sign in to comment.