Shahzad Bhatti Welcome to my ramblings and rants!

August 2, 2006

Message-Driven Pojos with ActiveMQ in Java

Filed under: Computing — admin @ 11:27 am

Message-Driven Pojos with ActiveMQ in Java
ActiveMQ is a popular open-source messaging middleware that obviates the need
for an application server just for the sake of messaging. It offers Message-Driven
POJOs, which are an alternative to Message-Driven Beans. You can still use the JMS APIs, and even transactions, through Jencks (a lightweight JCA container).

Here is what you need:

Download Java SE 1.5

Download Java SE 1.5 from http://java.sun.com/javase/downloads/index.jsp (this also works with Java SE 1.4, but 1.5 has very good JMX support that helps with debugging, as we will see later).

Set the JAVA_HOME environment variable to point to the installation directory of Java SE.

Download and install ActiveMQ 4.0.1

Set the ACTIVEMQ_HOME environment variable to point to the installation directory of ActiveMQ.

Copy ActiveMQ JCA (rar) file to JBoss’s deploy directory, e.g.

cp $ACTIVEMQ_HOME/lib/optional/optional/activemq-ra-4.0-RC2.rar $JBOSS_HOME/server/all/deploy

Start Active MQ Server

cd $ACTIVEMQ_HOME/bin
sh activemq

Download Jencks from jencks.org
Download Spring 2.0 from springframework.org
Download Spring-XBean 2.5 from springframework.org
Define Message Driven Pojo, in src/mdp/MessageReceiver.java e.g.

 

 1
 2  package mdp;
 3  import org.apache.log4j.Logger;
 4  import java.io.Serializable;
 5  import javax.jms.Message;
 6  import javax.jms.ObjectMessage;
 7  import javax.jms.JMSException;
 8  import javax.jms.ExceptionListener;
 9  import javax.jms.MessageListener;
10  import javax.jms.Session;
11  import javax.jms.Queue;
12  import javax.jms.QueueConnection;
13  import javax.jms.QueueConnectionFactory;
14  import javax.jms.QueueSender;
15  import javax.jms.QueueSession;
16  import org.springframework.jms.support.converter.SimpleMessageConverter;
17
18  import org.springframework.beans.BeansException;
19  import org.springframework.beans.factory.NoSuchBeanDefinitionException;
20  import org.springframework.context.ApplicationContext;
21  import org.springframework.context.ApplicationContextAware;
22  import org.springframework.context.support.ClassPathXmlApplicationContext;
23
24  public class MessageReceiver implements MessageListener, ApplicationContextAware
25   {
26      final Logger log = Logger.getLogger(getClass());
27
28      private ApplicationContext applicationContext;
29      private QueueConnectionFactory connectionFactory;
30      private ExceptionListener exceptionListener;
31
32      private SimpleMessageConverter converter = new SimpleMessageConverter();
33
34      public final void onMessage(Message jmsMessage){
35          try {
36              ObjectMessage objectMessage = (ObjectMessage) jmsMessage;
37              Serializable response = handleMessage(objectMessage.getObject());
38              reply((Queue) jmsMessage.getJMSReplyTo(), response);
39          } catch (JMSException jmsException){
40              log.error("Error handling " + jmsMessage, jmsException);
41              if (exceptionListener != null) exceptionListener.onException(jmsException);
42          }
43      }
44
45      public void setApplicationContext(ApplicationContext applicationContext) throws Beans
46  Exception {
47          this.applicationContext = applicationContext;
48          this.connectionFactory = (QueueConnectionFactory) applicationContext.getBean("jms
49  Factory");
50      }
51
52      private void reply(Queue queue, Serializable data)
53          throws JMSException {
54          if (queue == null) {
55              log.warn(" No reply queue specified for data " + data);
56              return;
57          }
58          QueueConnection connection = connectionFactory.createQueueConnection();
59          QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLE
60  DGE);
61
62          QueueSender sender = session.createSender(queue);
63
64          connection.start();
65          ObjectMessage m = null;
66          try {
67              m = session.createObjectMessage(data);
68              sender.send(m);
69              log.info(" replying -------- -> Message: " + data);
70          } finally {
71              connection.close();
72          }
73     }
74
75      public void setExceptionListener(ExceptionListener el) {
76          this.exceptionListener = el;
77      }
78
79      protected Serializable handleMessage(Serializable message) {
80          Integer n = (Integer) message;
81          return new Integer(n.intValue() * n.intValue());
82      }
83  }
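Define the message sender in src/client/MessageSenderImpl.java (it implements a MessageSender interface declaring sendReceive, which must also be defined in the client package), e.g.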

 1  package client;
 2  import org.apache.log4j.Logger;
 3  import javax.jms.Session;
 4  import java.io.Serializable;
 5 
 6  import javax.jms.JMSException;
 7  import javax.jms.QueueReceiver;
 8  import javax.jms.Session;
 9  import javax.jms.ObjectMessage;
10  import javax.jms.Queue;
11  import javax.jms.QueueConnection;
12  import javax.jms.QueueConnectionFactory;
13  import javax.jms.QueueSender;
14  import javax.jms.QueueSession;
15 
16  public class MessageSenderImpl implements MessageSender {
17      static final Logger log = Logger.getLogger(MessageSender.class);
18 
19      private long timeout;
20      private QueueConnectionFactory connectionFactory;
21 
22      public MessageSenderImpl(QueueConnectionFactory connectionFactory, long timeout) {
23          this.connectionFactory = connectionFactory;
24          this.timeout = timeout;
25      }
26 
27      public Serializable sendReceive(
28                  String queueName,
29                  Serializable data) throws JMSException {
30          QueueConnection connection = connectionFactory.createQueueConnection();
31          QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLE
32  DGE);
33          Queue queue = session.createQueue(queueName);
34          QueueSender sender = session.createSender(queue);
35 
36          Queue tempQ = session.createTemporaryQueue();
37          connection.start();
38          ObjectMessage m = null;
39          try {
40              m = session.createObjectMessage(data);
41              m.setJMSReplyTo(tempQ);
42              sender.send(m);
43              m = null;
44              //session.commit();
45              log.info(" sending -------- -> Message: " + data);
46              QueueReceiver receiver = session.createReceiver(tempQ);
47 
48              m = (ObjectMessage) receiver.receive(timeout);
49 
50          } finally {
51              connection.close();
52          }
53          if (m == null) {
54              throw new JMSException("Failed to receive response from " + queueName + " within " + timeout + " millis for request " + data);
55              //throw new TimeoutException("Failed to receive response from " + queueName + " within " + timeout + " millis for request " + data);
56          }
57          log.info(" received -------- -> Message: " + m.getObject());
58          return m.getObject();
59     }
60 
61      public void setTimeout(long timeout) {
62          this.timeout = timeout;
63      }
64  }

Define application-context.xml as follows:

 <?xml version="1.0" encoding="UTF-8"?>

 <!-- START SNIPPET: spring -->
 <!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
 <beans>

   <!--
   || ActiveMQ Broker
     <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
       <property name="config" value="classpath:tbs-activemq.xml" />
       <property name="start" value="true" />
     </bean>
   -->

     <!-- URL of the ActiveMQ broker, shared by the resource adapter and the plain connection factory -->
     <bean id="brokerURL" class="java.lang.String">
         <constructor-arg>
             <value>tcp://localhost:61616</value>
         </constructor-arg>
     </bean>

     <bean id="activeMQContainer" class="org.jencks.JCAContainer" singleton="true">
         <property name="bootstrapContext">
                 <bean class="org.jencks.factory.BootstrapContextFactoryBean">
                         <property name="threadPoolSize" value="25" />
                 </bean>
          </property>
         <!-- the JCA Resource Adapter -->
         <property name="resourceAdapter">
             <bean id="activeMQResourceAdapter"
                   class="org.apache.activemq.ra.ActiveMQResourceAdapter">
                 <property name="serverUrl" ref="brokerURL"/>
             </bean>
         </property>
     </bean>

     <!-- plain connection factory -->
     <bean id="jmsFactory"  class="org.apache.activemq.ActiveMQConnectionFactory">
         <property name="brokerURL" ref="brokerURL"/>
     </bean>

     <!-- request/reply client: connection factory plus reply timeout in milliseconds -->
     <bean id="MessageSender" class="client.MessageSenderImpl">
         <constructor-arg>
             <ref bean="jmsFactory"/>
         </constructor-arg>
         <constructor-arg>
             <value>30000</value>
         </constructor-arg>
     </bean>

   <!--
     || an inbound message connector using a stateless, thread safe MessageListener
     -->
   <!-- START SNIPPET: inbound -->
   <bean id="inboundConnectorA" class="org.jencks.JCAConnector">

     <property name="jcaContainer" ref="activeMQContainer" />

     <!-- subscription details -->
     <property name="activationSpec">
       <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
         <property name="destination" value="testQ"/>
         <property name="destinationType" value="javax.jms.Queue"/>
       </bean>
     </property>
     <property name="ref" value="MessageReceiver"/>
   </bean>
   <!-- END SNIPPET: inbound -->

   <!-- the message-driven POJO that the inbound connector delivers messages to -->
   <bean id="MessageReceiver" class="mdp.MessageReceiver">
   </bean>

 </beans>
 <!-- END SNIPPET: spring -->
Write the unit test in test/MessageSenderTest.java as follows:

 1  package com.orbitz.tbs.host.txn.messaging;
 2 
 3  import junit.framework.TestCase;
 4  import com.orbitz.tbs.host.txn.messaging.MessageSender;
 5  import org.springframework.context.support.ClassPathXmlApplicationContext;
 6 
 7  public class MessageSenderTest extends TestCase {
 8      protected static ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext(new String[]{"TBSMessagingOrbitzBeans.xml"});
 9      MessageSender sender;
10 
11    public MessageSenderTest(String name) {
12      super(name);
13    }
14 
15    protected void setUp() throws Exception {
16      super.setUp();
17      sender = (MessageSender) appContext.getBean("MessageSender", MessageSender.class);
18    }
19 
20    protected void tearDown() throws Exception {
21      sender = null;
22      super.tearDown();
23    }
24 
25    public void testSendReceive() throws Exception {
26      for (int i=0; i&lt;10; i++) {
27        Integer data = (Integer) sender.sendReceive("CancellationQ", new Integer(i));
28        assertEquals("Data didn't match", new Integer(i*i), data);
29      }
30    }
31 
32    public static void main(String[] args) {
33      junit.textui.TestRunner.run(MessageSenderTest.class);
34    }
35  }

Compile your code as follows:

javac -d build -classpath $ACTIVEMQ_HOME/lib/optional/spring-1.2.4.jar:\
$ACTIVEMQ_HOME/lib/optional/activemq-optional-4.0.jar:$ACTIVEMQ_HOME/lib/activeio-core-3.0.jar:\
$ACTIVEMQ_HOME/lib/activemq-core-4.0.jar:$ACTIVEMQ_HOME/lib/activemq-console-4.0.jar src/*/*java

Start activemq server:

cd $ACTIVEMQ_HOME/bin
./activemq

Run Test:

java -classpath build:$ACTIVEMQ_HOME/lib/optional/spring-1.2.4.jar:\
$ACTIVEMQ_HOME/lib/optional/activemq-optional-4.0.jar:$ACTIVEMQ_HOME/lib/activeio-core-3.0.jar:\
$ACTIVEMQ_HOME/lib/activemq-core-4.0.jar:$ACTIVEMQ_HOME/lib/activemq-console-4.0.jar client.MessageSender

Start jconsole (requires Java 1.5) to verify messages using JMX

.
Select Queue and then see # of messages

Voila

Finally, you can also run an embedded ActiveMQ broker, though it is commented out in the Spring configuration file.

No Comments

No comments yet.

RSS feed for comments on this post. TrackBack URL

Sorry, the comment form is closed at this time.

Powered by WordPress