Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the TransportRolloverAction execute in one cluster state update #50388

Merged
merged 6 commits into from
Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
Expand Down Expand Up @@ -130,7 +129,7 @@ protected void masterOperation(Task task, final RolloverRequest rolloverRequest,
.docs(true);
statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
client.execute(IndicesStatsAction.INSTANCE, statsRequest,
new ActionListener<IndicesStatsResponse>() {
new ActionListener<>() {
@Override
public void onResponse(IndicesStatsResponse statsResponse) {
final Map<String, Boolean> conditionResults = evaluateConditions(rolloverRequest.getConditions().values(),
Expand All @@ -141,56 +140,41 @@ public void onResponse(IndicesStatsResponse statsResponse) {
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false, false, false));
return;
}
List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
.filter(condition -> conditionResults.get(condition.toString())).collect(Collectors.toList());
if (conditionResults.size() == 0 || metConditions.size() > 0) {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(unresolvedName, rolloverIndexName,
rolloverRequest);
createIndexService.createIndex(updateRequest, ActionListener.wrap(createIndexClusterStateUpdateResponse -> {
final IndicesAliasesClusterStateUpdateRequest aliasesUpdateRequest;
if (explicitWriteIndex) {
aliasesUpdateRequest = prepareRolloverAliasesWriteIndexUpdateRequest(sourceIndexName,
rolloverIndexName, rolloverRequest);
} else {
aliasesUpdateRequest = prepareRolloverAliasesUpdateRequest(sourceIndexName,
rolloverIndexName, rolloverRequest);
CreateIndexClusterStateUpdateRequest createIndexRequest = prepareCreateIndexRequest(unresolvedName,
rolloverIndexName, rolloverRequest);
clusterService.submitStateUpdateTask("rollover_index", new ClusterStateUpdateTask() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the index name into the task string?

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexRequest);
newState = indexAliasesService.applyAliasActions(newState,
rolloverAliasToNewIndex(sourceIndexName, rolloverIndexName, rolloverRequest, explicitWriteIndex));
RolloverInfo rolloverInfo = new RolloverInfo(rolloverRequest.getAlias(), metConditions,
threadPool.absoluteTimeInMillis());
return ClusterState.builder(newState)
.metaData(MetaData.builder(newState.metaData())
.put(IndexMetaData.builder(newState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
}
indexAliasesService.indicesAliases(aliasesUpdateRequest,
ActionListener.wrap(aliasClusterStateUpdateResponse -> {
if (aliasClusterStateUpdateResponse.isAcknowledged()) {
clusterService.submitStateUpdateTask("update_rollover_info", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RolloverInfo rolloverInfo = new RolloverInfo(rolloverRequest.getAlias(), metConditions,
threadPool.absoluteTimeInMillis());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData())
.put(IndexMetaData.builder(currentState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
isShardsAcknowledged)),
listener::onFailure);
}
});
} else {
listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults,
false, true, false, false));
}
}, listener::onFailure));
}, listener::onFailure));
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't expect it to happen (it would be a bad situation if it did), but it might be a good idea to wrap this in:

if (newState.equals(oldState) == false) {
    ...
}

rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
isShardsAcknowledged)),
listener::onFailure);
}
});
} else {
// conditions not met
listener.onResponse(
Expand All @@ -207,27 +191,24 @@ public void onFailure(Exception e) {
);
}

static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
final List<AliasAction> actions = List.of(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null),
new AliasAction.Remove(oldIndex, request.getAlias()));
return new IndicesAliasesClusterStateUpdateRequest(actions)
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
}

static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesWriteIndexUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
final List<AliasAction> actions = List.of(
/**
* Creates the alias actions to reflect the alias rollover from the old (source) index to the new (target/rolled over) index. An
* alias pointing to multiple indices will have to be an explicit write index (ie. the old index alias has is_write_index set to true)
* in which case, after the rollover, the new index will need to be the explicit write index.
*/
static List<AliasAction> rolloverAliasToNewIndex(String oldIndex, String newIndex, RolloverRequest request,
boolean explicitWriteIndex) {
if (explicitWriteIndex) {
return List.of(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, true),
new AliasAction.Add(oldIndex, request.getAlias(), null, null, null, false));
return new IndicesAliasesClusterStateUpdateRequest(actions)
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
} else {
return List.of(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null),
new AliasAction.Remove(oldIndex, request.getAlias()));
}
}


static String generateRolloverIndexName(String sourceIndexName, IndexNameExpressionResolver indexNameExpressionResolver) {
String resolvedName = indexNameExpressionResolver.resolveDateMathExpression(sourceIndexName);
final boolean isDateMath = sourceIndexName.equals(resolvedName) == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public void onFailure(String source, Exception e) {
* Handles the cluster state transition to a version that reflects the {@link CreateIndexClusterStateUpdateRequest}.
* All the requested changes are firstly validated before mutating the {@link ClusterState}.
*/
ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request) throws Exception {
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request) throws Exception {
logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version());
Index createdIndex = null;
String removalExtraInfo = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,15 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) {
return innerExecute(currentState, request.actions());
return applyAliasActions(currentState, request.actions());
}
});
}

ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actions) {
/**
* Handles the cluster state transition to a version that reflects the provided {@link AliasAction}s.
*/
public ClusterState applyAliasActions(ClusterState currentState, Iterable<AliasAction> actions) {
List<Index> indicesToClose = new ArrayList<>();
Map<String, IndexService> indices = new HashMap<>();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
Expand Down Expand Up @@ -219,15 +218,13 @@ public void testEvaluateWithoutMetaData() {
results2.forEach((k, v) -> assertFalse(v));
}

public void testCreateUpdateAliasRequest() {
public void testRolloverAliasActions() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
final IndicesAliasesClusterStateUpdateRequest updateRequest =
TransportRolloverAction.prepareRolloverAliasesUpdateRequest(sourceIndex, targetIndex, rolloverRequest);

List<AliasAction> actions = updateRequest.actions();
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, false);
assertThat(actions, hasSize(2));
boolean foundAdd = false;
boolean foundRemove = false;
Expand All @@ -246,15 +243,13 @@ public void testCreateUpdateAliasRequest() {
assertTrue(foundRemove);
}

public void testCreateUpdateAliasRequestWithExplicitWriteIndex() {
public void testRolloverAliasActionsWithExplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
final IndicesAliasesClusterStateUpdateRequest updateRequest =
TransportRolloverAction.prepareRolloverAliasesWriteIndexUpdateRequest(sourceIndex, targetIndex, rolloverRequest);
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, true);

List<AliasAction> actions = updateRequest.actions();
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
Expand Down
Loading