Skip to content

Commit

Permalink
GG-20638 Added retries on peer class loading timeouts.
Browse files Browse the repository at this point in the history
  • Loading branch information
ibessonov committed Aug 8, 2019
1 parent fdd7fd4 commit aa827af
Show file tree
Hide file tree
Showing 8 changed files with 487 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicStampedReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.internal.processors.task.GridInternal;
Expand All @@ -41,6 +43,7 @@
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
Expand Down Expand Up @@ -497,6 +500,10 @@ else if (!a.equals(clsName)) {
}
}
}
catch (IgniteException e) {
if (!X.hasCause(e, TimeoutException.class))
throw e;
}
}

return cls;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.DeploymentMode;
Expand All @@ -36,6 +38,7 @@
import org.apache.ignite.internal.util.GridByteArrayList;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
Expand Down Expand Up @@ -444,6 +447,9 @@ private boolean isLocallyExcluded(String name) {
// Catch Throwable to secure against any errors resulted from
// corrupted class definitions or other user errors.
catch (Exception e) {
if (X.hasCause(e, TimeoutException.class))
throw e;

throw new ClassNotFoundException("Failed to load class due to unexpected error: " + name, e);
}

Expand Down Expand Up @@ -580,6 +586,8 @@ private GridByteArrayList sendClassRequest(String name, String path) throws Clas

IgniteCheckedException err = null;

TimeoutException te = null;

for (UUID nodeId : nodeListCp) {
if (nodeId.equals(ctx.discovery().localNode().id()))
// Skip local node as it is already used as parent class loader.
Expand All @@ -597,7 +605,14 @@ private GridByteArrayList sendClassRequest(String name, String path) throws Clas
}

try {
GridDeploymentResponse res = comm.sendResourceRequest(path, ldrId, node, endTime);
GridDeploymentResponse res = null;

try {
res = comm.sendResourceRequest(path, ldrId, node, endTime);
}
catch (TimeoutException e) {
te = e;
}

if (res == null) {
String msg = "Failed to send class-loading request to node (is node alive?) [node=" +
Expand Down Expand Up @@ -656,12 +671,28 @@ else if (log.isDebugEnabled())
}
}

if (te != null) {
err.addSuppressed(te);

throw new IgniteException(err);
}

throw new ClassNotFoundException("Failed to peer load class [class=" + name + ", nodeClsLdrs=" +
nodeLdrMapCp + ", parentClsLoader=" + getParent() + ']', err);
}

/** {@inheritDoc} */
@Nullable @Override public InputStream getResourceAsStream(String name) {
try {
return getResourceAsStreamEx(name);
}
catch (TimeoutException ignore) {
return null;
}
}

