Initialize sequence numbers on a shrunken index #25321

Merged: 4 commits, Jun 21, 2017
Changes from 2 commits
IndexMetaData.java
@@ -1318,7 +1318,7 @@ public static Set<ShardId> selectShrinkShards(int shardId, IndexMetaData sourceI
* @param sourceIndexMetadata the metadata of the source index
* @param targetNumberOfShards the total number of shards in the target index
* @return the routing factor for a shrunk index with the given number of target shards.
- * @throws IllegalArgumentException if the number of source shards is greater than the number of target shards or if the source shards
+ * @throws IllegalArgumentException if the number of source shards is less than the number of target shards or if the source shards
* are not divisible by the number of target shards.
*/
public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) {
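For context, the contract described by the corrected Javadoc can be sketched as follows. This is a minimal standalone illustration of the documented behavior, not the actual IndexMetaData implementation, and routingFactor here is an illustrative name:

    // Sketch of the shrink routing-factor contract: the source index must have at
    // least as many shards as the target, and the source shard count must be evenly
    // divisible by the target shard count.
    static int routingFactor(final int sourceNumberOfShards, final int targetNumberOfShards) {
        if (sourceNumberOfShards < targetNumberOfShards) {
            throw new IllegalArgumentException("the number of source shards [" + sourceNumberOfShards
                    + "] must not be less than the number of target shards [" + targetNumberOfShards + "]");
        }
        if (sourceNumberOfShards % targetNumberOfShards != 0) {
            throw new IllegalArgumentException("the number of source shards [" + sourceNumberOfShards
                    + "] must be a multiple of the number of target shards [" + targetNumberOfShards + "]");
        }
        // e.g. shrinking six shards down to two gives a routing factor of three
        return sourceNumberOfShards / targetNumberOfShards;
    }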
LocalShardSnapshot.java
@@ -60,6 +60,10 @@ Index getIndex() {
return shard.indexSettings().getIndex();
}

+    long maxSeqNo() {
+        return shard.getEngine().seqNoService().getMaxSeqNo();
+    }

Directory getSnapshotDirectory() {
/* this directory will not be used for anything else but reading / copying files to another directory
* we prevent all write operations on this directory with UOE - nobody should close it either. */
StoreRecovery.java
@@ -41,6 +41,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
@@ -49,6 +50,8 @@

import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,9 +118,11 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
logger.debug("starting recovery from local shards {}", shards);
try {
final Directory directory = indexShard.store().directory(); // don't close this directory!!
-            addIndices(indexShard.recoveryState().getIndex(), directory, indexSort,
-                shards.stream().map(s -> s.getSnapshotDirectory())
-                    .collect(Collectors.toList()).toArray(new Directory[shards.size()]));

+            final Directory[] sources =
+                    shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).collect(Collectors.toList()).toArray(new Directory[0]);
Contributor: Nit - there is a Stream::toArray(Directory[]::new)
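The suggested form would read roughly as follows; Stream#toArray with an array generator drops the intermediate list:

            final Directory[] sources =
                    shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);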

+            final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
+            addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo);
internalRecoverFromStore(indexShard);
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
@@ -131,8 +136,13 @@
return false;
}

-    void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Sort indexSort, Directory... sources) throws IOException {
-        target = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
+    void addIndices(
+            final RecoveryState.Index indexRecoveryStats,
+            final Directory target,
+            final Sort indexSort,
+            final Directory[] sources,
+            final long maxSeqNo) throws IOException {
+        final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
@@ -143,8 +153,19 @@ void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Sort i
if (indexSort != null) {
iwc.setIndexSort(indexSort);
}
-        try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(target, indexRecoveryStats), iwc)) {
+        try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
writer.addIndexes(sources);
+            /*
+             * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
+             * the source shards. This ensures that history after this maximum sequence number can advance and we have correct
+             * document-level semantics.
+             */
+            writer.setLiveCommitData(() -> {
+                final HashMap<String, String> liveCommitData = new HashMap<>(2);
+                liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
+                liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
Contributor: Is the plan to do MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID as a follow-up?

Member Author: Yes, I edited the plan on #10708 to separate these out into separate line items; I do not like mixing things. I do forgive you for not seeing this. 😛
+                return liveCommitData.entrySet().iterator();
+            });
writer.commit();
}
}
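To make the commit-data stamping concrete: if the source shards report maximum sequence numbers of, say, 10, 7, and 13, the shrunken shard is committed with both SequenceNumbers.MAX_SEQ_NO and SequenceNumbers.LOCAL_CHECKPOINT_KEY set to 13. Below is a minimal standalone sketch of that flow against an existing Lucene index; SeqNoStamper and stampSeqNo are illustrative names, not part of this change:

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.elasticsearch.index.seqno.SequenceNumbers;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    final class SeqNoStamper {

        // Stamp the given maximum sequence number as both the maximum sequence number
        // and the local checkpoint in the commit user data, mirroring what addIndices
        // does after writer.addIndexes(sources).
        static void stampSeqNo(final Directory dir, final long maxSeqNo) throws IOException {
            final IndexWriterConfig iwc = new IndexWriterConfig(null)
                    .setCommitOnClose(false)
                    .setOpenMode(IndexWriterConfig.OpenMode.APPEND); // the index already exists
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                final Map<String, String> liveCommitData = new HashMap<>(2);
                liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
                liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
                // live commit data is evaluated at commit time and lands in the user data
                // of the segments_N file written by the commit below
                writer.setLiveCommitData(liveCommitData.entrySet());
                writer.commit();
            }
        }
    }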
ShrinkIndexIT.java
@@ -19,7 +19,6 @@

package org.elasticsearch.action.admin.indices.create;

-import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedSetSelector;
@@ -29,17 +28,17 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -48,8 +47,8 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.query.TermsQueryBuilder;
+import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -58,15 +57,11 @@

import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
-import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -233,7 +228,8 @@ public void testCreateShrinkIndex() {
.put("number_of_shards", randomIntBetween(2, 7))
.put("index.version.created", version)
).get();
-        for (int i = 0; i < 20; i++) {
+        final int docs = randomIntBetween(1, 128);
Contributor: do we want to test with 0 docs too?

Member Author: Sure!
+        for (int i = 0; i < docs; i++) {
client().prepareIndex("source", "type")
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
}
@@ -252,30 +248,42 @@ public void testCreateShrinkIndex() {
.put("index.routing.allocation.require._name", mergeNode)
.put("index.blocks.write", true)).get();
ensureGreen();

+        final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").get();
+        final long maxSeqNo =
+                Arrays.stream(sourceStats.getShards()).map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong();
// now merge source into a single shard index

final boolean createWithReplicas = randomBoolean();
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
.setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get());
ensureGreen();
-        assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);

+        final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get();
+        for (final ShardStats shardStats : targetStats.getShards()) {
+            final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
+            assertThat(seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo));
+            assertThat(seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo));
+        }

+        assertHitCount(client().prepareSearch("target").setSize(2 * docs).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);

if (createWithReplicas == false) {
// bump replicas
client().admin().indices().prepareUpdateSettings("target")
.setSettings(Settings.builder()
.put("index.number_of_replicas", 1)).get();
ensureGreen();
-            assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+            assertHitCount(client().prepareSearch("target").setSize(2 * docs).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
}

-        for (int i = 20; i < 40; i++) {
+        for (int i = docs; i < 2 * docs; i++) {
client().prepareIndex("target", "type")
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
}
flushAndRefresh();
-        assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 40);
-        assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+        assertHitCount(client().prepareSearch("target").setSize(4 * docs).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 2 * docs);
+        assertHitCount(client().prepareSearch("source").setSize(2 * docs).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get();
assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null));
}
StoreRecoveryTests.java
@@ -37,6 +37,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESTestCase;

@@ -46,8 +47,11 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.security.AccessControlException;
import java.util.Arrays;
+import java.util.Map;
import java.util.function.Predicate;

+import static org.hamcrest.CoreMatchers.equalTo;

public class StoreRecoveryTests extends ESTestCase {

public void testAddIndices() throws IOException {
@@ -82,7 +86,8 @@ public void testAddIndices() throws IOException {
StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger);
RecoveryState.Index indexStats = new RecoveryState.Index();
Directory target = newFSDirectory(createTempDir());
-        storeRecovery.addIndices(indexStats, target, indexSort, dirs);
+        final long maxSeqNo = randomNonNegativeLong();
+        storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo);
int numFiles = 0;
Predicate<String> filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false
&& f.startsWith("extra") == false;
@@ -99,6 +104,9 @@
}
DirectoryReader reader = DirectoryReader.open(target);
SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(target);
+        final Map<String, String> userData = segmentCommitInfos.getUserData();
+        assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(maxSeqNo)));
+        assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(maxSeqNo)));
for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge
assertEquals("all sources must be flush",
info.info.getDiagnostics().get("source"), "flush");