Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.7] [ML] reduce InferenceProcessor.Factory log spam by not parsing pipelines (#56020) #56127

Merged
merged 3 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet

InferenceProcessor.Factory inferenceFactory = new InferenceProcessor.Factory(parameters.client,
parameters.ingestService.getClusterService(),
this.settings,
parameters.ingestService);
this.settings);
parameters.ingestService.addIngestClusterStateListener(inferenceFactory);
return Collections.singletonMap(InferenceProcessor.TYPE, inferenceFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.ingest.Processor;
Expand All @@ -41,12 +40,14 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.elasticsearch.ingest.Pipeline.PROCESSORS_KEY;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

Expand Down Expand Up @@ -162,26 +163,24 @@ public String getType() {

public static final class Factory implements Processor.Factory, Consumer<ClusterState> {

private static final String FOREACH_PROCESSOR_NAME = "foreach";
// If processors are nested more than 10 levels deep, we stop searching for inference processor definitions
private static final int MAX_INFERENCE_PROCESSOR_SEARCH_RECURSIONS = 10;
private static final Logger logger = LogManager.getLogger(Factory.class);

private static final Set<String> RESERVED_ML_FIELD_NAMES = new HashSet<>(Arrays.asList(
WarningInferenceResults.WARNING.getPreferredName(),
MODEL_ID));

private final Client client;
private final IngestService ingestService;
private final InferenceAuditor auditor;
private volatile int currentInferenceProcessors;
private volatile int maxIngestProcessors;
private volatile Version minNodeVersion = Version.CURRENT;

public Factory(Client client,
ClusterService clusterService,
Settings settings,
IngestService ingestService) {
public Factory(Client client, ClusterService clusterService, Settings settings) {
this.client = client;
this.maxIngestProcessors = MAX_INFERENCE_PROCESSORS.get(settings);
this.ingestService = ingestService;
this.auditor = new InferenceAuditor(client, clusterService.getNodeName());
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_INFERENCE_PROCESSORS, this::setMaxIngestProcessors);
}
Expand All @@ -202,19 +201,66 @@ public void accept(ClusterState state) {

int count = 0;
for (PipelineConfiguration configuration : ingestMetadata.getPipelines().values()) {
Map<String, Object> configMap = configuration.getConfigAsMap();
try {
Pipeline pipeline = Pipeline.create(configuration.getId(),
configuration.getConfigAsMap(),
ingestService.getProcessorFactories(),
ingestService.getScriptService());
count += pipeline.getProcessors().stream().filter(processor -> processor instanceof InferenceProcessor).count();
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
count += numInferenceProcessors(entry.getKey(), entry.getValue());
}
}
// We cannot throw any exception here. It might break other pipelines.
} catch (Exception ex) {
logger.warn(new ParameterizedMessage("failure parsing pipeline config [{}]", configuration.getId()), ex);
logger.debug(
() -> new ParameterizedMessage("failed gathering processors for pipeline [{}]", configuration.getId()),
ex);
}
}
currentInferenceProcessors = count;
}

@SuppressWarnings("unchecked")
static int numInferenceProcessors(String processorType, Object processorDefinition) {
    // Entry point of the recursive search: treat the raw definition object as a
    // processor config map and begin counting at nesting depth zero. A non-map
    // definition throws ClassCastException here, which the caller handles.
    Map<String, Object> definitionAsMap = (Map<String, Object>) processorDefinition;
    return numInferenceProcessors(processorType, definitionAsMap, 0);
}

@SuppressWarnings("unchecked")
static int numInferenceProcessors(String processorType, Map<String, Object> processorDefinition, int level) {
    // Recursion guard: arbitrary depth cap so a pathological config cannot
    // send us into unbounded descent.
    if (level > MAX_INFERENCE_PROCESSOR_SEARCH_RECURSIONS) {
        return 0;
    }
    if (processorType == null || processorDefinition == null) {
        return 0;
    }
    // Count this processor itself if it is an inference processor.
    int total = TYPE.equals(processorType) ? 1 : 0;
    if (FOREACH_PROCESSOR_NAME.equals(processorType)) {
        Map<String, Object> wrappedProcessor = (Map<String, Object>) processorDefinition.get("processor");
        if (wrappedProcessor != null) {
            // a foreach processor should only have a SINGLE nested processor. Iteration is for simplicity's sake.
            for (Map.Entry<String, Object> wrappedEntry : wrappedProcessor.entrySet()) {
                total += numInferenceProcessors(wrappedEntry.getKey(),
                    (Map<String, Object>) wrappedEntry.getValue(),
                    level + 1);
            }
        }
    }
    // Any processor type may carry an on_failure chain; descend into each entry.
    if (processorDefinition.containsKey(Pipeline.ON_FAILURE_KEY)) {
        List<Map<String, Object>> onFailureProcessors = ConfigurationUtils.readList(
            null,
            null,
            processorDefinition,
            Pipeline.ON_FAILURE_KEY);
        for (Map<String, Object> onFailureProcessor : onFailureProcessors) {
            for (Map.Entry<String, Object> entry : onFailureProcessor.entrySet()) {
                total += numInferenceProcessors(entry.getKey(), (Map<String, Object>) entry.getValue(), level + 1);
            }
        }
    }
    return total;
}

// Used for testing
int numInferenceProcessors() {
return currentInferenceProcessors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
factoryMap.put(InferenceProcessor.TYPE,
new InferenceProcessor.Factory(parameters.client,
parameters.ingestService.getClusterService(),
Settings.EMPTY,
parameters.ingestService));
Settings.EMPTY));

