Shahzad Bhatti

May 6, 2009

Integrating Spring’s event notification with JMS

Filed under: Java — admin @ 3:56 pm
I recently had to deploy a service in a clustered fashion, and I wanted to keep a cache synchronized between the server instances. Since that service already made some use of Spring, I decided to leverage Spring’s built-in event notification, with some glue to convert those Spring events into JMS-based messages. First, I defined an event that I can use in the application. Since a Map of Strings to Strings seemed simple, I used it to store the message properties, e.g.

  1 import org.springframework.context.ApplicationEvent;
 
  2 import org.apache.commons.lang.builder.ToStringBuilder;
  3 import org.apache.commons.lang.builder.ToStringStyle;
  4 import java.util.Map;
 
  5 import java.util.HashMap;
  6 import java.util.Collection;  
  7                                                                   
  8 public class MessageEvent extends ApplicationEvent {       
 
  9     private String msg;
 10     private Map<String, String> map;
 11     public MessageEvent(final Object source, final String ... properties) {
 
 12         super(source);                                                            
 13         map = new HashMap<String, String>();
 14         for (int i=0; i<properties.length-1; i+=2) { 
 
 15             String name = properties[i];
 16             String value = properties[i+1];
 17             map.put(name, value);
 18         }
 19     }   
 20     public MessageEvent(final Object source, final Map<String, String> map) {
 
 21         super(source);
 22         this.map = map; 
 23     }       
 24     public String getProperty(final String key) {
 
 25         return map.get(key);
 26     }   
 27     public boolean isProperty(final String key) {
 
 28         String value = map.get(key);
 29         if (value == null) {
 30             return false;
 31         }
 
 32         return new Boolean(value).booleanValue();
 33     }   
 34     public Map<String, String> getProperties() {
 
 35         return map;
 36     }   
 37     @Override
 38     public String toString() {
 39         return new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE)
 
 40                 .append("source", getSource())
 41                 .append("map", this.map)
 42                 .toString();
 43     }
 
 44 }
 45 
 46 
 

I then created a class to convert the above message into a JMS message, e.g.

  1 import javax.jms.Session;
 
  2 import javax.jms.JMSException;
  3 import javax.jms.Message;
  4 import javax.jms.MapMessage;                                      
 
  5                                                            
  6 import org.springframework.jms.support.converter.MessageConversionException;
  7 import org.springframework.jms.support.converter.MessageConverter;
  8                                                                            
 
  9 import java.util.Map;
 10 import java.util.HashMap;
 11 import java.util.Enumeration;
 12                                         
 
 13 public class MapMessageConverter implements MessageConverter {
 14     public Object fromMessage(final Message message) throws JMSException, MessageConversionException {
 
 15         if (!(message instanceof MapMessage)) {
 16             throw new MessageConversionException("Message isn't a MapMessage");
 
 17         }
 18         MapMessage mapMessage = (MapMessage) message;
 19         Map<String, String> map = new HashMap<String, String>();
 
 20         Enumeration<String> en = mapMessage.getMapNames();
 21         while (en.hasMoreElements()) {
 22             String name = en.nextElement();
 23             String value = mapMessage.getString(name);
 
 24             map.put(name, value);
 25         }
 26         return map;
 27     }       
 28     public Message toMessage(final Object object, final Session session) throws JMSException, MessageConversionException {
 
 29         if (!(object instanceof Map)) {
 30             throw new MessageConversionException("Object isn't a Map");
 
 31         }
 32         Map<String, String> map = (Map<String, String>) object;
 33         MapMessage message = session.createMapMessage();
 34         for (Map.Entry<String, String> e : map.entrySet()) {
 
 35             message.setString(e.getKey(), e.getValue());
 36         }
 37         return message;
 38     }           
 39 }              
 
 40 
 

Next, I created a class that listens for Spring application events, converts them into JMS messages, and publishes them:

  1 import javax.jms.JMSException;
 
  2 import javax.jms.Session;
  3 import javax.jms.Message;
  4 import java.util.Map;
 
  5 import java.util.UUID;
  6 import org.apache.log4j.Logger;                            
  7 import org.springframework.jms.core.JmsTemplate;                            
 
  8 import org.springframework.jms.core.MessageCreator;               
  9 import org.springframework.jms.support.converter.MessageConverter;         
 10 import org.springframework.context.ApplicationEvent;
 
 11 import org.springframework.context.ApplicationListener;
 12     
 13 public class PublisherAdapter implements ApplicationListener {
 
 14     private static final Logger logger = Logger.getLogger(PublisherAdapter.class);
 15     private JmsTemplate jmsTemplate;
 16     private MessageConverter converter;
 
 17             
 18     //      
 19     public void setMessageConverter(final MessageConverter converter) {
 20         this.converter = converter;
 
 21     }
 22         
 23     public void setJmsTemplate(final JmsTemplate jmsTemplate) {
 24         this.jmsTemplate = jmsTemplate;
 
 25     }
 26         
 27     public void publish(final Map<String, String> map) {
 
 28         jmsTemplate.send(new MessageCreator() {
 29                 public Message createMessage(final Session session) throws JMSException {
 
 30                     return converter.toMessage(map, session);
 31                 }
 32             });
 33     }   
 34     
 35     public void onApplicationEvent(final ApplicationEvent event) {
 
 36        if (event.getSource() != converter && event instanceof MessageEvent) {
 37            publish(((MessageEvent)event).getProperties());
 38        }        
 39     }           
 
 40 }
 41 
 42 
 

