Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove some abstractions from TransportReplicationAction #40706

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
85b1cdb
Use ChannelActionListener in OperationTransportHandler
DaveCTurner Apr 1, 2019
4302320
Generic PrimaryResult
DaveCTurner Apr 1, 2019
16350a6
No need for ShardReference superclass
DaveCTurner Apr 1, 2019
efdab0c
Use ActionListener instead of Channel for primary response handler
DaveCTurner Apr 1, 2019
e1fef41
Use ActionListener instead of Channel for replica response handler
DaveCTurner Apr 1, 2019
4844f9a
Use ConcreteShardRequest throughout AsyncPrimaryAction
DaveCTurner Apr 1, 2019
2135ff9
Use ConcreteReplicaRequest throughout AbstractReplicaAction
DaveCTurner Apr 1, 2019
240edcf
Replace OperationTransportHandler with lambda
DaveCTurner Apr 1, 2019
a076dec
Replace PrimaryOperationTransportHandler with lambda
DaveCTurner Apr 1, 2019
83c6c67
Replace ReplicaOperationTransportHandler with lambda
DaveCTurner Apr 1, 2019
3d7e753
Allow subclasses to control whether primary action is forced
DaveCTurner Apr 1, 2019
df7437c
Imports
DaveCTurner Apr 1, 2019
9f89af7
Revert "Allow subclasses to control whether primary action is forced"
DaveCTurner Apr 1, 2019
a5bfc84
Revert "Revert "Allow subclasses to control whether primary action is…
DaveCTurner Apr 1, 2019
7182fd6
Merge branch 'master' into 2019-04-01-refactor-transport-replication-…
DaveCTurner Apr 2, 2019
fb06813
Rename listener -> onCompletionListener
DaveCTurner Apr 2, 2019
ffc7256
onCompletionListener.onResponse cannot fail
DaveCTurner Apr 2, 2019
c4d9c40
Revert "Revert "Revert "Allow subclasses to control whether primary a…
DaveCTurner Apr 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ protected ReplicationResponse newResponseInstance() {
}

@Override
protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) {
protected PrimaryResult<BasicReplicationRequest, ReplicationResponse> shardOperationOnPrimary(
BasicReplicationRequest shardRequest, IndexShard primary) {
primary.refresh("api");
logger.trace("{} refresh request executed on primary", primary.shardId());
return new PrimaryResult(shardRequest, new ReplicationResponse());
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Supplier;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
Expand All @@ -64,18 +63,9 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
}

@Override
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
protected boolean forcePrimaryActionExecution() {
// we should never reject resync because of thread pool capacity on primary
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request),
executor, true, true,
new PrimaryOperationTransportHandler());
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
e1.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
}
Expand Down
Loading