Skip to content

Commit

Permalink
SOLR-16927 Allow SolrClientCache clients to use Jetty HTTP2 clients (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stillalex authored Aug 21, 2023
1 parent e0462e7 commit f6ef54a
Show file tree
Hide file tree
Showing 32 changed files with 596 additions and 340 deletions.
2 changes: 2 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ Improvements

* SOLR-16940: Users can pass Java system properties to the SolrCLI via the SOLR_TOOL_OPTS environment variable. (Houston Putman)

* SOLR-16927: Allow SolrClientCache clients to use Jetty HTTP2 clients (Alex Deparvu, David Smiley)

Optimizations
---------------------

Expand Down
3 changes: 2 additions & 1 deletion solr/benchmark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ task echoCp {
dependencies {
implementation project(':solr:test-framework')
implementation project(':solr:solrj')
implementation project(':solr:solrj-streaming')

implementation 'org.apache.lucene:lucene-core'

implementation 'org.apache.httpcomponents:httpclient'
implementation 'commons-io:commons-io'
implementation 'io.dropwizard.metrics:metrics-core'
implementation 'org.apache.commons:commons-math3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public static class MiniClusterBenchState {
/** The Nodes. */
public List<String> nodes;

public String zkHost;

/** The Cluster. */
MiniSolrCloudCluster cluster;

Expand Down Expand Up @@ -277,6 +279,7 @@ public void startMiniCluster(int nodeCount) {
for (JettySolrRunner runner : jetties) {
nodes.add(runner.getBaseUrl().toString());
}
zkHost = cluster.getZkServer().getZkAddress();

client = new Http2SolrClient.Builder().useHttp1_1(useHttp1).build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.bench.search;

import static org.apache.solr.bench.Docs.docs;
import static org.apache.solr.bench.generators.SourceDSL.integers;
import static org.apache.solr.bench.generators.SourceDSL.strings;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.bench.Docs;
import org.apache.solr.bench.MiniClusterState;
import org.apache.solr.bench.MiniClusterState.MiniClusterBenchState;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@Fork(value = 1)
@BenchmarkMode(Mode.Throughput)
@Warmup(time = 5, iterations = 1)
@Measurement(time = 30, iterations = 4)
@Threads(value = 1)
public class StreamingSearch {

private static final String collection = "benchStreamingSearch";

@State(Scope.Benchmark)
public static class BenchState {

@Param({"false", "true"})
boolean useHttp1;

private int docs = 1000;
private String zkHost;
private ModifiableSolrParams params;
private StreamContext streamContext;
private Http2SolrClient http2SolrClient;

@Setup(Level.Trial)
public void setup(MiniClusterBenchState miniClusterState) throws Exception {

miniClusterState.startMiniCluster(3);
miniClusterState.createCollection(collection, 3, 1);
Docs docGen =
docs()
.field("id", integers().incrementing())
.field("text2_ts", strings().basicLatinAlphabet().multi(312).ofLengthBetween(30, 64))
.field("text3_ts", strings().basicLatinAlphabet().multi(312).ofLengthBetween(30, 64))
.field("int1_i_dv", integers().all());
miniClusterState.index(collection, docGen, docs);
miniClusterState.waitForMerges(collection);

zkHost = miniClusterState.zkHost;

params = new ModifiableSolrParams();
params.set(CommonParams.Q, "*:*");
params.set(CommonParams.FL, "id,text2_ts,text3_ts,int1_i_dv");
params.set(CommonParams.SORT, "id asc,int1_i_dv asc");
params.set(CommonParams.ROWS, docs);
}

@Setup(Level.Iteration)
public void setupIteration(MiniClusterState.MiniClusterBenchState miniClusterState)
throws SolrServerException, IOException {
SolrClientCache solrClientCache;
if (useHttp1) {
var httpClient = HttpClientUtil.createClient(null); // TODO tune params?
solrClientCache = new SolrClientCache(httpClient);
} else {
http2SolrClient = newHttp2SolrClient();
solrClientCache = new SolrClientCache(http2SolrClient);
}

streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);
}

@TearDown(Level.Iteration)
public void teardownIt() {
streamContext.getSolrClientCache().close();
if (http2SolrClient != null) {
http2SolrClient.close();
}
}
}

@Benchmark
public Object stream(
BenchState benchState, MiniClusterState.MiniClusterBenchState miniClusterState)
throws SolrServerException, IOException {
CloudSolrStream stream = new CloudSolrStream(benchState.zkHost, collection, benchState.params);
stream.setStreamContext(benchState.streamContext);
return getTuples(stream);
}

private static List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
List<Tuple> tuples = new ArrayList<>();
try {
tupleStream.open();
while (true) {
Tuple t = tupleStream.read();
if (t.EOF) {
break;
} else {
tuples.add(t);
}
}
return tuples;
} finally {
tupleStream.close();
}
}

public static Http2SolrClient newHttp2SolrClient() {
// TODO tune params?
var builder = new Http2SolrClient.Builder();
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
*/
package org.apache.solr.client.solrj.io;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -28,97 +27,137 @@
import java.util.concurrent.TimeUnit;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The SolrClientCache caches SolrClients so they can be reused by different TupleStreams.
*
* <p>TODO: Cut this over to using Solr's new Http2 clients
*/
public class SolrClientCache implements Serializable {
/** The SolrClientCache caches SolrClients so they can be reused by different TupleStreams. */
public class SolrClientCache implements Closeable {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final Map<String, SolrClient> solrClients = new HashMap<>();
private final HttpClient httpClient;
// Set the floor for timeouts to 60 seconds.
// Timeouts cans be increased by setting the system properties defined below.
private static final int conTimeout =
private static final int MIN_TIMEOUT = 60000;
private static final int minConnTimeout =
Math.max(
Integer.parseInt(System.getProperty(HttpClientUtil.PROP_CONNECTION_TIMEOUT, "60000")),
60000);
private static final int socketTimeout =
Math.max(
Integer.parseInt(System.getProperty(HttpClientUtil.PROP_SO_TIMEOUT, "60000")), 60000);
Integer.getInteger(HttpClientUtil.PROP_CONNECTION_TIMEOUT, MIN_TIMEOUT), MIN_TIMEOUT);
private static final int minSocketTimeout =
Math.max(Integer.getInteger(HttpClientUtil.PROP_SO_TIMEOUT, MIN_TIMEOUT), MIN_TIMEOUT);

private final Map<String, SolrClient> solrClients = new HashMap<>();
private final HttpClient apacheHttpClient;
private final Http2SolrClient http2SolrClient;

public SolrClientCache() {
httpClient = null;
this.apacheHttpClient = null;
this.http2SolrClient = null;
}

@Deprecated(since = "9.0")
public SolrClientCache(HttpClient httpClient) {
this.httpClient = httpClient;
public SolrClientCache(HttpClient apacheHttpClient) {
this.apacheHttpClient = apacheHttpClient;
this.http2SolrClient = null;
}

@Deprecated(since = "9.0")
public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {

// Timeouts should never be lower then 60000 but they can be set higher
assert (conTimeout >= 60000);
assert (socketTimeout >= 60000);

if (log.isDebugEnabled()) {
log.debug("SolrClientCache.conTimeout: {}", conTimeout);
log.debug("SolrClientCache.socketTimeout: {}", socketTimeout);
}
public SolrClientCache(Http2SolrClient http2SolrClient) {
this.apacheHttpClient = null;
this.http2SolrClient = http2SolrClient;
}

public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
Objects.requireNonNull(zkHost, "ZooKeeper host cannot be null!");
CloudSolrClient client;
if (solrClients.containsKey(zkHost)) {
client = (CloudSolrClient) solrClients.get(zkHost);
return (CloudSolrClient) solrClients.get(zkHost);
}
final CloudSolrClient client;
if (apacheHttpClient != null) {
client = newCloudLegacySolrClient(zkHost, apacheHttpClient);
} else {
final List<String> hosts = new ArrayList<>();
hosts.add(zkHost);
var builder =
new CloudLegacySolrClient.Builder(hosts, Optional.empty())
.withSocketTimeout(socketTimeout, TimeUnit.MILLISECONDS)
.withConnectionTimeout(conTimeout, TimeUnit.MILLISECONDS);
if (httpClient != null) {
builder = builder.withHttpClient(httpClient);
}

client = builder.build();
client.connect();
solrClients.put(zkHost, client);
client = newCloudHttp2SolrClient(zkHost, http2SolrClient);
}
solrClients.put(zkHost, client);
return client;
}

@Deprecated
private static CloudSolrClient newCloudLegacySolrClient(String zkHost, HttpClient httpClient) {
final List<String> hosts = List.of(zkHost);
var builder = new CloudLegacySolrClient.Builder(hosts, Optional.empty());
adjustTimeouts(builder, httpClient);
var client = builder.build();
client.connect();
return client;
}

private static CloudHttp2SolrClient newCloudHttp2SolrClient(
String zkHost, Http2SolrClient http2SolrClient) {
final List<String> hosts = List.of(zkHost);
var builder = new CloudHttp2SolrClient.Builder(hosts, Optional.empty());
// using internal builder to ensure the internal client gets closed
builder = builder.withInternalClientBuilder(newHttp2SolrClientBuilder(null, http2SolrClient));
var client = builder.build();
client.connect();
return client;
}

@Deprecated(since = "9.0")
public synchronized SolrClient getHttpSolrClient(String baseUrl) {
SolrClient client;
Objects.requireNonNull(baseUrl, "Url cannot be null!");
if (solrClients.containsKey(baseUrl)) {
client = solrClients.get(baseUrl);
return solrClients.get(baseUrl);
}
final SolrClient client;
if (apacheHttpClient != null) {
client = newHttpSolrClient(baseUrl, apacheHttpClient);
} else {
HttpSolrClient.Builder builder =
new HttpSolrClient.Builder(baseUrl)
.withSocketTimeout(socketTimeout, TimeUnit.MILLISECONDS)
.withConnectionTimeout(conTimeout, TimeUnit.MILLISECONDS);
if (httpClient != null) {
builder = builder.withHttpClient(httpClient);
}
client = builder.build();
solrClients.put(baseUrl, client);
client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient).build();
}
solrClients.put(baseUrl, client);
return client;
}

@Deprecated
private static SolrClient newHttpSolrClient(String baseUrl, HttpClient httpClient) {
HttpSolrClient.Builder builder = new HttpSolrClient.Builder(baseUrl);
adjustTimeouts(builder, httpClient);
return builder.build();
}

@Deprecated
private static void adjustTimeouts(SolrClientBuilder<?> builder, HttpClient httpClient) {
builder.withHttpClient(httpClient);
int socketTimeout = Math.max(minSocketTimeout, builder.getSocketTimeoutMillis());
builder.withSocketTimeout(socketTimeout, TimeUnit.MILLISECONDS);
int connTimeout = Math.max(minConnTimeout, builder.getConnectionTimeoutMillis());
builder.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS);
}

private static Http2SolrClient.Builder newHttp2SolrClientBuilder(
String baseUrl, Http2SolrClient http2SolrClient) {
var builder = new Http2SolrClient.Builder(baseUrl);
if (http2SolrClient != null) {
builder = builder.withHttpClient(http2SolrClient);
}
long idleTimeout = minSocketTimeout;
if (builder.getIdleTimeoutMillis() != null) {
idleTimeout = Math.max(idleTimeout, builder.getIdleTimeoutMillis());
}
builder.withIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
long connTimeout = minConnTimeout;
if (builder.getConnectionTimeout() != null) {
connTimeout = Math.max(idleTimeout, builder.getConnectionTimeout());
}
builder.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS);
return builder;
}

@Override
public synchronized void close() {
for (Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
try {
Expand Down
Loading

0 comments on commit f6ef54a

Please sign in to comment.