Skip to content

Commit

Permalink
Refactored TransportIndexReplicationOperationAction to be able to exp…
Browse files Browse the repository at this point in the history
…ose the shard id related to a shard failure

The `ShardOperationFailedException` is now created within `TransportIndexReplicationAction` passing in the current shard id as a constructor argument.
Also replaced `AtomicReferenceArray<Object>` with `AtomicReferenceArray<ShardActionResult>`, where `ShardActionResult` wraps the `ShardResponse` or the failure, containing all the needed info.
  • Loading branch information
javanna committed Feb 26, 2014
1 parent 6f8f33d commit 4f32ead
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public class IndexDeleteResponse extends ActionResponse {
private int failedShards;
private ShardDeleteResponse[] deleteResponses;

IndexDeleteResponse(String index, int successfulShards, int failedShards, ShardDeleteResponse[] deleteResponses) {
IndexDeleteResponse(String index, int failedShards, ShardDeleteResponse[] deleteResponses) {
this.index = index;
this.successfulShards = successfulShards;
this.successfulShards = deleteResponses.length;
this.failedShards = failedShards;
this.deleteResponses = deleteResponses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.delete.index;

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -30,8 +31,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.List;

/**
*
Expand All @@ -50,19 +50,8 @@ protected IndexDeleteRequest newRequestInstance() {
}

@Override
protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, AtomicReferenceArray shardsResponses) {
int successfulShards = 0;
int failedShards = 0;
ArrayList<ShardDeleteResponse> responses = new ArrayList<ShardDeleteResponse>();
for (int i = 0; i < shardsResponses.length(); i++) {
if (shardsResponses.get(i) == null) {
failedShards++;
} else {
responses.add((ShardDeleteResponse) shardsResponses.get(i));
successfulShards++;
}
}
return new IndexDeleteResponse(request.index(), successfulShards, failedShards, responses.toArray(new ShardDeleteResponse[responses.size()]));
protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List<ShardDeleteResponse> shardDeleteResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) {
return new IndexDeleteResponse(request.index(), failuresCount, shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()]));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.action.deletebyquery;

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -32,9 +31,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
*
Expand All @@ -53,20 +50,8 @@ protected IndexDeleteByQueryRequest newRequestInstance() {
}

@Override
protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, AtomicReferenceArray shardsResponses) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> failures = new ArrayList<ShardOperationFailedException>(3);
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse instanceof Throwable) {
failedShards++;
failures.add(new DefaultShardOperationFailedException(request.index(), -1, (Throwable) shardResponse));
} else {
successfulShards++;
}
}
return new IndexDeleteByQueryResponse(request.index(), successfulShards, failedShards, failures);
protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, List<ShardDeleteByQueryResponse> shardDeleteByQueryResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) {
return new IndexDeleteByQueryResponse(request.index(), shardDeleteByQueryResponses.size(), failuresCount, shardFailures);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package org.elasticsearch.action.support.replication;

import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -36,6 +39,7 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

Expand Down Expand Up @@ -81,8 +85,9 @@ protected void doExecute(final Request request, final ActionListener<Response> l
return;
}
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger failureCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(groups.size());
final AtomicReferenceArray<Object> shardsResponses = new AtomicReferenceArray<Object>(groups.size());
final AtomicReferenceArray<ShardActionResult> shardsResponses = new AtomicReferenceArray<ShardActionResult>(groups.size());

for (final ShardIterator shardIt : groups) {
ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());
Expand All @@ -96,20 +101,41 @@ protected void doExecute(final Request request, final ActionListener<Response> l
shardAction.execute(shardRequest, new ActionListener<ShardResponse>() {
@Override
public void onResponse(ShardResponse result) {
shardsResponses.set(indexCounter.getAndIncrement(), result);
if (completionCounter.decrementAndGet() == 0) {
listener.onResponse(newResponseInstance(request, shardsResponses));
}
shardsResponses.set(indexCounter.getAndIncrement(), new ShardActionResult(result));
returnIfNeeded();
}

@Override
public void onFailure(Throwable e) {
failureCounter.getAndIncrement();
int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) {
shardsResponses.set(index, e);
shardsResponses.set(index, new ShardActionResult(
new DefaultShardOperationFailedException(request.index, shardIt.shardId().id(), e)));
}
returnIfNeeded();
}

private void returnIfNeeded() {
if (completionCounter.decrementAndGet() == 0) {
listener.onResponse(newResponseInstance(request, shardsResponses));
List<ShardResponse> responses = Lists.newArrayList();
List<ShardOperationFailedException> failures = Lists.newArrayList();
for (int i = 0; i < shardsResponses.length(); i++) {
ShardActionResult shardActionResult = shardsResponses.get(i);
if (shardActionResult == null) {
assert !accumulateExceptions();
continue;
}
if (shardActionResult.isFailure()) {
assert accumulateExceptions() && shardActionResult.shardFailure != null;
failures.add(shardActionResult.shardFailure);
} else {
responses.add(shardActionResult.shardResponse);
}
}

assert failures.size() == 0 || failures.size() == failureCounter.get();
listener.onResponse(newResponseInstance(request, responses, failureCounter.get(), failures));
}
}
});
Expand All @@ -118,7 +144,7 @@ public void onFailure(Throwable e) {

protected abstract Request newRequestInstance();

protected abstract Response newResponseInstance(Request request, AtomicReferenceArray shardsResponses);
protected abstract Response newResponseInstance(Request request, List<ShardResponse> shardResponses, int failuresCount, List<ShardOperationFailedException> shardFailures);

protected abstract String transportAction();

Expand All @@ -132,6 +158,28 @@ public void onFailure(Throwable e) {

protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);

private class ShardActionResult {

private final ShardResponse shardResponse;
private final ShardOperationFailedException shardFailure;

private ShardActionResult(ShardResponse shardResponse) {
assert shardResponse != null;
this.shardResponse = shardResponse;
this.shardFailure = null;
}

private ShardActionResult(ShardOperationFailedException shardOperationFailedException) {
assert shardOperationFailedException != null;
this.shardFailure = shardOperationFailedException;
this.shardResponse = null;
}

boolean isFailure() {
return shardFailure != null;
}
}

private class TransportHandler extends BaseTransportRequestHandler<Request> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.deleteByQuery;

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.search.SearchResponse;
Expand All @@ -27,12 +28,10 @@
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Assert;
import org.junit.Test;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;

public class DeleteByQueryTests extends ElasticsearchIntegrationTest {

Expand Down Expand Up @@ -82,9 +81,10 @@ public void testMissing() {
deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery());

try {
DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet();
Assert.fail("Exception should have been thrown.");
deleteByQueryRequestBuilder.execute().actionGet();
fail("Exception should have been thrown.");
} catch (IndexMissingException e) {
//everything well
}

deleteByQueryRequestBuilder.setIndicesOptions(IndicesOptions.lenient());
Expand All @@ -109,6 +109,14 @@ public void testFailure() throws Exception {
assertThat(response.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(response.getIndex("twitter").getSuccessfulShards(), equalTo(0));
assertThat(response.getIndex("twitter").getFailedShards(), equalTo(5));
assertThat(response.getIndices().size(), equalTo(1));
assertThat(response.getIndices().get("twitter").getFailedShards(), equalTo(5));
assertThat(response.getIndices().get("twitter").getFailures().length, equalTo(5));
for (ShardOperationFailedException failure : response.getIndices().get("twitter").getFailures()) {
assertThat(failure.reason(), containsString("[twitter] [has_child] No mapping for for type [type]"));
assertThat(failure.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(failure.shardId(), greaterThan(-1));
}
}

@Test
Expand Down

0 comments on commit 4f32ead

Please sign in to comment.