factoryMap.put("not_inference", new NotInferenceProcessor.Factory());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig;
Expand All @@ -54,22 +51,9 @@

public class InferenceProcessorFactoryTests extends ESTestCase {

private static final IngestPlugin SKINNY_PLUGIN = new IngestPlugin() {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
XPackLicenseState licenseState = mock(XPackLicenseState.class);
when(licenseState.isMachineLearningAllowed()).thenReturn(true);
return Collections.singletonMap(InferenceProcessor.TYPE,
new InferenceProcessor.Factory(parameters.client,
parameters.ingestService.getClusterService(),
Settings.EMPTY,
parameters.ingestService));
}
};
private Client client;
private XPackLicenseState licenseState;
private ClusterService clusterService;
private IngestService ingestService;

@Before
public void setUpVariables() {
Expand All @@ -86,8 +70,6 @@ public void setUpVariables() {
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
clusterService = new ClusterService(settings, clusterSettings, tp);
ingestService = new IngestService(clusterService, tp, null, null,
null, Collections.singletonList(SKINNY_PLUGIN), client);
licenseState = mock(XPackLicenseState.class);
when(licenseState.isMachineLearningAllowed()).thenReturn(true);
}
Expand All @@ -97,8 +79,7 @@ public void testNumInferenceProcessors() throws Exception {

InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
clusterService,
Settings.EMPTY,
ingestService);
Settings.EMPTY);
processorFactory.accept(buildClusterState(metaData));

assertThat(processorFactory.numInferenceProcessors(), equalTo(0));
Expand All @@ -111,11 +92,61 @@ public void testNumInferenceProcessors() throws Exception {
assertThat(processorFactory.numInferenceProcessors(), equalTo(3));
}

public void testNumInferenceProcessorsRecursivelyDefined() throws Exception {
    InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
        clusterService,
        Settings.EMPTY);
    // Prime the factory with a cluster state that has no ingest metadata.
    MetaData emptyMetaData = null;
    processorFactory.accept(buildClusterState(emptyMetaData));

    Map<String, PipelineConfiguration> pipelineById = new HashMap<>();
    // One inference processor at the top level, either direct or wrapped in a foreach.
    PipelineConfiguration topLevel = randomBoolean()
        ? newConfigurationWithInferenceProcessor("top_level")
        : newConfigurationWithForeachProcessorProcessor("top_level");
    pipelineById.put("pipeline_with_model_top_level", topLevel);

    // Two more inference processors buried in a set processor's on_failure chain.
    Map<String, Object> setProcessorConfig = new HashMap<>();
    setProcessorConfig.put("field", "foo");
    setProcessorConfig.put("value", "bar");
    setProcessorConfig.put("on_failure",
        Arrays.asList(
            inferenceProcessorForModel("second_level"),
            forEachProcessorWithInference("third_level")));
    Map<String, Object> pipelineSource = Collections.singletonMap("processors",
        Collections.singletonList(Collections.singletonMap("set", setProcessorConfig)));
    try (XContentBuilder builder = XContentFactory.jsonBuilder().map(pipelineSource)) {
        pipelineById.put("pipeline_with_model_nested",
            new PipelineConfiguration("pipeline_with_model_nested", BytesReference.bytes(builder), XContentType.JSON));
    }

    IngestMetadata ingestMetadata = new IngestMetadata(pipelineById);

    DiscoveryNodes nodes = DiscoveryNodes.builder()
        .add(new DiscoveryNode("min_node",
            new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
            Version.CURRENT))
        .add(new DiscoveryNode("current_node",
            new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
            Version.CURRENT))
        .localNodeId("_node_id")
        .masterNodeId("_node_id")
        .build();
    ClusterState cs = ClusterState.builder(new ClusterName("_name"))
        .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata))
        .nodes(nodes)
        .build();

    processorFactory.accept(cs);
    // top_level + second_level + third_level
    assertThat(processorFactory.numInferenceProcessors(), equalTo(3));
}

// Depth 100 exceeds the recursion cap, so even an inference-typed processor
// must not be counted once the search has gone past the limit.
// NOTE(review): method name typo — "Recurions" should read "Recursions";
// left unchanged to avoid renaming a public test method.
public void testNumInferenceWhenLevelExceedsMaxRecurions() {
assertThat(InferenceProcessor.Factory.numInferenceProcessors(InferenceProcessor.TYPE, Collections.emptyMap(), 100), equalTo(0));
}