/** */
@Nullable public InputStream getResourceAsStreamEx(String name) throws TimeoutException {
assert !Thread.holdsLock(mux);

if (byteMap != null && name.endsWith(".class")) {
Expand Down Expand Up @@ -701,7 +732,7 @@ else if (log.isDebugEnabled())
* @param name Resource name.
* @return InputStream for resource or {@code null} if resource could not be found.
*/
@Nullable private InputStream sendResourceRequest(String name) {
@Nullable private InputStream sendResourceRequest(String name) throws TimeoutException {
assert !Thread.holdsLock(mux);

long endTime = computeEndTime(p2pTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
Expand Down Expand Up @@ -353,7 +354,7 @@ void sendUndeployRequest(String rsrcName, Collection<ClusterNode> rmtNodes) thro
* @throws IgniteCheckedException Thrown if there is no connection with remote node.
*/
GridDeploymentResponse sendResourceRequest(final String rsrcName, IgniteUuid clsLdrId,
final ClusterNode dstNode, long threshold) throws IgniteCheckedException {
final ClusterNode dstNode, long threshold) throws IgniteCheckedException, TimeoutException {
assert rsrcName != null;
assert dstNode != null;
assert clsLdrId != null;
Expand Down Expand Up @@ -470,13 +471,21 @@ GridDeploymentResponse sendResourceRequest(final String rsrcName, IgniteUuid cls

timeout = threshold - U.currentTimeMillis();
}

if (timeout <= 0)
throw new TimeoutException();
}
catch (InterruptedException e) {
// Interrupt again to get it in the users code.
Thread.currentThread().interrupt();

throw new IgniteCheckedException("Got interrupted while waiting for response from node: " +
dstNode.id(), e);
TimeoutException te = new TimeoutException(
"Got interrupted while waiting for response from node: " + dstNode.id()
);

te.initCause(e);

throw te;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.events.DeploymentEvent;
Expand Down Expand Up @@ -688,7 +689,7 @@ private boolean checkLoadRemoteClass(String clsName, GridDeploymentMetadata meta
return false;

// Temporary class loader.
ClassLoader temp = new GridDeploymentClassLoader(
GridDeploymentClassLoader temp = new GridDeploymentClassLoader(
IgniteUuid.fromUuid(ctx.localNodeId()),
meta.userVersion(),
meta.deploymentMode(),
Expand All @@ -711,7 +712,14 @@ private boolean checkLoadRemoteClass(String clsName, GridDeploymentMetadata meta
InputStream rsrcIn = null;

try {
rsrcIn = temp.getResourceAsStream(path);
boolean timeout = false;

try {
rsrcIn = temp.getResourceAsStreamEx(path);
}
catch (TimeoutException e) {
timeout = true;
}

boolean found = rsrcIn != null;

Expand All @@ -731,7 +739,7 @@ private boolean checkLoadRemoteClass(String clsName, GridDeploymentMetadata meta

return false;
}
else
else if (!timeout)
// Cache result if classloader is still alive.
ldrRsrcCache.put(clsName, found);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal;

import java.net.MalformedURLException;
import java.net.URL;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.testframework.GridTestExternalClassLoader;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/**
* Tests affinity and affinity mapper P2P loading.
*/
public class GridPeerDeploymentRetryModifiedTest extends GridCommonAbstractTest {
/** */
private static final String EXT_TASK_CLASS_NAME = "org.apache.ignite.tests.p2p.CacheDeploymentTestTask4";

/** URL of classes. */
private static final URL[] URLS;

/** Current deployment mode. Used in {@link #getConfiguration(String)}. */
private DeploymentMode depMode;

/**
* Initialize URLs.
*/
static {
try {
URLS = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};
}
catch (MalformedURLException e) {
throw new RuntimeException("Define property p2p.uri.cls", e);
}
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
cfg.setDeploymentMode(depMode);

cfg.setCacheConfiguration(new CacheConfiguration().setName(DEFAULT_CACHE_NAME));

return cfg;
}

/**
*
*/
public GridPeerDeploymentRetryModifiedTest() {
super(false);
}

/**
* Test {@link DeploymentMode#PRIVATE} mode.
*
* @throws Exception if error occur.
*/
@Test
public void testPrivateMode() throws Exception {
depMode = DeploymentMode.PRIVATE;

deploymentTest();
}

/**
* Test {@link DeploymentMode#ISOLATED} mode.
*
* @throws Exception if error occur.
*/
@Test
public void testIsolatedMode() throws Exception {
depMode = DeploymentMode.ISOLATED;

deploymentTest();
}

/**
* Test {@link DeploymentMode#CONTINUOUS} mode.
*
* @throws Exception if error occur.
*/
@Test
public void testContinuousMode() throws Exception {
depMode = DeploymentMode.CONTINUOUS;

deploymentTest();
}

/**
* Test {@link DeploymentMode#SHARED} mode.
*
* @throws Exception if error occur.
*/
@Test
public void testSharedMode() throws Exception {
depMode = DeploymentMode.SHARED;

deploymentTest();
}

/** @throws Exception If failed. */
private void deploymentTest() throws Exception {
Ignite g1 = startGrid(1);
IgniteEx g2 = startGrid(2);

try {
GridTestExternalClassLoader ldr = new GridTestExternalClassLoader(URLS);

ClusterNode node = g1.cluster().node(g2.cluster().localNode().id());

g1.compute(g1.cluster().forRemotes()).execute(
(ComputeTask<Object, T2>)ldr.loadClass(EXT_TASK_CLASS_NAME).newInstance(),
new T2<>(node, null)
);

TestRecordingCommunicationSpi rec1 =
(TestRecordingCommunicationSpi)g1.configuration().getCommunicationSpi();

rec1.blockMessages((n, message) -> message instanceof GridDeploymentResponse);

ComputeTask<Object, T2> task = (ComputeTask<Object, T2>)ldr.loadClass(EXT_TASK_CLASS_NAME).newInstance();

try {
g1.compute(g1.cluster().forRemotes()).withTimeout(2000).execute(task, new T2<>(node, "foo"));

fail("Exception should be thrown");
}
catch (IgniteException ignore) {
// Expected exception.
//ignore.printStackTrace();
}

rec1.stopBlock(false, null, true, true);

try {
g1.compute(g1.cluster().forRemotes()).execute(task, new T2<>(node, "bar"));
}
catch (Exception e) {
throw new RuntimeException(e);
}

assertFalse(g2.cache(DEFAULT_CACHE_NAME).containsKey("foo"));
assertTrue(g2.cache(DEFAULT_CACHE_NAME).containsKey("bar"));
}
finally {
stopAllGrids(true);
}
}
}
Loading

0 comments on commit aa827af

Please sign in to comment.