Skip to content

Commit

Permalink
Add Remote Reindex SPI extension (#547)
Browse files Browse the repository at this point in the history
This change extends the remote reindex SPI to allow adding a custom interceptor.
This interceptor can be plugged in to perform any processing on the request or response.

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha authored May 6, 2021
1 parent 2cf40be commit 181ee8a
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.index.reindex;

import java.util.Optional;
import org.apache.http.HttpRequestInterceptor;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
Expand Down Expand Up @@ -112,6 +114,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
protected final ThreadPool threadPool;
protected final ScriptService scriptService;
protected final ReindexSslConfig sslConfig;
protected Optional<HttpRequestInterceptor> interceptor;

/**
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
Expand Down Expand Up @@ -152,6 +155,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.threadPool = threadPool;
this.mainRequest = mainRequest;
this.listener = listener;
this.interceptor = Optional.empty();
BackoffPolicy backoffPolicy = buildBackoffPolicy();
bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
scrollSource = buildScrollableResultSource(backoffPolicy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@

package org.opensearch.index.reindex;

import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.ExtensiblePlugin.ExtensionLoader;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
Expand Down Expand Up @@ -67,8 +73,9 @@

import static java.util.Collections.singletonList;

public class ReindexPlugin extends Plugin implements ActionPlugin {
public class ReindexPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {
public static final String NAME = "reindex";
private static final Logger logger = LogManager.getLogger(ReindexPlugin.class);

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
Expand Down Expand Up @@ -112,4 +119,22 @@ public List<Setting<?>> getSettings() {
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}

@Override
public void loadExtensions(ExtensionLoader loader) {
logger.info("ReindexPlugin reloadSPI called");
Iterable<RemoteReindexExtension> iterable = loader.loadExtensions(RemoteReindexExtension.class);
List<RemoteReindexExtension> remoteReindexExtensionList = new ArrayList<>();
iterable.forEach(remoteReindexExtensionList::add);
if (remoteReindexExtensionList.isEmpty()) {
logger.info("Unable to find any implementation for RemoteReindexExtension");
} else {
if (remoteReindexExtensionList.size() > 1) {
logger.warn("More than one implementation found: " + remoteReindexExtensionList);
}
// We shouldn't have more than one extension. Incase there is, we simply pick the first one.
TransportReindexAction.remoteExtension = Optional.ofNullable(remoteReindexExtensionList.get(0));
logger.info("Loaded extension " + TransportReindexAction.remoteExtension);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@

package org.opensearch.index.reindex;

import java.util.Optional;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
Expand Down Expand Up @@ -63,6 +65,7 @@
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.VersionFieldMapper;
import org.opensearch.index.reindex.remote.RemoteScrollableHitSource;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -91,40 +94,76 @@ public class Reindexer {
private final ThreadPool threadPool;
private final ScriptService scriptService;
private final ReindexSslConfig reindexSslConfig;
private final Optional<RemoteReindexExtension> remoteExtension;

Reindexer(ClusterService clusterService, Client client, ThreadPool threadPool, ScriptService scriptService,
ReindexSslConfig reindexSslConfig) {
this(clusterService, client, threadPool, scriptService, reindexSslConfig, Optional.empty());
}

Reindexer(ClusterService clusterService, Client client, ThreadPool threadPool, ScriptService scriptService,
ReindexSslConfig reindexSslConfig, Optional<RemoteReindexExtension> remoteExtension) {
this.clusterService = clusterService;
this.client = client;
this.threadPool = threadPool;
this.scriptService = scriptService;
this.reindexSslConfig = reindexSslConfig;
this.remoteExtension = remoteExtension;
}

public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) {
BulkByScrollParallelizationHelper.initTaskState(task, request, client, listener);
}

public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
ActionListener<BulkByScrollResponse> remoteReindexActionListener = getRemoteReindexWrapperListener(listener, request);
BulkByScrollParallelizationHelper.executeSlicedAction(task, request, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(task, logger, assigningClient, threadPool,
scriptService, reindexSslConfig, request, listener);
scriptService, reindexSslConfig, request, remoteReindexActionListener, getInterceptor(request));
searchAction.start();
});

}

private Optional<HttpRequestInterceptor> getInterceptor(ReindexRequest request) {
if (request.getRemoteInfo() == null) {
return Optional.empty();
} else {
return remoteExtension.map(x -> x.getInterceptorProvider()).flatMap(provider ->
provider.getRestInterceptor(request, threadPool.getThreadContext()));
}
}

private ActionListener<BulkByScrollResponse> getRemoteReindexWrapperListener(
ActionListener<BulkByScrollResponse> listener, ReindexRequest reindexRequest) {
if (reindexRequest.getRemoteInfo() == null) {
return listener;
}
if (remoteExtension.isPresent()) {
return remoteExtension.get().getRemoteReindexActionListener(listener, reindexRequest);
}
logger.info("No extension found for remote reindex listener");
return listener;
}

static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) {
return buildRestClient(remoteInfo, sslConfig, taskId, threadCollector, Optional.empty());
}

/**
* Build the {@link RestClient} used for reindexing from remote clusters.
*
* @param remoteInfo connection information for the remote cluster
* @param sslConfig configuration for potential outgoing HTTPS connections
* @param taskId the id of the current task. This is added to the thread name for easier tracking
* @param threadCollector a list in which we collect all the threads created by the client
* @param restInterceptor an optional HttpRequestInterceptor
*/
static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) {
static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector,
Optional<HttpRequestInterceptor> restInterceptor) {
Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
int i = 0;
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
Expand All @@ -146,6 +185,8 @@ static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslCon
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, creds);
c.setDefaultCredentialsProvider(credentialsProvider);
} else {
restInterceptor.ifPresent(interceptor -> c.addInterceptorLast(interceptor));
}
// Stick the task id in the thread name so we can track down tasks from stack traces
AtomicInteger threads = new AtomicInteger();
Expand Down Expand Up @@ -185,13 +226,20 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request,
ActionListener<BulkByScrollResponse> listener) {
this(task, logger, client, threadPool, scriptService, sslConfig, request, listener, Optional.empty());
}

AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request,
ActionListener<BulkByScrollResponse> listener, Optional<HttpRequestInterceptor> interceptor) {
super(task,
/*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
* external versioning.
*/
request.getDestination().versionType() != VersionType.INTERNAL,
false, logger, client, threadPool, request, listener, scriptService, sslConfig);
this.interceptor = interceptor;
}

@Override
Expand All @@ -200,7 +248,8 @@ protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffP
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
createdThreads = synchronizedList(new ArrayList<>());
assert sslConfig != null : "Reindex ssl config must be set";
RestClient restClient = buildRestClient(remoteInfo, sslConfig, task.getId(), createdThreads);
RestClient restClient = buildRestClient(remoteInfo, sslConfig, task.getId(), createdThreads,
this.interceptor);
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
this::onScrollResponse, this::finishHim,
restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.reindex;

import java.util.Optional;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
Expand All @@ -43,6 +44,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.ScriptService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -56,6 +58,7 @@
public class TransportReindexAction extends HandledTransportAction<ReindexRequest, BulkByScrollResponse> {
public static final Setting<List<String>> REMOTE_CLUSTER_WHITELIST =
Setting.listSetting("reindex.remote.whitelist", emptyList(), Function.identity(), Property.NodeScope);
public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();

private final ReindexValidator reindexValidator;
private final Reindexer reindexer;
Expand All @@ -66,7 +69,7 @@ public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFi
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) {
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.index.reindex.spi;

import java.util.Optional;
import org.apache.http.HttpRequestInterceptor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.index.reindex.ReindexRequest;

public interface ReindexRestInterceptorProvider {
/**
* @param request Reindex request.
* @param threadContext Current thread context.
* @return HttpRequestInterceptor object.
*/
Optional<HttpRequestInterceptor> getRestInterceptor(ReindexRequest request, ThreadContext threadContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.index.reindex.spi;

import org.opensearch.action.ActionListener;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexRequest;

/**
* This interface provides an extension point for {@link org.opensearch.index.reindex.ReindexPlugin}.
* This interface can be implemented to provide a custom Rest interceptor and {@link ActionListener}
* The Rest interceptor can be used to pre-process any reindex request and perform any action
* on the response. The ActionListener listens to the success and failure events on every reindex request
* and can be used to take any actions based on the success or failure.
*/
public interface RemoteReindexExtension {
/**
* Get an InterceptorProvider.
* @return ReindexRestInterceptorProvider implementation.
*/
ReindexRestInterceptorProvider getInterceptorProvider();

/**
* Get a wrapper of ActionListener which is can used to perform any action based on
* the success/failure of the remote reindex call.
* @return ActionListener wrapper implementation.
*/
ActionListener<BulkByScrollResponse> getRemoteReindexActionListener(ActionListener<BulkByScrollResponse> listener,
ReindexRequest reindexRequest);
}

0 comments on commit 181ee8a

Please sign in to comment.