Skip to content

Commit

Permalink
Support for remote path in reindex api (#31290)
Browse files Browse the repository at this point in the history
Support for remote path in reindex api
Closes #22913
  • Loading branch information
vladimirdolzhenko authored Jun 15, 2018
1 parent a705e1a commit dbc9d60
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 73 deletions.
10 changes: 5 additions & 5 deletions docs/reference/docs/reindex.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,11 @@ POST _reindex
// TEST[s/"username": "user",//]
// TEST[s/"password": "pass"//]

The `host` parameter must contain a scheme, host, and port (e.g.
`https://otherhost:9200`). The `username` and `password` parameters are
optional, and when they are present `_reindex` will connect to the remote
Elasticsearch node using basic auth. Be sure to use `https` when using
basic auth or the password will be sent in plain text.
The `host` parameter must contain a scheme, host, port (e.g.
`https://otherhost:9200`) and optional path (e.g. `https://otherhost:9200/proxy`).
The `username` and `password` parameters are optional, and when they are present `_reindex`
will connect to the remote Elasticsearch node using basic auth. Be sure to use `https` when
using basic auth or the password will be sent in plain text.

Remote hosts have to be explicitly whitelisted in elasticsearch.yaml using the
`reindex.remote.whitelist` property. It can be set to a comma delimited list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
*/
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, ReindexAction> {
static final ObjectParser<ReindexRequest, Void> PARSER = new ObjectParser<>("reindex");
private static final Pattern HOST_PATTERN = Pattern.compile("(?<scheme>[^:]+)://(?<host>[^:]+):(?<port>\\d+)");
private static final Pattern HOST_PATTERN = Pattern.compile("(?<scheme>[^:]+)://(?<host>[^:]+):(?<port>\\d+)(?<pathPrefix>/.*)?");

static {
ObjectParser.Parser<ReindexRequest, Void> sourceParser = (parser, request, context) -> {
Expand Down Expand Up @@ -139,10 +139,12 @@ static RemoteInfo buildRemoteInfo(Map<String, Object> source) throws IOException
String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster");
Matcher hostMatcher = HOST_PATTERN.matcher(hostInRequest);
if (false == hostMatcher.matches()) {
throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port] but was [" + hostInRequest + "]");
throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was ["
+ hostInRequest + "]");
}
String scheme = hostMatcher.group("scheme");
String host = hostMatcher.group("host");
String pathPrefix = hostMatcher.group("pathPrefix");
int port = Integer.parseInt(hostMatcher.group("port"));
Map<String, String> headers = extractStringStringMap(remote, "headers");
TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT);
Expand All @@ -151,7 +153,8 @@ static RemoteInfo buildRemoteInfo(Map<String, Object> source) throws IOException
throw new IllegalArgumentException(
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
}
return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers, socketTimeout, connectTimeout);
return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source),
username, password, headers, socketTimeout, connectTimeout);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
Expand Down Expand Up @@ -206,34 +207,39 @@ static RestClient buildRestClient(RemoteInfo remoteInfo, long taskId, List<Threa
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
clientHeaders[i++] = new BasicHeader(header.getKey(), header.getValue());
}
return RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
.setDefaultHeaders(clientHeaders)
.setRequestConfigCallback(c -> {
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
return c;
})
.setHttpClientConfigCallback(c -> {
// Enable basic auth if it is configured
if (remoteInfo.getUsername() != null) {
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
remoteInfo.getPassword());
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, creds);
c.setDefaultCredentialsProvider(credentialsProvider);
}
// Stick the task id in the thread name so we can track down tasks from stack traces
AtomicInteger threads = new AtomicInteger();
c.setThreadFactory(r -> {
String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
Thread t = new Thread(r, name);
threadCollector.add(t);
return t;
});
// Limit ourselves to one reactor thread because for now the search process is single threaded.
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
return c;
}).build();
final RestClientBuilder builder =
RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
.setDefaultHeaders(clientHeaders)
.setRequestConfigCallback(c -> {
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
return c;
})
.setHttpClientConfigCallback(c -> {
// Enable basic auth if it is configured
if (remoteInfo.getUsername() != null) {
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
remoteInfo.getPassword());
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, creds);
c.setDefaultCredentialsProvider(credentialsProvider);
}
// Stick the task id in the thread name so we can track down tasks from stack traces
AtomicInteger threads = new AtomicInteger();
c.setThreadFactory(r -> {
String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
Thread t = new Thread(r, name);
threadCollector.add(t);
return t;
});
// Limit ourselves to one reactor thread because for now the search process is single threaded.
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
return c;
});
if (Strings.hasLength(remoteInfo.getPathPrefix()) && "/".equals(remoteInfo.getPathPrefix()) == false) {
builder.setPathPrefix(remoteInfo.getPathPrefix());
}
return builder.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@

