diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index 4d42ad334a9f0..560379a6ce2f6 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -21,12 +21,14 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -112,11 +114,14 @@ public Exception getFailure() { private Item[] items; + private long tookInMillis; + MultiSearchResponse() { } - public MultiSearchResponse(Item[] items) { + public MultiSearchResponse(Item[] items, long tookInMillis) { this.items = items; + this.tookInMillis = tookInMillis; } @Override @@ -131,6 +136,13 @@ public Item[] getResponses() { return this.items; } + /** + * How long the msearch took. + */ + public TimeValue getTook() { + return new TimeValue(tookInMillis); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -138,6 +150,9 @@ public void readFrom(StreamInput in) throws IOException { for (int i = 0; i < items.length; i++) { items[i] = Item.readItem(in); } + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + tookInMillis = in.readVLong(); + } } @Override @@ -147,11 +162,15 @@ public void writeTo(StreamOutput out) throws IOException { for (Item item : items) { item.writeTo(out); } + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeVLong(tookInMillis); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field("took", tookInMillis); builder.startArray(Fields.RESPONSES); for (Item item : items) { builder.startObject(); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index b65cd4d55516a..9dec3be5c1b11 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -34,16 +34,18 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; public class TransportMultiSearchAction extends HandledTransportAction { private final int availableProcessors; private final ClusterService clusterService; private final TransportAction searchAction; + private final LongSupplier relativeTimeProvider; @Inject public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, @@ -53,19 +55,23 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran this.clusterService = clusterService; this.searchAction = searchAction; this.availableProcessors = EsExecutors.numberOfProcessors(settings); + this.relativeTimeProvider = System::nanoTime; } TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService, ClusterService clusterService, TransportAction searchAction, - IndexNameExpressionResolver resolver, int availableProcessors) { + IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) { super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new); this.clusterService = clusterService; this.searchAction = searchAction; this.availableProcessors = availableProcessors; + this.relativeTimeProvider = relativeTimeProvider; } @Override protected void doExecute(MultiSearchRequest request, ActionListener listener) { + final long relativeStartTime = relativeTimeProvider.getAsLong(); + ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -85,7 +91,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener requests, final AtomicArray responses, final AtomicInteger responseCounter, - final ActionListener listener) { + final ActionListener listener, + final long relativeStartTime) { SearchRequestSlot request = requests.poll(); if (request == null) { /* @@ -155,16 +162,25 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It } else { if (thread == Thread.currentThread()) { // we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread - threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener)); + threadPool.generic() + .execute(() -> executeSearch(requests, responses, responseCounter, listener, relativeStartTime)); } else { // we are on a different thread (we went asynchronous), it's safe to recurse - executeSearch(requests, responses, responseCounter, listener); + executeSearch(requests, responses, responseCounter, listener, relativeStartTime); } } } private void finish() { - listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); + listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]), + buildTookInMillis())); + } + + /** + * Builds how long it took to execute the msearch. + */ + private long buildTookInMillis() { + return TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - relativeStartTime); } }); } @@ -178,7 +194,5 @@ static final class SearchRequestSlot { this.request = request; this.responseSlot = responseSlot; } - } - } diff --git a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java index b84dafb4f6d5b..0951380fcf4aa 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java @@ -101,7 +101,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL mSearchResponses.add(new MultiSearchResponse.Item(response, null)); } - listener.onResponse(new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]))); + listener.onResponse( + new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]), randomIntBetween(1, 10000))); } }; @@ -153,10 +154,11 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits, null, null, null, false, null, 1); SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null); - listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{ - new MultiSearchResponse.Item(null, new RuntimeException("boom")), - new MultiSearchResponse.Item(response, null) - })); + listener.onResponse(new MultiSearchResponse( + new MultiSearchResponse.Item[]{ + new MultiSearchResponse.Item(null, new RuntimeException("boom")), + new MultiSearchResponse.Item(response, null) + }, randomIntBetween(1, 10000))); } }; diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java new file mode 100644 index 0000000000000..73743230d1a14 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -0,0 +1,199 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; + +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * MultiSearch took time tests + */ +public class MultiSearchActionTookTests extends ESTestCase { + + private ThreadPool threadPool; + private ClusterService clusterService; + + @BeforeClass + public static void beforeClass() { + } + + @AfterClass + public static void afterClass() { + } + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("MultiSearchActionTookTests"); + clusterService = createClusterService(threadPool); + } + + @After + public void tearDown() throws Exception { + clusterService.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + super.tearDown(); + } + + // test unit conversion using a controller clock + public void testTookWithControlledClock() throws Exception { + runTestTook(true); + } + + // test using System#nanoTime + public void testTookWithRealClock() throws Exception { + runTestTook(false); + } + + private void runTestTook(boolean controlledClock) throws Exception { + MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest()); + AtomicLong expected = new AtomicLong(); + + TransportMultiSearchAction action = createTransportMultiSearchAction(controlledClock, expected); + + action.doExecute(multiSearchRequest, new ActionListener() { + @Override + public void onResponse(MultiSearchResponse multiSearchResponse) { + if (controlledClock) { + assertThat(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS), + equalTo(multiSearchResponse.getTook().getMillis())); + } else { + assertThat(multiSearchResponse.getTook().getMillis(), + greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); + } + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }); + } + + private TransportMultiSearchAction createTransportMultiSearchAction(boolean controlledClock, AtomicLong expected) { + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + TaskManager taskManager = mock(TaskManager.class); + TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + ActionFilters actionFilters = new ActionFilters(new HashSet<>()); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); + + final int availableProcessors = Runtime.getRuntime().availableProcessors(); + AtomicInteger counter = new AtomicInteger(); + final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME); + Randomness.shuffle(threadPoolNames); + final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); + final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); + + TransportAction searchAction = new TransportAction(Settings.EMPTY, + "action", threadPool, actionFilters, resolver, taskManager) { + @Override + protected void doExecute(SearchRequest request, ActionListener listener) { + requests.add(request); + commonExecutor.execute(() -> { + counter.decrementAndGet(); + listener.onResponse(new SearchResponse()); + }); + } + }; + + if (controlledClock) { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, + availableProcessors, expected::get) { + @Override + void executeSearch(final Queue requests, final AtomicArray responses, + final AtomicInteger responseCounter, final ActionListener listener, long startTimeInNanos) { + expected.set(1000000); + super.executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); + } + }; + } else { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, + availableProcessors, System::nanoTime) { + + @Override + void executeSearch(final Queue requests, final AtomicArray responses, + final AtomicInteger responseCounter, final ActionListener listener, long startTimeInNanos) { + long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(0, 10)); + expected.set(elapsed); + super.executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); + } + }; + } + } + + static class Resolver extends IndexNameExpressionResolver { + + Resolver(Settings settings) { + super(settings); + } + + @Override + public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { + return request.indices(); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index 3a162f302bc3b..e6de1d859d867 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -146,13 +146,16 @@ public void testSimpleAdd4() throws Exception { } public void testResponseErrorToXContent() throws IOException { + long tookInMillis = randomIntBetween(1, 1000); MultiSearchResponse response = new MultiSearchResponse( - new MultiSearchResponse.Item[]{ - new MultiSearchResponse.Item(null, new IllegalStateException("foobar")), - new MultiSearchResponse.Item(null, new IllegalStateException("baaaaaazzzz")) - }); - - assertEquals("{\"responses\":[" + new MultiSearchResponse.Item[] { + new MultiSearchResponse.Item(null, new IllegalStateException("foobar")), + new MultiSearchResponse.Item(null, new IllegalStateException("baaaaaazzzz")) + }, tookInMillis); + + assertEquals("{\"took\":" + + tookInMillis + + ",\"responses\":[" + "{" + "\"error\":{\"root_cause\":[{\"type\":\"illegal_state_exception\",\"reason\":\"foobar\"}]," + "\"type\":\"illegal_state_exception\",\"reason\":\"foobar\"},\"status\":500" diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index e811da82c47a8..4410507eef92e 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -102,8 +102,10 @@ protected void doExecute(SearchRequest request, ActionListener l }); } }; - TransportMultiSearchAction action = - new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10); + + TransportMultiSearchAction action = + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10, + System::nanoTime); // Execute the multi search api and fail if we find an error after executing: try {