Shahzad Bhatti Welcome to my ramblings and rants!

April 26, 2026

20+ Production Patterns for Distributed AI Agents Using Actors and TupleSpaces

Filed under: Computing,Concurrency,Erlang,GO — admin @ 12:37 pm

Introduction

I have been building Agentic AI applications for a couple of years and have shared some of the lessons learned (see previous blogs at the end). In most cases, I used Python with the LangChain and LangGraph frameworks because they provide integration with local and cloud-based LLM providers. However, the real challenge isn’t building one AI agent. It’s running 10,000 of them reliably, across teams, across nodes, without one team’s runaway model budget crashing another’s pipeline. This post is about that other problem: the infrastructure problem, which is fundamentally a distributed systems problem.

Most AI frameworks don’t even acknowledge that coordinating agents at large scale is a distributed systems problem (see the FLP theorem and the Byzantine Generals bound). You cannot engineer your way out of these constraints with better prompts or better models. You need explicit coordination protocols, failure detection, and external validation, which are at the heart of distributed systems. This is where the actor model comes in. Actors have been a core abstraction for distributed computing since the 1970s and map naturally onto agents. I first learned about actors and the Linda memory model in college, and later used them during my post-doc research in distributed systems to build frameworks for solving computational problems in HPC at scale. Actors provide the coordination substrate that makes distributed agent systems provably safer:

  • Isolated state: no shared-memory corruption; a misinterpreting agent cannot corrupt another agent’s state.
  • Message passing: makes coordination explicit and auditable without shared memory/locks.
  • Supervision trees: give you crash detection and recovery, e.g., when an agent fails (Byzantine or otherwise), the supervisor restarts it, links can propagate failures, and monitors can trigger compensating actions.
  • Durable state: the durability facet means consensus progress survives node crashes.
  • TupleSpace coordination: gives you Linda-model consensus patterns without deadlock: write-once slots, pattern-matched reads, blocking takes, which are the building blocks of coordination protocols.

Every major AI framework today picks one problem and solves it well. For example, LangChain gives you chains, AutoGen gives you multi-agent conversations, and Ray gives you distributed compute. But when you need all of them (stateful agents, distributed execution, durable pipelines, multi-tenant isolation, MCP tool calling, AllReduce gradient synchronization, AND the coordination substrate that makes distributed agents safe), you have to stitch together five systems. I wrote the PlexSpaces actor system to solve scalable computational problems. It can treat each agent as an actor: isolated state, message-driven communication, location-transparent routing, built-in fault tolerance. The framework supports polyglot development, so applications can be written in Python, Go, Rust, or TypeScript. This post shows how to implement AI workload patterns concretely. For the theory behind why the actor model fits AI workloads so naturally, see my earlier post on PlexSpaces foundations. For the polyglot WASM runtime that makes four-language deployment possible, see the WebAssembly deep-dive. This post is about AI agent patterns specifically.


Part 1: Why Actors Are the Right Foundation for Distributed Agents

1.1 The Actor-Agent Isomorphism

An LLM agent has four things: state (conversation history, tool results), a processing loop (receive message -> reason -> act), communication (call tools, delegate to other agents), and failure modes (timeouts, hallucinations, rate limits). An actor has exactly the same structure. This isn’t a coincidence. Both actors and agents are inspired by the same computational model: isolated units of stateful computation that communicate by passing messages. Here is a Python research agent in 18 lines:

# examples/python/apps/a2a_multi_agent — ResearchAgent pattern
@actor(facets=["virtual_actor", "durability"])
class ResearchAgent:
    """Each actor IS an agent: isolated state + message-driven + fault-tolerant."""
    history: list = state(default_factory=list)
    queries_handled: int = state(default=0)
    agent_id: str = state(default="")

    @init_handler
    def on_init(self, config: dict) -> None:
        self.agent_id = config.get("actor_id", "")
        # Register in service registry — write-once so supervisor instance wins
        _ts_register_service("research", self.agent_id)

    @handler("research")
    def research(self, query: str = "", from_actor: str = "") -> dict:
        self.queries_handled += 1
        self.history.append({"query": query, "ts": host.now_ms()})
        return {"result": f"Research result for: {query}", "agent_id": self.agent_id}

The @actor decorator registers this as a GenServer actor. The durability facet checkpoints state automatically: if the node crashes mid-query, the agent resumes from the last checkpoint. The virtual_actor facet activates the agent on demand and deactivates it when idle, so you pay nothing at rest.

Notice _ts_register_service("research", self.agent_id): this is the TupleSpace write-once service registry pattern. The first instance to call this writes the slot. Any subsequent instance finds the slot already taken and skips registration. This is how you implement safe service discovery without process groups that generate noisy warnings or risk routing to the wrong instance.

Agentic coding naturally favors small, composable actors: a researcher, an analyzer, a writer, each focused on one capability and composed with the others via message passing. The Go a2a_multi_agent example makes this concrete: four actors (registry, researcher, analyzer, writer) each do one thing and delegate everything else.

1.2 The Distributed Consensus Problem in Multi-Agent Systems

When you run multiple LLM agents in parallel to speed up a complex coding task, to parallelize a RAG pipeline, or to run specialist agents for different subtasks, you are building a distributed system. And distributed systems have properties that no amount of LLM capability improvement will change. Consider a prompt: “Build a REST API for user management with authentication.” This prompt is under-specified. It admits at least these valid interpretations:

  • JWT vs session-based auth
  • REST vs GraphQL
  • PostgreSQL vs MongoDB
  • Monolith vs microservices

If you run four parallel agents on this prompt and each picks a different interpretation, you don’t get a coherent system; you get four incompatible subsystems. At ten agents this is a debugging problem. At ten thousand agents running across twenty nodes, it is a production incident at 3 AM. The agents must coordinate their design choices. That coordination is a consensus problem.

  • FLP Theorem: If agents communicate asynchronously (messages may be delayed arbitrarily) and any agent can crash (network failure, context limit, rate limiting), then no deterministic protocol can guarantee both safety (all agents agree on correct output) and liveness (the system eventually produces output).
  • Byzantine bound: Treat a misinterpreting agent as a Byzantine node: it sends plausible-looking messages with incorrect content. Correct consensus requires fewer than 1/3 of agents to be Byzantine. If three of your ten agents hallucinate an incompatible API shape, you may not be able to reach correct consensus at all.

What follows from this:

  1. External validation (tests, type checking, static analysis) converts silent misinterpretations into detectable failures, e.g., Byzantine nodes become crash-detectable nodes, which is a strictly easier problem to solve.
  2. Explicit coordination protocols (not “talk to each other until you agree”) give you provable properties.
  3. Liveness requires failure detection. An agent that has crashed must be detected and either recovered or bypassed.

PlexSpaces provides all three, baked into the actor model:

Distributed Systems Need | PlexSpaces Mechanism
Failure detection | host.monitor(actorID): get notified when an actor dies
Crash recovery | Supervisor tree: automatic restart with configurable strategy
Coordination protocol | TupleSpace write-once slots with explicit, auditable coordination
External validation | ValidatorActor pattern with external check before accepting output
Byzantine isolation | Per-actor isolated state so that a misinterpreting actor cannot corrupt others
Liveness under crashes | durability facet so that progress survives node restarts

1.3 Failure Detection and Liveness: host.monitor()

Agents need “liveness-checking tools for better fault detection.” In PlexSpaces, this is host.monitor() and host.link(), following Erlang’s location-transparent supervision philosophy.

  • Monitor: any actor watches any other. When the monitored actor stops, the monitoring actor receives __DOWN__ in its mailbox and stays alive. The monitor_ref returned by host.monitor() lets you cancel the watch with host.demonitor().
  • Link: bidirectional fate-sharing. __EXIT__ is delivered only on abnormal exits (error, kill). Normal shutdown does not cascade. Use host.unlink() before graceful shutdown to avoid spurious propagation.

The example below is from examples/python/apps/ai_monitor_link_supervision:

# examples/python/apps/ai_monitor_link_supervision/ai_monitor_link_actor.py

@gen_server_actor
class ValidatorAgent:
    """Monitors workers; detects Byzantine faults; applies FLP >= 1/3 alert threshold."""
    monitor_refs: dict = state(default_factory=dict)   # worker_id -> monitor_ref
    down_events: list = state(default_factory=list)
    byzantine_count: int = state(default=0)
    total_validations: int = state(default=0)
    FLP_THRESHOLD = 1.0 / 3.0

    @handler("__DOWN__", "cast")
    def on_down(self, monitor_ref: str = "", down_from: str = "", down_reason: str = "") -> None:
        """Monitored worker stopped — one-way notification. ValidatorAgent stays alive.
        
        DOWN fires on ANY exit: normal, error, shutdown, kill. The monitoring actor
        decides what to do — this is Akka Death Watch semantics, not Erlang trap_exit.
        """
        self.down_events.append({"down_from": down_from, "down_reason": down_reason})
        # Remove stale watch entry so we don't leak monitor refs
        for wid, ref in list(self.monitor_refs.items()):
            if ref == monitor_ref:
                del self.monitor_refs[wid]
                break

    @handler("monitor_worker")
    def on_monitor_worker(self, worker_id: str = "") -> dict:
        """One-way watch. Returns monitor_ref for future demonitor() call."""
        monitor_ref = host.monitor(worker_id)
        self.monitor_refs[worker_id] = monitor_ref
        return {"status": "ok", "monitor_ref": monitor_ref}

    @handler("demonitor_worker")
    def on_demonitor_worker(self, worker_id: str = "") -> dict:
        """Cancel watch — used when gracefully replacing a worker."""
        ref = self.monitor_refs.pop(worker_id, None)
        if ref:
            host.demonitor(ref)   # idempotent: safe to call multiple times
        return {"status": "ok", "worker_id": worker_id}

    @handler("validate")
    def on_validate(self, result: str = "", worker_id: str = "") -> dict:
        """Apply FLP-inspired Byzantine threshold: >= 1/3 flagged ? alert.
        
        FLP theorem: no deterministic async protocol can guarantee both safety and
        liveness with even one crash. Monitors give us the failure signal; this
        threshold decides when to escalate.
        """
        self.total_validations += 1
        is_byzantine = any(p in result.lower() for p in ["42 is the answer", "null", "checkpoint corrupted"])
        if is_byzantine:
            self.byzantine_count += 1
        flp_ratio = self.byzantine_count / self.total_validations if self.total_validations else 0.0
        return {"valid": not is_byzantine, "flp_threshold_exceeded": flp_ratio >= self.FLP_THRESHOLD}


@gen_server_actor
class InferenceWorker:
    """LLM inference worker. Uses host.link() for bidirectional fate-sharing with peer workers."""
    linked_peers: list = state(default_factory=list)

    @handler("__EXIT__", "cast")
    def on_exit(self, exit_from: str = "", exit_reason: str = "") -> None:
        """Linked peer died abnormally — clean up and continue.
        
        __EXIT__ fires ONLY on abnormal exits (error, kill). Normal shutdown does
        NOT propagate — use host.unlink() before graceful shutdown to prevent cascade.
        """
        if exit_from in self.linked_peers:
            self.linked_peers.remove(exit_from)

    @handler("link_with")
    def on_link_with(self, peer_id: str = "") -> dict:
        host.link(peer_id)          # bidirectional: if either dies abnormally, other gets __EXIT__
        self.linked_peers.append(peer_id)
        return {"status": "ok", "peer_id": peer_id}

    @handler("unlink_from")
    def on_unlink_from(self, peer_id: str = "") -> dict:
        host.unlink(peer_id)        # decouple before graceful shutdown — no cascade
        self.linked_peers = [p for p in self.linked_peers if p != peer_id]
        return {"status": "ok", "peer_id": peer_id}

This is liveness management at the actor level. The ValidatorAgent stays alive even when a worker crashes; __DOWN__ is informational, not fatal. The InferenceWorker handles __EXIT__ only on abnormal peer failures; normal shutdowns don’t cascade because the supervisor calls unlink_from first.

The down_from / down_reason header names match the create_down_message wire format used by every PlexSpaces node. The same pattern works identically in Go, TypeScript, and Rust WASM (see examples/*/apps/ai_monitor_link_supervision for all four languages).

1.4 Four Behaviors, Four Agent Archetypes

PlexSpaces provides four behavior types, each mapping naturally to a class of AI agent:

Behavior | Decorator | Agent Archetype | Example
GenServer | @actor | Tool executor, stateful helper | Search agent, RAG retriever
GenEvent | @event_actor | Audit logger, event publisher | Usage tracker, metrics collector
GenFSM | @fsm_actor | State-machine agent | Circuit breaker, quality gate, budget guard
Workflow | @workflow_actor | Orchestrator agent | Multi-step pipeline, RAG workflow, agentic loop

The TypeScript llm_workflow_orchestrator uses all four. The QualityFSMActor implements a quality gate with five states:

// From llm_workflow_orchestrator_actor.ts
class QualityFSMActor extends PlexSpacesActor<QualityFSMState> {
  getDefaultState(): QualityFSMState {
    return { actorId: "", fsmState: "pending", attempts: 0, lastScore: 0 };
  }

  onEvaluate(payload: Record<string, unknown>): Record<string, unknown> {
    const score = Number(payload.score ?? 0);
    this.state.attempts++;
    this.state.lastScore = score;
    if (score >= 8) {
      this.state.fsmState = "approved";
    } else if (score >= 6) {
      this.state.fsmState = this.state.attempts >= 3 ? "escalated" : "evaluating";
    } else {
      this.state.fsmState = this.state.attempts >= 3 ? "rejected" : "evaluating";
    }
    return { state: this.state.fsmState, score, attempts: this.state.attempts };
  }
}

The PipelineAuditActor uses GenEvent semantics: fire-and-forget, no reply needed:

// Fire-and-forget handler: cast (no return value)
onPipeline_step_completed(payload: Record<string, unknown>): void {
  this.state.eventsReceived++;
  this.state.lastEvent = payload;
  host.applicationMetricsAdd(this.state.actorId || "llm-orchestrator", {
    message_count: 1,
    counter_metrics: { pipeline_events: 1 },
  });
}

These two actors require zero changes to the orchestrator logic. They attach via config.

1.5 Facets: Cross-Cutting Agent Capabilities

Facets are the key architectural insight. They are pluggable capabilities that attach to actors at deployment time without code changes in the actor handler logic.

Facet | Agent Benefit | Distributed Systems Guarantee
virtual_actor | Activates on demand, deactivates when idle | Prevents unbounded resource consumption
durability | Survives node restarts, state checkpointed automatically | Progress preservation across crashes (liveness)
timer | Schedules follow-up actions, heartbeats, budget reviews | Timeout detection for hung agents
metrics | Every interaction auto-instrumented in Prometheus | Observability for failure detection
caching | Memoize expensive LLM calls, skip redundant computation | Reduces cost of Byzantine retries

The updated app-config.toml for llm_workflow_orchestrator shows facets composing via config:

[[supervisor.children]]
id = "quality_fsm"
type = "quality_fsm"
behavior_kind = "GenFSM"
facets = [
  { type = "virtual_actor", priority = 100, config = { idle_timeout = "30m", activation_strategy = "lazy" } },
  { type = "durability", priority = 90, config = { checkpoint_interval = 1 } }
]

The quality FSM now checkpoints after every state transition (checkpoint_interval = 1) and deactivates after 30 minutes of inactivity. Zero lines changed in QualityFSMActor. That is the point: the business logic and the operational logic stay separate.

1.6 TupleSpace: Safe Coordination Without Race Conditions

The FLP theorem says you cannot guarantee both safety and liveness in an asynchronous system. But you can get very close by using the right coordination primitive. TupleSpace implements the Linda coordination model: write tuples, read them by pattern match, take them (destructive read). Three operations without locks or mutable state. Write-once slots give you safe service registration across concurrent actor instances:

// Go SDK — TupleSpace write-once service registration
// (from resource_aware_inference_actor.go and a2a_multi_agent_actor.go)
func tsRegisterService(serviceType, actorID string) {
    // Read first — if entry exists, skip (write-once semantics)
    if _, ok := host.TS().Read([]any{"svc", serviceType, nil}); !ok {
        host.TS().Write([]any{"svc", serviceType, actorID})
    }
}

func tsDiscoverService(serviceType string) (string, error) {
    tup, ok := host.TS().Read([]any{"svc", serviceType, nil})
    if !ok || len(tup) < 3 {
        return "", fmt.Errorf("service %q not registered", serviceType)
    }
    return tup[2].(string), nil
}
// TypeScript SDK — same pattern
function tsRegisterService(serviceType: string, actorId: string): void {
  const existing = host.ts.read(["svc", serviceType, null]);
  if (!existing) {
    host.ts.write(["svc", serviceType, actorId]);
  }
}

function tsDiscoverService(serviceType: string): string | null {
  const tup = host.ts.read(["svc", serviceType, null]);
  return (tup && tup.length >= 3) ? String(tup[2]) : null;
}
# Python SDK — same pattern
def _ts_register_service(service_type: str, actor_id: str) -> None:
    existing = host.ts_read(["svc", service_type, None])
    if not existing:
        host.ts_write(["svc", service_type, actor_id])

def _ts_discover_service(service_type: str) -> str | None:
    tup = host.ts_read(["svc", service_type, None])
    return tup[2] if tup and len(tup) >= 3 else None

The framework uses WASM re-instantiation to speed up actor startup (compile once, instantiate from cached binary). During the re-instantiation window, a new HTTP request can activate a second instance of the same actor type via virtual_actor. If both instances join a process group, pgFirst() returns non-deterministically. We saw this cause budget_exceeded errors in resource_aware_inference when the routing workflow asked the budget manager for remaining balance and got the empty virtual_actor instance that had never been initialized with budget data. TupleSpace write-once registration solves this:

  1. The supervisor-spawned instance calls tsRegisterService("budget_manager", myID) on Init and writes the slot.
  2. The virtual_actor instance calls tsRegisterService("budget_manager", myID2) on Init, finds the slot taken, and skips.
  3. The routing workflow calls tsDiscoverService("budget_manager") and always gets the supervisor-spawned instance.

For shared state (like budget totals that all instances should see), store the data in TupleSpace too:

// BudgetManagerActor — state in TupleSpace, not per-actor KV
// Both the supervisor-spawned and any virtual_actor instance read the same data
func (b *BudgetManagerActor) tsReadBudgetFloat(prefix, tenantID string) float64 {
    tup, ok := host.TS().Read([]any{prefix, tenantID, nil})
    if !ok || len(tup) < 3 { return 0 }
    var v float64
    fmt.Sscanf(fmt.Sprint(tup[2]), "%f", &v)
    return v
}

func (b *BudgetManagerActor) tsWriteBudgetFloat(prefix, tenantID string, value float64) {
    host.TS().Take([]any{prefix, tenantID, nil}) // remove old value
    host.TS().Write([]any{prefix, tenantID, fmt.Sprintf("%f", value)}) // write new
}

This is the coordination protocol the FLP analysis demands: explicit, auditable, shared state managed through a primitive that has no locks and no deadlock risk.


Part 2: Platform Capabilities

2.1 WAR-File like Deployment: Multiple AI Apps Per Node

PlexSpaces nodes are application servers for WASM actors: what JBoss is for WAR files, but for AI agents. Each team deploys an independent application (a .wasm binary plus a config file) to the same node. Applications share the runtime but have isolated namespaces, actor registries, and tenant contexts.

# Deploy RAG pipeline from Search team
plexspaces deploy --app rag-pipeline --wasm rag.wasm --config rag-config.toml

# Deploy inference server from ML team — same node, independent lifecycle
plexspaces deploy --app inference-server --wasm inference.wasm --config inference-config.toml

# Deploy agent orchestrator from Platform team — same node
plexspaces deploy --app agent-orchestrator --wasm orchestrator.wasm --config orchestrator-config.toml

Each application has its own supervisor tree, its own actor namespace, and its own failure isolation. The ML team’s inference workers crashing doesn’t touch the Search team’s RAG pipeline.

2.2 Node Communication with Location-Transparent Messaging

Actors on different nodes message each other with the same API as local actors. When OrchestratorAgent calls host.Ask(researchAgentID, "research", ...), the framework routes transparently: to the local mailbox if the target is on the same node, over gRPC if it’s on a different node. The calling actor never knows the difference.

// From a2a_multi_agent_actor.go — OrchestratorAgent
// This call works whether researchAgent is local or 3 nodes away.
researchResp, err := host.Ask(researchAgentID, "research", map[string]any{
    "topic": task, "depth": 1,
}, 10000)
// No service discovery config. No DNS lookup. No circuit breaker setup.
// The framework handles routing, retries, and failover.

SWIM gossip propagates node membership in real time. When a new node joins, actors on existing nodes can immediately message actors on the new node. This makes multi-node agent deployments trivial. The a2a_multi_agent example deploys four specialist agents, each potentially on different nodes, and the orchestrator coordinates them with the same host.Ask() calls used for local agents.

2.3 Multi-Tenancy with AuthN/AuthZ

Every host.Ask() call carries a RequestContext with tenant_id and namespace. You cannot bypass it. The Python MCPGatewayWorkflow enforces the tenant boundary at the application layer:

# From mcp_tool_server_actor.py — MCPGatewayWorkflow.start()
# JWT carries tenant_id — enforced at every Ask() boundary
tenant = request.get("tenant", "")
if tenant:
    self_ns = actor_application_id(self.actor_id)
    if self_ns and tenant != self_ns:
        return {
            "jsonrpc": "2.0", "id": request_id,
            "error": {"code": -32600,
                      "message": f"Tenant mismatch: '{tenant}' — access denied"},
        }
# Pass tenant context downstream — research agent sees the same tenant_id
result = host.ask("tool_registry", "tools_call", {
    "tool_name": tool_name, "input": params.get("arguments", {}),
    "tenant": tenant,  # propagated through the call chain
}, timeout_ms=15000)

The application_metrics_add() call in every actor automatically tags metrics by actor ID, which includes the application namespace. Prometheus metrics are naturally scoped to tenant. JWT validation, namespace isolation, and metric scoping all happen at the framework level.

2.4 The Primitive Stack — Everything You Need, Nothing You Don’t

Every pattern in this post builds on one or more of these primitives. All are available in every language. All are accessible via the same host.* API from any actor regardless of language or location.

Primitive | What It Does | AI Agent Use Case | HPC/ML Analog
Shard Group | Partition data across N actors; scatter-gather with aggregation | Parallel RAG retrieval, distributed inference | Ray map_batches(), Spark partitions
Worker Pool | Stateless actor pool with load balancing | Burst inference capacity, tool execution | Ray remote functions, Lambda concurrency
Process Group | Dynamic membership; broadcast to all members | Config updates to all inference workers | MPI communicator, Gloo process group
TupleSpace | Pattern-matched shared memory; Linda-model coordination | Service registry, task result sharing, consensus | MPI ghost cell exchange, barrier sync
Channels | Queue-based stage coupling; 6 backends (Kafka, Redis, SQS, PG, …) | Async pipeline stages, event streaming | Kafka, SQS, RabbitMQ
Workflow Actor | Multi-step durable orchestration; pause/resume/cancel | RAG pipeline, agent orchestration | Airflow DAG, Temporal workflow
Distributed Lock | Lease-based mutual exclusion across actors | Model weight update, index rebuild | ZooKeeper, Redis Redlock
Blob Storage | Large binary payloads (embeddings, model weights) | Embedding cache, model artifact store | S3, HDFS
Broadcast | Send data to all actors in a process group | Push config updates to all workers | MPI_Bcast
Collective Reduce | Sum/min/max across all actors; return to coordinator | Aggregate inference metrics | MPI_Allreduce
Scatter/Gather | Fan-out to N workers, fan-in aggregated results | Parallel document search, batch inference | MPI_Scatter + MPI_Gather

2.5 Custom Services, Components, and the Full Polyglot Stack

PlexSpaces is not just a runtime for the primitives above. It ships the entire stack needed to build production AI services:

SDKs in all four languages:

# Python: @actor decorator, host.ask(), host.ts_write(), host.monitor()
@actor(facets=["virtual_actor", "durability"])
class MyAgent: ...
// Go: struct embedding, host.Ask(), host.TS().Write(), host.Monitor()
type MyAgent struct { plexspaces.ActorBase }
func (a *MyAgent) HandleMessage(from, msgType, payload string) string { ... }
// TypeScript: class extends PlexSpacesActor, host.ask(), host.ts.write()
class MyAgent extends PlexSpacesActor<MyState> { ... }
// Rust: #[gen_server_actor], host::ask(), host::ts_write(), host::monitor()
#[gen_server_actor]
struct MyAgent { state: MyState }

Service links for outbound HTTP connect to any external API (OpenAI, Anthropic, your own inference endpoint) via config, not code:

# app-config.toml — service link to LLM provider
[[service_links]]
name = "llm_provider"
base_url = "https://api.openai.com"
timeout_secs = 30
retry_policy = { max_attempts = 3, backoff = "exponential" }
# Python actor using service link — no URL in code, no hardcoded credentials
response = host.http_fetch("llm_provider", "POST", "/v1/chat/completions",
    json.dumps({"model": "gpt-4o", "messages": messages}))

Custom supervisor strategies — configure how your agent tree recovers from failures:

[supervisor]
id = "rag-supervisor"
strategy = "one_for_one"        # restart only the crashed actor
max_restarts = 10
max_restart_window_secs = 60    # if 10 crashes in 60s, escalate to parent
children = [...]

Alternatives are rest_for_one (restart the crashed actor plus all actors started after it) and one_for_all (restart the entire team when any member crashes); the right choice depends on how much your agents share state.

Observability out of the box: every actor reports to Prometheus automatically:

// application_metrics_add() from any actor, any language
host.ApplicationMetricsAdd("rag-pipeline", map[string]any{
    "message_count": 1,
    "counter_metrics": map[string]any{
        "queries_processed": 1,
        "validation_failures": validationFailed,
    },
    "latency_totals_ms": map[string]any{
        "retrieve_ms": retrieveLatency,
        "generate_ms": generateLatency,
    },
})
// Automatically available at /metrics as:
// plexspaces_app_queries_processed{app="rag-pipeline",node="node-1"} 142
// plexspaces_app_retrieve_ms_total{app="rag-pipeline",node="node-1"} 8432

The battery list (all included, zero external deps beyond the binary):

Battery | What It Includes
Runtime | WASM AOT compilation, ~50 microsecond cold start, polyglot actor host
Storage | Per-actor SQLite journal, KV store, blob store, TupleSpace
Messaging | Local mailbox, remote gRPC, ordered delivery, at-least-once
Scheduling | Timers, send_after, cron-style periodic messages
Coordination | TupleSpace, distributed locks, process groups, channels
Scaling | Shard groups, elastic pools, MPI collectives
Security | JWT auth, tenant isolation, namespace scoping, RBAC
Observability | Prometheus metrics, per-actor counters, application metrics API
Deployment | APP/WAR-file hot deploy/undeploy, multi-app per node, SWIM gossip
Networking | Location-transparent routing, gRPC transport, service links

Part 3: Infrastructure Patterns

Pattern 1: Durable Workflows with Signals and Queries

Workflow actors give you the durability that LLM pipelines need but almost never have. Use durability when your pipeline has multiple expensive steps and you cannot afford to restart from scratch on a crash. Each step is checkpointed. Crash at step 3, resume from step 3. No full restart. The Python MCPGatewayWorkflow shows the pattern:

# From mcp_tool_server_actor.py — MCPGatewayWorkflow
@workflow_actor(facets=["virtual_actor", "durability"])
class MCPGatewayWorkflow:
    session_id: str = state(default="")
    requests_processed: int = state(default=0)
    last_error: str = state(default="")

    @run_handler
    def start(self, request: dict = None) -> dict:
        request = request or {}
        if not self.session_id:
            self.session_id = f"session-{host.now_ms()}"
        method = request.get("method", "")
        result = {}  # default so the reply below stays well-formed for unknown methods
        # Route to tool registry — state checkpointed before and after
        if method == "tools/list":
            result = host.ask("tool_registry", "tools_list", {}, timeout_ms=10000)
        elif method == "tools/call":
            tool_name = request.get("params", {}).get("name", "")
            result = host.ask("tool_registry", "tools_call",
                              {"tool_name": tool_name, "input": request.get("params", {}).get("arguments", {})},
                              timeout_ms=15000)
        self.requests_processed += 1
        return {"jsonrpc": "2.0", "id": request.get("id", 0), "result": result}

    @signal_handler("reset")
    def reset(self, reason: str = "manual") -> None:
        self.requests_processed = 0
        self.session_id = f"session-{host.now_ms()}"

Temporal requires a separate server and a separate SDK. Airflow restarts the whole DAG. PlexSpaces checkpoints per step inside the actor runtime, using the same SQLite journal that backs all actor state.

Pattern 2: SEDA (Staged Event-Driven Architecture)

SEDA decouples pipeline stages so a slow embedder doesn’t stall the parser, and a GPU failure at step 3 doesn’t rerun step 1. Every stage is an independent actor (or shard group of actors). Stages communicate by message passing. Each stage has its own queue, its own scaling policy, and its own failure boundary.

Use this pattern when your pipeline stages have meaningfully different latency profiles or resource requirements. For example, a slow GPU-bound generation step should not stall a fast CPU-bound parsing step, and a failure in one stage should not force the others to restart. The agentic_rag_pipeline example in Go shows the core stages (index, retrieve, generate, validate) as separate actors orchestrated by a workflow:

// From agentic_rag_pipeline_actor.go — RAGWorkflow: four actors, one workflow
// Each actor is an independent stage with its own queue and failure domain.
retrieverID := wf.siblingActorID("retriever")    // Stage 2: keyword search
generatorID := wf.siblingActorID("generator")    // Stage 3: LLM generation
validatorID := wf.siblingActorID("validator")    // Stage 4: guardrail checks

// Stage 2 -> Stage 3: message passing (no shared memory, no locks)
retrieveResp, err := host.Ask(retrieverID, "retrieve", map[string]any{
    "query": query, "mode": effectiveMode, "max_results": 5,
}, 15000)
chunks := extractStringSlice(retrieveResp, "results")

generateResp, err := host.Ask(generatorID, "generate", map[string]any{
    "query": query, "context": chunks,
}, 15000)

// Fire-and-forget audit event to GenEvent actor — Stage 4 doesn't wait for it
_ = host.Send(eventActorID, "pipeline_step_completed", map[string]any{
    "step": "generate", "status": "completed",
})

The host.Send() call to the PipelineEventActor is fire-and-forget. The workflow continues immediately without blocking: no backpressure from the audit stage into the generation stage. That’s SEDA in one line. At larger scale (from data_lake_rag), each stage becomes a shard group for horizontal parallelism: the retrieval stage fans out across N shards of the index, collects top-K per shard, and merges globally.

Scale the retrieval stage without touching the generation stage. Route GPU-heavy generation to GPU nodes via labels. The workflow actor checkpoints between stages so a crash at generation doesn’t re-run indexing. This is the operational superiority of SEDA: independent scaling, independent failure recovery, independent observability.

Pattern 3: Cellular Architecture

Use this pattern when namespace isolation is not enough and you need hard failure-domain separation between tenants or regions, or when geographic compliance requires that data never leave a region. Each cell is an independent PlexSpaces cluster of nodes sharing the same cluster name, with its own supervisor tree, its own KV store, and its own actor registry. WASM APP/WAR-file deployment means each cell runs multiple AI services independently. SWIM gossip handles peer discovery between cells. Partition cells by tenant or by geography. Cells fail independently: an ACME tenant cell crashing doesn’t touch the Beta tenant cell. Add a new AI service to the ACME cell by dropping in a .wasm file; the Beta cell never sees it and never needs to restart.

This is multi-tenancy at the infrastructure level: not just separate namespaces but separate fault domains with transparent cross-cell message routing.
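As a sketch of what that looks like operationally, reusing the deploy command from Part 2 (the --node flag and the cell URLs below are illustrative assumptions, not documented CLI options):

# Hypothetical sketch: deploy a new service to the ACME cell only.
# The deploy command is shown in Part 2; --node and the cell URLs are assumed here.
plexspaces deploy --app fraud-detector --wasm fraud.wasm --config fraud-config.toml \
  --node https://acme-cell.internal:8443

# The Beta cell is a separate cluster: it never sees fraud-detector and never restarts.
plexspaces deploy --app rag-pipeline --wasm rag.wasm --config rag-config.toml \
  --node https://beta-cell.internal:8443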

Pattern 4: Resource-Based Affinity

Use resource-based affinity when you have heterogeneous compute (GPU vs CPU nodes) and need to route requests to the right tier based on prompt complexity, remaining budget, or hardware capability. The Go resource_aware_inference example below shows cost-aware model routing in about 30 lines. The routing workflow coordinates three actors via TupleSpace discovery:

// From resource_aware_inference_actor.go — RoutingWorkflow.Run()
func (rw *RoutingWorkflow) Run(payloadJSON string) string {
    p := parsePayload(payloadJSON)
    prompt := stringVal(p, "prompt", "")
    tenantID := stringVal(p, "tenant_id", "default")
    preferGPU, _ := p["prefer_gpu"].(bool)

    // Discover services via TupleSpace registry (write-once, race-safe)
    budgetManagerID, err := tsDiscoverService("budget_manager")
    modelRegistryID, err := tsDiscoverService("model_registry")

    // Step 1: Check tenant budget
    complexity := promptComplexity(prompt)
    estimatedCost := 200.0 * tierCostPer1K("medium") / 1000.0
    budgetResp, err := host.Ask(budgetManagerID, "check_budget", map[string]any{
        "tenant_id": tenantID, "estimated_cost": estimatedCost,
    }, 10000)
    // ... if not allowed: return budget_exceeded

    // Step 2: Select model by complexity + budget + GPU preference
    modelResp, _ := host.Ask(modelRegistryID, "select_model", map[string]any{
        "complexity": complexity, "budget_remaining": remainingUSD, "prefer_gpu": preferGPU,
    }, 10000)
    selectedTier := stringVal(modelMap, "tier", "small")

    // Step 3: Route to tier-specific inference worker (also TS-discovered)
    workerRole := "inference_worker_" + selectedTier
    workerID, _ := tsDiscoverService(workerRole)
    inferResp, _ := host.Ask(workerID, "infer", map[string]any{
        "prompt": prompt, "max_tokens": 100, "tenant_id": tenantID,
    }, 30000)

    // Step 4: Deduct actual cost from shared TupleSpace budget
    host.Ask(budgetManagerID, "deduct", map[string]any{
        "tenant_id": tenantID, "cost": actualCost,
    }, 10000)
}

Three model tiers. One workflow actor. Per-tenant budget enforcement.
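The complexity signal that drives tier selection can be a cheap heuristic. A minimal Python sketch (the example's actual promptComplexity() helper isn't shown above, so this is an illustrative stand-in):

# Hypothetical sketch: cheap prompt-complexity heuristic for tier selection.
def prompt_complexity(prompt: str) -> str:
    words = prompt.split()
    if len(words) > 150 or any(k in prompt.lower() for k in ("analyze", "compare", "derive", "prove")):
        return "high"    # route toward the large (GPU) tier
    if len(words) > 40:
        return "medium"
    return "low"         # a small, cheap model is enough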


Part 4: RAG and Knowledge Patterns

Pattern 5: Indexing at Scale with Sharded RAG Index

Use indexing at scale when your document corpus is too large for a single actor to index or query within acceptable latency, or when you need to parallelize retrieval across many partitions and aggregate top-K results. For example, the parameter server Leader.train() in Python shows scatter-gather at its most direct: fan out compute_gradient to N workers, collect responses, aggregate:

# From parameter_server_actor.py — Leader.train()
group = host.create_shard_group({
    "group_id": group_id,
    "actor_type": "worker",
    "shard_count": self.num_workers,
    "partition_strategy": "hash",
    "placement": {"strategy": "from_registry"},
    "initial_state": {},
})

for _ in range(iterations):
    response = host.scatter_gather({
        "group_id": group_id,
        "query": {
            "op": "compute_gradient",
            "weights": {"w1": self.w1, "w2": self.w2},
            "input_dim": self.input_dim, "hidden_dim": self.hidden_dim,
        },
        "aggregation": "concat",
        "min_responses": self.num_workers,
        "timeout_ms": 30000,
    })
    # ... aggregate gradients, update weights

The same pattern applies to RAG indexing: N shard actors each hold a partition of the document corpus. Query time: scatter the search across all shards, gather top-K results, merge.
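A minimal Python sketch of that query path, reusing host.scatter_gather() from the example above; the "search" op, the scoring field, and the per-shard response layout are assumptions, not code from the examples:

# Hypothetical sketch: query a sharded RAG index and merge per-shard top-K.
# The "search" op and the per-shard response shape are illustrative assumptions.
def query_sharded_index(group_id: str, query: str, num_shards: int, top_k: int = 5) -> list:
    response = host.scatter_gather({
        "group_id": group_id,
        "query": {"op": "search", "text": query, "max_results": top_k},
        "aggregation": "concat",        # keep every shard's partial top-K
        "min_responses": num_shards,
        "timeout_ms": 15000,
    })
    merged = []
    for shard in response.get("responses", []):            # assumed response layout
        merged.extend(shard.get("payload", {}).get("results", []))
    merged.sort(key=lambda r: r.get("score", 0.0), reverse=True)
    return merged[:top_k]                                   # global top-K across shards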

Pattern 6: Agentic RAG — Orchestrated Retrieve-Generate-Validate

Use agentic RAG when a single retrieval-generation pass is not reliable enough for your use case, and you can afford 2–3 retry cycles in exchange for higher answer quality. The Go agentic_rag_pipeline demonstrates a full agentic RAG loop with retry in a workflow actor. This directly addresses the external validation recommendation from the FLP analysis: the ValidatorActor converts silent LLM misinterpretations (hallucinations, off-topic answers) into detectable failures that the workflow can handle.

// From agentic_rag_pipeline_actor.go — RAGWorkflow.Run()
for attempt := 0; attempt <= maxRetries; attempt++ {
    effectiveMode := mode
    if attempt > 0 { effectiveMode = "deep" }  // escalate to deep search on retry

    // Step 1: Retrieve
    wf.CurrentStep = "retrieve"
    retrieveResp, err := host.Ask(retrieverID, "retrieve", map[string]any{
        "query": query, "mode": effectiveMode, "max_results": 5,
    }, 15000)
    chunks := extractStringSlice(retrieveResp, "results")

    // Step 2: Generate
    wf.CurrentStep = "generate"
    generateResp, err := host.Ask(generatorID, "generate", map[string]any{
        "query": query, "context": chunks, "max_retries": 1,
    }, 15000)
    answer := extractString(generateResp, "answer")

    // Step 3: Validate — external check converts silent errors to detectable failures
    wf.CurrentStep = "validate"
    validateResp, err := host.Ask(validatorID, "validate", map[string]any{
        "answer": answer, "query": query, "sources": sources,
    }, 10000)
    if extractBool(validateResp, "valid") || attempt >= maxRetries {
        wf.Status = "completed"
        return marshal(map[string]any{"status": "completed", "answer": answer,
            "score": extractFloat(validateResp, "score"), "retry_count": attempt})
    }
    // Validation failed — retry with deep search mode
}

The retry escalation is key: first attempt uses single mode (fast, keyword match). Failed attempts switch to deep mode — multi-hop retrieval that tries individual query words. The workflow actor checkpoints between steps, so a generator crash mid-validation doesn’t force re-retrieval.

Pattern 7: Trustworthy Generation with Guardrails

Use the guardrails pattern when you are deploying agents in a context where incorrect or unsafe output has real consequences: customer-facing answers, financial decisions, regulated content. The ValidatorActor in the Go RAG pipeline runs three checks on every generated answer. These checks implement the “external validation converts Byzantine failures to detectable failures” principle:

// From agentic_rag_pipeline_actor.go — ValidatorActor.validate()
// Check 1: Length — answer must be longer than 10 chars
lengthOK := len(answer) > 10

// Check 2: Source grounding — answer must share words with at least one source
// This detects hallucination: an answer with no shared words with sources is likely fabricated
groundedOK := false
if len(sources) > 0 {
    answerWords := wordSet(strings.ToLower(answer))
    for _, src := range sources {
        srcWords := wordSet(strings.ToLower(src))
        for w := range answerWords {
            if len(w) > 3 && srcWords[w] { groundedOK = true; break }
        }
    }
}
if len(sources) == 0 { groundedOK = true }  // no sources: check not applicable

// Check 3: Safety — answer must not contain prompt injection attempts
forbidden := []string{"ignore", "bypass", "jailbreak", "forget"}
safeOK := true
for _, f := range forbidden {
    if strings.Contains(strings.ToLower(answer), f) { safeOK = false; break }
}

confidence := float64(passedCount) / 3.0

Three independent checks, composable. Add a toxicity check, a PII check, a hallucination detector, each is a new check function inside the same validator actor. Or promote the validator to a pipeline of validator actors, each responsible for one check category.
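A sketch of that pluggable check list in Python (the _shares_words() helper and the exact check set are illustrative, not taken from the example):

# Hypothetical sketch: validator whose checks are just a list of (name, passed) pairs.
def _shares_words(answer: str, sources: list) -> bool:
    answer_words = {w for w in answer.lower().split() if len(w) > 3}
    return any(answer_words & set(src.lower().split()) for src in sources)

@actor(facets=["metrics"])
class GuardrailValidator:
    checks_run: int = state(default=0)

    @handler("validate")
    def validate(self, answer: str = "", sources: list = None) -> dict:
        sources = sources or []
        checks = [
            ("length", len(answer) > 10),
            ("grounded", not sources or _shares_words(answer, sources)),
            ("safe", not any(w in answer.lower() for w in ("ignore", "bypass", "jailbreak"))),
            # Add ("no_pii", ...), ("non_toxic", ...) here without touching any caller
        ]
        self.checks_run += len(checks)
        passed = [name for name, ok in checks if ok]
        return {"valid": len(passed) == len(checks),
                "confidence": len(passed) / len(checks),
                "failed": [name for name, ok in checks if not ok]}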

Pattern 8: Deep Search (Multi-Hop Retrieval)

Use this pattern when single-pass keyword retrieval consistently returns fewer results than expected for complex or multi-concept queries; the trade-off is the higher cost of the escalated search. For example, the RetrieverActor escalates from keyword matching to word-level multi-hop retrieval when the first pass yields fewer than 2 results:

// From agentic_rag_pipeline_actor.go — RetrieverActor.retrieve()
if mode == "deep" && len(results) < 2 {
    words := strings.Fields(queryLower)
    for _, word := range words {
        if len(word) < 3 { continue }
        extra := ret.matchChunks(keys, word, maxResults-len(results))
        for _, e := range extra {
            results = append(results, e)
            if len(results) >= maxResults { break }
        }
    }
}

Simple and effective. The RetrieverActor tracks TotalChunksScanned so you can observe the cost of deep search versus single-pass retrieval in Prometheus.
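Surfacing that cost is one metrics call per request. A sketch (the Python method name follows the application_metrics_add() convention from Part 2; the metric names are assumptions):

# Hypothetical sketch: report deep-search cost to Prometheus per retrieval.
def report_retrieval_cost(mode: str, chunks_scanned: int) -> None:
    host.application_metrics_add("agentic-rag", {
        "counter_metrics": {
            "chunks_scanned": chunks_scanned,
            "deep_search_invocations": 1 if mode == "deep" else 0,
        },
    })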


Part 5: LLM Orchestration

Pattern 9: Prompt Chaining

Use this pattern when a single prompt cannot reliably produce your target output and you can decompose the task into sequential transforms where each step’s output is well-defined enough to be the next step’s input. If steps are independent rather than sequential, use parallel scatter-gather instead. For example, ChainActor in the TypeScript orchestrator executes multi-step sequential transforms. Each step receives the output of the previous step:

// From llm_workflow_orchestrator_actor.ts — ChainActor.onExecute_chain()
onExecute_chain(payload: Record<string, unknown>): Record<string, unknown> {
    const steps = Array.isArray(payload.steps)
      ? (payload.steps as string[])
      : ["summarize", "extract_keywords", "format_output"];
    let currentContent = String(payload.content ?? "");
    const stepResults: Record<string, unknown>[] = [];

    for (const step of steps) {
        const stepStart = host.nowMs();
        let transformed = currentContent;
        if (step === "summarize") {
            transformed = currentContent.length > 200
              ? currentContent.slice(0, 200) + "... [summarized]" : currentContent;
        } else if (step === "extract_keywords") {
            const words = currentContent.replace(/[^a-zA-Z\s]/g, "").split(/\s+/)
              .filter((w) => w.length > 5);
            transformed = [...new Set(words)].slice(0, 5).join(", ");
        } else if (step === "format_output") {
            transformed = JSON.stringify({ step_count: stepResults.length + 1,
              content: currentContent, processed: true });
        }
        stepResults.push({ step, latency_ms: host.nowMs() - stepStart });
        currentContent = transformed;
    }
    return { steps_completed: steps.length, final_output: currentContent };
}

Each step is pluggable. Add a translate step, a classify step, a fact_check step — the chain executor handles it without structural changes.
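In Python the same pluggability can be expressed as a dict of named transforms: adding a step is one new entry, and the loop never changes (a sketch; the transforms are illustrative):

# Hypothetical sketch: chain steps as a dict of named transforms.
import json

STEPS = {
    "summarize": lambda text: text[:200] + "... [summarized]" if len(text) > 200 else text,
    "extract_keywords": lambda text: ", ".join(sorted({w for w in text.split() if len(w) > 5})[:5]),
    "format_output": lambda text: json.dumps({"content": text, "processed": True}),
    # "translate": ..., "classify": ..., "fact_check": ... plug in here
}

def execute_chain(content: str, steps: list) -> str:
    for step in steps:
        transform = STEPS.get(step)
        if transform:                    # unknown steps pass content through unchanged
            content = transform(content)
    return content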

Pattern 10: Routing

Routing is one of the most important agentic patterns (see the full taxonomy here). Use it when you have specialist agents (or models) that each handle a category of input better than a single general agent, and you need a stateful, observable dispatch layer rather than ad hoc if/else logic scattered across your orchestration code. A routing actor classifies the input, selects the appropriate specialist, and dispatches, all in one stateful actor that tracks routing decisions in Prometheus. The RouterActor in the TypeScript orchestrator shows this; note that onInit uses TupleSpace registration, not process groups, so sibling discovery is deterministic:

// From llm_workflow_orchestrator_actor.ts — RouterActor
protected override onInit(config: Record<string, unknown>): void {
    this.state.actorId = String(config.actor_id ?? "");
    // TupleSpace write-once registration — supervisor instance wins
    tsRegisterService("router", this.state.actorId);
}

onRoute(payload: Record<string, unknown>): Record<string, unknown> {
    const content = String(payload.content ?? "");
    const lower = content.toLowerCase();
    let route: string;
    if (lower.includes("summarize") || content.length < 100) {
        route = "summarize";
    } else if (lower.includes("extract") || lower.includes("entities")) {
        route = "extract";
    } else if (lower.includes("analyze") || lower.includes("compare")) {
        route = "analyze";
    } else {
        route = "generate";
    }
    this.state.routingDecisions += 1;
    this.state.routes[route] = (this.state.routes[route] ?? 0) + 1;
    return { route, task_type: route, content, routing_id: host.nowMs() };
}

The OrchestratorWorkflow resolves sibling targets at onInit via TupleSpace discovery, then uses them throughout the workflow run without re-discovery:

// From llm_workflow_orchestrator_actor.ts — OrchestratorWorkflow.onInit()
protected override onInit(config: Record<string, unknown>): void {
    // Resolve once at init — TupleSpace discovery is consistent
    this.state.routerTarget = siblingActorTarget("router");
    this.state.chainTarget = siblingActorTarget("chain");
    this.state.judgeTarget = siblingActorTarget("judge");
}

In production, replace keyword matching with a lightweight classifier model. The router actor holds the classifier in its state (loaded once in getDefaultState()), just like the inference worker holds the LLM. The dispatch logic stays unchanged — swap the classification algorithm without touching the routing architecture.
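Sketched in Python, that swap keeps the dispatch shape and changes only the classification line (load_intent_classifier() and predict() are placeholders for your own model code, not part of the SDK):

# Hypothetical sketch: router actor holding a classifier in its state.
@actor(facets=["virtual_actor"])
class ClassifierRouter:
    routing_decisions: int = state(default=0)

    @init_handler
    def on_init(self, config: dict) -> None:
        # Loaded once per activation, kept warm across requests (placeholder loader)
        self.classifier = load_intent_classifier(config.get("model_path", ""))

    @handler("route")
    def route(self, content: str = "") -> dict:
        label = self.classifier.predict(content)   # e.g. "summarize" | "extract" | "analyze" | "generate"
        self.routing_decisions += 1
        return {"route": label, "task_type": label, "content": content}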

Pattern 11: Reflection and LLM-as-Judge

Use this pattern when output quality is highly variable and you can define a numeric score threshold that separates acceptable from unacceptable responses. For example, the OrchestratorWorkflow implements the reflection loop. It chains generation (via ChainActor) with scoring (via JudgeActor) and refines until the score threshold is met or max iterations is reached:

// From llm_workflow_orchestrator_actor.ts — OrchestratorWorkflow.run()
for (let iter = 0; iter <= maxIterations; iter++) {
    const judgeRes = host.ask(this.state.judgeTarget, "evaluate",
        { content: currentContent, original_query: content }, 10000) as Record<string, unknown>;
    const score = Number(judgeRes.score ?? 0);
    finalScore = score;
    finalResult = currentContent;

    if (score >= scoreThreshold || iter >= maxIterations) { break; }

    // Refine: re-chain with iteration note
    this.state.iterationCount += 1;
    currentContent = `Refined attempt ${this.state.iterationCount}: ${content}`;
    const refinedChain = host.ask(this.state.chainTarget, "execute_chain",
        { content: currentContent }, 15000) as Record<string, unknown>;
    currentContent = String(refinedChain.final_output ?? currentContent);
}
// Store result in TupleSpace for cross-actor access — other actors can pattern-match
host.ts.write(["orchestrator", "result", this.state.taskId, this.state.finalScore, host.nowMs()]);

The TupleSpace write at the end is important: other actors (the PipelineAuditActor, a downstream consumer) can read the final result by pattern-matching on ["orchestrator", "result", taskId, ...] without polling or shared memory. This is the Linda coordination model applied to agent result sharing.
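A downstream consumer reads it back with one pattern-matched read. A Python sketch (the tuple layout mirrors the write above; the consumer actor itself is illustrative):

# Hypothetical sketch: downstream consumer pattern-matching the orchestrator result.
# None fields are wildcards, as in the ts_read examples earlier.
@gen_server_actor
class ResultConsumer:
    @handler("collect_result")
    def collect_result(self, task_id: str = "") -> dict:
        tup = host.ts_read(["orchestrator", "result", task_id, None, None])
        if not tup:
            return {"status": "pending", "task_id": task_id}
        _, _, _, score, written_at_ms = tup
        return {"status": "ready", "task_id": task_id,
                "score": score, "written_at_ms": written_at_ms}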

Pattern 12: Exception Handling with Circuit Breaker FSM

Use this pattern when your agents call downstream services (LLM providers, external APIs) that are occasionally unavailable, and an indefinite block on a failed call would cascade into pipeline-wide stalls. The circuit breaker converts an unresponsive dependency into a fast, predictable failure. For example, the GeneratorActor in Go implements a circuit breaker with three states. This directly addresses the FLP liveness problem: when a downstream LLM is unavailable (crashed, rate-limited), the circuit breaker converts an indefinite block into a fast fail, preserving system liveness.

// From agentic_rag_pipeline_actor.go — GeneratorActor.generate()
if gen.CircuitOpen {
    return marshal(map[string]any{
        "answer": "Service temporarily unavailable. Please try again later.",
        "model": "circuit-breaker-fallback", "circuit_open": true,
    })
}

for attempt := 0; attempt <= maxRetries; attempt++ {
    answer, err := gen.tryGenerate(query, contextChunks)
    if err == "" {
        gen.ConsecutiveFailures = 0
        return marshal(map[string]any{"answer": answer, "circuit_open": false})
    }
    gen.ConsecutiveFailures++
    if gen.ConsecutiveFailures >= 3 {
        gen.CircuitOpen = true
        return marshal(map[string]any{"error": "circuit opened", "circuit_open": true})
    }
}

Three consecutive failures open the circuit. The fallback message is immediate. The reset_circuit handler closes it again after recovery. No external circuit breaker library. The actor IS the circuit breaker and it persists its open/closed state via the durability facet, so a node restart doesn’t incorrectly re-open a circuit that was deliberately closed.

Pattern 13: Evol-Instruct with Prompt Mutation for Dataset Augmentation

Use this pattern when you are fine-tuning a model and your prompt dataset is too small or not diverse enough. Run this pattern to generate mutation candidates, score them with a judge, and keep the top performers. For example, ChainActor.onEvolve_instruction() mutates prompts to generate diverse training data:

// From llm_workflow_orchestrator_actor.ts — ChainActor.onEvolve_instruction()
onEvolve_instruction(payload: Record<string, unknown>): Record<string, unknown> {
    const instruction = String(payload.instruction ?? "");
    const mutations = Number(payload.mutations ?? 2);
    let evolved = instruction;
    let count = 0;
    if (mutations >= 1) { evolved = "Please explain in detail: " + evolved; count += 1; }
    if (mutations >= 2) { evolved = evolved + " Provide examples."; count += 1; }
    if (mutations >= 3) {
        const synonyms: Record<string, string> = { good: "excellent", use: "utilize", show: "demonstrate" };
        for (const [word, syn] of Object.entries(synonyms)) {
            evolved = evolved.replace(new RegExp(`\\b${word}\\b`, "gi"), syn);
        }
        count += 1;
    }
    return { original: instruction, evolved, mutations_applied: count };
}

Chain this with a judge: generate 10 mutations, score each, keep the top 3. Ship them as training examples. The ChainActor state tracks how many evolutions it has produced, so you can throttle and monitor via Prometheus.
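Sketched in Python against those two actors (the actor ids "chain" and "judge" and the field names are assumptions here; in the example the targets are resolved via TupleSpace discovery):

# Hypothetical sketch: generate N mutation candidates, judge each, keep the top 3.
def evolve_dataset(seed_instruction: str, n_candidates: int = 10, keep: int = 3) -> list:
    scored = []
    for i in range(n_candidates):
        evolved = host.ask("chain", "evolve_instruction",
                           {"instruction": seed_instruction, "mutations": 1 + (i % 3)},
                           timeout_ms=10000)
        judged = host.ask("judge", "evaluate",
                          {"content": evolved.get("evolved", ""), "original_query": seed_instruction},
                          timeout_ms=10000)
        scored.append((judged.get("score", 0.0), evolved.get("evolved", "")))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [text for _, text in scored[:keep]]        # ship these as training examples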


Part 6: Scaling Patterns

This is why PlexSpaces was built: how do you scale AI inference across 16 nodes without writing a distributed systems PhD thesis? Ray solves it with remote functions. Horovod solves the AllReduce piece. Spark solves the batch piece. But they’re three separate frameworks with three separate observability stacks and three separate deployment models. PlexSpaces gives you four parallelization mechanisms in the same framework, accessible from the same actor, using the same host.* API:

Mechanism | API | Use Case | Ray Equivalent
Shard Group | host.scatter_gather() | Stateful parallel workers, RAG shards, parameter server | ray.map_batches() + Ray Actors
Elastic Pool | host.pool_checkout() / host.pool_checkin() | Stateless workers, burst capacity | ray.remote() concurrency
MPI Collectives | host.broadcast/reduce/allreduce/barrier_shard_group() | Distributed training, gradient sync, consensus | Horovod (external)
Process Groups | host.PG().Join/Broadcast/Members() | Dynamic membership, pub-sub coordination | ray.util.collective (partial)

The Python parallel_ai_inference demonstrates all four in one example. Run it with 2, 4, 8, or 16 shards and the BenchmarkActor measures throughput and latency at each level.

Pattern 14: Shard Groups for Stateful Parallelism

Use this pattern when your workload partitions naturally by key (documents by ID, users by hash) and each worker needs warm state across requests, for example a model loaded in memory that should not be reloaded per request. If work is stateless and uniform, use elastic pools instead. The Python parallel_ai_inference benchmark below measures shard group throughput across shard counts from 2 to 32:

# From parallel_ai_inference_actor.py — BenchmarkActor.run_shard_benchmark()
for num_shards in shard_counts:
    group_id = f"bench-shard-{num_shards}-{host.now_ms()}"   # reused for scatter_gather below
    group = host.create_shard_group({
        "group_id": group_id,
        "actor_type": "inference_worker",
        "shard_count": num_shards,
        "partition_strategy": "hash",
        "placement": {"strategy": "from_registry"},
    })
    latencies = []
    bench_start = host.now_ms()
    for i in range(requests_per_shard):
        response = host.scatter_gather({
            "group_id": group_id,
            "query": {"op": "infer", "request_id": f"bench-{num_shards}-{i}", "input": "sample-data"},
            "aggregation": "concat",
            "min_responses": num_shards,
            "timeout_ms": 30000,
        })
        for shard in _extract_shard_responses(response):
            payload = _unwrap_payload(shard.get("payload", {}))
            if payload.get("status") == "ok":
                latencies.append(int(payload.get("latency_ms", 0)))
    # ... compute throughput, p50, p99

Scaling (on my Apple M3 Pro):

Shards | Total Req | KB/req | Wall ms | p50 ms | p95 ms | p99 ms | Compute ms | Coord ms | Comp % | GranEff %
2 | 320 | 256.0 | 163 | 10 | 11 | 11 | 4470 | 38.6 | 0.63 | 100.0
4 | 640 | 256.0 | 179 | 11 | 12 | 12 | 8783 | 51.2 | 1.05 | 91.1
8 | 1280 | 256.0 | 190 | 11 | 12 | 12 | 17687 | 66.9 | 2.02 | 85.8
16 | 2560 | 256.0 | 255 | 11 | 12 | 13 | 36712 | 774.3 | 2.89 | 63.9
32 | 5120 | 256.0 | 466 | 11 | 14 | 16 | 76426 | 474.3 | 2.89 | 35.0

Run parallel_ai_inference on your hardware to get real numbers; the BenchmarkActor outputs these metrics automatically. The key difference from Ray map_batches(): shard actors are stateful. The InferenceWorkerActor loads its model once in on_init and keeps it warm across requests. Ray’s stateless task model reloads the model on every batch.
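The warm-state half of that comparison looks roughly like this on the worker side (a sketch; load_model() and generate() are placeholders, not the example's code):

# Hypothetical sketch: shard worker that loads its model once and keeps it warm.
@actor(facets=["virtual_actor", "metrics"])
class InferenceWorkerActor:
    requests_served: int = state(default=0)

    @init_handler
    def on_init(self, config: dict) -> None:
        # Loaded once per activation, not per request (placeholder loader)
        self.model = load_model(config.get("model_path", "model.bin"))

    @handler("infer")
    def infer(self, request_id: str = "", input: str = "") -> dict:
        start = host.now_ms()
        output = self.model.generate(input)     # warm model, no reload
        self.requests_served += 1
        return {"status": "ok", "request_id": request_id, "output": output,
                "latency_ms": host.now_ms() - start}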

Pattern 15: Elastic Pools

Use this pattern when your workload is stateless and bursty with no affinity requirement. Pools give you burst capacity without pre-partitioning; the virtual_actor facet shuts idle workers down automatically, so you pay nothing at rest. The run_pool_benchmark handler in Python demonstrates dynamic checkout/checkin: a worker pool where requests lease actors, use them, and return them:

# From parallel_ai_inference_actor.py — BenchmarkActor.run_pool_benchmark()
for i in range(total_requests):
    checkout_start = host.now_ms()
    checkout = host.pool_checkout(pool_name, timeout_ms=5000)
    wait_ms = host.now_ms() - checkout_start

    if not checkout:
        failed += 1
        continue

    actor_id = checkout.get("actor_id")
    checkout_id = checkout.get("checkout_id")
    exec_start = host.now_ms()
    try:
        host.ask(actor_id, {"op": "infer", "request_id": f"pool-{i}", "input": "pool-sample"},
                 timeout_ms=10000)
        exec_ms = host.now_ms() - exec_start
        exec_times.append(exec_ms)
        successful += 1
    finally:
        host.pool_checkin(pool_name, actor_id, checkout_id, healthy=(failed == 0))

The pool tracks avg_wait_ms, avg_exec_ms, and pool_utilization. When utilization exceeds a threshold, the supervisor spawns additional pool workers. When it drops, idle workers deactivate via the virtual_actor facet and you pay zero at rest. Shard groups vs elastic pools: use shard groups when work partitions naturally (documents by ID, users by hash). Use pools when work is uniform and you want burst capacity without pre-partitioning.

Pattern 16: MPI Collectives

Use MPI collectives when you are running distributed training or gradient synchronization across multiple workers and need AllReduce, Barrier, or Broadcast semantics without pulling in a separate framework like Horovod. They also apply to any distributed computation where all workers must agree on a shared value before proceeding to the next step. This is the capability that separates PlexSpaces from every other actor framework: native MPI-grade collective operations. Five collective operations, built in, available in Python, Go, Rust, and TypeScript.

# From parallel_ai_inference_actor.py — BenchmarkActor.run_collective_benchmark()
# 1. BroadcastShardGroup — config reset to all workers (MPI_Bcast equivalent)
t0 = host.now_ms()
broadcast_result = host.broadcast_shard_group({
    "group_id": group_id, "message": {"op": "reset"},
    "min_acks": num_shards, "timeout_ms": 10000,
})
timings["broadcast_ms"] = host.now_ms() - t0

# 2. BarrierShardGroup — wait for all workers to be ready (MPI_Barrier)
t0 = host.now_ms()
barrier_result = host.barrier_shard_group({"group_id": group_id, "timeout_ms": 10000})
timings["barrier_ms"] = host.now_ms() - t0

# 3. ReduceShardGroup — aggregate inference stats (MPI_Reduce with sum)
t0 = host.now_ms()
reduce_result = host.reduce_shard_group({
    "group_id": group_id, "map_function": {"op": "get_metrics"},
    "reduction": "sum", "timeout_ms": 10000,
})
timings["reduce_ms"] = host.now_ms() - t0

# 4. AllReduceShardGroup — consensus metrics across all workers (MPI_Allreduce)
t0 = host.now_ms()
allreduce_result = host.all_reduce_shard_group({
    "group_id": group_id, "map_function": {"op": "get_metrics"},
    "reduction": "sum", "timeout_ms": 10000,
})
timings["allreduce_ms"] = host.now_ms() - t0

What each operation does in AI/ML context:

| Operation | API | ML Use Case | MPI Equivalent |
|---|---|---|---|
| BroadcastShardGroup | host.broadcast_shard_group() | Push updated model weights to all workers | MPI_Bcast |
| BarrierShardGroup | host.barrier_shard_group() | Synchronize all workers before next training step | MPI_Barrier |
| ReduceShardGroup | host.reduce_shard_group() | Aggregate gradients from all workers -> coordinator | MPI_Reduce |
| AllReduceShardGroup | host.all_reduce_shard_group() | Every worker gets the aggregated gradient (Ring AllReduce) | MPI_Allreduce |
| ScatterGather | host.scatter_gather() | Fan-out inference requests, fan-in results | MPI_Scatter + MPI_Gather |

Ray needs Horovod for AllReduce, and Horovod is Python-only, requires NCCL, and runs as a separate job. PlexSpaces bakes all five collectives into the actor runtime, in all four languages, accessible from the same host.* API you use for everything else.

Pattern 17: Resource-Aware Cost Optimization

Use this pattern when you serve multiple tenants with different budgets and need to enforce financial limits at the infrastructure level. For example, the BudgetManagerActor in Go tracks per-tenant USD spending across all inference calls. Its state lives in the TupleSpace and is shared across all actor instances, race-safe via take-then-write:

// From resource_aware_inference_actor.go — BudgetManagerActor.getReport()
// State is in TupleSpace, not per-actor KV — all instances see the same data
func (b *BudgetManagerActor) getReport() string {
    // ReadAll matches pattern ["budget", tenantID, value] across all tenants
    tuples := host.TS().ReadAll([]any{"budget", nil, nil})
    report := make([]any, 0, len(tuples))
    for _, tup := range tuples {
        if len(tup) < 3 { continue }
        tenantID, _ := tup[1].(string)
        budgetUSD := b.tsReadBudgetFloat("budget", tenantID)
        usedCost := b.tsReadBudgetFloat("usage_cost", tenantID)
        report = append(report, map[string]any{
            "tenant_id": tenantID, "budget_usd": budgetUSD,
            "used_usd": usedCost, "remaining_usd": budgetUSD - usedCost,
        })
    }
    return marshal(map[string]any{"status": "ok", "report": report})
}
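
The read path above is straightforward; the race-safety claim comes from the write path, where each update takes the tuple out of the space before writing the new value back. A rough Python sketch of that idea follows. host.ts_take is an assumption on my part (the Linda take primitive is listed in the API table later, and the Go example reaches the space through host.TS()), so the names here are illustrative, not the framework’s exact surface:

# Illustrative take-then-write update: take removes the tuple atomically, so two
# concurrent updaters cannot both read the same old value and clobber each other.
def record_spend(tenant_id: str, cost_usd: float) -> None:
    tup = host.ts_take(["usage_cost", tenant_id, None], timeout_ms=5000)  # assumed API
    current = float(tup[2]) if tup else 0.0
    host.ts_write(["usage_cost", tenant_id, current + cost_usd])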

The model registry selects a tier based on complexity AND remaining budget: a large model for complex prompts when the budget allows, falling back to a small model when the budget is tight. The resource-affinity side lives in app-config.toml:

# From resource_aware_inference/app-config.toml
[[supervisor.children]]
id = "inference_worker_large"
type = "inference_worker_large"
behavior_kind = "GenServer"
facets = [
  { type = "virtual_actor", priority = 100,
    config = { idle_timeout = "15m", activation_strategy = "lazy",
               labels = { tier = "large", gpu_capable = "true", memory_tier = "high" } } },
  { type = "metrics", priority = 50 }
]
args = { tier = "large", base_latency_ms = "400" }

Set gpu_capable = "true" on GPU nodes. ModelRegistryActor.select_model() checks the prefer_gpu flag on the request and routes accordingly: large-tier workers with gpu_capable = "true" get the GPU-heavy requests, while CPU workers handle small and medium requests. The BudgetFSM enforces the financial ceiling: no matter how capable the GPU, if the tenant budget is exhausted the request gets budget_exceeded before any GPU cycles are wasted.
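
A simplified sketch of the decision the registry makes is below. The real ModelRegistryActor lives in the Go example; the thresholds, tier names, and complexity heuristic here are illustrative assumptions, not its actual code:

# Illustrative tier selection: prefer the large (GPU) tier only when the prompt
# is complex, the caller asked for GPU, and the tenant still has budget headroom.
def select_model(prompt: str, prefer_gpu: bool, remaining_usd: float) -> str:
    complex_prompt = len(prompt) > 2000  # crude complexity proxy for the sketch
    if complex_prompt and prefer_gpu and remaining_usd > 1.00:
        return "inference_worker_large"   # gpu_capable = "true" tier
    if complex_prompt and remaining_usd > 0.10:
        return "inference_worker_medium"
    return "inference_worker_small"       # cheapest fallback, CPU only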


Part 7: Agent Patterns

Pattern 18: Tool Calling and MCP Integration

Use this pattern when your agents need to call external tools (search APIs, databases) and you want those tools to be stateful, fault-tolerant, and observable as first-class actors rather than raw HTTP calls that fail silently and leave no audit trail. For example, the Python mcp_tool_server implements full MCP (Model Context Protocol) tool calling via actors. Each MCP tool is an actor. The registry is an actor. The gateway is a workflow actor.

# From mcp_tool_server_actor.py — ToolRegistryActor.tools_call()
@handler("tools_call")
def tools_call(self, tool_name: str = "", input: dict = None) -> dict:
    input = input or {}  # guard: callers may omit the arguments object
    if tool_name not in self.tools:
        return {"error": "tool_not_found", "available_tools": list(self.tools.keys())}

    # Validate required fields from JSON schema
    schema = self.tools[tool_name]
    required_fields = schema.get("inputSchema", {}).get("required", [])
    missing = [f for f in required_fields if f not in input]
    if missing:
        return {"error": "missing_required_fields", "missing": missing}

    # Route to specialist tool actor — location transparent
    target_actor = {"calculator": "calculator_tool", "search": "search_tool",
                    "weather": "weather_tool"}.get(tool_name, tool_name)
    self.invocation_counts[tool_name] = self.invocation_counts.get(tool_name, 0) + 1
    try:
        return host.ask(target_actor, "execute", input, timeout_ms=10000)
    except Exception as exc:
        self.error_counts[tool_name] = self.error_counts.get(tool_name, 0) + 1
        return {"error": "tool_execution_failed", "tool": tool_name, "message": str(exc)}

What standalone MCP servers lack: built-in state (registry survives restarts), multi-tenant access control (tenant namespace validation), Prometheus metrics (invocation counts, error rates, latency), and fault tolerance (supervisor tree restarts crashed tool actors). Actors provide all four for free.

Pattern 19: Multi-Agent Collaboration and A2A

Use this pattern when a single agent’s context window or capability set is insufficient for the full task and you need specialist agents to collaborate with explicit coordination. Use TupleSpace result sharing rather than shared memory; it makes the coordination auditable and race-free. For example, the Go a2a_multi_agent example shows a complete multi-agent system with dynamic agent discovery and TupleSpace coordination. Critically, it uses the same TupleSpace patterns that solve the coordination problem identified in the FLP analysis: agents write results to addressable slots and never share memory directly:

// From a2a_multi_agent_actor.go — OrchestratorAgent.Run()
// Step 1: Discover research agents by capability
discoverResp, err := host.Ask(registryID, "discover", map[string]any{
    "capabilities": []string{"research"},
}, 10000)
researchAgentID := o.pickFirstAgent(discoverResp, selfID, "research_agent")

// Step 2: Delegate research
researchResp, err := host.Ask(researchAgentID, "research", map[string]any{
    "topic": task, "depth": 1,
}, 10000)

// Store in TupleSpace — other agents can read without polling or shared state
researchJSON, _ := json.Marshal(researchResp)
_ = host.TS().Write([]any{"task", taskID, "step", "research", string(researchJSON)})

// ... delegate to analysis and writing agents, each storing to TupleSpace

// Step 7: Aggregate all results from TupleSpace — pattern match retrieves all steps
allResults := host.TS().ReadAll([]any{"task", taskID, "step", nil, nil})

Location transparency is the critical insight for multi-agent systems. When OrchestratorAgent calls host.Ask(researchAgentID, "research", ...), it does not care whether the research agent is on the same node, a different node in the same cluster, or a different cluster entirely. The framework routes transparently.

Pattern 20: Batch Inference Pipeline

Use this pattern when you need to process a large, bounded dataset through an inference pipeline as efficiently as possible: nightly jobs, model evaluation runs, bulk document processing. The Broadcast -> Barrier -> Scatter-Gather -> Reduce sequence maps directly onto the initialization and execution steps of a distributed training or batch scoring job. For example, the parallel_ai_inference OrchestratorWorkflow runs multi-mode parallel inference:

# From parallel_ai_inference_actor.py — OrchestratorWorkflow._run_collective_mode()
# Broadcast -> Barrier -> Scatter-Gather -> Reduce
host.broadcast_shard_group({
    "group_id": group_id, "message": {"op": "reset"}, "min_acks": num_shards
})
host.barrier_shard_group({"group_id": group_id, "timeout_ms": 10000})

response = host.scatter_gather({
    "group_id": group_id,
    "query": {"op": "infer", "request_id": "collective-infer-0", "input": "collective-input"},
    "aggregation": "concat", "min_responses": num_shards,
})

host.reduce_shard_group({
    "group_id": group_id, "map_function": {"op": "get_metrics"}, "reduction": "sum"
})

Four operations in sequence: reset all workers (broadcast), synchronize (barrier), run inference (scatter-gather), collect metrics (reduce). This is exactly the initialization sequence for a distributed training step and it runs in one actor, in Python, in the same framework as the REST endpoint that triggered the inference.

Pattern 21: Async Agent Sessions

Use this pattern when your agents need to outlive the HTTP connection that triggered them: background tasks, scheduled routines, multi-device handoff, or multi-user collaboration on a single agent session. The problem: a synchronous HTTP/SSE transport couples the agent’s work lifetime to the connection lifetime.

| Scenario | HTTP/SSE Failure Mode | PlexSpaces Solution |
|---|---|---|
| Agent outlives the caller | Results stored in DB; client must poll | durability facet + Workflow Actor: state survives node restart, client reconnects and reads result from TupleSpace |
| Agent pushes unprompted | Must email or Slack out-of-band | Channels primitive (Kafka/Redis/SQS backends): agent publishes to channel, subscriber receives regardless of original connection state |
| Caller changes device | Requires custom session backend | virtual_actor + TupleSpace session state: agent is location-transparent, new device connects to same logical session |
| Multiple humans in one session | Not supported natively | Process Groups + Broadcast: all session participants join a group; agent broadcasts to all members |

PlexSpaces addresses these failure modes without external dependencies:

  • Durable state: actor-local KV + durability facet checkpointing + TupleSpace for shared session data
  • Durable transport: Channels primitive with six durable backends (Kafka, Redis, SQS, PostgreSQL, and others) — the agent writes to a channel, the subscriber reads from it regardless of whether the two were ever simultaneously connected
# Agent side — write result to durable channel when work completes
# No assumption that any client is currently connected
@workflow_actor(facets=["virtual_actor", "durability"])
class BackgroundResearchAgent:
    session_id: str = state(default="")
    
    @run_handler
    def start(self, request: dict = None) -> dict:
        # Do expensive, long-running work
        result = self._run_research(request.get("topic", ""))
        
        # Publish to named channel — durable, no connection required
        host.channel_publish(f"session:{self.session_id}:results", {
            "status": "complete",
            "result": result,
            "ts": host.now_ms()
        })
        
        # Also write to TupleSpace — any device reconnecting can pull directly
        host.ts_write(["session", self.session_id, "result", host.now_ms()])
        return {"status": "accepted", "session_id": self.session_id}
# Client side — subscribe to channel; survives disconnect/reconnect
# Works identically whether the client is a browser, mobile app, or another agent
subscriber = host.channel_subscribe(f"session:{session_id}:results")
# Blocks until a message arrives — no polling loop, no session URL
result = subscriber.next(timeout_ms=300_000)

The critical difference from the Anthropic and Cloudflare hosted approaches: this runs on your infrastructure, in your cluster, with your data. There is no proprietary session backend you are locked into. The Channels primitive is a configuration choice, and you can swap Kafka for Redis or SQS without touching agent code.


Part 8: The Distributed Systems Case for the Actor Model

Why Formal Coordination Protocol Matters

The FLP theorem and Byzantine bounds are mathematical facts, not engineering challenges to be optimized away. In distributed systems, we don’t try to make all nodes infallible; we design protocols that tolerate failures, such as Zab (ZooKeeper), Raft, and PBFT. The actor model applies the same principle to AI agents:

  1. Accept that agents crash: host.monitor() + supervisor restart strategies
  2. Accept that agents misinterpret: external validation via ValidatorActor + structured retry (see the sketch after this list)
  3. Accept that messages can be delayed: async host.Ask() with timeout + circuit breaker
  4. Accept shared state is dangerous: TupleSpace coordination instead of direct state sharing
  5. Accept that consensus is expensive: explicit checkpointing so you don’t re-run completed work

None of these require smarter models. They require the right coordination infrastructure.
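
Point 2 deserves a sketch because it is the step teams most often skip. The shape below uses the Python host.ask call shown earlier; the worker_agent and validator_actor names and the message shapes are assumptions for illustration, not a specific example shipped with the framework:

# Illustrative validate-and-retry loop: an agent's output is only accepted after
# an external ValidatorActor approves it; otherwise it is retried with feedback.
MAX_ATTEMPTS = 3

def generate_with_validation(task: dict) -> dict:
    feedback = ""
    for attempt in range(1, MAX_ATTEMPTS + 1):
        draft = host.ask("worker_agent",
                         {"op": "generate", "task": task, "feedback": feedback},
                         timeout_ms=30000)
        verdict = host.ask("validator_actor",
                           {"op": "validate", "draft": draft},
                           timeout_ms=10000)
        if verdict.get("valid"):
            return {"status": "ok", "result": draft, "attempts": attempt}
        feedback = verdict.get("reason", "failed validation")  # feed the critique back in
    return {"status": "validation_failed", "attempts": MAX_ATTEMPTS}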

What Makes the Actor Model the Right Foundation

The actor model, as implemented in PlexSpaces, gives you exactly the properties that distributed systems theory says you need for safe multi-agent coordination:

| Distributed Systems Property | Actor Model Mechanism | PlexSpaces API |
|---|---|---|
| Failure atomicity without partial state corruption | Per-actor isolated state | Actor KV + TupleSpace |
| Failure detection (know when a peer crashes) | Link + Monitor | host.monitor(), host.link() |
| Crash recovery (restart from last good state) | Journaled checkpointing | durability facet |
| Consensus without shared memory | Message passing only | host.Ask(), host.Send() |
| Coordination without deadlock | Linda model TupleSpace | host.ts.write/read/take() |
| Liveness under partial failure | Supervisor tree | one_for_one, rest_for_one strategies |
| Byzantine isolation | No cross-actor direct state access | Actor boundaries enforced by WASM sandbox |
| External validation | Standalone validator actors | ValidatorActor + retry loop pattern |

Framework Comparison

| | PlexSpaces | Ray | Spark | Horovod | Lambda + SQS |
|---|---|---|---|---|---|
| Cold start | ~50 microsecond (WASM AOT) | ~100ms (Python) | ~10s (JVM) | N/A | 100ms–10s |
| Worker state | Actor-local, durable | External store | Shuffle | Stateless | Stateless |
| Ring AllReduce | Native | Needs Horovod | No | Yes | No |
| Workflow durability | Per-stage checkpoint | No | No | No | Step Functions |
| MPI collectives | 5 ops built-in | No | No | Partial | No |
| Multi-tenancy | Built-in, JWT | No | No | No | IAM per function |
| MCP tool calling | Actor-native | No | No | No | No |
| A2A multi-agent | TupleSpace + registry | No | No | No | No |
| Durable async transport | Channels (6 backends) | No | No | No | SQS only |
| Failure detection | monitor() + supervisor | Limited | No | No | DLQ |
| Polyglot | Python, Go, Rust, TypeScript | Python primarily | JVM + PySpark | Python/C++ | Any FaaS |
| APP-file deploy | Yes, multi-app per node | No | No | No | Per-function |
| Ecosystem maturity | Early-stage; smaller community and fewer third-party integrations | Large ML ecosystem, extensive documentation | Massive data engineering ecosystem | Narrow but well-understood | AWS-native, excellent managed ops |
| Learning curve | High: new coordination model, four-language SDK, WASM packaging | Medium: Python-first, familiar to ML teams | Medium for PySpark, high for Scala | Low if you know PyTorch | Low: functions are simple, AWS handles ops |
| Best fit | Stateful polyglot agent systems with strict coordination, isolation, and durability requirements | Large-scale stateless Python ML workloads; teams already on Ray | Batch ETL and analytics at petabyte scale | Distributed deep learning gradient sync | Lightweight serverless event processing; AWS-native shops |
| Avoid when | Your team is Python-only and already invested in Ray or other similar frameworks | You need stateful actors with durability, strict multi-tenancy, or non-Python languages | You need low-latency online serving or stateful agents | You need anything beyond gradient synchronization | You need stateful workflows, complex coordination, or multi-tenant isolation |

Conclusion

Every pattern in this post is ultimately the same argument applied to a different surface area: accept the mathematical constraints of distributed systems rather than pretending they dissolve when the nodes are language models instead of databases. The FLP theorem does not care that your consensus participants are generating text. Byzantine fault tolerance does not care that the incorrect messages are hallucinated API shapes instead of corrupted packets. The constraints are identical: isolated state, explicit coordination, crash detection, and external validation.

The actor model has provided exactly those properties since the 1970s. What’s new is the workload, not the substrate. The 20+ patterns in this post cover the full spectrum from single-agent durability to 10,000-agent distributed coordination. They all reduce to four primitives applied consistently:

  • FLP safety: isolated actor state, message-only communication, no shared memory corruption
  • FLP liveness: supervision trees, host.monitor() crash detection, durability facet checkpointing
  • Byzantine isolation: external ValidatorActor, WASM sandbox per actor, structured retry
  • Coordination without deadlock: TupleSpace write-once registration, Linda-model result sharing, Channels for durable async transport

The gap between “one agent that works in a demo” and “ten thousand agents that work at 3 AM on a Tuesday when two nodes are down and one tenant’s budget is exhausted” is not a gap that better prompts or bigger models close. It’s a distributed systems engineering problem, and it has distributed systems solutions. That’s what PlexSpaces is built around and it’s why the actor model, fifty years after its introduction, is still the right foundation.


GitHub: github.com/bhatti/PlexSpaces

Previous posts in this series:

November 4, 2020

Structured Concurrency in modern programming languages – Part-II

Filed under: Computing,Erlang,Languages — admin @ 8:46 pm

In this second part of the series on structured concurrency (Part-I, Part-III, Part-IV, Swift-Followup), I will review Elixir and Erlang languages for writing concurrent applications and their support for structured concurrency:

Erlang

The Erlang language was created by the late Joe Armstrong when he worked at Ericsson, and it is designed for massive concurrency by means of very lightweight processes based on actors. Each process has its own mailbox for storing incoming messages of various kinds. The receive block in Erlang is triggered upon new message arrival, and a message is removed and processed when it matches a specific pattern. The Erlang language uses supervisors for monitoring processes and an immutable functional paradigm for writing robust concurrent systems.

As the cost of each process or actor is only a few hundred bytes, you can create millions of these processes to build highly scalable concurrent systems. Erlang is a functional language where all data is immutable by default, and the state within each actor is held privately, so there is no shared state and no race conditions.

An actor keeps a mailbox for incoming messages and processes one message at a time using the receive API. Erlang doesn’t provide native async/await primitives, but you can simulate async by sending an asynchronous message to an actor, which can then reply to the sender using its process-id. The requester process can block on receive until the reply arrives, and receive supports timeouts so the caller can exit early if no response arrives within the time period. Erlang embraces the “let it crash” mantra for building fault-tolerant applications, and terminating a process also takes down all child processes linked to it.

Using actor model in Erlang

The following code shows how the native send and receive primitives can be used to build a toy web crawler:

-module(erlcrawler).

-export([start_link/0, crawl_urls/3, total_crawl_urls/1]).

-record(request, {clientPid, ref, url, depth, timeout, created_at=erlang:system_time(millisecond)}).
-record(result, {url, status=pending, child_urls=0, started_at=erlang:system_time(millisecond), completed_at, error}).

-define(MAX_DEPTH, 4).
-define(MAX_URL, 11).
-define(DOMAINS, [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "yz.com"]).

make_request(ClientPid, Ref, Url, Depth, Timeout) ->
    #request{clientPid=ClientPid, ref=Ref, url=Url, depth=Depth, timeout=Timeout}.

make_result(Req) ->
    Url = Req#request.url,
    #result{url=Url}.

%%% Client API
start_link() ->
    spawn_link(fun init/0).

%%%%%%%%%%%% public method for crawling %%%%%%%%%%%%
%%% calling private method for crawling
%%% Pid - process-id of actor
%%% 0 - current depth
%%% Urls - list of urls to crawl
%%% Timeout - max timeout
crawl_urls(Pid, Urls, Timeout) when is_pid(Pid), is_list(Urls)  ->
    %% Boundary for concurrency and it will not return until all
    %% child URLs are crawled up to MAX_DEPTH limit.
    do_crawl_urls(Pid, 0, Urls, [], Timeout, 0).

total_crawl_urls(Pid) when is_pid(Pid) ->
    Self = self(),
    Pid ! {total, Self},
    receive {total_reply, Self, N} ->
        N
    end.

%%% Server functions
init() ->
    {ok, DownloaderPid} = downloader:start_link(),
    {ok, IndexerPid} = indexer:start_link(),
    loop(DownloaderPid, IndexerPid, 0).

%%% Main server loop
loop(DownloaderPid, IndexerPid, N) ->
    receive
        {crawl, Req} ->
            CrawlerPid = self(),
            spawn_link(fun() -> handle_crawl(CrawlerPid, Req, DownloaderPid, IndexerPid) end),
            debug_print(N),
            loop(DownloaderPid, IndexerPid, N+1);
        {total, Pid} ->
            Pid ! {total_reply, Pid, N},
            loop(DownloaderPid, IndexerPid, N);
        terminate ->
            ok
    end.


%%% Internal client functions
debug_print(N) when N rem 10000 == 0 ->
    io:format("~p...~n", [{N}]);
debug_print(_) ->
    ok.

%% Go through URLs to crawl, send asynchronous request to crawl and
%% then add request to a list to monitor that will be used to receive
%% reply back from the crawling actor.
do_crawl_urls(_, _, [], [], _, ChildURLs) ->
    ChildURLs; % all done
do_crawl_urls(_, ?MAX_DEPTH, _, _, _, _) ->
    0; % reached max depth, stop more crawling
do_crawl_urls(Pid, Depth, [Url|T], SubmittedRequests, Timeout, 0) when is_pid(Pid), is_integer(Depth), is_integer(Timeout) ->
    %%% monitoring actor so that we are notified when actor process dies
    Ref = erlang:monitor(process, Pid),
    %%% crawling next url to process
    Req = make_request(self(), Ref, Url, Depth, Timeout),
    Pid ! {crawl, Req},
    do_crawl_urls(Pid, Depth, T, SubmittedRequests ++ [Req], Timeout, 0);
do_crawl_urls(Pid, Depth, [], [Req|T], Timeout, ChildURLs) when is_pid(Pid) ->
    %%% receiving response from the requests that were previously stored
    Ref = Req#request.ref,
    receive
        {crawl_done, Ref, Res} ->
            erlang:demonitor(Ref, [flush]),
            do_crawl_urls(Pid, Depth, [], T, Timeout, Res#result.child_urls+ChildURLs+1);
        {'DOWN', Ref, process, Pid, Reason} ->
            erlang:error(Reason)
    after Timeout ->
        erlang:error({crawl_timeout, Timeout})
    end.


%%% Internal server functions called by actor to process the crawling request
handle_crawl(CrawlerPid, Req, DownloaderPid, IndexerPid) ->
    Res = make_result(Req),
    ClientPid = Req#request.clientPid,
    Url = Req#request.url,
    Ref = Req#request.ref,
    Depth = Req#request.depth,
    Timeout = Req#request.timeout,

    case downloader:download(DownloaderPid, Url) of
        {ok, Contents} ->
            {ok, Contents1} = downloader:jsrender(DownloaderPid, Url, Contents),
            Changed = has_content_changed(Url, Contents1),
            Spam = is_spam(Url, Contents1),
            if Changed and not Spam ->
                    indexer:index(IndexerPid, Url, Contents1), % asynchronous call
                    Urls = parse_urls(Url, Contents1),
                    %% Crawling child urls synchronously before returning
                    ChildURLs = do_crawl_urls(CrawlerPid, Depth+1, Urls, [], Timeout, 0) + 1,
                    Res1 = Res#result{completed_at=erlang:system_time(millisecond), child_urls=ChildURLs},
                    ClientPid ! {crawl_done, Ref, Res1};
               true ->
                    Res1 = Res#result{completed_at=erlang:system_time(millisecond)},
                    ClientPid ! {crawl_done, Ref, Res1}
            end;
        Err ->
            Res1 = Res#result{completed_at=erlang:system_time(millisecond), error = Err},
            ClientPid ! {crawl_done, Ref, Res1}
    end,
    ok.

%%%%%%%%%%%%%%% INTERNAL METHODS FOR CRAWLING %%%%%%%%%%%%%%%%
parse_urls(_Url, _Contents) ->
    % tokenize contents and extract href/image/script urls
    random_urls(?MAX_URL).

random_urls(N) ->
    [random_url() || _ <- lists:seq(1, N)].

has_content_changed(_Url, _Contents) ->
     % calculate hash digest and compare it with last digest
    true.

is_spam(_Url, _Contents) ->
     % apply standardize, stem, ngram, etc for indexing
    false.

random_url() ->
    "https://" ++ random_domain() ++ "/" ++ random_string(20).

random_domain() ->
    lists:nth(random:uniform(length(?DOMAINS)), ?DOMAINS).

random_string(Length) ->
    AllowedChars = "abcdefghijklmnopqrstuvwxyz",
    lists:foldl(fun(_, Acc) -> [lists:nth(random:uniform(length(AllowedChars)), AllowedChars)] ++ Acc end, [], lists:seq(1, Length)).

In the above implementation, the crawl_urls method takes a list of URLs and a timeout and waits until all URLs are crawled. It uses spawn_link to create a process, which invokes the handle_crawl method to process requests concurrently. The handle_crawl method recursively crawls the URL and its children up to the MAX_DEPTH limit. This implementation uses separate Erlang OTP processes for downloading, rendering, and indexing contents. The handle_crawl method sends back the response with the number of child URLs that it crawled.

-module(erlcrawler_test).
-include_lib("eunit/include/eunit.hrl").

-define(ROOT_URLS, ["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"]).

crawl_urls_test() ->
    {spawn, {timeout,30, do_crawl_urls(10000)}}.

%% Testing timeout and by default, it will terminate the test process so we will instead convert
%% kill signal into a message using erlang:exit
crawl_urls_with_timeout_test() ->
    %%% crawling next url to process
    Started = erlang:system_time(millisecond),
    Timeout = 10, % We know that processing takes longer than 10 milliseconds
    Pid = erlcrawler:start_link(),
    process_flag(trap_exit, true),
    spawn_link(fun() ->
        erlcrawler:crawl_urls(Pid, ?ROOT_URLS, Timeout)
    end),
    {{crawl_timeout, _}, _} = receive
        {'EXIT', _, Reason} -> Reason
    after 1000 ->
        erlang:error(unexpected_timeout)
    end,
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("crawl_urls_with_timeout_test: timedout as expected in millis ~p ~n", [{Elapsed}]).

%% Testing terminate/cancellation and killing a process will kill all its children
crawl_urls_with_terminate_test() ->
    %%% crawling next url to process
    Started = erlang:system_time(millisecond),
    Pid = erlcrawler:start_link(),
    spawn_link(fun() ->
        erlcrawler:crawl_urls(Pid, ?ROOT_URLS, 1000) % crawl_urls is synchronous method so calling in another process
    end),
    receive
    after 15 -> % waiting for a bit before terminating (canceling) process
        exit(Pid, {test_terminated})
    end,
    {test_terminated} = receive
        {'EXIT', Pid, Reason} -> Reason
    after 200 ->
        erlang:error(unexpected_timeout)
    end,
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("crawl_urls_with_terminate_test: terminated as expected in millis ~p ~n", [{Elapsed}]).

do_crawl_urls(Timeout) ->
    Started = erlang:system_time(millisecond),
    Pid = erlcrawler:start_link(),
    N = erlcrawler:crawl_urls(Pid, ?ROOT_URLS, Timeout),
    N1 = erlcrawler:total_crawl_urls(Pid),
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("do_crawl_urls: Crawled URLs in millis: ~p ~n", [{N, N1, Elapsed}]),
    ?assertEqual(N1, 19032).

The above tests show three ways to exercise the crawl_urls API. The first test, crawl_urls_test, covers the happy case of crawling URLs within 10 seconds. The crawl_urls_with_timeout_test checks the timeout behavior to make sure a proper error message is returned and all Erlang processes are terminated. The crawl_urls_with_terminate_test checks cancellation behavior by terminating the main crawling process. You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/erl_actor.

Following are major benefits of using this process model to implement structured concurrency:

  • The main crawl_urls method defines the high-level scope of concurrency and waits for the completion of child tasks.
  • The crawl_urls method takes a timeout parameter so that crawling all URLs must complete within the time period.
  • Erlang allows parent-child relationships between processes where you can monitor child processes and get notified when a child process dies. You can use this feature to cancel the asynchronous task. However, it will abruptly end all processes, and all state within the process will be lost.
  • The Erlang implementation captures the error within the response so the client can handle errors using pattern matching or other approaches common in Erlang applications.

Following are shortcomings using this approach for structured concurrency:

  • The terminate API is not suitable for clean cancellation, so you will need to implement cooperative cancellation to persist any state or clean up underlying resources.
  • Though you can combine processes into groups or parent-child relationships manually, Erlang doesn’t give you a lot of flexibility to specify the context for execution.
  • Unlike async-declared methods in TypeScript, Erlang code is not easily composable, but you can wrap send/receive messages in client code so that the high-level code can be comprehended easily. Also, Erlang processes can be connected with parent-child relationships, and you can manage composition via a process-supervisor hierarchy.
  • The above code creates a new process for crawling each URL; though the overhead of each process is small, the crawl may consume other expensive resources such as network connections. We wouldn’t use such an approach for a real crawler because it would strain the websites being crawled. Instead, we may need to limit how many concurrent requests can be sent to a given website or maintain a delay between successive requests.

Using pmap in Erlang

We can generalize the above approach into a general-purpose pmap that processes a list concurrently (similar to the map function in functional languages) and then waits for the responses:

-module(pmap).

-export([pmap/3]).

pmap(F, Es, Timeout) ->
   Parent = self(),
   Running = [exec(Parent, F, E) || E <- Es],
   collect(Running, Timeout).

exec(Parent, F, E) ->
    spawn_monitor(fun() -> Parent ! {self(), F(E)} end).

collect([], _Timeout) -> [];
collect([{Pid, MRef} | Next], Timeout) ->
  receive
    {Pid, Res} ->
      erlang:demonitor(MRef, [flush]),
      [{ok, Res} | collect(Next, Timeout)];
    {'DOWN', MRef, process, Pid, Reason} ->
      [{error, Reason} | collect(Next, Timeout)]
  after Timeout ->
    erlang:error({pmap_timeout, Timeout})
  end.

You can download the full pmap example from https://github.com/bhatti/concurency-katas/tree/main/erl_pmap.

Elixir

The Elixir language is built on the Erlang BEAM VM and was created by José Valim to improve the usability of Erlang and to introduce a Ruby-like syntax in place of Erlang’s Prolog-inspired syntax. It also removes some of the boilerplate that Erlang requires and adds higher-level abstractions for writing highly concurrent, distributed, and fault-tolerant applications.

Using a worker-pool and OTP in Elixir

As Elixir uses the Erlang VM and runtime system, the application behavior will be similar to the Erlang version, but the following approach uses a worker-pool design where the parent process keeps a list of child processes and delegates the crawling work to them in a round-robin fashion:

defmodule Crawler do
  @max_depth 4

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  # {:ok, pid} = Crawler.start_link(100000)
  def start_link(size) when is_integer(size) do
    GenServer.start_link(__MODULE__, size)
  end

  def total_crawl_urls(pid) when is_pid(pid) do
    GenServer.call(pid, {:total_crawl_urls}, 30000)
  end

  ### Public client APIs
  def crawl_urls(pid, urls) when is_pid(pid) and is_list(urls) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    crawl_urls(pid, urls, 0, self())
  end

  ### Internal client APIs
  def crawl_urls(pid, urls, depth, clientPid) when is_pid(pid) and is_list(urls) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, depth, clientPid)))
      requests |> Enum.map(&(GenServer.cast(pid, {:crawl, &1})))
    else
      :max_depth_exceeded
    end
  end

  ## init method create pool of workers based on given size
  def init(size) when is_integer(size) do
    Process.flag(:trap_exit, true)
    pid_to_workers = 0..size |> Enum.map(&child_spec/1)
    |> Enum.map(&start_child/1)
    |> Enum.into(%{})
    pids = Map.keys(pid_to_workers)
    {:ok, {pid_to_workers, pids, 0}}
  end

  ## handles crawling
  def handle_cast({:crawl, request}, {pid_to_workers, [pid|rest], total_in}) do
    GenServer.cast(pid, {:crawl, request}) # send request to workers in round-robin fashion
    {:noreply, {pid_to_workers, rest ++ [pid], total_in+1}}
  end

  def handle_call({:total_crawl_urls}, _from, {_, _, total_in} = state) do
    {:reply, total_in, state}
  end

  ## OTP Callbacks
  def handle_info({:EXIT, dead_pid, _reason}, {pid_to_workers, _, total_in}) do
    # Start new process based on dead_pid spec
    {new_pid, child_spec} = pid_to_workers
    |> Map.get(dead_pid)
    |> start_child()

    # Remove the dead_pid and insert the new_pid with its spec
    new_pid_to_workers = pid_to_workers
    |> Map.delete(dead_pid)
    |> Map.put(new_pid, child_spec)
    pids = Map.keys(new_pid_to_workers)
    {:noreply, {new_pid_to_workers, pids, total_in}}
  end

  ## Defines spec for worker
  defp child_spec(_) do
    {Worker, :start_link, [self()]}
  end

  ## Dynamically create child
  defp start_child({module, function, args} = spec) do
    {:ok, pid} = apply(module, function, args)
    Process.link(pid)
    {pid, spec}
  end

end

The parent process in the above example defines the crawl_urls method for crawling URLs, which is exposed as an asynchronous API (handle_cast) and forwards each request to the next worker. The following is the implementation of the worker:

defmodule Worker do
  @moduledoc """
  Documentation for crawling worker.
  """
  @max_url 11
  @domains [
    "ab.com",
    "bc.com",
    "cd.com",
    "de.com",
    "yz.com"]
  @allowed_chars "abcdefghijklmnopqrstuvwxyz"

  use GenServer

  # Client APIs
  def start_link(crawler_pid) when is_pid(crawler_pid) do
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    GenServer.start_link(__MODULE__, {crawler_pid, downloader_pid, indexer_pid})
  end

  @doc """
  Crawls web url asynchronously
  """
  def handle_cast({:crawl, request}, {crawler_pid, downloader_pid, indexer_pid}=state) do
    handle_crawl(crawler_pid, downloader_pid, indexer_pid, request)
    {:noreply, state}
  end

  def init(crawler_pid) do
      {:ok, crawler_pid}
  end

  # Internal private methods
  defp handle_crawl(crawler_pid, downloader_pid, indexer_pid, req) do
    res = Result.new(req)
    contents = Downloader.download(downloader_pid, req.url)
    new_contents = Downloader.jsrender(downloader_pid, req.url, contents)
    if has_content_changed(req.url, new_contents) and !is_spam(req.url, new_contents) do
      Indexer.index(indexer_pid, req.url, new_contents)
      urls = parse_urls(req.url, new_contents)
      Crawler.crawl_urls(crawler_pid, urls, req.depth+1, req.clientPid)
      send req.clientPid, {:crawl_done, Result.completed(res)}
    else
      send req.clientPid, {:crawl_done, Result.failed(req, :skipped_crawl)}
    end
  end

  defp parse_urls(_Url, _Contents) do
    # tokenize contents and extract href/image/script urls
    random_urls(@max_url)
  end

  defp random_urls(n) do
    1..n |> Enum.map(&(random_url/1))
  end

  defp has_content_changed(_url, _contents) do
    # calculate hash digest and compare it with last digest
    true
  end

  defp is_spam(_url, _contents) do
    # apply standardize, stem, ngram, etc for indexing
    false
  end

  defp random_url(_) do
    "https://" <> random_domain() <> "/" <> random_string(20)
  end

  defp random_domain() do
    Enum.random(@domains)
  end

  defp random_string(n) do
    1..n
    |> Enum.reduce([], fn(_, acc) -> [Enum.random(to_charlist(@allowed_chars)) | acc] end)
    |> Enum.join("")
  end
end

The worker process starts downloader and indexer processes upon start and crawls the URL upon receiving the next request. It then sends the response back to the originator of the request using the process-id in the request. The following unit tests exercise normal processing, timeouts, and cancellation:

defmodule CrawlerTest do
  use ExUnit.Case
  doctest Crawler
  @max_processes 10000
  @max_wait_messages 19032
  @root_urls ["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"]

  test "test crawling urls" do
    started = System.system_time(:millisecond)
    {:ok, pid} = Crawler.start_link(@max_processes)
    Crawler.crawl_urls(pid, @root_urls)
    wait_until_total_crawl_urls(pid, @max_wait_messages, started)
  end

  defp wait_until_total_crawl_urls(pid, 0, started) do
    n = Crawler.total_crawl_urls(pid)
    elapsed = System.system_time(:millisecond) - started
    IO.puts("Crawled URLs in millis: #{n} #{elapsed}")
    assert n >= @max_wait_messages
  end

  defp wait_until_total_crawl_urls(pid, max, started) do
    if rem(max, 1000) == 0 do
      IO.puts("#{max}...")
    end
    receive do
      {:crawl_done, _} -> wait_until_total_crawl_urls(pid, max-1, started)
    end
  end

end

Following are major benefits of this approach for its support of structured concurrency:

  • The crawl_urls method in the parent process defines the high-level scope of concurrency and waits for the completion of child tasks.
  • The above implementation also uses a timeout, similar to the Erlang example, to ensure the task is completed within the given time period.
  • It also captures errors within the response, similar to the Erlang version, for error handling.
  • This approach addresses a shortcoming of the earlier Erlang implementation, where a new process was created for each request: a pool of processes is used instead to manage resource capacity.

Following are shortcomings using this approach for structured concurrency:

  • This approach suffers the same drawbacks as the Erlang approach regarding cancellation behavior, and you will need to implement cooperative cancellation to clean up resources properly.
  • Similar to Erlang, Elixir doesn’t give you a lot of flexibility to specify the context for execution, and it’s not easily composable.

Using async-await in Elixir

Elixir abstracts the Erlang process with Task when you only need to execute a single action throughout its lifetime. Here is an example that combines Task async/await with a pmap implementation:

defmodule Parallel do
  def pmap(collection, func, timeout) do
    collection
    |> Enum.map(&(Task.async(fn -> func.(&1) end)))
    |> Enum.map(fn t -> Task.await(t, timeout) end)
  end
end
defmodule Crawler do
  @domains [
    "ab.com",
    "bc.com",
    "cd.com",
    "de.com",
    "ef.com",
    "yz.com"]
  @allowed_chars "abcdefghijklmnopqrstuvwxyz"
  @max_depth 4
  @max_url 11

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  def crawl_urls(urls, timeout) when is_list(urls) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    ## Starting external services using OTP for downloading and indexing
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    res = crawl_urls(urls, downloader_pid, indexer_pid, 0, timeout)
    ## Stopping external services using OTP for downloading and indexing
    Process.exit(downloader_pid, :normal)
    Process.exit(indexer_pid, :normal)
    res
  end

  def crawl_urls(urls, downloader_pid, indexer_pid, depth, timeout) when is_list(urls) and is_pid(downloader_pid) and is_pid(indexer_pid) and is_integer(depth) and is_integer(timeout) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, downloader_pid, indexer_pid, depth, timeout)))
      Parallel.pmap(requests, &(handle_crawl/1), timeout)
    else
      []
    end
  end

  # Internal private methods
  defp handle_crawl(req) do
    {:ok, contents} = Downloader.download(req.downloader_pid, req.url, req.timeout)
    {:ok, new_contents} = Downloader.jsrender(req.downloader_pid, req.url, contents, req.timeout)
    if has_content_changed(req.url, new_contents) and !is_spam(req.url, new_contents) do
      Indexer.index(req.indexer_pid, req.url, new_contents, req.timeout)
      urls = parse_urls(req.url, new_contents)
      res = Crawler.crawl_urls(urls, req.downloader_pid, req.indexer_pid, req.depth+1, req.timeout)
      Enum.reduce(res, 0, &(&1 + &2)) + 1
    else
      0
    end
  end

  defp parse_urls(_Url, _Contents) do
    # tokenize contents and extract href/image/script urls
    random_urls(@max_url)
  end

  defp random_urls(n) do
    1..n |> Enum.map(&(random_url/1))
  end

  defp has_content_changed(_url, _contents) do
    # calculate hash digest and compare it with last digest
    true
  end

  defp is_spam(_url, _contents) do
    # apply standardize, stem, ngram, etc for indexing
    false
  end

  defp random_url(_) do
    "https://" <> random_domain() <> "/" <> random_string(20)
  end

  defp random_domain() do
    Enum.random(@domains)
  end

  defp random_string(n) do
    1..n
    |> Enum.reduce([], fn(_, acc) -> [Enum.random(to_charlist(@allowed_chars)) | acc] end)
    |> Enum.join("")
  end
end

The above example is a bit shorter due to the high-level Task abstraction, but its design has similar pros and cons as the actor and pmap implementations of the Erlang example. You can download the full source code for this implementation from https://github.com/bhatti/concurency-katas/tree/main/elx_pmap.

Using Queue in Elixir

The following example shows a web crawler implementation using a queue:

defmodule Crawler do
  @max_depth 4

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  def start_link(size) when is_integer(size) do
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    GenServer.start_link(__MODULE__, {size, downloader_pid, indexer_pid})
  end

  ## crawl list of url
  def crawl_urls(pid, urls, timeout) when is_pid(pid) and is_list(urls) and is_integer(timeout) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    crawl_urls(pid, urls, 0, self(), timeout)
  end

  # returns number of urls crawled
  def total_crawl_urls(pid, timeout) when is_pid(pid) do
    GenServer.call(pid, {:total_crawl_urls}, timeout)
  end

  ## dequeue pops the top request from the queue and returns it
  def dequeue(pid) when is_pid(pid) do
    GenServer.call(pid, {:dequeue})
  end

  ###########################################
  ## internal api to crawl urls
  def crawl_urls(pid, urls, depth, clientPid, timeout) when is_pid(pid) and is_list(urls) and is_pid(clientPid) and is_integer(timeout) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, depth, clientPid, timeout)))
      requests |> Enum.map(&(GenServer.cast(pid, {:crawl, &1})))
    else
      :max_depth_exceeded
    end
  end

  ###########################################
  ## init method create pool of workers based on given size
  def init({size, downloader_pid, indexer_pid}) when is_integer(size) and is_pid(downloader_pid) and is_pid(indexer_pid) do
    Process.flag(:trap_exit, true)
    pid_to_workers = 0..size |> Enum.map(&child_spec/1)
    |> Enum.map(&start_child/1)
    |> Enum.into(%{})
    {:ok, {pid_to_workers, :queue.new, 0, 0, downloader_pid, indexer_pid}}
  end

  ## asynchronous server handler for adding request to crawl in the queue
  def handle_cast({:crawl, request}, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    new_queue = :queue.in(request, queue)
    {:noreply, {pid_to_workers, new_queue, total_in+1, total_out, downloader_pid, indexer_pid}}
  end

  ## synchronous server handler for returning total urls crawled
  def handle_call({:total_crawl_urls}, _from, {_, _, _total_in, total_out, _, _} = state) do
    {:reply, total_out, state}
  end

  ## synchronous server handler to pop top request from the queue and returning it
  def handle_call({:dequeue}, _from, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    {head, new_queue} = :queue.out(queue)
    if head == :empty do
      {:reply, {head, downloader_pid, indexer_pid}, {pid_to_workers, new_queue, total_in, total_out, downloader_pid, indexer_pid}}
    else
      if rem(:queue.len(queue), 1000) == 0 or rem(total_out+1, 1000) == 0 do
        IO.puts("#{total_out+1}...")
      end
      {:value, req} = head
      {:reply, {req, downloader_pid, indexer_pid}, {pid_to_workers, new_queue, total_in, total_out+1, downloader_pid, indexer_pid}}
    end
  end

  ## OTP helper callbacks
  def handle_info({:EXIT, dead_pid, _reason}, {pid_to_workers, queue, total_in, total_out}) do
    # Start new process based on dead_pid spec
    {new_pid, child_spec} = pid_to_workers
    |> Map.get(dead_pid)
    |> start_child()

    # Remove the dead_pid and insert the new_pid with its spec
    new_pid_to_workers = pid_to_workers
    |> Map.delete(dead_pid)
    |> Map.put(new_pid, child_spec)

    {:noreply, {new_pid_to_workers, queue, total_in, total_out}}
  end

  ## Defines spec for worker
  defp child_spec(_) do
    {Worker, :start_link, [self()]}
  end

  ## Dynamically create child
  defp start_child({module, function, args} = spec) do
    {:ok, pid} = apply(module, function, args)
    Process.link(pid)
    {pid, spec}
  end

end

You can download the full source code of this example from https://github.com/bhatti/concurency-katas/tree/main/elx_queue.

Using Actor model as Abstract Data Structure

As the cost of actors is very small, you can also use them as abstract data structures or objects that maintain internal state. Alan Kay, the pioneer of object-oriented programming, described message-passing, isolation, and state encapsulation as the foundation of object-oriented design, and Joe Armstrong described Erlang as the only true object-oriented language. For example, suppose you need a cache of stock quotes backed by a dictionary data structure, which is updated from another source and provides easy access to the latest quotes. In a multi-threaded environment you would need to protect access to that shared data with synchronization. With an actor-based design, you may instead define an actor for each stock symbol that keeps the latest value internally and provides an API to access or update the quote. This design removes the need to synchronize a shared data structure and results in better performance.
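
A minimal sketch of the idea follows, written in Python only to keep the example short (the mechanism is language-agnostic): each symbol gets one worker thread that owns the state, and a queue plays the role of the actor mailbox, so no lock is needed around the quote. The class and message shapes are illustrative, not taken from any particular library:

import queue
import threading

class QuoteActor:
    """One actor per stock symbol: the worker thread is the sole owner of the
    latest price, so readers and writers never touch shared state directly."""
    def __init__(self, symbol: str):
        self.symbol = symbol
        self._mailbox: queue.Queue = queue.Queue()   # the actor's mailbox
        self._price = None
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self) -> None:
        while True:
            msg, reply_to = self._mailbox.get()      # process one message at a time
            if msg[0] == "update":
                self._price = msg[1]
            elif msg[0] == "get":
                reply_to.put(self._price)

    def update(self, price: float) -> None:
        self._mailbox.put((("update", price), None))  # fire-and-forget message

    def get(self):
        reply: queue.Queue = queue.Queue()
        self._mailbox.put((("get",), reply))          # ask-style message with reply channel
        return reply.get(timeout=1.0)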

Overall, the Erlang process model is a bit low-level compared to async/await syntax and lacks composition in asynchronous code, but it can be structured to provide scoped execution, error handling, and termination. Further, immutable data structures and message passing obviate the need for locks to protect shared state. Another benefit of Erlang/Elixir is its support for distributed services, so it can be used to distribute tasks to remote machines seamlessly.

May 10, 2010

Building a stock quote server in Erlang using Ejabberd, XMPP, Bosh, Exmpp, Strophe and Yaws

Filed under: Erlang — admin @ 1:40 pm

Recently, I have been building a stock quote server at work that publishes financial data using Ejabberd, XMPP, PubSub, Exmpp, and Bosh on the server side and the Strophe library on the web front end. I will describe a simplified implementation of the quote server using Yahoo Quotes.

Installation

Download Ejabberd and go through the installation wizard. You will be asked for your host name, admin account/password, and whether ejabberd will be running in a clustered environment. For this tutorial, we will be running ejabberd on a single node. Once installed, you can start the ejabberd server using

 /Applications/ejabberd-2.1.3/bin/ejabberdctl start
 

As I am using a Mac, the actual path on your machine may be different. Ejabberd comes with a web-based admin tool that you can access using

 http://<your-host-name>:5280/admin
 

and you would be able to see available nodes, users, etc.


Registering Users

We will be creating two users, producer and consumer, where the former will be used for publishing stock quotes and the latter for subscribing to quotes on the web side, i.e.,

 sudo /Applications/ejabberd-2.1.3/bin/ejabberdctl register producer  producer
 sudo /Applications/ejabberd-2.1.3/bin/ejabberdctl register consumer  consumer
 

Debugging with Psi

You can debug XMPP communications using a Jabber client such as Psi. After you download and install it, specify your local hostname as the server, e.g.



You can then log in using consumer@<your-host-name> with password consumer. As we will be using the PubSub protocol, you can discover available nodes or topics using General->Service Discovery from the menu, e.g.


Downloading Sample Code

I have stored all code needed for this example at http://github.com/bhatti/FQPubSub, which you can check out using:

 git clone git@github.com:bhatti/FQPubSub.git
 

The sample code depends on the exmpp, lhttpc, jsonerl, and yaws modules, so after downloading the code, check out the dependent modules using

 git submodule init
 git submodule update
 

The above commands will check out the dependent modules into the deps directory.

Building Sample Code

Before building, ensure you have the make and autoconf tools installed, then replace <paraclete.local> with your host name in docroot/index.html and src/quote_utils.hrl. Then type the following command

 make
 

to build all sample code and dependent libraries

Starting Web Server

Though the web code, including the Strophe library and JavaScript, can be run directly in the browser, you can start Yaws to serve the application as follows:

 erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run web_server start 
 

Note that the web server will run continuously, so open a separate shell before typing the above command.

Publishing Quotes

Create two separate shells and type following command in first shell:

   erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run quote_publisher start AAPL
 

and following command in second shell

   erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run quote_publisher start IBM
 

The above commands will start Erlang processes that poll Yahoo Quotes every couple of seconds and publish the quotes on the AAPL and IBM nodes respectively.

Next, point your browser to http://<your-host-name>:8000/ and add the “IBM” and “AAPL” symbols; you will then see quotes for both symbols, e.g.

Code under the hood

Now that you are able to run the example, let’s take a look at how the code works:

Client library for Yahoo Finance

Though at work we use our own real-time stock quote feed, for this sample I implemented the stock quote feed using Yahoo Finance. The src/yquote_client.hrl and src/yquote_client.erl files define the client API for accessing the Yahoo Finance service. Here is the Erlang code for requesting the quote over HTTP and parsing it:

%%%-------------------------------------------------------------------
%%% File    : yquote_client.erl
%%% Author  : Shahzad Bhatti
%%% Purpose : Wrapper Library for Yahoo Stock Quotes
%%% Created : May 8, 2010
%%%-------------------------------------------------------------------
-module(yquote_client).

-author('bhatti@plexobject.com').

-export([
         quote/1
        ]).

-record(quote, {
        symbol,
        price,
        change,
        volume,
        avg_daily_volume,
        stock_exchange,
        market_cap,
        book_value,
        ebitda,
        dividend_per_share,
        dividend_yield,
        earnings_per_share,
        week_52_high,
        week_52_low,
        day_50_moving_avg,
        day_200_moving_avg,
        price_earnings_ratio,
        price_earnings_growth_ratio,
        price_sales_ratio,
        price_book_ratio,
        short_ratio}).

quote(Symbol) ->
    inets:start(),
    {ok,{_Status, _Headers, Response}} = http:request(get, {url(Symbol), []},
        [{timeout, 5000}], [{sync, true}]),
    Values = re:split(Response, "[,\r\n]"),
    #quote{
        symbol = list_to_binary(Symbol),
        price = to_float(lists:nth(1, Values)),
        change = to_float(lists:nth(2, Values)),
        volume = to_integer(lists:nth(3, Values)),
        avg_daily_volume = to_integer(lists:nth(4, Values)),
        stock_exchange = lists:nth(5, Values), % to_string
        market_cap = to_float(lists:nth(6, Values)), % B
        book_value = to_float(lists:nth(7, Values)),
        ebitda = to_float(lists:nth(8, Values)), % B
        dividend_per_share = to_float(lists:nth(9, Values)),
        dividend_yield = to_float(lists:nth(10, Values)),
        earnings_per_share = to_float(lists:nth(11, Values)),
        week_52_high = to_float(lists:nth(12, Values)),
        week_52_low = to_float(lists:nth(13, Values)),
        day_50_moving_avg = to_float(lists:nth(14, Values)),
        day_200_moving_avg = to_float(lists:nth(15, Values)),
        price_earnings_ratio = to_float(lists:nth(16, Values)),
        price_earnings_growth_ratio = to_float(lists:nth(17, Values)),
        price_sales_ratio = to_float(lists:nth(18, Values)),
        price_book_ratio = to_float(lists:nth(19, Values)),
        short_ratio = to_float(lists:nth(20, Values))}.

url(Symbol) ->
    "http://finance.yahoo.com/d/quotes.csv?s=" ++ Symbol ++ "&f=l1c1va2xj1b4j4dyekjm3m4rr5p5p6s7".

to_float(<<"N/A">>) ->
    -1;
to_float(Bin) ->
    {Multiplier, Bin1} = case bin_ends_with(Bin, <<$B>>) of
        true ->
            {1000000000, bin_replace(Bin, <<$B>>, <<>>)};
        false ->
            case bin_ends_with(Bin, <<$M>>) of
                true ->
                    {1000000, bin_replace(Bin, <<$M>>, <<>>)};
                false ->
                    {1,Bin}
            end
    end,
    L = binary_to_list(Bin1),
    list_to_float(L) * Multiplier.
 

Note that I am omitting some code in the above listing (the to_integer/1, bin_ends_with/2, and bin_replace/3 helpers), as I just wanted to highlight the HTTP request and parsing code.
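For completeness, here is a minimal sketch of what those omitted helpers might look like; these are hypothetical reconstructions and the actual implementations in the repository may differ:

%% Hypothetical reconstructions of the helpers omitted from the listing above.
to_integer(<<"N/A">>) ->
    -1;
to_integer(Bin) ->
    list_to_integer(binary_to_list(Bin)).

%% True if Bin ends with the byte(s) in Suffix.
bin_ends_with(Bin, Suffix) ->
    Size = byte_size(Bin),
    SufSize = byte_size(Suffix),
    Size >= SufSize andalso binary:part(Bin, Size - SufSize, SufSize) =:= Suffix.

%% Replace all occurrences of Pattern in Bin with Replacement.
bin_replace(Bin, Pattern, Replacement) ->
    binary:replace(Bin, Pattern, Replacement, [global]).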

Publishing the Stock Quote

I used the exmpp library to communicate with the XMPP server from Erlang. Here is the code for publishing the quotes over the BOSH/XMPP protocol:

%%%-------------------------------------------------------------------
%%% File    : quote_publisher.erl
%%% Author  : Shahzad Bhatti
%%% Purpose : OTP server for publishing quotes
%%% Created : May 8, 2010
%%%-------------------------------------------------------------------
-module(quote_publisher).

-export([
    start/1,
    start/5,
    stop/1]).

-export([init/5]).

-include_lib("quote_utils.hrl").

-record(state, {session, jid, service=?TEST_XMPP_PUBSUB, symbol}).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% APIs
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
start(Symbol) ->
    start(?TEST_XMPP_SERVER, ?TEST_XMPP_PORT, ?PRODUCER_USERNAME,
        ?PRODUCER_PASSWORD, Symbol).

start(Host, Port, User, Password, Symbol) ->
    spawn(?MODULE, init, [Host, Port, User, Password, Symbol]).

stop(Pid) ->
    Pid ! stop.

init(Host, Port, User, Password, Symbol) ->
    {ok, {MySession, MyJID}} = quote_utils:connect(Host, Port, User, Password),
    State = #state{session=MySession, jid=MyJID, symbol = Symbol},
    create_symbol_node(State),
    loop(State).

loop(#state{session=MySession, jid=_MyJID, service = _Service,
        symbol = _Symbol}=State) ->
    receive
        stop ->
            quote_utils:disconnect(MySession);
        _Record = #received_packet{packet_type=message, raw_packet=_Packet} ->
            loop(State);
        _Record ->
            loop(State)
    after 2000 ->
        publish_quote(State),
        loop(State)
    end.

create_symbol_node(#state{session=MySession, jid=MyJID, service = Service,
        symbol = Symbol}) ->
    IQ = exmpp_client_pubsub:create_node(Service, Symbol),
    PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
    PacketId2 = erlang:binary_to_list(PacketId),
    receive #received_packet{id=PacketId2, raw_packet=Raw} ->
        case exmpp_iq:is_error(Raw) of
            true -> {error, Raw};
            _ -> ok
        end
    end.

publish_quote(#state{session=MySession, jid=MyJID, service = Service, symbol = Symbol}) ->
    Quote = yquote_client:quote(Symbol),
    JsonQuote = ?record_to_json(quote, Quote),
    M = exmpp_xml:element(?QUOTE_DATA),
    IQ = exmpp_client_pubsub:publish(Service, Symbol, exmpp_xml:append_cdata(M,
            JsonQuote)),
    PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
    PacketId2 = erlang:binary_to_list(PacketId),
    receive #received_packet{id=PacketId2, raw_packet=Raw} ->
        case exmpp_iq:is_error(Raw) of
            true -> error;
            _ -> ok
        end
    end.

In the above code, a process is created for each symbol; it periodically polls the stock quote and publishes it to the XMPP node using the pubsub protocol over BOSH. Note that a unique node is created for each symbol, and the node must be created before anyone can publish or subscribe. Also note that the publish/subscribe APIs use a request/ack protocol, so after sending a request the process waits for the acknowledgement of that request.
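Both create_symbol_node/1 and publish_quote/1 repeat that request/ack dance, so it could be factored into a small helper. Here is a rough sketch (this helper is hypothetical and not part of the module above; it assumes the same exmpp includes):

%% Hypothetical helper: send an IQ stanza and wait for the matching ack.
send_and_wait_ack(Session, JID, IQ) ->
    PacketId = exmpp_session:send_packet(Session, exmpp_stanza:set_sender(IQ, JID)),
    PacketId2 = erlang:binary_to_list(PacketId),
    receive
        #received_packet{id = PacketId2, raw_packet = Raw} ->
            case exmpp_iq:is_error(Raw) of
                true -> {error, Raw};
                _    -> ok
            end
    after 5000 ->
        %% Avoid blocking forever if the server never acknowledges.
        {error, timeout}
    end.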

Here are some utility functions used by the publisher:

-module(quote_utils).

-include_lib("quote_utils.hrl").

-export([
    init_session/2,
    connect/4,
    disconnect/1]).

bosh_url(Host, Port) ->
    "http://" ++ Host ++ ":" ++ integer_to_list(Port) ++ "/http-bind".

connect(Host, _Port, User, Password) ->
    safe_start_apps(),
    MySession = exmpp_session:start({1,0}),
    exmpp_xml:start_parser(),
    %% Create XMPP ID (Session Key):
    MyJID = exmpp_jid:make(User, Host, random),
    %% Create a new session with basic (digest) authentication:
    exmpp_session:auth_basic_digest(MySession, MyJID, Password),
    %% Note: the BOSH port is hardcoded to 5280 here.
    {ok, _StreamId, _Features} = exmpp_session:connect_BOSH(MySession,
        bosh_url(Host, 5280), Host, []),
    try quote_utils:init_session(MySession, Password)
    catch
        _:Error -> io:format("got error: ~p~n", [Error]), {error, Error}
    end,
    {ok, {MySession, MyJID}}.

init_session(MySession, Password) ->
    %% Login with defined JID / Authentication:
    try exmpp_session:login(MySession, "PLAIN")
    catch
        throw:{auth_error, 'not-authorized'} ->
            %% Try creating a new user:
            io:format("Register~n", []),
            %% In a real-life client, we should trap the error case here
            %% and print the correct message.
            exmpp_session:register_account(MySession, Password),
            %% After registration, retry the login:
            exmpp_session:login(MySession)
    end,
    %% We explicitly send presence:
    exmpp_session:send_packet(MySession,
        exmpp_presence:set_status(exmpp_presence:available(), "Ready to publish!!!")),
    ok.

disconnect(MySession) ->
    exmpp_session:stop(MySession).

safe_start_apps() ->
    try start_apps()
    catch
        _:Error -> io:format("apps already started : ~p~n", [Error]), {error, Error}
    end.

start_apps() ->
    ok = application:start(exmpp),
    ok = application:start(crypto),
    ok = application:start(ssl),
    ok = application:start(lhttpc).

Note that the above code auto-registers users, which is not recommended for production use.
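In production you would normally pre-create the accounts instead, for example with ejabberdctl (the producer account name below is hypothetical; the consumer account matches the JavaScript that follows):

   ejabberdctl register producer paraclete.local <producer-password>
   ejabberdctl register consumer paraclete.local consumer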

JavaScript code using the Strophe library

The web application depends on jQuery, Strophe, and Strophe Pubsub. These libraries are included in the docroot directory and are imported by index.html. The Strophe library and ejabberd 2.1.3 support cross-domain scripting, so the BOSH service here doesn’t need to be on the same domain/port, but it must serve a /crossdomain.xml policy file that allows access from wherever index.html lives. The JavaScript initializes the connection parameters as follows (you will have to change the host):

<script type="text/javascript">
    // The BOSH_SERVICE here doesn't need to be on the same domain/port, but
    // it must have a /crossdomain.xml policy file that allows access from
    // wherever index.html lives.
    // TODO: REPLACE <paraclete.local> with your <host-name>
    var HOST = 'paraclete.local';
    var JID = 'consumer@' + HOST;
    var PASSWORD = 'consumer';
    var BOSH_SERVICE = 'http://' + HOST + ':5280/http-bind'; //'/xmpp-httpbind'
    var PUBSUB = 'pubsub.' + HOST;
    var connection = null;
    var autoReconnect = true;
    var hasQuotes = [];
    var subscriptions = [];

    function log(msg) {
        $('#log').append('<div></div>').append(document.createTextNode(msg));
    }

    function rawInput(data) {
        //log('RECV: ' + data);
    }

    function rawOutput(data) {
        //log('SENT: ' + data);
    }

    function onQuote(stanza) {
        //log('onQuote###### ' + stanza);
        try {
            $(stanza).find('event items item data').each(function(idx, elem) {
                var quote = jQuery.parseJSON($(elem).text());
                // Sample payload:
                // {"price":235.86,"change":-10.39,"volume":59857756,"avg_daily_volume":20775600,
                //  "stock_exchange":[78,97,115,100,97,113,78,77],"market_cap":2.146e+11,
                //  "book_value":43.257,"ebitda":1.5805e+10,"dividend_per_share":0.0,"dividend_yield":-1,
                //  "earnings_per_share":11.796,"week_52_high":272.46,"week_52_low":119.38,
                //  "day_50_moving_avg":245.206,"day_200_moving_avg":214.119,"price_earnings_ratio":20.88,
                //  "price_earnings_growth_ratio":1.05,"price_sales_ratio":4.38,
                //  "price_book_ratio":5.69,"short_ratio":0.7}
                if (hasQuotes[quote.symbol] != undefined) {
                    // Update the existing row for this symbol.
                    $('#price_' + quote.symbol).text(quote.price);
                    $('#change_' + quote.symbol).text(quote.change);
                    $('#volume_' + quote.symbol).text(quote.volume);
                } else {
                    // First quote for this symbol: add a new row.
                    hasQuotes[quote.symbol] = true;
                    $('#quotesTable > tbody:last').append('<tr id="quote_' +
                        quote.symbol + '"><td>' + quote.symbol +
                        '</td><td id="price_' + quote.symbol + '">' + quote.price +
                        '</td><td id="change_' + quote.symbol + '" class="class_change_' + quote.symbol + '">' +
                        quote.change + '</td><td id="volume_' +
                        quote.symbol + '">' +
                        quote.volume + '</td></tr>');
                }

                if (quote.change < 0) {
                    $('.class_change_' + quote.symbol).css('color', 'red');
                } else {
                    $('.class_change_' + quote.symbol).css('color', 'green');
                }
            });
        } catch (e) {
            log(e);
        }
        return true;
    }

    function handleSubscriptionChange(stanza) {
        //log("***handleSubscriptionChange Received: " + stanza);
    }

    function onConnect(status) {
        if (status == Strophe.Status.CONNECTING) {
            log('Strophe is connecting.');
        } else if (status == Strophe.Status.CONNFAIL) {
            log('Strophe failed to connect.');
            $('#connect').get(0).value = 'connect';
        } else if (status == Strophe.Status.DISCONNECTING) {
            log('Strophe is disconnecting.');
        } else if (status == Strophe.Status.DISCONNECTED) {
            if (autoReconnect) {
                log('Streaming disconnected. Trying to reconnect...');
                // onConnect will report the result of the reconnect attempt.
                connection.connect(JID, PASSWORD, onConnect);
            } else {
                log('Strophe is disconnected.');
                $('#connect').get(0).value = 'connect';
                //publishEvent("streamingDisconnected");
            }
        } else if (status == Strophe.Status.CONNECTED) {
            log('Strophe is connected.');
            //log('QUOTE_BOT: Send a message to ' + connection.jid + ' to talk to me.');
            connection.addHandler(onMessage, null, 'message', null, null, null);
            connection.send($pres().tree());
            //publishEvent("streamingConnected");
        }
    }

    function subscribe(symbol) {
        if (subscriptions[symbol]) return;
        try {
            connection.pubsub.subscribe(JID, PUBSUB, symbol, [], onQuote, handleSubscriptionChange);
            subscriptions[symbol] = true;
            log("Subscribed to " + symbol);
        } catch (e) {
            alert(e);
        }
    }

    function unsubscribe(symbol) {
        if (!subscriptions[symbol]) return;
        try {
            connection.pubsub.unsubscribe(JID, PUBSUB, symbol, handleSubscriptionChange);
            subscriptions[symbol] = false;
            log("Unsubscribed from " + symbol);
        } catch (e) {
            alert(e);
        }
    }

    function onMessage(msg) {
        var to = msg.getAttribute('to');
        var from = msg.getAttribute('from');
        var type = msg.getAttribute('type');
        var elems = msg.getElementsByTagName('body');

        if (type == "chat" && elems.length > 0) {
            var body = elems[0];
            log('QUOTE_BOT: I got a message from ' + from + ': ' + Strophe.getText(body));
            var reply = $msg({to: from, from: to, type: 'chat'}).cnode(Strophe.copyElement(body));
            connection.send(reply.tree());
            log('QUOTE_BOT: I sent ' + from + ': ' + Strophe.getText(body));
        }
        // We must return true to keep the handler alive;
        // returning false would remove it after it finishes.
        return true;
    }

    $(document).ready(function () {
        connection = new Strophe.Connection(BOSH_SERVICE);
        connection.rawInput = rawInput;
        connection.rawOutput = rawOutput;
        connection.connect(JID, PASSWORD, onConnect);

        $('#add_symbol').bind('click', function () {
            var symbol = $('#symbol').get(0).value;
            subscribe(symbol);
        });
    });
</script>

When the document is loaded, the connection to the ejabberd server is established. Here are the form and table used to add subscriptions and display current quote information for the symbols:

<form name='symbols'>
    <label for='symbol'>Symbol:</label>
    <input type='text' id='symbol'/>
    <input type='button' id='add_symbol' value='add' />
</form>
<hr />
<div id='log'></div>
<table id="quotesTable" width="600" border="2" bordercolor="#333333">
    <thead>
        <tr>
            <th>Symbol</th>
            <th>Price</th>
            <th>Change</th>
            <th>Volume</th>
        </tr>
    </thead>
    <tbody>
    </tbody>
</table>

When the form is submitted, it calls the subscribe method, which in turn sends a subscription request to the ejabberd server. When a new quote is received, the onQuote function is called; it inserts a row into the table when a new symbol is added, or updates the quote information if the row already exists.
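As mentioned above, the BOSH service must also expose a /crossdomain.xml policy file. A minimal, wide-open policy (fine for local development, far too permissive for production) might look like this:

<?xml version="1.0"?>
<!DOCTYPE cross-domain-policy SYSTEM "http://www.adobe.com/xml/dtds/cross-domain-policy.dtd">
<cross-domain-policy>
    <allow-access-from domain="*" />
</cross-domain-policy>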

Conclusion

ejabberd, XMPP, exmpp, BOSH, and Strophe provide a robust and mature messaging solution, and they are especially suitable for building highly scalable, interactive web applications. Though the code above is fairly simple, the same design principles can be used to support a large volume of stock quote updates. Since we need to push quotes for tens of thousands of symbols on every tick within a fraction of a second, Erlang offers a very scalable solution in which each symbol is simply served by its own Erlang process. Finally, I am still learning about ejabberd’s clustering, security, and other features so that this design can truly survive production load, so I would love to hear any feedback you might have from similar systems.
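For example, fanning out to many symbols is just a matter of spawning one publisher process per symbol; here is a quick sketch from the Erlang shell using the quote_publisher module above (the symbol list is illustrative):

    Symbols = ["AAPL", "IBM", "GOOG", "MSFT", "AMZN"],
    Pids = [quote_publisher:start(Sym) || Sym <- Symbols],
    %% ... and later, to shut them all down:
    [quote_publisher:stop(Pid) || Pid <- Pids].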

May 20, 2008

Rebooting philosophy in Erlang

Filed under: Erlang — admin @ 10:49 am

I just read “Let It Crash” Programming, which talks about how Erlang is designed as a fault-tolerant language from the ground up. I have been learning Erlang since Joe Armstrong’s book came out and have heard Joe talk a few times about fault tolerance. Steve Vinoski has also written about Erlang: It’s About Reliability in a flame war between him and Ted Neward. For me, Erlang is reminiscent of Microsoft Windows, i.e., when Windows stops working I just reboot the machine. Erlang does the same thing: when a process fails, it simply restarts the process. About sixteen years ago, I started my career in old VAX, mainframe, and UNIX environments, and my managers used to say that they never had to restart the mainframe when something failed, yet somehow bugs on Windows get fixed after a reboot. When I worked at Fermilab in the mid 90s, we had server farms of hundreds of machines and fault tolerance was quite important. Though Google didn’t invent server farms, it scaled them to a new level, where the failure of individual machines doesn’t stop the entire application. Erlang takes the same philosophy to the programming language. Obviously, in order to build a truly fault-tolerant application, the Erlang processes will need to be spawned on separate machines. Erlang’s support for message-passing concurrency and distributed computing via OTP makes this trivial. You can further increase fault tolerance and high availability by using machines on separate racks, networks, power sources, or data centers. No wonder Facebook is using Erlang in its Chat application.

December 23, 2007

Released ErlSDB 0.1

Filed under: Erlang,SimpleDB,Web Services — admin @ 7:09 pm

I started working on an Erlang library to access Amazon’s SimpleDB web service and I released an early version of the library this weekend. Here are some notes on its usage:
Installing

svn checkout http://erlsdb.googlecode.com/svn/trunk/ erlsdb-read-only

Building

make

Testing

Edit the Makefile to add your access key and secret key, then type make test.

Usage

Take a look at test/erlsdb_test.erl to learn the usage; here is some sample code:

Starting Server

    erlsdb:start(type,
        [#sdb_state{
            access_key = "YourAccessKey",
            secret_key = "YourSecretKey",
            domain = "YourDomain"
        }])

Creating Domain

    erlsdb:create_domain()

Note that the server will use the domain that was passed during initialization.

Listing all Domains

    {ok, List, _} = erlsdb:list_domains()

Deleting Domain

    erlsdb:delete_domain()

Adding an item

    Attributes = lists:sort([
        ["StreetAddress", "705 5th Ave"],
        ["City", "Seattle"],
        ["State", "WA"],
        ["Zip", "98101"]
    ]),
    erlsdb:put_attributes("TccAddr", Attributes)

Retrieving an item

    {ok, UnsortedAttrs} = erlsdb:get_attributes("TccAddr")

Deleting an item

    erlsdb:delete_attributes("TccAddr")
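Putting these calls together, a simple round trip might look like this (a sketch based on the snippets above; error handling omitted):

    erlsdb:start(type,
        [#sdb_state{
            access_key = "YourAccessKey",
            secret_key = "YourSecretKey",
            domain = "YourDomain"
        }]),
    erlsdb:create_domain(),
    Attributes = [["City", "Seattle"], ["State", "WA"]],
    erlsdb:put_attributes("TccAddr", Attributes),
    {ok, UnsortedAttrs} = erlsdb:get_attributes("TccAddr"),
    erlsdb:delete_attributes("TccAddr"),
    erlsdb:delete_domain().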
