Skip to content
Merged
1 change: 1 addition & 0 deletions x-pack/plugin/ml/qa/ml-with-security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ integTest.runner {
'ml/start_data_frame_analytics/Test start given source index has no compatible fields',
'ml/start_data_frame_analytics/Test start with inconsistent body/param ids',
'ml/start_data_frame_analytics/Test start given dest index is not empty',
'ml/start_data_frame_analytics/Test start with compatible fields but no data',
'ml/start_stop_datafeed/Test start datafeed job, but not open',
'ml/start_stop_datafeed/Test start non existing datafeed',
'ml/start_stop_datafeed/Test stop non existing datafeed',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -226,10 +227,41 @@ public void onFailure(Exception e) {
}

private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {

// Step 5. Validate that there are analyzable data in the source index
ActionListener<DataFrameAnalyticsConfig> validateMappingsMergeListener = ActionListener.wrap(
config -> DataFrameDataExtractorFactory.createForSourceIndices(client,
"validate_source_index_has_rows-" + id,
config,
ActionListener.wrap(
dataFrameDataExtractorFactory ->
dataFrameDataExtractorFactory
.newExtractor(false)
.collectDataSummaryAsync(ActionListener.wrap(
dataSummary -> {
if (dataSummary.rows == 0) {
finalListener.onFailure(new ElasticsearchStatusException(
"Unable to start {} as there are no analyzable data in source indices [{}].",
RestStatus.BAD_REQUEST,
id,
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())
));
} else {
finalListener.onResponse(config);
}
},
finalListener::onFailure
)),
finalListener::onFailure
))
,
finalListener::onFailure
);

// Step 4. Validate mappings can be merged
ActionListener<DataFrameAnalyticsConfig> toValidateMappingsListener = ActionListener.wrap(
config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
mappings -> finalListener.onResponse(config), finalListener::onFailure)),
mappings -> validateMappingsMergeListener.onResponse(config), finalListener::onFailure)),
finalListener::onFailure
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchAction;
Expand Down Expand Up @@ -234,14 +235,33 @@ public List<String> getFieldNames() {
}

public DataSummary collectDataSummary() {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
}

/**
 * Asynchronous counterpart of {@link #collectDataSummary()}: counts the rows
 * matching the extractor's query without blocking, executing the search with
 * the stored per-request headers under the ML origin.
 *
 * @param dataSummaryActionListener notified with the resulting {@link DataSummary},
 *                                  or with any search failure
 */
public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {
    // Capture the field count up front; only the row count comes from the search.
    final int fieldCount = context.extractedFields.getAllFields().size();

    ActionListener<SearchResponse> searchListener = ActionListener.wrap(
        response -> {
            long rowCount = response.getHits().getTotalHits().value;
            dataSummaryActionListener.onResponse(new DataSummary(rowCount, fieldCount));
        },
        dataSummaryActionListener::onFailure);

    // Run with the caller's security headers so the count respects the
    // permissions of the user who created the analytics job.
    ClientHelper.executeWithHeadersAsync(context.headers,
        ClientHelper.ML_ORIGIN,
        client,
        SearchAction.INSTANCE,
        buildDataSummarySearchRequestBuilder().request(),
        searchListener);
}

/**
 * Builds the count-only search used by both the sync and async data-summary
 * paths: zero hits requested, total hit count tracked exactly.
 *
 * @return a request builder targeting the extractor's source indices and query
 */
private SearchRequestBuilder buildDataSummarySearchRequestBuilder() {
    // Fix: the rendered diff left two unreachable statements (a stray
    // executeSearchRequest call and DataSummary return) after this return;
    // they belonged to the old collectDataSummary body and are removed here.
    return new SearchRequestBuilder(client, SearchAction.INSTANCE)
        .setIndices(context.indices)
        .setSize(0)
        .setQuery(context.query)
        .setTrackTotalHits(true);
}

public Set<String> getCategoricalFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
Expand Down Expand Up @@ -273,7 +274,15 @@ private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtr
}

dataExtractor = dataExtractorFactory.newExtractor(false);
process = createProcess(task, createProcessConfig(config, dataExtractor));
AnalyticsProcessConfig analyticsProcessConfig = createProcessConfig(config, dataExtractor);
LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig));
// If we have no rows, that means there is no data so no point in starting the native process
// just finish the task
if (analyticsProcessConfig.rows() == 0) {
LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
return false;
}
process = createProcess(task, analyticsProcessConfig);
DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
dataExtractorFactory.newExtractor(true));
resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, task.getProgressTracker());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,34 @@
id: "foo"

---
"Test start with compatible fields but no data":
- do:
indices.create:
index: empty-index-with-compatible-fields
body:
mappings:
properties:
long_field: { "type": "long" }

- do:
ml.put_data_frame_analytics:
id: "empty-with-compatible-fields"
body: >
{
"source": {
"index": "empty-index-with-compatible-fields"
},
"dest": {
"index": "empty-index-with-compatible-fields-dest"
},
"analysis": {"outlier_detection":{}}
}

- do:
catch: /Unable to start empty-with-compatible-fields as there are no analyzable data in source indices \[empty-index-with-compatible-fields\]/
ml.start_data_frame_analytics:
id: "empty-with-compatible-fields"
---
"Test start with inconsistent body/param ids":

- do:
Expand Down