Skip to content

Commit a79cd77

Browse files
committed
Remove IndexShard dependency from Repository (elastic#42213)
* Remove IndexShard dependency from Repository In order to simplify repository testing especially for BlobStoreRepository it's important to remove the dependency on IndexShard and reduce it to Store and MapperService (in the snapshot case). This significantly reduces the dependcy footprint for Repository and allows unittesting without starting nodes or instantiate entire shard instances. This change deprecates the old method signatures and adds a unittest for FileRepository to show the advantage of this change. In addition, the unittesting surfaced a bug where the internal file names that are private to the repository were used in the recovery stats instead of the target file names which makes it impossible to relate to the actual lucene files in the recovery stats. * don't delegate deprecated methods * apply comments * test
1 parent 3a20ff7 commit a79cd77

File tree

16 files changed

+353
-107
lines changed

16 files changed

+353
-107
lines changed

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,8 @@ private void restore(final IndexShard indexShard, final Repository repository, f
476476
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
477477
}
478478
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
479-
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(),
479+
assert indexShard.getEngineOrNull() == null;
480+
repository.restoreShard(indexShard, indexShard.store(), restoreSource.snapshot().getSnapshotId(),
480481
restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
481482
final Store store = indexShard.store();
482483
store.bootstrapNewHistory();

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.elasticsearch.cluster.node.DiscoveryNode;
2828
import org.elasticsearch.common.component.Lifecycle;
2929
import org.elasticsearch.common.component.LifecycleListener;
30-
import org.elasticsearch.index.shard.IndexShard;
30+
import org.elasticsearch.index.mapper.MapperService;
3131
import org.elasticsearch.index.shard.ShardId;
3232
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
3333
import org.elasticsearch.index.store.Store;
@@ -119,16 +119,17 @@ public boolean isReadOnly() {
119119
return in.isReadOnly();
120120
}
121121

122+
122123
@Override
123-
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
124-
IndexShardSnapshotStatus snapshotStatus) {
125-
in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
124+
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
125+
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
126+
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
126127
}
127128

128129
@Override
129-
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
130-
RecoveryState recoveryState) {
131-
in.restoreShard(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
130+
public void restoreShard(Store store, SnapshotId snapshotId,
131+
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
132+
in.restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState);
132133
}
133134

134135
@Override

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2727
import org.elasticsearch.cluster.node.DiscoveryNode;
2828
import org.elasticsearch.common.component.LifecycleComponent;
29+
import org.elasticsearch.index.mapper.MapperService;
2930
import org.elasticsearch.index.shard.IndexShard;
3031
import org.elasticsearch.index.shard.ShardId;
3132
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -49,7 +50,7 @@
4950
* <ul>
5051
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
5152
* with list of indices that will be included into the snapshot</li>
52-
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, Store, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
53+
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
5354
* for each shard</li>
5455
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
5556
* </ul>
@@ -196,30 +197,69 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
196197
* <p>
197198
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
198199
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
199-
* @param shard shard to be snapshotted
200+
* @param indexShard the shard to be snapshotted
201+
* @param snapshotId snapshot id
202+
* @param indexId id for the index being snapshotted
203+
* @param snapshotIndexCommit commit point
204+
* @param snapshotStatus snapshot status
205+
* @deprecated use {@link #snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} instead
206+
*/
207+
@Deprecated
208+
default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
209+
IndexShardSnapshotStatus snapshotStatus) {
210+
snapshotShard(indexShard.store(), indexShard.mapperService(), snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
211+
}
212+
213+
/**
214+
* Creates a snapshot of the shard based on the index commit point.
215+
* <p>
216+
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
217+
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
218+
* <p>
219+
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
220+
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
200221
* @param store store to be snapshotted
222+
* @param mapperService the shards mapper service
201223
* @param snapshotId snapshot id
202224
* @param indexId id for the index being snapshotted
203225
* @param snapshotIndexCommit commit point
204226
* @param snapshotStatus snapshot status
205227
*/
206-
void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
228+
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
207229
IndexShardSnapshotStatus snapshotStatus);
208230

209231
/**
210232
* Restores snapshot of the shard.
211233
* <p>
212234
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
213-
*
214235
* @param shard the shard to restore the index into
236+
* @param store the store to restore the index into
237+
* @param snapshotId snapshot id
238+
* @param version version of elasticsearch that created this snapshot
239+
* @param indexId id of the index in the repository from which the restore is occurring
240+
* @param snapshotShardId shard id (in the snapshot)
241+
* @param recoveryState recovery state
242+
* @deprecated use {@link #restoreShard(Store, SnapshotId, Version, IndexId, ShardId, RecoveryState)} instead
243+
*/
244+
@Deprecated
245+
default void restoreShard(IndexShard shard, Store store, SnapshotId snapshotId, Version version, IndexId indexId,
246+
ShardId snapshotShardId, RecoveryState recoveryState) {
247+
restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState);
248+
}
249+
250+
/**
251+
* Restores snapshot of the shard.
252+
* <p>
253+
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
254+
* @param store the store to restore the index into
215255
* @param snapshotId snapshot id
216256
* @param version version of elasticsearch that created this snapshot
217257
* @param indexId id of the index in the repository from which the restore is occurring
218258
* @param snapshotShardId shard id (in the snapshot)
219259
* @param recoveryState recovery state
220260
*/
221-
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
222-
ShardId snapshotShardId, RecoveryState recoveryState);
261+
void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
262+
RecoveryState recoveryState);
223263

