Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -139,9 +139,13 @@ private ElasticsearchMappings() {
}

public static XContentBuilder configMapping() throws IOException {
return configMapping(SINGLE_MAPPING_NAME);
}

public static XContentBuilder configMapping(String mappingType) throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.startObject(SINGLE_MAPPING_NAME);
builder.startObject(mappingType);
addMetaInformation(builder);
addDefaultMapping(builder);
builder.startObject(PROPERTIES);
Expand Down Expand Up @@ -1129,7 +1133,7 @@ static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndic
}

public static void addDocMappingIfMissing(String alias,
CheckedBiFunction<String, Collection<String>, XContentBuilder, IOException> mappingSupplier,
CheckedFunction<String, XContentBuilder, IOException> mappingSupplier,
Client client, ClusterState state, ActionListener<Boolean> listener) {
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
if (aliasOrIndex == null) {
Expand All @@ -1153,7 +1157,7 @@ public static void addDocMappingIfMissing(String alias,
IndexMetaData indexMetaData = state.metaData().index(indicesThatRequireAnUpdate[0]);
String mappingType = indexMetaData.mapping().type();

try (XContentBuilder mapping = mappingSupplier.apply(mappingType, Collections.emptyList())) {
try (XContentBuilder mapping = mappingSupplier.apply(mappingType)) {
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
putMappingRequest.type(mappingType);
putMappingRequest.source(mapping);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
Expand All @@ -34,6 +41,7 @@
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.mockito.ArgumentCaptor;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
Expand All @@ -47,7 +55,16 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;


public class ElasticsearchMappingsTests extends ESTestCase {
Expand Down Expand Up @@ -205,6 +222,54 @@ public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOExcepti
ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion()));
}

public void testAddDocMappingIfMissing() throws IOException {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
Client client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
doAnswer(
invocationOnMock -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(new AcknowledgedResponse(true));
return null;
})
.when(client).execute(eq(PutMappingAction.INSTANCE), any(), any(ActionListener.class));

ClusterState clusterState = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("index-name", "0.0"));
ElasticsearchMappings.addDocMappingIfMissing(
"index-name",
ElasticsearchMappingsTests::fakeMapping,
client,
clusterState,
ActionListener.wrap(
ok -> assertTrue(ok),
e -> fail(e.toString())
)
);

ArgumentCaptor<PutMappingRequest> requestCaptor = ArgumentCaptor.forClass(PutMappingRequest.class);
verify(client).threadPool();
verify(client).execute(eq(PutMappingAction.INSTANCE), requestCaptor.capture(), any(ActionListener.class));
verifyNoMoreInteractions(client);

PutMappingRequest request = requestCaptor.getValue();
assertThat(request.type(), equalTo("_doc"));
assertThat(request.indices(), equalTo(new String[] { "index-name" }));
assertThat(request.source(), equalTo("{\"_doc\":{\"properties\":{\"some-field\":{\"type\":\"long\"}}}}"));
}

private static XContentBuilder fakeMapping(String mappingType) throws IOException {
return jsonBuilder()
.startObject()
.startObject(mappingType)
.startObject(ElasticsearchMappings.PROPERTIES)
.startObject("some-field")
.field(ElasticsearchMappings.TYPE, ElasticsearchMappings.LONG)
.endObject()
.endObject()
.endObject()
.endObject();
}

private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
MetaData.Builder metaDataBuilder = MetaData.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
*/
package org.elasticsearch.xpack.ml.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand All @@ -29,6 +33,8 @@
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.security.SecurityContext;
Expand All @@ -43,12 +49,15 @@

import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

public class TransportPutDataFrameAnalyticsAction
extends HandledTransportAction<PutDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {

private static final Logger logger = LogManager.getLogger(TransportPutDataFrameAnalyticsAction.class);

private final XPackLicenseState licenseState;
private final DataFrameAnalyticsConfigProvider configProvider;
private final ThreadPool threadPool;
Expand Down Expand Up @@ -97,6 +106,7 @@ protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
.setCreateTime(Instant.now())
.setVersion(Version.CURRENT)
.build();

if (licenseState.isAuthAllowed()) {
final String username = securityContext.getUser().principal();
RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
Expand All @@ -120,9 +130,12 @@ protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,

client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else {
configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure
updateDocMappingAndPutConfig(
memoryCappedConfig,
threadPool.getThreadContext().getHeaders(),
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure
));
}
}
Expand All @@ -131,9 +144,12 @@ private void handlePrivsResponse(String username, DataFrameAnalyticsConfig memor
HasPrivilegesResponse response,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) throws IOException {
if (response.isCompleteMatch()) {
configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure
updateDocMappingAndPutConfig(
memoryCappedConfig,
threadPool.getThreadContext().getHeaders(),
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure
));
} else {
XContentBuilder builder = JsonXContent.contentBuilder();
Expand All @@ -150,6 +166,25 @@ private void handlePrivsResponse(String username, DataFrameAnalyticsConfig memor
}
}

private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config,
Map<String, String> headers,
ActionListener<IndexResponse> listener) {
ClusterState clusterState = clusterService.state();
if (clusterState == null) {
logger.warn("Cannot update doc mapping because clusterState == null");
configProvider.put(config, headers, listener);
return;
}
ElasticsearchMappings.addDocMappingIfMissing(
AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings::configMapping,
client,
clusterState,
ActionListener.wrap(
unused -> configProvider.put(config, headers, listener),
listener::onFailure));
}

