Skip to content

Commit

Permalink
Revert "Introduce bolt+unix scheme support (#1586)" (#1589)
Browse files Browse the repository at this point in the history
This reverts commit b4f74cc.
  • Loading branch information
injectives authored Nov 21, 2024
1 parent b4f74cc commit 15d498b
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 168 deletions.
25 changes: 4 additions & 21 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import io.netty.channel.local.LocalAddress;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.util.LinkedHashSet;
import java.util.Set;
Expand Down Expand Up @@ -113,6 +111,7 @@ public final Driver newInstance(
ownsEventLoopGroup = false;
}

var address = new InternalServerAddress(uri);
var routingSettings = new RoutingSettings(config.routingTablePurgeDelayMillis(), new RoutingContext(uri));

EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
Expand All @@ -123,6 +122,7 @@ public final Driver newInstance(
return createDriver(
uri,
securityPlanManager,
address,
bootstrap.group(),
routingSettings,
retryLogic,
Expand All @@ -149,6 +149,7 @@ protected static MetricsProvider getOrCreateMetricsProvider(Config config, Clock
private InternalDriver createDriver(
URI uri,
BoltSecurityPlanManager securityPlanManager,
ServerAddress address,
EventLoopGroup eventLoopGroup,
RoutingSettings routingSettings,
RetryLogic retryLogic,
Expand All @@ -158,29 +159,11 @@ private InternalDriver createDriver(
boolean ownsEventLoopGroup,
Supplier<Rediscovery> rediscoverySupplier) {
BoltConnectionProvider boltConnectionProvider = null;
BoltServerAddress address;
if (Scheme.BOLT_UNIX_URI_SCHEME.equals(uri.getScheme())) {
var path = Path.of(uri.getPath());
if (!Files.exists(path)) {
throw new IllegalArgumentException(String.format("%s does not exist", path));
}
address = new BoltServerAddress(path);
} else {
var port = uri.getPort();
if (port == -1) {
port = InternalServerAddress.DEFAULT_PORT;
}
if (port >= 0 && port <= 65_535) {
address = new BoltServerAddress(uri.getHost(), port);
} else {
throw new IllegalArgumentException("Illegal port: " + port);
}
}
try {
boltConnectionProvider =
createBoltConnectionProvider(uri, config, eventLoopGroup, routingSettings, rediscoverySupplier);
boltConnectionProvider.init(
address,
new BoltServerAddress(address.host(), address.port()),
new RoutingContext(uri),
DriverInfoUtil.boltAgent(),
config.userAgent(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ private static void requireValidPort(int port) {
throw new IllegalArgumentException("Illegal port: " + port);
}

public InternalServerAddress(String address) {
this(uriFrom(address));
}

public InternalServerAddress(URI uri) {
this(hostFrom(uri), portFrom(uri));
}
Expand All @@ -60,6 +64,43 @@ private static RuntimeException invalidAddressFormat(String address) {
return new IllegalArgumentException("Invalid address format `" + address + "`");
}

@SuppressWarnings("DuplicatedCode")
private static URI uriFrom(String address) {
String scheme;
String hostPort;

var schemeSplit = address.split("://");
if (schemeSplit.length == 1) {
// URI can't parse addresses without scheme, prepend fake "bolt://" to reuse the parsing facility
scheme = "bolt://";
hostPort = hostPortFrom(schemeSplit[0]);
} else if (schemeSplit.length == 2) {
scheme = schemeSplit[0] + "://";
hostPort = hostPortFrom(schemeSplit[1]);
} else {
throw invalidAddressFormat(address);
}

return URI.create(scheme + hostPort);
}

private static String hostPortFrom(String address) {
if (address.startsWith("[")) {
// expected to be an IPv6 address like [::1] or [::1]:7687
return address;
}

var containsSingleColon = address.indexOf(":") == address.lastIndexOf(":");
if (containsSingleColon) {
// expected to be an IPv4 address with or without port like 127.0.0.1 or 127.0.0.1:7687
return address;
}

// address contains multiple colons and does not start with '['
// expected to be an IPv6 address without brackets
return "[" + address + "]";
}

@Override
public String toString() {
return String.format("%s:%d", host, port);
Expand Down
2 changes: 0 additions & 2 deletions driver/src/main/java/org/neo4j/driver/internal/Scheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class Scheme {
public static final String BOLT_URI_SCHEME = "bolt";
public static final String BOLT_HIGH_TRUST_URI_SCHEME = "bolt+s";
public static final String BOLT_LOW_TRUST_URI_SCHEME = "bolt+ssc";
public static final String BOLT_UNIX_URI_SCHEME = "bolt+unix";
public static final String NEO4J_URI_SCHEME = "neo4j";
public static final String NEO4J_HIGH_TRUST_URI_SCHEME = "neo4j+s";
public static final String NEO4J_LOW_TRUST_URI_SCHEME = "neo4j+ssc";
Expand All @@ -35,7 +34,6 @@ public static void validateScheme(String scheme) {
case BOLT_URI_SCHEME,
BOLT_LOW_TRUST_URI_SCHEME,
BOLT_HIGH_TRUST_URI_SCHEME,
BOLT_UNIX_URI_SCHEME,
NEO4J_URI_SCHEME,
NEO4J_LOW_TRUST_URI_SCHEME,
NEO4J_HIGH_TRUST_URI_SCHEME -> {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static java.util.Objects.requireNonNull;

import java.net.URI;
import java.nio.file.Path;
import java.util.Objects;
import java.util.stream.Stream;

Expand All @@ -36,7 +35,6 @@ public class BoltServerAddress {
// resolved IP address.
protected final int port;
private final String stringValue;
private final Path path;

public BoltServerAddress(String address) {
this(uriFrom(address));
Expand All @@ -57,15 +55,6 @@ public BoltServerAddress(String host, String connectionHost, int port) {
this.stringValue = host.equals(connectionHost)
? String.format("%s:%d", host, port)
: String.format("%s(%s):%d", host, connectionHost, port);
this.path = null;
}

public BoltServerAddress(Path path) {
this.host = path.toString();
this.connectionHost = this.host;
this.port = -1;
this.stringValue = this.host;
this.path = path;
}

@Override
Expand Down Expand Up @@ -102,10 +91,6 @@ public String connectionHost() {
return connectionHost;
}

public Path path() {
return path;
}

/**
* Create a stream of unicast addresses.
* <p>
Expand All @@ -130,6 +115,7 @@ private static int portFrom(URI uri) {
return port == -1 ? DEFAULT_PORT : port;
}

@SuppressWarnings("DuplicatedCode")
private static URI uriFrom(String address) {
String scheme;
String hostPort;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.bolt.api.AccessMode;
import org.neo4j.driver.internal.bolt.api.BoltAgent;
import org.neo4j.driver.internal.bolt.api.BoltServerAddress;
import org.neo4j.driver.internal.bolt.api.MetricsListener;
import org.neo4j.driver.internal.bolt.api.NotificationConfig;
import org.neo4j.driver.internal.bolt.api.RoutingContext;
import org.neo4j.driver.internal.bolt.api.SecurityPlan;
import org.neo4j.driver.internal.bolt.basicimpl.spi.Connection;

public interface ConnectionProvider {

CompletionStage<Connection> acquireConnection(
BoltServerAddress address,
SecurityPlan securityPlan,
RoutingContext routingContext,
String databaseName,
Map<String, Value> authMap,
BoltAgent boltAgent,
String userAgent,
AccessMode mode,
int connectTimeoutMillis,
String impersonatedUser,
CompletableFuture<Long> latestAuthMillisFuture,
NotificationConfig notificationConfig,
MetricsListener metricsListener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.bolt.basicimpl;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import java.time.Clock;
import org.neo4j.driver.internal.bolt.api.DomainNameResolver;
import org.neo4j.driver.internal.bolt.api.LoggingProvider;

public class ConnectionProviders {
static ConnectionProvider netty(
EventLoopGroup group,
Clock clock,
DomainNameResolver domainNameResolver,
LocalAddress localAddress,
LoggingProvider logging) {
return new NettyConnectionProvider(group, clock, domainNameResolver, localAddress, logging);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public final class NettyBoltConnectionProvider implements BoltConnectionProvider
private final LoggingProvider logging;
private final System.Logger log;

private final NettyConnectionProvider connectionProvider;
private final ConnectionProvider connectionProvider;

private BoltServerAddress address;

Expand All @@ -76,7 +76,7 @@ public NettyBoltConnectionProvider(
this.logging = Objects.requireNonNull(logging);
this.log = logging.getLog(getClass());
this.connectionProvider =
new NettyConnectionProvider(eventLoopGroup, clock, domainNameResolver, localAddress, logging);
ConnectionProviders.netty(eventLoopGroup, clock, domainNameResolver, localAddress, logging);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioDomainSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.AddressResolverGroup;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnixDomainSocketAddress;
import java.time.Clock;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -54,7 +52,7 @@
import org.neo4j.driver.internal.bolt.basicimpl.messaging.BoltProtocol;
import org.neo4j.driver.internal.bolt.basicimpl.spi.Connection;

public final class NettyConnectionProvider {
public final class NettyConnectionProvider implements ConnectionProvider {
private final EventLoopGroup eventLoopGroup;
private final Clock clock;
private final DomainNameResolver domainNameResolver;
Expand All @@ -77,6 +75,7 @@ public NettyConnectionProvider(
this.logging = logging;
}

@Override
public CompletionStage<Connection> acquireConnection(
BoltServerAddress address,
SecurityPlan securityPlan,
Expand All @@ -91,9 +90,27 @@ public CompletionStage<Connection> acquireConnection(
CompletableFuture<Long> latestAuthMillisFuture,
NotificationConfig notificationConfig,
MetricsListener metricsListener) {
var bootstrap = new Bootstrap();
bootstrap
.group(this.eventLoopGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis)
.channel(localAddress != null ? LocalChannel.class : NioSocketChannel.class)
.resolver(addressResolverGroup)
.handler(new NettyChannelInitializer(address, securityPlan, connectTimeoutMillis, clock, logging));

SocketAddress socketAddress;
if (localAddress == null) {
try {
socketAddress =
new InetSocketAddress(domainNameResolver.resolve(address.connectionHost())[0], address.port());
} catch (Throwable t) {
socketAddress = InetSocketAddress.createUnresolved(address.connectionHost(), address.port());
}
} else {
socketAddress = localAddress;
}

return installChannelConnectedListeners(
address, connect(address, securityPlan, connectTimeoutMillis), connectTimeoutMillis)
return installChannelConnectedListeners(address, bootstrap.connect(socketAddress), connectTimeoutMillis)
.thenCompose(channel -> BoltProtocol.forChannel(channel)
.initializeChannel(
channel,
Expand All @@ -107,39 +124,6 @@ address, connect(address, securityPlan, connectTimeoutMillis), connectTimeoutMil
.thenApply(channel -> new NetworkConnection(channel, logging));
}

private ChannelFuture connect(BoltServerAddress address, SecurityPlan securityPlan, int connectTimeoutMillis) {
Class<? extends Channel> channelClass;
SocketAddress socketAddress;

if (localAddress != null) {
channelClass = LocalChannel.class;
socketAddress = localAddress;
} else {
if (address.path() != null) {
channelClass = NioDomainSocketChannel.class;
socketAddress = UnixDomainSocketAddress.of(address.path());
} else {
channelClass = NioSocketChannel.class;
try {
socketAddress = new InetSocketAddress(
domainNameResolver.resolve(address.connectionHost())[0], address.port());
} catch (Throwable t) {
socketAddress = InetSocketAddress.createUnresolved(address.connectionHost(), address.port());
}
}
}

var bootstrap = new Bootstrap();
bootstrap
.group(this.eventLoopGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis)
.channel(channelClass)
.resolver(addressResolverGroup)
.handler(new NettyChannelInitializer(address, securityPlan, connectTimeoutMillis, clock, logging));

return bootstrap.connect(socketAddress);
}

private CompletionStage<Channel> installChannelConnectedListeners(
BoltServerAddress address, ChannelFuture channelConnected, int connectTimeoutMillis) {
var pipeline = channelConnected.channel().pipeline();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@

class DriverFactoryTest {
private static Stream<String> testUris() {
return Stream.of("bolt://localhost:7687", "bolt+unix://localhost:7687", "neo4j://localhost:7687");
return Stream.of("bolt://localhost:7687", "neo4j://localhost:7687");
}

@ParameterizedTest
Expand Down
Loading

0 comments on commit 15d498b

Please sign in to comment.