Thursday, February 17, 2011

Push in Grails

There are a few push plugins for Grails (with horrible documentation), and we had to tweak a few things for our application.

Asynchronous notification, or push, follows the typical publish/subscribe design pattern. By default push broadcasts to everyone, so we had to come up with a way to push messages to specific clients and not waste resources on unnecessary updates.
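
To make the "specific clients" idea concrete, here is a minimal sketch of publish/subscribe with one topic per client. It is plain JavaScript with no Atmosphere involved; createBroker and the 'client/42' topic names are made up for illustration and are not part of any plugin.

```javascript
// Hypothetical in-memory broker: one topic per client ID.
function createBroker() {
    var topics = {}; // topic name -> array of subscriber callbacks

    return {
        // Register a callback for a single topic.
        subscribe: function (topic, callback) {
            if (!topics[topic]) {
                topics[topic] = [];
            }
            topics[topic].push(callback);
        },
        // Deliver a message only to subscribers of that topic;
        // returns how many subscribers were notified.
        publish: function (topic, message) {
            var subscribers = topics[topic] || [];
            subscribers.forEach(function (callback) {
                callback(message);
            });
            return subscribers.length;
        }
    };
}

// Usage: two subscribers on different topics, only 'client/42' gets the update.
var broker = createBroker();
var received = { 42: [], 7: [] };
broker.subscribe('client/42', function (msg) { received[42].push(msg); });
broker.subscribe('client/7', function (msg) { received[7].push(msg); });
broker.publish('client/42', { clientCount: 10 });
```

Publishing to a topic nobody subscribed to simply notifies no one, which is the behaviour we wanted: no work done for unrelated clients.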

ICEPush:
We started with ICEPush because one advantage it offered over Comet is that it did not keep a thread open for each request. However, we noticed that the plugin did not seem well designed to keep track of URL mappings and context paths. I tried to get some feedback on their forums, but it doesn't seem like the technology is under active development.
forum post

Atmosphere:
Atmosphere seems to have a bigger community following and offers Comet push and long-polling options. Setting it up in Grails is pretty simple:

grails install-plugin atmosphere


You can then create a service and expose it as an Atmosphere service:

grails create-service clientCountBroadcastService

class ClientCountBroadcastService {
 
    static transactional = false
 
    static atmosphere = [mapping: '/atmosphere/clientNotify']
 
    def onRequest = { event ->
    }
 
    def onStateChange = { event ->
    }
 
}


UI clients can register with this service and get a response on each state change. In our case we needed to push notifications per 'Client' entity, and obviously not push them to subscribers of unrelated clients.

At application startup, we create client-specific Spring beans and add them to the application context in BootStrap.groovy:

import org.codehaus.groovy.grails.web.servlet.GrailsApplicationAttributes
// Assumed to live in the same plugin package as GrailsHandler
import com.odelia.grails.plugins.atmosphere.StratosphereServlet

class BootStrap {

    def init = { servletContext ->
        // One [mapping, clientId] pair per client
        def clientCountBroadcasts = Client.getClientIds().collect { clientId -> ["/atmosphere/clientCountBroadcast/${clientId}", clientId] }
        if (clientCountBroadcasts) {
            def parentContext = servletContext.getAttribute(GrailsApplicationAttributes.APPLICATION_CONTEXT)
            def bb = new grails.spring.BeanBuilder(parentContext)

            def beanName = "clientCountBroadcast"
            // Define a service bean and a GrailsHandler bean for each client
            def beans = bb.beans {
                clientCountBroadcasts.each { clientCountBroadcast ->
                    "${beanName}${clientCountBroadcast[1]}Service"(com.icrossing.cmp.kwbt.ClientCountBroadcastService) {
                    }
                    "${beanName}${clientCountBroadcast[1]}ServiceGrailsHandler"(com.odelia.grails.plugins.atmosphere.GrailsHandler) {
                        targetService = ref("${beanName}${clientCountBroadcast[1]}Service")
                        // "delegate." makes this a bean property assignment; a bare
                        // "servletContext = servletContext" only reassigns the closure parameter
                        delegate.servletContext = servletContext
                    }
                }
            }
            beans.registerBeans(parentContext)
            // Register each new handler under its client-specific mapping
            def handlers = servletContext.getAttribute(StratosphereServlet.ATMOSPHERE_PLUGIN_SERVICE_HANDLERS)
            clientCountBroadcasts.each { clientCountBroadcast ->
                addMethod(parentContext.getBean("${beanName}${clientCountBroadcast[1]}Service"), servletContext)
                handlers << [mapping: "${clientCountBroadcast[0]}",
                             handler: parentContext.getBean("${beanName}${clientCountBroadcast[1]}ServiceGrailsHandler")]
            }
            servletContext.setAttribute(StratosphereServlet.ATMOSPHERE_PLUGIN_SERVICE_HANDLERS, handlers)
        }
    }

    def destroy = { }

    // Adds a getBroadcaster() method to the service. servletContext is passed in
    // because it is only in scope inside init.
    private addMethod(source, servletContext) {
        source.metaClass.getBroadcaster = {->
            def _broadcaster = [:]
            // Map each mapping path to its broadcaster
            servletContext[StratosphereServlet.ATMOSPHERE_PLUGIN_HANDLERS_CONFIG].each {
                _broadcaster."${it.key}" = it.value.broadcaster
            }
            _broadcaster
        }
    }
}
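
The naming scheme is the part that is easy to get wrong, so here is a small sketch of it on its own. It is plain JavaScript, not Grails code: describeClientBroadcasts is a hypothetical helper, and it only reproduces the mapping and bean names that the bootstrap code builds for each client ID.

```javascript
// Mirrors the naming used in BootStrap: one mapping and two bean names per client ID.
// describeClientBroadcasts is a made-up helper for illustration only.
function describeClientBroadcasts(clientIds) {
    var beanName = 'clientCountBroadcast';
    return clientIds.map(function (clientId) {
        return {
            clientId: clientId,
            mapping: '/atmosphere/clientCountBroadcast/' + clientId,
            serviceBean: beanName + clientId + 'Service',
            handlerBean: beanName + clientId + 'ServiceGrailsHandler'
        };
    });
}

// Usage with two example client IDs.
var descriptors = describeClientBroadcasts([12, 34]);
```

The mapping is the same string the UI subscribes to and the service later uses to look up its broadcaster, so all three places have to agree on it.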



Each client's Atmosphere bean now has a dynamic getBroadcaster() method that it can use to broadcast messages to its subscribers. Our client count broadcast service now has the following structure:

class ClientCountBroadcastService {

    static transactional = false

    def onRequest = { event ->
        event.suspend()
        println "onRequest"
    }

    def onStateChange = { event ->
        println "onStateChange"
        def response = event.resource.response
        response.writer.with {
            write event.message
            flush()
        }
        event.resume()
    }

    def broadcastCount(def clientId, def clientCount, def clientVolume) {
        println "broadcastCount"
        def broadcaster = getBroadcaster()
        def clientCountBroadcaster = broadcaster["/atmosphere/clientCountBroadcast/${clientId}"]
        clientCountBroadcaster.broadcast("{\"successful\":\"true\",\"clientCount\":\"$clientCount\",\"clientVolume\":\"$clientVolume\"}")
    }
    
}
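
One thing to watch with the hand-built JSON string in broadcastCount: a value containing a quote or backslash would produce invalid JSON. As a sketch of the alternative, here is the same message shape built with a JSON serializer. It is JavaScript rather than Groovy, buildCountMessage is a hypothetical name, and the values are kept as strings only because the service above sends them quoted.

```javascript
// Build the same shape of message the service broadcasts, but let
// JSON.stringify handle quoting and escaping.
// buildCountMessage is a made-up helper for illustration.
function buildCountMessage(clientCount, clientVolume) {
    return JSON.stringify({
        clientCount: String(clientCount),
        clientVolume: String(clientVolume)
    });
}

// Usage: a value with embedded quotes still round-trips cleanly.
var message = buildCountMessage(25, '1,200 "est."');
var parsed = JSON.parse(message);
```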

On the UI side we decided to go with long polling. A request comes in to the onRequest method, where it is suspended; on state change, the broadcast client count and volume are written to the response. Because we went with long polling, we have to resume the suspended request on broadcast (which essentially closes the connection, so the UI needs to register again).
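
The suspend, broadcast, resume, register-again cycle is easier to see in a toy model. The sketch below is an in-memory simulation in plain JavaScript, not Atmosphere code: createLongPollServer and longPoll are hypothetical names, and callbacks stand in for HTTP requests.

```javascript
// Hypothetical stand-in for the server: holds suspended requests until a broadcast.
function createLongPollServer() {
    var suspended = [];
    return {
        // Like onRequest: park the request until something happens.
        request: function (onResponse) {
            suspended.push(onResponse);
        },
        // Like onStateChange: answer every parked request, then forget it.
        broadcast: function (message) {
            var waiting = suspended;
            suspended = [];
            waiting.forEach(function (onResponse) {
                onResponse(message);
            });
        },
        // Number of requests currently parked.
        pendingCount: function () {
            return suspended.length;
        }
    };
}

// Client side: after each response, immediately issue a new request.
function longPoll(server, onMessage) {
    server.request(function (message) {
        onMessage(message);
        longPoll(server, onMessage); // register again, as the UI must after each resume
    });
}

// Usage: one client, two broadcasts.
var server = createLongPollServer();
var seen = [];
longPoll(server, function (message) { seen.push(message); });
server.broadcast('first');
server.broadcast('second');
```

Each broadcast answers the parked request exactly once, and the client is parked again right after, which is why a long-polling client never misses the next update as long as it registers again promptly.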


On load, the UI registers with the broadcaster for a specific client using long polling and updates the text in the callback.

...   

// using the jQuery Atmosphere plugin, included in the head

.....

    
    function callback (response) {
        $('#updateCount').text(response.clientCount);
        $('#updateVolume').text(response.clientVolume);
    }

    $(document).ready( function (){
        setTimeout(function () {
            $.atmosphere.subscribe(
                KWBT.app.root + '/atmosphere/clientCountBroadcast/${params.clientId}',
                callback,
                $.atmosphere.request = {
                    fallbackTransport : 'long-polling'
                }
            );
        }, 250);
    });


....
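
A note on the callback: depending on the plugin version, the object passed to it may carry the raw message text rather than already-parsed fields. The sketch below assumes a response object with a numeric status and the raw JSON string in responseBody; that shape is an assumption on my part, and extractCounts is a hypothetical helper, so check it against the version of the jQuery Atmosphere plugin you are using.

```javascript
// Pull the counts out of a response, tolerating anything that is not a
// successful JSON message. The { status, responseBody } shape is assumed.
function extractCounts(response) {
    if (!response || response.status !== 200 || !response.responseBody) {
        return null;
    }
    try {
        var data = JSON.parse(response.responseBody);
        return { clientCount: data.clientCount, clientVolume: data.clientVolume };
    } catch (e) {
        return null; // body was not valid JSON
    }
}

// Usage with a well-formed message and a malformed one.
var good = extractCounts({ status: 200, responseBody: '{"clientCount":"5","clientVolume":"900"}' });
var bad = extractCounts({ status: 200, responseBody: 'not json' });
```

Returning null for anything unexpected lets the callback skip the DOM update instead of writing "undefined" into the page.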