diff --git a/src/groovy/org/grails/plugin/platform/events/push/PushPerRequestBroadcastFilter.groovy b/src/groovy/org/grails/plugin/platform/events/push/PushPerRequestBroadcastFilter.groovy new file mode 100644 index 0000000..329c797 --- /dev/null +++ b/src/groovy/org/grails/plugin/platform/events/push/PushPerRequestBroadcastFilter.groovy @@ -0,0 +1,102 @@ +package org.grails.plugin.platform.events.push + + +import org.atmosphere.cpr.* +import org.atmosphere.cpr.BroadcastFilter.BroadcastAction +import org.codehaus.groovy.grails.support.PersistenceContextInterceptor; +import org.grails.plugin.platform.events.registry.EventsRegistry; +import org.grails.plugin.platform.events.*; +import grails.converters.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +import static org.grails.plugin.platform.events.push.SharedConstants.*; + +class PushPerRequestBroadcastFilter implements PerRequestBroadcastFilter { + + static private Logger log = LoggerFactory.getLogger(PushPerRequestBroadcastFilter.class); + private ApplicationContext applicationContext = null; + private Events grailsEvents; + private EventsRegistry eventsRegistry; + private PersistenceContextInterceptor persistenceInterceptor + + public PushPerRequestBroadcastFilter(def applicationContext, def grailsEvents, def eventsRegistry) { + this.applicationContext = applicationContext + this.grailsEvents = grailsEvents + this.eventsRegistry = eventsRegistry + try { + persistenceInterceptor = applicationContext.getBean("persistenceInterceptor", PersistenceContextInterceptor.class); + } catch (Exception c) { + log.error "Couldn't manage to load persistence interceptor bean", c + } + } + + public BroadcastAction filter(AtmosphereResource atmosphereResource, Object originalMessage, Object message) { + BroadcastSignal signal; + + Boolean pass = false; + + if (BroadcastSignal.class.isAssignableFrom(message.getClass())) { + try { + // Required or will see the following error: + // org.hibernate.LazyInitializationException: could not initialize proxy - no Session + persistenceInterceptor.init(); + persistenceInterceptor.setReadOnly() + + signal = (BroadcastSignal) message; + def eventMessage = signal.eventMessage + def dataMessage = eventMessage.getData() + + if (eventMessage.isGormSession()) { + // To call merge is not enough here as it doesn't seem to process associated collections + // This call is required to avoid seeing the following error: + // org.hibernate.HibernateException: illegally attempted to associate a proxy with two open Sessions + dataMessage = dataMessage.get(dataMessage.id) + } + + if (atmosphereResource.getRequest().getHeader(TOPICS_HEADER) != null) { + String[] topics = atmosphereResource.getRequest().getHeader(TOPICS_HEADER).split(","); + for (String topic : topics) { + if (topic.equals(signal.eventMessage.getEvent())) { + pass = true; + break; + } + } + } + + if (signal.broadcastClientFilter != null) { + pass = (Boolean) signal.broadcastClientFilter.call( + [signal.eventMessageType ? eventMessage : dataMessage, atmosphereResource.getRequest()] as Object[] + ); + } + + if (pass) { + String json = jsonify(eventMessage, dataMessage); + return new BroadcastAction(json); + } else { + return new BroadcastAction(BroadcastAction.ACTION.ABORT, null); + } + } catch (Exception e){ + log.error "Exception in Atmosphere Request Filter", e + } finally { + persistenceInterceptor.flush(); + persistenceInterceptor.destroy(); + } + } + return new BroadcastAction(message); + } + + public BroadcastAction filter(Object originalMessage, Object message) { + return new BroadcastAction(message); + } + + private String jsonify(EventMessage eventMessage, def dataMessage) { + Map jsonResponse = new HashMap(); + jsonResponse.put("topic", eventMessage.getEvent()); + jsonResponse.put("body", dataMessage); + String res = new JSON(jsonResponse).toString(); + return res.length() + DELIMITER + res; + } + +} diff --git a/src/java/org/grails/plugin/platform/events/push/EventsPushHandler.java b/src/java/org/grails/plugin/platform/events/push/EventsPushHandler.java index 0adf58c..bcf8ae6 100644 --- a/src/java/org/grails/plugin/platform/events/push/EventsPushHandler.java +++ b/src/java/org/grails/plugin/platform/events/push/EventsPushHandler.java @@ -17,7 +17,6 @@ */ package org.grails.plugin.platform.events.push; -import grails.converters.JSON; import java.io.IOException; import java.io.InputStreamReader; @@ -48,6 +47,8 @@ import org.springframework.context.ApplicationContext; import org.springframework.util.ReflectionUtils; +import static org.grails.plugin.platform.events.push.SharedConstants.*; + /** * @author Stephane Maldini * @version 1.0 @@ -60,19 +61,18 @@ public class EventsPushHandler extends HttpServlet { static private Logger log = LoggerFactory.getLogger(EventsPushHandler.class); - + + private ApplicationContext applicationContext = null; private Events grailsEvents; private EventsRegistry eventsRegistry; private BroadcasterFactory broadcasterFactory; public static final String ID_GRAILSEVENTS = "grailsEvents"; public static final String ID_GRAILSEVENTSREGISTRY = "grailsEventsRegistry"; - public static final String TOPICS_HEADER = "topics"; public static final String GLOBAL_TOPIC = "eventsbus"; public static final String PUSH_SCOPE = "browser"; public static final String CLIENT_BROADCAST_PARAM = "browser"; public static final String CLIENT_FILTER_PARAM = "browserFilter"; - public static final String DELIMITER = "<@>"; public HashMap broadcastersWhiteList = new HashMap(); @@ -83,7 +83,6 @@ public void init() throws ServletException { super.init(); broadcasterFactory = BroadcasterFactory.getDefault(); - ApplicationContext applicationContext = null; try { applicationContext = ((ApplicationContext) getServletContext().getAttribute(ApplicationAttributes.APPLICATION_CONTEXT)); @@ -103,46 +102,7 @@ public void init() throws ServletException { if (grailsEvents != null && eventsRegistry != null) { Broadcaster b = BroadcasterFactory.getDefault().lookup(GLOBAL_TOPIC, true); b.getBroadcasterConfig().setBroadcasterCache(new HeaderBroadcasterCache()); - b.getBroadcasterConfig().addFilter(new PerRequestBroadcastFilter() { - public BroadcastAction filter(AtmosphereResource atmosphereResource, Object originalMessage, Object message) { - BroadcastSignal signal; - - Boolean pass = false; - - if (BroadcastSignal.class.isAssignableFrom(message.getClass())) { - signal = (BroadcastSignal) message; - - if (atmosphereResource.getRequest().getHeader(TOPICS_HEADER) != null) { - String[] topics = atmosphereResource.getRequest().getHeader(TOPICS_HEADER).split(","); - for (String topic : topics) { - if (topic.equals(signal.eventMessage.getEvent())) { - pass = true; - break; - } - } - } - - if (signal.broadcastClientFilter != null) { - pass = (Boolean) signal.broadcastClientFilter.call( - new Object[]{signal.eventMessageType ? signal.eventMessage : signal.eventMessage.getData(), - atmosphereResource.getRequest()} - ); - } - - if (pass) { - return new BroadcastAction(jsonify(signal.eventMessage)); - } else { - return new BroadcastAction(BroadcastAction.ACTION.ABORT, null); - } - } - - return new BroadcastAction(message); - } - - public BroadcastAction filter(Object originalMessage, Object message) { - return new BroadcastAction(message); - } - }); + b.getBroadcasterConfig().addFilter(new PushPerRequestBroadcastFilter(applicationContext, grailsEvents, eventsRegistry)); broadcastersWhiteList.putAll(registerTopics(eventsRegistry, grailsEvents)); b.scheduleFixedBroadcast(2+DELIMITER+"{}", 10, TimeUnit.SECONDS); } @@ -177,14 +137,6 @@ static public Map registerTopics(EventsRegistry eventsR return doneTopics; } - private String jsonify(EventMessage message) { - Map jsonResponse = new HashMap(); - jsonResponse.put("topic", message.getEvent()); - jsonResponse.put("body", message.getData()); - String res = new JSON(jsonResponse).toString(); - return res.length() + DELIMITER + res; - } - @Override public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOException { Broadcaster defaultBroadcaster = broadcasterFactory.lookup(GLOBAL_TOPIC); diff --git a/src/java/org/grails/plugin/platform/events/push/SharedConstants.java b/src/java/org/grails/plugin/platform/events/push/SharedConstants.java new file mode 100644 index 0000000..04cf4f2 --- /dev/null +++ b/src/java/org/grails/plugin/platform/events/push/SharedConstants.java @@ -0,0 +1,6 @@ +package org.grails.plugin.platform.events.push; + +public class SharedConstants { + public static final String DELIMITER = "<@>"; + public static final String TOPICS_HEADER = "topics"; +}