Skip to content

Commit 7c42cde

Browse files
authored
Make ingest execution non-blocking (#43361)
Added an additional method to the Processor interface to allow a processor implementation to make a non-blocking call. Also added a semaphore in order to prevent search thread pools from rejecting search requests originating from the match processor. This is a temporary workaround.
1 parent 79ba94f commit 7c42cde

File tree

22 files changed

+803
-460
lines changed

22 files changed

+803
-460
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.Set;
31+
import java.util.concurrent.CopyOnWriteArrayList;
32+
import java.util.function.BiConsumer;
33+
3134
import org.elasticsearch.script.ScriptService;
3235

3336
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@@ -63,29 +66,46 @@ boolean isIgnoreMissing() {
6366
}
6467

6568
@Override
66-
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
69+
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
6770
List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
6871
if (values == null) {
6972
if (ignoreMissing) {
70-
return ingestDocument;
73+
handler.accept(ingestDocument, null);
74+
} else {
75+
handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
7176
}
72-
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
77+
} else {
78+
List<Object> newValues = new CopyOnWriteArrayList<>();
79+
innerExecute(0, values, newValues, ingestDocument, handler);
7380
}
74-
List<Object> newValues = new ArrayList<>(values.size());
75-
IngestDocument document = ingestDocument;
76-
for (Object value : values) {
77-
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
78-
try {
79-
document = processor.execute(document);
80-
if (document == null) {
81-
return null;
82-
}
83-
} finally {
84-
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
85-
}
81+
}
82+
83+
void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
84+
BiConsumer<IngestDocument, Exception> handler) {
85+
if (index == values.size()) {
86+
document.setFieldValue(field, new ArrayList<>(newValues));
87+
handler.accept(document, null);
88+
return;
8689
}
87-
document.setFieldValue(field, newValues);
88-
return document;
90+
91+
Object value = values.get(index);
92+
Object previousValue = document.getIngestMetadata().put("_value", value);
93+
processor.execute(document, (result, e) -> {
94+
if (e != null) {
95+
newValues.add(document.getIngestMetadata().put("_value", previousValue));
96+
handler.accept(null, e);
97+
} else if (result == null) {
98+
handler.accept(null, null);
99+
} else {
100+
newValues.add(document.getIngestMetadata().put("_value", previousValue));
101+
innerExecute(index + 1, values, newValues, document, handler);
102+
}
103+
});
104+
}
105+
106+
@Override
107+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
108+
throw new UnsupportedOperationException("this method should not get executed");
89109
}
90110

91111
@Override

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void testExecute() throws Exception {
5353
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
5454
false
5555
);
56-
processor.execute(ingestDocument);
56+
processor.execute(ingestDocument, (result, e) -> {});
5757

5858
@SuppressWarnings("unchecked")
5959
List<String> result = ingestDocument.getFieldValue("values", List.class);
@@ -73,12 +73,9 @@ public void testExecuteWithFailure() throws Exception {
7373
}
7474
});
7575
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
76-
try {
77-
processor.execute(ingestDocument);
78-
fail("exception expected");
79-
} catch (RuntimeException e) {
80-
assertThat(e.getMessage(), equalTo("failure"));
81-
}
76+
Exception[] exceptions = new Exception[1];
77+
processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;});
78+
assertThat(exceptions[0].getMessage(), equalTo("failure"));
8279
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
8380
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c")));
8481

@@ -95,7 +92,7 @@ public void testExecuteWithFailure() throws Exception {
9592
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
9693
false
9794
);
98-
processor.execute(ingestDocument);
95+
processor.execute(ingestDocument, (result, e) -> {});
9996
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
10097
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c")));
10198
}
@@ -114,7 +111,7 @@ public void testMetaDataAvailable() throws Exception {
114111
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
115112
});
116113
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
117-
processor.execute(ingestDocument);
114+
processor.execute(ingestDocument, (result, e) -> {});
118115

119116
assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
120117
assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index"));
@@ -142,7 +139,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
142139
"_tag", "values", new SetProcessor("_tag",
143140
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
144141
(model) -> model.get("other")), false);
145-
processor.execute(ingestDocument);
142+
processor.execute(ingestDocument, (result, e) -> {});
146143

