Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing "took" time (in ms) for _msearch #23767

Merged
merged 10 commits into from
Nov 2, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

Expand Down Expand Up @@ -112,11 +114,14 @@ public Exception getFailure() {

private Item[] items;

private long tookInMillis;

MultiSearchResponse() {
}

public MultiSearchResponse(Item[] items) {
public MultiSearchResponse(Item[] items, long tookInMillis) {
this.items = items;
this.tookInMillis = tookInMillis;
}

@Override
Expand All @@ -131,13 +136,23 @@ public Item[] getResponses() {
return this.items;
}

/**
* How long the msearch took.
*/
public TimeValue getTook() {
return new TimeValue(tookInMillis);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
items = new Item[in.readVInt()];
for (int i = 0; i < items.length; i++) {
items[i] = Item.readItem(in);
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
tookInMillis = in.readVLong();
}
}

@Override
Expand All @@ -147,11 +162,15 @@ public void writeTo(StreamOutput out) throws IOException {
for (Item item : items) {
item.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(tookInMillis);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("took", tookInMillis);
builder.startArray(Fields.RESPONSES);
for (Item item : items) {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {

private final int availableProcessors;
private final ClusterService clusterService;
private final TransportAction<SearchRequest, SearchResponse> searchAction;
private final LongSupplier relativeTimeProvider;

@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
Expand All @@ -53,19 +55,23 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
this.relativeTimeProvider = System::nanoTime;
}

TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
IndexNameExpressionResolver resolver, int availableProcessors) {
IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
this.relativeTimeProvider = relativeTimeProvider;
}

@Override
protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
final long relativeStartTime = relativeTimeProvider.getAsLong();

ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);

Expand All @@ -85,7 +91,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchR
final AtomicInteger responseCounter = new AtomicInteger(numRequests);
int numConcurrentSearches = Math.min(numRequests, maxConcurrentSearches);
for (int i = 0; i < numConcurrentSearches; i++) {
executeSearch(searchRequestSlots, responses, responseCounter, listener);
executeSearch(searchRequestSlots, responses, responseCounter, listener, relativeStartTime);
}
}

Expand All @@ -111,11 +117,12 @@ static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState st
* @param responseCounter incremented on each response
* @param listener the listener attached to the multi-search request
*/
private void executeSearch(
void executeSearch(
final Queue<SearchRequestSlot> requests,
final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter,
final ActionListener<MultiSearchResponse> listener) {
final ActionListener<MultiSearchResponse> listener,
final long relativeStartTime) {
SearchRequestSlot request = requests.poll();
if (request == null) {
/*
Expand Down Expand Up @@ -155,16 +162,25 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It
} else {
if (thread == Thread.currentThread()) {
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener));
threadPool.generic()
.execute(() -> executeSearch(requests, responses, responseCounter, listener, relativeStartTime));
} else {
// we are on a different thread (we went asynchronous), it's safe to recurse
executeSearch(requests, responses, responseCounter, listener);
executeSearch(requests, responses, responseCounter, listener, relativeStartTime);
}
}
}

private void finish() {
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]),
buildTookInMillis()));
}

/**
* Builds how long it took to execute the msearch.
*/
private long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - relativeStartTime);
}
});
}
Expand All @@ -178,7 +194,5 @@ static final class SearchRequestSlot {
this.request = request;
this.responseSlot = responseSlot;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change.

}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change.

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
mSearchResponses.add(new MultiSearchResponse.Item(response, null));
}

listener.onResponse(new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0])));
listener.onResponse(
new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]), randomIntBetween(1, 10000)));
}
};

Expand Down Expand Up @@ -153,10 +154,11 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
new MultiSearchResponse.Item(response, null)
}));
listener.onResponse(new MultiSearchResponse(
new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
new MultiSearchResponse.Item(response, null)
}, randomIntBetween(1, 10000)));
}
};

Expand Down
Loading