diff --git a/config/logging.yml b/config/logging.yml index e87252e8ed106..a08a2c224a2b9 100644 --- a/config/logging.yml +++ b/config/logging.yml @@ -1,6 +1,8 @@ rootLogger: INFO, console, file logger: jgroups: WARN + # log action execution errors for easier debugging + action : DEBUG appender: console: diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java index 41dedc96980bb..19976e4de6ab6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java @@ -27,6 +27,10 @@ */ public class NoShardAvailableActionException extends IndexShardException { + public NoShardAvailableActionException(ShardId shardId, String msg) { + super(shardId, msg); + } + public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) { super(shardId, msg, cause); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/BroadcastPingResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/BroadcastPingResponse.java index 9c99bf5889f95..58dc6005901d4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/BroadcastPingResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/BroadcastPingResponse.java @@ -36,8 +36,8 @@ public class BroadcastPingResponse extends BroadcastOperationResponse { } - public BroadcastPingResponse(int successfulShards, int failedShards, List shardFailures) { - super(successfulShards, failedShards, shardFailures); + public BroadcastPingResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java index 10f2edf110d85..fc281fe5af61f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java @@ -83,7 +83,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct successfulShards++; } } - return new BroadcastPingResponse(successfulShards, failedShards, shardFailures); + return new BroadcastPingResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); } @Override protected BroadcastShardPingRequest newShardRequest() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java index 05118dcd28c71..e317ef09925aa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java @@ -58,4 +58,8 @@ public int shardId() { super.writeTo(out); out.writeVInt(shardId); } + + @Override public String toString() { + return "[" + index + "][" + shardId + "]"; + } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheResponse.java index 06acdd2018a44..d6038f2269ac1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheResponse.java @@ -38,8 +38,8 @@ public class ClearIndicesCacheResponse extends BroadcastOperationResponse { } - ClearIndicesCacheResponse(int successfulShards, int failedShards, List shardFailures) { - super(successfulShards, failedShards, shardFailures); + ClearIndicesCacheResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 54407b6c14acf..6842cbb87f4ba 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -66,6 +66,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio return new ClearIndicesCacheRequest(); } + @Override protected boolean ignoreNonActiveExceptions() { + return true; + } + @Override protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { int successfulShards = 0; int failedShards = 0; @@ -73,7 +77,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio for (int i = 0; i < shardsResponses.length(); i++) { Object shardResponse = shardsResponses.get(i); if (shardResponse == null) { - failedShards++; + // simply ignore non active shards } else if (shardResponse instanceof BroadcastShardOperationFailedException) { failedShards++; if (shardFailures == null) { @@ -84,7 +88,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio successfulShards++; } } - return new ClearIndicesCacheResponse(successfulShards, failedShards, shardFailures); + return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); } @Override protected ShardClearIndicesCacheRequest newShardRequest() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java index c66197615cbcc..5208af8e155c9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java @@ -38,8 +38,8 @@ public class FlushResponse extends BroadcastOperationResponse { } - FlushResponse(int successfulShards, int failedShards, List shardFailures) { - super(successfulShards, failedShards, shardFailures); + FlushResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index bb4b383b2bce0..c7d801b428311 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -65,6 +65,10 @@ public class TransportFlushAction extends TransportBroadcastOperationAction shardFailures) { - super(successfulShards, failedShards, shardFailures); + OptimizeResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java index 7468417f8c7cc..9082829897897 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java @@ -66,6 +66,10 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction shardFailures) { - super(successfulShards, failedShards, shardFailures); + RefreshResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index 21331dae70b75..0a0bbb2cb41d0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -66,6 +66,10 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction shardFailures) { - super(successfulShards, failedShards, shardFailures); + IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); this.shards = shards; indicesSettings = newHashMap(); for (ShardStatus shard : shards) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index 65a076003069e..59b55d28b95b1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -45,7 +45,7 @@ import static com.google.common.collect.Lists.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class TransportIndicesStatusAction extends TransportBroadcastOperationAction { @@ -65,6 +65,10 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct return new IndicesStatusRequest(); } + @Override protected boolean ignoreNonActiveExceptions() { + return true; + } + @Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { int successfulShards = 0; int failedShards = 0; @@ -73,7 +77,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct for (int i = 0; i < shardsResponses.length(); i++) { Object shardResponse = shardsResponses.get(i); if (shardResponse == null) { - failedShards++; + // simply ignore non active shards } else if (shardResponse instanceof BroadcastShardOperationFailedException) { failedShards++; if (shardFailures == null) { @@ -85,7 +89,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct successfulShards++; } } - return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards, shardFailures); + return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures); } @Override protected IndexShardStatusRequest newShardRequest() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java index 76e93fb5c4e86..ce45953adc140 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java @@ -201,6 +201,6 @@ public CountRequest types(String... types) { } @Override public String toString() { - return "[" + Arrays.toString(indices) + "][" + Arrays.toString(types) + "], querySource[" + Unicode.fromBytes(querySource) + "]"; + return "[" + Arrays.toString(indices) + "]" + Arrays.toString(types) + ", querySource[" + Unicode.fromBytes(querySource) + "]"; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountResponse.java index c562d23db3da2..1ab7c19ebcf5d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountResponse.java @@ -40,8 +40,8 @@ public class CountResponse extends BroadcastOperationResponse { } - CountResponse(long count, int successfulShards, int failedShards, List shardFailures) { - super(successfulShards, failedShards, shardFailures); + CountResponse(long count, int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); this.count = count; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java index 4ff9834580381..582abed2c9044 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java @@ -98,7 +98,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction shardsIts = indicesService.searchShards(clusterState, request.indices(), request.queryHint()); expectedSuccessfulOps = shardsIts.size(); - expectedTotalOps = shardsIts.totalSize(); + expectedTotalOps = shardsIts.totalSizeActive(); } public void start() { // count the local operations, and perform the non local ones int localOperations = 0; for (final ShardsIterator shardIt : shardsIts) { - final ShardRouting shard = shardIt.next(); - if (shard.active()) { + final ShardRouting shard = shardIt.nextActiveOrNull(); + if (shard != null) { if (shard.currentNodeId().equals(nodes.localNodeId())) { localOperations++; } else { @@ -122,7 +122,7 @@ public void start() { performFirstPhase(shardIt.reset()); } } else { - // as if we have a "problem", so we iterate to the next one and maintain counts + // really, no shards active in this group onFirstPhaseResult(shard, shardIt, null); } } @@ -132,8 +132,8 @@ public void start() { threadPool.execute(new Runnable() { @Override public void run() { for (final ShardsIterator shardIt : shardsIts) { - final ShardRouting shard = shardIt.reset().next(); - if (shard.active()) { + final ShardRouting shard = shardIt.reset().nextActiveOrNull(); + if (shard != null) { if (shard.currentNodeId().equals(nodes.localNodeId())) { performFirstPhase(shardIt.reset()); } @@ -144,8 +144,8 @@ public void start() { } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; for (final ShardsIterator shardIt : shardsIts) { - final ShardRouting shard = shardIt.reset().next(); - if (shard.active()) { + final ShardRouting shard = shardIt.reset().nextActiveOrNull(); + if (shard != null) { if (shard.currentNodeId().equals(nodes.localNodeId())) { if (localAsync) { threadPool.execute(new Runnable() { @@ -163,10 +163,10 @@ public void start() { } } - private void performFirstPhase(final Iterator shardIt) { - final ShardRouting shard = shardIt.next(); - if (!shard.active()) { - // as if we have a "problem", so we iterate to the next one and maintain counts + private void performFirstPhase(final ShardsIterator shardIt) { + final ShardRouting shard = shardIt.nextActiveOrNull(); + if (shard == null) { + // no more active shards... (we should not really get here, but just for safety) onFirstPhaseResult(shard, shardIt, null); } else { Node node = nodes.get(shard.currentNodeId()); @@ -182,13 +182,13 @@ private void performFirstPhase(final Iterator shardIt) { } } - private void onFirstPhaseResult(ShardRouting shard, FirstResult result, Iterator shardIt) { + private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardsIterator shardIt) { processFirstPhaseResult(shard, result); // increment all the "future" shards to update the total ops since we some may work and some may not... // and when that happens, we break on total ops, so we must maintain them - while (shardIt.hasNext()) { + while (shardIt.hasNextActive()) { totalOps.incrementAndGet(); - shardIt.next(); + shardIt.nextActive(); } if (successulOps.incrementAndGet() == expectedSuccessfulOps || totalOps.incrementAndGet() == expectedTotalOps) { @@ -200,15 +200,25 @@ private void onFirstPhaseResult(ShardRouting shard, FirstResult result, Iterator } } - private void onFirstPhaseResult(ShardRouting shard, final Iterator shardIt, Throwable t) { - if (logger.isDebugEnabled()) { - if (t != null) { - logger.debug(shard.shortSummary() + ": Failed to search [" + request + "]", t); - } - } + private void onFirstPhaseResult(ShardRouting shard, final ShardsIterator shardIt, Throwable t) { if (totalOps.incrementAndGet() == expectedTotalOps) { + // e is null when there is no next active.... + if (logger.isDebugEnabled()) { + if (t != null) { + if (shard != null) { + logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } + } // no more shards, add a failure - shardFailures.add(new ShardSearchFailure(t)); + if (t == null) { + // no active shards + shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); + } else { + shardFailures.add(new ShardSearchFailure(t)); + } if (successulOps.get() == 0) { // no successful ops, raise an exception invokeListener(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures())); @@ -220,11 +230,36 @@ private void onFirstPhaseResult(ShardRouting shard, final Iterator } } } else { - if (shardIt.hasNext()) { + if (shardIt.hasNextActive()) { + // trace log this exception + if (logger.isTraceEnabled()) { + if (t != null) { + if (shard != null) { + logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } + } performFirstPhase(shardIt); } else { - // no more shards, add a failure - shardFailures.add(new ShardSearchFailure(t)); + // no more shards active, add a failure + // e is null when there is no next active.... + if (logger.isDebugEnabled()) { + if (t != null) { + if (shard != null) { + logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } + } + if (t == null) { + // no active shards + shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); + } else { + shardFailures.add(new ShardSearchFailure(t)); + } } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationResponse.java index 1db94073dc5b3..1bbae5975e8a6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationResponse.java @@ -38,6 +38,8 @@ */ public abstract class BroadcastOperationResponse implements ActionResponse { + private int totalShards; + private int successfulShards; private int failedShards; @@ -47,7 +49,8 @@ public abstract class BroadcastOperationResponse implements ActionResponse { protected BroadcastOperationResponse() { } - protected BroadcastOperationResponse(int successfulShards, int failedShards, List shardFailures) { + protected BroadcastOperationResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + this.totalShards = totalShards; this.successfulShards = successfulShards; this.failedShards = failedShards; this.shardFailures = shardFailures; @@ -60,7 +63,7 @@ protected BroadcastOperationResponse(int successfulShards, int failedShards, Lis * The total shards this request ran against. */ public int totalShards() { - return successfulShards + failedShards; + return totalShards; } /** @@ -88,6 +91,7 @@ public List shardFailures() { } @Override public void readFrom(StreamInput in) throws IOException { + totalShards = in.readVInt(); successfulShards = in.readVInt(); failedShards = in.readVInt(); int size = in.readVInt(); @@ -100,6 +104,7 @@ public List shardFailures() { } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(totalShards); out.writeVInt(successfulShards); out.writeVInt(failedShards); out.writeVInt(shardFailures.size()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationFailedException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationFailedException.java index 99e325f13a4ee..52fc31da7f07a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationFailedException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationFailedException.java @@ -26,7 +26,7 @@ /** * An exception indicating that a failure occurred performing an operation on the shard. * - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class BroadcastShardOperationFailedException extends IndexShardException implements ElasticSearchWrapperException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java index 9a75876b49c82..585a8270ffdb3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java @@ -42,7 +42,6 @@ import org.elasticsearch.util.settings.Settings; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -96,10 +95,14 @@ protected TransportBroadcastOperationAction(Settings settings, ThreadPool thread protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState); - private boolean accumulateExceptions() { + protected boolean accumulateExceptions() { return true; } + protected boolean ignoreNonActiveExceptions() { + return false; + } + class AsyncBroadcastAction { private final Request request; @@ -145,8 +148,8 @@ public void start() { // count the local operations, and perform the non local ones int localOperations = 0; for (final ShardsIterator shardIt : shardsIts) { - final ShardRouting shard = shardIt.next(); - if (shard.active()) { + final ShardRouting shard = shardIt.nextActiveOrNull(); + if (shard != null) { if (shard.currentNodeId().equals(nodes.localNodeId())) { localOperations++; } else { @@ -154,8 +157,8 @@ public void start() { performOperation(shardIt.reset(), true); } } else { - // as if we have a "problem", so we iterate to the next one and maintain counts - onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not active"), false); + // really, no shards active in this group + onOperation(shard, shardIt, null, false); } } // we have local operations, perform them now @@ -164,8 +167,8 @@ public void start() { threadPool.execute(new Runnable() { @Override public void run() { for (final ShardsIterator shardIt : shardsIts) { - final ShardRouting shard = shardIt.reset().next(); - if (shard.active()) { + final ShardRouting shard = shardIt.reset().nextActiveOrNull(); + if (shard != null) { if (shard.currentNodeId().equals(nodes.localNodeId())) { performOperation(shardIt.reset(), false); } @@ -176,8 +179,8 @@ public void start() { } else { boolean localAsync = request.operationThreading() == BroadcastOperationThreading.THREAD_PER_SHARD; for (final ShardsIterator shardIt : shardsIts) { - final ShardRouting shard = shardIt.reset().next(); - if (shard.active()) { + final ShardRouting shard = shardIt.reset().nextActiveOrNull(); + if (shard != null) { if (shard.currentNodeId().equals(nodes.localNodeId())) { performOperation(shardIt.reset(), localAsync); } @@ -187,11 +190,11 @@ public void start() { } } - private void performOperation(final Iterator shardIt, boolean localAsync) { - final ShardRouting shard = shardIt.next(); - if (!shard.active()) { - // as if we have a "problem", so we iterate to the next one and maintain counts - onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not Active"), false); + private void performOperation(final ShardsIterator shardIt, boolean localAsync) { + final ShardRouting shard = shardIt.nextActiveOrNull(); + if (shard == null) { + // no more active shards... (we should not really get here, just safety) + onOperation(shard, shardIt, null, false); } else { final ShardRequest shardRequest = newShardRequest(shard, request); if (shard.currentNodeId().equals(nodes.localNodeId())) { @@ -223,8 +226,8 @@ private void performOperation(final Iterator shardIt, boolean loca onOperation(shard, response, false); } - @Override public void handleException(RemoteTransportException exp) { - onOperation(shard, shardIt, exp, false); + @Override public void handleException(RemoteTransportException e) { + onOperation(shard, shardIt, e, false); } @Override public boolean spawn() { @@ -236,32 +239,54 @@ private void performOperation(final Iterator shardIt, boolean loca } } - @SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) { + @SuppressWarnings({"unchecked"}) + private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) { shardsResponses.set(indexCounter.getAndIncrement(), response); if (expectedOps == counterOps.incrementAndGet()) { finishHim(alreadyThreaded); } } - @SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, final Iterator shardIt, Exception e, boolean alreadyThreaded) { - if (logger.isDebugEnabled()) { - if (e != null) { - logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e); + @SuppressWarnings({"unchecked"}) + private void onOperation(ShardRouting shard, final ShardsIterator shardIt, Throwable t, boolean alreadyThreaded) { + if (!shardIt.hasNextActive()) { + // e is null when there is no next active.... + if (logger.isDebugEnabled()) { + if (t != null) { + if (shard != null) { + logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } } - } - if (!shardIt.hasNext()) { - // no more shards in this partition + // no more shards in this group int index = indexCounter.getAndIncrement(); if (accumulateExceptions()) { - if (!(e instanceof BroadcastShardOperationFailedException)) { - e = new BroadcastShardOperationFailedException(shard.shardId(), e); + if (t == null) { + if (!ignoreNonActiveExceptions()) { + t = new BroadcastShardOperationFailedException(shardIt.shardId(), "No active shard(s)"); + } + } else if (!(t instanceof BroadcastShardOperationFailedException)) { + t = new BroadcastShardOperationFailedException(shardIt.shardId(), t); } - shardsResponses.set(index, e); + shardsResponses.set(index, t); } if (expectedOps == counterOps.incrementAndGet()) { finishHim(alreadyThreaded); } return; + } else { + // trace log this exception + if (logger.isTraceEnabled()) { + if (t != null) { + if (shard != null) { + logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } + } } // we are not threaded here if we got here from the transport // or we possibly threaded if we got from a local threaded one, diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java index 28a5a17ba0cf1..86fa610284216 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java @@ -31,7 +31,7 @@ import static org.elasticsearch.action.Actions.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public abstract class ShardReplicationOperationRequest implements ActionRequest { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 27d245fd6a12e..b50c02e9dcac5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -330,6 +330,9 @@ private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, // still in recovery, retry (we know that its not UNASSIGNED OR INITIALIZING since we are checking it in the calling method) retryPrimary(fromDiscoveryListener, shard); } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e); + } listener.onFailure(new ReplicationShardOperationFailedException(shards.shardId(), e)); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/TransportSingleOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/TransportSingleOperationAction.java index 1e8253ebed6eb..c9299800119a5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/TransportSingleOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/TransportSingleOperationAction.java @@ -39,10 +39,9 @@ import org.elasticsearch.util.settings.Settings; import java.io.IOException; -import java.util.Iterator; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public abstract class TransportSingleOperationAction extends BaseAction { @@ -83,9 +82,7 @@ private class AsyncSingleAction { private final ActionListener listener; - private final ShardsIterator shards; - - private Iterator shardsIt; + private final ShardsIterator shardsIt; private final Request request; @@ -102,18 +99,17 @@ private AsyncSingleAction(Request request, ActionListener listener) { // update to the concrete shard to use request.index(clusterState.metaData().concreteIndex(request.index())); - this.shards = indicesService.indexServiceSafe(request.index()).operationRouting() + this.shardsIt = indicesService.indexServiceSafe(request.index()).operationRouting() .getShards(clusterState, request.type(), request.id()); - this.shardsIt = shards.iterator(); } public void start() { performFirst(); } - public void onFailure(ShardRouting shardRouting, Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(shardRouting.shortSummary() + ": Failed to get [" + request.type() + "#" + request.id() + "]", e); + private void onFailure(ShardRouting shardRouting, Exception e) { + if (logger.isTraceEnabled() && e != null) { + logger.trace(shardRouting.shortSummary() + ": Failed to get [" + request.type() + "#" + request.id() + "]", e); } perform(e); } @@ -122,11 +118,8 @@ public void onFailure(ShardRouting shardRouting, Exception e) { * First get should try and use a shard that exists on a local node for better performance */ private void performFirst() { - while (shardsIt.hasNext()) { - final ShardRouting shard = shardsIt.next(); - if (!shard.active()) { - continue; - } + while (shardsIt.hasNextActive()) { + final ShardRouting shard = shardsIt.nextActive(); if (shard.currentNodeId().equals(nodes.localNodeId())) { if (request.operationThreaded()) { threadPool.execute(new Runnable() { @@ -159,19 +152,16 @@ private void performFirst() { } } } - if (!shardsIt.hasNext()) { + if (!shardsIt.hasNextActive()) { // no local node get, go remote - shardsIt = shards.reset().iterator(); + shardsIt.reset(); perform(null); } } private void perform(final Exception lastException) { - while (shardsIt.hasNext()) { - final ShardRouting shard = shardsIt.next(); - if (!shard.active()) { - continue; - } + while (shardsIt.hasNextActive()) { + final ShardRouting shard = shardsIt.nextActive(); // no need to check for local nodes, we tried them already in performFirstGet if (!shard.currentNodeId().equals(nodes.localNodeId())) { Node node = nodes.get(shard.currentNodeId()); @@ -204,12 +194,20 @@ private void perform(final Exception lastException) { return; } } - if (!shardsIt.hasNext()) { - final NoShardAvailableActionException failure = new NoShardAvailableActionException(shards.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]", lastException); + if (!shardsIt.hasNextActive()) { + Exception failure = lastException; + if (failure == null) { + failure = new NoShardAvailableActionException(shardsIt.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]"); + } else { + if (logger.isDebugEnabled()) { + logger.debug(shardsIt.shardId() + ": Failed to get [" + request.type() + "#" + request.id() + "]", failure); + } + } if (request.listenerThreaded()) { + final Exception fFailure = failure; threadPool.execute(new Runnable() { @Override public void run() { - listener.onFailure(failure); + listener.onFailure(fFailure); } }); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TermsResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TermsResponse.java index c67fddec07362..fc3b18b9429e7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TermsResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TermsResponse.java @@ -54,9 +54,9 @@ public class TermsResponse extends BroadcastOperationResponse implements Iterabl TermsResponse() { } - TermsResponse(int successfulShards, int failedShards, List shardFailures, FieldTermsFreq[] fieldsTermsFreq, + TermsResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, FieldTermsFreq[] fieldsTermsFreq, long numDocs, long maxDoc, long numDeletedDocs) { - super(successfulShards, failedShards, shardFailures); + super(totalShards, successfulShards, failedShards, shardFailures); this.fieldsTermsFreq = fieldsTermsFreq; this.numDocs = numDocs; this.maxDoc = maxDoc; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TransportTermsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TransportTermsAction.java index ad119802090de..7844209048d9b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TransportTermsAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TransportTermsAction.java @@ -140,7 +140,7 @@ public class TransportTermsAction extends TransportBroadcastOperationAction { - - private int index = 0; - - private final List iterators; - - private Iterator current; - - public CompoundShardsIterator(List iterators) { - this.iterators = iterators; - } - - @Override public ShardsIterator reset() { - for (ShardsIterator it : iterators) { - it.reset(); - } - index = 0; - current = null; - return this; - } - - @Override public int size() { - int size = 0; - for (ShardsIterator it : iterators) { - size += it.size(); - } - return size; - } - - @Override public boolean hasNext() { - if (index == iterators.size()) { - return false; - } - if (current == null) { - current = iterators.get(index).iterator(); - } - while (!current.hasNext()) { - if (++index == iterators.size()) { - return false; - } - current = iterators.get(index).iterator(); - } - return true; - } - - @Override public ShardRouting next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return current.next(); - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - - @Override public ShardId shardId() { - return currentShardsIterator().shardId(); - } - - @Override public Iterator iterator() { - return this; - } - - private ShardsIterator currentShardsIterator() throws NoSuchElementException { - if (iterators.size() == 0) { - throw new NoSuchElementException(); - } - if (index == iterators.size()) { - return iterators.get(index - 1); - } - return iterators.get(index); - - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java index b8dae204a9298..40d4d305a3d2c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java @@ -57,6 +57,14 @@ public int totalSize() { return size; } + public int totalSizeActive() { + int size = 0; + for (ShardsIterator shard : iterators) { + size += shard.sizeActive(); + } + return size; + } + public int size() { return iterators.size(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 102e823a5c406..b569ffe5da909 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -110,7 +110,7 @@ public GroupShardsIterator groupByShardsIt() { * A groups shards iterator where each groups is a single {@link ShardRouting} and a group * is created for each shard routing. * - *

