Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove tricky switch in bulk #80624

Merged
merged 5 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
*/
boolean isRequireAlias();

/**
* Finalize the request before executing or routing it.
*/
void process();

/**
* Pick the appropriate shard id to receive this request.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
Expand Down Expand Up @@ -523,26 +524,14 @@ protected void doRun() {
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
}

IndexRouting indexRouting = concreteIndices.routing(concreteIndex);

switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
indexRequest.resolveRouting(metadata);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the resolveRouting(...) method can now also be removed from IndexRequest class? (looks this was the only usage).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh! I had done that when I was working on it and had lost it in the shuffle. I'll add it back

indexRequest.process();
break;
case UPDATE:
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
break;
case DELETE:
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
break;
default:
throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
if (docWriteRequest.opType() == OpType.CREATE || docWriteRequest.opType() == OpType.INDEX) {
prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martijnvg I wonder if it is not a bug that we do not check this for all request types, including update/delete? Maybe better handled in a follow-up, but would be nice to move out of this "if". And perhaps then let prohibitAppendWritesInBackingIndices handle the ops itself to simplify this code (the filter for CREATE/INDEX here is really only partial anyway, so might as well do that inside the method).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think prohibitCustomRoutingOnDataStream() should be checked for all types of request, irregardless whether these target the data stream of one of the backing indices. I think only prohibitAppendWritesInBackingIndices() should be checked in this if statement. I can work on that in a followup.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat! I was hoping writing it like this would make stuff like this more obvious. I'll leave this to @martijnvg's follow up though.

}
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
docWriteRequest.process();

IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
int shardId = docWriteRequest.route(indexRouting);
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
new ShardId(concreteIndex, shardId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ static boolean executeBulkItemRequest(
) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();

// Translate update requests into index or delete requests which can be executed directly
final UpdateHelper.Result updateResult;
if (opType == DocWriteRequest.OpType.UPDATE) {
final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
Expand All @@ -279,24 +280,14 @@ static boolean executeBulkItemRequest(
context.markAsCompleted(context.getExecutionResult());
return true;
}
// execute translated update request
switch (updateResult.getResponseResult()) {
case CREATED:
case UPDATED:
IndexRequest indexRequest = updateResult.action();
indexRequest.process();
context.setRequestToExecute(indexRequest);
break;
case DELETED:
context.setRequestToExecute(updateResult.action());
break;
case NOOP:
context.markOperationAsNoOp(updateResult.action());
context.markAsCompleted(context.getExecutionResult());
return true;
default:
throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
if (updateResult.getResponseResult() == DocWriteResponse.Result.NOOP) {
context.markOperationAsNoOp(updateResult.action());
context.markAsCompleted(context.getExecutionResult());
return true;
}
DocWriteRequest<?> translated = updateResult.action();
translated.process();
context.setRequestToExecute(translated);
} else {
context.setRequestToExecute(context.getCurrent());
updateResult = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ public boolean isRequireAlias() {
return false;
}

@Override
public void process() {
// Nothing to do
}

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.deleteShard(id, routing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -588,6 +587,7 @@ public VersionType versionType() {
return this.versionType;
}

@Override
public void process() {
if ("".equals(id)) {
throw new IllegalArgumentException("if _id is specified it must not be empty");
Expand All @@ -604,11 +604,6 @@ public void process() {
}
}

/* resolve the routing if needed */
public void resolveRouting(Metadata metadata) {
routing(metadata.resolveWriteIndexRouting(routing, index));
}

public void checkAutoIdWithOpTypeCreateSupportedByVersion(Version version) {
if (id == null && opType == OpType.CREATE && version.before(Version.V_7_5_0)) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,11 @@ public boolean isRequireAlias() {
return requireAlias;
}

@Override
public void process() {
// Nothing to do
}

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.updateShard(id, routing);
Expand Down