Then, I created a JMS listener that listens for messages on a topic and converts them into Spring application events:

  1 import org.apache.log4j.Logger;
 
  2 import java.util.Map;
  3 import javax.jms.MessageListener;
  4 import javax.jms.Message;
 
  5 import javax.jms.MapMessage;
  6 import javax.jms.JMSException;
  7 import org.springframework.jms.support.converter.MessageConverter;
 
  8 import org.springframework.context.ApplicationContext;
  9 import org.springframework.context.ApplicationContextAware;
 10 import org.springframework.beans.BeansException;
 
 11 
 12 
 13 public class ListenerAdapter implements MessageListener, ApplicationContextAware {
 14     private static final Logger logger = Logger.getLogger(ListenerAdapter.class);
 
 15     private MessageConverter converter;
 16     private ApplicationContext applicationContext;
 17             
 18     public void setMessageConverter(final MessageConverter converter) {
 
 19         this.converter = converter;
 20     }   
 21     public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException {
 
 22         this.applicationContext = applicationContext;
 23             
 24     }       
 25             
 26     public void onMessage(final Message message) {
 
 27         Map<String, String> map = (Map<String, String>) converter.fromMessage(message);
 28         applicationContext.publishEvent(new MessageEvent(this, map));
 29     }   
 
 30 }       
 31 
 32 
 

Next, here is the Spring configuration to bootstrap these listeners:

  1 <?xml version="1.0" encoding="UTF-8"?>
 
  2 <beans>
  3    <bean id="mapMessageConverter" class="com.amazon.jasper.messaging.spring.MapMessageConverter"/>
 
  4   <bean id="springTopic" class="org.apache.activemq.command.ActiveMQTopic">
  5       <constructor-arg index="0" value="springTopic"/>
 
  6    </bean>
  7    <bean id="springJmsTemplate" class="org.springframework.jms.core.JmsTemplate" scope="prototype">
 
  8         <property name="connectionFactory" ref="jmsConnectionFactory"/>
  9         <property name="deliveryPersistent" value="true"/>
 
 10         <property name="messageConverter" ref="mapMessageConverter"/>
 11         <property name="defaultDestination" ref="springTopic"/>
 
 12    </bean>
 13    <bean id="publisherAdapter" class="com.amazon.jasper.messaging.spring.PublisherAdapter" scope="prototype">
 
 14         <property name="jmsTemplate" ref="springJmsTemplate"/>
 15         <property name="messageConverter" ref="mapMessageConverter"/>
 
 16    </bean>
 17    <bean id="springTopicListener" class="com.amazon.jasper.messaging.spring.ListenerAdapter" scope="prototype">
 
 18         <property name="messageConverter" ref="mapMessageConverter"/>
 19    </bean>
 
 20    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" init-method="start" destroy-method="stop" scope="prototype">
 
 21         <property name="connectionFactory" ref="jmsConnectionFactory"/>
 22         <property name="destination" ref="springTopic"/>
 
 23         <property name="messageListener" ref="springTopicListener"/>
 24         <property name="transactionManager" ref="jmsTransactionManager"/>
 
 25         <property name="concurrentConsumers" value="10"/>
 26    </bean>
 
 27 </beans>
 28 
 29 
 

Finally, here is how you will actually use this plumbing in your code:

  1 import org.springframework.context.ApplicationContext;
 
  2 import org.springframework.context.ApplicationContextAware;
  3 import org.springframework.context.ApplicationEvent;
  4 import org.springframework.context.ApplicationListener;
 
  5 import com.amazon.jasper.workflow.WorkflowContext;
  6 public class Myclass implements ApplicationListener, ApplicationContextAware {
        // Example client: publishes MessageEvents through the application
        // context and handles the MessageEvents it receives as a listener.
 
  7     private ApplicationContext ctx;
  8 
  9     //  ...
        // Publishes a MessageEvent built from alternating name/value arguments.
        // SYNC_ID, syncId and SYNC_XPDL are not defined in this excerpt.
 10             ctx.publishEvent(new MessageEvent(this, SYNC_ID, syncId, SYNC_XPDL, "true"));
 
 11 
        // Stores the application context so events can be published through it.
 12     public void setApplicationContext(ApplicationContext applicationContext) {
 13         this.ctx = applicationContext;
 14     }
 15 
 
        // Receives application events; only MessageEvent instances are handled.
 16     public void onApplicationEvent(ApplicationEvent event) {
 17             if (event instanceof MessageEvent) {
 18                 MessageEvent msgEvt = (MessageEvent)event;
 
 19                 // do cache coherence
 20             }
 21     }
 22 
 23 
 

All you need to do is add your class to your Spring configuration file, and it will automatically be registered as a listener for Spring events. All this is fairly simple, but I hope it helps you with similar uses in your code.

No Comments »

No comments yet.

RSS feed for comments on this post. TrackBack URL

Leave a comment

You must be logged in to post a comment.

Powered by WordPress