This basically means that components that use the {@link GroupShardsIterator} will itearte + *

This basically means that components that use the {@link GroupShardsIterator} will iterate * over *all* the shards (all the replicas) within the index. */ public GroupShardsIterator groupByAllIt() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 4cc30ef4a4994..7f1d0e4c94b87 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -106,7 +106,7 @@ ShardRouting shardModulo(int shardId) { *

The class can be used from different threads, though not designed to be used concurrently * from different threads. */ - private class IndexShardsIterator implements ShardsIterator, Iterator { + class IndexShardsIterator implements ShardsIterator, Iterator { private final int origIndex; @@ -130,17 +130,47 @@ private IndexShardsIterator(int index) { } @Override public boolean hasNext() { - return counter != size(); + return counter < size(); } - @Override public ShardRouting next() { + @Override public ShardRouting next() throws NoSuchElementException { if (!hasNext()) { - throw new NoSuchElementException(); + throw new NoSuchElementException("No shard found"); } counter++; return shardModulo(index++); } + @Override public boolean hasNextActive() { + int counter = this.counter; + int index = this.index; + while (counter++ < size()) { + ShardRouting shardRouting = shardModulo(index++); + if (shardRouting.active()) { + return true; + } + } + return false; + } + + @Override public ShardRouting nextActive() throws NoSuchElementException { + ShardRouting shardRouting = nextActiveOrNull(); + if (shardRouting == null) { + throw new NoSuchElementException("No active shard found"); + } + return shardRouting; + } + + @Override public ShardRouting nextActiveOrNull() throws NoSuchElementException { + while (counter++ < size()) { + ShardRouting shardRouting = shardModulo(index++); + if (shardRouting.active()) { + return shardRouting; + } + } + return null; + } + @Override public void remove() { throw new UnsupportedOperationException(); } @@ -149,6 +179,16 @@ private IndexShardsIterator(int index) { return IndexShardRoutingTable.this.size(); } + @Override public int sizeActive() { + int shardsActive = 0; + for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) { + if (shardRouting.active()) { + shardsActive++; + } + } + return shardsActive; + } + @Override public ShardId shardId() { return IndexShardRoutingTable.this.shardId(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java index dbe837f703c09..59cb297c80390 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java @@ -23,9 +23,10 @@ import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class PlainShardsIterator implements ShardsIterator { @@ -33,16 +34,15 @@ public class PlainShardsIterator implements ShardsIterator { private final List shards; - private Iterator iterator; + private volatile int counter = 0; public PlainShardsIterator(ShardId shardId, List shards) { this.shardId = shardId; this.shards = shards; - this.iterator = shards.iterator(); } @Override public ShardsIterator reset() { - this.iterator = shards.iterator(); + this.counter = 0; return this; } @@ -50,6 +50,16 @@ public PlainShardsIterator(ShardId shardId, List shards) { return shards.size(); } + @Override public int sizeActive() { + int sizeActive = 0; + for (ShardRouting shardRouting : shards) { + if (shardRouting.active()) { + sizeActive++; + } + } + return sizeActive; + } + @Override public ShardId shardId() { return this.shardId; } @@ -59,11 +69,42 @@ public PlainShardsIterator(ShardId shardId, List shards) { } @Override public boolean hasNext() { - return iterator.hasNext(); + return counter < shards.size(); } @Override public ShardRouting next() { - return iterator.next(); + if (!hasNext()) { + throw new NoSuchElementException("No shard found"); + } + return shards.get(counter++); + } + + @Override public boolean hasNextActive() { + int counter = this.counter; + while (counter < shards.size()) { + if (shards.get(counter++).active()) { + return true; + } + } + return false; + } + + @Override public ShardRouting nextActive() throws NoSuchElementException { + ShardRouting shardRouting = nextActiveOrNull(); + if (shardRouting == null) { + throw new NoSuchElementException("No active shard found"); + } + return shardRouting; + } + + @Override public ShardRouting nextActiveOrNull() throws NoSuchElementException { + while (counter < shards.size()) { + ShardRouting shardRouting = shards.get(counter++); + if (shardRouting.active()) { + return shardRouting; + } + } + return null; } @Override public void remove() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java index 9a372a2854d1a..ab2e14097424f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java @@ -22,6 +22,7 @@ import org.elasticsearch.index.shard.ShardId; import java.util.Iterator; +import java.util.NoSuchElementException; /** * @author kimchy (Shay Banon) @@ -35,5 +36,13 @@ public interface ShardsIterator extends Iterable, Iterator