Introduction
I have been a Twitter user for a while, have observed or heard about downtime and scalability problems with Twitter. The scalability of Twitter has become a topic for a lot of discussions and blogs and has also offered a useful excercise to design scalable systems. A common root cause as identified from Twitter’s blogs is that the architecture is based on CMS because it was written in Ruby on Rails and that is what Rails good at. The solution to the scalability problem as pointed by other people is messaging based architecture. There’s also been a lot of blame for Twitter’s problems on Ruby and Rails because Ruby is a slow language compared to other static and dynamic languages and Rails is not built for scalability. Though, there is some truth to it, but I don’t think there are the sole bottlenecks. In fact, I am going to show a small prototype written in Ruby and Rails (partially) that integrate with the messaging middlewares, which can be scaled easily. I have been using messaging middlwares such as CORBA Event service, IBM MQ series, Websphere, Weblogic, Tibco and ActiveMQ for over ten years and have long been proponent of messaging based sysems for scalable systems [1] [2]. So, I spent a few hours to put together a prototype based on messaging middleware and Ruby on Rails to see how such system can be developed.
Design Principles
Before describing my design, I am going to review some of the design principles that I have used for building large scale systems ([1], [2]), which are:
- Coupling/Cohesion – loosely coupled and shared nothing architecture and partitioning based on functionality.
- Messaging or SEDA architecture to implement reliable and scalable services and avoid temporal dependencies.
- Resource management – good old practices of avoiding leaks of memory, database connections, etc.
- Data replication especially read-only data.
- Partition data (Sharding) – using multiple databases to partition the data.
- Avoid single point of failure.
- Bring processing closer to the data.
- Design for service failures and crashes.
- Dynamic Resources – Design service frameworks so that resources can be removed or added automatically and clients can automatically discover them. Use virtualization and horizontal scalability whenever possible.
- Smart Caching – Cache expensive operations and contents as much as possible.
- Avoid distributed transactions, use optimistic compensating transactions (See CAP principle).
Architecture & Design
Following is high level architecture for the Microblogging system:
First, I selected REST architecture as an entry point to our system for both Web UI and 3rd party applications and used messaging middleware for implementing the sevices. This gives us ease of access with REST APIs and scalability with messaging. In my implementation I chose JRuby/Rails to implement most of the code, Derby for the database and ActiveMQ for the messaging store. In addition to scalability, the messaging middleware gives you a lot of advantages from functional languages like Erlang such as immutability, message passing, fault tolerance (via persistence queues). You can even build support for versioning and hot code swapping by adding version number to each message and creating routers (See integration patterns) to direct messages to different handlers.
APIs
Following are REST APIs that will be exposed to 3rd party apps, Web and other kind of UI:
Create User
POST /users
where the user information is passed in the body in the form of parameters.
Login/Authenticate User
POST /users/userid/sessions
This will authenticate the user and create a session. Note that most of following APIs send back session-id, will be stored in the database (sharded) and will be used to retrieve user information.
Logout
HTTP-HEADER
session-id
DELETE /users/userid/sessions
Get User information
HTTP-HEADER
session-id
GET /users/userid
This API will return detailed user information
Anonymous User information
GET /users/userid
This API will return public user information
List of Followings
HTTP-HEADER
session-id
GET /followings/userid
This API will return summary of people, the user is following.
Create Followers
HTTP-HEADER
session-id
POST /followers/followerid
This API will create one-way follower relationship between the user and follower.
Enable notification for Followers
HTTP-HEADER
session-id
POST /followers/followerid/notifications
Disable notification for Followers
HTTP-HEADER
session-id
DELETE /followers/followerid/notifications
Block Followers
HTTP-HEADER
session-id
POST /followers/followerid/blocking
Unblock Followers
HTTP-HEADER
session-id
DELETE /followers/followerid/blocking
Follower Exist
HTTP-HEADER
session-id
GET /followers/followerid
This API will return 200 HTTP code if follower exist.
List of Followers
HTTP-HEADER
session-id
GET /followers
This API will return summary of people, the user is following.
Archive Messages
HTTP-HEADER
session-id
GET /messages?offset=xxx&limit=yyy&since=date
This API will return archived messages for the user, where offset and limit will be optional.
DELETE Messages
HTTP-HEADER
session-id
DELETE /messages/message-id
This API will return archived messages for the user, where offset and limit will be optional.
Send Direct Messages
HTTP-HEADER
session-id
POST /directmessages/targetuserid
This API will return send direct message to the given user.
Send Reply
HTTP-HEADER
session-id
POST /reply/message-id
This API will return reply for the given message-id and pass the contents of the message in the body (as parameters).
Direct Messages Received
HTTP-HEADER
session-id
GET /directmessages/userid?offset=xxx&limit=yyy&since=date
This API will return messages received by the user.
Replies Received
HTTP-HEADER
session-id
GET /replies?offset=xxx&limit=yyy&since=date
This API will return replies received by the user.
Update Status
HTTP-HEADER
session-id
POST /statuses
This API will update status of the user and pass the contents of the message in the body (as parameters).
Get Statuses
HTTP-HEADER
session-id
GET /statuses?offset=xxx&limit=yyy&since=date
This API will update status of the user and pass the contents of the message in the body (as parameters).
User Timeline
HTTP-HEADER
session-id
GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return timeline of the user. This API will compare given username with the authenticated username and will return detailed timeline if match, otherwise it will return public timeline.
Public Timeline
GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return public timeline of the user.
Friends Timeline
HTTP-HEADER
session-id
GET /friendstimeline?offset=xxx&limit=yyy&since=date
This API will return timeline of the friends of the user.
Request Flow
Here is an illustration of how information is flowed through different components:
Though, I am not showing request flow of all APIs, but they will follow similar pattern of flow.
Detailed Design
Domain Classes
Primary domain classes are:
- User
- Message, which has four subclasses DirectMessage, ReplyMessage, Tweet and Status for various kind of messages in the system.
- Follower – creates one-way relationship between two users, where follower can choose to be notified when the user changes his/her status.
I identify each message with special GUID and using a simple scheme to generate GUID for request-ids and message-ids, but for real project I would recommend better libraries such as UUIDTools.
Schema
Followers
1
2 class CreateFollowers < ActiveRecord::Migration
3 def self.up
4
5 create_table :followers do |t|
6 t.column :username, :string, :limit => 16
7 t.column :follower_username, :string, :limit => 16
8 t.column :relation_type, :string, :default => 'Follower', :limit => 32
9 t.column :blocked, :boolean, :default => false
10 t.column :notifications, :boolean, :default => false
11 t.column :created_at, :datetime
12 t.column :updated_at, :datetime
13 t.column :deleted_at, :datetime
14 end
15 add_index :followers, :username
16 add_index :followers, :follower_username
17 end
18
19 def self.down
20 drop_table :followers
21 end
22 end
Messages
1 class CreateMessages < ActiveRecord::Migration
2 def self.up
3 create_table :messages do |t|
4 t.column :message_id, :string, :limit => 64
5 t.column :type, :string, :limit => 32
6 t.column :message_type, :string, :default => 'Say', :limit => 32
7 t.column :reply_message_id, :string, :limit => 64
8 t.column :username, :string, :limit => 16
9 t.column :channel_name, :string, :limit => 32
10 t.column :message_body, :string, :limit => 140
11 t.column :favorite, :boolean, :default => false
12 t.column :sent_at, :datetime, :default => Time.now.utc
13 t.column :created_at, :datetime
14 t.column :deleted_at, :datetime
15 end
16 add_index :messages, :message_id
17 add_index :messages, :username
18 end
19
20 def self.down
21 drop_table :messages
22 end
23 end
Users
1 class CreateUsers < ActiveRecord::Migration
2 def self.up
3 create_table :users do |t|
4 t.column :username, :string, :limit => 16
5 t.column :password, :string, :limit => 16
6 t.column :name, :string, :limit => 64
7 t.column :email, :string, :limit => 64
8 t.column :time_zone_id, :string, :limit => 32
9 t.column :created_at, :datetime
10 t.column :updated_at, :datetime
11 t.column :deleted_at, :datetime
12 end
13 add_index :users, :username
14 end
15
16 def self.down
17 drop_table :users
18 end
19 end
Persistence
I used Rails’ ActiveRecord library to provide persistence, though alternatively I could have used ActiveHibernate. These libraries provide a quick way to add persistence capabilities with minimum configuration and boilerplate. This prototype is using multiple levels of partitioning, first at the service level, second at the persistence level. I am using multiple databases of Derby to store objects using a simple hashing scheme for load distribution. This prototype also shows how to connect to multiple databases in Rails, which was difficult in early implementation of Twitter.
Domain Services
The core model and services use domain driven design and applies principles of fat model and thin service (as opposed to fat servicess and anemic model). The domain services implement external REST APIs and use underlying ActiveRecord for most of the functionality.
Messaging Middleware
The REST based web services don’t invoke domain services directly, instead they use messaging middleware. In real application, I might use ESB/integration patterns such as intelligent routing to partition the system across multiple machines and send the request to the suitable queue. In this prototype, I am simply using ActiveMQ, which is fairly robust and easy to use. I am also using separate queues for different kind of operations. Another lesson I have learned in building large systems is to separate reads from the writes so that you can scale them independently and also offer different quality of services, e.g. read queues can be non-persistent, but write queues can be persistent.
Business Delegate
The REST based web services don’t interact with the messaging middleware directly, instead they use business delegates that hides all details of sending out message, creating temporary queues and receiving messages. The interface of business delegates is same as services.
Benchmark Results
Though, performance was not the objective of my prototype, but I tried to check how many requests I can process on my development machine. I chose only to benchmark messaging middlewares and not REST server because JVM uses native threads and web containers such as Tomcat uses a small sized thread pool to perform requests. Since, our architecture is heavily IO-bound, that would not scale. Alternatively, I could have build reactive or event based APIs for HTTP or use Yaws/Mochiweb as a container for REST based web sevices because creating a process in Erlang is pretty cheap. For example, Erlang process takes 300 bytes, whereas Java thread take 2M by default (though, it can be reduced to 256K on most machines). Here are results of running a simple server with embedded ActiveMQ and load test both running JRuby on my Pentium Linux machine. I used default VM size for both JRuby processes and didn’t tune any options:
What |
Elapsed Time (secs) |
Throughput |
Invocation Times |
load_test_create_users |
99.041000 |
10.09682857311318/s |
1000 |
load_test_bad_authenticate |
79.407000 |
12.593348183199506/s |
1000 |
load_test_good_authenticate |
91.802000 |
10.893008861477503/s |
1000 |
load_test_get_users |
91.362000 |
10.945469671474584/s |
1000 |
load_test_create_followers |
143.560000 |
6.958714408811979/s |
999 |
load_test_follower_exists |
95.140000 |
10.50020495537709/s |
999 |
load_test_get_followers |
99.881000 |
10.002002391423325/s |
999 |
load_test_get_followings |
104.737000 |
9.538176576655392/s |
999 |
load_test_block_follower |
156.106000 |
6.399497779340769/s |
999 |
load_test_unblock_follower |
161.950000 |
6.168532449211547/s |
999 |
load_test_follower_enable_notifications |
157.637000 |
6.337344665142635/s |
999 |
load_test_follower_disable_notifications |
161.714000 |
6.1775727524053545/s |
999 |
load_test_send_direct_message |
1673.494000 |
5.969546350480188/s |
9990 |
load_test_messages_sent_receive |
157.832000 |
6.323179075798668/s |
998 |
load_test_send_direct_message |
1725.717000 |
5.788898179687536/s |
9990 |
load_test_send_replies |
21.546000 |
5.105592926919201/s |
110 |
load_test_get_user_status |
86.952000 |
11.500598043963544/s |
1000 |
load_test_update_status |
213.298000 |
4.688276498896948/s |
1000 |
load_test_get_user_status |
88.987000 |
11.23759650430517/s |
1000 |
load_test_archive_messages |
80.982000 |
12.348575563593377/s |
1000 |
load_test_public_timeline |
98.143000 |
10.189213685309506/s |
1000 |
load_test_user_timeline |
92.871000 |
10.767623902461311/s |
1000 |
load_test_friends_timeline |
140.155000 |
7.1349577268319555/s |
1000 |
load_test_user_and_friends_timeline |
193.257000 |
5.174430032905596/s |
1000 |
I was not impressed with the results I got and I may implement similar prototype in Erlang using MochiWeb, Mnesia and Ejabber.
Summary
Though, the technologies I choose are somewhat arbitrary, the same design principles can be applied to other technology stack. Though, technologies are generaly selected due to political reasons or personal preference, it is important to consider team’s familiarity with the technology stack. For example, Twitter tried Ejabber but had problems troubleshooting becaues they were not familiar with Erlang.
Again, I used architecture principles such as stateless services, though in real life I may have to store some state which can be stored in the shareded database. I used embedded services such as embedded database server and messaging server because you can then easily start a single process on a machine and replicate machines as the load increases. I used partitioning/sharding heavily, which is key for scalability. I also used replication specially replicating messages for each follower so that the reads are fast and we don’t spend a lot of time querying the database. Though, it does add cost on the write side and adds disk requirements. I think we can control those by limiting number of messages per user per minute and maximum number of notifications. Also, we can remove old messages from the database. I also used principle of bringing computating closer to data by using embedded database.
Other Improvements
I tried to show that scalability problems are best solved from the architecture that is independent of a particular technology or language. In fact, I may implement similar design in Erlang using Mnesia as the database, Ejabber as the messaging middleware, Mochiweb for REST and OTP for implementing services. Other improvements that I would suggest be use of reverse proxy server for caching along with DMZ for security. We can also add some throttling to the reverse proxy servers. In order to have better fault tolerance, I might have multiple reverse proxy servers and use DNS round robin, though I don’t like DNS based load balance for real load distribution because it does not take server’s capacity and load into account, but it would suitable in this case. Also, I didn’t implement any caching, but we can use caching solutions such as EHCache, Tangosol, Terracotta, Memcache, etc. Though, caching highly dynamic contents may be difficult and less reusable. Also, I didn’t use any ESB (lightweight providers such as Mule, ServiceMix), but it can be used to abstract routing to the services, load balance, aggregation, replication, transformation, etc. Also, we can build support for versioning and hot code swapping by adding version number to each message and changing routing schemes. This prototype uses a simple scheme for partitioning by creating hash of business keys such as username, however it is difficult to manage when you need to add or remove servers because it requires migrating data. Another solution is to use MD5 scheme that we use at Amazon for S3, which may calculate MD5 of username and each replicated queue and select the queue whose md5-sum is higher than the username’s sum. For further fault tolerance, we could replicate data in Master/Slave fashion.
PS: I noticed “NativeException: javax.jms.JMSException: PermGen space” a couple of times running long load test, which is generally caused by reloading classes (as I have observed this for many years in Weblogic and Tomcat). I probably need to investigate this further but my guess is that JRuby is doing something dumb.
Download
Source Source Code
Appendix
Source Code Model
./model/user.rb
1 ###
2 # User of the system.
3 # Though, in our system user can have many messages that it sent to update
4 # his/her status or other users and can also have followers and followings, but
5 # we are not showing those relationships because we cannot populate them due to
6 # sharding and scalability concerns.
7
8
9 ###
10 class User < ActiveRecord::Base
11 validates_presence_of :name
12 validates_presence_of :username, :within => 3..16
13 validates_presence_of :password, :within => 3..16
14 validates_length_of :email, :within => 3..64
15 validates_uniqueness_of :email, :case_sensitive => false
16 validates_format_of :email, :with => /^([^@\s]+)@((?:[-a-z0-9]+\.)+[a-z]{2,})$/i
17 #
18 ### all messages for the same user wil be stored in the same database.
19 ### also, all messages from the users he/she is following will be copied to the user's database
20 ### if they lie in different databases.
21 ### limiting rows returned from these relationships to 100 so that we don't overwhelm server.
22 #
23 has_many :statuses, :class_name => 'Message', :foreign_key => :username, :limit => 100, :order => 'sent_at desc'
24 has_many :tweets, :class_name => 'Message', :foreign_key => :channel_name, :limit => 100, :order => 'sent_at desc'
25
26 #
27 ### We will replicate follower/following to each database and though we may not have corresponding users,
28 ### but we can have usernames
29 #
30 def self.follower_usernames(username, offset, limit)
31 Follower.followers(username, offset, limit).map {|f| f.follower_username }
32 end
33
34 def self.following_usernames(username, offset, limit)
35 Follower.followings(username, offset, limit).map {|f| f.username }
36 end
37
38 def self.follower_count(username)
39 Follower.followers_count(username)
40 end
41 def self.following_count(username)
42 Follower.followers_count(username)
43 end
44
45 #
46 ### user's timeline
47 #
48 def self.timeline(username, offset, limit)
49 Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
50 end
51
52 def to_s
53 "#{username}"
54 end
55
56 def to_external_xml(xml)
57 xml.user(:username => self.username, :name => self.name, :email => self.email)
58 end
59 end
./model/message.rb
1 #
2 ###
3 # Message stores any message sent by user to update his/her status, tweets
4 # from other users that he/she is following, direct message sent to another
5 # user or reply message sent in response to tweet or direct message.
6 # Note that we use a GUID based message_id in conjunction with the database column 'id'
7 # because we are using sharding the database column can be duplicate, but the GUID
8 # based message_id will always be unique across all databases.
9 ###
10 class Message < ActiveRecord::Base
11 MESSAGE_TYPE_SAY = "Say"
12 MESSAGE_TYPE_SHOUT = "Shout"
13 #
14 validates_presence_of :message_id, :within => 1..64
15 validates_presence_of :username
16 #
17 ### channel_name will be username of the target user for now,
18 ### though, in future it could be a group or a topic
19 #
20 validates_presence_of :channel_name, :within => 1..32
21 validates_presence_of :message_body, :within => 1..140
22 validates_presence_of :message_type, :within => 1..32
23 validates_presence_of :sent_at
24 validates_uniqueness_of :message_id
25 #
26 ### all messages for the same user wil be stored in the same database.
27 ### also, all messages from the users he/she is following will be copied to the user's database
28 ### if they lie in different databases.
29 #
30 belongs_to :user, :class_name => 'User', :foreign_key => :username
31
32 def self.messages_for(username, offset, limit)
33 Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
34 end
35
36 def self.destroy_message(id)
37 message = Message.find_by_message_id(id)
38 if message
39 message.destroy
40 message
41 else
42 nil
43 end
44 end
45
46
47 def to_s
48 "#{username} -> #{channel_name}: #{message_body}"
49 end
50
51 def to_external_xml(xml)
52 xml.message(message_body, :message_id => self.message_id, :message_type => self.message_type, :from => self.username, :to => self.channel_name, :sent_at => sent_at.httpdate)
53 end
54 end
./model/reply_message.rb
1 ###
2 # Message sent in response to another tweet, message or direct message.
3 #
4 # commented below because message could be in different database.
5 # belongs_to :reply_message_id, :class_name => Message, :dependent => :destroy
6 ###
7
8 require 'direct_message'
9
10 class ReplyMessage < DirectMessage
11 validates_presence_of :reply_message_id, :within => 1..64
12 def self.replies(username, offset, limit)
13 ReplyMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
14 end
15 end
./model/guid_generator.rb
1 class GuidGenerator
2 @@count = 1
3 def self.guid(prefix)
4 @@count += 1
5 "#{prefix}#{@@count}#{Time.now.to_i}"
6 end
7 def self.next_message_id
8 guid("message_id")
9 end
10 def self.next_request_id
11 guid("request_id")
12 end
13 end
./model/follower.rb
1 #
2 ####
3 # Follower stores one-way relation between two users. However, in order to
4 # support sharding, we don't directly connect users instead we only store
5 # users' usernames.
6 ###
7 class Follower < ActiveRecord::Base
8 RELATION_TYPE_FOLLOWER = "Follower"
9 RELATION_TYPE_FRIEND = "Friend"
10 RELATION_TYPE_FAMILY = "Family"
11 RELATION_TYPE_COLLEAGUE = "Colleague"
12 RELATION_TYPE_ACQUAINTENCE = "Acquaintance"
13 RELATION_TYPE_FAN = "Fan"
14 RELATION_TYPE_OTHER = "Other"
15
16 validates_presence_of :relation_type
17 #validates_presence_of :blocked
18 #validates_presence_of :notifications
19
20 def self.create_follower(follower_attrs)
21 Follower.create!(follower_attrs) unless exists?(follower_attrs[:username], follower_attrs[:follower_username])
22 end
23
24 def self.followers(username, offset, limit)
25 self.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
26 end
27 def self.follower_count(username)
28 self.count(:conditions => ['username = ?', username])
29 end
30 def self.following_count(username)
31 self.count(:conditions => ['follower_username = ?', username])
32 end
33
34 def self.followings(username, offset, limit)
35 self.find(:all, :conditions => ['follower_username = ?', username], :offset => offset, :limit => limit)
36 end
37 def self.destroy_follower(username, follower_username)
38 self.get_record(username, follower_username).destroy
39 end
40 def self.exists?(username, follower_username)
41 !self.get_record(username, follower_username).nil?
42 end
43 def self.enable_notifications(username, follower_username)
44 self.get_record(username, follower_username).update_attributes(:notifications => true)
45 end
46 def self.disable_notifications(username, follower_username)
47 self.get_record(username, follower_username).update_attributes(:notifications => false)
48 end
49 def self.block(username, follower_username)
50 self.get_record(username, follower_username).update_attributes(:blocked => true)
51 end
52 def self.unblock(username, follower_username)
53 self.get_record(username, follower_username).update_attributes(:blocked => false)
54 end
55 def self.get_record(username, follower_username)
56 self.find(:first, :conditions => ['username = ? and follower_username = ?', username, follower_username])
57 end
58 def to_s
59 "#{username} -> #{follower_username}"
60 end
61 def to_external_xml(xml)
62 xml.follower(:username => self.username, :follower_username => self.follower_username)
63 end
64
65 end
./model/direct_message.rb
1 ####
2 # Direct message is sent directly to another user
3 ####
4 class DirectMessage < Message
5 def to_user(user)
6 self.channel_name = user.username
7 end
8 def self.direct_messages_sent(username, offset, limit)
9 DirectMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
10 end
11 def self.direct_messages_received(username, offset, limit)
12 DirectMessage.find(:all, :conditions => ['channel_name = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
13 end
14 end
Source Code Services
./service/base_service.rb
1 class BaseService
2 protected
3 def initialize(username)
4 SchemaHelper.setup_schema_for(username)
5
6 end
7
8 def serialize(obj, format)
9 if obj
10 format ||= 'xml'
11 case format
12 when 'xml'
13 obj.to_xml
14 when 'yaml'
15 obj.to_yaml
16 when 'json'
17 obj.to_json
18 else
19 obj
20 end
21 end
22 end
23 end
./service/users_service.rb
1 #
2 ## UserService provides basic functionality for quering/storing users
3 ## It assumes connection to the right database is already setup.
4 #
5 class UsersService < BaseService
6 #
7 ### initialize service
8 #
9 def initialize(username)
10 super
11 end
12
13
14 #
15 ### retrieves user record
16 #
17 def get_user(username, options={})
18 User.find_by_username(username)
19 end
20
21 #
22 ### return usernames of people whom the user is following.
23 #
24 def followings(username, options={})
25 offset = options[:offset] || 0
26 limit = options[:limit] || 100
27 User.following_usernames(username, offset, limit)
28 end
29
30
31 #
32 ### return usernames of followers of authenticated user
33 #
34 def followers(username, options={})
35 offset = options[:offset] || 0
36 limit = options[:limit] || 100
37 User.follower_usernames(username, offset, limit)
38 end
39
40
41 def authenticate(username, password, options={})
42 User.find_by_username_and_password(username, password)
43 end
44
45
46 def create_user(user_attrs, options={})
47 user = User.new()
48 user.attributes = user_attrs
49 user.save!
50 user
51 end
52
53 def archive_messages(username, options={})
54 offset = options[:offset] || 0
55 limit = options[:limit] || 100
56 Message.messages_for(username, offset, limit)
57 end
58 end
./service/followers_service.rb
1 #
2 ## FollowersService provides basic functionality for creating/quering followers
3 ## It assumes all users are already authenticated before calling these methods.
4 #
5 class FollowersService < BaseService
6 #
7 ### initialize service
8 #
9 def initialize(username)
10 super
11 end
12 #
13 ### create follower
14 #
15 def create_follower(follower_attrs, options={})
16 Follower.create_follower(follower_attrs)
17 end
18
19 #
20 ### enable notifications
21 #
22 def enable_notifications(username, follower_username, options={})
23 Follower.enable_notifications(username, follower_username)
24 end
25
26 #
27 ### disable notifications
28 #
29 def disable_notifications(username, follower_username, options={})
30 Follower.disable_notifications(username, follower_username)
31 end
32
33
34 #
35 ### blocks a user
36 #
37 def block(username, follower_username, options={})
38 Follower.block(username, follower_username)
39 end
40
41 #
42 ### unblock user
43 #
44 def unblock(username, follower_username, options={})
45 Follower.unblock(username, follower_username)
46 end
47
48
49 #
50 ### delete status
51 #
52 def destroy_follower(username, follower_username, options={})
53 Follower.destroy_follower(username, follower_username)
54 true
55 rescue ActiveRecord::RecordNotFound => e
56 false
57 end
58
59
60 #
61 ### get a single status
62 #
63 def exists?(username, follower_username, options={})
64 Follower.exists?(username, follower_username)
65 end
66 end
./service/messages_service.rb
1 #
2 ## MessagesService provides basic functionality for quering/storing statuses/messages
3 ## It assumes all users are already authenticated before calling these methods.
4 #
5 class MessagesService < BaseService
6 #
7 ### initialize service
8 #
9 def initialize(username)
10 super
11 end
12
13
14 #
15 ### get a single status
16 #
17 def get_status(message_id, options={})
18 Message.find_by_message_id(message_id)
19 end
20
21 #
22 ### update status
23 #
24 def update_status(message_attrs, options={})
25 message = Tweet.new
26 message.attributes = message_attrs
27 safe_save(message)
28 end
29
30 #
31 ### delete status
32 #
33 def destroy_message(message_id, options={})
34 Message.destroy_message(message_id)
35 end
36
37
38
39 #
40 ### retrieves most recent statuses for public
41 #
42 def public_timeline(username, options={})
43 offset = options[:offset] || 0
44 limit = options[:limit] || 10
45 User.timeline(username, offset, limit)
46 end
47
48 #
49 ### retrieves most recent statuses for user
50 #
51 def user_timeline(username, options={})
52 offset = options[:offset] || 0
53 limit = options[:limit] || 100
54 User.timeline(username, offset, limit)
55 end
56
57
58 #
59 ### retrieves most recent statuses for friends
60 #
61 def friends_timeline(username, options={})
62 offset = options[:offset]
63 limit = options[:limit]
64 get_friends_timeline(username, offset, limit)
65 end
66
67
68 #
69 ### retrieves most recent statuses for user and his/her friends
70 #
71 def user_and_friends_timeline(username, options={})
72 offset = options[:offset]
73 limit = options[:limit]
74 messages = User.timeline(username, offset, limit)
75 messages += get_friends_timeline(username, offset, limit)
76 end
77
78 private
79
80 #
81 ### retrieves most recent statuses for friends without any conversion
82 #
83 def get_friends_timeline(username, offset, limit)
84 offset = offset || 0
85 following_count = Follower.following_count(username)
86 limit = limit || (following_count < 100 ? following_count / 100 + 1 : 1)
87
88 following = User.following_usernames(username, 0, 200)
89 messages = []
90 following.each do |username|
91 messages += Message.messages_for(username, offset, limit)
92 end
93 messages
94 end
95 protected
96 def safe_save(message)
97 old_message = Message.find_by_message_id(message.message_id)
98 if old_message.nil?
99 message.save!
100 message
101 else
102 puts "Message with id #{message.message_id} already exists #{message.inspect}"
103 nil
104 end
105 end
106 end
./service/direct_messages_service.rb
1 #
2 ## DirectMessagesService provides functionality for sending/receiving direct messages
3 #
4 class DirectMessagesService < MessagesService
5 #
6 ### initialize service
7 #
8 def initialize(username)
9 super
10 end
11 #
12 ### sents back 20 most recent direct messages for the user
13 ### it assumes user is already authenticated.
14 #
15 def direct_messages_received(username, options={})
16 offset = options[:offset] || 0
17 limit = options[:limit] || 20
18 DirectMessage.direct_messages_received(username, offset, limit)
19 end
20 #
21 ### sents back 20 most recent direct messages for the user
22 ### it assumes user is already authenticated.
23 #
24 def direct_messages_sent(username, options={})
25 offset = options[:offset] || 0
26 limit = options[:limit] || 20
27 DirectMessage.direct_messages_sent(username, offset, limit)
28 end
29
30 #
31 ### send direct message
32 #
33 def send_direct_message(message_attrs, options={})
34 message = DirectMessage.new()
35 message.attributes = message_attrs
36 safe_save(message)
37 end
38
39 #
40 ### send reply message
41 #
42 def send_reply(message_attrs, options={})
43 message = ReplyMessage.new()
44 message.attributes = message_attrs
45 safe_save(message)
46 end
47
48 #
49 ### sents back 20 most recent replies for the user
50 ### it assumes user is already authenticated.
51 #
52 def replies(username, options={})
53 offset = options[:offset] || 0
54 limit = options[:limit] || 100
55 ReplyMessage.replies(username, offset, limit)
56 end
57
58
59 #
60 ### delete direct message
61 #
62 def destroy_direct_message(id, options={})
63 DirectMessage.destroy_message(id)
64 end
65 end
Source Code Business Delegate
./delegate/base_delegate.rb
1 #
2 ## BaseDelegate provides client side interface for connecting to the business
3 ## service using BusinessDelegate pattern.
4 #
5 class BaseDelegate
6 protected
7 def initialize(jms_helper, read_queue_name, write_queue_name)
8 @read_queue_name = read_queue_name
9 @write_queue_name = write_queue_name
10 @jms_helper = jms_helper
11 @jms_helper.create_producers(@read_queue_name, @write_queue_name)
12 end
13
14
15 def new_jms_message(properties={})
16 jms_msg = @jms_helper.create_message("") ### ActiveMQTextMessage.new
17 jms_msg.setStringProperty('request_id', GuidGenerator.next_request_id)
18 properties.each do |name, value|
19 jms_msg.setStringProperty(name.to_s, value.to_s)
20 end
21 jms_msg
22 end
23
24
25 def send_read_request(jms_msg)
26 @jms_helper.send_message(@read_queue_name, jms_msg, true).text
27 end
28
29
30 def send_write_request(jms_msg, reply=true)
31 response = @jms_helper.send_message(@write_queue_name, jms_msg, reply)
32 if response.respond_to? :text
33 response.text
34 else
35 response
36 end
37 end
38 end
./delegate/users_delegate.rb
1 #
2 ## UserDelegate provides basic functionality for quering/storing users
3 ## It assumes connection to the right database is already setup.
4 #
5 class UsersDelegate < BaseDelegate
6 def initialize(jms_helper)
7 super(jms_helper, JmsHelper::QUEUE_READ_USERS, JmsHelper::QUEUE_WRITE_USERS)
8 end
9 def create_user(user_attrs, options={})
10 args = options.merge(user_attrs).merge(:action => 'Users.create_user')
11 jms_msg = new_jms_message(args)
12 send_write_request(jms_msg)
13
14 end
15
16 #
17 ### retrieves user record
18 #
19 def get_user(username, options={})
20 jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.get_user'))
21 send_read_request(jms_msg)
22
23 end
24
25 #
26 ### return usernames of people the user is following
27 #
28 def followings(username, options={})
29 jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followings'))
30 send_read_request(jms_msg)
31
32 end
33
34
35 #
36 ### return 100 usernames of followers of authenticated user
37 #
38 def followers(username, options={})
39 jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followers'))
40 send_read_request(jms_msg)
41
42 end
43
44
45 def authenticate(username, password, options={})
46 jms_msg = new_jms_message(options.merge(:username => username, :password => password, :action => 'Users.authenticate'))
47 send_read_request(jms_msg)
48
49 end
50
51
52 def archive_messages(username, options={})
53 jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.archive_messages'))
54 send_read_request(jms_msg)
55
56 end
57 end
./delegate/messages_delegate.rb
1 #
2 ## MessagesDelegate provides basic functionality for quering/storing statuses/messages
3 ## It assumes all users are already authenticated before calling these methods.
4 #
5 class MessagesDelegate < BaseDelegate
6 def initialize(jms_helper)
7 super(jms_helper, JmsHelper::QUEUE_READ_MESSAGES, JmsHelper::QUEUE_WRITE_MESSAGES)
8 end
9 #
10 ### get a single status
11 #
12 def get_status(message_id, options={})
13 jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.get_status'))
14 send_read_request(jms_msg)
15
16 end
17
18 #
19 ### update status
20 #
21 def update_status(message_attrs, options={})
22 jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'Messages.update_status'}.merge(options).merge(message_attrs))
23 send_write_request(jms_msg)
24
25 end
26
27 #
28 ### delete status
29 #
30 def destroy_message(message_id, options={})
31 jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.destroy_message'))
32 send_write_request(jms_msg)
33
34 end
35
36
37
38 #
39 ### retrieves most recent statuses for public
40 #
41 def public_timeline(username, options={})
42 jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.public_timeline'))
43 send_read_request(jms_msg)
44
45 end
46
47 #
48 ### retrieves most recent statuses for user
49 #
50 def user_timeline(username, options={})
51 jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_timeline'))
52 send_read_request(jms_msg)
53
54 end
55
56
57 #
58 ### retrieves most recent statuses for friends
59 #
60 def friends_timeline(username, options={})
61 jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.friends_timeline'))
62 send_read_request(jms_msg)
63
64 end
65
66
67 #
68 ### retrieves most recent statuses for user and his/her friends
69 #
70 def user_and_friends_timeline(username, options={})
71 jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_and_friends_timeline'))
72 send_read_request(jms_msg)
73
74 end
75 end
./delegate/followers_delegate.rb
1 #
2 ## FollowersDelegate provides basic functionality for creating/quering followers
3 ## It assumes all users are already authenticated before calling these methods.
4 #
5 class FollowersDelegate < BaseDelegate
6 def initialize(jms_helper)
7 super(jms_helper, JmsHelper::QUEUE_READ_FOLLOWERS, JmsHelper::QUEUE_WRITE_FOLLOWERS)
8 end
9 #
10 ### create follower
11 #
12 def create_follower(follower_attrs, options={})
13 jms_msg = new_jms_message(options.merge(follower_attrs).merge(:action => 'Followers.create_follower'))
14 send_write_request(jms_msg)
15
16 end
17
18 #
19 ### enable notifications
20 #
21 def enable_notifications(username, follower_username, options={})
22 jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.enable_notifications'))
23 send_write_request(jms_msg)
24
25 end
26
27 #
28 ### disable notifications
29 #
30 def disable_notifications(username, follower_username, options={})
31 jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.disable_notifications'))
32 send_write_request(jms_msg)
33
34 end
35
36
37 #
38 ### blocks a user
39 #
40 def block(username, follower_username, options={})
41 jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.block'))
42 send_write_request(jms_msg)
43
44 end
45
46 #
47 ### unblock user
48 #
49 def unblock(username, follower_username, options={})
50 jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.unblock'))
51 send_write_request(jms_msg)
52
53 end
54
55
56 #
57 ### delete status
58 #
59 def destroy_follower(username, follower_username, options={})
60 jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.destroy_follower'))
61 send_write_request(jms_msg)
62
63 end
64
65
66 #
67 ### get a single status
68 #
69 def exists?(username, follower_username, options={})
70 jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.exists?'))
71 send_read_request(jms_msg)
72
73 end
74 end
./delegate/direct_messages_delegate.rb
1 #
2 ## DirectMessagesDelegate provides functionality for sending/receiving direct messages
3 #
4 class DirectMessagesDelegate < MessagesDelegate
5 def initialize(jms_helper)
6 super(jms_helper)
7 end
8
9 #
10 ### sents back 20 most recent direct messages for the user
11 ### it assumes user is already authenticated.
12 #
13 def direct_messages_received(username, options={})
14 jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_received'))
15 send_read_request(jms_msg)
16
17 end
18 #
19 ### sents back 20 most recent direct messages for the user
20 ### it assumes user is already authenticated.
21 #
22 def direct_messages_sent(username, options={})
23 jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_sent'))
24 send_read_request(jms_msg)
25
26 end
27
28 #
29 ### send direct message
30 #
31 def send_direct_message(message_attrs, options={})
32 jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_direct_message'}.merge(options).merge(message_attrs))
33 send_write_request(jms_msg)
34
35 end
36
37 #
38 ### send reply message
39 #
40 def send_reply(message_attrs, options={})
41 jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_reply'}.merge(options).merge(message_attrs))
42 send_write_request(jms_msg)
43
44 end
45
46 #
47 ### sents back 20 most recent replies for the user
48 ### it assumes user is already authenticated.
49 #
50 def replies(username, options={})
51 jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.replies'))
52 send_read_request(jms_msg)
53
54 end
55
56
57 #
58 ### delete direct message
59 #
60 def destroy_direct_message(message_id, options={})
61 jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'DirectMessages.destroy_direct_message'))
62 send_write_request(jms_msg)
63
64 end
65 end
Source Code Message Handlers
./messaging/base_handler.rb
1 #
2 ## BaseHandler provides common functionality for handling messages
3 #
4 class BaseHandler
5 attr_reader :username
6 attr_reader :message_text
7 attr_reader :timings
8 def initialize
9 @timings = []
10 @started_at = Time.now
11 @host_id = Digest::MD5.hexdigest("localhost") #lookup real host
12 end
13
14 def handle(jms_msg, options={})
15 add_timing_begin
16 msg_attrs = {}
17 pnames = jms_msg.getPropertyNames()
18 for pname in pnames
19 msg_attrs[pname] = msg_attrs[pname.to_sym] = jms_msg.getObjectProperty(pname)
20 case pname
21 when 'username'
22 @username = msg_attrs[pname]
23 when 'auth_user'
24 @auth_user = msg_attrs[pname]
25 when 'request_id'
26 @request_id = msg_attrs[pname]
27 end
28 end
29 if @username.nil?
30 raise "username missing in #{jms_msg.inspect} options #{options.inspect}"
31 end
32 if jms_msg.respond_to? :text
33 #msg_attrs.merge!(xml_to_dict(jms_msg.text))
34 msg_attrs[:message_text] = jms_msg.text
35 end
36 ##
37 add_timing_for_input_xml
38 response = do_handle(msg_attrs)
39 ##
40 add_timing_end
41 response
42 rescue ActiveRecord::RecordInvalid => invalid
43 response = xml_error("400", "handle", invalid.record.errors)
44 add_timing_end
45 response
46 rescue java.sql.SQLException => e
47 puts "Failed to handle #{self.class.name}(#{jms_msg} due to #{e.inspect}\n\n"
48 while (e)
49 puts e.backtrace << "\n\n"
50 e = e.getNextException()
51 end
52 response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})
53 add_timing_end
54 response
55 rescue Exception => e
56 puts "Failed to handle #{self.class.name}(#{jms_msg} due to #{e.inspect}\n\n#{e.backtrace}"
57 response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})
58 add_timing_end
59 response
60 end
61
62 protected
63 def username_for_service
64 @auth_user || @username
65 end
66
67 def add_timing(what)
68 @timings << {what, (Time.now.to_i-@started_at.to_i)}
69 end
70 def add_timing_begin
71 add_timing "#{self.class.name} begin"
72 end
73 def add_timing_end
74 add_timing "#{self.class.name} end"
75 end
76 def add_timing_for_db
77 add_timing "#{self.class.name} connected to db"
78 end
79 def add_timing_for_output_xml
80 add_timing "#{self.class.name} converting output xml"
81 end
82 def add_timing_for_input_xml
83 add_timing "#{self.class.name} converting input xml"
84 end
85
86 def do_handle(msg_attrs, options={})
87 raise "Implement handle"
88 end
89 def xml_to_dict(xml)
90 Hash.from_xml(xml)
91 end
92 def xml_error(response_code, action, errors, options={})
93 #logger.error "Failed to #{action} due to #{error_code} by #{username} -- #{error_message}"
94 to_xml(response_code, "Error response for #{action}", options) do |xml|
95 xml.errors do
96 errors.each do |error_code, error_message|
97 xml.error(:error_code => error_code, :error_message => error_message)
98 end
99 end
100 end
101 end
102
103 def to_xml(response_code, comment="", options={})
104 buffer = options[:buffer] ||= ''
105
106 xml = options[:builder] ||= Builder::XmlMarkup.new(:target => buffer, :indent => options[:indent])
107 xml.instruct! unless options[:skip_instruct]
108 xml.comment! "Response #{comment} in reply for #{username} as of #{Time.now.utc}"
109 xml.response(:request_id => @request_id, :response_code => response_code, :host_id => @host_id, :version => '1.0') do
110 yield xml if block_given?
111 xml.timings do
112 @timings.each do |what, duration|
113 xml.timing(:what => what, :duration_millis => duration)
114 end
115 end
116 end
117 buffer
118 end
119 end
./messaging/archive_messages_handler.rb
1 #
2 ## ArchiveMessagesHandler sends back archive messages for the user
3 #
4 class ArchiveMessagesHandler < BaseHandler
5 protected
6 def do_handle(msg_attrs, options={})
7 svc = UsersService.new(self.username_for_service)
8 add_timing_for_db
9 messages = svc.archive_messages(self.username, options)
10 add_timing_for_output_xml
11 to_xml("200", "Archived Messages", options) do |xml|
12 xml.messages do
13 messages.each do |message|
14 message.to_external_xml(xml)
15 end
16 end
17 end
18 end
19 end
./messaging/authenticate_handler.rb
1 ## AuthenticateHandler authenticates user
2 #
3 class AuthenticateHandler < BaseHandler
4 protected
5 def do_handle(msg_attrs, options={})
6 password = msg_attrs[:password]
7 svc = UsersService.new(self.username_for_service)
8 add_timing_for_db
9 user = svc.authenticate(username, password, options)
10 add_timing_for_output_xml
11 if user
12 to_xml("200", "Authentication", options) do |xml|
13 user.to_external_xml(xml)
14 end
15 else
16 xml_error("401", "Authentication", {"AUTH_FAILURE" => "Invalid username/password"})
17 end
18 end
19 end
./messaging/block_follower_handler.rb
1 ## BlockFollowerHandler blocks follower
2 #
3 class BlockFollowerHandler < BaseHandler
4 protected
5 def do_handle(msg_attrs, options={})
6 svc = FollowersService.new(self.username_for_service)
7 follower_username = msg_attrs[:follower_username]
8 add_timing_for_db
9 blocked = svc.block(self.username, follower_username, options)
10 response_code = blocked ? "200" : "404"
11 add_timing_for_output_xml
12 to_xml(response_code, "Block Follower", options) do |xml|
13 xml.follower(:username => self.username, :follower_username => follower_username)
14 end
15 end
16 end
Messaging Helpers
./messaging/message_forwarder.rb
1 class MessageForwarder
2 include java.lang.Runnable
3 include javax.jms.MessageListener
4
5 def initialize(jms_helper, queue_name, handlers)
6 @jms_helper = jms_helper
7 @queue_name = queue_name
8 @handlers = handlers
9 end
10
11 def run
12 @consumer = @jms_helper.get_consumer(@queue_name)
13 @consumer.set_message_listener(self);
14 puts "Starting listener for queue #{@queue_name}"
15 end
16
17 def onMessage(jms_msg)
18 action = jms_msg.getProperty('action')
19 if action.nil?
20 puts "Unknown message #{jms_msg.inspect}"
21 else
22 handler = @handlers[action]
23 if handler.nil?
24 puts "No handler for #{action} -- #{jms_msg.inspect}"
25 else
26
27 response = handler.new().handle(jms_msg)
28
29 if jms_msg.getJMSReplyTo()
30 reply_queue = jms_msg.getJMSReplyTo()
31 reply_producer = @jms_helper.create_producer(reply_queue)
32 reply_message = @jms_helper.create_message(response)
33 reply_message.setJMSCorrelationID(jms_msg.getJMSMessageID())
34 reply_producer.send(reply_message)
35 end
36 end
37 end
38 end
39
40 def close
41 @consumer.close();
42 end
43 end
./messaging/jms_helper.rb
1
2 class JmsHelper
3 #Object.module_eval("::#{i}")
4 #http://java.sun.com/j2ee/1.4/docs/tutorial/doc/JMS6.html
5 #
6 ### A key tip for scalability is to separate your reads from writes that way you can scale them independently and also
7 ### offer different quality of services, e.g. read queues can be non-persistent, but write queues can be persistent.
8 ###
9 #
10 QUEUE_READ_MESSAGES = 'read_messages'
11 QUEUE_WRITE_MESSAGES = 'write_messages'
12 QUEUE_READ_FOLLOWERS = 'read_followers'
13 QUEUE_WRITE_FOLLOWERS = 'write_followers'
14 QUEUE_READ_USERS = 'read_users'
15 QUEUE_WRITE_USERS = 'write_users'
16
17 READ_MESSAGES_HANDLERS = {
18 'DirectMessages.direct_messages_received' => DirectMessagesReceivedHandler,
19 'DirectMessages.direct_messages_sent' => DirectMessagesSentHandler,
20 'DirectMessages.replies' => RepliesHandler,
21 'Messages.get_status' => GetStatusHandler,
22 'Messages.public_timeline' => PublicTimelineHandler,
23 'Messages.user_timeline' => UserTimelineHandler,
24 'Messages.friends_timeline' => FriendsTimelineHandler,
25 'Messages.user_and_friends_timeline' => UserFriendTimelineHandler,
26 }
27 WRITE_MESSAGES_HANDLERS = {
28 'DirectMessages.send_direct_message' => SendDirectMessageHandler,
29 'DirectMessages.send_reply' => SendReplyMessageHandler,
30 'DirectMessages.destroy_direct_message' => DestroyDirectMessageHandler,
31 'Messages.update_status' => UpdateStatusHandler,
32 'Messages.destroy_message' => DestroyMessageHandler,
33 }
34 READ_FOLLOWERS_HANDLERS = {
35 'Followers.exists?' => FollowerExistsHandler,
36 }
37 WRITE_FOLLOWERS_HANDLERS = {
38 'Followers.create_follower' => CreateFollowerHandler,
39 'Followers.enable_notifications' => EnableNotificationsHandler,
40 'Followers.disable_notifications' => DisableNotificationsHandler,
41 'Followers.block' => BlockFollowerHandler,
42 'Followers.unblock' => UnblockFollowerHandler,
43 'Followers.destroy_follower' => DestroyFollowerHandler,
44 }
45 READ_USERS_HANDLERS = {
46 'Users.get_user' => GetUserHandler,
47 'Users.followings' => FollowingsHandler,
48 'Users.followers' => FollowersHandler,
49 'Users.authenticate' => AuthenticateHandler,
50 'Users.archive_messages' => ArchiveMessagesHandler,
51 }
52 WRITE_USERS_HANDLERS = {
53 'Users.create_user' => CreateUserHandler,
54 }
55
56 def initialize(connection=nil)
57 if connection
58 @connection = connection
59 else
60 factory = ActiveMQConnectionFactory.new("tcp://localhost:61616")
61 @connection = factory.create_connection()
62 @connection.set_exception_listener(self)
63 @connection.start();
64 end
65 @queues = {}
66 @producers = {}
67 @consumers = {}
68 end
69
70 def start_consumers
71 ### Creating another instance of jms helper because it keeps private session which cannot be shared
72 ### across threads, though it can share connection.
73 start_forwarder(QUEUE_READ_MESSAGES, READ_MESSAGES_HANDLERS)
74 start_forwarder(QUEUE_WRITE_MESSAGES, WRITE_MESSAGES_HANDLERS)
75 start_forwarder(QUEUE_READ_FOLLOWERS, READ_FOLLOWERS_HANDLERS)
76 start_forwarder(QUEUE_WRITE_FOLLOWERS, WRITE_FOLLOWERS_HANDLERS)
77 start_forwarder(QUEUE_READ_USERS, READ_USERS_HANDLERS)
78 start_forwarder(QUEUE_WRITE_USERS, WRITE_USERS_HANDLERS)
79 #
80 java.lang.Thread.currentThread().join
81 end
82
83 def onException(jmsEx)
84 puts "JMS Exception occured #{jmsEx.inspect}, shutting down"
85 end
86
87 def close
88 @connection.close()
89 @session.close if @session
90 end
91
92 def create_message(text)
93 get_session().createTextMessage(text)
94 end
95
96 def send_message(queue_name, jms_msg, reply=false)
97 ## TODO add error_queue/dead letter queue
98 if reply
99 reply_queue = get_session().createTemporaryQueue()
100 jms_msg.setJMSReplyTo(reply_queue)
101 end
102
103 get_producer(queue_name).send(get_queue(queue_name), jms_msg)
104 if reply_queue
105 reply_consumer = get_session().createConsumer(reply_queue)
106 reply_consumer.receive()
107 end
108 end
109
110 def get_producer(queue_name)
111 producer = @producers[queue_name]
112 if producer.nil?
113 producer = @producers[queue_name] = get_session().createProducer(get_queue(queue_name))
114 #producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) # this can be changed for write queues
115 #producer.setTimeToLive(60000)
116 end
117 producer
118 end
119 def get_queue(queue_name)
120 queue = @queues[queue_name]
121 if queue.nil?
122 queue = @queues[queue_name] = get_session().create_queue(queue_name)
123 end
124 queue
125 end
126
127 def get_consumer(queue_name)
128 consumer = @consumers[queue_name]
129 if consumer.nil?
130 consumer = @consumers[queue_name] = get_session().create_consumer(get_queue(queue_name))
131 end
132 consumer
133 end
134
135 def create_producers(*queue_names)
136 for queue_name in queue_names
137 queue = get_queue(queue_name)
138 producer = get_producer(queue_name)
139 end
140 end
141
142 def create_producer(queue)
143 get_session().createProducer(queue)
144 end
145
146 def self.start_broker
147 broker = BrokerService.new()
148 broker.addConnector("tcp://localhost:61616")
149 broker.start()
150 sleep 2 # let broker startup
151 end
152 protected
153 def get_session
154 @session ||= @connection.create_session(false, Session::AUTO_ACKNOWLEDGE)
155 end
156 def start_forwarder(queue_name, handlers)
157 raise "null connection" if @connection.nil?
158 jms_helper = JmsHelper.new(@connection)
159 Thread.new(){MessageForwarder.new(jms_helper, queue_name, handlers).run}
160 end
161 end
162
163 if $PROGRAM_NAME == __FILE__
164 JmsHelper.start_broker
165 JmsHelper.new.start_consumers
166 end
I am not showing all message handlers because they look pretty much the same.
Database
./db/schema_helper.rb
1 require 'rubygems'
2 require 'un'
3 require 'active_record'
4 require 'create_followers'
5 require 'create_messages'
6 require 'create_users'
7
8 if defined?(JRUBY_VERSION)
9 gem 'activerecord-jdbc-adapter'
10 require 'jdbc_adapter'
11 require 'java'
12 end
13
14
15 class SchemaHelper
16 MAX_DB = 10
17 #
18 def self.create_schema()
19 for i in 0 ... MAX_DB
20 connect_to(i)
21 end
22 end
23
24 def self.hash_for(username)
25 hash = username.hash % MAX_DB
26 end
27
28 def self.setup_schema_for(username)
29 hash = hash_for(username)
30 connect_to(hash)
31 end
32
33 def self.connect_to(hash)
34 dbdir = "#{FileUtils.pwd}/data/db#{hash}"
35 dbfile = "#{dbdir}/database.sqlite"
36 #ActiveRecord::Base.establish_connection(:adapter => "sqlite", :dbfile => dbfile)
37 ActiveRecord::Base.establish_connection(
38 :adapter => "jdbc",
39 :dbfile => dbfile,
40 #:driver => "SQLite.JDBCDriver", ####:driver => "org.sqlite.JDBC",
41 #:url => "jdbc:sqlite:#{dbfile}"
42 :driver => "org.apache.derby.jdbc.EmbeddedDriver",
43 :url => "jdbc:derby:#{dbdir};create=true"
44 )
45 unless File.directory?(dbdir)
46
47
48 CreateUsers.migrate(:up)
49 CreateMessages.migrate(:up)
50 CreateFollowers.migrate(:up)
51 end
52 end
53 def self.test
54 SchemaHelper.setup_schema_for("shahbhat")
55 require 'user'
56 for i in 0 ... 10 do
57 SchemaHelper.connect_to(i)
58 puts User.count
59 end
60 end
61 end