Skip to content

Commit

Permalink
fixes igniterealtime#19: Prevent ClassCastExceptions after reloading/…
Browse files Browse the repository at this point in the history
…updating the plugin.

This commit replaces the usage of classes provided by the plugin itself during cache usage. ClassCastExceptions
can occur when a cache has a reference to a class that was loaded by a PluginClassLoader from a previous incarnation
of this plugin.

See igniterealtime#19 for more details.
  • Loading branch information
guusdk committed Jan 29, 2021
1 parent 394c167 commit 48b408c
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 84 deletions.
1 change: 1 addition & 0 deletions src/changelog.html
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ <h1>
<p><b>0.8.0</b> -- (tbd)</p>
<ul>
<li><a href="https://github.com/igniterealtime/openfire-pushnotification-plugin/issues/15">Issue #15</a>: Define minimum server requirement (4.3.0).</li>
<li><a href="https://github.com/igniterealtime/openfire-pushnotification-plugin/issues/19">Issue #19</a>: Prevent ClassCastExceptions after reloading/updating the plugin.</li>
</ul>

<p><b>0.7.0</b> -- April 26, 2020</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,36 @@
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.locks.Lock;

public class PushInterceptor implements PacketInterceptor, OfflineMessageListener
{
private static final Logger Log = LoggerFactory.getLogger( PushInterceptor.class );

/**
* An memory-only cache that keeps track of the last few notification that have been generated per user.
* An memory-only cache that keeps track of for which messages (by ID) push notifications have been generated for
* specific users (by username). Key: username. Value: Set of message identifiers.
*/
private static Cache<String, HashSet<SentNotification>> LAST_NOTIFICATIONS = CacheFactory.createCache( "pushnotification.last" );
// Note: it's MUCH MORE convenient to use a single Cache that uses a custom class to keep track of notifications per user. However
// that will cause to ClassCastExceptions when the plugin gets reloaded while the cache contains (or contained) instances of
// that class! See https://github.com/igniterealtime/openfire-pushnotification-plugin/issues/19
// To prevent this issue, cache entries should only contain classes loaded by Openfire's classloader (and explicitly not classes
// loaded by the classloader that's used by this plugin). In other words: do not use classes provided by the plugin itself.
private static final Cache<String, HashSet<String>> MESSAGES_BY_USER = CacheFactory.createCache( "pushnotification.users" );

/**
* An memory-only cache that keeps track of when a push notification was generated for a particular message.
* Key: message identifier. Value: Set of instant.
*/
// Note: it's MUCH MORE convenient to use a single Cache that uses a custom class to keep track of notifications per user. However
// that will cause to ClassCastExceptions when the plugin gets reloaded while the cache contains (or contained) instances of
// that class! See https://github.com/igniterealtime/openfire-pushnotification-plugin/issues/19
// To prevent this issue, cache entries should only contain classes loaded by Openfire's classloader (and explicitly not classes
// loaded by the classloader that's used by this plugin). In other words: do not use classes provided by the plugin itself.
private static final Cache<String, HashSet<Instant>> INSTANTS_BY_MESSAGE = CacheFactory.createCache( "pushnotification.messages" );

/**
* Invokes the interceptor on the specified packet. The interceptor can either modify
Expand Down Expand Up @@ -139,8 +153,9 @@ private void tryPushNotification( User user, Message message )
}

// Basic throttling.
synchronized ( user )
{
final Lock lock = CacheFactory.getLock(user.getUsername(), MESSAGES_BY_USER);
lock.lock();
try {
if ( wasPushAttemptedFor( user, message, Duration.ofMinutes(5)) ) {
Log.debug( "For user '{}', not re-attempting push for this message that already had a push attempt recently.", user.toString() );
return;
Expand All @@ -152,6 +167,8 @@ private void tryPushNotification( User user, Message message )
}

addAttemptFor( user, message );
} finally {
lock.unlock();
}

// Perform the pushes
Expand Down Expand Up @@ -252,96 +269,167 @@ public void messageStored( final Message message )
}
}

/**
* Checks if a push notification was (attempted to be) sent to a particular user, to notify them of a particular
* message.
*
* @param user The user that would have received the push notification
* @param message The message for which a push notification would have been sent
* @param duration The past amount of time in which to check for sent push notifications
* @return true when at least one push attempt for the user/message was recently sent.
*/
public boolean wasPushAttemptedFor( final User user, final Message message, final Duration duration )
{
final Set<SentNotification> sentNotifications = LAST_NOTIFICATIONS.get(user.getUsername());
if ( sentNotifications == null || sentNotifications.isEmpty() ) {
return false;
}

// Cleanup of older attempts.
sentNotifications.removeIf( n -> n.attempt.isBefore(Instant.now().minus(duration.multipliedBy(2))));

final boolean result = sentNotifications.stream().anyMatch( n ->
n != null
&& n.messageIdentifier.equals(getMessageIdentifier(message ) )
&& n.attempt.isAfter(Instant.now().minus(duration))
);
return result;
}

public long attemptsForLast( final User user, final Duration duration ) {
final Set<SentNotification> sentNotifications = LAST_NOTIFICATIONS.get(user.getUsername());
if ( sentNotifications == null || sentNotifications.isEmpty() ) {
return 0;
}
final String identifier = getMessageIdentifier(user, message);

final Lock lock = CacheFactory.getLock(user.getUsername(), MESSAGES_BY_USER);
lock.lock();
try {
/* This can be short-circuited, as the same identifier is used in the secondary cache.
final HashSet<String> messageIdentifiers = MESSAGES_BY_USER.get( user.getUsername() );
if ( messageIdentifiers == null || messageIdentifiers.isEmpty() || !messageIdentifiers.contains(identifier)) {
return false;
}
*/

return sentNotifications.stream().filter(
n -> n.attempt.isAfter(Instant.now().minus(duration))
).count();
}
// Look up the timestamps when a push was sent for this particular user/message combinations.
final HashSet<Instant> sentTimestamps = INSTANTS_BY_MESSAGE.get(identifier);
if ( sentTimestamps == null || sentTimestamps.isEmpty() ) {
return false;
}

public void addAttemptFor( final User user, final Message message )
{
HashSet<SentNotification> sentNotifications = LAST_NOTIFICATIONS.get(user.getUsername());
if ( sentNotifications == null ) {
sentNotifications = new HashSet<>();
return sentTimestamps.stream().anyMatch( n ->
n != null && n.isAfter(Instant.now().minus(duration))
);
} finally {
lock.unlock();
}
sentNotifications.add( new SentNotification( Instant.now(), getMessageIdentifier( message ) ));
LAST_NOTIFICATIONS.put(user.getUsername(), sentNotifications);
}

public static String getMessageIdentifier( final Message message )
{
return message.getID() != null ? message.getID() : "" + message.getFrom().hashCode() + message.getBody().hashCode();
}

public static class SentNotification implements Serializable
/**
* Returns the amount of push notifications (attempted to be) sent to a particular user.
*
* @param user The user that would have received the push notification
* @param duration The past amount of time in which to check for sent push notifications
* @return The amount of notifications sent to the user recently.
*/
public long attemptsForLast( final User user, final Duration duration )
{
private Instant attempt;
private String messageIdentifier;
final Lock lock = CacheFactory.getLock(user.getUsername(), MESSAGES_BY_USER);
lock.lock();
try {
final HashSet<String> messageIdentifiers = MESSAGES_BY_USER.get(user.getUsername());
if (messageIdentifiers == null) {
return 0;
}

public SentNotification() {} // For serialization.
// Count the amount of timestamps for each pushed attempt that were within the target duration.
long result = 0;
for (final String messageIdentifier : messageIdentifiers) {
final HashSet<Instant> instants = INSTANTS_BY_MESSAGE.get(messageIdentifier);
result += instants.stream().filter(instant -> instant.isAfter(Instant.now().minus(duration))).count();
}

public SentNotification( final Instant attempt, final String messageIdentifier ) {
this.attempt = attempt;
this.messageIdentifier = messageIdentifier;
return result;
} finally {
lock.unlock();
}
}

public Instant getAttempt()
{
return attempt;
}
/**
* Registers attempts to sent a push notification for a particular message to a user.
*
* @param user The user that would receive the push notification
* @param message The message for which a push notification has been sent
*/
public void addAttemptFor( final User user, final Message message )
{
final String identifier = getMessageIdentifier(user, message);

final Lock lock = CacheFactory.getLock(user.getUsername(), MESSAGES_BY_USER);
lock.lock();
try {
HashSet<String> messageIdentifiers = MESSAGES_BY_USER.get(user.getUsername());
if (messageIdentifiers == null) {
messageIdentifiers = new HashSet<>();
}
messageIdentifiers.add(identifier);
// Clustered caches require an explicit PUT for the added element to be registered.
MESSAGES_BY_USER.put(user.getUsername(), messageIdentifiers);

public void setAttempt( final Instant attempt )
{
this.attempt = attempt;
HashSet<Instant> instants = INSTANTS_BY_MESSAGE.get(identifier);
if (instants == null) {
instants = new HashSet<>();
}
instants.add(Instant.now());
// Clustered caches require an explicit PUT for the added element to be registered.
INSTANTS_BY_MESSAGE.put(identifier, instants);
} finally {
lock.unlock();
}
}

public String getMessageIdentifier()
{
return messageIdentifier;
}
/**
* Remove from internal caches all push attempts that were sent before a particular cutoff timestamp.
*
* @param cutoff The instant after which all attempts should be retained in the caches.
*/
public void purgeAllOlderThan(final Instant cutoff)
{
Log.debug("Purging cached entries older than {}", cutoff);
final Set<String> userNames = new HashSet<>(MESSAGES_BY_USER.keySet());

public void setMessageIdentifier( final String messageIdentifier )
// Iterate over all message identifiers for each user, to be able to apply the required user-specific lock.
for ( final String username : userNames )
{
this.messageIdentifier = messageIdentifier;
}
final Lock lock = CacheFactory.getLock(username, MESSAGES_BY_USER);
lock.lock();
try
{
// These are all the message identifiers for this user.
final HashSet<String> messageIdentifiers = new HashSet<>(MESSAGES_BY_USER.get(username));
final HashSet<String> removedMessageIds = new HashSet<>();
for (String messageIdentifier : messageIdentifiers) {
// For each message, check when a pushes were sent.
final HashSet<Instant> instants = INSTANTS_BY_MESSAGE.get(messageIdentifier);

// Remove all entries that can be purged.
if (instants.removeIf(i -> i.isBefore(cutoff))) // No need to do anything if nothing changed.
{
if (instants.isEmpty()) {
// When no attempts are left, remove the entry completely.
INSTANTS_BY_MESSAGE.remove(messageIdentifier);
// Also mark this identifier as being removable from the user set.
removedMessageIds.add(messageIdentifier);
} else {
// When attempts are left, re-add the updated push (an explicit PUT is required for clustered caches).
INSTANTS_BY_MESSAGE.put(messageIdentifier, instants);
}
}
}

@Override
public boolean equals( final Object o )
{
if ( this == o ) { return true; }
if ( o == null || getClass() != o.getClass() ) { return false; }
final SentNotification that = (SentNotification) o;
return Objects.equals(attempt, that.attempt) &&
Objects.equals(messageIdentifier, that.messageIdentifier);
// When the iteration above caused any messages to be removed, remove them from the user set too.
if (!removedMessageIds.isEmpty()) {
messageIdentifiers.removeAll(removedMessageIds);
if (messageIdentifiers.isEmpty()) {
// When there are no messages left, remove the entry completely.
MESSAGES_BY_USER.remove(username);
} else {
// When attempts are left, re-add the updated push (an explicit PUT is required for clustered caches).
MESSAGES_BY_USER.put(username, messageIdentifiers);
}
}
} finally {
lock.unlock();
}
}
}

@Override
public int hashCode()
{
return Objects.hash(attempt, messageIdentifier);
}
/**
* Generates a reasonably unique identifier for a message / user combination.
*/
public static String getMessageIdentifier( final User user, final Message message )
{
return user.getUsername() + "->" + (message.getID() != null ? message.getID() : "") + message.getFrom().hashCode() + message.getBody().hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.jivesoftware.openfire.OfflineMessageStrategy;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.container.Plugin;
import org.jivesoftware.openfire.container.PluginManager;
import org.jivesoftware.openfire.disco.UserFeaturesProvider;
Expand All @@ -30,10 +31,10 @@

import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;

/**
* An Openfire plugin that adds push notification support, as defined in XEP-0357.
Expand All @@ -49,6 +50,18 @@ public class PushNotificationPlugin implements Plugin, UserEventListener

private final PushInterceptor interceptor = new PushInterceptor();

private final Timer timer = new Timer();
private final TimerTask timerTask = new TimerTask() {
@Override
public void run() {
try {
interceptor.purgeAllOlderThan(Instant.now().minus(10, ChronoUnit.MINUTES));
} catch (Exception e) {
Log.warn( "An exception occurred while trying to purge old cache entries.", e);
}
}
};

/**
* Initializes the plugin.
*
Expand All @@ -72,6 +85,9 @@ public synchronized void initializePlugin( final PluginManager manager, final Fi
XMPPServer.getInstance().getIQDiscoInfoHandler().addServerFeature( Push0IQHandler.ELEMENT_NAMESPACE );
XMPPServer.getInstance().getIQDiscoInfoHandler().addUserFeaturesProvider( push0IQHandler );

if (ClusterManager.isSeniorClusterMember()) {
timer.schedule(timerTask, Duration.ofMinutes(2).toMillis(), Duration.ofMinutes(2).toMillis());
}
Log.debug( "Initialized." );
}

Expand All @@ -89,6 +105,9 @@ public synchronized void destroyPlugin()
{
Log.debug( "Destroying..." );

timerTask.cancel();
timer.cancel();

XMPPServer.getInstance().getIQDiscoInfoHandler().removeServerFeature( Push0IQHandler.ELEMENT_NAMESPACE );

final Iterator<IQHandler> iterator = registeredHandlers.iterator();
Expand Down

0 comments on commit 48b408c

Please sign in to comment.