Skip to content

Commit db57d22

Browse files
committed
Prevent delete policy for active executing policy (#45472)
This commit adds a lock to the delete policy, in the same way that the locking is done for policy execution. It also creates a test to exercise the delete transport action, and modifies an existing test to provide a common set of functions for saving and deleting policies.
1 parent 03f45da commit db57d22

File tree

8 files changed

+204
-57
lines changed

8 files changed

+204
-57
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyLocks.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,24 @@
1919
* state of any policy executions in flight. This execution state can be captured and then later be used to verify that no policy
2020
* executions have started in the time between the first state capturing.
2121
*/
22-
class EnrichPolicyLocks {
22+
public class EnrichPolicyLocks {
2323

2424
/**
2525
* A snapshot in time detailing if any policy executions are in flight and total number of local executions that
2626
* have been kicked off since the node has started
2727
*/
28-
static class EnrichPolicyExecutionState {
29-
final boolean arePoliciesInFlight;
28+
public static class EnrichPolicyExecutionState {
29+
final boolean anyPolicyInFlight;
3030
final long executions;
3131

32-
EnrichPolicyExecutionState(boolean arePoliciesInFlight, long executions) {
33-
this.arePoliciesInFlight = arePoliciesInFlight;
32+
EnrichPolicyExecutionState(boolean anyPolicyInFlight, long executions) {
33+
this.anyPolicyInFlight = anyPolicyInFlight;
3434
this.executions = executions;
3535
}
36+
37+
public boolean isAnyPolicyInFlight() {
38+
return anyPolicyInFlight;
39+
}
3640
}
3741

3842
/**
@@ -54,19 +58,19 @@ static class EnrichPolicyExecutionState {
5458
private final AtomicLong policyRunCounter = new AtomicLong(0L);
5559

5660
/**
57-
* Locks a policy for execution. If the policy is currently executing, this method will immediately throw without waiting.
58-
* This method only blocks if another thread is currently capturing the current policy execution state.
61+
* Locks a policy to prevent concurrent execution. If the policy is currently executing, this method will immediately
62+
* throw without waiting. This method only blocks if another thread is currently capturing the current policy execution state.
5963
* @param policyName The policy name to lock for execution
6064
* @throws EsRejectedExecutionException if the policy is locked already or if the maximum number of concurrent policy executions
6165
* has been reached
6266
*/
63-
void lockPolicy(String policyName) {
67+
public void lockPolicy(String policyName) {
6468
currentStateLock.readLock().lock();
6569
try {
6670
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
6771
boolean acquired = runLock.tryAcquire();
6872
if (acquired == false) {
69-
throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName +
73+
throw new EsRejectedExecutionException("Could not obtain lock because policy execution for [" + policyName +
7074
"] is already in progress.");
7175
}
7276
policyRunCounter.incrementAndGet();
@@ -80,7 +84,7 @@ void lockPolicy(String policyName) {
8084
* currently starting its execution and returns an appropriate state.
8185
* @return The current state of in-flight policy executions
8286
*/
83-
EnrichPolicyExecutionState captureExecutionState() {
87+
public EnrichPolicyExecutionState captureExecutionState() {
8488
if (currentStateLock.writeLock().tryLock()) {
8589
try {
8690
long revision = policyRunCounter.get();
@@ -101,15 +105,15 @@ EnrichPolicyExecutionState captureExecutionState() {
101105
*/
102106
boolean isSameState(EnrichPolicyExecutionState previousState) {
103107
EnrichPolicyExecutionState currentState = captureExecutionState();
104-
return currentState.arePoliciesInFlight == previousState.arePoliciesInFlight &&
108+
return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight &&
105109
currentState.executions == previousState.executions;
106110
}
107111

108112
/**
109113
* Releases the lock for a given policy name, allowing it to be executed.
110114
* @param policyName The policy to release.
111115
*/
112-
void releasePolicy(String policyName) {
116+
public void releasePolicy(String policyName) {
113117
currentStateLock.readLock().lock();
114118
try {
115119
policyLocks.remove(policyName);

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ void cleanUpEnrichIndices() {
137137
.indicesOptions(IndicesOptions.lenientExpand());
138138
// Check that no enrich policies are being executed
139139
final EnrichPolicyLocks.EnrichPolicyExecutionState executionState = enrichPolicyLocks.captureExecutionState();
140-
if (executionState.arePoliciesInFlight == false) {
140+
if (executionState.isAnyPolicyInFlight() == false) {
141141
client.admin().indices().getIndex(indices, new ActionListener<GetIndexResponse>() {
142142
@Override
143143
public void onResponse(GetIndexResponse getIndexResponse) {

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2626
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
2727
import org.elasticsearch.xpack.enrich.AbstractEnrichProcessor;
28+
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
2829
import org.elasticsearch.xpack.enrich.EnrichStore;
2930

3031
import java.io.IOException;
@@ -33,6 +34,7 @@
3334

3435
public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction<DeleteEnrichPolicyAction.Request, AcknowledgedResponse> {
3536

37+
private final EnrichPolicyLocks enrichPolicyLocks;
3638
private final IngestService ingestService;
3739

3840
@Inject
@@ -41,9 +43,11 @@ public TransportDeleteEnrichPolicyAction(TransportService transportService,
4143
ThreadPool threadPool,
4244
ActionFilters actionFilters,
4345
IndexNameExpressionResolver indexNameExpressionResolver,
46+
EnrichPolicyLocks enrichPolicyLocks,
4447
IngestService ingestService) {
4548
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
4649
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
50+
this.enrichPolicyLocks = enrichPolicyLocks;
4751
this.ingestService = ingestService;
4852
}
4953

@@ -64,6 +68,7 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
6468
@Override
6569
protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state,
6670
ActionListener<AcknowledgedResponse> listener) throws Exception {
71+
enrichPolicyLocks.lockPolicy(request.getName());
6772
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
6873
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
6974
List<String> pipelinesWithProcessors = new ArrayList<>();
@@ -79,13 +84,15 @@ protected void masterOperation(DeleteEnrichPolicyAction.Request request, Cluster
7984
}
8085

8186
if (pipelinesWithProcessors.isEmpty() == false) {
87+
enrichPolicyLocks.releasePolicy(request.getName());
8288
listener.onFailure(
8389
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
8490
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
8591
return;
8692
}
8793

8894
EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
95+
enrichPolicyLocks.releasePolicy(request.getName());
8996
if (e == null) {
9097
listener.onResponse(new AcknowledgedResponse(true));
9198
} else {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.elasticsearch.cluster.service.ClusterService;
9+
import org.elasticsearch.plugins.Plugin;
10+
import org.elasticsearch.test.ESSingleNodeTestCase;
11+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
12+
13+
import java.util.Collection;
14+
import java.util.Collections;
15+
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase {
19+
20+
@Override
21+
protected Collection<Class<? extends Plugin>> getPlugins() {
22+
return Collections.singletonList(LocalStateEnrich.class);
23+
}
24+
25+
protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
26+
ClusterService clusterService) throws InterruptedException {
27+
CountDownLatch latch = new CountDownLatch(1);
28+
AtomicReference<Exception> error = new AtomicReference<>();
29+
EnrichStore.putPolicy(name, policy, clusterService, e -> {
30+
error.set(e);
31+
latch.countDown();
32+
});
33+
latch.await();
34+
return error;
35+
}
36+
37+
void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
38+
CountDownLatch latch = new CountDownLatch(1);
39+
AtomicReference<Exception> error = new AtomicReference<>();
40+
EnrichStore.deletePolicy(name, clusterService, e -> {
41+
error.set(e);
42+
latch.countDown();
43+
});
44+
latch.await();
45+
if (error.get() != null){
46+
throw error.get();
47+
}
48+
}
49+
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void testNonConcurrentPolicyExecution() throws InterruptedException {
127127
firstTaskComplete.await();
128128

129129
// Validate exception from second run
130-
assertThat(expected.getMessage(), containsString("Policy execution failed. Policy execution for [" + testPolicyName +
130+
assertThat(expected.getMessage(), containsString("Could not obtain lock because policy execution for [" + testPolicyName +
131131
"] is already in progress."));
132132

133133
// Ensure that the lock from the previous run has been cleared

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyLocksTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ public void testLockPolicy() {
2424
// Ensure that locked policies are rejected
2525
EsRejectedExecutionException exception1 = expectThrows(EsRejectedExecutionException.class,
2626
() -> policyLocks.lockPolicy(policy1));
27-
assertThat(exception1.getMessage(), is(equalTo("Policy execution failed. Policy execution for [policy1]" +
27+
assertThat(exception1.getMessage(), is(equalTo("Could not obtain lock because policy execution for [policy1]" +
2828
" is already in progress.")));
2929

3030
policyLocks.lockPolicy(policy2);
3131
EsRejectedExecutionException exception2 = expectThrows(EsRejectedExecutionException.class,
3232
() -> policyLocks.lockPolicy(policy2));
3333

34-
assertThat(exception2.getMessage(), is(equalTo("Policy execution failed. Policy execution for [policy2]" +
34+
assertThat(exception2.getMessage(), is(equalTo("Could not obtain lock because policy execution for [policy2]" +
3535
" is already in progress.")));
3636
}
3737

@@ -42,13 +42,13 @@ public void testSafePoint() {
4242

4343
// Get exec state - should note as safe and revision 1 since nothing has happened yet
4444
executionState = policyLocks.captureExecutionState();
45-
assertThat(executionState.arePoliciesInFlight, is(false));
45+
assertThat(executionState.anyPolicyInFlight, is(false));
4646
assertThat(executionState.executions, is(0L));
4747
assertThat(policyLocks.isSameState(executionState), is(true));
4848

4949
// Get another exec state - should still note as safe and revision 1 since nothing has happened yet
5050
executionState = policyLocks.captureExecutionState();
51-
assertThat(executionState.arePoliciesInFlight, is(false));
51+
assertThat(executionState.anyPolicyInFlight, is(false));
5252
assertThat(executionState.executions, is(0L));
5353
assertThat(policyLocks.isSameState(executionState), is(true));
5454

@@ -57,7 +57,7 @@ public void testSafePoint() {
5757

5858
// Get a third exec state - should have a new revision and report unsafe since execution is in progress
5959
executionState = policyLocks.captureExecutionState();
60-
assertThat(executionState.arePoliciesInFlight, is(true));
60+
assertThat(executionState.anyPolicyInFlight, is(true));
6161
assertThat(executionState.executions, is(1L));
6262

6363
// Unlock the policy
@@ -66,13 +66,13 @@ public void testSafePoint() {
6666
// Get a fourth exec state - should have the same revision as third, and report no policies in flight since the previous execution
6767
// is complete
6868
executionState = policyLocks.captureExecutionState();
69-
assertThat(executionState.arePoliciesInFlight, is(false));
69+
assertThat(executionState.anyPolicyInFlight, is(false));
7070
assertThat(executionState.executions, is(1L));
7171

7272
// Create a fifth exec state, lock and release a policy, and check if the captured exec state is the same as the current state in
7373
// the lock object
7474
executionState = policyLocks.captureExecutionState();
75-
assertThat(executionState.arePoliciesInFlight, is(false));
75+
assertThat(executionState.anyPolicyInFlight, is(false));
7676
assertThat(executionState.executions, is(1L));
7777
policyLocks.lockPolicy(policy);
7878
policyLocks.releasePolicy(policy);

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java renamed to x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,22 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6+
67
package org.elasticsearch.xpack.enrich;
78

89
import org.elasticsearch.ResourceNotFoundException;
910
import org.elasticsearch.cluster.service.ClusterService;
1011
import org.elasticsearch.common.xcontent.XContentType;
11-
import org.elasticsearch.plugins.Plugin;
12-
import org.elasticsearch.test.ESSingleNodeTestCase;
1312
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1413

15-
import java.util.Collection;
16-
import java.util.Collections;
1714
import java.util.Map;
18-
import java.util.concurrent.CountDownLatch;
1915
import java.util.concurrent.atomic.AtomicReference;
2016

2117
import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy;
2218
import static org.hamcrest.Matchers.equalTo;
2319
import static org.hamcrest.Matchers.nullValue;
2420

25-
public class EnrichStoreTests extends ESSingleNodeTestCase {
26-
27-
@Override
28-
protected Collection<Class<? extends Plugin>> getPlugins() {
29-
return Collections.singletonList(LocalStateEnrich.class);
30-
}
21+
public class EnrichStoreCrudTests extends AbstractEnrichTestCase {
3122

3223
public void testCrud() throws Exception {
3324
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
@@ -137,29 +128,4 @@ public void testListValidation() {
137128
Map<String, EnrichPolicy> policies = EnrichStore.getPolicies(clusterService.state());
138129
assertTrue(policies.isEmpty());
139130
}
140-
141-
private AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
142-
ClusterService clusterService) throws InterruptedException {
143-
CountDownLatch latch = new CountDownLatch(1);
144-
AtomicReference<Exception> error = new AtomicReference<>();
145-
EnrichStore.putPolicy(name, policy, clusterService, e -> {
146-
error.set(e);
147-
latch.countDown();
148-
});
149-
latch.await();
150-
return error;
151-
}
152-
153-
private void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
154-
CountDownLatch latch = new CountDownLatch(1);
155-
AtomicReference<Exception> error = new AtomicReference<>();
156-
EnrichStore.deletePolicy(name, clusterService, e -> {
157-
error.set(e);
158-
latch.countDown();
159-
});
160-
latch.await();
161-
if (error.get() != null){
162-
throw error.get();
163-
}
164-
}
165131
}

0 commit comments

Comments
 (0)