-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introducing "took" time (in ms) for _msearch
#23767
Changes from 7 commits
b0b06e5
8317497
b7c41ce
83f77ea
c6ad0cc
6bb1d2c
ccae5b4
516937a
7ced928
0744b44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,16 +34,18 @@ | |
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.util.List; | ||
import java.util.Queue; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.LongSupplier; | ||
|
||
public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> { | ||
|
||
private final int availableProcessors; | ||
private final ClusterService clusterService; | ||
private final TransportAction<SearchRequest, SearchResponse> searchAction; | ||
private final LongSupplier relativeTimeProvider; | ||
|
||
@Inject | ||
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, | ||
|
@@ -53,19 +55,23 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran | |
this.clusterService = clusterService; | ||
this.searchAction = searchAction; | ||
this.availableProcessors = EsExecutors.numberOfProcessors(settings); | ||
this.relativeTimeProvider = System::nanoTime; | ||
} | ||
|
||
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService, | ||
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction, | ||
IndexNameExpressionResolver resolver, int availableProcessors) { | ||
IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) { | ||
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new); | ||
this.clusterService = clusterService; | ||
this.searchAction = searchAction; | ||
this.availableProcessors = availableProcessors; | ||
this.relativeTimeProvider = relativeTimeProvider; | ||
} | ||
|
||
@Override | ||
protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) { | ||
long startTimeInNanos = relativeTime(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be final, and change it's name |
||
|
||
ClusterState clusterState = clusterService.state(); | ||
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); | ||
|
||
|
@@ -85,7 +91,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchR | |
final AtomicInteger responseCounter = new AtomicInteger(numRequests); | ||
int numConcurrentSearches = Math.min(numRequests, maxConcurrentSearches); | ||
for (int i = 0; i < numConcurrentSearches; i++) { | ||
executeSearch(searchRequestSlots, responses, responseCounter, listener); | ||
executeSearch(searchRequestSlots, responses, responseCounter, listener, startTimeInNanos); | ||
} | ||
} | ||
|
||
|
@@ -111,11 +117,12 @@ static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState st | |
* @param responseCounter incremented on each response | ||
* @param listener the listener attached to the multi-search request | ||
*/ | ||
private void executeSearch( | ||
void executeSearch( | ||
final Queue<SearchRequestSlot> requests, | ||
final AtomicArray<MultiSearchResponse.Item> responses, | ||
final AtomicInteger responseCounter, | ||
final ActionListener<MultiSearchResponse> listener) { | ||
final ActionListener<MultiSearchResponse> listener, | ||
long startTimeInNanos) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please make this parameter final, and name it |
||
SearchRequestSlot request = requests.poll(); | ||
if (request == null) { | ||
/* | ||
|
@@ -155,20 +162,32 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It | |
} else { | ||
if (thread == Thread.currentThread()) { | ||
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread | ||
threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener)); | ||
threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener, startTimeInNanos)); | ||
} else { | ||
// we are on a different thread (we went asynchronous), it's safe to recurse | ||
executeSearch(requests, responses, responseCounter, listener); | ||
executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); | ||
} | ||
} | ||
} | ||
|
||
private void finish() { | ||
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); | ||
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]), | ||
buildTookInMillis())); | ||
} | ||
|
||
/** | ||
* Builds how long it took to execute the msearch. | ||
*/ | ||
private long buildTookInMillis() { | ||
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeInNanos); | ||
} | ||
}); | ||
} | ||
|
||
private long relativeTime() { | ||
return relativeTimeProvider.getAsLong(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this method is needed. |
||
} | ||
|
||
static final class SearchRequestSlot { | ||
|
||
final SearchRequest request; | ||
|
@@ -178,7 +197,5 @@ static final class SearchRequestSlot { | |
this.request = request; | ||
this.responseSlot = responseSlot; | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert this change. |
||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert this change. |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this is only used in tests, and only to get the milliseconds, so I think that we can remove it and just use
getTookInMillis
in those places.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method exists in both
SearchResponse
andBulkResponse
.I would prefer to keep the consistency unless there is a good reason no to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a good reason, the method is unneeded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So shall I remove also the corresponding methods for
Bulk
andSearch
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not as part of this change.