224264
/**
225265
* Retrieve shard snapshot status for the stored snapshot

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
import org.elasticsearch.common.xcontent.XContentParser;
7272
import org.elasticsearch.common.xcontent.XContentType;
7373
import org.elasticsearch.core.internal.io.Streams;
74-
import org.elasticsearch.index.shard.IndexShard;
74+
import org.elasticsearch.index.mapper.MapperService;
7575
import org.elasticsearch.index.shard.ShardId;
7676
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
7777
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
@@ -830,8 +830,8 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
830830
}
831831

832832
@Override
833-
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
834-
IndexShardSnapshotStatus snapshotStatus) {
833+
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
834+
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
835835
SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
836836
try {
837837
snapshotContext.snapshot(snapshotIndexCommit);
@@ -846,18 +846,19 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
846846
}
847847

848848
@Override
849-
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
850-
RecoveryState recoveryState) {
851-
final Context context = new Context(snapshotId, indexId, shard.shardId(), snapshotShardId);
849+
public void restoreShard(Store store, SnapshotId snapshotId,
850+
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
851+
ShardId shardId = store.shardId();
852+
final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
852853
BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()));
853854
BlobContainer blobContainer = blobStore().blobContainer(path);
854-
final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, recoveryState, blobContainer);
855+
final RestoreContext snapshotContext = new RestoreContext(shardId, snapshotId, recoveryState, blobContainer);
855856
try {
856857
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
857858
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
858-
snapshotContext.restore(snapshotFiles);
859+
snapshotContext.restore(snapshotFiles, store);
859860
} catch (Exception e) {
860-
throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
861+
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
861862
}
862863
}
863864

@@ -1403,13 +1404,13 @@ private class RestoreContext extends FileRestoreContext {
14031404

14041405
/**
14051406
* Constructs new restore context
1406-
* @param indexShard shard to restore into
1407+
* @param shardId shard id to restore into
14071408
* @param snapshotId snapshot id
14081409
* @param recoveryState recovery state to report progress
14091410
* @param blobContainer the blob container to read the files from
14101411
*/
1411-
RestoreContext(IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
1412-
super(metadata.name(), indexShard, snapshotId, recoveryState, BUFFER_SIZE);
1412+
RestoreContext(ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
1413+
super(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE);
14131414
this.blobContainer = blobContainer;
14141415
}
14151416

server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.lucene.util.BytesRefBuilder;
3232
import org.elasticsearch.common.lucene.Lucene;
3333
import org.elasticsearch.common.util.iterable.Iterables;
34-
import org.elasticsearch.index.shard.IndexShard;
3534
import org.elasticsearch.index.shard.ShardId;
3635
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
3736
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
@@ -64,7 +63,6 @@ public abstract class FileRestoreContext {
6463
protected static final Logger logger = LogManager.getLogger(FileRestoreContext.class);
6564

6665
protected final String repositoryName;
67-
protected final IndexShard indexShard;
6866
protected final RecoveryState recoveryState;
6967
protected final SnapshotId snapshotId;
7068
protected final ShardId shardId;
@@ -73,26 +71,24 @@ public abstract class FileRestoreContext {
7371
/**
7472
* Constructs new restore context
7573
*
76-
* @param indexShard shard to restore into
74+
* @param shardId shard id to restore into
7775
* @param snapshotId snapshot id
7876
* @param recoveryState recovery state to report progress
7977
* @param bufferSize buffer size for restore
8078
*/
81-
protected FileRestoreContext(String repositoryName, IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState,
79+
protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState,
8280
int bufferSize) {
8381
this.repositoryName = repositoryName;
8482
this.recoveryState = recoveryState;
85-
this.indexShard = indexShard;
8683
this.snapshotId = snapshotId;
87-
this.shardId = indexShard.shardId();
84+
this.shardId = shardId;
8885
this.bufferSize = bufferSize;
8986
}
9087

9188
/**
9289
* Performs restore operation
9390
*/
94-
public void restore(SnapshotFiles snapshotFiles) throws IOException {
95-
final Store store = indexShard.store();
91+
public void restore(SnapshotFiles snapshotFiles, Store store) throws IOException {
9692
store.incRef();
9793
try {
9894
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
@@ -108,7 +104,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
108104
// version number and no checksum, even though the index itself is perfectly fine to restore, this
109105
// empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
110106
// shard anyway, we just create the empty shard here and then exit.
111-
store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
107+
store.createEmpty(store.indexSettings().getIndexVersionCreated().luceneVersion);
112108
return;
113109
}
114110

@@ -117,7 +113,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
117113
// this will throw an IOException if the store has no segments infos file. The
118114
// store can still have existing files but they will be deleted just before being
119115
// restored.
120-
recoveryTargetMetadata = indexShard.snapshotStoreMetadata();
116+
recoveryTargetMetadata = store.getMetadata(null, true);
121117
} catch (org.apache.lucene.index.IndexNotFoundException e) {
122118
// happens when restore to an empty shard, not a big deal
123119
logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
@@ -127,7 +123,6 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
127123
shardId, snapshotId), e);
128124
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
129125
}
130-
131126
final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
132127
final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
133128
final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
@@ -157,7 +152,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
157152
final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
158153
for (StoreFileMetaData md : diff.identical) {
159154
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
160-
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
155+
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), true);
161156
if (logger.isTraceEnabled()) {
162157
logger.trace("[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same", shardId, snapshotId,
163158
fileInfo.physicalName(), fileInfo.name());
@@ -167,7 +162,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
167162
for (StoreFileMetaData md : concat(diff)) {
168163
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
169164
filesToRecover.add(fileInfo);
170-
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
165+
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), false);
171166
if (logger.isTraceEnabled()) {
172167
logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId,
173168
fileInfo.physicalName(), fileInfo.name());
@@ -260,7 +255,7 @@ private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, fi
260255
int length;
261256
while ((length = stream.read(buffer)) > 0) {
262257
indexOutput.writeBytes(buffer, 0, length);
263-
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
258+
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
264259
}
265260
Store.verify(indexOutput);
266261
indexOutput.close();

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,8 +365,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
365365
try {
366366
// we flush first to make sure we get the latest writes snapshotted
367367
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
368-
repository.snapshotShard(indexShard, indexShard.store(), snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(),
369-
snapshotStatus);
368+
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
370369
if (logger.isDebugEnabled()) {
371370
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
372371
logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2308,8 +2308,8 @@ public void testRestoreShard() throws IOException {
23082308
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
23092309
assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
23102310
@Override
2311-
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
2312-
RecoveryState recoveryState) {
2311+
public void restoreShard(Store store, SnapshotId snapshotId,
2312+
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
23132313
try {
23142314
cleanLuceneIndex(targetStore.directory());
23152315
for (String file : sourceStore.directory().listAll()) {

0 commit comments

Comments
 (0)