Introduction
I have been a Twitter user for a while and have observed or heard about its downtime and scalability problems. Twitter’s scalability has become the topic of many discussions and blog posts, and it also offers a useful exercise in designing scalable systems. A common root cause, as identified in Twitter’s own blogs, is that the architecture is based on a CMS model because it was written in Ruby on Rails and that is what Rails is good at. The solution most often proposed by other people is a messaging-based architecture. Ruby and Rails have also taken a lot of the blame for Twitter’s problems, because Ruby is slow compared to other static and dynamic languages and Rails is not built for scalability. Though there is some truth to this, I don’t think they are the sole bottlenecks. In fact, I am going to show a small prototype, written (partially) in Ruby and Rails, that integrates with messaging middleware and can be scaled easily. I have been using messaging middleware such as CORBA Event Service, IBM MQSeries, WebSphere, WebLogic, Tibco and ActiveMQ for over ten years and have long been a proponent of messaging-based architectures for scalable systems [1] [2]. So I spent a few hours putting together a prototype based on messaging middleware and Ruby on Rails to see how such a system could be developed.
Design Principles
Before describing my design, I am going to review some of the design principles that I have used for building large scale systems ([1], [2]), which are:
- Coupling/Cohesion – loosely coupled and shared nothing architecture and partitioning based on functionality.
- Messaging or SEDA architecture to implement reliable and scalable services and avoid temporal dependencies.
- Resource management – good old practices of avoiding leaks of memory, database connections, etc.
- Data replication especially read-only data.
- Partition data (Sharding) – using multiple databases to partition the data.
- Avoid single point of failure.
- Bring processing closer to the data.
- Design for service failures and crashes.
- Dynamic Resources – Design service frameworks so that resources can be removed or added automatically and clients can automatically discover them. Use virtualization and horizontal scalability whenever possible.
- Smart Caching – Cache expensive operations and contents as much as possible.
- Avoid distributed transactions, use optimistic compensating transactions (See CAP principle).
Architecture & Design
The following is the high-level architecture for the microblogging system:
First, I selected a REST architecture as the entry point to the system for both the Web UI and 3rd party applications, and used messaging middleware to implement the services. This gives us ease of access through REST APIs and scalability through messaging. In my implementation I chose JRuby/Rails for most of the code, Derby for the database and ActiveMQ for the messaging store. In addition to scalability, messaging middleware gives you many of the advantages of functional languages like Erlang, such as immutability, message passing and fault tolerance (via persistent queues). You can even build support for versioning and hot code swapping by adding a version number to each message and creating routers (see integration patterns) that direct messages to different handlers.
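The versioning idea can be sketched in a few lines of plain Ruby. This is only an illustration and not code from the prototype: the VersionedRouter class, the :version key and the handler blocks are all hypothetical names, and a real router would sit in front of the message queues rather than in memory.

```ruby
# Hypothetical sketch: route each message to a handler chosen by its
# version number, so old and new handlers can run side by side.
class VersionedRouter
  def initialize
    @handlers = {} # version number => callable handler
  end

  # Register the handler for one message version.
  def register(version, &handler)
    @handlers[version] = handler
  end

  # Dispatch to the newest registered handler whose version is not
  # greater than the version stamped on the message.
  def route(message)
    version = message.fetch(:version)
    candidates = @handlers.keys.select { |v| v <= version }
    raise ArgumentError, "no handler for version #{version}" if candidates.empty?
    @handlers[candidates.max].call(message)
  end
end

router = VersionedRouter.new
router.register(1) { |msg| "v1:#{msg[:body]}" }
router.register(2) { |msg| "v2:#{msg[:body].upcase}" }

puts router.route({ :version => 1, :body => "hello" })
puts router.route({ :version => 2, :body => "hello" })
```

Registering a new handler under a higher version leaves in-flight messages with the old version untouched, which is the property that makes gradual rollouts possible.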
APIs
The following REST APIs will be exposed to 3rd party apps, the Web UI and other kinds of UI:
Create User
POST /users
where the user information is passed in the body in the form of parameters.
Login/Authenticate User
POST /users/userid/sessions
This will authenticate the user and create a session. Note that most of the following APIs send back the session-id, which will be stored in the (sharded) database and used to retrieve user information.
Logout
HTTP-HEADER
session-id
DELETE /users/userid/sessions
Get User information
HTTP-HEADER
session-id
GET /users/userid
This API will return detailed user information
Anonymous User information
GET /users/userid
This API will return public user information
List of Followings
HTTP-HEADER
session-id
GET /followings/userid
This API will return a summary of the people the user is following.
Create Followers
HTTP-HEADER
session-id
POST /followers/followerid
This API will create one-way follower relationship between the user and follower.
Enable notification for Followers
HTTP-HEADER
session-id
POST /followers/followerid/notifications
Disable notification for Followers
HTTP-HEADER
session-id
DELETE /followers/followerid/notifications
Block Followers
HTTP-HEADER
session-id
POST /followers/followerid/blocking
Unblock Followers
HTTP-HEADER
session-id
DELETE /followers/followerid/blocking
Follower Exist
HTTP-HEADER
session-id
GET /followers/followerid
This API will return HTTP code 200 if the follower exists.
List of Followers
HTTP-HEADER
session-id
GET /followers
This API will return a summary of the people who are following the user.
Archive Messages
HTTP-HEADER
session-id
GET /messages?offset=xxx&limit=yyy&since=date
This API will return archived messages for the user; offset, limit and since are all optional.
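To make the offset, limit and since parameters concrete, here is a minimal in-memory sketch of how a service could apply them. The page helper and the hash-based messages are hypothetical stand-ins for the database query, and the default limit of 100 is an assumption borrowed from the service code in the appendix.

```ruby
require 'time'

# Hypothetical helper: apply the optional since/offset/limit parameters
# to a list of messages, newest first.
def page(messages, options = {})
  offset = (options[:offset] || 0).to_i
  limit  = (options[:limit] || 100).to_i
  since  = options[:since] ? Time.parse(options[:since].to_s) : nil

  selected = since ? messages.select { |m| m[:sent_at] >= since } : messages
  newest_first = selected.sort_by { |m| m[:sent_at] }.reverse
  newest_first[offset, limit] || [] # nil when offset is past the end
end

# Three sample messages sent on consecutive days (UTC).
sample_messages = [1, 2, 3].map do |day|
  { :body => "m#{day}", :sent_at => Time.utc(2008, 6, day) }
end

recent = page(sample_messages, { :since => "2008-06-02T00:00:00Z", :limit => 1 })
puts recent.map { |m| m[:body] }.inspect
```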
DELETE Messages
HTTP-HEADER
session-id
DELETE /messages/message-id
This API will delete the message with the given message-id.
Send Direct Messages
HTTP-HEADER
session-id
POST /directmessages/targetuserid
This API will send a direct message to the given user.
Send Reply
HTTP-HEADER
session-id
POST /reply/message-id
This API will send a reply to the message with the given message-id; the contents of the reply are passed in the body (as parameters).
Direct Messages Received
HTTP-HEADER
session-id
GET /directmessages/userid?offset=xxx&limit=yyy&since=date
This API will return messages received by the user.
Replies Received
HTTP-HEADER
session-id
GET /replies?offset=xxx&limit=yyy&since=date
This API will return replies received by the user.
Update Status
HTTP-HEADER
session-id
POST /statuses
This API will update status of the user and pass the contents of the message in the body (as parameters).
Get Statuses
HTTP-HEADER
session-id
GET /statuses?offset=xxx&limit=yyy&since=date
This API will return the statuses of the user; offset, limit and since are optional.
User Timeline
HTTP-HEADER
session-id
GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return the timeline of the user. It compares the given username with the authenticated username and returns the detailed timeline if they match; otherwise it returns the public timeline.
Public Timeline
GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return public timeline of the user.
Friends Timeline
HTTP-HEADER
session-id
GET /friendstimeline?offset=xxx&limit=yyy&since=date
This API will return timeline of the friends of the user.
Request Flow
Here is an illustration of how information flows through the different components:
Though I am not showing the request flow for every API, the others follow a similar pattern.
Detailed Design
Domain Classes
Primary domain classes are:
- User
- Message, which has four subclasses DirectMessage, ReplyMessage, Tweet and Status for various kind of messages in the system.
- Follower – creates one-way relationship between two users, where follower can choose to be notified when the user changes his/her status.
I identify each message with a special GUID. The prototype uses a simple scheme to generate GUIDs for request-ids and message-ids, but for a real project I would recommend a better library such as UUIDTools.
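As one possible alternative to the counter-based scheme, the sketch below uses SecureRandom.uuid from Ruby’s standard library instead of UUIDTools. The Ids module name and the id prefixes are hypothetical; the only constraint taken from the prototype is that message ids must fit in the 64-character message_id column.

```ruby
require 'securerandom'

# Hypothetical replacement for the prototype's GuidGenerator: random
# (version 4) UUIDs need no shared counter, so ids generated on
# different machines or shards do not collide in practice.
module Ids
  def self.next_message_id
    "message_id-#{SecureRandom.uuid}"
  end

  def self.next_request_id
    "request_id-#{SecureRandom.uuid}"
  end
end

message_id = Ids.next_message_id
puts message_id
puts message_id.length # prefix (11 chars) + UUID (36 chars) = 47
```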
Schema
Followers
class CreateFollowers < ActiveRecord::Migration
  def self.up
    create_table :followers do |t|
      t.column :username, :string, :limit => 16
      t.column :follower_username, :string, :limit => 16
      t.column :relation_type, :string, :default => 'Follower', :limit => 32
      t.column :blocked, :boolean, :default => false
      t.column :notifications, :boolean, :default => false
      t.column :created_at, :datetime
      t.column :updated_at, :datetime
      t.column :deleted_at, :datetime
    end
    add_index :followers, :username
    add_index :followers, :follower_username
  end

  def self.down
    drop_table :followers
  end
end
Messages
class CreateMessages < ActiveRecord::Migration
  def self.up
    create_table :messages do |t|
      t.column :message_id, :string, :limit => 64
      t.column :type, :string, :limit => 32
      t.column :message_type, :string, :default => 'Say', :limit => 32
      t.column :reply_message_id, :string, :limit => 64
      t.column :username, :string, :limit => 16
      t.column :channel_name, :string, :limit => 32
      t.column :message_body, :string, :limit => 140
      t.column :favorite, :boolean, :default => false
      # NOTE: Time.now.utc is evaluated once, when the migration runs, so this
      # default is a fixed timestamp; callers should set sent_at explicitly.
      t.column :sent_at, :datetime, :default => Time.now.utc
      t.column :created_at, :datetime
      t.column :deleted_at, :datetime
    end
    add_index :messages, :message_id
    add_index :messages, :username
  end

  def self.down
    drop_table :messages
  end
end
Users
class CreateUsers < ActiveRecord::Migration
  def self.up
    create_table :users do |t|
      t.column :username, :string, :limit => 16
      t.column :password, :string, :limit => 16
      t.column :name, :string, :limit => 64
      t.column :email, :string, :limit => 64
      t.column :time_zone_id, :string, :limit => 32
      t.column :created_at, :datetime
      t.column :updated_at, :datetime
      t.column :deleted_at, :datetime
    end
    add_index :users, :username
  end

  def self.down
    drop_table :users
  end
end
Persistence
I used Rails’ ActiveRecord library to provide persistence, though alternatively I could have used ActiveHibernate. These libraries provide a quick way to add persistence with minimal configuration and boilerplate. This prototype uses multiple levels of partitioning: first at the service level and second at the persistence level. I am using multiple Derby databases to store objects, with a simple hashing scheme for load distribution. This prototype also shows how to connect to multiple databases in Rails, which was difficult in the early implementation of Twitter.
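A simple hashing scheme of this kind could look like the sketch below. It is not the prototype’s SchemaHelper: the shard count, the database naming pattern and the choice of CRC32 are all assumptions made for illustration. A stable hash is used because Ruby’s built-in String#hash is not guaranteed to give the same value in different processes.

```ruby
require 'zlib'

# Assumed number of Derby databases; the prototype's real value may differ.
SHARD_COUNT = 4

# Map a username to a shard index using a hash that is stable across
# processes and machines.
def shard_for(username, shard_count = SHARD_COUNT)
  Zlib.crc32(username.downcase) % shard_count
end

# Hypothetical naming pattern for the per-shard database.
def database_name_for(username)
  "microblog_shard_#{shard_for(username)}"
end

%w[alice bob carol].each do |name|
  puts "#{name} -> #{database_name_for(name)}"
end
```

Every request for a given username lands on the same database, which is what lets the service connect to one shard per request.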
Domain Services
The core model and services use domain-driven design and apply the principle of a fat model and thin services (as opposed to fat services and an anemic model). The domain services implement the external REST APIs and use the underlying ActiveRecord for most of the functionality.
Messaging Middleware
The REST-based web services don’t invoke domain services directly; instead they use messaging middleware. In a real application, I might use ESB/integration patterns such as intelligent routing to partition the system across multiple machines and send each request to the suitable queue. In this prototype, I am simply using ActiveMQ, which is fairly robust and easy to use. I am also using separate queues for different kinds of operations. Another lesson I have learned in building large systems is to separate reads from writes so that you can scale them independently and also offer different qualities of service, e.g. read queues can be non-persistent, but write queues can be persistent.
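The read/write split can be expressed as a small lookup that picks a queue and a delivery mode for each action. The queue names, the :persistent flag and the list of write actions below are hypothetical; in the prototype the actual queue names live in JmsHelper.

```ruby
# Hypothetical queue settings: reads can be lost and retried, so they
# use a non-persistent queue; writes must survive a broker restart.
QUEUES = {
  :read  => { :name => 'users.read',  :persistent => false },
  :write => { :name => 'users.write', :persistent => true }
}.freeze

# Assumed list of actions that modify state.
WRITE_ACTIONS = ['Users.create_user'].freeze

# Choose the queue settings for an action name.
def queue_for(action)
  WRITE_ACTIONS.include?(action) ? QUEUES[:write] : QUEUES[:read]
end

puts queue_for('Users.create_user').inspect
puts queue_for('Users.get_user').inspect
```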
Business Delegate
The REST-based web services don’t interact with the messaging middleware directly; instead they use business delegates that hide all the details of sending messages, creating temporary queues and receiving replies. The interface of the business delegates is the same as that of the services.
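The request/reply exchange behind a delegate can be simulated without a broker. In the sketch below, Ruby’s thread-safe Queue stands in for both the service queue and the temporary reply queue; InMemoryBroker, EchoDelegate and the 'users.read' queue name are hypothetical and only mimic the shape of the JMS-based delegates in the appendix.

```ruby
require 'thread'

# Hypothetical in-memory stand-in for the message broker.
class InMemoryBroker
  def initialize
    @lock = Mutex.new
    @queues = Hash.new { |hash, name| hash[name] = Queue.new }
  end

  def queue(name)
    @lock.synchronize { @queues[name] }
  end
end

# Delegate: hides the messaging details behind a plain method call.
class EchoDelegate
  def initialize(broker, queue_name)
    @broker = broker
    @queue_name = queue_name
  end

  def request(action, params = {})
    reply_queue = Queue.new # plays the role of a temporary reply queue
    @broker.queue(@queue_name) << { :action => action, :params => params, :reply_to => reply_queue }
    reply_queue.pop # block until the service answers
  end
end

broker = InMemoryBroker.new

# Service side: consume requests and answer on each request's reply queue.
worker = Thread.new do
  loop do
    msg = broker.queue('users.read').pop
    break if msg == :stop
    msg[:reply_to] << "handled #{msg[:action]} for #{msg[:params][:username]}"
  end
end

delegate = EchoDelegate.new(broker, 'users.read')
response = delegate.request('Users.get_user', { :username => 'alice' })
puts response

broker.queue('users.read') << :stop
worker.join
```

The caller only sees a blocking method call, so the REST layer would not need to change if the service moved to another machine.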
Benchmark Results
Though performance was not the objective of my prototype, I tried to check how many requests I could process on my development machine. I chose to benchmark only the messaging middleware and not the REST server, because the JVM uses native threads and web containers such as Tomcat use a small thread pool to serve requests. Since our architecture is heavily IO-bound, that would not scale. Alternatively, I could have built reactive or event-based APIs for HTTP, or used Yaws/Mochiweb as a container for the REST-based web services, because creating a process in Erlang is pretty cheap. For example, an Erlang process takes 300 bytes, whereas a Java thread takes 2M by default (though it can be reduced to 256K on most machines). Here are the results of running a simple server with embedded ActiveMQ and a load test, both running under JRuby on my Pentium Linux machine. I used the default VM size for both JRuby processes and didn’t tune any options:
Test | Elapsed Time (secs) | Throughput | Invocations |
load_test_create_users | 99.041000 | 10.09682857311318/s | 1000 |
load_test_bad_authenticate | 79.407000 | 12.593348183199506/s | 1000 |
load_test_good_authenticate | 91.802000 | 10.893008861477503/s | 1000 |
load_test_get_users | 91.362000 | 10.945469671474584/s | 1000 |
load_test_create_followers | 143.560000 | 6.958714408811979/s | 999 |
load_test_follower_exists | 95.140000 | 10.50020495537709/s | 999 |
load_test_get_followers | 99.881000 | 10.002002391423325/s | 999 |
load_test_get_followings | 104.737000 | 9.538176576655392/s | 999 |
load_test_block_follower | 156.106000 | 6.399497779340769/s | 999 |
load_test_unblock_follower | 161.950000 | 6.168532449211547/s | 999 |
load_test_follower_enable_notifications | 157.637000 | 6.337344665142635/s | 999 |
load_test_follower_disable_notifications | 161.714000 | 6.1775727524053545/s | 999 |
load_test_send_direct_message | 1673.494000 | 5.969546350480188/s | 9990 |
load_test_messages_sent_receive | 157.832000 | 6.323179075798668/s | 998 |
load_test_send_direct_message | 1725.717000 | 5.788898179687536/s | 9990 |
load_test_send_replies | 21.546000 | 5.105592926919201/s | 110 |
load_test_get_user_status | 86.952000 | 11.500598043963544/s | 1000 |
load_test_update_status | 213.298000 | 4.688276498896948/s | 1000 |
load_test_get_user_status | 88.987000 | 11.23759650430517/s | 1000 |
load_test_archive_messages | 80.982000 | 12.348575563593377/s | 1000 |
load_test_public_timeline | 98.143000 | 10.189213685309506/s | 1000 |
load_test_user_timeline | 92.871000 | 10.767623902461311/s | 1000 |
load_test_friends_timeline | 140.155000 | 7.1349577268319555/s | 1000 |
load_test_user_and_friends_timeline | 193.257000 | 5.174430032905596/s | 1000 |
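The throughput column is simply the number of invocations divided by the elapsed time. The short check below recomputes two rows of the table; the throughput helper is a name introduced here for illustration.

```ruby
# Throughput in requests per second = invocations / elapsed seconds.
def throughput(invocations, elapsed_secs)
  invocations / elapsed_secs.to_f
end

create_users  = throughput(1000, 99.041)  # load_test_create_users row
update_status = throughput(1000, 213.298) # load_test_update_status row

puts format("create_users:  %.2f requests/s", create_users)
puts format("update_status: %.2f requests/s", update_status)
```

Read operations cluster around 10 to 12 requests per second, while writes fall to roughly 5 to 7.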
I was not impressed with the results I got, and I may implement a similar prototype in Erlang using MochiWeb, Mnesia and ejabberd.
Summary
Though the technologies I chose are somewhat arbitrary, the same design principles can be applied to other technology stacks. Though technologies are generally selected for political reasons or personal preference, it is important to consider the team’s familiarity with the technology stack. For example, Twitter tried ejabberd but had problems troubleshooting it because they were not familiar with Erlang.
Again, I used architecture principles such as stateless services, though in real life I may have to keep some state, which can be stored in the sharded database. I used embedded services such as an embedded database server and messaging server because you can then easily start a single process on a machine and replicate machines as the load increases. I used partitioning/sharding heavily, which is key for scalability. I also used replication, especially replicating messages for each follower, so that reads are fast and we don’t spend a lot of time querying the database. This does add cost on the write side and increases disk requirements, but I think we can control those by limiting the number of messages per user per minute and the maximum number of notifications. We can also remove old messages from the database. Finally, I applied the principle of bringing computation closer to the data by using an embedded database.
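The trade-off between replicating messages on write and limiting the write rate can be sketched in memory. FanOutTimeline and its per-minute limit are hypothetical; in the prototype the copies would go to each follower’s database shard rather than to Ruby hashes.

```ruby
# Hypothetical sketch: copy each message to every follower's timeline
# at write time, and refuse posts beyond a per-user, per-minute limit.
class FanOutTimeline
  def initialize(max_per_minute)
    @max_per_minute = max_per_minute
    @followers = Hash.new { |h, k| h[k] = [] } # username => follower usernames
    @timelines = Hash.new { |h, k| h[k] = [] } # username => messages, newest first
    @sent_at   = Hash.new { |h, k| h[k] = [] } # username => recent post times
  end

  def follow(username, follower_username)
    @followers[username] << follower_username
  end

  # Returns false when the sender is over the rate limit.
  def post(username, body, now = Time.now)
    recent = @sent_at[username].select { |t| now - t < 60 }
    return false if recent.size >= @max_per_minute
    @sent_at[username] = recent << now

    message = { :from => username, :body => body, :sent_at => now }
    # The write is expensive (one copy per follower) ...
    ([username] + @followers[username]).each { |u| @timelines[u].unshift(message) }
    true
  end

  # ... so that the read is a single cheap lookup.
  def timeline(username, limit = 100)
    @timelines[username].first(limit)
  end
end

fan_out = FanOutTimeline.new(2)
fan_out.follow('alice', 'bob')
start = Time.utc(2008, 6, 1, 12, 0, 0)
fan_out.post('alice', 'one', start)
fan_out.post('alice', 'two', start + 1)
puts fan_out.timeline('bob').map { |m| m[:body] }.inspect
```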
Other Improvements
I tried to show that scalability problems are best solved by an architecture that is independent of any particular technology or language. In fact, I may implement a similar design in Erlang using Mnesia as the database, ejabberd as the messaging middleware, Mochiweb for REST and OTP for implementing services. Another improvement I would suggest is a reverse proxy server for caching, along with a DMZ for security. We can also add some throttling to the reverse proxy servers. For better fault tolerance, I might run multiple reverse proxy servers and use DNS round robin. I don’t like DNS-based load balancing for real load distribution because it does not take a server’s capacity and load into account, but it would be suitable in this case. Also, I didn’t implement any caching, but we can use caching solutions such as EHCache, Tangosol, Terracotta, Memcache, etc., though caching highly dynamic content may be difficult and less reusable. I also didn’t use any ESB (lightweight providers such as Mule or ServiceMix), but one could be used to abstract routing to the services, load balancing, aggregation, replication, transformation, etc. We can also build support for versioning and hot code swapping by adding a version number to each message and changing the routing scheme. This prototype uses a simple partitioning scheme that hashes business keys such as the username; however, it is difficult to manage when you need to add or remove servers because that requires migrating data. Another solution is the MD5 scheme that we use at Amazon for S3, which calculates the MD5 of the username and of each replicated queue and selects the queue whose MD5 sum is higher than the username’s sum. For further fault tolerance, we could replicate data in Master/Slave fashion.
PS: I noticed “NativeException: javax.jms.JMSException: PermGen space” a couple of times while running long load tests, which is generally caused by reloading classes (I have observed this for many years in Weblogic and Tomcat). I probably need to investigate this further, but my guess is that JRuby is doing something dumb.
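The MD5 selection rule described above is a form of consistent hashing, and a minimal version fits in a few lines. The HashRing class and the queue names are hypothetical, and the wrap-around to the first queue when no queue has a higher sum is an assumption added to make the rule total.

```ruby
require 'digest/md5'

# Hypothetical sketch: place queues on a ring by the MD5 of their names
# and pick, for each key, the first queue with a higher MD5 sum.
class HashRing
  def initialize(node_names)
    @ring = node_names.map { |name| [position(name), name] }.sort
  end

  def node_for(key)
    key_position = position(key)
    entry = @ring.find { |node_position, _| node_position > key_position }
    (entry || @ring.first).last # wrap around past the highest sum
  end

  private

  # Interpret the 128-bit MD5 digest as an integer position on the ring.
  def position(value)
    Digest::MD5.hexdigest(value).to_i(16)
  end
end

ring = HashRing.new(%w[queue-a queue-b queue-c])
bigger_ring = HashRing.new(%w[queue-a queue-b queue-c queue-d])

usernames = (0...100).map { |i| "user#{i}" }
moved = usernames.reject { |u| ring.node_for(u) == bigger_ring.node_for(u) }
puts "#{moved.size} of #{usernames.size} users move when queue-d is added"
```

Unlike the modulo scheme, adding a queue only moves the keys that now fall to the new queue; every other key stays where it was.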
Download
Appendix
Source Code Model
./model/user.rb
###
# User of the system.
# Though, in our system user can have many messages that it sent to update
# his/her status or other users and can also have followers and followings, but
# we are not showing those relationships because we cannot populate them due to
# sharding and scalability concerns.
###
class User < ActiveRecord::Base
  validates_presence_of :name
  # validates_presence_of does not take :within, so the length ranges are
  # enforced with validates_length_of instead.
  validates_presence_of :username
  validates_length_of :username, :within => 3..16
  validates_presence_of :password
  validates_length_of :password, :within => 3..16
  validates_length_of :email, :within => 3..64
  validates_uniqueness_of :email, :case_sensitive => false
  validates_format_of :email, :with => /^([^@\s]+)@((?:[-a-z0-9]+\.)+[a-z]{2,})$/i
  #
  ### all messages for the same user will be stored in the same database.
  ### also, all messages from the users he/she is following will be copied to the user's database
  ### if they lie in different databases.
  ### limiting rows returned from these relationships to 100 so that we don't overwhelm server.
  #
  has_many :statuses, :class_name => 'Message', :foreign_key => :username, :limit => 100, :order => 'sent_at desc'
  has_many :tweets, :class_name => 'Message', :foreign_key => :channel_name, :limit => 100, :order => 'sent_at desc'

  #
  ### We will replicate follower/following to each database and though we may not have corresponding users,
  ### but we can have usernames
  #
  def self.follower_usernames(username, offset, limit)
    Follower.followers(username, offset, limit).map {|f| f.follower_username }
  end

  def self.following_usernames(username, offset, limit)
    Follower.followings(username, offset, limit).map {|f| f.username }
  end

  # Follower defines follower_count and following_count (there is no
  # followers_count), so each method delegates to its namesake.
  def self.follower_count(username)
    Follower.follower_count(username)
  end

  def self.following_count(username)
    Follower.following_count(username)
  end

  #
  ### user's timeline
  #
  def self.timeline(username, offset, limit)
    Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
  end

  def to_s
    "#{username}"
  end

  def to_external_xml(xml)
    xml.user(:username => self.username, :name => self.name, :email => self.email)
  end
end
./model/message.rb
###
# Message stores any message sent by user to update his/her status, tweets
# from other users that he/she is following, direct message sent to another
# user or reply message sent in response to tweet or direct message.
# Note that we use a GUID based message_id in conjunction with the database column 'id'
# because we are using sharding the database column can be duplicate, but the GUID
# based message_id will always be unique across all databases.
###
class Message < ActiveRecord::Base
  MESSAGE_TYPE_SAY = "Say"
  MESSAGE_TYPE_SHOUT = "Shout"
  #
  # validates_presence_of does not take :within, so the length ranges are
  # enforced with validates_length_of instead.
  validates_presence_of :message_id
  validates_length_of :message_id, :within => 1..64
  validates_presence_of :username
  #
  ### channel_name will be username of the target user for now,
  ### though, in future it could be a group or a topic
  #
  validates_presence_of :channel_name
  validates_length_of :channel_name, :within => 1..32
  validates_presence_of :message_body
  validates_length_of :message_body, :within => 1..140
  validates_presence_of :message_type
  validates_length_of :message_type, :within => 1..32
  validates_presence_of :sent_at
  validates_uniqueness_of :message_id
  #
  ### all messages for the same user will be stored in the same database.
  ### also, all messages from the users he/she is following will be copied to the user's database
  ### if they lie in different databases.
  #
  belongs_to :user, :class_name => 'User', :foreign_key => :username

  def self.messages_for(username, offset, limit)
    Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end

  def self.destroy_message(id)
    message = Message.find_by_message_id(id)
    if message
      message.destroy
      message
    else
      nil
    end
  end

  def to_s
    "#{username} -> #{channel_name}: #{message_body}"
  end

  def to_external_xml(xml)
    xml.message(message_body, :message_id => self.message_id, :message_type => self.message_type, :from => self.username, :to => self.channel_name, :sent_at => sent_at.httpdate)
  end
end
./model/reply_message.rb
###
# Message sent in response to another tweet, message or direct message.
#
# commented below because message could be in different database.
# belongs_to :reply_message_id, :class_name => Message, :dependent => :destroy
###

require 'direct_message'

class ReplyMessage < DirectMessage
  # validates_presence_of does not take :within; use validates_length_of for the range.
  validates_presence_of :reply_message_id
  validates_length_of :reply_message_id, :within => 1..64

  def self.replies(username, offset, limit)
    ReplyMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end
end
./model/guid_generator.rb
class GuidGenerator
  @@count = 1

  def self.guid(prefix)
    @@count += 1
    "#{prefix}#{@@count}#{Time.now.to_i}"
  end

  def self.next_message_id
    guid("message_id")
  end

  def self.next_request_id
    guid("request_id")
  end
end
./model/follower.rb
####
# Follower stores one-way relation between two users. However, in order to
# support sharding, we don't directly connect users instead we only store
# users' usernames.
###
class Follower < ActiveRecord::Base
  RELATION_TYPE_FOLLOWER = "Follower"
  RELATION_TYPE_FRIEND = "Friend"
  RELATION_TYPE_FAMILY = "Family"
  RELATION_TYPE_COLLEAGUE = "Colleague"
  RELATION_TYPE_ACQUAINTENCE = "Acquaintance"
  RELATION_TYPE_FAN = "Fan"
  RELATION_TYPE_OTHER = "Other"

  validates_presence_of :relation_type
  #validates_presence_of :blocked
  #validates_presence_of :notifications

  def self.create_follower(follower_attrs)
    Follower.create!(follower_attrs) unless exists?(follower_attrs[:username], follower_attrs[:follower_username])
  end

  def self.followers(username, offset, limit)
    self.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
  end

  def self.follower_count(username)
    self.count(:conditions => ['username = ?', username])
  end

  def self.following_count(username)
    self.count(:conditions => ['follower_username = ?', username])
  end

  def self.followings(username, offset, limit)
    self.find(:all, :conditions => ['follower_username = ?', username], :offset => offset, :limit => limit)
  end

  # Raises RecordNotFound when the relation is missing, which is the error
  # FollowersService#destroy_follower rescues.
  def self.destroy_follower(username, follower_username)
    record = self.get_record(username, follower_username)
    raise ActiveRecord::RecordNotFound, "#{follower_username} does not follow #{username}" if record.nil?
    record.destroy
  end

  def self.exists?(username, follower_username)
    !self.get_record(username, follower_username).nil?
  end

  def self.enable_notifications(username, follower_username)
    self.get_record(username, follower_username).update_attributes(:notifications => true)
  end

  def self.disable_notifications(username, follower_username)
    self.get_record(username, follower_username).update_attributes(:notifications => false)
  end

  def self.block(username, follower_username)
    self.get_record(username, follower_username).update_attributes(:blocked => true)
  end

  def self.unblock(username, follower_username)
    self.get_record(username, follower_username).update_attributes(:blocked => false)
  end

  def self.get_record(username, follower_username)
    self.find(:first, :conditions => ['username = ? and follower_username = ?', username, follower_username])
  end

  def to_s
    "#{username} -> #{follower_username}"
  end

  def to_external_xml(xml)
    xml.follower(:username => self.username, :follower_username => self.follower_username)
  end
end
./model/direct_message.rb
####
# Direct message is sent directly to another user
####
class DirectMessage < Message
  def to_user(user)
    self.channel_name = user.username
  end

  def self.direct_messages_sent(username, offset, limit)
    DirectMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end

  def self.direct_messages_received(username, offset, limit)
    DirectMessage.find(:all, :conditions => ['channel_name = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end
end
Source Code Services
./service/base_service.rb
class BaseService
  protected

  def initialize(username)
    SchemaHelper.setup_schema_for(username)
  end

  def serialize(obj, format)
    if obj
      format ||= 'xml'
      case format
      when 'xml'
        obj.to_xml
      when 'yaml'
        obj.to_yaml
      when 'json'
        obj.to_json
      else
        obj
      end
    end
  end
end
./service/users_service.rb
#
## UserService provides basic functionality for querying/storing users
## It assumes connection to the right database is already setup.
#
class UsersService < BaseService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end

  #
  ### retrieves user record
  #
  def get_user(username, options={})
    User.find_by_username(username)
  end

  #
  ### return usernames of people whom the user is following.
  #
  def followings(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.following_usernames(username, offset, limit)
  end

  #
  ### return usernames of followers of authenticated user
  #
  def followers(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.follower_usernames(username, offset, limit)
  end

  def authenticate(username, password, options={})
    User.find_by_username_and_password(username, password)
  end

  def create_user(user_attrs, options={})
    user = User.new()
    user.attributes = user_attrs
    user.save!
    user
  end

  def archive_messages(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    Message.messages_for(username, offset, limit)
  end
end
./service/followers_service.rb
#
## FollowersService provides basic functionality for creating/querying followers
## It assumes all users are already authenticated before calling these methods.
#
class FollowersService < BaseService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end

  #
  ### create follower
  #
  def create_follower(follower_attrs, options={})
    Follower.create_follower(follower_attrs)
  end

  #
  ### enable notifications
  #
  def enable_notifications(username, follower_username, options={})
    Follower.enable_notifications(username, follower_username)
  end

  #
  ### disable notifications
  #
  def disable_notifications(username, follower_username, options={})
    Follower.disable_notifications(username, follower_username)
  end

  #
  ### blocks a user
  #
  def block(username, follower_username, options={})
    Follower.block(username, follower_username)
  end

  #
  ### unblock user
  #
  def unblock(username, follower_username, options={})
    Follower.unblock(username, follower_username)
  end

  #
  ### delete follower; returns false if the relation was not found
  #
  def destroy_follower(username, follower_username, options={})
    Follower.destroy_follower(username, follower_username)
    true
  rescue ActiveRecord::RecordNotFound => e
    false
  end

  #
  ### check whether the follower relation exists
  #
  def exists?(username, follower_username, options={})
    Follower.exists?(username, follower_username)
  end
end
./service/messages_service.rb
#
## MessagesService provides basic functionality for querying/storing statuses/messages
## It assumes all users are already authenticated before calling these methods.
#
class MessagesService < BaseService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end

  #
  ### get a single status
  #
  def get_status(message_id, options={})
    Message.find_by_message_id(message_id)
  end

  #
  ### update status
  #
  def update_status(message_attrs, options={})
    message = Tweet.new
    message.attributes = message_attrs
    safe_save(message)
  end

  #
  ### delete status
  #
  def destroy_message(message_id, options={})
    Message.destroy_message(message_id)
  end

  #
  ### retrieves most recent statuses for public
  #
  def public_timeline(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 10
    User.timeline(username, offset, limit)
  end

  #
  ### retrieves most recent statuses for user
  #
  def user_timeline(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.timeline(username, offset, limit)
  end

  #
  ### retrieves most recent statuses for friends
  #
  def friends_timeline(username, options={})
    offset = options[:offset]
    limit = options[:limit]
    get_friends_timeline(username, offset, limit)
  end

  #
  ### retrieves most recent statuses for user and his/her friends
  #
  def user_and_friends_timeline(username, options={})
    offset = options[:offset]
    limit = options[:limit]
    messages = User.timeline(username, offset, limit)
    messages += get_friends_timeline(username, offset, limit)
  end

  private

  #
  ### retrieves most recent statuses for friends without any conversion
  #
  def get_friends_timeline(username, offset, limit)
    offset = offset || 0
    following_count = Follower.following_count(username)
    # NOTE: with integer division both branches evaluate to 1, so the default
    # is one message per followed user.
    limit = limit || (following_count < 100 ? following_count / 100 + 1 : 1)

    following = User.following_usernames(username, 0, 200)
    messages = []
    # block variable renamed so it does not shadow the username parameter
    following.each do |followed_username|
      messages += Message.messages_for(followed_username, offset, limit)
    end
    messages
  end

  protected

  def safe_save(message)
    old_message = Message.find_by_message_id(message.message_id)
    if old_message.nil?
      message.save!
      message
    else
      puts "Message with id #{message.message_id} already exists #{message.inspect}"
      nil
    end
  end
end
./service/direct_messages_service.rb
#
## DirectMessagesService provides functionality for sending/receiving direct messages
#
class DirectMessagesService < MessagesService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end

  #
  ### sends back the most recent direct messages received by the user (20 by default)
  ### it assumes user is already authenticated.
  #
  def direct_messages_received(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 20
    DirectMessage.direct_messages_received(username, offset, limit)
  end

  #
  ### sends back the most recent direct messages sent by the user (20 by default)
  ### it assumes user is already authenticated.
  #
  def direct_messages_sent(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 20
    DirectMessage.direct_messages_sent(username, offset, limit)
  end

  #
  ### send direct message
  #
  def send_direct_message(message_attrs, options={})
    message = DirectMessage.new()
    message.attributes = message_attrs
    safe_save(message)
  end

  #
  ### send reply message
  #
  def send_reply(message_attrs, options={})
    message = ReplyMessage.new()
    message.attributes = message_attrs
    safe_save(message)
  end

  #
  ### sends back the most recent replies for the user (100 by default)
  ### it assumes user is already authenticated.
  #
  def replies(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    ReplyMessage.replies(username, offset, limit)
  end

  #
  ### delete direct message
  #
  def destroy_direct_message(id, options={})
    DirectMessage.destroy_message(id)
  end
end
Source Code Business Delegate
./delegate/base_delegate.rb
#
## BaseDelegate provides client side interface for connecting to the business
## service using BusinessDelegate pattern.
#
class BaseDelegate
  protected

  def initialize(jms_helper, read_queue_name, write_queue_name)
    @read_queue_name = read_queue_name
    @write_queue_name = write_queue_name
    @jms_helper = jms_helper
    @jms_helper.create_producers(@read_queue_name, @write_queue_name)
  end

  # Builds an empty text message and copies the given properties onto it as strings.
  def new_jms_message(properties={})
    jms_msg = @jms_helper.create_message("") ### ActiveMQTextMessage.new
    jms_msg.setStringProperty('request_id', GuidGenerator.next_request_id)
    properties.each do |name, value|
      jms_msg.setStringProperty(name.to_s, value.to_s)
    end
    jms_msg
  end

  # Reads always wait for a reply and return its text.
  def send_read_request(jms_msg)
    @jms_helper.send_message(@read_queue_name, jms_msg, true).text
  end

  # Writes wait for a reply by default; pass reply=false to fire and forget.
  def send_write_request(jms_msg, reply=true)
    response = @jms_helper.send_message(@write_queue_name, jms_msg, reply)
    if response.respond_to? :text
      response.text
    else
      response
    end
  end
end
./delegate/users_delegate.rb
#
## UsersDelegate provides basic functionality for querying/storing users.
## It assumes connection to the right database is already set up.
#
class UsersDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_USERS, JmsHelper::QUEUE_WRITE_USERS)
  end

  #
  ### creates user record
  #
  def create_user(user_attrs, options={})
    args = options.merge(user_attrs).merge(:action => 'Users.create_user')
    jms_msg = new_jms_message(args)
    send_write_request(jms_msg)
  end

  #
  ### retrieves user record
  #
  def get_user(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.get_user'))
    send_read_request(jms_msg)
  end

  #
  ### return usernames of people the user is following
  #
  def followings(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followings'))
    send_read_request(jms_msg)
  end

  #
  ### return 100 usernames of followers of authenticated user
  #
  def followers(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followers'))
    send_read_request(jms_msg)
  end

  def authenticate(username, password, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :password => password, :action => 'Users.authenticate'))
    send_read_request(jms_msg)
  end

  def archive_messages(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.archive_messages'))
    send_read_request(jms_msg)
  end
end
./delegate/messages_delegate.rb
#
## MessagesDelegate provides basic functionality for querying/storing statuses/messages.
## It assumes all users are already authenticated before calling these methods.
#
class MessagesDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_MESSAGES, JmsHelper::QUEUE_WRITE_MESSAGES)
  end

  #
  ### get a single status
  #
  def get_status(message_id, options={})
    jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.get_status'))
    send_read_request(jms_msg)
  end

  #
  ### update status
  #
  def update_status(message_attrs, options={})
    defaults = {:message_id => GuidGenerator.next_message_id, :action => 'Messages.update_status'}
    jms_msg = new_jms_message(defaults.merge(options).merge(message_attrs))
    send_write_request(jms_msg)
  end

  #
  ### delete status
  #
  def destroy_message(message_id, options={})
    jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.destroy_message'))
    send_write_request(jms_msg)
  end

  #
  ### retrieves most recent statuses for public
  #
  def public_timeline(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.public_timeline'))
    send_read_request(jms_msg)
  end

  #
  ### retrieves most recent statuses for user
  #
  def user_timeline(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_timeline'))
    send_read_request(jms_msg)
  end

  #
  ### retrieves most recent statuses for friends
  #
  def friends_timeline(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.friends_timeline'))
    send_read_request(jms_msg)
  end

  #
  ### retrieves most recent statuses for user and his/her friends
  #
  def user_and_friends_timeline(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_and_friends_timeline'))
    send_read_request(jms_msg)
  end
end
./delegate/followers_delegate.rb
#
## FollowersDelegate provides basic functionality for creating/querying followers.
## It assumes all users are already authenticated before calling these methods.
#
class FollowersDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_FOLLOWERS, JmsHelper::QUEUE_WRITE_FOLLOWERS)
  end

  #
  ### create follower
  #
  def create_follower(follower_attrs, options={})
    jms_msg = new_jms_message(options.merge(follower_attrs).merge(:action => 'Followers.create_follower'))
    send_write_request(jms_msg)
  end

  #
  ### enable notifications
  #
  def enable_notifications(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.enable_notifications'))
    send_write_request(jms_msg)
  end

  #
  ### disable notifications
  #
  def disable_notifications(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.disable_notifications'))
    send_write_request(jms_msg)
  end

  #
  ### blocks a user
  #
  def block(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.block'))
    send_write_request(jms_msg)
  end

  #
  ### unblock user
  #
  def unblock(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.unblock'))
    send_write_request(jms_msg)
  end

  #
  ### delete follower
  #
  def destroy_follower(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.destroy_follower'))
    send_write_request(jms_msg)
  end

  #
  ### check whether follower_username follows username
  #
  def exists?(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.exists?'))
    send_read_request(jms_msg)
  end
end
./delegate/direct_messages_delegate.rb
#
## DirectMessagesDelegate provides functionality for sending/receiving direct messages
#
class DirectMessagesDelegate < MessagesDelegate
  def initialize(jms_helper)
    super(jms_helper)
  end

  #
  ### sends back the most recent direct messages received by the user;
  ### it assumes the user is already authenticated.
  #
  def direct_messages_received(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_received'))
    send_read_request(jms_msg)
  end

  #
  ### sends back the most recent direct messages sent by the user;
  ### it assumes the user is already authenticated.
  #
  def direct_messages_sent(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_sent'))
    send_read_request(jms_msg)
  end

  #
  ### send direct message
  #
  def send_direct_message(message_attrs, options={})
    defaults = {:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_direct_message'}
    jms_msg = new_jms_message(defaults.merge(options).merge(message_attrs))
    send_write_request(jms_msg)
  end

  #
  ### send reply message
  #
  def send_reply(message_attrs, options={})
    defaults = {:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_reply'}
    jms_msg = new_jms_message(defaults.merge(options).merge(message_attrs))
    send_write_request(jms_msg)
  end

  #
  ### sends back the most recent replies for the user;
  ### it assumes the user is already authenticated.
  #
  def replies(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.replies'))
    send_read_request(jms_msg)
  end

  #
  ### delete direct message
  #
  def destroy_direct_message(message_id, options={})
    jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'DirectMessages.destroy_direct_message'))
    send_write_request(jms_msg)
  end
end
Source Code Message Handlers
./messaging/base_handler.rb
require 'digest/md5'

#
## BaseHandler provides common functionality for handling messages
#
class BaseHandler
  attr_reader :username
  attr_reader :message_text
  attr_reader :timings

  def initialize
    @timings = []
    @started_at = Time.now
    @host_id = Digest::MD5.hexdigest("localhost") # TODO: look up the real host
  end

  def handle(jms_msg, options={})
    add_timing_begin
    msg_attrs = {}
    pnames = jms_msg.getPropertyNames()
    for pname in pnames
      # Store each property under both its string and symbol key.
      msg_attrs[pname] = msg_attrs[pname.to_sym] = jms_msg.getObjectProperty(pname)
      case pname
      when 'username'
        @username = msg_attrs[pname]
      when 'auth_user'
        @auth_user = msg_attrs[pname]
      when 'request_id'
        @request_id = msg_attrs[pname]
      end
    end
    if @username.nil?
      raise "username missing in #{jms_msg.inspect} options #{options.inspect}"
    end
    if jms_msg.respond_to? :text
      #msg_attrs.merge!(xml_to_dict(jms_msg.text))
      msg_attrs[:message_text] = jms_msg.text
    end
    ##
    add_timing_for_input_xml
    response = do_handle(msg_attrs)
    ##
    add_timing_end
    response
  rescue ActiveRecord::RecordInvalid => invalid
    response = xml_error("400", "handle", invalid.record.errors)
    add_timing_end
    response
  rescue java.sql.SQLException => e
    puts "Failed to handle #{self.class.name}(#{jms_msg}) due to #{e.inspect}\n\n"
    # Walk the chain with a separate variable so e is still set when building the response.
    cause = e
    while cause
      puts cause.backtrace << "\n\n"
      cause = cause.getNextException()
    end
    response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})
    add_timing_end
    response
  rescue Exception => e
    puts "Failed to handle #{self.class.name}(#{jms_msg}) due to #{e.inspect}\n\n#{e.backtrace}"
    response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})
    add_timing_end
    response
  end

  protected

  def username_for_service
    @auth_user || @username
  end

  # Records [label, elapsed milliseconds since the handler was created].
  def add_timing(what)
    @timings << [what, ((Time.now - @started_at) * 1000).to_i]
  end

  def add_timing_begin
    add_timing "#{self.class.name} begin"
  end

  def add_timing_end
    add_timing "#{self.class.name} end"
  end

  def add_timing_for_db
    add_timing "#{self.class.name} connected to db"
  end

  def add_timing_for_output_xml
    add_timing "#{self.class.name} converting output xml"
  end

  def add_timing_for_input_xml
    add_timing "#{self.class.name} converting input xml"
  end

  # Subclasses override this with the actual work.
  def do_handle(msg_attrs, options={})
    raise "Implement handle"
  end

  def xml_to_dict(xml)
    Hash.from_xml(xml)
  end

  def xml_error(response_code, action, errors, options={})
    #logger.error "Failed to #{action} due to #{error_code} by #{username} -- #{error_message}"
    to_xml(response_code, "Error response for #{action}", options) do |xml|
      xml.errors do
        errors.each do |error_code, error_message|
          xml.error(:error_code => error_code, :error_message => error_message)
        end
      end
    end
  end

  def to_xml(response_code, comment="", options={})
    buffer = options[:buffer] ||= ''

    xml = options[:builder] ||= Builder::XmlMarkup.new(:target => buffer, :indent => options[:indent])
    xml.instruct! unless options[:skip_instruct]
    xml.comment! "Response #{comment} in reply for #{username} as of #{Time.now.utc}"
    xml.response(:request_id => @request_id, :response_code => response_code, :host_id => @host_id, :version => '1.0') do
      yield xml if block_given?
      xml.timings do
        @timings.each do |what, duration|
          xml.timing(:what => what, :duration_millis => duration)
        end
      end
    end
    buffer
  end
end
./messaging/archive_messages_handler.rb
#
## ArchiveMessagesHandler sends back archive messages for the user
#
class ArchiveMessagesHandler < BaseHandler
  protected

  def do_handle(msg_attrs, options={})
    svc = UsersService.new(self.username_for_service)
    add_timing_for_db
    messages = svc.archive_messages(self.username, options)
    add_timing_for_output_xml
    to_xml("200", "Archived Messages", options) do |xml|
      xml.messages do
        messages.each do |message|
          message.to_external_xml(xml)
        end
      end
    end
  end
end
./messaging/authenticate_handler.rb
#
## AuthenticateHandler authenticates user
#
class AuthenticateHandler < BaseHandler
  protected

  def do_handle(msg_attrs, options={})
    password = msg_attrs[:password]
    svc = UsersService.new(self.username_for_service)
    add_timing_for_db
    user = svc.authenticate(username, password, options)
    add_timing_for_output_xml
    if user
      to_xml("200", "Authentication", options) do |xml|
        user.to_external_xml(xml)
      end
    else
      xml_error("401", "Authentication", {"AUTH_FAILURE" => "Invalid username/password"})
    end
  end
end
./messaging/block_follower_handler.rb
#
## BlockFollowerHandler blocks follower
#
class BlockFollowerHandler < BaseHandler
  protected

  def do_handle(msg_attrs, options={})
    svc = FollowersService.new(self.username_for_service)
    follower_username = msg_attrs[:follower_username]
    add_timing_for_db
    blocked = svc.block(self.username, follower_username, options)
    response_code = blocked ? "200" : "404"
    add_timing_for_output_xml
    to_xml(response_code, "Block Follower", options) do |xml|
      xml.follower(:username => self.username, :follower_username => follower_username)
    end
  end
end
Messaging Helpers
./messaging/message_forwarder.rb
class MessageForwarder
  include java.lang.Runnable
  include javax.jms.MessageListener

  def initialize(jms_helper, queue_name, handlers)
    @jms_helper = jms_helper
    @queue_name = queue_name
    @handlers = handlers
  end

  # Registers this object as the listener; the JMS provider then calls onMessage.
  def run
    @consumer = @jms_helper.get_consumer(@queue_name)
    @consumer.set_message_listener(self)
    puts "Starting listener for queue #{@queue_name}"
  end

  def onMessage(jms_msg)
    # getStringProperty is the portable javax.jms.Message accessor.
    action = jms_msg.getStringProperty('action')
    if action.nil?
      puts "Unknown message #{jms_msg.inspect}"
    else
      handler = @handlers[action]
      if handler.nil?
        puts "No handler for #{action} -- #{jms_msg.inspect}"
      else
        response = handler.new().handle(jms_msg)

        # Reply only when the sender supplied a reply queue.
        if jms_msg.getJMSReplyTo()
          reply_queue = jms_msg.getJMSReplyTo()
          reply_producer = @jms_helper.create_producer(reply_queue)
          reply_message = @jms_helper.create_message(response)
          reply_message.setJMSCorrelationID(jms_msg.getJMSMessageID())
          reply_producer.send(reply_message)
        end
      end
    end
  end

  def close
    @consumer.close()
  end
end
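The forwarder is essentially a dispatch table plus request/reply. The sketch below shows the same flow in plain Ruby without a JMS broker, so it can be run anywhere. Everything in it is a stand-in invented for illustration: InMemoryForwarder, EchoStatusHandler, and the hash-based message layout are hypothetical, and a Ruby Queue plays the role of the temporary reply queue.

```ruby
require 'thread'
require 'securerandom'

# Hypothetical stand-in for a real handler; it exposes #handle like BaseHandler.
class EchoStatusHandler
  def handle(msg)
    "status for #{msg[:properties]['username']}"
  end
end

# Hypothetical in-memory analogue of MessageForwarder.
class InMemoryForwarder
  def initialize(handlers)
    @handlers = handlers # action name => handler class
  end

  # Looks up the handler by the 'action' property, runs it, and replies on
  # the queue given in :reply_to, tagged with the request's message id.
  def on_message(msg)
    handler = @handlers[msg[:properties]['action']]
    return nil if handler.nil?
    response = handler.new.handle(msg)
    reply_to = msg[:reply_to]
    reply_to << { :correlation_id => msg[:message_id], :text => response } if reply_to
    response
  end
end

forwarder = InMemoryForwarder.new('Messages.get_status' => EchoStatusHandler)
reply_queue = Queue.new # plays the role of the JMS temporary queue
request = {
  :message_id => SecureRandom.uuid,
  :properties => { 'action' => 'Messages.get_status', 'username' => 'shahbhat' },
  :reply_to   => reply_queue
}
forwarder.on_message(request)
reply = reply_queue.pop
puts reply[:text]
```

Because the caller only knows the action name and the reply queue, handlers can be added or moved to other processes without changing the delegates.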
./messaging/jms_helper.rb
require 'java'

# Assumes the ActiveMQ and JMS jars are already on the classpath.
java_import 'javax.jms.Session'
java_import 'org.apache.activemq.ActiveMQConnectionFactory'
java_import 'org.apache.activemq.broker.BrokerService'

class JmsHelper
  include javax.jms.ExceptionListener

  #Object.module_eval("::#{i}")
  #http://java.sun.com/j2ee/1.4/docs/tutorial/doc/JMS6.html
  #
  ### A key tip for scalability is to separate your reads from writes; that way you can scale them independently and also
  ### offer different quality of service, e.g. read queues can be non-persistent, but write queues can be persistent.
  ###
  #
  QUEUE_READ_MESSAGES = 'read_messages'
  QUEUE_WRITE_MESSAGES = 'write_messages'
  QUEUE_READ_FOLLOWERS = 'read_followers'
  QUEUE_WRITE_FOLLOWERS = 'write_followers'
  QUEUE_READ_USERS = 'read_users'
  QUEUE_WRITE_USERS = 'write_users'

  READ_MESSAGES_HANDLERS = {
    'DirectMessages.direct_messages_received' => DirectMessagesReceivedHandler,
    'DirectMessages.direct_messages_sent' => DirectMessagesSentHandler,
    'DirectMessages.replies' => RepliesHandler,
    'Messages.get_status' => GetStatusHandler,
    'Messages.public_timeline' => PublicTimelineHandler,
    'Messages.user_timeline' => UserTimelineHandler,
    'Messages.friends_timeline' => FriendsTimelineHandler,
    'Messages.user_and_friends_timeline' => UserFriendTimelineHandler,
  }
  WRITE_MESSAGES_HANDLERS = {
    'DirectMessages.send_direct_message' => SendDirectMessageHandler,
    'DirectMessages.send_reply' => SendReplyMessageHandler,
    'DirectMessages.destroy_direct_message' => DestroyDirectMessageHandler,
    'Messages.update_status' => UpdateStatusHandler,
    'Messages.destroy_message' => DestroyMessageHandler,
  }
  READ_FOLLOWERS_HANDLERS = {
    'Followers.exists?' => FollowerExistsHandler,
  }
  WRITE_FOLLOWERS_HANDLERS = {
    'Followers.create_follower' => CreateFollowerHandler,
    'Followers.enable_notifications' => EnableNotificationsHandler,
    'Followers.disable_notifications' => DisableNotificationsHandler,
    'Followers.block' => BlockFollowerHandler,
    'Followers.unblock' => UnblockFollowerHandler,
    'Followers.destroy_follower' => DestroyFollowerHandler,
  }
  READ_USERS_HANDLERS = {
    'Users.get_user' => GetUserHandler,
    'Users.followings' => FollowingsHandler,
    'Users.followers' => FollowersHandler,
    'Users.authenticate' => AuthenticateHandler,
    'Users.archive_messages' => ArchiveMessagesHandler,
  }
  WRITE_USERS_HANDLERS = {
    'Users.create_user' => CreateUserHandler,
  }

  def initialize(connection=nil)
    if connection
      @connection = connection
    else
      factory = ActiveMQConnectionFactory.new("tcp://localhost:61616")
      @connection = factory.create_connection()
      @connection.set_exception_listener(self)
      @connection.start()
    end
    @queues = {}
    @producers = {}
    @consumers = {}
  end

  def start_consumers
    ### Each forwarder gets its own JmsHelper instance because a helper keeps a private session,
    ### which cannot be shared across threads, though the connection can be shared.
    start_forwarder(QUEUE_READ_MESSAGES, READ_MESSAGES_HANDLERS)
    start_forwarder(QUEUE_WRITE_MESSAGES, WRITE_MESSAGES_HANDLERS)
    start_forwarder(QUEUE_READ_FOLLOWERS, READ_FOLLOWERS_HANDLERS)
    start_forwarder(QUEUE_WRITE_FOLLOWERS, WRITE_FOLLOWERS_HANDLERS)
    start_forwarder(QUEUE_READ_USERS, READ_USERS_HANDLERS)
    start_forwarder(QUEUE_WRITE_USERS, WRITE_USERS_HANDLERS)
    # Block the main thread so the listeners keep running.
    java.lang.Thread.currentThread().join
  end

  def onException(jmsEx)
    puts "JMS Exception occurred #{jmsEx.inspect}"
  end

  # Close the session before the connection that owns it.
  def close
    @session.close if @session
    @connection.close()
  end

  def create_message(text)
    get_session().createTextMessage(text)
  end

  def send_message(queue_name, jms_msg, reply=false)
    ## TODO add error_queue/dead letter queue
    if reply
      reply_queue = get_session().createTemporaryQueue()
      jms_msg.setJMSReplyTo(reply_queue)
    end

    get_producer(queue_name).send(get_queue(queue_name), jms_msg)
    if reply_queue
      reply_consumer = get_session().createConsumer(reply_queue)
      begin
        reply_consumer.receive()  # blocks until the handler replies
      ensure
        reply_consumer.close()    # do not leak one consumer per request
      end
    end
  end

  def get_producer(queue_name)
    producer = @producers[queue_name]
    if producer.nil?
      producer = @producers[queue_name] = get_session().createProducer(get_queue(queue_name))
      #producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) # this can be changed for write queues
      #producer.setTimeToLive(60000)
    end
    producer
  end

  def get_queue(queue_name)
    queue = @queues[queue_name]
    if queue.nil?
      queue = @queues[queue_name] = get_session().create_queue(queue_name)
    end
    queue
  end

  def get_consumer(queue_name)
    consumer = @consumers[queue_name]
    if consumer.nil?
      consumer = @consumers[queue_name] = get_session().create_consumer(get_queue(queue_name))
    end
    consumer
  end

  # Eagerly creates (and caches) the queue and producer for each name.
  def create_producers(*queue_names)
    for queue_name in queue_names
      get_queue(queue_name)
      get_producer(queue_name)
    end
  end

  def create_producer(queue)
    get_session().createProducer(queue)
  end

  # Starts an embedded ActiveMQ broker, which is handy for local testing.
  def self.start_broker
    broker = BrokerService.new()
    broker.addConnector("tcp://localhost:61616")
    broker.start()
    sleep 2 # let broker startup
  end

  protected

  def get_session
    @session ||= @connection.create_session(false, Session::AUTO_ACKNOWLEDGE)
  end

  def start_forwarder(queue_name, handlers)
    raise "null connection" if @connection.nil?
    jms_helper = JmsHelper.new(@connection)
    Thread.new(){MessageForwarder.new(jms_helper, queue_name, handlers).run}
  end
end

if $PROGRAM_NAME == __FILE__
  JmsHelper.start_broker
  JmsHelper.new.start_consumers
end
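The comment at the top of JmsHelper recommends separate read and write queues so that each side can be scaled on its own. As a rough, broker-free illustration of that idea, the sketch below gives the read side more consumers than the write side. WorkerPool and the pool sizes (4 and 1) are assumptions made up for this example; with a real broker you would instead start more consumer processes on the read queues.

```ruby
require 'thread'

# Hypothetical worker pool: one in-process queue plus N consumer threads.
class WorkerPool
  attr_reader :results

  def initialize(name, size)
    @name = name
    @queue = Queue.new
    @results = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Each worker drains the queue until it sees the :stop marker.
        while (job = @queue.pop) != :stop
          @results << "#{@name}:#{job}"
        end
      end
    end
  end

  def submit(job)
    @queue << job
  end

  # Sends one :stop marker per worker, then waits for all of them to finish.
  def shutdown
    @threads.size.times { @queue << :stop }
    @threads.each { |t| t.join }
  end
end

read_pool  = WorkerPool.new('read', 4)  # reads are scaled out
write_pool = WorkerPool.new('write', 1) # a single writer keeps writes in order

%w[public_timeline user_timeline get_status].each { |a| read_pool.submit(a) }
%w[update_status destroy_message].each { |a| write_pool.submit(a) }

read_pool.shutdown
write_pool.shutdown

reads = []
reads << read_pool.results.pop until read_pool.results.empty?
writes = []
writes << write_pool.results.pop until write_pool.results.empty?
puts reads.sort.inspect
puts writes.inspect
```

Reads may complete in any order because several workers share the queue, while the single write worker processes requests in the order they were submitted.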
I am not showing all of the message handlers because they all follow the same pattern.
Database
./db/schema_helper.rb
require 'rubygems'
require 'fileutils'
require 'zlib'
require 'active_record'
require 'create_followers'
require 'create_messages'
require 'create_users'

if defined?(JRUBY_VERSION)
  gem 'activerecord-jdbc-adapter'
  require 'jdbc_adapter'
  require 'java'
end

class SchemaHelper
  MAX_DB = 10

  # Creates (or connects to) every shard in turn.
  def self.create_schema()
    for i in 0 ... MAX_DB
      connect_to(i)
    end
  end

  # Maps a username to a shard number. CRC32 is used instead of String#hash
  # because String#hash can differ between processes on newer Rubies.
  def self.hash_for(username)
    Zlib.crc32(username) % MAX_DB
  end

  def self.setup_schema_for(username)
    hash = hash_for(username)
    connect_to(hash)
  end

  def self.connect_to(hash)
    dbdir = "#{FileUtils.pwd}/data/db#{hash}"
    dbfile = "#{dbdir}/database.sqlite"
    # Check before connecting, because Derby creates the directory on connect (create=true).
    new_database = !File.directory?(dbdir)
    #ActiveRecord::Base.establish_connection(:adapter => "sqlite", :dbfile => dbfile)
    ActiveRecord::Base.establish_connection(
      :adapter => "jdbc",
      :dbfile => dbfile,
      #:driver => "SQLite.JDBCDriver", ####:driver => "org.sqlite.JDBC",
      #:url => "jdbc:sqlite:#{dbfile}"
      :driver => "org.apache.derby.jdbc.EmbeddedDriver",
      :url => "jdbc:derby:#{dbdir};create=true"
    )
    if new_database
      CreateUsers.migrate(:up)
      CreateMessages.migrate(:up)
      CreateFollowers.migrate(:up)
    end
  end

  # Smoke test: connects to each shard and prints its user count.
  def self.test
    SchemaHelper.setup_schema_for("shahbhat")
    require 'user'
    for i in 0 ... MAX_DB do
      SchemaHelper.connect_to(i)
      puts User.count
    end
  end
end
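SchemaHelper picks a user's database by hashing the username modulo MAX_DB. For that to send a user to the same database from every process, the hash has to be stable, and on MRI Ruby 1.9 and later String#hash is salted per process. The sketch below shows one possible stable alternative using CRC32 from the standard library; shard_for and db_dir_for are hypothetical helper names used only for this illustration.

```ruby
require 'zlib'

MAX_DB = 10 # same shard count as SchemaHelper::MAX_DB

# CRC32 depends only on the bytes of the username, so every process
# computes the same shard number for the same user.
def shard_for(username)
  Zlib.crc32(username) % MAX_DB
end

# Builds the per-shard directory using the data/db<N> layout from SchemaHelper.
def db_dir_for(username, base = Dir.pwd)
  File.join(base, 'data', "db#{shard_for(username)}")
end

%w[shahbhat alice bob].each do |name|
  puts "#{name} -> db#{shard_for(name)}"
end
```

Note that changing MAX_DB later remaps most users to different shards, so the shard count needs to be chosen with growth in mind or replaced with a lookup table.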