Skip to content

Commit

Permalink
INGEST: Create Index Before Pipeline Execute (#32786) (#32975)
Browse files Browse the repository at this point in the history
* INGEST: Create Index Before Pipeline Execute

* Ensures that indices are created before the default pipeline setting is read to correcly handle the case of an index template containing a default pipeline (without the fix the first document does not get the pipeline applied as explained in #32758)
* closes #32758
  • Loading branch information
original-brownbear authored and jasontedor committed Aug 21, 2018
1 parent 45e2a18 commit eeeb5d4
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,37 +131,6 @@ protected final void doExecute(final BulkRequest bulkRequest, final ActionListen

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
} else {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
}
if (hasIndexRequestsWithPipelines) {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
return;
}

final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

Expand Down Expand Up @@ -195,15 +164,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
}
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}
}

Expand All @@ -219,7 +188,7 @@ public void onFailure(Exception e) {
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated);
Expand All @@ -229,7 +198,47 @@ public void onFailure(Exception e) {
}
}
} else {
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
}
}

private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
} else {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
}
if (hasIndexRequestsWithPipelines) {
try {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
} else {
executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -77,6 +82,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
*/
private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";

private static final Settings SETTINGS =
Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build();

/** Services needed by bulk action */
TransportService transportService;
ClusterService clusterService;
Expand Down Expand Up @@ -112,25 +120,42 @@ public class TransportBulkActionIngestTests extends ESTestCase {
/** A subclass of the real bulk action to allow skipping real bulk indexing, and marking when it would have happened. */
class TestTransportBulkAction extends TransportBulkAction {
boolean isExecuted = false; // set when the "real" bulk execution happens

boolean needToCheck; // pluggable return value for `needToCheck`

boolean indexCreated = true; // If set to false, will be set to true by call to createIndex

TestTransportBulkAction() {
super(Settings.EMPTY, null, transportService, clusterService, ingestService,
null, null, new ActionFilters(Collections.emptySet()), null, null);
super(SETTINGS, null, transportService, clusterService, ingestService,
null, null, new ActionFilters(Collections.emptySet()), null,
new AutoCreateIndex(
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new IndexNameExpressionResolver(SETTINGS)
)
);
}
@Override
protected boolean needToCheck() {
return false;
return needToCheck;
}
@Override
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
assertTrue(indexCreated);
isExecuted = true;
}

@Override
void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
indexCreated = true;
listener.onResponse(null);
}
}

class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {

TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
super(SETTINGS, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
TransportBulkActionIngestTests.this.clusterService,
null, null, null, new ActionFilters(Collections.emptySet()), null,
IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, bulkAction, null);
Expand Down Expand Up @@ -162,15 +187,17 @@ public void setupAction() {
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
MetaData metaData = MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
.putAll(
Collections.singletonMap(
WITH_DEFAULT_PIPELINE,
IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
.build()
).numberOfShards(1).numberOfReplicas(1).build()))
.build()).build());
.build()).build();
when(state.getMetaData()).thenReturn(metaData);
when(state.metaData()).thenReturn(metaData);
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
Expand Down Expand Up @@ -408,4 +435,36 @@ public void testUseDefaultPipeline() throws Exception {
verifyZeroInteractions(transportService);
}

public void testCreateIndexBeforeRunPipeline() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
indexRequest.setPipeline("testpipeline");
indexRequest.source(Collections.emptyMap());
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
action.needToCheck = true;
action.indexCreated = false;
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
response -> responseCalled.set(true),
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));

// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());

// now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}

}

0 comments on commit eeeb5d4

Please sign in to comment.