Skip to content

Commit fc20264

Browse files
committed
Add Enrich index background task to cleanup old indices (#43746)
This PR adds a background maintenance task that is scheduled on the master node only. An enrich index is deleted if it is not linked to a policy or if the enrich alias is not currently pointing at it. Synchronization has been added to make sure that no policy executions are running at the time of cleanup; if any executions do occur, the marking process delays cleanup until the next run.
1 parent 7ad9beb commit fc20264

File tree

9 files changed

+664
-24
lines changed

9 files changed

+664
-24
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
*/
3030
public final class EnrichPolicy implements Writeable, ToXContentFragment {
3131

32-
private static final String ENRICH_INDEX_NAME_BASE = ".enrich-";
32+
public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";
3333

3434
public static final String EXACT_MATCH_TYPE = "exact_match";
3535
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE};

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.common.settings.Setting;
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.settings.SettingsFilter;
23+
import org.elasticsearch.common.unit.TimeValue;
2324
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2425
import org.elasticsearch.env.Environment;
2526
import org.elasticsearch.env.NodeEnvironment;
@@ -56,6 +57,7 @@
5657
import java.util.Collections;
5758
import java.util.List;
5859
import java.util.Map;
60+
import java.util.concurrent.TimeUnit;
5961
import java.util.function.Supplier;
6062

6163
import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING;
@@ -65,6 +67,9 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
6567
static final Setting<Integer> ENRICH_FETCH_SIZE_SETTING =
6668
Setting.intSetting("index.xpack.enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope);
6769

70+
static final Setting<TimeValue> ENRICH_CLEANUP_PERIOD =
71+
Setting.timeSetting("enrich.cleanup_period", new TimeValue(15, TimeUnit.MINUTES), Setting.Property.NodeScope);
72+
6873
public static final Setting<Integer> COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS =
6974
Setting.intSetting("enrich.coordinator_proxy.max_concurrent_requests", 8, 1, 10000, Setting.Property.NodeScope);
7075

@@ -135,9 +140,18 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
135140
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
136141
NamedXContentRegistry xContentRegistry, Environment environment,
137142
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
143+
EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
138144
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool,
139-
new IndexNameExpressionResolver(), System::currentTimeMillis);
140-
return Arrays.asList(enrichPolicyExecutor, new CoordinatorProxyAction.Coordinator(client, settings));
145+
new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis);
146+
EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(settings, client,
147+
clusterService, threadPool, enrichPolicyLocks);
148+
enrichPolicyMaintenanceService.initialize();
149+
return Arrays.asList(
150+
enrichPolicyLocks,
151+
enrichPolicyExecutor,
152+
new CoordinatorProxyAction.Coordinator(client, settings),
153+
enrichPolicyMaintenanceService
154+
);
141155
}
142156