public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTestCase {
public void testBuildRestClient() throws Exception {
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap(),
for(final String path: new String[]{"", null, "/", "path"}) {
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, new BytesArray("ignored"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
long taskId = randomLong();
List<Thread> threads = synchronizedList(new ArrayList<>());
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);
try {
assertBusy(() -> assertThat(threads, hasSize(2)));
int i = 0;
for (Thread thread : threads) {
assertEquals("es-client-" + taskId + "-" + i, thread.getName());
i++;
long taskId = randomLong();
List<Thread> threads = synchronizedList(new ArrayList<>());
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);
try {
assertBusy(() -> assertThat(threads, hasSize(2)));
int i = 0;
for (Thread thread : threads) {
assertEquals("es-client-" + taskId + "-" + i, thread.getName());
i++;
}
} finally {
client.close();
}
} finally {
client.close();
}
}

Expand All @@ -57,7 +59,7 @@ public void testHeaders() throws Exception {
for (int i = 0; i < numHeaders; ++i) {
headers.put("header" + i, Integer.toString(i));
}
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null,
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, null, new BytesArray("ignored"), null, null,
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
long taskId = randomLong();
List<Thread> threads = synchronizedList(new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testLocalRequestWithWhitelist() {
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
*/
private RemoteInfo newRemoteInfo(String host, int port) {
return new RemoteInfo(randomAlphaOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap(),
return new RemoteInfo(randomAlphaOfLength(5), host, port, null, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}

Expand All @@ -63,7 +63,7 @@ public void testWhitelistedRemote() {

public void testWhitelistedByPrefix() {
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap(),
new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, null, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
newRemoteInfo("6e134134a1.us-east-1.aws.example.com", 9200));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ public void fetchTransportAddress() {
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
*/
private RemoteInfo newRemoteInfo(String username, String password, Map<String, String> headers) {
return new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), username, password,
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
return new RemoteInfo("http", address.getAddress(), address.getPort(), null,
new BytesArray("{\"match_all\":{}}"), username, password, headers,
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}

public void testReindexFromRemoteWithAuthentication() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ public void testTargetIsAlias() {

public void testRemoteInfoSkipsValidation() {
// The index doesn't have to exist
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap(),
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "does_not_exist", "target");
// And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote.
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap(),
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "target", "target");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,38 @@ public void testBuildRemoteInfoWithAllHostParts() throws IOException {
assertEquals("http", info.getScheme());
assertEquals("example.com", info.getHost());
assertEquals(9200, info.getPort());
assertNull(info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default

info = buildRemoteInfoHostTestCase("https://other.example.com:9201");
assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort());
assertNull(info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());

info = buildRemoteInfoHostTestCase("https://other.example.com:9201/");
assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort());
assertEquals("/", info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());

info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/");
assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort());
assertEquals("/proxy-path/", info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());

final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> buildRemoteInfoHostTestCase("https"));
assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]",
exception.getMessage());
}

public void testReindexFromRemoteRequestParsing() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ public void testReindexFromRemote() throws Exception {
assertNotNull(masterNode);

TransportAddress address = masterNode.getHttp().getAddress().publishAddress();
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
RemoteInfo remote =
new RemoteInfo("http", address.getAddress(), address.getPort(), null,
new BytesArray("{\"match_all\":{}}"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
ReindexRequestBuilder request = new ReindexRequestBuilder(client, ReindexAction.INSTANCE).source("source").destination("dest")
.setRemoteInfo(remote);
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ public void testReindexRequest() throws IOException {
}
TimeValue socketTimeout = parseTimeValue(randomPositiveTimeValue(), "socketTimeout");
TimeValue connectTimeout = parseTimeValue(randomPositiveTimeValue(), "connectTimeout");
reindex.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, query, username, password, headers,
socketTimeout, connectTimeout));
reindex.setRemoteInfo(
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, null,
query, username, password, headers, socketTimeout, connectTimeout));
}
ReindexRequest tripped = new ReindexRequest();
roundTrip(reindex, tripped);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@
import static java.util.Collections.emptyMap;

public class RemoteInfoTests extends ESTestCase {
private RemoteInfo newRemoteInfo(String scheme, String username, String password) {
return new RemoteInfo(scheme, "testhost", 12344, new BytesArray("testquery"), username, password, emptyMap(),
private RemoteInfo newRemoteInfo(String scheme, String prefixPath, String username, String password) {
return new RemoteInfo(scheme, "testhost", 12344, prefixPath, new BytesArray("testquery"), username, password, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}

public void testToString() {
assertEquals("host=testhost port=12344 query=testquery", newRemoteInfo("http", null, null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser", newRemoteInfo("http", "testuser", null).toString());
assertEquals("host=testhost port=12344 query=testquery",
newRemoteInfo("http", null, null, null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser",
newRemoteInfo("http", null, "testuser", null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>",
newRemoteInfo("http", "testuser", "testpass").toString());
newRemoteInfo("http", null, "testuser", "testpass").toString());
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>",
newRemoteInfo("https", "testuser", "testpass").toString());
newRemoteInfo("https", null, "testuser", "testpass").toString());
assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query=testquery username=testuser password=<<>>",
newRemoteInfo("https", "prxy", "testuser", "testpass").toString());
}
}
Loading

0 comments on commit dbc9d60

Please sign in to comment.