Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17419: Introduce ParallelHttpShardHandler #2681

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public class HttpShardHandler extends ShardHandler {
*/
public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";

private HttpShardHandlerFactory httpShardHandlerFactory;
private Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>> responseFutureMap;
private BlockingQueue<ShardResponse> responses;
private AtomicInteger pending;
private Map<String, List<String>> shardToURLs;
private LBHttp2SolrClient lbClient;
protected HttpShardHandlerFactory httpShardHandlerFactory;
protected Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>> responseFutureMap;
protected BlockingQueue<ShardResponse> responses;
protected AtomicInteger pending;
protected Map<String, List<String>> shardToURLs;
protected LBHttp2SolrClient lbClient;

public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
this.httpShardHandlerFactory = httpShardHandlerFactory;
Expand All @@ -80,7 +80,7 @@ public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
shardToURLs = new HashMap<>();
}

private static class SimpleSolrResponse extends SolrResponse {
public static class SimpleSolrResponse extends SolrResponse {

volatile long elapsedTime;

Expand Down Expand Up @@ -109,7 +109,7 @@ public void setElapsedTime(long elapsedTime) {

// Not thread safe... don't use in Callable.
// Don't modify the returned URL list.
private List<String> getURLs(String shard) {
protected List<String> getURLs(String shard) {
List<String> urls = shardToURLs.get(shard);
if (urls == null) {
urls = httpShardHandlerFactory.buildURLList(shard);
Expand All @@ -118,48 +118,58 @@ private List<String> getURLs(String shard) {
return urls;
}

@Override
public void submit(
final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);

protected LBSolrClient.Req prepareLBRequest(
ShardRequest sreq, String shard, ModifiableSolrParams params, List<String> urls) {
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
QueryRequest req = makeQueryRequest(sreq, params, shard);
req.setMethod(SolrRequest.METHOD.POST);
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo != null) {
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
}

LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
return httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
}

protected ShardResponse prepareShardResponse(ShardRequest sreq, String shard) {
ShardResponse srsp = new ShardResponse();
if (sreq.nodeName != null) {
srsp.setNodeName(sreq.nodeName);
}
srsp.setShardRequest(sreq);
srsp.setShard(shard);
SimpleSolrResponse ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);

return srsp;
}

protected void recordNoUrlShardResponse(ShardResponse srsp, String shard) {
// TODO: what's the right error code here? We should use the same thing when
// all of the servers for a shard are down.
SolrException exception =
new SolrException(
SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
srsp.setException(exception);
srsp.setResponseCode(exception.code());
responses.add(srsp);
}

@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);
final var lbReq = prepareLBRequest(sreq, shard, params, urls);
final var srsp = prepareShardResponse(sreq, shard);
final var ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
pending.incrementAndGet();
// if there are no shards available for a slice, urls.size()==0

if (urls.isEmpty()) {
// TODO: what's the right error code here? We should use the same thing when
// all of the servers for a shard are down.
SolrException exception =
new SolrException(
SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
srsp.setException(exception);
srsp.setResponseCode(exception.code());
responses.add(srsp);
recordNoUrlShardResponse(srsp, shard);
return;
}

long startTime = System.nanoTime();
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo != null) {
req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
}

CompletableFuture<LBSolrClient.Rsp> future = this.lbClient.requestAsync(lbReq);
future.whenComplete(
(rsp, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory
// requests at some point (or should we simply return failure?)
//
// This executor is initialized in the init method
private ExecutorService commExecutor;
protected ExecutorService commExecutor;

protected volatile Http2SolrClient defaultClient;
protected InstrumentedHttpListenerFactory httpListenerFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.NotThreadSafe;
import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class ParallelHttpShardHandler extends HttpShardHandler {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final ExecutorService commExecutor;

public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandlerFactory) {
super(httpShardHandlerFactory);
this.commExecutor = httpShardHandlerFactory.commExecutor;
}

@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);
final var lbReq = prepareLBRequest(sreq, shard, params, urls);
final var srsp = prepareShardResponse(sreq, shard);
final var ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
pending.incrementAndGet();

if (urls.isEmpty()) {
recordNoUrlShardResponse(srsp, shard);
return;
}

long startTime = System.nanoTime();
final Runnable executeRequestRunnable =
() -> {
CompletableFuture<LBSolrClient.Rsp> future = this.lbClient.requestAsync(lbReq);
future.whenComplete(
(rsp, throwable) -> {
if (rsp != null) {
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
ssr.elapsedTime =
TimeUnit.MILLISECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
responses.add(srsp);
} else if (throwable != null) {
ssr.elapsedTime =
TimeUnit.MILLISECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
srsp.setException(throwable);
if (throwable instanceof SolrException) {
srsp.setResponseCode(((SolrException) throwable).code());
}
responses.add(srsp);
}
});
responseFutureMap.put(srsp, future);
};

CompletableFuture.runAsync(executeRequestRunnable, commExecutor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;

import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelHttpShardHandlerFactory extends HttpShardHandlerFactory {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Override
public ShardHandler getShardHandler() {
return new ParallelHttpShardHandler(this);
}
}
4 changes: 4 additions & 0 deletions solr/server/solr/configsets/_default/conf/solrconfig.xml
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,10 @@

<!-- Primary search handler, expected by most clients, examples and UI frameworks -->
<requestHandler name="/select" class="solr.SearchHandler">
<shardHandlerFactory name="shardHandlerFactory" class="${shardHandlerFactory:ParallelHttpShardHandlerFactory}">
gerlowskija marked this conversation as resolved.
Show resolved Hide resolved
<int name="socketTimeout">${socketTimeout:600000}</int>
<int name="connTimeout">${connTimeout:60000}</int>
</shardHandlerFactory>
<lst name="defaults">
<str name="echoParams">explicit</str>
<int name="rows">10</int>
Expand Down
Loading