143157
@Override
@@ -159,6 +173,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
159173
public List<Setting<?>> getSettings() {
160174
return Arrays.asList(
161175
ENRICH_FETCH_SIZE_SETTING,
176+
ENRICH_CLEANUP_PERIOD,
162177
COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS,
163178
COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST,
164179
COORDINATOR_PROXY_QUEUE_CAPACITY

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,13 @@
66

77
package org.elasticsearch.xpack.enrich;
88

9-
import java.util.concurrent.ConcurrentHashMap;
10-
import java.util.concurrent.Semaphore;
119
import java.util.function.LongSupplier;
1210

1311
import org.elasticsearch.action.ActionListener;
1412
import org.elasticsearch.client.Client;
1513
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1614
import org.elasticsearch.cluster.service.ClusterService;
1715
import org.elasticsearch.common.settings.Settings;
18-
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1916
import org.elasticsearch.threadpool.ThreadPool;
2017
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2118

@@ -27,34 +24,24 @@ public class EnrichPolicyExecutor {
2724
private final IndexNameExpressionResolver indexNameExpressionResolver;
2825
private final LongSupplier nowSupplier;
2926
private final int fetchSize;
30-
private final ConcurrentHashMap<String, Semaphore> policyLocks = new ConcurrentHashMap<>();
27+
private final EnrichPolicyLocks policyLocks;
3128

3229
EnrichPolicyExecutor(Settings settings,
3330
ClusterService clusterService,
3431
Client client,
3532
ThreadPool threadPool,
3633
IndexNameExpressionResolver indexNameExpressionResolver,
34+
EnrichPolicyLocks policyLocks,
3735
LongSupplier nowSupplier) {
3836
this.clusterService = clusterService;
3937
this.client = client;
4038
this.threadPool = threadPool;
4139
this.indexNameExpressionResolver = indexNameExpressionResolver;
4240
this.nowSupplier = nowSupplier;
41+
this.policyLocks = policyLocks;
4342
this.fetchSize = EnrichPlugin.ENRICH_FETCH_SIZE_SETTING.get(settings);
4443
}
4544

46-
private void tryLockingPolicy(String policyName) {
47-
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
48-
if (runLock.tryAcquire() == false) {
49-
throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName +
50-
"] is already in progress.");
51-
}
52-
}
53-
54-
private void releasePolicy(String policyName) {
55-
policyLocks.remove(policyName);
56-
}
57-
5845
private class PolicyUnlockingListener implements ActionListener<PolicyExecutionResult> {
5946
private final String policyName;
6047
private final ActionListener<PolicyExecutionResult> listener;
@@ -66,13 +53,13 @@ private class PolicyUnlockingListener implements ActionListener<PolicyExecutionR
6653

6754
@Override
6855
public void onResponse(PolicyExecutionResult policyExecutionResult) {
69-
releasePolicy(policyName);
56+
policyLocks.releasePolicy(policyName);
7057
listener.onResponse(policyExecutionResult);
7158
}
7259

7360
@Override
7461
public void onFailure(Exception e) {
75-
releasePolicy(policyName);
62+
policyLocks.releasePolicy(policyName);
7663
listener.onFailure(e);
7764
}
7865
}
@@ -93,13 +80,13 @@ public void runPolicy(String policyId, ActionListener<PolicyExecutionResult> lis
9380
}
9481

9582
public void runPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
96-
tryLockingPolicy(policyName);
83+
policyLocks.lockPolicy(policyName);
9784
try {
9885
Runnable runnable = createPolicyRunner(policyName, policy, new PolicyUnlockingListener(policyName, listener));
9986
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
10087
} catch (Exception e) {
10188
// Be sure to unlock if submission failed.
102-
releasePolicy(policyName);
89+
policyLocks.releasePolicy(policyName);
10390
throw e;
10491
}
10592
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.concurrent.Semaphore;
10+
import java.util.concurrent.atomic.AtomicLong;
11+
import java.util.concurrent.locks.ReadWriteLock;
12+
import java.util.concurrent.locks.ReentrantReadWriteLock;
13+
14+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
15+
16+
/**
17+
* A coordination object that allows multiple distinct polices to be executed concurrently, but also makes sure that a single
18+
* policy can only have one execution in flight at a time. Additionally, this class allows for capturing the current execution
19+
* state of any policy executions in flight. This execution state can be captured and then later be used to verify that no policy
20+
* executions have started in the time between the first state capturing.
21+
*/
22+
class EnrichPolicyLocks {
23+
24+
/**
25+
* A snapshot in time detailing if any policy executions are in flight and total number of local executions that
26+
* have been kicked off since the node has started
27+
*/
28+
static class EnrichPolicyExecutionState {
29+
final boolean arePoliciesInFlight;
30+
final long executions;
31+
32+
EnrichPolicyExecutionState(boolean arePoliciesInFlight, long executions) {
33+
this.arePoliciesInFlight = arePoliciesInFlight;
34+
this.executions = executions;
35+
}
36+
}
37+
38+
/**
39+
* A read-write lock that allows for policies to be executed concurrently with minimal overhead, but allows for blocking
40+
* policy locking operations while capturing the state of policy executions.
41+
*/
42+
private final ReadWriteLock currentStateLock = new ReentrantReadWriteLock(true);
43+
44+
/**
45+
* A mapping of policy name to a semaphore used for ensuring that a single policy can only have one execution in flight
46+
* at a time.
47+
*/
48+
private final ConcurrentHashMap<String, Semaphore> policyLocks = new ConcurrentHashMap<>();
49+
50+
/**
51+
* A counter that is used as a sort of policy execution sequence id / dirty bit. This is incremented every time a policy
52+
* successfully acquires an execution lock.
53+
*/
54+
private final AtomicLong policyRunCounter = new AtomicLong(0L);
55+
56+
/**
57+
* Locks a policy for execution. If the policy is currently executing, this method will immediately throw without waiting.
58+
* This method only blocks if another thread is currently capturing the current policy execution state.
59+
* @param policyName The policy name to lock for execution
60+
* @throws EsRejectedExecutionException if the policy is locked already or if the maximum number of concurrent policy executions
61+
* has been reached
62+
*/
63+
void lockPolicy(String policyName) {
64+
currentStateLock.readLock().lock();
65+
try {
66+
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
67+
boolean acquired = runLock.tryAcquire();
68+
if (acquired == false) {
69+
throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName +
70+
"] is already in progress.");
71+
}
72+
policyRunCounter.incrementAndGet();
73+
} finally {
74+
currentStateLock.readLock().unlock();
75+
}
76+
}
77+
78+
/**
79+
* Captures a snapshot of the current policy execution state. This method never blocks, instead assuming that a policy is
80+
* currently starting its execution and returns an appropriate state.
81+
* @return The current state of in-flight policy executions
82+
*/
83+
EnrichPolicyExecutionState captureExecutionState() {
84+
if (currentStateLock.writeLock().tryLock()) {
85+
try {
86+
long revision = policyRunCounter.get();
87+
long currentPolicyExecutions = policyLocks.mappingCount();
88+
return new EnrichPolicyExecutionState(currentPolicyExecutions > 0L, revision);
89+
} finally {
90+
currentStateLock.writeLock().unlock();
91+
}
92+
}
93+
return new EnrichPolicyExecutionState(true, policyRunCounter.get());
94+
}
95+
96+
/**
97+
* Checks if the current execution state matches that of the given execution state. Used to ensure that over a period of time
98+
* no changes to the policy execution state have occurred.
99+
* @param previousState The previous state to check the current state against
100+
* @return true if the current state matches the given previous state, false if policy executions have changed over time.
101+
*/
102+
boolean isSameState(EnrichPolicyExecutionState previousState) {
103+
EnrichPolicyExecutionState currentState = captureExecutionState();
104+
return currentState.arePoliciesInFlight == previousState.arePoliciesInFlight &&
105+
currentState.executions == previousState.executions;
106+
}
107+
108+
/**
109+
* Releases the lock for a given policy name, allowing it to be executed.
110+
* @param policyName The policy to release.
111+
*/
112+
void releasePolicy(String policyName) {
113+
currentStateLock.readLock().lock();
114+
try {
115+
policyLocks.remove(policyName);
116+
} finally {
117+
currentStateLock.readLock().unlock();
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)