Shahzad Bhatti

July 1, 2008

Designing a Microblogging System for Scalability


Introduction

I have been a Twitter user for a while and have observed or heard about its downtime and scalability problems. Twitter’s scalability has become a topic of many discussions and blog posts, and it also offers a useful exercise in designing scalable systems. A common root cause identified in Twitter’s own blogs is that the architecture is modeled on a CMS, because it was written in Ruby on Rails and that is what Rails is good at. The solution that other people have pointed to is a messaging-based architecture. Ruby and Rails have also taken a lot of the blame, because Ruby is slow compared to other static and dynamic languages and Rails was not built for scalability. There is some truth to that, but I don’t think they are the sole bottlenecks. In fact, I am going to show a small prototype, written (partially) in Ruby and Rails, that integrates with messaging middleware and can be scaled easily. I have been using messaging middleware such as the CORBA Event Service, IBM MQSeries, WebSphere, WebLogic, Tibco and ActiveMQ for over ten years and have long been a proponent of messaging-based architectures for scalable systems [1] [2]. So I spent a few hours putting together a prototype based on messaging middleware and Ruby on Rails to see how such a system could be developed.

Design Principles

Before describing my design, I am going to review some of the design principles that I have used for building large scale systems ([1], [2]), which are:

  • Coupling/Cohesion – loosely coupled and shared nothing architecture and partitioning based on functionality.
  • Messaging or SEDA architecture to implement reliable and scalable services and avoid temporal dependencies.
  • Resource management – good old practices of avoiding leaks of memory, database connections, etc.
  • Data replication especially read-only data.
  • Partition data (Sharding) – using multiple databases to partition the data.
  • Avoid single point of failure.
  • Bring processing closer to the data.
  • Design for service failures and crashes.
  • Dynamic Resources – Design service frameworks so that resources can be removed or added automatically and clients can automatically discover them. Use virtualization and horizontal scalability whenever possible.
  • Smart Caching – Cache expensive operations and contents as much as possible.
  • Avoid distributed transactions; use optimistic, compensating transactions instead (see the CAP theorem).

Architecture & Design

The following is the high-level architecture of the microblogging system:

First, I selected a REST architecture as the entry point to the system for both the Web UI and 3rd party applications, and used messaging middleware to implement the services. This gives us ease of access through REST APIs and scalability through messaging. In my implementation I chose JRuby/Rails for most of the code, Derby for the database and ActiveMQ for messaging. In addition to scalability, messaging middleware gives you many of the advantages of functional languages like Erlang, such as immutability, message passing and fault tolerance (via persistent queues). You can even build support for versioning and hot code swapping by adding a version number to each message and creating routers (see integration patterns) that direct messages to different handlers.
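The versioning idea can be sketched in a few lines. This is only an illustration, not code from the prototype: the `VersionRouter` class, the `:version` and `:body` message keys and the two handlers are all hypothetical names.

```ruby
# Hypothetical sketch: route each message to a handler chosen by its version,
# so that old and new handler code can run side by side.
class VersionRouter
  def initialize
    @handlers = {}
  end

  # Register the handler block for one message version.
  def register(version, &handler)
    @handlers[version] = handler
  end

  # Dispatch to the handler registered for the message's version.
  def route(message)
    handler = @handlers.fetch(message[:version]) do
      raise ArgumentError, "no handler for version #{message[:version].inspect}"
    end
    handler.call(message)
  end
end

router = VersionRouter.new
router.register(1) { |m| "v1 status: #{m[:body]}" }
router.register(2) { |m| "v2 status: #{m[:body].strip}" }

puts router.route(:version => 1, :body => "hello")
puts router.route(:version => 2, :body => "  hello  ")
```

Deploying a new handler then amounts to registering it under a new version number while the old one keeps serving messages already in flight.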

APIs

The following REST APIs will be exposed to 3rd party apps, the Web UI and other kinds of UI:

Create User

POST /users
where the user information is passed in the body in the form of parameters.

Login/Authenticate User

POST /users/userid/sessions
This will authenticate the user and create a session. Note that most of the following APIs expect the client to send back the session-id as an HTTP header; the session is stored in the (sharded) database and is used to retrieve user information.
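As a rough sketch of how a client might build these requests with Ruby’s standard Net::HTTP library: the `password` form parameter and the helper names are assumptions of mine, and only the paths and the `session-id` header come from the API description.

```ruby
require 'net/http'

# Build the login request; the 'password' parameter name is an assumption.
def login_request(user_id, password)
  request = Net::HTTP::Post.new("/users/#{user_id}/sessions")
  request.set_form_data('password' => password)
  request
end

# Build an authenticated request by passing the session-id as an HTTP header.
def user_info_request(user_id, session_id)
  request = Net::HTTP::Get.new("/users/#{user_id}")
  request['session-id'] = session_id
  request
end

login = login_request("alice", "secret")
puts "#{login.method} #{login.path}"

# Sending is left out so the sketch runs without a server, e.g.:
#   Net::HTTP.start(host, port) { |http| http.request(login) }
```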

Logout

HTTP-HEADER
session-id

DELETE /users/userid/sessions

Get User information

HTTP-HEADER
session-id

GET /users/userid
This API will return detailed user information

Anonymous User information

GET /users/userid
This API will return public user information

List of Followings

HTTP-HEADER
session-id

GET /followings/userid
This API will return a summary of the people the user is following.

Create Followers

HTTP-HEADER
session-id

POST /followers/followerid
This API will create a one-way follower relationship between the user and the follower.

Enable notification for Followers

HTTP-HEADER
session-id

POST /followers/followerid/notifications

Disable notification for Followers

HTTP-HEADER
session-id

DELETE /followers/followerid/notifications

Block Followers

HTTP-HEADER
session-id

POST /followers/followerid/blocking

Unblock Followers

HTTP-HEADER
session-id

DELETE /followers/followerid/blocking

Follower Exist

HTTP-HEADER
session-id

GET /followers/followerid
This API will return a 200 HTTP code if the follower exists.

List of Followers

HTTP-HEADER
session-id

GET /followers
This API will return a summary of the people who are following the user.

Archive Messages

HTTP-HEADER
session-id

GET /messages?offset=xxx&limit=yyy&since=date
This API will return archived messages for the user, where offset and limit will be optional.

Delete Messages

HTTP-HEADER
session-id

DELETE /messages/message-id
This API will delete the message with the given message-id.

Send Direct Messages

HTTP-HEADER
session-id

POST /directmessages/targetuserid
This API will send a direct message to the given user.

Send Reply

HTTP-HEADER
session-id

POST /reply/message-id
This API will send a reply to the message with the given message-id; the contents of the reply are passed in the body (as parameters).

Direct Messages Received

HTTP-HEADER
session-id

GET /directmessages/userid?offset=xxx&limit=yyy&since=date
This API will return messages received by the user.

Replies Received

HTTP-HEADER
session-id

GET /replies?offset=xxx&limit=yyy&since=date
This API will return replies received by the user.

Update Status

HTTP-HEADER
session-id

POST /statuses
This API will update status of the user and pass the contents of the message in the body (as parameters).

Get Statuses

HTTP-HEADER
session-id

GET /statuses?offset=xxx&limit=yyy&since=date
This API will return the statuses of the user, where offset and limit are optional.

User Timeline

HTTP-HEADER
session-id

GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return the timeline of the user. It compares the given username with the authenticated username and returns the detailed timeline if they match; otherwise it returns the public timeline.

Public Timeline

GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return public timeline of the user.

Friends Timeline

HTTP-HEADER
session-id

GET /friendstimeline?offset=xxx&limit=yyy&since=date
This API will return timeline of the friends of the user.

Request Flow

Here is an illustration of how information flows through the different components:

Though I am not showing the request flow for every API, the others follow a similar pattern.

Detailed Design

Domain Classes

Primary domain classes are:

  • User
  • Message, which has four subclasses (DirectMessage, ReplyMessage, Tweet and Status) for the various kinds of messages in the system.
  • Follower – creates a one-way relationship between two users, where the follower can choose to be notified when the user changes his/her status.

I identify each message with a GUID. The prototype uses a simple scheme to generate GUIDs for request-ids and message-ids, but for a real project I would recommend a better library such as UUIDTools.
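For illustration, here is a minimal sketch of generating such ids with Ruby’s standard `SecureRandom` library rather than UUIDTools. The `Guid` module and the id prefixes are hypothetical; the resulting ids are 47 characters long, which fits the 64-character `message_id` column.

```ruby
require 'securerandom'

# Hypothetical helper: random (version 4) UUIDs are unique across processes
# and shards without any coordination.
module Guid
  def self.next_message_id
    "message_id-#{SecureRandom.uuid}"
  end

  def self.next_request_id
    "request_id-#{SecureRandom.uuid}"
  end
end

puts Guid.next_message_id
puts Guid.next_request_id
```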

Schema

Followers

class CreateFollowers < ActiveRecord::Migration
  def self.up
    create_table :followers do |t|
      t.column :username,            :string, :limit => 16
      t.column :follower_username,   :string, :limit => 16
      t.column :relation_type,       :string, :default => 'Follower', :limit => 32
      t.column :blocked,             :boolean, :default => false
      t.column :notifications,       :boolean, :default => false
      t.column :created_at,          :datetime
      t.column :updated_at,          :datetime
      t.column :deleted_at,          :datetime
    end
    add_index :followers, :username
    add_index :followers, :follower_username
  end

  def self.down
    drop_table :followers
  end
end

Messages

class CreateMessages < ActiveRecord::Migration
  def self.up
    create_table :messages do |t|
      t.column :message_id,          :string, :limit => 64
      t.column :type,                :string, :limit => 32
      t.column :message_type,        :string, :default => 'Say', :limit => 32
      t.column :reply_message_id,    :string, :limit => 64
      t.column :username,            :string, :limit => 16
      t.column :channel_name,        :string, :limit => 32
      t.column :message_body,        :string, :limit => 140
      t.column :favorite,            :boolean, :default => false
      # NOTE: this default is evaluated once, when the migration runs, so it is
      # a fixed timestamp; callers should set sent_at explicitly.
      t.column :sent_at,             :datetime, :default => Time.now.utc
      t.column :created_at,          :datetime
      t.column :deleted_at,          :datetime
    end
    add_index :messages, :message_id
    add_index :messages, :username
  end

  def self.down
    drop_table :messages
  end
end

Users

class CreateUsers < ActiveRecord::Migration
  def self.up
    create_table :users do |t|
      t.column :username,            :string, :limit => 16
      t.column :password,            :string, :limit => 16
      t.column :name,                :string, :limit => 64
      t.column :email,               :string, :limit => 64
      t.column :time_zone_id,        :string, :limit => 32
      t.column :created_at,          :datetime
      t.column :updated_at,          :datetime
      t.column :deleted_at,          :datetime
    end
    add_index :users, :username
  end

  def self.down
    drop_table :users
  end
end

Persistence

I used Rails’ ActiveRecord library to provide persistence, though alternatively I could have used ActiveHibernate. These libraries provide a quick way to add persistence with minimal configuration and boilerplate. This prototype uses multiple levels of partitioning: first at the service level, second at the persistence level. I am using multiple Derby databases to store objects, with a simple hashing scheme for load distribution. This prototype also shows how to connect to multiple databases in Rails, which was difficult in the early implementation of Twitter.
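A simple hashing scheme of this kind might look like the following sketch. It is not the prototype’s own code: the shard names are hypothetical, and CRC32 is my choice here simply because it is stable across processes (Ruby’s built-in `String#hash` is not).

```ruby
require 'zlib'

# Hypothetical shard (database) names.
SHARDS = %w[microblog_0 microblog_1 microblog_2 microblog_3].freeze

# Map a username to one shard with a hash that is stable across processes.
def shard_for(username, shards = SHARDS)
  shards[Zlib.crc32(username.downcase) % shards.size]
end

%w[alice bob carol].each do |name|
  puts "#{name} -> #{shard_for(name)}"
end
```

Because the shard index depends on the number of shards, adding or removing a database remaps most usernames, which is the data-migration cost of this simple scheme.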

Domain Services

The core model and services use domain-driven design and apply the principle of a fat model and thin services (as opposed to fat services and an anemic model). The domain services implement the external REST APIs and use the underlying ActiveRecord classes for most of the functionality.

Messaging Middleware

The REST-based web services don’t invoke the domain services directly; instead they go through messaging middleware. In a real application, I might use ESB/integration patterns such as intelligent routing to partition the system across multiple machines and send each request to a suitable queue. In this prototype, I am simply using ActiveMQ, which is fairly robust and easy to use. I am also using separate queues for different kinds of operations. Another lesson I have learned in building large systems is to separate reads from writes, so that you can scale them independently and offer a different quality of service to each, e.g. read queues can be non-persistent while write queues are persistent.
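The read/write split can be expressed as a small lookup. In this sketch the queue names, the operation list and the `QueuePolicy` struct are hypothetical; only the idea of persistent write queues and non-persistent read queues comes from the text.

```ruby
# Hypothetical sketch of choosing a queue and delivery mode per operation.
QueuePolicy = Struct.new(:queue_name, :persistent)

WRITE_OPERATIONS = [:create_user, :create_follower, :update_status,
                    :send_direct_message].freeze

def queue_policy_for(operation)
  if WRITE_OPERATIONS.include?(operation)
    QueuePolicy.new("microblog.writes", true)   # must survive a broker restart
  else
    QueuePolicy.new("microblog.reads", false)   # the caller can simply retry
  end
end

policy = queue_policy_for(:update_status)
puts "#{policy.queue_name} persistent=#{policy.persistent}"
```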

Business Delegate

The REST-based web services don’t interact with the messaging middleware directly; instead they use business delegates that hide all the details of sending out a message, creating temporary queues and receiving replies. The interface of a business delegate is the same as that of the service it fronts.
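To make the pattern concrete, here is a minimal in-process sketch that uses Ruby’s thread-safe `Queue` in place of ActiveMQ. The `InMemoryBroker`, `EchoService` and `UsersDelegate` classes are hypothetical stand-ins; the real prototype would use JMS destinations and temporary queues instead.

```ruby
require 'thread'

# Stand-in for the broker: one request queue served by a worker thread.
class InMemoryBroker
  def initialize(service)
    @requests = Queue.new
    @worker = Thread.new do
      loop do
        request = @requests.pop
        break if request == :shutdown
        reply = begin
          service.public_send(request[:operation], *request[:args])
        rescue StandardError => e
          e   # send failures back to the caller instead of killing the worker
        end
        request[:reply_to].push(reply)
      end
    end
  end

  def send_request(request)
    @requests.push(request)
  end

  def shutdown
    @requests.push(:shutdown)
    @worker.join
  end
end

# Trivial service with the same interface the delegate exposes.
class EchoService
  def get_user(username)
    { :username => username }
  end
end

# The delegate hides the messaging details behind the service interface.
class UsersDelegate
  def initialize(broker)
    @broker = broker
  end

  def get_user(username)
    reply_queue = Queue.new   # plays the role of a temporary reply queue
    @broker.send_request(:operation => :get_user, :args => [username],
                         :reply_to => reply_queue)
    reply = reply_queue.pop   # block until the service answers
    raise reply if reply.is_a?(Exception)
    reply
  end
end

demo_broker = InMemoryBroker.new(EchoService.new)
puts UsersDelegate.new(demo_broker).get_user("alice").inspect
demo_broker.shutdown
```

The web tier only ever calls `get_user` on the delegate, so the transport can be swapped without touching the REST controllers.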

Benchmark Results

Though performance was not the objective of my prototype, I tried to check how many requests I could process on my development machine. I chose to benchmark only the messaging middleware and not the REST server, because the JVM uses native threads and web containers such as Tomcat use a small thread pool to serve requests. Since our architecture is heavily IO-bound, that would not scale. Alternatively, I could have built reactive or event-based APIs for HTTP, or used Yaws/MochiWeb as a container for the REST-based web services, because creating a process in Erlang is pretty cheap. For example, a newly spawned Erlang process takes roughly 300 words of memory, whereas a Java thread takes 2M by default (though it can be reduced to 256K on most machines). Here are the results of running a simple server with embedded ActiveMQ and the load test, both running on JRuby on my Pentium Linux machine. I used the default VM size for both JRuby processes and didn’t tune any options:

| What | Elapsed Time (secs) | Throughput (per sec) | Invocations |
|---|---|---|---|
| load_test_create_users | 99.041 | 10.10 | 1000 |
| load_test_bad_authenticate | 79.407 | 12.59 | 1000 |
| load_test_good_authenticate | 91.802 | 10.89 | 1000 |
| load_test_get_users | 91.362 | 10.95 | 1000 |
| load_test_create_followers | 143.560 | 6.96 | 999 |
| load_test_follower_exists | 95.140 | 10.50 | 999 |
| load_test_get_followers | 99.881 | 10.00 | 999 |
| load_test_get_followings | 104.737 | 9.54 | 999 |
| load_test_block_follower | 156.106 | 6.40 | 999 |
| load_test_unblock_follower | 161.950 | 6.17 | 999 |
| load_test_follower_enable_notifications | 157.637 | 6.34 | 999 |
| load_test_follower_disable_notifications | 161.714 | 6.18 | 999 |
| load_test_send_direct_message | 1673.494 | 5.97 | 9990 |
| load_test_messages_sent_receive | 157.832 | 6.32 | 998 |
| load_test_send_direct_message | 1725.717 | 5.79 | 9990 |
| load_test_send_replies | 21.546 | 5.11 | 110 |
| load_test_get_user_status | 86.952 | 11.50 | 1000 |
| load_test_update_status | 213.298 | 4.69 | 1000 |
| load_test_get_user_status | 88.987 | 11.24 | 1000 |
| load_test_archive_messages | 80.982 | 12.35 | 1000 |
| load_test_public_timeline | 98.143 | 10.19 | 1000 |
| load_test_user_timeline | 92.871 | 10.77 | 1000 |
| load_test_friends_timeline | 140.155 | 7.13 | 1000 |
| load_test_user_and_friends_timeline | 193.257 | 5.17 | 1000 |
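The throughput column is simply the number of invocations divided by the elapsed time. A sketch of such a measurement is shown below; the helper names are hypothetical and this is not the load test that produced the numbers above.

```ruby
require 'benchmark'

# Throughput is invocations divided by elapsed wall-clock seconds.
def throughput(invocations, elapsed_secs)
  invocations / elapsed_secs.to_f
end

# Time a block over a number of invocations and report both figures.
def measure(invocations)
  elapsed = Benchmark.realtime { invocations.times { |i| yield i } }
  { :elapsed_secs => elapsed, :throughput => throughput(invocations, elapsed) }
end

# Reproduces the first row of the results: 1000 invocations in 99.041 seconds.
puts format("%.2f/s", throughput(1000, 99.041))
```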

I was not impressed with the results I got, and I may implement a similar prototype in Erlang using MochiWeb, Mnesia and ejabberd.

Summary

Though the technologies I chose are somewhat arbitrary, the same design principles can be applied to other technology stacks. Technologies are generally selected for political reasons or personal preference, but it is important to consider the team’s familiarity with the technology stack. For example, Twitter tried ejabberd but had problems troubleshooting it because they were not familiar with Erlang.
Again, I used architecture principles such as stateless services, though in real life I may have to store some state, which can be kept in the sharded database. I used embedded services, such as an embedded database server and messaging server, because you can then easily start a single process on a machine and replicate machines as the load increases. I used partitioning/sharding heavily, which is key for scalability. I also used replication, especially replicating messages for each follower so that reads are fast and we don’t spend a lot of time querying the database. That does add cost on the write side and increases disk requirements, but I think we can control both by limiting the number of messages per user per minute and the maximum number of notifications. We can also remove old messages from the database. Finally, I used the principle of bringing computation closer to the data by using an embedded database.
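The replication of messages for each follower can be sketched as a fan-out on write. This in-memory `Timelines` class is hypothetical and merely stands in for the per-user databases; it also trims old entries, in the spirit of removing old messages.

```ruby
# Hypothetical in-memory sketch: copy each message into every follower's
# timeline at write time so that reading a timeline is a single lookup.
class Timelines
  def initialize(followers, max_per_timeline = 100)
    @followers = followers                       # username => follower usernames
    @max = max_per_timeline
    @timelines = Hash.new { |hash, key| hash[key] = [] }
  end

  # Write path: one copy for the author and one for each follower.
  def post(username, body)
    message = { :from => username, :body => body }
    recipients = [username] + @followers.fetch(username, [])
    recipients.each do |recipient|
      timeline = @timelines[recipient]
      timeline.unshift(message)                  # newest first
      timeline.pop while timeline.size > @max    # drop the oldest entries
    end
    message
  end

  # Read path: no joins and no lookups across users.
  def timeline_for(username)
    @timelines[username].dup
  end
end

timelines = Timelines.new("alice" => ["bob", "carol"])
timelines.post("alice", "hello world")
puts timelines.timeline_for("bob").inspect
```

The write cost grows with the number of followers, which is why limits on posting rate and timeline size matter in this design.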

Other Improvements

I tried to show that scalability problems are best solved at the level of architecture, independent of a particular technology or language. In fact, I may implement a similar design in Erlang using Mnesia as the database, ejabberd as the messaging middleware, MochiWeb for REST and OTP for implementing the services.

Other improvements that I would suggest are a reverse proxy server for caching, along with a DMZ for security. We can also add some throttling to the reverse proxy servers. For better fault tolerance, I might run multiple reverse proxy servers behind DNS round robin. I generally don’t like DNS-based load balancing for real load distribution because it does not take a server’s capacity and load into account, but it would be suitable in this case. Also, I didn’t implement any caching, but we can use caching solutions such as EHCache, Tangosol, Terracotta, Memcached, etc., though caching highly dynamic content may be difficult and less reusable. I also didn’t use any ESB (lightweight providers such as Mule or ServiceMix), but one can be used to abstract routing to the services, load balancing, aggregation, replication, transformation, etc. We can also build support for versioning and hot code swapping by adding a version number to each message and changing the routing schemes.

This prototype uses a simple scheme for partitioning: it hashes business keys such as the username. However, that is difficult to manage when you need to add or remove servers, because it requires migrating data. Another solution is the MD5 scheme that we use at Amazon for S3: calculate the MD5 of the username and of each replicated queue, and select the queue whose MD5 sum is higher than the username’s sum. For further fault tolerance, we could replicate data in master/slave fashion.
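The MD5 scheme is a form of consistent hashing. The sketch below is my own minimal reading of it, with a hypothetical `HashRing` class and queue names: it picks the first queue whose MD5 value is higher than the username’s, wrapping around to the lowest one.

```ruby
require 'digest/md5'

# Hypothetical sketch of the MD5 scheme as a consistent-hash ring.
class HashRing
  def initialize(nodes)
    raise ArgumentError, "at least one node is required" if nodes.empty?
    # Sort the nodes by the integer value of their MD5 digest.
    @ring = nodes.map { |node| [md5_int(node), node] }.sort
  end

  # Pick the first node whose MD5 is higher than the key's, wrapping around.
  def node_for(key)
    key_hash = md5_int(key)
    entry = @ring.find { |node_hash, _node| node_hash > key_hash } || @ring.first
    entry.last
  end

  private

  def md5_int(value)
    Digest::MD5.hexdigest(value).to_i(16)
  end
end

ring = HashRing.new(%w[queue-a queue-b queue-c])
%w[alice bob carol].each { |name| puts "#{name} -> #{ring.node_for(name)}" }
```

Unlike hashing modulo the number of servers, adding a queue here only moves the usernames that now fall to the new queue; all other assignments stay put.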

PS: I noticed “NativeException: javax.jms.JMSException: PermGen space” a couple of times while running a long load test. This is generally caused by reloading classes (I have observed it for many years in WebLogic and Tomcat). I probably need to investigate this further, but my guess is that JRuby is doing something dumb.

Download

Source Code

Appendix

Source Code Model

./model/user.rb
###
# User of the system.
# Though a user can have many messages (status updates or messages to other
# users) as well as followers and followings, we do not model those as
# relationships because we cannot populate them due to sharding and
# scalability concerns.
###
class User < ActiveRecord::Base
    validates_presence_of       :name
    # :within is an option of validates_length_of, not validates_presence_of
    validates_length_of         :username,      :within => 3..16
    validates_length_of         :password,      :within => 3..16
    validates_length_of         :email,         :within => 3..64
    validates_uniqueness_of     :email,         :case_sensitive => false
    validates_format_of         :email,         :with => /^([^@\s]+)@((?:[-a-z0-9]+\.)+[a-z]{2,})$/i
    #
    ### all messages for the same user will be stored in the same database.
    ### also, all messages from the users he/she is following will be copied to the user's database
    ### if they lie in different databases.
    ### limiting rows returned from these relationships to 100 so that we don't overwhelm the server.
    #
    has_many :statuses, :class_name => 'Message', :foreign_key => :username, :limit => 100, :order => 'sent_at desc'
    has_many :tweets, :class_name => 'Message', :foreign_key => :channel_name, :limit => 100, :order => 'sent_at desc'

    #
    ### We replicate follower/following records to each database; though we may
    ### not have the corresponding users, we do have their usernames.
    #
    def self.follower_usernames(username, offset, limit)
       Follower.followers(username, offset, limit).map {|f| f.follower_username }
    end

    def self.following_usernames(username, offset, limit)
       Follower.followings(username, offset, limit).map {|f| f.username }
    end

    # delegate to the counters that Follower actually defines
    def self.follower_count(username)
       Follower.follower_count(username)
    end
    def self.following_count(username)
       Follower.following_count(username)
    end

    #
    ### user's timeline
    #
    def self.timeline(username, offset, limit)
       Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
    end

    def to_s
      "#{username}"
    end

    def to_external_xml(xml)
      xml.user(:username => self.username, :name => self.name, :email => self.email)
    end
end

./model/message.rb
###
# Message stores any message sent by a user to update his/her status, tweets
# from other users that he/she is following, a direct message sent to another
# user, or a reply sent in response to a tweet or direct message.
# Note that we use a GUID-based message_id in addition to the database column 'id':
# because we are sharding, the 'id' column can be duplicated across databases, but
# the GUID-based message_id will always be unique across all databases.
###
class Message < ActiveRecord::Base
    MESSAGE_TYPE_SAY = "Say"
    MESSAGE_TYPE_SHOUT = "Shout"

    # :within is an option of validates_length_of, not validates_presence_of
    validates_length_of         :message_id,    :within => 1..64
    validates_presence_of       :username
    #
    ### channel_name will be username of the target user for now,
    ### though, in future it could be a group or a topic
    #
    validates_length_of         :channel_name,  :within => 1..32
    validates_length_of         :message_body,  :within => 1..140
    validates_length_of         :message_type,  :within => 1..32
    validates_presence_of       :sent_at
    validates_uniqueness_of     :message_id
    #
    ### all messages for the same user will be stored in the same database.
    ### also, all messages from the users he/she is following will be copied to the user's database
    ### if they lie in different databases.
    #
    belongs_to :user, :class_name => 'User', :foreign_key => :username

    def self.messages_for(username, offset, limit)
       Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
    end

    # destroys the message with the given message_id; returns it, or nil if not found
    def self.destroy_message(id)
       message = Message.find_by_message_id(id)
       if message
          message.destroy
          message
       else
          nil
       end
    end

    def to_s
      "#{username} -> #{channel_name}: #{message_body}"
    end

    def to_external_xml(xml)
      xml.message(message_body, :message_id => self.message_id, :message_type => self.message_type, :from => self.username, :to => self.channel_name, :sent_at => sent_at.httpdate)
    end
end

./model/reply_message.rb
###
# Message sent in response to another tweet, message or direct message.
#
# The association below is commented out because the original message could
# be in a different database.
# belongs_to :reply_message_id, :class_name => Message, :dependent => :destroy
###

require 'direct_message'

class ReplyMessage < DirectMessage
  # :within is an option of validates_length_of, not validates_presence_of
  validates_length_of         :reply_message_id,    :within => 1..64
  def self.replies(username, offset, limit)
     ReplyMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end
end

./model/guid_generator.rb
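# Simple prototype scheme: a per-process counter plus the current time in
# seconds. It is neither thread-safe nor unique across processes, so use a
# proper UUID library in a real project.
class GuidGenerator
  @@count = 1
  def self.guid(prefix)
    @@count += 1
    "#{prefix}#{@@count}#{Time.now.to_i}"
  end
  def self.next_message_id
    guid("message_id")
  end
  def self.next_request_id
    guid("request_id")
  end
end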

./model/follower.rb
###
# Follower stores a one-way relation between two users. However, in order to
# support sharding, we don't directly connect users; instead we only store
# the users' usernames.
###
class Follower < ActiveRecord::Base
    RELATION_TYPE_FOLLOWER = "Follower"
    RELATION_TYPE_FRIEND = "Friend"
    RELATION_TYPE_FAMILY = "Family"
    RELATION_TYPE_COLLEAGUE = "Colleague"
    RELATION_TYPE_ACQUAINTENCE = "Acquaintance"
    RELATION_TYPE_FAN = "Fan"
    RELATION_TYPE_OTHER = "Other"

    validates_presence_of       :relation_type
    #validates_presence_of       :blocked
    #validates_presence_of       :notifications

    def self.create_follower(follower_attrs)
        Follower.create!(follower_attrs) unless exists?(follower_attrs[:username], follower_attrs[:follower_username])
    end

    def self.followers(username, offset, limit)
        self.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
    end
    def self.follower_count(username)
        self.count(:conditions => ['username = ?', username])
    end
    def self.following_count(username)
        self.count(:conditions => ['follower_username = ?', username])
    end

    def self.followings(username, offset, limit)
        self.find(:all, :conditions => ['follower_username = ?', username], :offset => offset, :limit => limit)
    end
    def self.destroy_follower(username, follower_username)
        record = self.get_record(username, follower_username)
        # raise instead of calling destroy on nil, so callers can rescue RecordNotFound
        raise ActiveRecord::RecordNotFound, "#{username} -> #{follower_username}" unless record
        record.destroy
    end
    def self.exists?(username, follower_username)
        !self.get_record(username, follower_username).nil?
    end
    def self.enable_notifications(username, follower_username)
        self.get_record(username, follower_username).update_attributes(:notifications => true)
    end
    def self.disable_notifications(username, follower_username)
        self.get_record(username, follower_username).update_attributes(:notifications => false)
    end
    def self.block(username, follower_username)
        self.get_record(username, follower_username).update_attributes(:blocked => true)
    end
    def self.unblock(username, follower_username)
        self.get_record(username, follower_username).update_attributes(:blocked => false)
    end
    # returns the relation record, or nil if it does not exist
    def self.get_record(username, follower_username)
        self.find(:first, :conditions => ['username = ? and follower_username = ?', username, follower_username])
    end
    def to_s
      "#{username} -> #{follower_username}"
    end
    def to_external_xml(xml)
      xml.follower(:username => self.username, :follower_username => self.follower_username)
    end
end

./model/direct_message.rb
####
# Direct message is sent directly to another user
####
class DirectMessage < Message
  def to_user(user)
     self.channel_name = user.username
  end
  def self.direct_messages_sent(username, offset, limit)
     DirectMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end
  def self.direct_messages_received(username, offset, limit)
     DirectMessage.find(:all, :conditions => ['channel_name = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end
end

Source Code Services

./service/base_service.rb
class BaseService
protected
  # connects to the database (shard) that holds the given user's data
  def initialize(username)
    SchemaHelper.setup_schema_for(username)
  end

  # serializes obj in the requested format (xml by default); unknown formats
  # return the object unchanged
  def serialize(obj, format)
    if obj
      format ||= 'xml'
      case format
        when 'xml'
          obj.to_xml
        when 'yaml'
          obj.to_yaml
        when 'json'
          obj.to_json
        else
          obj
      end
    end
  end
end

./service/users_service.rb
#
## UsersService provides basic functionality for querying/storing users.
## It assumes the connection to the right database is already set up.
#
class UsersService < BaseService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end

  #
  ### retrieves user record
  #
  def get_user(username, options={})
    User.find_by_username(username)
  end

  #
  ### return usernames of people whom the user is following.
  #
  def followings(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.following_usernames(username, offset, limit)
  end

  #
  ### return usernames of followers of authenticated user
  #
  def followers(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.follower_usernames(username, offset, limit)
  end

  #
  ### returns the user if username and password match, otherwise nil
  #
  def authenticate(username, password, options={})
    User.find_by_username_and_password(username, password)
  end

  #
  ### creates a user, raising an exception if validation fails
  #
  def create_user(user_attrs, options={})
    user = User.new()
    user.attributes = user_attrs
    user.save!
    user
  end

  #
  ### retrieves archived messages for the user
  #
  def archive_messages(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    Message.messages_for(username, offset, limit)
  end
end

./service/followers_service.rb
#
## FollowersService provides basic functionality for creating/querying followers.
## It assumes all users are already authenticated before calling these methods.
#
class FollowersService < BaseService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end

  #
  ### create follower
  #
  def create_follower(follower_attrs, options={})
    Follower.create_follower(follower_attrs)
  end

  #
  ### enable notifications
  #
  def enable_notifications(username, follower_username, options={})
    Follower.enable_notifications(username, follower_username)
  end

  #
  ### disable notifications
  #
  def disable_notifications(username, follower_username, options={})
    Follower.disable_notifications(username, follower_username)
  end

  #
  ### blocks a user
  #
  def block(username, follower_username, options={})
    Follower.block(username, follower_username)
  end

  #
  ### unblock user
  #
  def unblock(username, follower_username, options={})
    Follower.unblock(username, follower_username)
  end

  #
  ### delete follower; returns false if the relationship was not found
  #
  def destroy_follower(username, follower_username, options={})
    Follower.destroy_follower(username, follower_username)
    true
  rescue ActiveRecord::RecordNotFound => e
    false
  end

  #
  ### check whether the follower relationship exists
  #
  def exists?(username, follower_username, options={})
    Follower.exists?(username, follower_username)
  end
end

./service/messages_service.rb
   1 #
   2 ## MessagesService provides basic functionality for quering/storing statuses/messages
   3 ## It assumes all users are already authenticated before calling these methods.
   4 #
   5 class MessagesService < BaseService
   6   #
   7   ### initialize service
   8   #
   9   def initialize(username)
  10     super
  11   end
  12 
  13 
  14   #
  15   ### get a single status
  16   #
  17   def get_status(message_id, options={})
  18     Message.find_by_message_id(message_id)
  19   end
  20 
  21   #
  22   ### update status
  23   #
  24   def update_status(message_attrs, options={})
  25     message = Tweet.new
  26     message.attributes = message_attrs
  27     safe_save(message)
  28   end
  29 
  30   #
  31   ### delete status
  32   #
  33   def destroy_message(message_id, options={})
  34     Message.destroy_message(message_id)
  35   end
  36 
  37 
  38 
  39   #
  40   ### retrieves most recent statuses for public
  41   #
  42   def public_timeline(username, options={})
  43     offset = options[:offset] || 0
  44     limit = options[:limit] || 10
  45     User.timeline(username, offset, limit)
  46   end
  47 
  48   #
  49   ### retrieves most recent statuses for user
  50   #
  51   def user_timeline(username, options={})
  52     offset = options[:offset] || 0
  53     limit = options[:limit] || 100
  54     User.timeline(username, offset, limit)
  55   end
  56 
  57 
  58   #
  59   ### retrieves most recent statuses for friends
  60   #
  61   def friends_timeline(username, options={})
  62     offset = options[:offset]
  63     limit = options[:limit]
  64     get_friends_timeline(username, offset, limit)
  65   end
  66 
  67 
  68   #
  69   ### retrieves most recent statuses for user and his/her friends
  70   #
  71   def user_and_friends_timeline(username, options={})
  72     offset = options[:offset]
  73     limit = options[:limit]
  74     messages = User.timeline(username, offset, limit)
  75     messages += get_friends_timeline(username, offset, limit)
  76   end
  77 
  78 private
  79 
  80   #
  81   ### retrieves most recent statuses for friends without any conversion
  82   #
  83   def get_friends_timeline(username, offset, limit)
  84     offset = offset || 0
  85     following_count = Follower.following_count(username)
  86     limit = limit || (following_count.between?(1, 99) ? 100 / following_count + 1 : 1)   # fewer friends => more messages per friend
  87 
  88     following = User.following_usernames(username, 0, 200)
  89     messages = []
  90     following.each do |friend_username|
  91       messages += Message.messages_for(friend_username, offset, limit)
  92     end
  93     messages
  94   end
  95 protected
  96   def safe_save(message)
  97     old_message = Message.find_by_message_id(message.message_id)
  98     if old_message.nil?
  99        message.save!
 100        message
 101     else
 102        puts "Message with id #{message.message_id} already exists #{message.inspect}" 
 103        nil
 104     end
 105   end
 106 end
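
The friends timeline above appends each friend's messages in the order the friends are returned, so the combined list is not ordered by time. Below is a minimal sketch of one way to merge and trim the per-friend lists. It assumes every message exposes a `created_at` timestamp; the `Msg` struct and `merge_timelines` are hypothetical names used only for illustration and are not part of the prototype.

```ruby
# Hypothetical stand-in for the Message model; only the fields needed here.
Msg = Struct.new(:username, :text, :created_at)

# Merge per-friend message lists into a single timeline, newest first,
# and keep at most `limit` entries.
def merge_timelines(per_friend_messages, limit)
  per_friend_messages.flatten.sort_by { |m| -m.created_at.to_f }.first(limit)
end
```

With real ActiveRecord models the same ordering could probably be pushed down into the query, which would avoid loading messages that are trimmed away afterwards.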
 
 

./service/direct_messages_service.rb
  1 #
  2 ## DirectMessagesService provides functionality for sending/receiving direct messages
  3 #
  4 class DirectMessagesService < MessagesService
  5   #
  6   ### initialize service
  7   #
  8   def initialize(username)
  9     super
 10   end
 11   #
 12   ### sends back the 20 most recent direct messages received by the user
 13   ### it assumes user is already authenticated.
 14   #
 15   def direct_messages_received(username, options={})
 16     offset = options[:offset] || 0
 17     limit = options[:limit] || 20
 18     DirectMessage.direct_messages_received(username, offset, limit)
 19   end
 20   #
 21   ### sends back the 20 most recent direct messages sent by the user
 22   ### it assumes user is already authenticated.
 23   #
 24   def direct_messages_sent(username, options={})
 25     offset = options[:offset] || 0
 26     limit = options[:limit] || 20
 27     DirectMessage.direct_messages_sent(username, offset, limit)
 28   end
 29 
 30   #
 31   ### send direct message
 32   #
 33   def send_direct_message(message_attrs, options={})
 34     message = DirectMessage.new()
 35     message.attributes = message_attrs
 36     safe_save(message)
 37   end
 38 
 39   #
 40   ### send reply message
 41   #
 42   def send_reply(message_attrs, options={})
 43     message = ReplyMessage.new()
 44     message.attributes = message_attrs
 45     safe_save(message)
 46   end
 47 
 48   #
 49   ### sends back the 100 most recent replies for the user
 50   ### it assumes user is already authenticated.
 51   #
 52   def replies(username, options={})
 53     offset = options[:offset] || 0
 54     limit = options[:limit] || 100
 55     ReplyMessage.replies(username, offset, limit)
 56   end
 57 
 58 
 59   #
 60   ### delete direct message
 61   #
 62   def destroy_direct_message(id, options={})
 63     DirectMessage.destroy_message(id)
 64   end
 65 end
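
`send_direct_message` and `send_reply` both rely on the inherited `safe_save`, which skips a message whose `message_id` is already stored. That makes write requests idempotent, which matters when a messaging system redelivers a request. Here is a minimal in-memory sketch of the idea; `SketchMessageStore` is a hypothetical class that stands in for the database table.

```ruby
# Hypothetical in-memory store illustrating the "save once per id" rule.
class SketchMessageStore
  def initialize
    @by_id = {}
  end

  # Stores attrs under message_id and returns them, or returns nil
  # without changing anything when the id has been seen before.
  def safe_save(message_id, attrs)
    return nil if @by_id.key?(message_id)
    @by_id[message_id] = attrs
  end

  def size
    @by_id.size
  end
end
```

Because the client-side delegate generates the message id before sending, a redelivered request carries the same id and is dropped by this check.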
 
 

Source Code Business Delegate

./delegate/base_delegate.rb
  1 #
  2 ## BaseDelegate provides client side interface for connecting to the business 
  3 ## service using BusinessDelegate pattern.
  4 #
  5 class BaseDelegate
  6 protected
  7   def initialize(jms_helper, read_queue_name, write_queue_name)
  8      @read_queue_name = read_queue_name
  9      @write_queue_name = write_queue_name
 10      @jms_helper = jms_helper 
 11      @jms_helper.create_producers(@read_queue_name, @write_queue_name)
 12   end
 13 
 14 
 15   def new_jms_message(properties={})
 16     jms_msg = @jms_helper.create_message("")    ### ActiveMQTextMessage.new
 17     jms_msg.setStringProperty('request_id', GuidGenerator.next_request_id)
 18     properties.each do |name, value|
 19         jms_msg.setStringProperty(name.to_s, value.to_s)
 20     end
 21     jms_msg
 22   end
 23 
 24 
 25   def send_read_request(jms_msg)
 26     @jms_helper.send_message(@read_queue_name, jms_msg, true).text
 27   end
 28 
 29 
 30   def send_write_request(jms_msg, reply=true)
 31     response =  @jms_helper.send_message(@write_queue_name, jms_msg, reply)
 32     if response.respond_to? :text
 33         response.text
 34     else
 35         response
 36     end
 37   end
 38 end
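
To make the Business Delegate idea concrete without a running broker, here is a small sketch that shows how a method call turns into a flat set of string properties plus an `action` used for routing. `FakeJmsHelper` and `SketchUsersDelegate` are hypothetical stand-ins, and `SecureRandom.uuid` replaces the prototype's `GuidGenerator`, which is not shown here.

```ruby
require 'securerandom'

# Hypothetical in-memory stand-in for JmsHelper: it records what would be sent.
class FakeJmsHelper
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_message(queue_name, properties, reply = false)
    @sent << [queue_name, properties]
    reply ? "<response request_id=\"#{properties['request_id']}\"/>" : nil
  end
end

# Delegate in the style of the listing above: arguments and options become
# string properties, and the 'action' property selects the handler.
class SketchUsersDelegate
  def initialize(helper)
    @helper = helper
  end

  def get_user(username, options = {})
    props = stringify(options.merge(:username => username, :action => 'Users.get_user'))
    @helper.send_message('read_users', props, true)
  end

  private

  def stringify(properties)
    result = { 'request_id' => SecureRandom.uuid }
    properties.each { |name, value| result[name.to_s] = value.to_s }
    result
  end
end
```

The caller only sees `get_user`; the queue name, the request id and the property encoding stay inside the delegate, so the transport could be swapped without touching the web layer.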
 
 
./delegate/users_delegate.rb
  1 #
  2 ## UsersDelegate provides basic functionality for querying/storing users
  3 ## It assumes connection to the right database is already setup.
  4 #
  5 class UsersDelegate < BaseDelegate
  6   def initialize(jms_helper)
  7     super(jms_helper, JmsHelper::QUEUE_READ_USERS, JmsHelper::QUEUE_WRITE_USERS)
  8   end
  9   def create_user(user_attrs, options={})
 10       args = options.merge(user_attrs).merge(:action => 'Users.create_user')
 11       jms_msg = new_jms_message(args)
 12       send_write_request(jms_msg)
 13       
 14   end
 15 
 16   #
 17   ### retrieves user record
 18   #
 19   def get_user(username, options={})
 20       jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.get_user'))
 21       send_read_request(jms_msg)
 22       
 23   end
 24 
 25   #
 26   ### return usernames of people the user is following
 27   #
 28   def followings(username, options={})
 29       jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followings'))
 30       send_read_request(jms_msg)
 31       
 32   end
 33 
 34 
 35   #
 36   ### return 100 usernames of followers of authenticated user
 37   #
 38   def followers(username, options={})
 39       jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followers'))
 40       send_read_request(jms_msg)
 41       
 42   end
 43 
 44 
 45   def authenticate(username, password, options={})
 46       jms_msg = new_jms_message(options.merge(:username => username, :password => password, :action => 'Users.authenticate'))
 47       send_read_request(jms_msg)
 48       
 49   end
 50 
 51 
 52   def archive_messages(username, options={})
 53       jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.archive_messages'))
 54       send_read_request(jms_msg)
 55       
 56   end
 57 end
 
 

./delegate/messages_delegate.rb
  1 #
  2 ## MessagesDelegate provides basic functionality for querying/storing statuses/messages
  3 ## It assumes all users are already authenticated before calling these methods.
  4 #
  5 class MessagesDelegate < BaseDelegate
  6   def initialize(jms_helper)
  7     super(jms_helper, JmsHelper::QUEUE_READ_MESSAGES, JmsHelper::QUEUE_WRITE_MESSAGES)
  8   end
  9   #
 10   ### get a single status
 11   #
 12   def get_status(message_id, options={})
 13       jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.get_status'))
 14       send_read_request(jms_msg)
 15       
 16   end
 17 
 18   #
 19   ### update status
 20   #
 21   def update_status(message_attrs, options={})
 22       jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'Messages.update_status'}.merge(options).merge(message_attrs))
 23       send_write_request(jms_msg)
 24       
 25   end
 26 
 27   #
 28   ### delete status
 29   #
 30   def destroy_message(message_id, options={})
 31       jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.destroy_message'))
 32       send_write_request(jms_msg)
 33       
 34   end
 35 
 36 
 37 
 38   #
 39   ### retrieves most recent statuses for public
 40   #
 41   def public_timeline(username, options={})
 42       jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.public_timeline'))
 43       send_read_request(jms_msg)
 44       
 45   end
 46 
 47   #
 48   ### retrieves most recent statuses for user
 49   #
 50   def user_timeline(username, options={})
 51       jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_timeline'))
 52       send_read_request(jms_msg)
 53       
 54   end
 55 
 56 
 57   #
 58   ### retrieves most recent statuses for friends
 59   #
 60   def friends_timeline(username, options={})
 61       jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.friends_timeline'))
 62       send_read_request(jms_msg)
 63       
 64   end
 65 
 66 
 67   #
 68   ### retrieves most recent statuses for user and his/her friends
 69   #
 70   def user_and_friends_timeline(username, options={})
 71       jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_and_friends_timeline'))
 72       send_read_request(jms_msg)
 73       
 74   end
 75 end
 
 

./delegate/followers_delegate.rb
  1 #
  2 ## FollowersDelegate provides basic functionality for creating/querying followers
  3 ## It assumes all users are already authenticated before calling these methods.
  4 #
  5 class FollowersDelegate < BaseDelegate
  6   def initialize(jms_helper)
  7     super(jms_helper, JmsHelper::QUEUE_READ_FOLLOWERS, JmsHelper::QUEUE_WRITE_FOLLOWERS)
  8   end
  9   #
 10   ### create follower
 11   #
 12   def create_follower(follower_attrs, options={})
 13       jms_msg = new_jms_message(options.merge(follower_attrs).merge(:action => 'Followers.create_follower'))
 14       send_write_request(jms_msg)
 15       
 16   end
 17 
 18   #
 19   ### enable notifications
 20   #
 21   def enable_notifications(username, follower_username, options={})
 22       jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.enable_notifications'))
 23       send_write_request(jms_msg)
 24       
 25   end
 26 
 27   #
 28   ### disable notifications
 29   #
 30   def disable_notifications(username, follower_username, options={})
 31       jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.disable_notifications'))
 32       send_write_request(jms_msg)
 33       
 34   end
 35 
 36 
 37   #
 38   ### blocks a user
 39   #
 40   def block(username, follower_username, options={})
 41       jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.block'))
 42       send_write_request(jms_msg)
 43       
 44   end
 45 
 46   #
 47   ### unblock user
 48   #
 49   def unblock(username, follower_username, options={})
 50       jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.unblock'))
 51       send_write_request(jms_msg)
 52       
 53   end
 54 
 55 
 56   #
 57   ### delete follower
 58   #
 59   def destroy_follower(username, follower_username, options={})
 60       jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.destroy_follower'))
 61       send_write_request(jms_msg)
 62       
 63   end
 64 
 65 
 66   #
 67   ### check whether a follower relationship exists
 68   #
 69   def exists?(username, follower_username, options={})
 70       jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.exists?'))
 71       send_read_request(jms_msg)
 72       
 73   end
 74 end
 
 

./delegate/direct_messages_delegate.rb
  1 #
  2 ## DirectMessagesDelegate provides functionality for sending/receiving direct messages
  3 #
  4 class DirectMessagesDelegate < MessagesDelegate
  5   def initialize(jms_helper)
  6     super(jms_helper)
  7   end
  8 
  9   #
 10   ### sends back the 20 most recent direct messages received by the user
 11   ### it assumes user is already authenticated.
 12   #
 13   def direct_messages_received(username, options={})
 14       jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_received'))
 15       send_read_request(jms_msg)
 16       
 17   end
 18   #
 19   ### sends back the 20 most recent direct messages sent by the user
 20   ### it assumes user is already authenticated.
 21   #
 22   def direct_messages_sent(username, options={})
 23       jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_sent'))
 24       send_read_request(jms_msg)
 25       
 26   end
 27 
 28   #
 29   ### send direct message
 30   #
 31   def send_direct_message(message_attrs, options={})
 32       jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_direct_message'}.merge(options).merge(message_attrs))
 33       send_write_request(jms_msg)
 34       
 35   end
 36 
 37   #
 38   ### send reply message
 39   #
 40   def send_reply(message_attrs, options={})
 41       jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_reply'}.merge(options).merge(message_attrs))
 42       send_write_request(jms_msg)
 43       
 44   end
 45 
 46   #
 47   ### sends back the most recent replies for the user
 48   ### it assumes user is already authenticated.
 49   #
 50   def replies(username, options={})
 51       jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.replies'))
 52       send_read_request(jms_msg)
 53       
 54   end
 55 
 56 
 57   #
 58   ### delete direct message
 59   #
 60   def destroy_direct_message(message_id, options={})
 61       jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'DirectMessages.destroy_direct_message'))
 62       send_write_request(jms_msg)
 63       
 64   end
 65 end
 
 

Source Code Message Handlers

./messaging/base_handler.rb
   1 #
   2 ## BaseHandler provides common functionality for handling messages
   3 #
   4 class BaseHandler 
   5   attr_reader :username
   6   attr_reader :message_text
   7   attr_reader :timings
   8   def initialize
   9     @timings = []
  10     @started_at = Time.now
  11     @host_id = Digest::MD5.hexdigest("localhost")       #lookup real host
  12   end
  13 
  14   def handle(jms_msg, options={})
  15     add_timing_begin
  16     msg_attrs = {}
  17     pnames = jms_msg.getPropertyNames() 
  18     for pname in pnames
  19         msg_attrs[pname] = msg_attrs[pname.to_sym] = jms_msg.getObjectProperty(pname)
  20         case pname 
  21         when 'username'
  22           @username = msg_attrs[pname]
  23         when 'auth_user'
  24           @auth_user = msg_attrs[pname]
  25         when 'request_id'
  26           @request_id = msg_attrs[pname]
  27         end
  28     end
  29     if @username.nil?
  30         raise "username missing in #{jms_msg.inspect} options #{options.inspect}"
  31     end
  32     if jms_msg.respond_to? :text
  33         #msg_attrs.merge!(xml_to_dict(jms_msg.text)) 
  34         msg_attrs[:message_text] = jms_msg.text
  35     end
  36     ##
  37     add_timing_for_input_xml
  38     response = do_handle(msg_attrs)
  39     ##
  40     add_timing_end
  41     response
  42   rescue ActiveRecord::RecordInvalid => invalid
  43     response = xml_error("400", "handle", invalid.record.errors)
  44     add_timing_end
  45     response
  46   rescue java.sql.SQLException => e
  47     puts "Failed to handle #{self.class.name}(#{jms_msg}) due to #{e.inspect}\n\n"
  48     response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})   # build before the loop below sets e to nil
  49     while (e) 
  50       puts e.backtrace << "\n\n"
  51       e = e.getNextException() 
  52     end
  53     add_timing_end
  54     response
  55   rescue Exception => e
  56     puts "Failed to handle #{self.class.name}(#{jms_msg}) due to #{e.inspect}\n\n#{e.backtrace.join("\n")}"
  57     response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})
  58     add_timing_end
  59     response
  60   end
  61 
  62 protected
  63   def username_for_service
  64     @auth_user || @username
  65   end
  66 
  67   def add_timing(what)
  68     @timings << [what, ((Time.now - @started_at) * 1000).to_i]   # [label, elapsed milliseconds]
  69   end
  70   def add_timing_begin
  71     add_timing "#{self.class.name} begin"
  72   end
  73   def add_timing_end
  74     add_timing "#{self.class.name} end"
  75   end
  76   def add_timing_for_db
  77     add_timing "#{self.class.name} connected to db"
  78   end
  79   def add_timing_for_output_xml
  80     add_timing "#{self.class.name} converting output xml"
  81   end
  82   def add_timing_for_input_xml
  83     add_timing "#{self.class.name} converting input xml"
  84   end
  85 
  86   def do_handle(msg_attrs, options={})
  87     raise NotImplementedError, "#{self.class.name} must implement do_handle"
  88   end
  89   def xml_to_dict(xml)
  90     Hash.from_xml(xml)
  91   end
  92   def xml_error(response_code, action, errors, options={})
  93     #logger.error "Failed to #{action} due to #{error_code} by #{username} -- #{error_message}"
  94     to_xml(response_code, "Error response for #{action}", options) do |xml|
  95       xml.errors do 
  96         errors.each do |error_code, error_message|
  97           xml.error(:error_code => error_code, :error_message => error_message)
  98         end
  99       end
 100     end
 101   end
 102 
 103   def to_xml(response_code, comment="", options={})
 104     buffer = options[:buffer] ||= ''
 105 
 106     xml = options[:builder] ||= Builder::XmlMarkup.new(:target => buffer, :indent => options[:indent])
 107     xml.instruct! unless options[:skip_instruct]
 108     xml.comment! "Response #{comment} in reply for #{username} as of #{Time.now.utc}"
 109     xml.response(:request_id => @request_id, :response_code => response_code, :host_id => @host_id, :version => '1.0') do
 110       yield xml if block_given?
 111       xml.timings do
 112          @timings.each do |what, duration|
 113             xml.timing(:what => what, :duration_millis => duration)
 114          end
 115       end
 116     end
 117     buffer
 118   end
 119 end
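
BaseHandler is a template method: `handle` does the common work (validation, error mapping) and subclasses only supply `do_handle`. The sketch below strips that down to plain Ruby so the control flow is easier to see. The class names and the hash-shaped response are assumptions for illustration; the prototype builds XML instead.

```ruby
# Simplified template-method handler: subclasses implement do_handle only.
class SketchHandler
  def handle(attrs)
    raise ArgumentError, 'username missing' if attrs[:username].nil?
    { :code => '200', :body => do_handle(attrs) }
  rescue ArgumentError => e
    # Bad input from the caller maps to a 400-style response.
    { :code => '400', :error => e.message }
  rescue StandardError => e
    # Anything else maps to a 500-style response.
    { :code => '500', :error => e.message }
  end

  protected

  def do_handle(attrs)
    raise NotImplementedError, "#{self.class.name} must implement do_handle"
  end
end

# Example subclass that succeeds.
class EchoHandler < SketchHandler
  protected

  def do_handle(attrs)
    "hello #{attrs[:username]}"
  end
end

# Example subclass that fails inside do_handle.
class FailingHandler < SketchHandler
  protected

  def do_handle(attrs)
    raise 'database unavailable'
  end
end
```

Each concrete handler stays a few lines long because it never has to repeat the error handling.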
 
 

./messaging/archive_messages_handler.rb
  1 #
  2 ## ArchiveMessagesHandler sends back archive messages for the user
  3 #
  4 class ArchiveMessagesHandler < BaseHandler
  5 protected
  6   def do_handle(msg_attrs, options={})
  7     svc = UsersService.new(self.username_for_service)
  8     add_timing_for_db
  9     messages = svc.archive_messages(self.username, options)
 10     add_timing_for_output_xml
 11     to_xml("200", "Archived Messages", options) do |xml|
 12       xml.messages do
 13          messages.each do |message|
 14            message.to_external_xml(xml)
 15          end
 16       end
 17     end
 18   end
 19 end
 
 

./messaging/authenticate_handler.rb
  1 ## AuthenticateHandler authenticates user
  2 #
  3 class AuthenticateHandler < BaseHandler
  4 protected
  5   def do_handle(msg_attrs, options={})
  6     password = msg_attrs[:password]
  7     svc = UsersService.new(self.username_for_service)
  8     add_timing_for_db
  9     user = svc.authenticate(username, password, options)
 10     add_timing_for_output_xml
 11     if user
 12       to_xml("200", "Authentication", options) do |xml|
 13          user.to_external_xml(xml)
 14       end
 15     else
 16       xml_error("401", "Authentication", {"AUTH_FAILURE" => "Invalid username/password"})
 17     end
 18   end
 19 end
 
 

./messaging/block_follower_handler.rb
  1 ## BlockFollowerHandler blocks follower
  2 #
  3 class BlockFollowerHandler < BaseHandler
  4 protected
  5   def do_handle(msg_attrs, options={})
  6     svc = FollowersService.new(self.username_for_service)
  7     follower_username = msg_attrs[:follower_username]
  8     add_timing_for_db
  9     blocked = svc.block(self.username, follower_username, options)
 10     response_code = blocked ? "200" : "404"
 11     add_timing_for_output_xml
 12     to_xml(response_code, "Block Follower", options) do |xml|
 13       xml.follower(:username => self.username, :follower_username => follower_username)
 14     end
 15   end
 16 end
 
 

Messaging Helpers

./messaging/message_forwarder.rb
  1 class MessageForwarder
  2   include java.lang.Runnable
  3   include javax.jms.MessageListener
  4 
  5   def initialize(jms_helper, queue_name, handlers)
  6     @jms_helper = jms_helper
  7     @queue_name = queue_name
  8     @handlers = handlers
  9   end
 10 
 11   def run
 12     @consumer = @jms_helper.get_consumer(@queue_name)
 13     @consumer.set_message_listener(self);
 14     puts "Starting listener for queue #{@queue_name}"
 15   end
 16 
 17   def onMessage(jms_msg)
 18     action = jms_msg.getStringProperty('action')   # standard JMS accessor
 19     if action.nil?
 20        puts "Unknown message #{jms_msg.inspect}"
 21     else
 22        handler = @handlers[action]
 23        if handler.nil?
 24           puts "No handler for #{action} -- #{jms_msg.inspect}"
 25        else
 26           
 27           response = handler.new().handle(jms_msg)
 28           
 29           if (reply_queue = jms_msg.getJMSReplyTo())
 30              reply_producer = @jms_helper.create_producer(reply_queue)
 31              reply_message = @jms_helper.create_message(response)
 32              reply_message.setJMSCorrelationID(jms_msg.getJMSMessageID())
 33              reply_producer.send(reply_message)
 34              reply_producer.close   # a producer is created per reply, so release it
 35           end
 36        end
 37     end
 38   end
 39 
 40   def close
 41     @consumer.close();
 42   end
 43 end
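
The forwarder is essentially a dispatch table plus a request/reply convention: look up the handler by the `action` property, run it, and send the result to the reply destination tagged with the id of the request. Here is a broker-free sketch of that flow. Messages are plain hashes and the reply destination is an array, both of which are simplifying assumptions; `SketchForwarder` is a hypothetical name.

```ruby
# Routes a message to a handler by its :action and posts a correlated reply.
class SketchForwarder
  def initialize(handlers)
    @handlers = handlers
  end

  # Returns the reply that was posted, or nil when the action is unknown
  # or the sender did not ask for a reply.
  def on_message(msg)
    handler = @handlers[msg[:action]]
    return nil if handler.nil?

    response = handler.call(msg)
    reply_to = msg[:reply_to]
    return nil if reply_to.nil?

    # The correlation id lets the requester match the reply to its request.
    reply = { :correlation_id => msg[:message_id], :text => response }
    reply_to << reply
    reply
  end
end
```

Adding a new operation then means adding one entry to the handler table, which is how the handler maps in `JmsHelper` are used.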
 
 

./messaging/jms_helper.rb
   1 
   2 class JmsHelper
   3   #Object.module_eval("::#{i}")
   4   #http://java.sun.com/j2ee/1.4/docs/tutorial/doc/JMS6.html
   5   #
   6   ### A key tip for scalability is to separate reads from writes so that you can scale them independently and
   7   ### offer different qualities of service, e.g. read queues can be non-persistent while write queues are persistent.
   8   ###
   9   #
  10     QUEUE_READ_MESSAGES = 'read_messages'
  11     QUEUE_WRITE_MESSAGES = 'write_messages'
  12     QUEUE_READ_FOLLOWERS = 'read_followers'
  13     QUEUE_WRITE_FOLLOWERS = 'write_followers'
  14     QUEUE_READ_USERS = 'read_users'
  15     QUEUE_WRITE_USERS = 'write_users'
  16 
  17     READ_MESSAGES_HANDLERS = {
  18 	'DirectMessages.direct_messages_received' => DirectMessagesReceivedHandler,
  19 	'DirectMessages.direct_messages_sent' => DirectMessagesSentHandler,
  20 	'DirectMessages.replies' => RepliesHandler,
  21 	'Messages.get_status' => GetStatusHandler,
  22 	'Messages.public_timeline' => PublicTimelineHandler,
  23 	'Messages.user_timeline' => UserTimelineHandler,
  24 	'Messages.friends_timeline' => FriendsTimelineHandler,
  25 	'Messages.user_and_friends_timeline' => UserFriendTimelineHandler,
  26     }
  27     WRITE_MESSAGES_HANDLERS = {
  28 	'DirectMessages.send_direct_message' => SendDirectMessageHandler,
  29 	'DirectMessages.send_reply' => SendReplyMessageHandler,
  30 	'DirectMessages.destroy_direct_message' => DestroyDirectMessageHandler,
  31 	'Messages.update_status' => UpdateStatusHandler,
  32 	'Messages.destroy_message' => DestroyMessageHandler,
  33     }
  34     READ_FOLLOWERS_HANDLERS = {
  35 	'Followers.exists?' => FollowerExistsHandler,
  36     }
  37     WRITE_FOLLOWERS_HANDLERS = {
  38 	'Followers.create_follower' => CreateFollowerHandler,
  39 	'Followers.enable_notifications' => EnableNotificationsHandler,
  40 	'Followers.disable_notifications' => DisableNotificationsHandler,
  41 	'Followers.block' => BlockFollowerHandler,
  42 	'Followers.unblock' => UnblockFollowerHandler,
  43 	'Followers.destroy_follower' => DestroyFollowerHandler,
  44     }
  45     READ_USERS_HANDLERS = {
  46 	'Users.get_user' => GetUserHandler,
  47 	'Users.followings' => FollowingsHandler,
  48 	'Users.followers' => FollowersHandler,
  49 	'Users.authenticate' => AuthenticateHandler,
  50 	'Users.archive_messages' => ArchiveMessagesHandler,
  51     }
  52     WRITE_USERS_HANDLERS = {
  53 	'Users.create_user' => CreateUserHandler,
  54     }
  55 
  56   def initialize(connection=nil)
  57     if connection
  58        @connection = connection
  59     else
  60        factory = ActiveMQConnectionFactory.new("tcp://localhost:61616")
  61        @connection = factory.create_connection()
  62        @connection.set_exception_listener(self)
  63        @connection.start();
  64     end
  65     @queues = {}
  66     @producers = {}
  67     @consumers = {}
  68   end
  69 
  70   def start_consumers
  71     ### Creating another instance of jms helper because it keeps private session which cannot be shared
  72     ### across threads, though it can share connection.
  73     start_forwarder(QUEUE_READ_MESSAGES, READ_MESSAGES_HANDLERS)
  74     start_forwarder(QUEUE_WRITE_MESSAGES, WRITE_MESSAGES_HANDLERS)
  75     start_forwarder(QUEUE_READ_FOLLOWERS, READ_FOLLOWERS_HANDLERS)
  76     start_forwarder(QUEUE_WRITE_FOLLOWERS, WRITE_FOLLOWERS_HANDLERS)
  77     start_forwarder(QUEUE_READ_USERS, READ_USERS_HANDLERS)
  78     start_forwarder(QUEUE_WRITE_USERS, WRITE_USERS_HANDLERS)
  79     #
  80     java.lang.Thread.currentThread().join
  81   end
  82   
  83   def onException(jmsEx)
  84     puts "JMS Exception occurred #{jmsEx.inspect}, shutting down"
  85   end
  86 
  87   def close
  88     @session.close if @session
  89     @connection.close()
  90   end
  91 
  92   def create_message(text)
  93     get_session().createTextMessage(text)
  94   end
  95 
  96   def send_message(queue_name, jms_msg, reply=false)
  97     ## TODO add error_queue/dead letter queue
  98     if reply
  99       reply_queue = get_session().createTemporaryQueue() 
 100       jms_msg.setJMSReplyTo(reply_queue)
 101     end
 102     
 103     get_producer(queue_name).send(get_queue(queue_name), jms_msg)
 104     if reply_queue 
 105         reply_consumer = get_session().createConsumer(reply_queue)
 106         reply_consumer.receive()
 107     end
 108   end
 109 
 110   def get_producer(queue_name)
 111       producer = @producers[queue_name]
 112       if producer.nil?
 113           producer = @producers[queue_name] = get_session().createProducer(get_queue(queue_name))
 114           #producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)     # this can be changed for write queues
 115           #producer.setTimeToLive(60000)
 116       end
 117       producer
 118   end
 119   def get_queue(queue_name)
 120      queue = @queues[queue_name]
 121      if queue.nil?
 122         queue = @queues[queue_name] = get_session().create_queue(queue_name)
 123      end
 124      queue
 125   end
 126 
 127   def get_consumer(queue_name)
 128       consumer = @consumers[queue_name]
 129       if consumer.nil?
 130           consumer = @consumers[queue_name] = get_session().create_consumer(get_queue(queue_name))
 131       end
 132       consumer
 133   end
 134 
 135   def create_producers(*queue_names)
 136     for queue_name in queue_names
 137       queue = get_queue(queue_name)
 138       producer = get_producer(queue_name)
 139     end
 140   end
 141 
 142   def create_producer(queue)
 143      get_session().createProducer(queue)
 144   end
 145 
 146   def self.start_broker
 147     broker = BrokerService.new()
 148     broker.addConnector("tcp://localhost:61616")
 149     broker.start()
 150     sleep 2    # let broker startup
 151   end
 152 protected    
 153   def get_session
 154     @session ||= @connection.create_session(false, Session::AUTO_ACKNOWLEDGE)
 155   end
 156   def start_forwarder(queue_name, handlers)
 157     raise "null connection" if @connection.nil?
 158     jms_helper = JmsHelper.new(@connection)
 159     Thread.new(){MessageForwarder.new(jms_helper, queue_name, handlers).run}
 160   end
 161 end
 162 
 163 if $PROGRAM_NAME == __FILE__
 164   JmsHelper.start_broker
 165   JmsHelper.new.start_consumers
 166 end
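
The comment at the top of `JmsHelper` suggests giving read and write queues different qualities of service, and the commented-out `setDeliveryMode` and `setTimeToLive` lines in `get_producer` hint at where that would go. Here is a minimal sketch of such a policy lookup. The queue name prefixes match the constants above; the policy values themselves are illustrative assumptions, not settings taken from the prototype.

```ruby
# Reads can be retried by the caller, so they may be non-persistent and expire quickly.
READ_POLICY = { :persistent => false, :time_to_live_millis => 60_000 }.freeze

# Writes must survive a broker restart; a JMS time-to-live of 0 means "never expire".
WRITE_POLICY = { :persistent => true, :time_to_live_millis => 0 }.freeze

# Picks a delivery policy from the queue naming convention (read_* / write_*).
def delivery_policy_for(queue_name)
  queue_name.start_with?('write_') ? WRITE_POLICY : READ_POLICY
end
```

In the helper, the result of `delivery_policy_for(queue_name)` could be applied right after the producer is created, using the producer's delivery mode and time-to-live setters.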
 
 

I am not showing all message handlers because they look pretty much the same.

Database

./db/schema_helper.rb
  1 require 'rubygems'
  2 require 'fileutils'
  3 require 'active_record'
  4 require 'create_followers'
  5 require 'create_messages'
  6 require 'create_users'
  7 
  8 if defined?(JRUBY_VERSION)
  9   gem 'activerecord-jdbc-adapter'
 10   require 'jdbc_adapter'
 11   require 'java'
 12 end
 13 
 14 
 15 class SchemaHelper
 16   MAX_DB = 10
 17   #
 18   def self.create_schema()
 19     for i in 0 ... MAX_DB
 20        connect_to(i)
 21     end
 22   end
 23 
 24   def self.hash_for(username)
 25      username.hash % MAX_DB    # note: String#hash is not guaranteed to be stable across Ruby processes or implementations
 26   end
 27 
 28   def self.setup_schema_for(username)
 29      hash = hash_for(username)
 30      connect_to(hash)
 31   end
 32 
 33   def self.connect_to(hash)
 34      dbdir = "#{FileUtils.pwd}/data/db#{hash}"
 35      dbfile = "#{dbdir}/database.sqlite"
 36      #ActiveRecord::Base.establish_connection(:adapter => "sqlite", :dbfile => dbfile)
 37      ActiveRecord::Base.establish_connection(
 38                 :adapter => "jdbc", 
 39                 :dbfile => dbfile,
 40                 #:driver => "SQLite.JDBCDriver",         ####:driver => "org.sqlite.JDBC", 
 41                 #:url => "jdbc:sqlite:#{dbfile}"
 42                 :driver => "org.apache.derby.jdbc.EmbeddedDriver",
 43                 :url => "jdbc:derby:#{dbdir};create=true"
 44         )
 45      unless File.directory?(dbdir)
 46         
 47         
 48         CreateUsers.migrate(:up)
 49         CreateMessages.migrate(:up)
 50         CreateFollowers.migrate(:up)
 51      end
 52   end
 53   def self.test
 54      SchemaHelper.setup_schema_for("shahbhat")
 55      require 'user'
 56      for i in 0 ... MAX_DB do
 57        SchemaHelper.connect_to(i)
 58        puts User.count
 59      end
 60   end
 61 end
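
`hash_for` decides which of the `MAX_DB` databases holds a user, so it has to return the same value for the same username every time, on every machine. Ruby does not guarantee that for `String#hash`, whose value may differ between processes and implementations. Here is a minimal sketch of a stable alternative built on a CRC32 checksum; `shard_for` and `MAX_SHARDS` are hypothetical names, and swapping the function in would change where existing users are stored.

```ruby
require 'zlib'

MAX_SHARDS = 10   # mirrors MAX_DB in the listing above

# Maps a username to a shard index in 0...shard_count.
# CRC32 depends only on the bytes of the string, so the result is
# the same in every process.
def shard_for(username, shard_count = MAX_SHARDS)
  Zlib.crc32(username) % shard_count
end
```

Note that a plain modulo scheme moves most users when the shard count changes, so the number of databases is best fixed up front or handled with a lookup table.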
 
 
