Shahzad Bhatti Welcome to my ramblings and rants!

October 10, 2022

Implementing Distributed Locks (Mutex and Semaphore) with Databases

Filed under: Concurrency,Rust,Uncategorized — admin @ 10:55 pm

Overview

I recently needed a way to control access to shared resources in a distributed system. You could use Consul or Zookeeper (or low-level Raft / Paxos implementations if you are brave) to manage distributed locks, but I wanted to reuse an existing database without adding another dependency to my tech stack. Most databases support transactions or conditional updates with varying degrees of transactional guarantees, but those alone can’t serve as distributed locks if the business logic you need to protect resides outside the database. I found a lock client library based on AWS Databases, but it didn’t support semaphores. Its implementation tightly coupled lock management with database access, which made it hard to extend. For example, the following diagram shows the cyclic dependencies in its code:

class diagram

Due to the above deficiencies in existing solutions, I decided to write my own implementation of distributed locks in Rust with the following capabilities:

  • Allow creating lease based locks that can either protect a single shared resource with a Mutex lock or protect a finite set of shared resources with a Semaphore lock.
  • Allow renewing leases based on periodic intervals so that stale locks can be acquired by other users.
  • Allow releasing Mutex and semaphore locks explicitly after the user performs a critical action.
  • CRUD APIs to manage Mutex and Semaphore entities in the database.
  • Multi-tenancy support for different clients in case the database is shared by multiple users.
  • Fair locks that grant access on a first-come, first-served basis when the same lock is acquired concurrently.
  • Scalable solution for supporting tens of thousands of concurrent mutexes and semaphores.
  • Support multiple data stores, including relational databases such as MySQL, PostgreSQL and Sqlite, as well as NoSQL/cache data stores such as AWS Dynamo DB and Redis.

High-level Design

I chose Rust to build the library for managing distributed locks due to strict performance and correctness requirements. The following diagram shows the high-level components in the new library:

LockManager Interface

The client interacts with the LockManager, which defines the following operations to acquire, release and renew lock leases and to manage the lifecycle of mutexes and semaphores:

#[async_trait]
pub trait LockManager {
    // Attempts to acquire a lock until it either acquires the lock, or a specified additional_time_to_wait_for_lock_ms is
    // reached. This method will poll database based on the refresh_period. If it does not see the lock in database, it
    // will immediately return the lock to the caller. If it does see the lock, it will note the lease expiration on the lock. If
    // the lock is deemed stale, (that is, there is no heartbeat on it for at least the length of its lease duration) then this
    // will acquire and return it. Otherwise, if it waits for as long as additional_time_to_wait_for_lock_ms without acquiring the
    // lock, then it will return LockError::NotGranted.
    //
    async fn acquire_lock(&self, opts: &AcquireLockOptions) -> LockResult<MutexLock>;

    // Releases the given lock if the current user still has it, returning true if the lock was
    // successfully released, and false if someone else already stole the lock. Deletes the
    // lock item if it is released and delete_lock_item_on_close is set.
    async fn release_lock(&self, opts: &ReleaseLockOptions) -> LockResult<bool>;

    // Sends a heartbeat to indicate that the given lock is still being worked on.
    // This method will also set the lease duration of the lock to the given value.
    // This will also either update or delete the data from the lock, as specified in the options
    async fn send_heartbeat(&self, opts: &SendHeartbeatOptions) -> LockResult<MutexLock>;

    // Creates mutex if doesn't exist
    async fn create_mutex(&self, mutex: &MutexLock) -> LockResult<usize>;

    // Deletes mutex lock if not locked
    async fn delete_mutex(&self,
                          other_key: &str,
                          other_version: &str,
                          other_semaphore_key: Option<String>) -> LockResult<usize>;

    // Finds out who owns the given lock, but does not acquire the lock. It returns the metadata currently associated with the
    // given lock. If the client currently has the lock, it will return the lock, and operations such as release_lock will work.
    // However, if the client does not have the lock, then operations like release_lock will not work (after calling get_mutex, the
    // caller should check mutex.expired() to figure out if it currently has the lock.)
    async fn get_mutex(&self, mutex_key: &str) -> LockResult<MutexLock>;

    // Creates or updates semaphore with given max size
    async fn create_semaphore(&self, semaphore: &Semaphore) -> LockResult<usize>;

    // Returns semaphore for the key
    async fn get_semaphore(&self, semaphore_key: &str) -> LockResult<Semaphore>;

    // find locks by semaphore
    async fn get_semaphore_mutexes(&self,
                                   other_semaphore_key: &str,
    ) -> LockResult<Vec<MutexLock>>;

    // Deletes semaphore if all associated locks are not locked
    async fn delete_semaphore(&self,
                              other_key: &str,
                              other_version: &str,
    ) -> LockResult<usize>;
}
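
To make the polling behavior described in the acquire_lock comments more concrete, here is a minimal in-memory sketch. It is not the library's implementation: InMemoryStore, LockRecord and acquire_with_polling are hypothetical names, the HashMap stands in for the database, versions are plain counters, and the clock is advanced manually instead of sleeping so that the example stays deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical lock record; times are plain milliseconds.
#[derive(Debug, Clone, PartialEq)]
struct LockRecord {
    version: u64,
    expires_at_ms: u64,
}

/// Hypothetical stand-in for the database table, keyed by mutex key.
#[derive(Default)]
struct InMemoryStore {
    locks: HashMap<String, LockRecord>,
    next_version: u64,
}

impl InMemoryStore {
    /// One attempt: succeeds if the lock is absent or its lease has expired.
    fn try_acquire(&mut self, key: &str, lease_ms: u64, now_ms: u64) -> Option<LockRecord> {
        let stale_or_absent = match self.locks.get(key) {
            None => true,
            Some(existing) => existing.expires_at_ms <= now_ms,
        };
        if !stale_or_absent {
            return None;
        }
        self.next_version += 1;
        let record = LockRecord {
            version: self.next_version,
            expires_at_ms: now_ms + lease_ms,
        };
        self.locks.insert(key.to_string(), record.clone());
        Some(record)
    }
}

/// Polls every `refresh_ms` until the lock is granted or `max_wait_ms` has elapsed.
fn acquire_with_polling(
    store: &mut InMemoryStore,
    key: &str,
    lease_ms: u64,
    refresh_ms: u64,
    max_wait_ms: u64,
    start_ms: u64,
) -> Result<LockRecord, String> {
    let mut now_ms = start_ms;
    loop {
        if let Some(lock) = store.try_acquire(key, lease_ms, now_ms) {
            return Ok(lock);
        }
        if now_ms - start_ms >= max_wait_ms {
            return Err(format!("lock {} not granted", key));
        }
        // A real client would sleep for the refresh period here.
        now_ms += refresh_ms.max(1);
    }
}
```

In this sketch, a caller that waits longer than the remaining lease of the current holder picks up the stale lock on the first poll after it expires, while a caller with a shorter wait budget gets an error.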

The LockManager interacts with a LockStore to access mutexes and semaphores, which in turn delegates to mutex and semaphore repository implementations for lock management. The library defines two implementations of LockStore. The first, DefaultLockStore, supports mutexes and semaphores: a mutex is used to acquire a single lock, whereas a semaphore is used to acquire a lock from a finite set of shared resources. The second, FairLockStore, uses a Redis-specific implementation of fair semaphores to manage lease-based semaphores that are granted in first-come, first-served order. If a lock is not immediately available, the LockManager can wait for it by periodically checking whether the mutex or semaphore-based lock has become available. Due to this periodic polling, the fair semaphore algorithm won’t preserve FIFO order if a new client requests a lock while a previous request is waiting for its next polling interval.
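
The polling caveat is easier to see with a small model. The following sketch is only an assumption about how a counter-ordered fair semaphore can behave; it uses an in-memory BTreeMap instead of Redis, and FairSemaphore, try_acquire and release are hypothetical names rather than the library's API. Each attempt takes the next ticket number and is granted only if it ranks within max_size among the current holders; a failed attempt is dropped rather than queued.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of a counter-ordered ("fair") semaphore.
struct FairSemaphore {
    max_size: usize,
    next_ticket: u64,
    /// ticket number -> (holder id, lease expiry in ms); BTreeMap keeps ticket order.
    holders: BTreeMap<u64, (String, u64)>,
}

impl FairSemaphore {
    fn new(max_size: usize) -> Self {
        FairSemaphore {
            max_size,
            next_ticket: 0,
            holders: BTreeMap::new(),
        }
    }

    /// One acquisition attempt; returns the ticket when it ranks within max_size.
    fn try_acquire(&mut self, holder: &str, lease_ms: u64, now_ms: u64) -> Option<u64> {
        // Drop holders whose lease has expired.
        self.holders.retain(|_, (_, expires_at)| *expires_at > now_ms);
        self.next_ticket += 1;
        let ticket = self.next_ticket;
        self.holders
            .insert(ticket, (holder.to_string(), now_ms + lease_ms));
        // Rank = number of live holders with a smaller ticket number.
        let rank = self.holders.range(..ticket).count();
        if rank < self.max_size {
            Some(ticket)
        } else {
            // A failed attempt does not stay queued; the caller retries on its next poll.
            self.holders.remove(&ticket);
            None
        }
    }

    /// Releases the slot held under `ticket`; false if it was already gone.
    fn release(&mut self, ticket: u64) -> bool {
        self.holders.remove(&ticket).is_some()
    }
}
```

With a semaphore of size one, if client a holds the slot, client b fails an attempt, and a then releases, a newly arriving client c can take the slot before b's next poll. That is the loss of FIFO order described above.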

Create Lock Manager

You can instantiate a Lock Manager with a relational database store as follows:

let config = LocksConfig::new("test_tenant");
let mutex_repo = factory::build_mutex_repository(RepositoryProvider::Rdb, &config)
	.await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Rdb, &config)
	.await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

let locks_manager = LockManagerImpl::new(
  	&config, store, &default_registry()).expect("failed to initialize lock manager");

Alternatively, you can choose AWS Dynamo DB as follows:

let mutex_repo = factory::build_mutex_repository(
  	RepositoryProvider::Ddb, &config).await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Ddb, &config).await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

Or Redis based data-store as follows:

let mutex_repo = factory::build_mutex_repository(
  	RepositoryProvider::Redis, &config).await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Redis, &config).await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

Note: The AWS Dynamo DB implementation uses strongly consistent reads, because Dynamo DB reads are eventually consistent by default.

Acquiring a Mutex Lock

You will need to build options for acquiring the lock with a key name and a lease period in seconds and then acquire it:

let opts = AcquireLockOptionsBuilder::new("mylock")
	.with_lease_duration_secs(10).build();
let lock = locks_manager.acquire_lock(&opts).await
	.expect("should acquire lock");

The acquire_lock operation will automatically create the mutex lock if it doesn’t exist; otherwise, it will wait for the period of the lease time if the lock is not available. It returns a mutex lock structure such as the following (this sample was captured for a key named one with a 15-second lease):

{
  "mutex_key":"one",
  "tenant_id":"local-host-name",
  "version":"258d513e-bae4-4d91-8608-5d500be27593",
  "lease_duration_ms":15000,
  "locked":true,
  "expires_at":"2022-10-11T03:04:43.126542"
}

Renewing the lease of Lock

A lock is only held for the period specified by its lease duration, but you can renew it periodically if needed:

let opts = SendHeartbeatOptionsBuilder::new("one", "258d513e-bae4-4d91-8608-5d500be27593")
                .with_lease_duration_secs(15)
                .build();

let updated_lock = locks_manager.send_heartbeat(&opts).await
					.expect("should renew lock");

Note: The lease renewal will also update the version of lock so you will need to use the updated version to renew or release the lock.
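
The version rotation works like an optimistic concurrency check. The following is a minimal sketch of that idea, not the library's code: the HashMap stands in for the database row, LockRecord and this free-standing send_heartbeat function are hypothetical, and the new version is passed in by the caller where a real implementation would generate something like a UUID.

```rust
use std::collections::HashMap;

/// Hypothetical lock row: the current version token and the lease expiry in ms.
#[derive(Debug, Clone, PartialEq)]
struct LockRecord {
    version: String,
    expires_at_ms: u64,
}

/// Renews the lease only when the caller presents the current version,
/// then rotates the version so that older copies of the lock stop working.
fn send_heartbeat(
    locks: &mut HashMap<String, LockRecord>,
    key: &str,
    version: &str,
    lease_ms: u64,
    now_ms: u64,
    new_version: &str,
) -> Result<LockRecord, String> {
    match locks.get_mut(key) {
        Some(record) if record.version == version => {
            record.version = new_version.to_string();
            record.expires_at_ms = now_ms + lease_ms;
            Ok(record.clone())
        }
        Some(_) => Err(format!("lock {} is held under a different version", key)),
        None => Err(format!("lock {} not found", key)),
    }
}
```

After a successful renewal, a second heartbeat with the old version fails in this sketch, which is why the caller has to keep the lock value returned by the renewal.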

Releasing the lease of Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new("one", "258d513e-bae4-4d91-8608-5d500be27593")
                .build();
locks_manager.release_lock(&opts).await
				.expect("should release lock");

Acquiring a Semaphore based Lock

Semaphores allow you to define a set of locks for a resource with a maximum size. Acquiring a semaphore is similar to acquiring a regular lock, except that you specify the semaphore size, e.g.:

let opts = AcquireLockOptionsBuilder::new("my_pool")
                    .with_lease_duration_secs(15)
                    .with_semaphore_max_size(10)
                    .build();
let lock = locks_manager.acquire_lock(&opts).await
				.expect("should acquire semaphore lock");

The acquire_lock operation will automatically create the semaphore if it doesn’t exist, then check for available locks and wait if all the locks are busy. It returns a lock structure such as the following (this sample was captured for a semaphore key named one):

{
  "mutex_key":"one_0000000000",
  "tenant_id":"local-host-name",
  "version":"5ad557df-dbe6-439d-8a31-dc367e32eab9",
  "lease_duration_ms":15000,
  "semaphore_key":"one",
  "locked":true,
  "expires_at":"2022-10-11T04:03:33.662484"
}

The semaphore lock creates mutexes internally, numbered from 0 to max-size (exclusive). You can get the semaphore details using:

let semaphore = locks_manager.get_semaphore("one").await
                .expect("failed to find semaphore");

That would return:

{
  "semaphore_key": "one",
  "tenant_id": "local-host-name",
  "version": "4ff77432-ed84-48b5-9831-8e53f56c2620",
  "max_size": 10,
  "lease_duration_ms": 15000,
  "busy_count": 1,
  "fair_semaphore": false,
}

Or, fetch state of all mutexes associated with the semaphore using:

let mutexes = locks_manager.get_semaphore_mutexes("one").await
                .expect("failed to find semaphore mutexes");

Which would return:

  {
    "mutex_key": "one_0000000000",
    "tenant_id": "local-host-name",
    "version": "ba5a62e5-80f1-474e-a895-c4a18d252cb9",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": true,
  },
  {
    "mutex_key": "one_0000000001",
    "tenant_id": "local-host-name",
    "version": "749b4ded-e356-4ef5-a23b-73a4984130c8",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": false,
  },
  ...
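
The numbered keys in this output suggest a simple naming scheme: the semaphore key, an underscore and a zero-padded ten-digit slot index. The sketch below reproduces that format and picks the first free slot. Both helper functions are hypothetical, and the padding width is inferred from the sample keys rather than taken from the library's source.

```rust
/// Builds the per-slot mutex keys for a semaphore, zero-padded to ten digits
/// to match the sample key `one_0000000000`.
fn slot_keys(semaphore_key: &str, max_size: usize) -> Vec<String> {
    (0..max_size)
        .map(|i| format!("{}_{:010}", semaphore_key, i))
        .collect()
}

/// Returns the first slot key that is not in `busy`, or None when every slot is taken.
fn first_free_slot(semaphore_key: &str, max_size: usize, busy: &[String]) -> Option<String> {
    slot_keys(semaphore_key, max_size)
        .into_iter()
        .find(|key| !busy.contains(key))
}
```

For a semaphore named one with slot 0 busy, first_free_slot returns one_0000000001, and it returns None once the number of busy slots reaches max_size.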

Renewing the lease of Semaphore Lock

A lock is only held for the period specified by its lease duration, but you can renew it periodically if needed:

let opts = SendHeartbeatOptionsBuilder::new(
  				"one_0000000000", "749b4ded-e356-4ef5-a23b-73a4984130c8")
                .with_lease_duration_secs(15)
                .with_opt_semaphore_key("one")
                .build();
let updated_lock = locks_manager.send_heartbeat(&opts).await
					.expect("should renew lock");

Note: The lease renewal will also update the version of lock so you will need to use the updated version to renew or release the lock.

Releasing the lease of Semaphore Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new("one_0000000000", "749b4ded-e356-4ef5-a23b-73a4984130c8")
                .with_opt_semaphore_key("one")
                .build();

locks_manager.release_lock(&opts).await
				.expect("should release lock");

Acquiring a Fair Semaphore

Fair semaphores are only available for Redis due to their internal implementation, and they must be enabled via the fair_semaphore configuration option; otherwise, the usage is similar to the above operations, e.g.:

let mut config = LocksConfig::new("test_tenant");
config.fair_semaphore = Some(true);

let fair_semaphore_repo = factory::build_fair_semaphore_repository(
  	RepositoryProvider::Redis, &config)
	.await.expect("failed to create fair semaphore");
let store = Box::new(FairLockStore::new(&config, fair_semaphore_repo));
let locks_manager = LockManagerImpl::new(
  	&config, store, &default_registry())
	.expect("failed to initialize lock manager");

Then acquire the lock using the same semaphore syntax as before:

let opts = AcquireLockOptionsBuilder::new("my_pool")
                    .with_lease_duration_secs(15)
                    .with_semaphore_max_size(10)
                    .build();
let lock = locks_manager.acquire_lock(&opts).await
			.expect("should acquire semaphore lock");

The acquire_lock operation will automatically create the semaphore if it doesn’t exist, then check for available locks and wait if all the locks are busy. It returns a lock structure such as the following (this sample was captured for a semaphore key named one):

{
  "mutex_key": "one_0fec9a7b-4354-4712-b537-ac14213bc5e8",
  "tenant_id": "local-host-name",
  "version": "0fec9a7b-4354-4712-b537-ac14213bc5e8",
  "lease_duration_ms": 15000,
  "semaphore_key": "one",
  "locked": true,
}

The fair semaphore lock does not use mutexes internally, but for API compatibility it builds a mutex whose key combines the semaphore key and the version. You can then query the semaphore state as follows:

let semaphore = locks_manager.get_semaphore("one").await
                .expect("failed to find semaphore");

That would return:

{
  "semaphore_key": "one",
  "tenant_id": "local-host-name",
  "version": "5779b01f-eaea-4043-8ae0-9f8b942c2727",
  "max_size": 10,
  "lease_duration_ms": 15000,
  "busy_count": 1,
  "fair_semaphore": true,
}

Or, fetch state of all mutexes associated with the semaphore using:

let mutexes = locks_manager.get_semaphore_mutexes("one").await
                .expect("failed to find semaphore mutexes");

Which would return:

  [
  {
    "mutex_key": "one_0fec9a7b-4354-4712-b537-ac14213bc5e8",
    "tenant_id": "local-host-name",
    "version": "0fec9a7b-4354-4712-b537-ac14213bc5e8",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": true,
    "expires_at": "2022-10-11T04:41:43.845711",
  },
  {
    "mutex_key": "one_0000000001",
    "tenant_id": "local-host-name",
    "version": "",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": false,
  },
  ...

Note: The mutex_key is formatted slightly differently for unlocked mutexes because the internal implementation doesn’t need a mutex key for them.

Renewing the lease of Fair Semaphore Lock

You can renew the lease of a fair semaphore using the same syntax as for a regular semaphore, e.g.:

let opts = SendHeartbeatOptionsBuilder::new(
  			"one_0fec9a7b-4354-4712-b537-ac14213bc5e8", "0fec9a7b-4354-4712-b537-ac14213bc5e8")
                .with_lease_duration_secs(15)
                .with_opt_semaphore_key("one")
                .build();
let updated_lock = locks_manager.send_heartbeat(&opts).await
					.expect("should renew lock");

Note: Due to internal implementation of fair semaphore, the version won’t be changed upon lease renewal.

Releasing the lease of Fair Semaphore Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new(
    			"one_0fec9a7b-4354-4712-b537-ac14213bc5e8", "0fec9a7b-4354-4712-b537-ac14213bc5e8")
                .with_opt_semaphore_key("one")
                .build();

locks_manager.release_lock(&opts).await
				.expect("should release lock");

Command Line Interface

In addition to a Rust based interface, the distributed locks library also provides a command line interface for managing mutex and semaphore based locks, e.g.:

Mutexes and Semaphores based Distributed Locks with databases.

Usage: db-locks [OPTIONS] [PROVIDER] <COMMAND>

Commands:
  acquire

  heartbeat

  release

  get-mutex

  delete-mutex

  create-mutex

  create-semaphore

  get-semaphore

  delete-semaphore

  get-semaphore-mutexes

  help
          Print this message or the help of the given subcommand(s)

Arguments:
  [PROVIDER]
          Database provider [default: rdb] [possible values: rdb, ddb, redis]

Options:
  -t, --tenant <TENANT>
          tentant-id for the database [default: local-host-name]
  -f, --fair-semaphore <FAIR_SEMAPHORE>
          fair semaphore lock [default: false] [possible values: true, false]
  -j, --json-output <JSON_OUTPUT>
          json output of result from action [default: false] [possible values: true, false]
  -c, --config <FILE>
          Sets a custom config file
  -h, --help
          Print help information
  -V, --version
          Print version information

For example, you can acquire fair semaphore lock as follows:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis acquire --key one --semaphore-max-size 10

Which would return:

{
  "mutex_key": "one_69816448-7080-40f3-8416-ede1b0d90e80",
  "tenant_id": "local-host-name",
  "version": "69816448-7080-40f3-8416-ede1b0d90e80",
  "lease_duration_ms": 15000,
  "semaphore_key": "one",
  "locked": true,
}

You can run the following command to renew the above lock:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis heartbeat --key one_69816448-7080-40f3-8416-ede1b0d90e80 --semaphore-key one --version 69816448-7080-40f3-8416-ede1b0d90e80

And then release it as follows:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis release --key one_69816448-7080-40f3-8416-ede1b0d90e80 --semaphore-key one --version 69816448-7080-40f3-8416-ede1b0d90e80

Summary

I was able to meet the initial goals for implementing distributed locks, though this library is still early in development. You can download and try it from https://github.com/bhatti/db-locks. Feel free to send your feedback or contribute to this library.
