diff --git a/server/src/main/java/org/elasticsearch/discovery/UnicastConfiguredHostsResolver.java b/server/src/main/java/org/elasticsearch/discovery/UnicastConfiguredHostsResolver.java new file mode 100644 index 0000000000000..cc0ae594f9546 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/discovery/UnicastConfiguredHostsResolver.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.discovery.PeerFinder.ConfiguredHostsResolver; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastZenPing; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING; + +public class UnicastConfiguredHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver { + + private final AtomicBoolean resolveInProgress = new AtomicBoolean(); + private final TransportService transportService; + private final UnicastHostsProvider hostsProvider; + private final SetOnce executorService = new SetOnce<>(); + private final TimeValue resolveTimeout; + + public UnicastConfiguredHostsResolver(Settings settings, TransportService transportService, UnicastHostsProvider hostsProvider) { + super(settings); + this.transportService = transportService; + this.hostsProvider = hostsProvider; + resolveTimeout = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); + } + + @Override + protected void doStart() { + final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); + logger.debug("using concurrent_connects [{}], resolve_timeout [{}]", concurrentConnects, resolveTimeout); + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_configured_hosts_resolver]"); + executorService.set(EsExecutors.newScaling(nodeName() + "/" + "unicast_configured_hosts_resolver", + 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, transportService.getThreadPool().getThreadContext())); + } + + @Override + protected void doStop() { + ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS); + } + + @Override + protected void doClose() { + } + + @Override + public void resolveConfiguredHosts(Consumer> consumer) { + if (lifecycle.started() == false) { + logger.debug("resolveConfiguredHosts: lifecycle is {}, not proceeding", lifecycle); + return; + } + + if (resolveInProgress.compareAndSet(false, true)) { + transportService.getThreadPool().generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.debug("failure when resolving unicast hosts list", e); + } + + @Override + protected void doRun() { + if (lifecycle.started() == false) { + logger.debug("resolveConfiguredHosts.doRun: lifecycle is {}, not proceeding", lifecycle); + return; + } + + List providedAddresses + = hostsProvider.buildDynamicHosts((hosts, limitPortCounts) + -> UnicastZenPing.resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts, + transportService, resolveTimeout)); + + consumer.accept(providedAddresses); + } + + @Override + public void onAfter() { + resolveInProgress.set(false); + } + + @Override + public String toString() { + return "UnicastConfiguredHostsResolver resolving unicast hosts list"; + } + }); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/discovery/UnicastConfiguredHostsResolverTests.java b/server/src/test/java/org/elasticsearch/discovery/UnicastConfiguredHostsResolverTests.java new file mode 100644 index 0000000000000..73b02ed292fe7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/discovery/UnicastConfiguredHostsResolverTests.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.IsNull.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class UnicastConfiguredHostsResolverTests extends ESTestCase { + + private List transportAddresses; + private UnicastConfiguredHostsResolver unicastConfiguredHostsResolver; + private ThreadPool threadPool; + + @Before + public void startResolver() { + threadPool = new TestThreadPool("node"); + transportAddresses = new ArrayList<>(); + + TransportService transportService = mock(TransportService.class); + when(transportService.getThreadPool()).thenReturn(threadPool); + + unicastConfiguredHostsResolver + = new UnicastConfiguredHostsResolver(Settings.EMPTY, transportService, hostsResolver -> transportAddresses); + unicastConfiguredHostsResolver.start(); + } + + @After + public void stopResolver() { + unicastConfiguredHostsResolver.stop(); + threadPool.shutdown(); + } + + public void testResolvesAddressesInBackgroundAndIgnoresConcurrentCalls() throws Exception { + final AtomicReference> resolvedAddressesRef = new AtomicReference<>(); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch endLatch = new CountDownLatch(1); + + final int addressCount = randomIntBetween(0, 5); + for (int i = 0; i < addressCount; i++) { + transportAddresses.add(buildNewFakeTransportAddress()); + } + + unicastConfiguredHostsResolver.resolveConfiguredHosts(resolvedAddresses -> { + try { + assertTrue(startLatch.await(30, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + resolvedAddressesRef.set(resolvedAddresses); + endLatch.countDown(); + }); + + unicastConfiguredHostsResolver.resolveConfiguredHosts(resolvedAddresses -> { + throw new AssertionError("unexpected concurrent resolution"); + }); + + assertThat(resolvedAddressesRef.get(), nullValue()); + startLatch.countDown(); + assertTrue(endLatch.await(30, TimeUnit.SECONDS)); + assertThat(resolvedAddressesRef.get(), equalTo(transportAddresses)); + } +}