Skip to content

Commit

Permalink
Remove the IndexCommitRef class
Browse files Browse the repository at this point in the history
This inner class is no longer required because its functionality has been moved to the generic GatedCloseable class.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
kartg committed Mar 9, 2022
1 parent fb9e150 commit 4fb78e9
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@

package org.opensearch.action.admin.indices.forcemerge;

import org.apache.lucene.index.IndexCommit;
import org.opensearch.action.admin.indices.flush.FlushResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.Index;
import org.opensearch.index.engine.Engine;
Expand Down Expand Up @@ -99,8 +101,8 @@ public void testForceMergeUUIDConsistent() throws IOException {
}

private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) {
return indexCommitRef.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
try (GatedCloseable<IndexCommit> wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) {
return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
package org.opensearch.indices.recovery;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.util.SetOnce;

import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
Expand Down Expand Up @@ -75,6 +75,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
Expand All @@ -88,7 +89,6 @@
import org.opensearch.index.MockEngineFactoryPlugin;
import org.opensearch.index.analysis.AbstractTokenFilterFactory;
import org.opensearch.index.analysis.TokenFilterFactory;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.seqno.ReplicationTracker;
Expand All @@ -114,11 +114,11 @@
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.tasks.Task;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.engine.MockEngineSupport;
import org.opensearch.test.store.MockFSIndexStore;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -151,12 +151,6 @@

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
Expand All @@ -167,6 +161,11 @@
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class IndexRecoveryIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -1599,9 +1598,9 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
.getShardOrNull(new ShardId(resolveIndex(indexName), 0));
final long lastSyncedGlobalCheckpoint = shard.getLastSyncedGlobalCheckpoint();
final long localCheckpointOfSafeCommit;
try (Engine.IndexCommitRef safeCommitRef = shard.acquireSafeIndexCommit()) {
try (GatedCloseable<IndexCommit> wrappedSafeCommit = shard.acquireSafeIndexCommit()) {
localCheckpointOfSafeCommit = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
safeCommitRef.get().getUserData().entrySet()
wrappedSafeCommit.get().getUserData().entrySet()
).localCheckpoint;
}
final long maxSeqNo = shard.seqNoStats().getMaxSeqNo();
Expand Down
11 changes: 2 additions & 9 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.lucene.util.SetOnce;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -1109,12 +1108,12 @@ public abstract void forceMerge(
*
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;
public abstract GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException;

/**
* Snapshots the most recent safe index commit from the engine.
*/
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;
public abstract GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException;

/**
* @return a summary of the contents of the current safe commit
Expand Down Expand Up @@ -1829,12 +1828,6 @@ private void awaitPendingClose() {
}
}

public static class IndexCommitRef extends GatedCloseable<IndexCommit> {
public IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
super(indexCommit, onClose);
}
}

public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.LoggerInfoStream;
import org.opensearch.common.lucene.Lucene;
Expand Down Expand Up @@ -103,10 +104,10 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;
Expand Down Expand Up @@ -2193,7 +2194,7 @@ public void forceMerge(
}

@Override
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
public GatedCloseable<IndexCommit> acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
Expand All @@ -2202,13 +2203,13 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
logger.trace("finish flush for snapshot");
}
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
return new GatedCloseable<>(lastCommit, () -> releaseIndexCommit(lastCommit));
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
return new GatedCloseable<>(safeCommit, () -> releaseIndexCommit(safeCommit));
}

private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.lucene.store.Lock;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.util.concurrent.ReleasableLock;
Expand All @@ -49,9 +50,9 @@
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;
Expand Down Expand Up @@ -413,13 +414,13 @@ public void forceMerge(
) {}

@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) {
store.incRef();
return new IndexCommitRef(indexCommit, store::decRef);
return new GatedCloseable<>(indexCommit, store::decRef);
}

@Override
public IndexCommitRef acquireSafeIndexCommit() {
public GatedCloseable<IndexCommit> acquireSafeIndexCommit() {
return acquireLastIndexCommit(false);
}

Expand Down
21 changes: 11 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.admin.indices.flush.FlushRequest;
Expand All @@ -73,6 +73,7 @@
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
Expand Down Expand Up @@ -1409,7 +1410,7 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
*
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException {
final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
Expand All @@ -1423,7 +1424,7 @@ public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws E
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
*/
public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
Expand All @@ -1448,24 +1449,24 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
*/
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex";
Engine.IndexCommitRef indexCommit = null;
GatedCloseable<IndexCommit> wrappedIndexCommit = null;
store.incRef();
try {
synchronized (engineMutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
final Engine engine = getEngineOrNull();
if (engine != null) {
indexCommit = engine.acquireLastIndexCommit(false);
wrappedIndexCommit = engine.acquireLastIndexCommit(false);
}
if (indexCommit == null) {
if (wrappedIndexCommit == null) {
return store.getMetadata(null, true);
}
}
return store.getMetadata(indexCommit.get());
return store.getMetadata(wrappedIndexCommit.get());
} finally {
store.decRef();
IOUtils.close(indexCommit);
IOUtils.close(wrappedIndexCommit);
}
}

Expand Down Expand Up @@ -3913,7 +3914,7 @@ void resetEngineToGlobalCheckpoint() throws IOException {
true
) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) {
synchronized (engineMutex) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
Expand All @@ -3924,7 +3925,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
}

@Override
public IndexCommitRef acquireSafeIndexCommit() {
public GatedCloseable<IndexCommit> acquireSafeIndexCommit() {
synchronized (engineMutex) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@

package org.opensearch.index.shard;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NoLockFactory;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.Index;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.store.Store;
Expand All @@ -52,7 +54,7 @@
final class LocalShardSnapshot implements Closeable {
private final IndexShard shard;
private final Store store;
private final Engine.IndexCommitRef indexCommit;
private final GatedCloseable<IndexCommit> wrappedIndexCommit;
private final AtomicBoolean closed = new AtomicBoolean(false);

LocalShardSnapshot(IndexShard shard) {
Expand All @@ -61,7 +63,7 @@ final class LocalShardSnapshot implements Closeable {
store.incRef();
boolean success = false;
try {
indexCommit = shard.acquireLastIndexCommit(true);
wrappedIndexCommit = shard.acquireLastIndexCommit(true);
success = true;
} finally {
if (success == false) {
Expand All @@ -88,7 +90,7 @@ Directory getSnapshotDirectory() {
return new FilterDirectory(store.directory()) {
@Override
public String[] listAll() throws IOException {
Collection<String> fileNames = indexCommit.get().getFileNames();
Collection<String> fileNames = wrappedIndexCommit.get().getFileNames();
final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]);
return fileNameArray;
}
Expand Down Expand Up @@ -143,7 +145,7 @@ public Set<String> getPendingDeletions() throws IOException {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
indexCommit.close();
wrappedIndexCommit.close();
} finally {
store.decRef();
}
Expand All @@ -156,6 +158,6 @@ IndexMetadata getIndexMetadata() {

@Override
public String toString() {
return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + indexCommit + "]";
return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + wrappedIndexCommit + "]";
}
}
Loading

0 comments on commit 4fb78e9

Please sign in to comment.