private void validateConfig(DataFrameAnalyticsConfig config) {
if (MlStrings.isValidId(config.getId()) == false) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
Expand Down Expand Up @@ -36,6 +38,8 @@
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
Expand All @@ -58,6 +62,8 @@

public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {

private static final Logger logger = LogManager.getLogger(TransportPutDatafeedAction.class);

private final XPackLicenseState licenseState;
private final Client client;
private final SecurityContext securityContext;
Expand Down Expand Up @@ -111,7 +117,7 @@ protected void masterOperation(PutDatafeedAction.Request request, ClusterState s
.indices(indices);

ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
r -> handlePrivsResponse(username, request, r, listener),
r -> handlePrivsResponse(username, request, r, state, listener),
listener::onFailure);

ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
Expand Down Expand Up @@ -145,15 +151,17 @@ protected void masterOperation(PutDatafeedAction.Request request, ClusterState s
}

} else {
putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
putDatafeed(request, threadPool.getThreadContext().getHeaders(), state, listener);
}
}

private void handlePrivsResponse(String username, PutDatafeedAction.Request request,
private void handlePrivsResponse(String username,
PutDatafeedAction.Request request,
HasPrivilegesResponse response,
ClusterState clusterState,
ActionListener<PutDatafeedAction.Response> listener) throws IOException {
if (response.isCompleteMatch()) {
putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
putDatafeed(request, threadPool.getThreadContext().getHeaders(), clusterState, listener);
} else {
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
Expand All @@ -169,7 +177,9 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ
}
}

private void putDatafeed(PutDatafeedAction.Request request, Map<String, String> headers,
private void putDatafeed(PutDatafeedAction.Request request,
Map<String, String> headers,
ClusterState clusterState,
ActionListener<PutDatafeedAction.Response> listener) {

String datafeedId = request.getDatafeed().getId();
Expand All @@ -181,13 +191,30 @@ private void putDatafeed(PutDatafeedAction.Request request, Map<String, String>
}
DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations(xContentRegistry));

CheckedConsumer<Boolean, Exception> validationOk = ok -> {
datafeedConfigProvider.putDatafeedConfig(request.getDatafeed(), headers, ActionListener.wrap(
CheckedConsumer<Boolean, Exception> mappingsUpdated = ok -> {
datafeedConfigProvider.putDatafeedConfig(
request.getDatafeed(),
headers,
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())),
listener::onFailure
));
};

CheckedConsumer<Boolean, Exception> validationOk = ok -> {
if (clusterState == null) {
logger.warn("Cannot update doc mapping because clusterState == null");
mappingsUpdated.accept(false);
return;
}
ElasticsearchMappings.addDocMappingIfMissing(
AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings::configMapping,
client,
clusterState,
ActionListener.wrap(mappingsUpdated, listener::onFailure));
};

CheckedConsumer<Boolean, Exception> jobOk = ok ->
jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ActionListener.wrap(validationOk, listener::onFailure));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
Expand Down Expand Up @@ -254,7 +256,7 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist

ActionListener<Boolean> putJobListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
public void onResponse(Boolean mappingsUpdated) {

jobConfigProvider.putJob(job, ActionListener.wrap(
response -> {
Expand All @@ -281,10 +283,23 @@ public void onFailure(Exception e) {
}
};

ActionListener<Boolean> addDocMappingsListener = ActionListener.wrap(
indicesCreated -> {
if (state == null) {
logger.warn("Cannot update doc mapping because clusterState == null");
putJobListener.onResponse(false);
return;
}
ElasticsearchMappings.addDocMappingIfMissing(
AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, state, putJobListener);
},
putJobListener::onFailure
);

ActionListener<List<String>> checkForLeftOverDocs = ActionListener.wrap(
matchedIds -> {
if (matchedIds.isEmpty()) {
jobResultsProvider.createJobResultIndex(job, state, putJobListener);
jobResultsProvider.createJobResultIndex(job, state, addDocMappingsListener);
} else {
// A job has the same Id as one of the group names
// error with the first in the list
Expand Down