forked from elastic/elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add the ability to fetch the latest successful shard snapshot
This commit adds a new master transport action TransportGetShardSnapshotAction that allows getting the last successful snapshot for a particular shard in a set of repositories. It deals with the different implementation details around BwC for repositories. Relates elastic#73496 Backport of elastic#75080
- Loading branch information
Showing
16 changed files
with
1,165 additions
and
4 deletions.
There are no files selected for viewing
345 changes: 345 additions & 0 deletions
345
.../src/internalClusterTest/java/org/elasticsearch/repositories/IndexSnapshotsServiceIT.java
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
21 changes: 21 additions & 0 deletions
21
...va/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.action.admin.cluster.snapshots.get.shard; | ||
|
||
import org.elasticsearch.action.ActionType; | ||
|
||
public class GetShardSnapshotAction extends ActionType<GetShardSnapshotResponse> { | ||
|
||
public static final GetShardSnapshotAction INSTANCE = new GetShardSnapshotAction(); | ||
public static final String NAME = "internal:admin/snapshot/get_shard"; | ||
|
||
public GetShardSnapshotAction() { | ||
super(NAME, GetShardSnapshotResponse::new); | ||
} | ||
} |
105 changes: 105 additions & 0 deletions
105
...a/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.action.admin.cluster.snapshots.get.shard; | ||
|
||
import org.elasticsearch.action.ActionRequestValidationException; | ||
import org.elasticsearch.action.support.master.MasterNodeRequest; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.index.shard.ShardId; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
import static org.elasticsearch.action.ValidateActions.addValidationError; | ||
|
||
public class GetShardSnapshotRequest extends MasterNodeRequest<GetShardSnapshotRequest> { | ||
private static final String ALL_REPOSITORIES = "_all"; | ||
|
||
private final List<String> repositories; | ||
private final ShardId shardId; | ||
|
||
GetShardSnapshotRequest(List<String> repositories, ShardId shardId) { | ||
assert repositories.isEmpty() == false; | ||
assert repositories.stream().noneMatch(Objects::isNull); | ||
assert repositories.size() == 1 || repositories.stream().noneMatch(repo -> repo.equals(ALL_REPOSITORIES)); | ||
this.repositories = Objects.requireNonNull(repositories); | ||
this.shardId = Objects.requireNonNull(shardId); | ||
} | ||
|
||
public GetShardSnapshotRequest(StreamInput in) throws IOException { | ||
super(in); | ||
this.repositories = in.readStringList(); | ||
this.shardId = new ShardId(in); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeStringCollection(repositories); | ||
shardId.writeTo(out); | ||
} | ||
|
||
public static GetShardSnapshotRequest latestSnapshotInAllRepositories(ShardId shardId) { | ||
return new GetShardSnapshotRequest(Collections.singletonList(ALL_REPOSITORIES), shardId); | ||
} | ||
|
||
public static GetShardSnapshotRequest latestSnapshotInRepositories(ShardId shardId, List<String> repositories) { | ||
if (repositories.isEmpty()) { | ||
throw new IllegalArgumentException("Expected at least 1 repository but got none"); | ||
} | ||
|
||
if (repositories.stream().anyMatch(Objects::isNull)) { | ||
throw new NullPointerException("null values are not allowed in the repository list"); | ||
} | ||
return new GetShardSnapshotRequest(repositories, shardId); | ||
} | ||
|
||
@Override | ||
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException validationException = null; | ||
|
||
if (repositories.size() == 0) { | ||
validationException = addValidationError("repositories are missing", validationException); | ||
} | ||
|
||
return validationException; | ||
} | ||
|
||
public boolean getFromAllRepositories() { | ||
return repositories.size() == 1 && ALL_REPOSITORIES.equalsIgnoreCase(repositories.get(0)); | ||
} | ||
|
||
public boolean isSingleRepositoryRequest() { | ||
return repositories.size() == 1 && ALL_REPOSITORIES.equalsIgnoreCase(repositories.get(0)) == false; | ||
} | ||
|
||
public ShardId getShardId() { | ||
return shardId; | ||
} | ||
|
||
public List<String> getRepositories() { | ||
return repositories; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
GetShardSnapshotRequest request = (GetShardSnapshotRequest) o; | ||
return Objects.equals(repositories, request.repositories) && Objects.equals(shardId, request.shardId); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(repositories, shardId); | ||
} | ||
} |
60 changes: 60 additions & 0 deletions
60
.../org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.action.admin.cluster.snapshots.get.shard; | ||
|
||
import org.elasticsearch.action.ActionResponse; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.repositories.RepositoryException; | ||
import org.elasticsearch.repositories.ShardSnapshotInfo; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
public class GetShardSnapshotResponse extends ActionResponse { | ||
public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(Collections.emptyMap(), Collections.emptyMap()); | ||
|
||
private final Map<String, ShardSnapshotInfo> repositoryShardSnapshots; | ||
private final Map<String, RepositoryException> repositoryFailures; | ||
|
||
GetShardSnapshotResponse(Map<String, ShardSnapshotInfo> repositoryShardSnapshots, Map<String, RepositoryException> repositoryFailures) { | ||
this.repositoryShardSnapshots = repositoryShardSnapshots; | ||
this.repositoryFailures = repositoryFailures; | ||
} | ||
|
||
GetShardSnapshotResponse(StreamInput in) throws IOException { | ||
super(in); | ||
this.repositoryShardSnapshots = in.readMap(StreamInput::readString, ShardSnapshotInfo::new); | ||
this.repositoryFailures = in.readMap(StreamInput::readString, RepositoryException::new); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeMap(repositoryShardSnapshots, StreamOutput::writeString, (o, info) -> info.writeTo(o)); | ||
out.writeMap(repositoryFailures, StreamOutput::writeString, (o, err) -> err.writeTo(o)); | ||
} | ||
|
||
public Optional<ShardSnapshotInfo> getIndexShardSnapshotInfoForRepository(String repositoryName) { | ||
return Optional.ofNullable(repositoryShardSnapshots.get(repositoryName)); | ||
} | ||
|
||
public Optional<RepositoryException> getFailureForRepository(String repository) { | ||
return Optional.ofNullable(repositoryFailures.get(repository)); | ||
} | ||
|
||
public Map<String, ShardSnapshotInfo> getRepositoryShardSnapshots() { | ||
return repositoryShardSnapshots; | ||
} | ||
|
||
public Map<String, RepositoryException> getRepositoryFailures() { | ||
return repositoryFailures; | ||
} | ||
} |
151 changes: 151 additions & 0 deletions
151
...asticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.action.admin.cluster.snapshots.get.shard; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.support.ActionFilters; | ||
import org.elasticsearch.action.support.GroupedActionListener; | ||
import org.elasticsearch.action.support.master.TransportMasterNodeAction; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.block.ClusterBlockException; | ||
import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.metadata.RepositoriesMetadata; | ||
import org.elasticsearch.cluster.metadata.RepositoryMetadata; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.core.Tuple; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.repositories.IndexSnapshotsService; | ||
import org.elasticsearch.repositories.RepositoriesService; | ||
import org.elasticsearch.repositories.RepositoryException; | ||
import org.elasticsearch.repositories.ShardSnapshotInfo; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
|
||
public class TransportGetShardSnapshotAction extends TransportMasterNodeAction<GetShardSnapshotRequest, GetShardSnapshotResponse> { | ||
|
||
private final IndexSnapshotsService indexSnapshotsService; | ||
|
||
@Inject | ||
public TransportGetShardSnapshotAction( | ||
TransportService transportService, | ||
ClusterService clusterService, | ||
ThreadPool threadPool, | ||
RepositoriesService repositoriesService, | ||
ActionFilters actionFilters, | ||
IndexNameExpressionResolver indexNameExpressionResolver | ||
) { | ||
super( | ||
GetShardSnapshotAction.NAME, | ||
transportService, | ||
clusterService, | ||
threadPool, | ||
actionFilters, | ||
GetShardSnapshotRequest::new, | ||
indexNameExpressionResolver, | ||
GetShardSnapshotResponse::new, | ||
ThreadPool.Names.SAME | ||
); | ||
this.indexSnapshotsService = new IndexSnapshotsService(repositoriesService); | ||
} | ||
|
||
@Override | ||
protected void masterOperation(GetShardSnapshotRequest request, ClusterState state, ActionListener<GetShardSnapshotResponse> listener) | ||
throws Exception { | ||
final Set<String> repositories = getRequestedRepositories(request, state); | ||
final ShardId shardId = request.getShardId(); | ||
|
||
if (repositories.isEmpty()) { | ||
listener.onResponse(GetShardSnapshotResponse.EMPTY); | ||
return; | ||
} | ||
|
||
GroupedActionListener<Tuple<Optional<ShardSnapshotInfo>, RepositoryException>> groupedActionListener = new GroupedActionListener<>( | ||
listener.map(this::transformToResponse), | ||
repositories.size() | ||
); | ||
|
||
BlockingQueue<String> repositoriesQueue = new LinkedBlockingQueue<>(repositories); | ||
getShardSnapshots(repositoriesQueue, shardId, new ActionListener<Optional<ShardSnapshotInfo>>() { | ||
@Override | ||
public void onResponse(Optional<ShardSnapshotInfo> shardSnapshotInfo) { | ||
groupedActionListener.onResponse(Tuple.tuple(shardSnapshotInfo, null)); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception err) { | ||
if (request.isSingleRepositoryRequest() == false && err instanceof RepositoryException) { | ||
groupedActionListener.onResponse(Tuple.tuple(Optional.empty(), (RepositoryException) err)); | ||
} else { | ||
groupedActionListener.onFailure(err); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
private void getShardSnapshots( | ||
BlockingQueue<String> repositories, | ||
ShardId shardId, | ||
ActionListener<Optional<ShardSnapshotInfo>> listener | ||
) { | ||
final String repository = repositories.poll(); | ||
if (repository == null) { | ||
return; | ||
} | ||
|
||
indexSnapshotsService.getLatestSuccessfulSnapshotForShard( | ||
repository, | ||
shardId, | ||
ActionListener.runAfter(listener, () -> getShardSnapshots(repositories, shardId, listener)) | ||
); | ||
} | ||
|
||
private GetShardSnapshotResponse transformToResponse( | ||
Collection<Tuple<Optional<ShardSnapshotInfo>, RepositoryException>> shardSnapshots | ||
) { | ||
final Map<String, ShardSnapshotInfo> repositoryShardSnapshot = shardSnapshots.stream() | ||
.map(Tuple::v1) | ||
.filter(Objects::nonNull) | ||
.filter(Optional::isPresent) | ||
.map(Optional::get) | ||
.collect(Collectors.toMap(ShardSnapshotInfo::getRepository, Function.identity())); | ||
|
||
final Map<String, RepositoryException> failures = shardSnapshots.stream() | ||
.map(Tuple::v2) | ||
.filter(Objects::nonNull) | ||
.collect(Collectors.toMap(RepositoryException::repository, Function.identity())); | ||
|
||
return new GetShardSnapshotResponse(repositoryShardSnapshot, failures); | ||
} | ||
|
||
private Set<String> getRequestedRepositories(GetShardSnapshotRequest request, ClusterState state) { | ||
RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); | ||
if (request.getFromAllRepositories()) { | ||
return repositories.repositories().stream().map(RepositoryMetadata::name).collect(Collectors.toSet()); | ||
} | ||
|
||
return request.getRepositories().stream().filter(Objects::nonNull).collect(Collectors.toSet()); | ||
} | ||
|
||
@Override | ||
protected ClusterBlockException checkBlock(GetShardSnapshotRequest request, ClusterState state) { | ||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.