public void testCreateProcessorWithTooManyExisting() throws Exception {
InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
clusterService,
Settings.builder().put(InferenceProcessor.MAX_INFERENCE_PROCESSORS.getKey(), 1).build(),
ingestService);
Settings.builder().put(InferenceProcessor.MAX_INFERENCE_PROCESSORS.getKey(), 1).build());

processorFactory.accept(buildClusterStateWithModelReferences("model1"));

Expand All @@ -129,8 +160,7 @@ public void testCreateProcessorWithTooManyExisting() throws Exception {
public void testCreateProcessorWithInvalidInferenceConfig() {
InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
clusterService,
Settings.EMPTY,
ingestService);
Settings.EMPTY);

Map<String, Object> config = new HashMap<String, Object>() {{
put(InferenceProcessor.FIELD_MAP, Collections.emptyMap());
Expand Down Expand Up @@ -170,8 +200,7 @@ public void testCreateProcessorWithInvalidInferenceConfig() {
public void testCreateProcessorWithTooOldMinNodeVersion() throws IOException {
InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
clusterService,
Settings.EMPTY,
ingestService);
Settings.EMPTY);
processorFactory.accept(builderClusterStateWithModelReferences(Version.V_7_5_0, "model1"));

Map<String, Object> regression = new HashMap<String, Object>() {{
Expand Down Expand Up @@ -214,8 +243,7 @@ public void testCreateProcessorWithTooOldMinNodeVersion() throws IOException {
public void testCreateProcessor() {
InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
clusterService,
Settings.EMPTY,
ingestService);
Settings.EMPTY);

Map<String, Object> regression = new HashMap<String, Object>() {{
put(InferenceProcessor.FIELD_MAP, Collections.emptyMap());
Expand Down Expand Up @@ -249,8 +277,7 @@ public void testCreateProcessor() {
public void testCreateProcessorWithDuplicateFields() {
InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
clusterService,
Settings.EMPTY,
ingestService);
Settings.EMPTY);

Map<String, Object> regression = new HashMap<String, Object>() {{
put(InferenceProcessor.FIELD_MAP, Collections.emptyMap());
Expand Down Expand Up @@ -280,7 +307,8 @@ private static ClusterState buildClusterStateWithModelReferences(String... model
private static ClusterState builderClusterStateWithModelReferences(Version minNodeVersion, String... modelId) throws IOException {
Map<String, PipelineConfiguration> configurations = new HashMap<>(modelId.length);
for (String id : modelId) {
configurations.put("pipeline_with_model_" + id, newConfigurationWithInferenceProcessor(id));
configurations.put("pipeline_with_model_" + id,
randomBoolean() ? newConfigurationWithInferenceProcessor(id) : newConfigurationWithForeachProcessorProcessor(id));
}
IngestMetadata ingestMetadata = new IngestMetadata(configurations);

Expand All @@ -300,17 +328,35 @@ private static ClusterState builderClusterStateWithModelReferences(Version minNo

private static PipelineConfiguration newConfigurationWithInferenceProcessor(String modelId) throws IOException {
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(Collections.singletonMap("processors",
Collections.singletonList(
Collections.singletonMap(InferenceProcessor.TYPE,
new HashMap<String, Object>() {{
put(InferenceProcessor.MODEL_ID, modelId);
put(InferenceProcessor.INFERENCE_CONFIG,
Collections.singletonMap(RegressionConfig.NAME.getPreferredName(), Collections.emptyMap()));
put(InferenceProcessor.TARGET_FIELD, "new_field");
put(InferenceProcessor.FIELD_MAP, Collections.singletonMap("source", "dest"));
}}))))) {
Collections.singletonList(inferenceProcessorForModel(modelId))))) {
return new PipelineConfiguration("pipeline_with_model_" + modelId, BytesReference.bytes(xContentBuilder), XContentType.JSON);
}
}

private static PipelineConfiguration newConfigurationWithForeachProcessorProcessor(String modelId) throws IOException {
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(Collections.singletonMap("processors",
Collections.singletonList(forEachProcessorWithInference(modelId))))) {
return new PipelineConfiguration("pipeline_with_model_" + modelId, BytesReference.bytes(xContentBuilder), XContentType.JSON);
}
}

/**
 * Returns the config map for a foreach processor whose nested processor is an
 * inference processor referencing the given model id.
 */
private static Map<String, Object> forEachProcessorWithInference(String modelId) {
    Map<String, Object> foreachConfig = new HashMap<>();
    foreachConfig.put("field", "foo");
    foreachConfig.put("processor", inferenceProcessorForModel(modelId));
    return Collections.singletonMap("foreach", foreachConfig);
}

private static Map<String, Object> inferenceProcessorForModel(String modelId) {
return Collections.singletonMap(InferenceProcessor.TYPE,
new HashMap<String, Object>() {{
put(InferenceProcessor.MODEL_ID, modelId);
put(InferenceProcessor.INFERENCE_CONFIG,
Collections.singletonMap(RegressionConfig.NAME.getPreferredName(), Collections.emptyMap()));
put(InferenceProcessor.TARGET_FIELD, "new_field");
put(InferenceProcessor.FIELD_MAP, Collections.singletonMap("source", "dest"));
}});
}

}