Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use plugin classloader when executing task defined in a plugin #80

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ <h1>
<li>[<a href='https://github.com/igniterealtime/openfire-hazelcast-plugin/issues/103'>Issue #103</a>] - Fix Cluster initialization race condition</li>
<li>[<a href='https://github.com/igniterealtime/openfire-hazelcast-plugin/issues/102'>Issue #102</a>] - Remove unused code in ClusterListener</li>
<li>[<a href='https://github.com/igniterealtime/openfire-hazelcast-plugin/issues/81'>Issue #81</a>] - ClusterExternalizableUtil should not ignore provided ClassLoader instances</li>
<li>[<a href='https://github.com/igniterealtime/openfire-hazelcast-plugin/issues/79'>Issue #79</a>] - Use plugin classloader when executing task defined in a plugin</li>
</ul>

<p><b>3.0.0</b> -- September 12, 2024</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,14 @@ public <T> Collection<T> doSynchronousClusterTask(final ClusterTask<T> task, fin
final Collection<T> result = new ArrayList<>();
if (!members.isEmpty()) {
// Asynchronously execute the task on the other cluster members
logger.debug("Executing MultiTask: " + task.getClass().getName());
final PluginClassLoader pluginClassLoader = checkForPluginClassLoader(task);
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
logger.debug("Executing MultiTask: " + task.getClass().getName());
checkForPluginClassLoader(task);
// Switch to the classloader that provides the plugin classes, if needed.
if (pluginClassLoader != null) {
Thread.currentThread().setContextClassLoader(pluginClassLoader);
}
final Map<Member, ? extends Future<T>> futures = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME.getValue()).submitToMembers(new CallableTask<>(task), members);
long nanosLeft = TimeUnit.SECONDS.toNanos(MAX_CLUSTER_EXECUTION_TIME.getValue().getSeconds() * members.size());
for (final Future<T> future : futures.values()) {
Expand All @@ -434,6 +439,11 @@ public <T> Collection<T> doSynchronousClusterTask(final ClusterTask<T> task, fin
logger.error("Failed to execute cluster task within " + StringUtils.getFullElapsedTime(MAX_CLUSTER_EXECUTION_TIME.getValue()), te);
} catch (final Exception e) {
logger.error("Failed to execute cluster task", e);
} finally {
if (pluginClassLoader != null) {
// Revert back to the original classloader.
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
} else {
logger.debug("No cluster members selected for cluster task " + task.getClass().getName());
Expand All @@ -457,15 +467,25 @@ public <T> T doSynchronousClusterTask(final ClusterTask<T> task, final byte[] no
if (member != null) {
// Asynchronously execute the task on the target member
logger.debug("Executing DistributedTask: " + task.getClass().getName());
checkForPluginClassLoader(task);
final PluginClassLoader pluginClassLoader = checkForPluginClassLoader(task);
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
// Switch to the classloader that provides the plugin classes, if needed.
if (pluginClassLoader != null) {
Thread.currentThread().setContextClassLoader(pluginClassLoader);
}
final Future<T> future = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME.getValue()).submitToMember(new CallableTask<>(task), member);
result = future.get(MAX_CLUSTER_EXECUTION_TIME.getValue().getSeconds(), TimeUnit.SECONDS);
logger.trace("DistributedTask result: {}", result);
} catch (final TimeoutException te) {
logger.error("Failed to execute cluster task within " + MAX_CLUSTER_EXECUTION_TIME + " seconds", te);
} catch (final Exception e) {
logger.error("Failed to execute cluster task", e);
} finally {
if (pluginClassLoader != null) {
// Revert back to the original classloader.
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
} else {
final String msg = MessageFormat.format("Requested node {0} not found in cluster", new String(nodeID, StandardCharsets.UTF_8));
Expand Down Expand Up @@ -547,11 +567,12 @@ public Lock getLock(final Object key, Cache cache) {
* limited by a time interval.
*
* @param o the instance for which to verify the class loader
* @return The PluginClassLoader that was used to load the instance, if it was loaded by a plugin
* @see <a href="https://github.com/igniterealtime/openfire-hazelcast-plugin/issues/74">Issue #74: Warn against usage of plugin-provided classes in Hazelcast</a>
*/
protected <T extends ClusterTask<?>> void checkForPluginClassLoader(final T o) {
if (o != null && o.getClass().getClassLoader() instanceof PluginClassLoader
&& !pluginClassLoaderWarnings.containsKey(o.getClass().getName()) )
protected <T extends ClusterTask<?>> PluginClassLoader checkForPluginClassLoader(final T o) {
PluginClassLoader result = null;
if (o != null && o.getClass().getClassLoader() instanceof PluginClassLoader)
{
// Try to determine what plugin loaded the offending class.
String pluginName = null;
Expand All @@ -561,17 +582,23 @@ protected <T extends ClusterTask<?>> void checkForPluginClassLoader(final T o) {
final PluginClassLoader pluginClassloader = XMPPServer.getInstance().getPluginManager().getPluginClassloader(plugin);
if (o.getClass().getClassLoader().equals(pluginClassloader)) {
pluginName = XMPPServer.getInstance().getPluginManager().getCanonicalName(plugin);
result = pluginClassloader;
break;
}
}
} catch (Exception e) {
logger.debug("An exception occurred while trying to determine the plugin class loader that loaded an instance of {}", o.getClass(), e);
}
logger.warn("An instance of {} that is executed as a cluster task. This will cause issues when reloading " +
"the plugin that provides this class. The plugin implementation should be modified.",
pluginName != null ? o.getClass() + " (provided by plugin " + pluginName + ")" : o.getClass());
pluginClassLoaderWarnings.put(o.getClass().getName(), Instant.now()); // Note that this Instant is unused.

// Only print this warning once in a while (a cache entry that is added will eventually be evicted).
if (!pluginClassLoaderWarnings.containsKey(o.getClass().getName())) {
logger.warn("An instance of {} that is executed as a cluster task. This will cause issues when reloading " +
"the plugin that provides this class. The plugin implementation should be modified.",
pluginName != null ? o.getClass() + " (provided by plugin " + pluginName + ")" : o.getClass());
pluginClassLoaderWarnings.put(o.getClass().getName(), Instant.now()); // Note that this Instant is unused.
}
}
return result;
}

private static class ClusterLock implements Lock {
Expand Down