Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject port ranges in discovery.seed_hosts #41404

Merged
merged 9 commits into from
May 7, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
}

try {
// we only limit to 1 port per address, makes no sense to ping 100 ports
TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1);
TransportAddress[] addresses = transportService.addressesFromString(networkAddress);
for (TransportAddress address : addresses) {
logger.trace("adding {}, transport_address {}", networkAddress, address);
dynamicHosts.add(address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ && disjoint(securityGroupIds, groups)) {
}
if (address != null) {
try {
// we only limit to 1 port per address, makes no sense to ping 100 ports
final TransportAddress[] addresses = transportService.addressesFromString(address, 1);
final TransportAddress[] addresses = transportService.addressesFromString(address);
for (int i = 0; i < addresses.length; i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
dynamicHosts.add(addresses[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void createTransportService() {
new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
new NoneCircuitBreakerService()) {
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
// we just need to ensure we don't resolve DNS here
return new TransportAddress[] {poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress())};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,7 @@ public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {

// ip_private is a single IP Address. We need to build a TransportAddress from it
// If user has set `es_port` metadata, we don't need to ping all ports
// we only limit to 1 addresses, makes no sense to ping 100 ports
TransportAddress[] addresses = transportService.addressesFromString(address, 1);
TransportAddress[] addresses = transportService.addressesFromString(address);

for (TransportAddress transportAddress : addresses) {
logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private List<String> getHostsList() {

@Override
public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList(), 1);
final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList());
logger.debug("seed addresses: {}", transportAddresses);
return transportAddresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ public interface SeedHostsProvider {

/**
* Helper object that allows to resolve a list of hosts to a list of transport addresses.
* Each host is resolved into a transport address (or a collection of addresses if the
* number of ports is greater than one)
* Each host is resolved into a transport address
*/
interface HostsResolver {
List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts);
List<TransportAddress> resolveHosts(List<String> hosts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ public static TimeValue getResolveTimeout(Settings settings) {
}

@Override
public List<TransportAddress> resolveHosts(
final List<String> hosts,
final int limitPortCounts) {
public List<TransportAddress> resolveHosts(final List<String> hosts) {
Objects.requireNonNull(hosts);
if (resolveTimeout.nanos() < 0) {
throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
Expand All @@ -98,7 +96,7 @@ public List<TransportAddress> resolveHosts(
final List<Callable<TransportAddress[]>> callables =
hosts
.stream()
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn))
.collect(Collectors.toList());
final List<Future<TransportAddress[]>> futures;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,21 @@ public class SettingsBasedSeedHostsProvider implements SeedHostsProvider {
public static final Setting<List<String>> DISCOVERY_SEED_HOSTS_SETTING =
Setting.listSetting("discovery.seed_hosts", emptyList(), Function.identity(), Property.NodeScope);

// these limits are per-address
private static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
private static final int LIMIT_LOCAL_PORTS_COUNT = 5;

private final List<String> configuredHosts;
private final int limitPortCounts;

public SettingsBasedSeedHostsProvider(Settings settings, TransportService transportService) {
if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) {
configuredHosts = DISCOVERY_SEED_HOSTS_SETTING.get(settings);
// we only limit to 1 address, makes no sense to ping 100 ports
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
} else {
// if unicast hosts are not specified, fill with simple defaults on the local machine
configuredHosts = transportService.getLocalAddresses();
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
configuredHosts = transportService.getDefaultSeedAddresses();
}

logger.debug("using initial hosts {}", configuredHosts);
}

@Override
public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
return hostsResolver.resolveHosts(configuredHosts, limitPortCounts);
return hostsResolver.resolveHosts(configuredHosts);
}
}
51 changes: 33 additions & 18 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
Expand All @@ -102,6 +103,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]);

// this limit is per-address
private static final int LIMIT_LOCAL_PORTS_COUNT = 6;

protected final Settings settings;
protected final ThreadPool threadPool;
protected final PageCacheRecycler pageCacheRecycler;
Expand Down Expand Up @@ -311,14 +315,20 @@ public Map<String, BoundTransportAddress> profileBoundAddresses() {
}

@Override
public List<String> getLocalAddresses() {
public List<String> getDefaultSeedAddresses() {
List<String> local = new ArrayList<>();
local.add("127.0.0.1");
// check if v6 is supported, if so, v4 will also work via mapped addresses.
if (NetworkUtils.SUPPORTS_V6) {
local.add("[::1]"); // may get ports appended!
}
return local;
return local.stream()
.flatMap(
address -> Arrays.stream(defaultPortRange())
.limit(LIMIT_LOCAL_PORTS_COUNT)
.mapToObj(port -> address + ":" + port)
)
.collect(Collectors.toList());
}

protected void bindServer(ProfileSettings profileSettings) {
Expand Down Expand Up @@ -456,8 +466,17 @@ static int resolvePublishPort(ProfileSettings profileSettings, List<InetSocketAd
}

@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit);
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
return parse(address, defaultPortRange()[0]);
}

private int[] defaultPortRange() {
return new PortsRange(
settings.get(
TransportSettings.PORT_PROFILE.getConcreteSettingForNamespace(TransportSettings.DEFAULT_PROFILE).getKey(),
TransportSettings.PORT.get(settings)
)
).ports();
}

// this code is a take on guava's HostAndPort, like a HostAndPortRange
Expand All @@ -467,9 +486,9 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$");

/**
* parse a hostname+port range spec into its equivalent addresses
* parse a hostname+port spec into its equivalent addresses
*/
static TransportAddress[] parse(String hostPortString, String defaultPortRange, int perAddressLimit) throws UnknownHostException {
static TransportAddress[] parse(String hostPortString, int defaultPort) throws UnknownHostException {
Objects.requireNonNull(hostPortString);
String host;
String portString = null;
Expand Down Expand Up @@ -498,22 +517,18 @@ static TransportAddress[] parse(String hostPortString, String defaultPortRange,
}
}

int port;
// if port isn't specified, fill with the default
if (portString == null || portString.isEmpty()) {
portString = defaultPortRange;
port = defaultPort;
} else {
port = Integer.parseInt(portString);
}

// generate address for each port in the range
Set<InetAddress> addresses = new HashSet<>(Arrays.asList(InetAddress.getAllByName(host)));
List<TransportAddress> transportAddresses = new ArrayList<>();
int[] ports = new PortsRange(portString).ports();
int limit = Math.min(ports.length, perAddressLimit);
for (int i = 0; i < limit; i++) {
for (InetAddress address : addresses) {
transportAddresses.add(new TransportAddress(address, ports[i]));
}
}
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
return Arrays.stream(InetAddress.getAllByName(host))
.distinct()
.map(address -> new TransportAddress(address, port))
.toArray(TransportAddress[]::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ public interface Transport extends LifecycleComponent {
/**
* Returns an address from its string representation.
*/
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException;
TransportAddress[] addressesFromString(String address) throws UnknownHostException;

/**
* Returns a list of all local adresses for this transport
* Returns a list of all local addresses for this transport
*/
List<String> getLocalAddresses();
List<String> getDefaultSeedAddresses();

default CircuitBreaker getInFlightRequestBreaker() {
return new NoopCircuitBreaker("in-flight-noop");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ public BoundTransportAddress boundAddress() {
return transport.boundAddress();
}

public List<String> getLocalAddresses() {
return transport.getLocalAddresses();
public List<String> getDefaultSeedAddresses() {
return transport.getDefaultSeedAddresses();
}

/**
Expand Down Expand Up @@ -748,8 +748,8 @@ private boolean shouldTraceAction(String action) {
return true;
}

public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return transport.addressesFromString(address, perAddressLimit);
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
return transport.addressesFromString(address);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public BoundTransportAddress boundAddress() {
}

@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private static class TestIteration implements Closeable {
threadPool = new TestThreadPool("transport-client-nodes-service-tests");
transport = new FailAndRetryMockTransport<TestResponse>(random(), clusterName) {
@Override
public List<String> getLocalAddresses() {
public List<String> getDefaultSeedAddresses() {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public Map<String, BoundTransportAddress> profileBoundAddresses() {
}

@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
public TransportAddress[] addressesFromString(String address) {
return new TransportAddress[0];
}

Expand Down Expand Up @@ -440,7 +440,7 @@ public boolean isClosed() {
}

@Override
public List<String> getLocalAddresses() {
public List<String> getDefaultSeedAddresses() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,43 +152,6 @@ public void testResolvesAddressesInBackgroundAndIgnoresConcurrentCalls() throws
assertThat(resolvedAddressesRef.get(), equalTo(transportAddresses));
}

public void testPortLimit() {
final NetworkService networkService = new NetworkService(Collections.emptyList());
final Transport transport = new MockNioTransport(
Settings.EMPTY,
Version.CURRENT,
threadPool,
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService()) {

@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(
new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)},
new TransportAddress(InetAddress.getLoopbackAddress(), 9500)
);
}
};
closeables.push(transport);
final TransportService transportService =
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
Collections.emptySet());
closeables.push(transportService);
recreateSeedHostsResolver(transportService);
final int limitPortCounts = randomIntBetween(1, 10);
final List<TransportAddress> transportAddresses = seedHostsResolver.resolveHosts(Collections.singletonList("127.0.0.1"),
limitPortCounts);
assertThat(transportAddresses, hasSize(limitPortCounts));
final Set<Integer> ports = new HashSet<>();
for (final TransportAddress address : transportAddresses) {
assertTrue(address.address().getAddress().isLoopbackAddress());
ports.add(address.getPort());
}
assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).boxed().collect(Collectors.toSet())));
}

public void testRemovingLocalAddresses() {
final NetworkService networkService = new NetworkService(Collections.emptyList());
final InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
Expand Down Expand Up @@ -218,9 +181,10 @@ public BoundTransportAddress boundAddress() {
Collections.emptySet());
closeables.push(transportService);
recreateSeedHostsResolver(transportService);
final List<TransportAddress> transportAddresses = seedHostsResolver.resolveHosts(
Collections.singletonList(NetworkAddress.format(loopbackAddress)),
10);
List<String> hosts = IntStream.range(9300, 9310)
.mapToObj(port -> NetworkAddress.format(loopbackAddress) + ":" + port)
.collect(Collectors.toList());
final List<TransportAddress> transportAddresses = seedHostsResolver.resolveHosts(hosts);
assertThat(transportAddresses, hasSize(7));
final Set<Integer> ports = new HashSet<>();
for (final TransportAddress address : transportAddresses) {
Expand Down Expand Up @@ -252,7 +216,7 @@ public BoundTransportAddress boundAddress() {
}

@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
throw unknownHostException;
}

Expand All @@ -279,7 +243,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi

try {
Loggers.addAppender(logger, appender);
final List<TransportAddress> transportAddresses = seedHostsResolver.resolveHosts(Collections.singletonList(hostname), 1);
final List<TransportAddress> transportAddresses = seedHostsResolver.resolveHosts(Collections.singletonList(hostname));

assertThat(transportAddresses, empty());
appender.assertAllExpectationsMatched();
Expand Down Expand Up @@ -310,7 +274,7 @@ public BoundTransportAddress boundAddress() {
}

@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
if ("hostname1".equals(address)) {
return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)};
} else if ("hostname2".equals(address)) {
Expand Down Expand Up @@ -346,7 +310,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi

try {
Loggers.addAppender(logger, appender);
final List<TransportAddress> transportAddresses = seedHostsResolver.resolveHosts(Arrays.asList("hostname1", "hostname2"), 1);
final List<TransportAddress> transportAddresses = seedHostsResolver.resolveHosts(Arrays.asList("hostname1", "hostname2"));

assertThat(transportAddresses, hasSize(1));
appender.assertAllExpectationsMatched();
Expand Down Expand Up @@ -396,7 +360,7 @@ public BoundTransportAddress boundAddress() {
try {
Loggers.addAppender(logger, appender);
final List<TransportAddress> transportAddresses = seedHostsResolver.resolveHosts(
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), 1);
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"));
assertThat(transportAddresses, hasSize(1)); // only one of the two is valid and will be used
assertThat(transportAddresses.get(0).getAddress(), equalTo("127.0.0.1"));
assertThat(transportAddresses.get(0).getPort(), equalTo(9301));
Expand Down
Loading