Skip to content

Commit

Permalink
Make Snapshot Logic Write Metadata after Segments
Browse files Browse the repository at this point in the history
Backport of elastic/elasticsearch#45689
Write the snapshot metadata during snapshot finalization, after the segment
files have been uploaded, to prevent storing outdated metadata when dynamic
mapping updates occur while the snapshot is in progress.
  • Loading branch information
mkleen committed Nov 13, 2019
1 parent c896b96 commit aa48a6d
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.internal.io.IOUtils;

Expand Down Expand Up @@ -149,6 +150,17 @@ public static int readFully(InputStream reader, byte[] dest, int offset, int len
return read;
}

/**
* Reads all bytes from the given {@link InputStream} and closes it afterwards.
*/
/**
 * Drains the given {@link InputStream} into memory, returning its full contents
 * as a {@link BytesReference}. The stream is always closed before returning,
 * whether or not reading succeeds.
 */
public static BytesReference readFully(InputStream in) throws IOException {
try (InputStream source = in) {
final BytesStreamOutput buffer = new BytesStreamOutput();
copy(source, buffer);
return buffer.bytes();
}
}

/**
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
* close is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,15 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
* @param includeGlobalState include cluster global state
* @return snapshot description
*/
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState);
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId,
List<IndexId> indices,
long startTime,
String failure,
int totalShards,
List<SnapshotShardFailure> shardFailures,
long repositoryStateId,
final MetaData clusterMetaData,
boolean includeGlobalState);

/**
* Deletes snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,29 +387,13 @@ public RepositoryMetaData getMetadata() {

@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
}
try {
final String snapshotName = snapshotId.getName();
// check if the snapshot name already exists in the repository
final RepositoryData repositoryData = getRepositoryData();
if (repositoryData.getAllSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
}
if (snapshotFormat.exists(blobContainer(), snapshotId.getUUID())) {
throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
}

// Write Global MetaData
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID());
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), true);

// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
final IndexMetaData indexMetaData = clusterMetaData.index(index.getName());
final BlobPath indexPath = basePath().add("indices").add(index.getId());
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getUUID());
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), true);
}
} catch (IOException ex) {
throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
Expand Down Expand Up @@ -544,15 +528,35 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
final int totalShards,
final List<SnapshotShardFailure> shardFailures,
final long repositoryStateId,
final boolean includeGlobalState) {
final MetaData clusterMetaData,
final boolean includeGlobalState
) {
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
includeGlobalState);
try {
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
final RepositoryData repositoryData = getRepositoryData();
writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId);
// We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will
// mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
// index or global metadata will be compatible with the segments written in this snapshot as well.
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that
// decrements the generation it points at

// Write Global MetaData
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);

// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
}
} catch (IOException ex) {
throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex);
}

try {
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false);
writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (FileAlreadyExistsException ex) {
// if another master was elected and took over finalizing the snapshot, it is possible
// that both nodes try to finalize the snapshot and write to the same blobs, so we just
Expand All @@ -565,6 +569,14 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
return blobStoreSnapshot;
}

/**
 * Path under the repository's base path where all per-index blobs are stored
 * (the fixed {@code "indices"} directory).
 */
private BlobPath indicesPath() {
return basePath().add("indices");
}

/**
 * Returns the blob container holding all blobs for the given index, i.e.
 * {@code <base>/indices/<index-uuid>} keyed by the index's repository-level id.
 */
private BlobContainer indexContainer(IndexId indexId) {
return blobStore().blobContainer(indicesPath().add(indexId.getId()));
}

@Override
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
try {
Expand Down Expand Up @@ -928,10 +940,6 @@ public String toString() {
']';
}

/**
 * Returns the format used to read/write shard-level snapshot blobs.
 * NOTE(review): the {@code version} parameter is ignored — the same
 * {@code indexShardSnapshotFormat} field is returned regardless; this
 * accessor is removed in this change and callers use the field directly.
 */
BlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotFormat(Version version) {
return indexShardSnapshotFormat;
}

/**
* Context for snapshot/restore operations
*/
Expand Down Expand Up @@ -972,7 +980,7 @@ public void delete() {
int fileListGeneration = tuple.v2();

try {
indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getUUID());
indexShardSnapshotFormat.delete(blobContainer, snapshotId.getUUID());
} catch (IOException e) {
LOGGER.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
}
Expand All @@ -993,7 +1001,7 @@ public void delete() {
*/
/**
 * Loads the shard-level snapshot blob ({@code snap-<uuid>.dat}) for this
 * context's snapshot from the shard's blob container.
 *
 * @throws SnapshotException wrapping any {@link IOException}, with the
 *         repository name, snapshot id and shard id included for context
 */
BlobStoreIndexShardSnapshot loadSnapshot() {
try {
return indexShardSnapshotFormat.read(blobContainer, snapshotId.getUUID());
} catch (IOException ex) {
throw new SnapshotException(metadata.name(), snapshotId, "failed to read shard snapshot file for " + shardId, ex);
}
}
Expand Down Expand Up @@ -1299,7 +1307,7 @@ public void snapshot(final IndexCommit snapshotIndexCommit) {
//TODO: The time stored in snapshot doesn't include cleanup time.
LOGGER.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID());
indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID(), false);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
Expand Down
Loading

0 comments on commit aa48a6d

Please sign in to comment.