Skip to content

Commit

Permalink
Ejbclient 356 4.0 (wildfly#470)
Browse files Browse the repository at this point in the history
* [EJBCLIENT-356] Add ability to set a short timeout for additional nodes

* [EJBCLIENT-356] Eagerly return node information in discovery

Return information eagerly, so that if one node is
badly behaved additional information can still be
returned.

Includes a 'black hole' test, where a socket is opened
that does nothing, to emulate a network where the packets
are just going missing.

Co-authored-by: Stuart Douglas <stuart.w.douglas@gmail.com>
  • Loading branch information
bmaxwell and stuartwdouglas authored Sep 23, 2020
1 parent 92b11c3 commit b1db702
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public final class DiscoveryEJBClientInterceptor implements EJBClientInterceptor
private static final boolean WILDFLY_TESTSUITE_HACK = Boolean.getBoolean("org.jboss.ejb.client.wildfly-testsuite-hack");
// This provides a way timeout a discovery, avoiding blocking on some edge cases. See EJBCLIENT-311.
private static final long DISCOVERY_TIMEOUT = Long.parseLong(WildFlySecurityManager.getPropertyPrivileged("org.jboss.ejb.client.discovery.timeout", "0"));
//how long to wait if at least one node has already been discovered. This one is in ms rather than s
private static final long DISCOVERY_ADDITIONAL_TIMEOUT = Long.parseLong(WildFlySecurityManager.getPropertyPrivileged("org.jboss.ejb.client.discovery.additional-node-timeout", "0"));

/**
* This interceptor's priority.
Expand Down Expand Up @@ -490,9 +492,10 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
final Map<URI, List<String>> clusterAssociations = new HashMap<>();

int nodeless = 0;
long timeout = DISCOVERY_TIMEOUT * 1000;
try (final ServicesQueue queue = discover(filterSpec)) {
ServiceURL serviceURL;
while ((serviceURL = queue.takeService(DISCOVERY_TIMEOUT, TimeUnit.SECONDS)) != null) {
while ((serviceURL = queue.takeService(timeout, TimeUnit.MILLISECONDS)) != null) {
final URI location = serviceURL.getLocationURI();
if (!blacklist.contains(location)) {
// Got a match! See if there's a node affinity to set for the invocation.
Expand Down Expand Up @@ -528,6 +531,10 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
}
}
}
//one has already been discovered, we may want a shorter timeout for additional nodes
if (DISCOVERY_ADDITIONAL_TIMEOUT != 0) {
timeout = DISCOVERY_ADDITIONAL_TIMEOUT; //this one is actually in ms, you generally want it very short
}
}
}
problems = queue.getProblems();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,10 @@ final class DiscoveryAttempt implements DiscoveryRequest, DiscoveryResult {
// keep a record of URIs we try to connect to for each cluster
private final ConcurrentHashMap<String, Set<URI>> urisByCluster = new ConcurrentHashMap<>();
private final Set<URI> connectFailedURIs = new HashSet<>();
/**
* nodes that have already been provided to the discovery provider eagerly
*/
private final Set<String> eagerNodes = Collections.synchronizedSet(new HashSet<>());

DiscoveryAttempt(final ServiceType serviceType, final FilterSpec filterSpec, final DiscoveryResult discoveryResult, final RemoteEJBReceiver ejbReceiver, final AuthenticationContext authenticationContext) {
this.serviceType = serviceType;
Expand Down Expand Up @@ -466,28 +470,36 @@ void countDown() {
final EJBModuleIdentifier module = filterSpec.accept(MI_EXTRACTOR);
if (phase2) {
if (node != null) {
final NodeInformation information = nodes.get(node);
if (information != null) information.discover(serviceType, filterSpec, result);
if (!eagerNodes.contains(node)) {
final NodeInformation information = nodes.get(node);
if (information != null) information.discover(serviceType, filterSpec, result);
}
} else for (NodeInformation information : nodes.values()) {
information.discover(serviceType, filterSpec, result);
if (!eagerNodes.contains(information.getNodeName())) {
information.discover(serviceType, filterSpec, result);
}
}
result.complete();
} else {
boolean ok = false;
// optimize for simple module identifier and node name queries
if (node != null) {
final NodeInformation information = nodes.get(node);
if (information != null) {
if (information.discover(serviceType, filterSpec, result)) {
ok = true;
if (!eagerNodes.contains(node)) {
final NodeInformation information = nodes.get(node);
if (information != null) {
if (information.discover(serviceType, filterSpec, result)) {
ok = true;
}
}
}
} else for (NodeInformation information : nodes.values()) {
if (information.discover(serviceType, filterSpec, result)) {
ok = true;
if (!eagerNodes.contains(information.getNodeName())) {
if (information.discover(serviceType, filterSpec, result)) {
ok = true;
}
}
}
if (ok) {
if (ok || !eagerNodes.isEmpty()) {
result.complete();
} else {
// everything failed. We have to reconnect everything.
Expand Down Expand Up @@ -546,6 +558,25 @@ void countDown() {
countDown();
}
}
} else {
final DiscoveryResult result = this.discoveryResult;
final String node = filterSpec.accept(NODE_EXTRACTOR);
if (node != null) {
if (!eagerNodes.contains(node)) {
final NodeInformation information = nodes.get(node);
if (information != null) {
if (information.discover(serviceType, filterSpec, result)) {
eagerNodes.add(node);
}
}
}
} else for (NodeInformation information : nodes.values()) {
if (!eagerNodes.contains(information.getNodeName())) {
if (information.discover(serviceType, filterSpec, result)) {
eagerNodes.add(information.getNodeName());
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2019 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jboss.ejb.client.test;

import org.jboss.ejb.client.EJBClient;
import org.jboss.ejb.client.StatelessEJBLocator;
import org.jboss.ejb.client.legacy.JBossEJBProperties;
import org.jboss.ejb.client.test.common.DummyServer;
import org.jboss.ejb.client.test.common.Echo;
import org.jboss.ejb.client.test.common.EchoBean;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.net.InetAddress;
import java.net.ServerSocket;

/**
* @author <a href="ingo@redhat.com">Ingo Weiss</a>
*/
public class NetworkBlackHoleInvocationTestCase {
private static final Logger logger = Logger.getLogger(NetworkBlackHoleInvocationTestCase.class);
private static final String PROPERTIES_FILE = "jboss-ejb-client.properties";

private DummyServer server;
private boolean serverStarted = false;

// module
private static final String APP_NAME = "my-foo-app";
private static final String MODULE_NAME = "my-bar-module";
private static final String DISTINCT_NAME = "";

private static final String SERVER_NAME = "test-server";

/**
* Do any general setup here
*
* @throws Exception
*/
@BeforeClass
public static void beforeClass() throws Exception {
// trigger the static init of the correct properties file - this also depends on running in forkMode=always
JBossEJBProperties ejbProperties = JBossEJBProperties.fromClassPath(NetworkBlackHoleInvocationTestCase.class.getClassLoader(), PROPERTIES_FILE);
JBossEJBProperties.getContextManager().setGlobalDefault(ejbProperties);
}

/**
* Do any test specific setup here
*/
@Before
public void beforeTest() throws Exception {
// start a server
server = new DummyServer("localhost", 6999, SERVER_NAME);
server.start();
serverStarted = true;
logger.info("Started server ...");
serverStarted = true;

server.register(APP_NAME, MODULE_NAME, DISTINCT_NAME, Echo.class.getSimpleName(), new EchoBean());
logger.info("Registered module ...");
}

/**
* Do any test-specific tear down here.
*/
@After
public void afterTest() {
server.unregister(APP_NAME, MODULE_NAME, DISTINCT_NAME, Echo.class.getName());
logger.info("Unregistered module ...");

if (serverStarted) {
try {
this.server.stop();
} catch (Throwable t) {
logger.info("Could not stop server", t);
}
}
logger.info("Stopped server ...");
}

/**
* Test a failed client discovery
*/
@Test
public void testTakingDownServerDoesNotBreakClients() throws Exception {
System.setProperty("org.jboss.ejb.client.discovery.timeout", "10");
System.setProperty("org.jboss.ejb.client.discovery.additional-node-timeout","2");

try (DummyServer server2 = new DummyServer("localhost", 7099, "test2")) {
server2.start();
server2.register(APP_NAME, MODULE_NAME, DISTINCT_NAME, Echo.class.getSimpleName(), new EchoBean());

// create a proxy for invocation
final StatelessEJBLocator<Echo> statelessEJBLocator = new StatelessEJBLocator<>
(Echo.class, APP_NAME, MODULE_NAME, Echo.class.getSimpleName(), DISTINCT_NAME);
final Echo proxy = EJBClient.createProxy(statelessEJBLocator);
Assert.assertNotNull("Received a null proxy", proxy);
logger.info("Created proxy for Echo: " + proxy.toString());

logger.info("Invoking on proxy...");
// Invoke on the proxy. This should fail in 10 seconds or else it'll hang.
final String message = "hello!";
String echo = proxy.echo(message);
Assert.assertEquals(message, echo);
server2.hardKill();
//this is a network black hole
//it emulates what happens if the server just disappears, and connect attempts hang
//instead of being immediately rejected (e.g. a firewall dropping packets)
try (ServerSocket s = new ServerSocket(7099, 100, InetAddress.getByName("localhost"))) {
echo = proxy.echo(message);
Assert.assertEquals(message, echo);

}

}
}
}
51 changes: 44 additions & 7 deletions src/test/java/org/jboss/ejb/client/test/common/DummyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import org.jboss.ejb.protocol.remote.RemoteEJBService;
import org.jboss.ejb.server.Association;
import org.jboss.ejb.server.ClusterTopologyListener.ClusterInfo;
import org.jboss.ejb.server.ClusterTopologyListener.NodeInfo;
import org.jboss.ejb.server.ClusterTopologyListener.MappingInfo;
import org.jboss.ejb.server.ClusterTopologyListener.ClusterRemovalInfo;
import org.jboss.ejb.server.ClusterTopologyListener.MappingInfo;
import org.jboss.ejb.server.ClusterTopologyListener.NodeInfo;
import org.jboss.logging.Logger;
import org.jboss.remoting3.Channel;
import org.jboss.remoting3.Endpoint;
import org.jboss.remoting3.EndpointBuilder;
import org.jboss.remoting3.OpenListener;
Expand All @@ -47,6 +48,7 @@
import org.xnio.Xnio;
import org.xnio.channels.AcceptingChannel;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -56,13 +58,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
* @author <a href="mailto:cdewolf@redhat.com">Carlo de Wolf</a>
* @author <a href="mailto:rachmato@redhat.com">Richard Achmatowicz</a>
*/
public class DummyServer {
public class DummyServer implements AutoCloseable {

private static final Logger logger = Logger.getLogger(DummyServer.class);
/*
Expand All @@ -85,6 +89,8 @@ public class DummyServer {
private EJBDeploymentRepository deploymentRepository = new EJBDeploymentRepository();
private EJBClusterRegistry clusterRegistry = new EJBClusterRegistry();

final Set<Channel> currentConnections = Collections.newSetFromMap(new ConcurrentHashMap<>());

public DummyServer(final String host, final int port) {
this(host, port, "default-dummy-server-endpoint");
}
Expand Down Expand Up @@ -170,16 +176,29 @@ public void start() throws Exception {
// Register an EJB channel open listener
OpenListener channelOpenListener = remoteEJBService.getOpenListener();
try {
registration = endpoint.registerService("jboss.ejb", channelOpenListener, OptionMap.EMPTY);
registration = endpoint.registerService("jboss.ejb", new OpenListener() {
@Override
public void channelOpened(Channel channel) {
currentConnections.add(channel);
channelOpenListener.channelOpened(channel);
}

@Override
public void registrationTerminated() {

}
}, OptionMap.EMPTY);
} catch (ServiceRegistrationException e) {
throw new Exception(e);
}
}

public void stop() throws Exception {
this.server.close();
this.server = null;
IoUtils.safeClose(this.endpoint);
if (server != null) {
this.server.close();
this.server = null;
IoUtils.safeClose(this.endpoint);
}
}

// module deployment interface
Expand Down Expand Up @@ -208,6 +227,24 @@ public void removeClusterNodes(ClusterRemovalInfo clusterRemovalInfo) {
clusterRegistry.removeClusterNodes(clusterRemovalInfo);
}

@Override
public void close() throws Exception {
stop();
}

public void hardKill() throws IOException {
for (Channel i : currentConnections) {
try {
i.close();
} catch (IOException e) {
logger.error("failed to close", e);
}
}
server.close();
server = null;
endpoint.close();
}

public interface EJBDeploymentRepositoryListener {
void moduleAvailable(List<EJBModuleIdentifier> modules);
void moduleUnavailable(List<EJBModuleIdentifier> modules);
Expand Down
33 changes: 33 additions & 0 deletions src/test/resources/broken-server-jboss-ejb-client.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# JBoss, Home of Professional Open Source.
# Copyright 2019 Red Hat, Inc., and individual contributors
# as indicated by the @author tags.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

remote.connectionprovider.create.options.org.xnio.Options.SSL_ENABLED=false

remote.connections=one

# connection to a node at protocol://host:port
remote.connection.one.host=localhost
remote.connection.one.port=6999
remote.connection.one.connect.options.org.xnio.Options.SASL_POLICY_NOANONYMOUS=false
remote.connection.one.username=test
remote.connection.one.password=test
remote.connection.two.host=localhost
remote.connection.two.port=6998
remote.connection.two.connect.options.org.xnio.Options.SASL_POLICY_NOANONYMOUS=false
remote.connection.two.username=test
remote.connection.two.password=test

0 comments on commit b1db702

Please sign in to comment.