147144
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
148145
assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value"));
@@ -180,7 +177,7 @@ public String getTag() {
180177
);
181178

182179
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
183-
processor.execute(ingestDocument);
180+
processor.execute(ingestDocument, (result, e) -> {});
184181
@SuppressWarnings("unchecked")
185182
List<String> result = ingestDocument.getFieldValue("values", List.class);
186183
assertThat(result.size(), equalTo(numValues));
@@ -205,7 +202,7 @@ public void testModifyFieldsOutsideArray() throws Exception {
205202
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
206203
Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
207204
), false);
208-
processor.execute(ingestDocument);
205+
processor.execute(ingestDocument, (result, e) -> {});
209206

210207
List<?> result = ingestDocument.getFieldValue("values", List.class);
211208
assertThat(result.get(0), equalTo("STRING"));
@@ -231,7 +228,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws
231228
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
232229
doc.getFieldValue("_source._value", String.class)));
233230
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
234-
forEachProcessor.execute(ingestDocument);
231+
forEachProcessor.execute(ingestDocument, (result, e) -> {});
235232

236233
List<?> result = ingestDocument.getFieldValue("values", List.class);
237234
assertThat(result.get(0), equalTo("new_value"));
@@ -264,7 +261,7 @@ public void testNestedForEach() throws Exception {
264261
);
265262
ForEachProcessor processor = new ForEachProcessor(
266263
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
267-
processor.execute(ingestDocument);
264+
processor.execute(ingestDocument, (result, e) -> {});
268265

269266
List<?> result = ingestDocument.getFieldValue("values1.0.values2", List.class);
270267
assertThat(result.get(0), equalTo("ABC"));
@@ -282,7 +279,7 @@ public void testIgnoreMissing() throws Exception {
282279
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
283280
TestProcessor testProcessor = new TestProcessor(doc -> {});
284281
ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
285-
processor.execute(ingestDocument);
282+
processor.execute(ingestDocument, (result, e) -> {});
286283
assertIngestDocument(originalIngestDocument, ingestDocument);
287284
assertThat(testProcessor.getInvokedCounter(), equalTo(0));
288285
}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.apache.logging.log4j.LogManager;
23+
import org.apache.logging.log4j.Logger;
2224
import org.apache.logging.log4j.message.ParameterizedMessage;
2325
import org.apache.lucene.util.SparseFixedBitSet;
2426
import org.elasticsearch.ElasticsearchParseException;
@@ -80,6 +82,7 @@
8082
import java.util.Set;
8183
import java.util.concurrent.TimeUnit;
8284
import java.util.concurrent.atomic.AtomicInteger;
85+
import java.util.concurrent.atomic.AtomicIntegerArray;
8386
import java.util.function.LongSupplier;
8487
import java.util.function.Supplier;
8588
import java.util.stream.Collectors;
@@ -581,14 +584,13 @@ private long relativeTime() {
581584
}
582585

583586
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
584-
long ingestStartTimeInNanos = System.nanoTime();
585-
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
586-
ingestService.executeBulkRequest(() -> bulkRequestModifier,
587-
(indexRequest, exception) -> {
588-
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
589-
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
590-
bulkRequestModifier.markCurrentItemAsFailed(exception);
591-
}, (exception) -> {
587+
final long ingestStartTimeInNanos = System.nanoTime();
588+
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
589+
ingestService.executeBulkRequest(
590+
original.numberOfActions(),
591+
() -> bulkRequestModifier,
592+
bulkRequestModifier::markItemAsFailed,
593+
(originalThread, exception) -> {
592594
if (exception != null) {
593595
logger.error("failed to execute pipeline for a bulk request", exception);
594596
listener.onFailure(exception);
@@ -603,26 +605,56 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
603605
// (this will happen if pre-processing all items in the bulk failed)
604606
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
605607
} else {
606-
doExecute(task, bulkRequest, actionListener);
608+
// If a processor went async and returned a response on a different thread then
609+
// before we continue the bulk request we should fork back on a write thread:
610+
if (originalThread == Thread.currentThread()) {
611+
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
612+
doExecute(task, bulkRequest, actionListener);
613+
} else {
614+
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
615+
@Override
616+
public void onFailure(Exception e) {
617+
listener.onFailure(e);
618+
}
619+
620+
@Override
621+
protected void doRun() throws Exception {
622+
doExecute(task, bulkRequest, actionListener);
623+
}
624+
625+
@Override
626+
public boolean isForceExecution() {
627+
// If we fork back to a write thread we should **not** fail, because the tp queue is full.
628+
// (Otherwise the work done during ingest will be lost)
629+
// It is okay to force execution here. Throttling of write requests happens prior to
630+
// ingest when a node receives a bulk request.
631+
return true;
632+
}
633+
});
634+
}
607635
}
608636
}
609637
},
610-
indexRequest -> bulkRequestModifier.markCurrentItemAsDropped());
638+
bulkRequestModifier::markItemAsDropped
639+
);
611640
}
612641

