diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreService.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreService.java index fc7f57461bb88..4254b3efd79ed 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreService.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreService.java @@ -10,21 +10,25 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Setting; -import org.opensearch.core.action.ActionListener; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryVerificationException; +import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** @@ -34,6 +38,7 @@ public class RemoteStoreService { private static final Logger logger = LogManager.getLogger(RemoteStoreService.class); private final Supplier repositoriesService; + private final ThreadPool threadPool; public static final Setting REMOTE_STORE_COMPATIBILITY_MODE_SETTING = Setting.simpleString( "remote_store.compatibility_mode", CompatibilityMode.ALLOW_ONLY_REMOTE_STORE_NODES.value, @@ -44,8 +49,7 @@ public class RemoteStoreService { public enum CompatibilityMode { ALLOW_ONLY_REMOTE_STORE_NODES("allow_only_remote_store_nodes"), - ALLOW_ALL_NODES("allow_all_nodes"), - MIGRATING_TO_HOT("migrating_to_hot"); + ALLOW_ALL_NODES("allow_all_nodes"); public static CompatibilityMode validate(String compatibilityMode) { try { @@ -69,35 +73,35 @@ public static CompatibilityMode validate(String compatibilityMode) { } } - public RemoteStoreService(Supplier repositoriesService) { + public RemoteStoreService(Supplier repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; + this.threadPool = threadPool; } - public void verifyRepository(List repositories) { - ActionListener listener = new ActionListener<>() { - - @Override - public void onResponse(VerifyRepositoryResponse verifyRepositoryResponse) { - logger.info("Successfully verified repository : " + verifyRepositoryResponse.toString()); - } + public void verifyRepository(List repositories, DiscoveryNode localNode) { + for (Repository repository : repositories) { + String verificationToken = repository.startVerification(); + ExecutorService executor = threadPool.executor(ThreadPool.Names.GENERIC); + executor.execute(() -> { + try { + repository.verify(verificationToken, localNode); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}] failed to verify repository", repository), e); + throw new RepositoryVerificationException(repository.getMetadata().name(), e.getMessage()); + } + }); - @Override - public void onFailure(Exception e) { - throw new IllegalStateException("Failed to finish remote store repository verification" + e.getMessage()); + // TODO: See if using listener here which is async makes sense, made this sync as + // we need the repository registration for remote store node to be completed before the bootstrap + // completes. + try { + if(executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + throw new RepositoryVerificationException(repository.getMetadata().name(), "could not complete " + + "repository verification within timeout."); + } + } catch (InterruptedException e) { + throw new RepositoryVerificationException(repository.getMetadata().name(), e.getMessage()); } - }; - - for (Repository repository : repositories) { - repositoriesService.get() - .verifyRepository( - repository.getMetadata().name(), - ActionListener.delegateFailure( - listener, - (delegatedListener, verifyResponse) -> delegatedListener.onResponse( - new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])) - ) - ) - ); } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 57ceeb1f61720..0fb8a54209a1b 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -305,7 +305,7 @@ public static DiscoveryNode createLocal( ); RemoteStoreNode remoteStoreNode = new RemoteStoreNode(discoveryNode); List repositories = remoteStoreService.createRepositories(remoteStoreNode); - remoteStoreService.verifyRepository(repositories); + remoteStoreService.verifyRepository(repositories, discoveryNode); return discoveryNode; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index dd51f6437c383..600e238c6d900 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -519,14 +519,14 @@ protected Node( ); resourcesToClose.add(nodeEnvironment); - final SetOnce repositoriesServiceReference = new SetOnce<>(); - final RemoteStoreService remoteStoreService = new RemoteStoreService(repositoriesServiceReference::get); - localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreService); - final List> executorBuilders = pluginsService.getExecutorBuilders(settings); runnableTaskListener = new AtomicReference<>(); final ThreadPool threadPool = new ThreadPool(settings, runnableTaskListener, executorBuilders.toArray(new ExecutorBuilder[0])); + + final SetOnce repositoriesServiceReference = new SetOnce<>(); + final RemoteStoreService remoteStoreService = new RemoteStoreService(repositoriesServiceReference::get, threadPool); + localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreService); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool); resourcesToClose.add(resourceWatcherService); @@ -1730,11 +1730,11 @@ public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) { .keySet() .stream() .anyMatch(key -> key.startsWith(RemoteStoreNode.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX))) { - localNode.set(DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId)); - } else { localNode.set( DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId, remoteStoreService) ); + } else { + localNode.set(DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId)); } return localNode.get(); } diff --git a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java index a928c3b055282..06b604152bf11 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java @@ -376,7 +376,7 @@ public IndexMetadata upgradeIndexMetadata(IndexMetadata indexMetadata, Version m threadPool ); - remoteStoreService = new RemoteStoreService(new SetOnce<>(repositoriesService)::get); + remoteStoreService = new RemoteStoreService(new SetOnce<>(repositoriesService)::get, threadPool); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {}, remoteStoreService);