-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add verbose pipeline parameter to output each processor's execution details #16843
base: main
Are you sure you want to change the base?
Add verbose pipeline parameter to output each processor's execution details #16843
Conversation
…etails Signed-off-by: Junwei Dai <junweid@amazon.com>
Signed-off-by: Junwei Dai <junweid@amazon.com>
d5a2c4c
to
d931750
Compare
Signed-off-by: Junwei Dai <junweid@amazon.com>
@joshpalis Hey Josh, can you take a look when you have time? :) |
Signed-off-by: Junwei Dai <junweid@amazon.com>
❌ Gradle check result for 488377f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
❌ Gradle check result for 719fa1c: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: Junwei Dai <junweid@amazon.com>
719fa1c
to
e4e30f5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work @junweid62 on implementing this feature. A few comments from my initial pass
* @return a deep copy of the current SearchHits object | ||
* @throws IOException if an I/O exception occurs during serialization or deserialization | ||
*/ | ||
public SearchHits deepCopy() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Im curious why this serialization/deserialization is needed
|
||
private static void writeProcessorResultOnOrAfter(StreamOutput out, List<ProcessorExecutionDetail> processorResult) throws IOException { | ||
if (out.getVersion().onOrAfter(Version.V_2_18_0)) { | ||
out.writeCollection(processorResult, (o, detail) -> detail.writeTo(o)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : we could use out.writeList()
here
@@ -225,11 +241,19 @@ ActionListener<SearchResponse> transformResponseListener( | |||
final SearchResponseProcessor processor = searchResponseProcessors.get(i); | |||
|
|||
responseListener = ActionListener.wrap(r -> { | |||
ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType()); | |||
detail.addInput(Arrays.asList(r.getHits().deepCopy().getHits())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to understand, what is the rationale for invoking deepCopy()
? r.getHits()
already returns an object of type SearchHit
, this can just be written as detail.addInput(Arrays.asList(r.getHits().getHits()));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially thought the same:). However, when the Object receives this list, it holds a reference to the original SearchHit objects. Without a deepCopy, any modifications to the output will also reflect in the input, as they share the same reference.
By using deepCopy, we ensure that the input and output hold independent values, allowing us to clearly see the differences between the two at each step of the processor.
beforeResponseProcessor(processor); | ||
final long start = relativeTimeSupplier.getAsLong(); | ||
processor.processResponseAsync(request, r, requestContext, ActionListener.wrap(rr -> { | ||
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); | ||
afterResponseProcessor(processor, took); | ||
detail.addOutput(Arrays.asList(rr.getHits().deepCopy().getHits())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
❌ Gradle check result for e4e30f5: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Description
Related RFC : #16705
This PR introduces enhancements to OpenSearch's search pipeline functionality, focusing on improving the traceability and debugging of search request and response transformations. It addresses the increasing complexity of search pipeline processors by implementing verbose mode support, which provides detailed insights into processor execution.
Adds Verbose Mode for Search Pipelines:
verbose_pipeline
parameter to search requests, default to false.Improves Pipeline Debugging:
Supports All Pipeline Configurations:
Test Framework Enhancements:
Example output with request processor:
filter_query
response processor:rename_field
andsort
Related Issues
Resolves #14745
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.