[ML] reduce InferenceProcessor.Factory log spam by not parsing pipelines #56020
Pinging @elastic/ml-core (:ml)
Pinging @elastic/es-core-features (:Core/Features/Ingest)
Map<String, Object> configMap = configuration.getConfigAsMap();
try {
    Pipeline pipeline = Pipeline.create(configuration.getId(),
        configuration.getConfigAsMap(),
        ingestService.getProcessorFactories(),
        ingestService.getScriptService());
    count += pipeline.getProcessors().stream().filter(processor -> processor instanceof InferenceProcessor).count();
    List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
    for (Map<String, Object> processorConfigWithKey : processorConfigs) {
        for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
            if (TYPE.equals(entry.getKey())) {
                count++;
            }
        }
    }
++ on this approach to getting a count of inference processors. It should fit more nicely into the ingest pipeline initialization process and be lighter weight than fully instantiating each pipeline.
One thing you might consider is that the config map for an ingest pipeline is a tree structure in which some nodes such as ForEach processors may contain child nodes. I do not know how inference processors are typically configured, but the code above will count them only if they're at the top level of the pipeline tree.
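To make the concern concrete, here is a minimal sketch that uses plain `java.util` maps instead of the Elasticsearch classes. The class name `TopLevelCountSketch`, the helper names, and the sample pipeline are hypothetical; only the general shape (a `processors` list of single-key maps, with `foreach` holding a nested `processor`) is taken from the discussion.

```java
import java.util.List;
import java.util.Map;

class TopLevelCountSketch {
    static final String TYPE = "inference";

    // Counts only processors whose type key sits directly in the top-level "processors" list.
    static int countTopLevel(Map<String, Object> pipelineConfig) {
        int count = 0;
        Object processors = pipelineConfig.get("processors");
        if (processors instanceof List<?>) {
            for (Object p : (List<?>) processors) {
                if (p instanceof Map<?, ?> && ((Map<?, ?>) p).containsKey(TYPE)) {
                    count++;
                }
            }
        }
        return count;
    }

    // Hypothetical pipeline: one top-level inference processor, one nested inside a foreach.
    static Map<String, Object> samplePipeline() {
        Map<String, Object> inference = Map.of(TYPE, Map.<String, Object>of("model_id", "my_model"));
        Map<String, Object> foreach =
            Map.of("foreach", Map.<String, Object>of("field", "values", "processor", inference));
        return Map.of("processors", List.of(inference, foreach));
    }

    public static void main(String[] args) {
        // Two inference processors are configured, but only the top-level one is seen.
        System.out.println(countTopLevel(samplePipeline())); // prints 1
    }
}
```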
@danhermann good point...I will have to traverse the tree.
@@ -165,6 +165,7 @@ public String getType() {

    public static final class Factory implements Processor.Factory, Consumer<ClusterState> {

        private static final String FOREACH_PROCESSOR_NAME = "foreach";
nit: you could use ForEachProcessor.TYPE
This would require adding the ingest-common module as a dependency to the ML plugin. Did not want to do that for a single string :/
Ah, right. Definitely not worth it for a string.
// Special handling as `foreach` processors allow a `processor` to be defined
if (FOREACH_PROCESSOR_NAME.equals(entry.getKey())) {
    if (entry.getValue() instanceof Map<?, ?>) {
        Object processorDefinition = ((Map<?, ?>)entry.getValue()).get("processor");
        if (processorDefinition instanceof Map<?, ?>) {
            if (((Map<?, ?>) processorDefinition).keySet().contains(TYPE)) {
                ++count;
            }
        }
    }
The for-each processor and the onFailure directive are the only scenarios I know of that result in child processors. Both of those can be nested to an indefinite number of levels. I'm not sure how far you want to go down that rabbit hole, though.
Yeesh, yeah, that's right.
Ugh, I will do the recursion. But who on earth would have a foreach processor nested in a foreach processor :(
Yep, hence the 🐇 hole. 😃
Only allow recursion up to 10 layers. Handling `on_failure` and `foreach` seems to work ok :D
Looks good to me. Thanks for making that change.
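For readers following along, a depth-limited traversal over `foreach` and `on_failure` might look roughly like the sketch below. This is not the code merged in the PR: it works on plain `java.util` maps, and the class name `RecursiveCountSketch`, the helper names, and the `MAX_DEPTH` constant are assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;

class RecursiveCountSketch {
    static final String TYPE = "inference";
    static final String FOREACH = "foreach";
    static final String ON_FAILURE = "on_failure";
    static final int MAX_DEPTH = 10; // arbitrary cap, mirroring the "10 layers" in the discussion

    static int countInPipeline(Map<String, Object> pipelineConfig) {
        return countInList(pipelineConfig.get("processors"), 0);
    }

    // Sums the counts for a list of wrapped processors; anything that is not a list counts as 0.
    static int countInList(Object processors, int level) {
        int count = 0;
        if (processors instanceof List<?>) {
            for (Object p : (List<?>) processors) {
                count += countInProcessor(p, level);
            }
        }
        return count;
    }

    // `wrapped` has the shape {"<type>": {<definition>}}.
    static int countInProcessor(Object wrapped, int level) {
        if (level > MAX_DEPTH || !(wrapped instanceof Map<?, ?>)) {
            return 0;
        }
        int count = 0;
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) wrapped).entrySet()) {
            if (TYPE.equals(entry.getKey())) {
                count++;
            }
            if (!(entry.getValue() instanceof Map<?, ?>)) {
                continue;
            }
            Map<?, ?> definition = (Map<?, ?>) entry.getValue();
            if (FOREACH.equals(entry.getKey())) {
                // foreach wraps a single child processor
                count += countInProcessor(definition.get("processor"), level + 1);
            }
            // any processor may carry a list of on_failure processors
            count += countInList(definition.get(ON_FAILURE), level + 1);
        }
        return count;
    }

    // Builds an inference processor wrapped in `depth` nested foreach processors.
    static Map<String, Object> nest(int depth) {
        Map<String, Object> p = Map.of(TYPE, Map.<String, Object>of("model_id", "m"));
        for (int i = 0; i < depth; i++) {
            p = Map.of(FOREACH, Map.<String, Object>of("field", "f", "processor", p));
        }
        return p;
    }

    public static void main(String[] args) {
        Map<String, Object> setWithFallback = Map.of("set",
            Map.<String, Object>of("field", "a", "value", "b", ON_FAILURE, List.of(nest(0))));
        Map<String, Object> pipeline = Map.of("processors", List.of(nest(0), nest(2), setWithFallback));
        System.out.println(countInPipeline(pipeline)); // prints 3
    }
}
```

With this cap, an inference processor nested more than 10 levels deep is simply not counted, which trades completeness for a bounded traversal.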
static int numInferenceProcessors(String processorType, Map<String, Object> processorDefinition, int level) {
    int count = 0;
    // arbitrary, but we must limit this somehow
    if (MAX_INFERENCE_PROCESSOR_SEARCH_RECURSIONS > 10) {
??
- if (MAX_INFERENCE_PROCESSOR_SEARCH_RECURSIONS > 10) {
+ if (level > 10) {
lulz, yeah, I goofed.
if (FOREACH_PROCESSOR_NAME.equals(processorType)) {
    Map<String, Object> innerProcessor = (Map<String, Object>)processorDefinition.get("processor");
    if (innerProcessor != null) {
        for (Map.Entry<String, Object> innerProcessorWithName : innerProcessor.entrySet()) {
`foreach` can only have 1 processor so there is no need to iterate here.
The iteration is just for simplicity. Otherwise I would have to assert that the size is 1 and then get the first entry, etc. Iteration here is cleaner IMO.
👍
It threw me because it is called the foreach processor, so the loop looks like a misunderstanding. Perhaps leave a comment.
LGTM
…nes (elastic#56020) If there are ill-formed pipelines, or other pipelines are not ready to be parsed, `InferenceProcessor.Factory::accept(ClusterState)` logs warnings. This can be confusing and cause log spam. It might lead folks to think there an issue with the inference processor. Also, they would see logs for the inference processor even though they might not be using the inference processor. Leading to more confusion. Additionally, pipelines might not be parseable in this method as some processors require the new cluster state metadata before construction (e.g. `enrich` requires cluster metadata to be set before creating the processor). closes elastic#55985
…nes (#56020) (#56126) If there are ill-formed pipelines, or other pipelines are not ready to be parsed, `InferenceProcessor.Factory::accept(ClusterState)` logs warnings. This can be confusing and cause log spam. It might lead folks to think there an issue with the inference processor. Also, they would see logs for the inference processor even though they might not be using the inference processor. Leading to more confusion. Additionally, pipelines might not be parseable in this method as some processors require the new cluster state metadata before construction (e.g. `enrich` requires cluster metadata to be set before creating the processor). closes #55985
…pipelines (#56020) (#56127) * [ML] reduce InferenceProcessor.Factory log spam by not parsing pipelines (#56020) If there are ill-formed pipelines, or other pipelines are not ready to be parsed, `InferenceProcessor.Factory::accept(ClusterState)` logs warnings. This can be confusing and cause log spam. It might lead folks to think there an issue with the inference processor. Also, they would see logs for the inference processor even though they might not be using the inference processor. Leading to more confusion. Additionally, pipelines might not be parseable in this method as some processors require the new cluster state metadata before construction (e.g. `enrich` requires cluster metadata to be set before creating the processor). closes #55985 * fixing for backport Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
If there are ill-formed pipelines, or other pipelines are not ready to be parsed, `InferenceProcessor.Factory::accept(ClusterState)` logs warnings. This can be confusing and cause log spam. It might lead folks to think there is an issue with the inference processor. Also, they would see logs for the inference processor even though they might not be using it, leading to more confusion.
Additionally, pipelines might not be parseable in this method as some processors require the new cluster state metadata before construction (e.g. `enrich` requires cluster metadata to be set before creating the processor).
closes #55985