Skip to content

Commit

Permalink
Retry blocking if async indexing is rejected due to queue size
Browse files Browse the repository at this point in the history
Some tests use AbstractIntegrationTest#indexRandom which sometimes uses async
indexing. This can easily run into queue size based rejections on a slow
box. In that case we should retry blocked indexing.
  • Loading branch information
s1monw committed Oct 14, 2013
1 parent df95453 commit 9dc59e2
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 16 deletions.
5 changes: 5 additions & 0 deletions src/main/java/org/elasticsearch/common/collect/Tuple.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,9 @@ public int hashCode() {
result = 31 * result + (v2 != null ? v2.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "Tuple [v1=" + v1 + ", v2=" + v2 + "]";
}
}
64 changes: 48 additions & 16 deletions src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Iterators;
import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
Expand Down Expand Up @@ -52,8 +53,10 @@
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.merge.policy.*;
import org.elasticsearch.indices.IndexAlreadyExistsException;
Expand Down Expand Up @@ -495,6 +498,7 @@ public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) t
if (builders.length == 0) {
return;
}

Random random = getRandom();
Set<String> indicesSet = new HashSet<String>();
for (int i = 0; i < builders.length; i++) {
Expand All @@ -503,21 +507,21 @@ public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) t
final String[] indices = indicesSet.toArray(new String[0]);
List<IndexRequestBuilder> list = Arrays.asList(builders);
Collections.shuffle(list, random);
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
final CopyOnWriteArrayList<Tuple<IndexRequestBuilder, Throwable>> errors = new CopyOnWriteArrayList<Tuple<IndexRequestBuilder, Throwable>>();
List<CountDownLatch> latches = new ArrayList<CountDownLatch>();
if (frequently()) {
logger.info("Index [{}] docs async: [{}]", list.size(), true);
final CountDownLatch latch = new CountDownLatch(list.size());
latches.add(latch);
for (IndexRequestBuilder indexRequestBuilder : list) {
indexRequestBuilder.execute(new LatchedActionListener<IndexResponse>(latch, errors));
indexRequestBuilder.execute(new PayloadLatchedActionListener<IndexResponse, IndexRequestBuilder>(indexRequestBuilder, latch, errors));
if (rarely()) {
if (rarely()) {
client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches), errors));
client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches)));
} else if (rarely()) {
client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<FlushResponse>(newLatch(latches), errors));
client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<FlushResponse>(newLatch(latches)));
} else if (rarely()) {
client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches), errors));
client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches)));
}
}
}
Expand All @@ -528,19 +532,27 @@ public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) t
indexRequestBuilder.execute().actionGet();
if (rarely()) {
if (rarely()) {
client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches), errors));
client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches)));
} else if (rarely()) {
client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<FlushResponse>(newLatch(latches), errors));
client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<FlushResponse>(newLatch(latches)));
} else if (rarely()) {
client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches), errors));
client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches)));
}
}
}
}
for (CountDownLatch countDownLatch : latches) {
countDownLatch.await();
}
assertThat(errors, emptyIterable());
final List<Throwable> actualErrors = new ArrayList<Throwable>();
for (Tuple<IndexRequestBuilder, Throwable> tuple : errors) {
if (ExceptionsHelper.unwrapCause(tuple.v2()) instanceof EsRejectedExecutionException) {
tuple.v1().execute().actionGet(); // re-index if rejected
} else {
actualErrors.add(tuple.v2());
}
}
assertThat(actualErrors, emptyIterable());
if (forceRefresh) {
assertNoFailures(client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute().get());
}
Expand All @@ -551,29 +563,49 @@ private static final CountDownLatch newLatch(List<CountDownLatch> latches) {
latches.add(l);
return l;
}



private static class LatchedActionListener<Response> implements ActionListener<Response> {
private class LatchedActionListener<Response> implements ActionListener<Response> {
private final CountDownLatch latch;
private final CopyOnWriteArrayList<Throwable> errors;

public LatchedActionListener(CountDownLatch latch, CopyOnWriteArrayList<Throwable> errors) {
public LatchedActionListener(CountDownLatch latch) {
this.latch = latch;
this.errors = errors;
}

@Override
public void onResponse(Response response) {
public final void onResponse(Response response) {
latch.countDown();
}

@Override
public void onFailure(Throwable e) {
public final void onFailure(Throwable t) {
try {
errors.add(e);
logger.info("Action Failed", t);
addError(t);
} finally {
latch.countDown();
}
}

protected void addError(Throwable t) {
}

}

private class PayloadLatchedActionListener<Response, T> extends LatchedActionListener<Response> {
private final CopyOnWriteArrayList<Tuple<T, Throwable>> errors;
private final T builder;

public PayloadLatchedActionListener(T builder, CountDownLatch latch, CopyOnWriteArrayList<Tuple<T, Throwable>> errors) {
super(latch);
this.errors = errors;
this.builder = builder;
}

protected void addError(Throwable t) {
errors.add(new Tuple<T, Throwable>(builder, t));
}

}

Expand Down

0 comments on commit 9dc59e2

Please sign in to comment.