Skip to content

Commit 1fa1056

Browse files
authored
Add CcrRestoreSourceService to track sessions (#36578)
This commit is related to #36127. It adds a CcrRestoreSourceService to track Engine.IndexCommitRef need for in-process file restores. When a follower starts restoring a shard through the CcrRepository it opens a session with the leader through the PutCcrRestoreSessionAction. The leader responds to the request by telling the follower what files it needs to fetch for a restore. This is not yet implemented. Once, the restore is complete, the follower closes the session with the DeleteCcrRestoreSessionAction action.
1 parent 7de85f5 commit 1fa1056

File tree

10 files changed

+787
-4
lines changed

10 files changed

+787
-4
lines changed

server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public Iterables() {
3434

3535
public static <T> Iterable<T> concat(Iterable<T>... inputs) {
3636
Objects.requireNonNull(inputs);
37-
return new ConcatenatedIterable(inputs);
37+
return new ConcatenatedIterable<>(inputs);
3838
}
3939

4040
static class ConcatenatedIterable<T> implements Iterable<T> {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.elasticsearch.xpack.ccr;
88

9+
import org.apache.lucene.util.SetOnce;
910
import org.elasticsearch.action.ActionRequest;
1011
import org.elasticsearch.action.ActionResponse;
1112
import org.elasticsearch.client.Client;
@@ -23,6 +24,7 @@
2324
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2425
import org.elasticsearch.env.Environment;
2526
import org.elasticsearch.env.NodeEnvironment;
27+
import org.elasticsearch.index.IndexModule;
2628
import org.elasticsearch.index.IndexSettings;
2729
import org.elasticsearch.index.engine.EngineFactory;
2830
import org.elasticsearch.license.XPackLicenseState;
@@ -57,10 +59,13 @@
5759
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
5860
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
5961
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
62+
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
6063
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
64+
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
6165
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction;
6266
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
6367
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
68+
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
6469
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
6570
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
6671
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
@@ -110,6 +115,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
110115
private final boolean enabled;
111116
private final Settings settings;
112117
private final CcrLicenseChecker ccrLicenseChecker;
118+
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
113119
private Client client;
114120

115121
/**
@@ -150,8 +156,11 @@ public Collection<Object> createComponents(
150156
return emptyList();
151157
}
152158

159+
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings);
160+
this.restoreSourceService.set(restoreSourceService);
153161
return Arrays.asList(
154162
ccrLicenseChecker,
163+
restoreSourceService,
155164
new CcrRepositoryManager(settings, clusterService, client),
156165
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
157166
);
@@ -179,6 +188,10 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
179188
PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class),
180189
new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE,
181190
DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class),
191+
new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE,
192+
PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
193+
new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,
194+
ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
182195
// stats action
183196
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
184197
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
@@ -278,6 +291,11 @@ public Map<String, Repository.Factory> getInternalRepositories(Environment env,
278291
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
279292
}
280293

294+
@Override
295+
public void onIndexModule(IndexModule indexModule) {
296+
indexModule.addIndexEventListener(this.restoreSourceService.get());
297+
}
298+
281299
protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
282300

283301
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action.repositories;
8+
9+
import org.elasticsearch.action.Action;
10+
import org.elasticsearch.action.FailedNodeException;
11+
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
13+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
14+
import org.elasticsearch.action.support.nodes.TransportNodesAction;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.node.DiscoveryNode;
17+
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.common.inject.Inject;
19+
import org.elasticsearch.common.io.stream.StreamInput;
20+
import org.elasticsearch.common.io.stream.StreamOutput;
21+
import org.elasticsearch.threadpool.ThreadPool;
22+
import org.elasticsearch.transport.TransportService;
23+
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
24+
25+
import java.io.IOException;
26+
import java.util.List;
27+
28+
public class ClearCcrRestoreSessionAction extends Action<ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse> {
29+
30+
public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
31+
private static final String NAME = "internal:admin/ccr/restore/session/clear";
32+
33+
private ClearCcrRestoreSessionAction() {
34+
super(NAME);
35+
}
36+
37+
@Override
38+
public ClearCcrRestoreSessionResponse newResponse() {
39+
return new ClearCcrRestoreSessionResponse();
40+
}
41+
42+
public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction<ClearCcrRestoreSessionRequest,
43+
ClearCcrRestoreSessionResponse, ClearCcrRestoreSessionRequest.Request, Response> {
44+
45+
private final CcrRestoreSourceService ccrRestoreService;
46+
47+
@Inject
48+
public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters,
49+
TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
50+
super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new,
51+
ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class);
52+
this.ccrRestoreService = ccrRestoreService;
53+
}
54+
55+
@Override
56+
protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List<Response> responses,
57+
List<FailedNodeException> failures) {
58+
return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures);
59+
}
60+
61+
@Override
62+
protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) {
63+
return request.getRequest();
64+
}
65+
66+
@Override
67+
protected Response newNodeResponse() {
68+
return new Response();
69+
}
70+
71+
@Override
72+
protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) {
73+
ccrRestoreService.closeSession(request.getSessionUUID());
74+
return new Response(clusterService.localNode());
75+
}
76+
}
77+
78+
public static class Response extends BaseNodeResponse {
79+
80+
private Response() {
81+
}
82+
83+
private Response(StreamInput in) throws IOException {
84+
readFrom(in);
85+
}
86+
87+
private Response(DiscoveryNode node) {
88+
super(node);
89+
}
90+
91+
@Override
92+
public void writeTo(StreamOutput out) throws IOException {
93+
super.writeTo(out);
94+
}
95+
96+
@Override
97+
public void readFrom(StreamInput in) throws IOException {
98+
super.readFrom(in);
99+
}
100+
}
101+
102+
public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse<Response> {
103+
104+
ClearCcrRestoreSessionResponse() {
105+
}
106+
107+
ClearCcrRestoreSessionResponse(ClusterName clusterName, List<Response> chunkResponses, List<FailedNodeException> failures) {
108+
super(clusterName, chunkResponses, failures);
109+
}
110+
111+
@Override
112+
protected List<Response> readNodesFrom(StreamInput in) throws IOException {
113+
return in.readList(Response::new);
114+
}
115+
116+
@Override
117+
protected void writeNodesTo(StreamOutput out, List<Response> nodes) throws IOException {
118+
out.writeList(nodes);
119+
}
120+
}
121+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action.repositories;
8+
9+
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
10+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
14+
import java.io.IOException;
15+
16+
public class ClearCcrRestoreSessionRequest extends BaseNodesRequest<ClearCcrRestoreSessionRequest> {
17+
18+
private Request request;
19+
20+
ClearCcrRestoreSessionRequest() {
21+
}
22+
23+
public ClearCcrRestoreSessionRequest(String nodeId, Request request) {
24+
super(nodeId);
25+
this.request = request;
26+
}
27+
28+
@Override
29+
public void readFrom(StreamInput streamInput) throws IOException {
30+
super.readFrom(streamInput);
31+
request = new Request();
32+
request.readFrom(streamInput);
33+
}
34+
35+
@Override
36+
public void writeTo(StreamOutput streamOutput) throws IOException {
37+
super.writeTo(streamOutput);
38+
request.writeTo(streamOutput);
39+
}
40+
41+
public Request getRequest() {
42+
return request;
43+
}
44+
45+
public static class Request extends BaseNodeRequest {
46+
47+
private String sessionUUID;
48+
49+
Request() {
50+
}
51+
52+
public Request(String nodeId, String sessionUUID) {
53+
super(nodeId);
54+
this.sessionUUID = sessionUUID;
55+
}
56+
57+
@Override
58+
public void readFrom(StreamInput in) throws IOException {
59+
super.readFrom(in);
60+
sessionUUID = in.readString();
61+
}
62+
63+
@Override
64+
public void writeTo(StreamOutput out) throws IOException {
65+
super.writeTo(out);
66+
out.writeString(sessionUUID);
67+
}
68+
69+
public String getSessionUUID() {
70+
return sessionUUID;
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)