Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.discovery.PeerFinder.ConfiguredHostsResolver;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING;

public class UnicastConfiguredHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should call this PeerFinderHostsResolver. This class, with its quite unusual interface (i.e. calling the callback only sometimes) is very much tailored towards use in PeerFinder whereas the name here suggests it is more than that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this and will leave it as-is for now, but will contemplate folding some of the interfaces with a single production implementation into methods on PeerFinder at a later date.


private final AtomicBoolean resolveInProgress = new AtomicBoolean();
private final TransportService transportService;
private final UnicastHostsProvider hostsProvider;
private final SetOnce<ExecutorService> executorService = new SetOnce<>();
private final TimeValue resolveTimeout;

public UnicastConfiguredHostsResolver(Settings settings, TransportService transportService, UnicastHostsProvider hostsProvider) {
super(settings);
this.transportService = transportService;
this.hostsProvider = hostsProvider;
resolveTimeout = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
}

@Override
protected void doStart() {
final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
logger.debug("using concurrent_connects [{}], resolve_timeout [{}]", concurrentConnects, resolveTimeout);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_configured_hosts_resolver]");
executorService.set(EsExecutors.newScaling(nodeName() + "/" + "unicast_configured_hosts_resolver",
0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, transportService.getThreadPool().getThreadContext()));
}

@Override
protected void doStop() {
ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS);
}

@Override
protected void doClose() {
}

@Override
public void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer) {
if (lifecycle.started() == false) {
logger.debug("resolveConfiguredHosts: lifecycle is {}, not proceeding", lifecycle);
return;
}

if (resolveInProgress.compareAndSet(false, true)) {
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.debug("failure when resolving unicast hosts list", e);
}

@Override
protected void doRun() {
if (lifecycle.started() == false) {
logger.debug("resolveConfiguredHosts.doRun: lifecycle is {}, not proceeding", lifecycle);
return;
}

List<TransportAddress> providedAddresses
= hostsProvider.buildDynamicHosts((hosts, limitPortCounts)
-> UnicastZenPing.resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts,
transportService, resolveTimeout));

consumer.accept(providedAddresses);
}

@Override
public void onAfter() {
resolveInProgress.set(false);
}

@Override
public String toString() {
return "UnicastConfiguredHostsResolver resolving unicast hosts list";
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsNull.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class UnicastConfiguredHostsResolverTests extends ESTestCase {

private List<TransportAddress> transportAddresses;
private UnicastConfiguredHostsResolver unicastConfiguredHostsResolver;
private ThreadPool threadPool;

@Before
public void startResolver() {
threadPool = new TestThreadPool("node");
transportAddresses = new ArrayList<>();

TransportService transportService = mock(TransportService.class);
when(transportService.getThreadPool()).thenReturn(threadPool);

unicastConfiguredHostsResolver
= new UnicastConfiguredHostsResolver(Settings.EMPTY, transportService, hostsResolver -> transportAddresses);
unicastConfiguredHostsResolver.start();
}

@After
public void stopResolver() {
unicastConfiguredHostsResolver.stop();
threadPool.shutdown();
}

public void testResolvesAddressesInBackgroundAndIgnoresConcurrentCalls() throws Exception {
final AtomicReference<List<TransportAddress>> resolvedAddressesRef = new AtomicReference<>();
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch endLatch = new CountDownLatch(1);

final int addressCount = randomIntBetween(0, 5);
for (int i = 0; i < addressCount; i++) {
transportAddresses.add(buildNewFakeTransportAddress());
}

unicastConfiguredHostsResolver.resolveConfiguredHosts(resolvedAddresses -> {
try {
assertTrue(startLatch.await(30, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new AssertionError(e);
}
resolvedAddressesRef.set(resolvedAddresses);
endLatch.countDown();
});

unicastConfiguredHostsResolver.resolveConfiguredHosts(resolvedAddresses -> {
throw new AssertionError("unexpected concurrent resolution");
});

assertThat(resolvedAddressesRef.get(), nullValue());
startLatch.countDown();
assertTrue(endLatch.await(30, TimeUnit.SECONDS));
assertThat(resolvedAddressesRef.get(), equalTo(transportAddresses));
}
}