|
133 | 133 | import java.util.*; |
134 | 134 | import java.util.concurrent.*; |
135 | 135 | import java.util.concurrent.atomic.AtomicInteger; |
| 136 | +import java.util.concurrent.atomic.AtomicLong; |
136 | 137 |
|
137 | 138 | import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; |
138 | 139 | import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; |
@@ -1021,34 +1022,36 @@ public long waitForDocs(final long numDocs, final @Nullable BackgroundIndexer in |
1021 | 1022 | */ |
1022 | 1023 | public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final @Nullable BackgroundIndexer indexer) |
1023 | 1024 | throws InterruptedException { |
1024 | | - final long[] lastKnownCount = {-1}; |
| 1025 | + final AtomicLong lastKnownCount = new AtomicLong(-1); |
1025 | 1026 | long lastStartCount = -1; |
1026 | 1027 | Predicate<Object> testDocs = new Predicate<Object>() { |
1027 | 1028 | public boolean apply(Object o) { |
1028 | | - lastKnownCount[0] = indexer.totalIndexedDocs(); |
1029 | | - if (lastKnownCount[0] >= numDocs) { |
| 1029 | + if (indexer != null) { |
| 1030 | + lastKnownCount.set(indexer.totalIndexedDocs()); |
| 1031 | + } |
| 1032 | + if (lastKnownCount.get() >= numDocs) { |
1030 | 1033 | long count = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(); |
1031 | | - if (count == lastKnownCount[0]) { |
| 1034 | + if (count == lastKnownCount.get()) { |
1032 | 1035 | // no progress - try to refresh for the next time |
1033 | 1036 | client().admin().indices().prepareRefresh().get(); |
1034 | 1037 | } |
1035 | | - lastKnownCount[0] = count; |
1036 | | - logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount[0], numDocs); |
| 1038 | + lastKnownCount.set(count); |
| 1039 | + logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs); |
1037 | 1040 | } else { |
1038 | | - logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount[0], numDocs); |
| 1041 | + logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs); |
1039 | 1042 | } |
1040 | | - return lastKnownCount[0] >= numDocs; |
| 1043 | + return lastKnownCount.get() >= numDocs; |
1041 | 1044 | } |
1042 | 1045 | }; |
1043 | 1046 |
|
1044 | 1047 | while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) { |
1045 | | - if (lastStartCount == lastKnownCount[0]) { |
| 1048 | + if (lastStartCount == lastKnownCount.get()) { |
1046 | 1049 | // we didn't make any progress |
1047 | 1050 | fail("failed to reach " + numDocs + "docs"); |
1048 | 1051 | } |
1049 | | - lastStartCount = lastKnownCount[0]; |
| 1052 | + lastStartCount = lastKnownCount.get(); |
1050 | 1053 | } |
1051 | | - return lastKnownCount[0]; |
| 1054 | + return lastKnownCount.get(); |
1052 | 1055 | } |
1053 | 1056 |
|
1054 | 1057 |
|
|
0 commit comments