Skip to content

Commit

Permalink
Wait for mapping ready in testReadRequestsReturnLatestMappingVersion
Browse files Browse the repository at this point in the history
If the index request is executed before the mapping update is applied on
the IndexShard, the index request will perform a dynamic mapping update.
This mapping update will be timeout (i.e, ProcessClusterEventTimeoutException)
because the latch is not open. This leads to the failure of the index
request and the test. This commit makes sure the mapping is ready
before we execute the index request.

Closes elastic#37807
  • Loading branch information
dnhatn committed Jan 25, 2019
1 parent 9e932f4 commit af9c3af
Showing 1 changed file with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -230,8 +231,7 @@ public void testAddNewReplicasOnFollower() throws Exception {
pauseFollow("follower-index");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37807")
public void testReadRequestsReturnsLatestMappingVersion() throws Exception {
public void testReadRequestsReturnLatestMappingVersion() throws Exception {
InternalTestCluster leaderCluster = getLeaderCluster();
Settings nodeAttributes = Settings.builder().put("node.attr.box", "large").build();
String dataNode = leaderCluster.startDataOnlyNode(nodeAttributes);
Expand All @@ -244,6 +244,9 @@ public void testReadRequestsReturnsLatestMappingVersion() throws Exception {
.put("index.routing.allocation.require.box", "large"))
.get()
);
getFollowerCluster().startDataOnlyNode(nodeAttributes);
followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get();
ensureFollowerGreen("follower-index");
ClusterService clusterService = leaderCluster.clusterService(dataNode);
ShardId shardId = clusterService.state().routingTable().index("leader-index").shard(0).shardId();
IndicesService indicesService = leaderCluster.getInstance(IndicesService.class, dataNode);
Expand All @@ -265,22 +268,30 @@ public void testReadRequestsReturnsLatestMappingVersion() throws Exception {
});
leaderCluster.client().admin().indices().preparePutMapping().setType("doc")
.setSource("balance", "type=long").setTimeout(TimeValue.ZERO).get();
IndexResponse indexResp = leaderCluster.client(dataNode).prepareIndex("leader-index", "doc", "1")
.setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get();
assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L));
getFollowerCluster().startDataOnlyNode(nodeAttributes);
followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get();
ensureFollowerGreen("follower-index");
// Make sure at least one read-request which requires mapping sync is completed.
assertBusy(() -> {
CcrClient ccrClient = new CcrClient(followerClient());
FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet();
long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum();
assertThat(bytesRead, Matchers.greaterThan(0L));
}, 60, TimeUnit.SECONDS);
latch.countDown();
assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
pauseFollow("follower-index");
try {
// Make sure the mapping is ready on the shard before we execute the index request; otherwise the index request
// will perform a dynamic mapping update which however will be blocked because the latch is remained closed.
assertBusy(() -> {
DocumentMapper mapper = indexShard.mapperService().documentMapper("doc");
assertNotNull(mapper);
assertNotNull(mapper.mappers().getMapper("balance"));
});
IndexResponse indexResp = leaderCluster.client().prepareIndex("leader-index", "doc", "1")
.setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get();
assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L));
// Make sure at least one read-request which requires mapping sync is completed.
assertBusy(() -> {
CcrClient ccrClient = new CcrClient(followerClient());
FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet();
long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum();
assertThat(bytesRead, Matchers.greaterThan(0L));
}, 60, TimeUnit.SECONDS);
latch.countDown();
assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
} finally {
latch.countDown(); // no effect if latch was counted down - this makes sure teardown can make progress.
pauseFollow("follower-index");
}
}
}

0 comments on commit af9c3af

Please sign in to comment.