Shahzad Bhatti Welcome to my ramblings and rants!

July 10, 2006

Integrating Ruby with Java using ActiveMQ and Stomp

Filed under: Computing — admin @ 8:14 pm

Ruby is a great dynamic language and is slowly gaining on Java and .NET,
but folks from large corporations often snub Ruby for its lack of
enterprise features. This criticism is somewhat valid, as Ruby and Rails have
to play nice with existing legacy applications. Here I show how Ruby and
Java can be integrated using the ActiveMQ messaging middleware. ActiveMQ is a
very popular open source message broker that is scalable, robust,
clusterable and performant. In an ideal world, we can use Ruby/Rails where
its strength lies, i.e., web development, and use Java on the server side,
where its strength lies.

Here is the recipe for integrating Java with Ruby:

Install Java SE 1.5 and Java EE 1.4

  • Download Java SE 1.5 or above from http://java.sun.com/javase/downloads/index.jsp (this would also work with Java SE 1.4, but 1.5 has very good JMX support that helps with debugging, as we will see later).
  • Set the JAVA_HOME environment variable to point to the installation directory for Java SE.
  • Download the Java EE 1.4 SDK from http://java.sun.com/javaee/downloads/index.jsp.
  • Set the J2EE_HOME environment variable to point to the installation directory for Java EE.

Install ActiveMQ 4.0.1

Install Ruby 1.8 and Stomp 1.0.1

Start ActiveMQ

  • If you are using an older version of ActiveMQ, you may need to edit conf/activemq.xml and add:
         <transportConnectors>
           <transportConnector uri="tcp://localhost:61616"/>
           <transportConnector uri="stomp://localhost:61613"/>
         </transportConnectors>

    However, version 4.0.1 comes with built-in support for Stomp, so you
    don’t need to change anything.

  • cd to ActiveMQ’s bin directory and run activemq or activemq.bat, depending on your operating system.
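
Before moving on, it can help to confirm that the broker is actually listening on both connectors. The following is a minimal sketch, not part of the original recipe: it assumes the ports from the configuration above (61616 and 61613) and uses a hypothetical helper, port_open?, built only on Ruby's standard socket library.

```ruby
require 'socket'

# Hypothetical helper: returns true if a TCP connection to host:port
# succeeds within `timeout` seconds, false otherwise.
def port_open?(host, port, timeout = 2)
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError, IOError
  false
end

# Ports assumed from the transportConnector entries shown above.
{ "OpenWire" => 61616, "Stomp" => 61613 }.each do |name, port|
  status = port_open?("localhost", port) ? "listening" : "not reachable"
  puts "#{name} connector on port #{port}: #{status}"
end
```

A successful connection only shows that something is listening on the port; it does not prove that the listener is ActiveMQ.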

Debugging ActiveMQ through JMX

  • Start jconsole from Java 1.5’s bin directory.
  • jconsole lists the ActiveMQ process; select it to connect.
  • We will get back to jconsole later.

Create a Ruby Client (ruby-stomp.rb) that sends a message and then receives one

#!/usr/bin/env ruby -w

require 'rubygems'
require 'stomp'

#class Publisher < ActiveMessaging::Processor
# publishes_to :ServerSideQ
#
#end
#pub = Publisher.new
#pub.publish :message => "Go Sox!"

# Connect to ActiveMQ's Stomp connector (no login or passcode).
client = Stomp::Client.open nil, nil, "localhost", 61613

# Print each reply that the Java server sends to the reply queue.
client.subscribe("/queue/clientSideReplyQ", {
  "persistent" => true,
  "client-id" => "rubyClient",
}) do |message|
  puts "Got Reply: #{message.body} on #{message.headers['destination']}"
  client.ack message
end

# Send five requests; the reply-to header tells the server where to respond.
for i in 1..5 do
  m = "Go Sox #{i}!"
  puts m
  client.send("/queue/ServerSideQ", m, {
    "persistent" => true,
    "priority" => 4,
    "reply-to" => "/queue/clientSideReplyQ",
  })
end
puts "Waiting for response on /queue/clientSideReplyQ"
gets  # keep the process alive until Enter is pressed so replies can arrive
client.close


Create Java Server that listens and then responds

On the server side, ActiveMQ delivers a Stomp message as a BytesMessage if
the “content-length” header is present; otherwise it delivers a TextMessage.
The Ruby stomp client sets this header, so the server receives a BytesMessage.
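
To make the difference concrete, here is a rough sketch of what a Stomp 1.0 SEND frame looks like on the wire with and without the content-length header. The helper build_send_frame is hypothetical and written only for illustration; it is not the stomp gem's code, and a real client sends more headers than this.

```ruby
# Hypothetical helper: builds a Stomp 1.0 SEND frame as a raw string.
# Frame layout: command line, header lines, blank line, body, NUL terminator.
def build_send_frame(destination, body, headers = {}, include_content_length = true)
  all_headers = { "destination" => destination }.merge(headers)
  # content-length is the body size in bytes, not characters.
  all_headers["content-length"] = body.bytesize.to_s if include_content_length
  header_lines = all_headers.map { |name, value| "#{name}:#{value}" }.join("\n")
  "SEND\n#{header_lines}\n\n#{body}\0"
end

# With content-length: per the text above, the Java side sees a BytesMessage.
puts build_send_frame("/queue/ServerSideQ", "Go Sox 1!", { "persistent" => "true" }).inspect

# Without content-length: per the text above, the Java side sees a TextMessage.
puts build_send_frame("/queue/ServerSideQ", "Go Sox 1!", { "persistent" => "true" }, false).inspect
```

The only difference between the two frames is the one header line, which is why the Java server below has to be prepared for a BytesMessage.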

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Server implements MessageListener, ExceptionListener {
    Session session;

    public Server() throws Exception {
        // I found it simple to do everything here, but production code will
        // need to be cleaned up and resources management needs to be better
        // handled.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);
        Connection connection = connectionFactory.createQueueConnection();
        connection.setClientID("clientID");
        connection.setExceptionListener(this);
        connection.start();
        // Transacted session: onMessage() commits after each message.
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("ServerSideQ");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(this);
    }

    // I am not closing all resources, which should change for production code.
    private void close() throws Exception {
        if (session != null) session.close();
        //if (connection != null) connection.close();
    }

    public static void main(String[] args) throws Exception {
        new Server();
        // Block forever; messages arrive on the JMS listener thread.
        Thread.currentThread().join();
    }

    public void onMessage(Message message) {
        try {
            if (message instanceof BytesMessage) {
                BytesMessage bytMsg = (BytesMessage) message;
                // Read the whole body in one call. readByte() signals the end
                // of the body with MessageEOFException, not with -1.
                byte[] data = new byte[(int) bytMsg.getBodyLength()];
                bytMsg.readBytes(data);
                System.out.println("Received: " + new String(data, "UTF-8"));
            } else {
                System.out.println("Unknown Message: " + message);
            }
            session.commit();
            reply(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    synchronized public void onException(JMSException ex) {
        System.out.println("JMS Exception occurred. " + ex);
    }

    public void reply(Message msg) throws Exception {
        // The Stomp reply-to header arrives as the JMSReplyTo destination.
        Queue queue = (Queue) msg.getJMSReplyTo();
        if (queue == null) {
            return; // the sender did not ask for a reply
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);
        QueueConnection replyConnection = connectionFactory.createQueueConnection();
        replyConnection.setExceptionListener(this);
        replyConnection.start();
        QueueSession replySession = replyConnection.createQueueSession(
                true, Session.AUTO_ACKNOWLEDGE);
        QueueSender sender = replySession.createSender(queue);
        BytesMessage responseMessage = replySession.createBytesMessage();
        responseMessage.writeUTF("Response to " + msg);
        responseMessage.setStringProperty("KEY", "key");
        sender.send(responseMessage);
        replySession.commit();
        sender.close();
        replyConnection.close();
    }
}

Compile Server.java

When compiling the Java file, make sure you include the following jar files in your CLASSPATH:

  • activemq-core-4.0.1.jar
  • j2ee.jar
  • incubator-activemq-4.0.1.jar

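For example (the commands here use the Windows path separator; on Unix-like systems, replace each ; in the classpath with :),

 javac -classpath lib/activemq-core-4.0.1.jar;lib/j2ee.jar -d classes/ Server.java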

Start Server

java -classpath lib/incubator-activemq-4.0.1.jar;lib/activemq-core-4.0.1.jar;lib/j2ee.jar;classes/ Server

Run Ruby client

ruby ruby-stomp.rb

Verify jConsole

You should see two queues in jConsole, i.e., “/queue/ServerSideQ” and
“/queue/clientSideReplyQ”. Check the enqueue and dequeue counts for each queue.

Voila

Note that we are using persistent queues and subscribers by setting the persistent and client-id headers. Refer to the ActiveMQ and Stomp documentation for more information on these headers.

Caveat Emptor: I found the subscribe functionality of the stomp gem a bit shaky; it often missed publications.
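
One way to notice missed replies, instead of waiting on gets, is to count what arrives and give up after a deadline. The sketch below is only an illustration under assumptions: collect_replies is a hypothetical helper, and the inbox is filled by a plain thread standing in for the subscribe block (in the real client, the block would push message.body onto the inbox).

```ruby
# Hypothetical helper: collects up to `expected` items from `inbox`,
# giving up after `timeout` seconds. Returns whatever arrived in time.
def collect_replies(inbox, expected, timeout)
  deadline = Time.now + timeout
  replies = []
  while replies.size < expected
    remaining = deadline - Time.now
    break if remaining <= 0
    begin
      replies << inbox.pop(true)      # non-blocking pop
    rescue ThreadError                # raised when the queue is empty
      sleep [0.01, remaining].min
    end
  end
  replies
end

# Simulated subscriber: a thread stands in for the stomp subscribe block.
inbox = Queue.new
producer = Thread.new { 3.times { |i| inbox << "Response #{i + 1}" } }

replies = collect_replies(inbox, 5, 0.5)
producer.join
puts "Got #{replies.size} of 5 expected replies"
```

If fewer replies arrive than were sent, the client can log the shortfall or resend, rather than silently waiting.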

Finally, Rails is planning to add support for ActiveMessaging in the future.
It is based on Stomp and will make this a bit easier. This is how it will
look in Rails:

 class MyProcessor < ActiveMessaging::Processor
   subscribes_to :clientSideReplyQ
   def on_message(message)
     puts "received: " + message
   end
 end

Download ruby-stomp.rb and Server.java.

import java.io.PrintStream;

Filed under: Computing — admin @ 4:18 pm

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Server implements MessageListener, ExceptionListener {
    Session session;

    public Server() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);
        Connection connection = connectionFactory.createQueueConnection();
        connection.setClientID("clientID");
        connection.setExceptionListener(this);
        connection.start();
        // Transacted session: onMessage() commits after each message.
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("ServerSideQ");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(this);
        //if (session != null) session.close();
        //if (connection != null) connection.close();
    }

    public static void main(String[] args) throws Exception {
        new Server();
        Thread.currentThread().join();
    }

    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("Received: " + msg);
            } else if (message instanceof BytesMessage) {
                BytesMessage bytMsg = (BytesMessage) message;
                // Read the whole body in one call. readByte() signals the end
                // of the body with MessageEOFException, not with -1.
                byte[] data = new byte[(int) bytMsg.getBodyLength()];
                bytMsg.readBytes(data);
                System.out.println("Received: " + new String(data, "UTF-8"));
            } else {
                System.out.println("Unknown Type " + message.getClass().getName() + " Got: " + message);
            }
            session.commit();
            reply(message);
            /*
            if (++count % dumpCount == 0) {
                dumpStats(connection);
            }
            */
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    synchronized public void onException(JMSException ex) {
        System.out.println("JMS Exception occurred. Shutting down client.");
    }

    public void reply(Message msg) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);
        QueueConnection qconnection = connectionFactory.createQueueConnection();
        qconnection.setExceptionListener(this);
        qconnection.start();
        QueueSession qsession = qconnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
        // Short queue name, used only in the log line at the end.
        String subject = msg.getJMSReplyTo().toString();
        subject = subject.substring(subject.lastIndexOf('/') + 1);
        //Queue queue = (Queue) session.createQueue(subject);
        Queue queue = (Queue) msg.getJMSReplyTo();
        QueueSender sender = qsession.createSender(queue);
        TextMessage responseMessage = qsession.createTextMessage("hello back");
        //BytesMessage responseMessage = qsession.createBytesMessage();
        //responseMessage.writeUTF("Response to " + msg);
        responseMessage.setStringProperty("KEY", "key");
        sender.send(responseMessage);
        qsession.commit();
        sender.close();
        qconnection.close();
        System.out.println("---Sent " + responseMessage.toString().substring(0, 20) + " to " + subject + "---" + queue);
    }
}
