|
129 | 129 | import java.util.*; |
130 | 130 | import java.util.concurrent.*; |
131 | 131 | import java.util.concurrent.atomic.AtomicInteger; |
| 132 | +import java.util.concurrent.atomic.AtomicLong; |
132 | 133 |
|
133 | 134 | import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; |
134 | 135 | import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; |
@@ -991,35 +992,37 @@ public long waitForDocs(final long numDocs, final @Nullable BackgroundIndexer in |
991 | 992 | */ |
992 | 993 | public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final @Nullable BackgroundIndexer indexer) |
993 | 994 | throws InterruptedException { |
994 | | - final long[] lastKnownCount = {-1}; |
| 995 | + final AtomicLong lastKnownCount = new AtomicLong(-1); |
995 | 996 | long lastStartCount = -1; |
996 | 997 | Predicate<Object> testDocs = new Predicate<Object>() { |
997 | 998 | @Override |
998 | 999 | public boolean apply(Object o) { |
999 | | - lastKnownCount[0] = indexer.totalIndexedDocs(); |
1000 | | - if (lastKnownCount[0] >= numDocs) { |
| 1000 | + if (indexer != null) { |
| 1001 | + lastKnownCount.set(indexer.totalIndexedDocs()); |
| 1002 | + } |
| 1003 | + if (lastKnownCount.get() >= numDocs) { |
1001 | 1004 | long count = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(); |
1002 | | - if (count == lastKnownCount[0]) { |
| 1005 | + if (count == lastKnownCount.get()) { |
1003 | 1006 | // no progress - try to refresh for the next time |
1004 | 1007 | client().admin().indices().prepareRefresh().get(); |
1005 | 1008 | } |
1006 | | - lastKnownCount[0] = count; |
1007 | | - logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount[0], numDocs); |
| 1009 | + lastKnownCount.set(count); |
| 1010 | + logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs); |
1008 | 1011 | } else { |
1009 | | - logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount[0], numDocs); |
| 1012 | + logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs); |
1010 | 1013 | } |
1011 | | - return lastKnownCount[0] >= numDocs; |
| 1014 | + return lastKnownCount.get() >= numDocs; |
1012 | 1015 | } |
1013 | 1016 | }; |
1014 | 1017 |
|
1015 | 1018 | while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) { |
1016 | | - if (lastStartCount == lastKnownCount[0]) { |
| 1019 | + if (lastStartCount == lastKnownCount.get()) { |
1017 | 1020 | // we didn't make any progress |
1018 | 1021 | fail("failed to reach " + numDocs + "docs"); |
1019 | 1022 | } |
1020 | | - lastStartCount = lastKnownCount[0]; |
| 1023 | + lastStartCount = lastKnownCount.get(); |
1021 | 1024 | } |
1022 | | - return lastKnownCount[0]; |
| 1025 | + return lastKnownCount.get(); |
1023 | 1026 | } |
1024 | 1027 |
|
1025 | 1028 |
|
|
0 commit comments