613642
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
614643

644+
private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class);
645+
615646
final BulkRequest bulkRequest;
616647
final SparseFixedBitSet failedSlots;
617648
final List<BulkItemResponse> itemResponses;
649+
final AtomicIntegerArray originalSlots;
618650

619-
int currentSlot = -1;
620-
int[] originalSlots;
651+
volatile int currentSlot = -1;
621652

622653
BulkRequestModifier(BulkRequest bulkRequest) {
623654
this.bulkRequest = bulkRequest;
624655
this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size());
625656
this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
657+
this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok
626658
}
627659

628660
@Override
@@ -646,12 +678,11 @@ BulkRequest getBulkRequest() {
646678

647679
int slot = 0;
648680
List<DocWriteRequest<?>> requests = bulkRequest.requests();
649-
originalSlots = new int[requests.size()]; // oversize, but that's ok
650681
for (int i = 0; i < requests.size(); i++) {
651682
DocWriteRequest<?> request = requests.get(i);
652683
if (failedSlots.get(i) == false) {
653684
modifiedBulkRequest.add(request);
654-
originalSlots[slot++] = i;
685+
originalSlots.set(slot++, i);
655686
}
656687
}
657688
return modifiedBulkRequest;
@@ -666,7 +697,7 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
666697
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
667698
BulkItemResponse[] items = response.getItems();
668699
for (int i = 0; i < items.length; i++) {
669-
itemResponses.add(originalSlots[i], response.getItems()[i]);
700+
itemResponses.add(originalSlots.get(i), response.getItems()[i]);
670701
}
671702
delegatedListener.onResponse(
672703
new BulkResponse(
@@ -675,11 +706,11 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
675706
}
676707
}
677708

678-
void markCurrentItemAsDropped() {
679-
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
680-
failedSlots.set(currentSlot);
709+
synchronized void markItemAsDropped(int slot) {
710+
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
711+
failedSlots.set(slot);
681712
itemResponses.add(
682-
new BulkItemResponse(currentSlot, indexRequest.opType(),
713+
new BulkItemResponse(slot, indexRequest.opType(),
683714
new UpdateResponse(
684715
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
685716
indexRequest.type(), indexRequest.id(), indexRequest.version(), DocWriteResponse.Result.NOOP
@@ -688,16 +719,19 @@ void markCurrentItemAsDropped() {
688719
);
689720
}
690721

691-
void markCurrentItemAsFailed(Exception e) {
692-
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
722+
synchronized void markItemAsFailed(int slot, Exception e) {
723+
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
724+
LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
725+
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e);
726+
693727
// We hit a error during preprocessing a request, so we:
694728
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
695729
// 2) Add a bulk item failure for this request
696730
// 3) Continue with the next request in the bulk.
697-
failedSlots.set(currentSlot);
731+
failedSlots.set(slot);
698732
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(),
699733
indexRequest.id(), e);
700-
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure));
734+
itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
701735
}
702736

703737
}

0 commit comments

Comments
 (0)