-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce global checkpoint background sync #26591
Changes from 34 commits
c0e7443
511f96e
e1dc814
86ddf79
e0657a7
b79b13f
f7a76cd
3881d76
89bbf84
1082a31
15453b4
aa0c62c
721f725
9bc5155
63e9d80
6ade720
354df1c
79742ea
d9fc19d
afb082f
9bf875e
007d5c4
cb9373b
7e6d1bf
f7295ac
f237d88
e88b92c
030156d
0343372
cf4e67b
f82df30
507806b
530addd
f3b04dc
5030a77
b640b10
cccdec6
d43f794
26e4c76
b80b728
b8adcce
c041ea2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,6 @@ | |
import org.apache.lucene.util.SetOnce; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.TriFunction; | ||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Setting.Property; | ||
|
@@ -40,6 +39,7 @@ | |
import org.elasticsearch.index.shard.IndexSearcherWrapper; | ||
import org.elasticsearch.index.shard.IndexingOperationListener; | ||
import org.elasticsearch.index.shard.SearchOperationListener; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.index.similarity.BM25SimilarityProvider; | ||
import org.elasticsearch.index.similarity.SimilarityProvider; | ||
import org.elasticsearch.index.similarity.SimilarityService; | ||
|
@@ -330,7 +330,8 @@ public IndexService newIndexService( | |
IndicesQueryCache indicesQueryCache, | ||
MapperRegistry mapperRegistry, | ||
IndicesFieldDataCache indicesFieldDataCache, | ||
NamedWriteableRegistry namedWriteableRegistry) | ||
NamedWriteableRegistry namedWriteableRegistry, | ||
Consumer<ShardId> globalCheckpointSyncer) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove this and remove all changes to IndicesService.createIndex etc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 26e4c76. |
||
throws IOException { | ||
final IndexEventListener eventListener = freeze(); | ||
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,11 +25,15 @@ | |
import org.apache.lucene.store.AlreadyClosedException; | ||
import org.apache.lucene.util.Accountable; | ||
import org.apache.lucene.util.IOUtils; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.routing.ShardRouting; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
import org.elasticsearch.common.lease.Releasable; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Setting.Property; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.BigArrays; | ||
|
@@ -82,6 +86,7 @@ | |
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Consumer; | ||
import java.util.function.LongSupplier; | ||
import java.util.function.Supplier; | ||
|
||
|
@@ -109,10 +114,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust | |
private final AtomicBoolean closed = new AtomicBoolean(false); | ||
private final AtomicBoolean deleted = new AtomicBoolean(false); | ||
private final IndexSettings indexSettings; | ||
private final List<IndexingOperationListener> indexingOperationListeners; | ||
private final List<SearchOperationListener> searchOperationListeners; | ||
private final List<IndexingOperationListener> indexingOperationListeners; | ||
private volatile AsyncRefreshTask refreshTask; | ||
private volatile AsyncTranslogFSync fsyncTask; | ||
private volatile AsyncGlobalCheckpointTask globalCheckpointTask; | ||
|
||
// don't convert to Setting<> and register... we only set this in tests and register via a plugin | ||
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval"; | ||
|
@@ -182,11 +188,12 @@ public IndexService( | |
this.engineFactory = engineFactory; | ||
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE | ||
this.searcherWrapper = wrapperFactory.newWrapper(this); | ||
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); | ||
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners); | ||
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); | ||
// kick off async ops for the first shard in this index | ||
this.refreshTask = new AsyncRefreshTask(this); | ||
this.trimTranslogTask = new AsyncTrimTranslogTask(this); | ||
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); | ||
rescheduleFsyncTask(indexSettings.getTranslogDurability()); | ||
} | ||
|
||
|
@@ -268,7 +275,15 @@ public synchronized void close(final String reason, boolean delete) throws IOExc | |
} | ||
} | ||
} finally { | ||
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask); | ||
IOUtils.close( | ||
bitsetFilterCache, | ||
indexCache, | ||
indexFieldData, | ||
mapperService, | ||
refreshTask, | ||
fsyncTask, | ||
trimTranslogTask, | ||
globalCheckpointTask); | ||
} | ||
} | ||
} | ||
|
@@ -293,8 +308,7 @@ private long getAvgShardSizeInBytes() throws IOException { | |
} | ||
} | ||
|
||
public synchronized IndexShard createShard(ShardRouting routing) throws IOException { | ||
final boolean primary = routing.primary(); | ||
public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException { | ||
/* | ||
* TODO: we execute this in parallel but it's a synced method. Yet, we might | ||
* be able to serialize the execution via the cluster state in the future. for now we just | ||
|
@@ -365,7 +379,7 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept | |
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier, | ||
indexCache, mapperService, similarityService, engineFactory, | ||
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, | ||
searchOperationListeners, indexingOperationListeners); | ||
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId)); | ||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); | ||
eventListener.afterIndexShardCreated(indexShard); | ||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); | ||
|
@@ -710,6 +724,44 @@ private void maybeTrimTranslog() { | |
} | ||
} | ||
|
||
private void maybeSyncGlobalCheckpoints() { | ||
for (final IndexShard shard : this.shards.values()) { | ||
if (shard.routingEntry().active() && shard.routingEntry().primary()) { | ||
switch (shard.state()) { | ||
case CLOSED: | ||
case CREATED: | ||
case RECOVERING: | ||
case RELOCATED: | ||
continue; | ||
case POST_RECOVERY: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. routing entry cannot be active while shard state is POST_RECOVERY ;-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in 9bf875e. |
||
assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active"; | ||
continue; | ||
case STARTED: | ||
try { | ||
shard.acquirePrimaryOperationPermit( | ||
ActionListener.wrap( | ||
releasable -> { | ||
try (Releasable ignored = releasable) { | ||
shard.maybeSyncGlobalCheckpoint("background"); | ||
} | ||
}, | ||
e -> { | ||
if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) { | ||
logger.info("failed to execute background global checkpoint sync", e); | ||
} | ||
}), | ||
ThreadPool.Names.SAME); | ||
} catch (final AlreadyClosedException | IndexShardClosedException e) { | ||
// the shard was closed concurrently, continue | ||
} | ||
continue; | ||
default: | ||
throw new IllegalStateException("unknown state [" + shard.state() + "]"); | ||
} | ||
} | ||
} | ||
} | ||
|
||
abstract static class BaseAsyncTask implements Runnable, Closeable { | ||
protected final IndexService indexService; | ||
protected final ThreadPool threadPool; | ||
|
@@ -877,6 +929,41 @@ public String toString() { | |
} | ||
} | ||
|
||
// this setting is intentionally not registered, it is only used in tests | ||
public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING = | ||
Setting.timeSetting( | ||
"index.global_checkpoint_sync.interval", | ||
new TimeValue(30, TimeUnit.SECONDS), | ||
new TimeValue(0, TimeUnit.MILLISECONDS), | ||
Property.Dynamic, | ||
Property.IndexScope); | ||
|
||
/** | ||
* Background task that syncs the global checkpoint to replicas. | ||
*/ | ||
final class AsyncGlobalCheckpointTask extends BaseAsyncTask { | ||
|
||
AsyncGlobalCheckpointTask(final IndexService indexService) { | ||
// index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests | ||
super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())); | ||
} | ||
|
||
@Override | ||
protected void runInternal() { | ||
indexService.maybeSyncGlobalCheckpoints(); | ||
} | ||
|
||
@Override | ||
protected String getThreadPool() { | ||
return ThreadPool.Names.GENERIC; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "global_checkpoint_sync"; | ||
} | ||
} | ||
|
||
AsyncRefreshTask getRefreshTask() { // for tests | ||
return refreshTask; | ||
} | ||
|
@@ -885,6 +972,10 @@ AsyncTranslogFSync getFsyncTask() { // for tests | |
return fsyncTask; | ||
} | ||
|
||
AsyncGlobalCheckpointTask getGlobalCheckpointTask() { | ||
return globalCheckpointTask; | ||
} | ||
|
||
/** | ||
* Clears the caches for the given shard id if the shard is still allocated on this node | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IDE gone rogue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed b8adcce.