Here are the programming languages I’ve used over the last three decades. From BASIC in the late 80s to Rust today, each one taught me something about solving problems with code.
Late 1980s – Early 1990s
I learned to code with BASIC/QuickBASIC, first on an Atari and later on an IBM XT, in the late 1980s.
I picked up other languages in college or on my own, including C, Pascal, Prolog, Lisp, FORTRAN, and Perl.
In college, I used Icon to build compilers.
My first job was mainframe work, where I used COBOL and CICS for applications, JCL and REXX for scripting, and SAS for data processing.
Later, at a physics lab, I used C/C++ and Fortran for applications and Python as a scripting and glue language.
I used a number of 4GL tools like dBase, FoxPro, Paradox, and Delphi. Later I used Visual Basic and PowerBuilder for building client applications.
I used SQL and PL/SQL throughout my career for relational databases.
Web Era Mid/Late 1990s
The web era introduced a number of new languages and technologies like HTML, JavaScript, CSS, ColdFusion, and Java.
I used XML/XSLT/XPath/XQuery, PHP, VBScript and ActionScript.
I used RSS, RDF, and SPARQL for building semantic web applications.
I used IDL/CORBA for building distributed systems.
Mobile/Services Era 2000s
I used Ruby for building web applications, and Erlang/Elixir for building concurrent applications.
I used Groovy for writing tests and R for data analysis.
When iOS was released, I used Objective-C to build mobile applications.
In this era, functional languages gained popularity and I used Scala/Haskell/Clojure for some projects.
New Languages Era Mid 2010s
I started using Go for networking/concurrent applications.
I started using Swift for iOS applications and Kotlin for Android apps.
I initially used Facebook’s Flow, but then switched to TypeScript instead of plain JavaScript.
I used Dart for Flutter applications.
I used GraphQL for some client-friendly backend APIs.
I used Solidity for Ethereum smart contracts.
I used Lua as a glue language with Redis, HAProxy and other similar systems.
I started using Rust, and it became my go-to language for highly performant applications.
What Three Decades of Languages Taught Me
Every language is a bet on what matters most: Safety vs. speed vs. expressiveness vs. ecosystem vs. hiring.
Languages don’t die, they fade: I still see COBOL in production. I still debug Perl scripts. Legacy is measured in decades.
The fundamentals never change: Whether it’s BASIC or Rust, you’re still managing state, controlling flow, and abstracting complexity.
Polyglotism is a superpower: Each language teaches you a different way to think. Functional programming makes you better at OOP. Systems programming makes you better at scripting.
The best language is the one your team can maintain: I’ve seen beautiful Scala codebases become liabilities and ugly PHP applications become billion-dollar businesses.
What’s Next?
I’m watching Zig (Rust without the complexity?), and it’s next on my list of languages to learn.
I’ve spent the last year building AI agents in enterprise environments. During this time, I’ve extensively applied emerging standards like Model Context Protocol (MCP) from Anthropic and the more recent Agent-to-Agent (A2A) Protocol for agent communication and coordination. What I’ve learned: there’s a massive gap between building a quick proof-of-concept with these protocols and deploying a production-grade system. The concerns that get overlooked in production deployments are exactly what will take you down at 3 AM:
Multi-tenant isolation with row-level security (because one leaked document = lawsuit)
JWT-based authentication across microservices (no shared sessions, fully stateless)
Real-time observability of agent actions (when agents misbehave, you need to know WHY)
Cost tracking and budgeting per user and model (because OpenAI bills compound FAST)
Graceful degradation when embeddings aren’t available (real data is messy)
Integration testing against real databases (mocks lie to you)
Disregarding security concerns can lead to incidents like the Salesloft breach where their AI chatbot inadvertently stored authentication tokens for hundreds of services, which exposed customer data across multiple platforms. More recently in October 2025, Filevine (a billion-dollar legal AI platform) exposed 100,000+ confidential legal documents through an unauthenticated API endpoint that returned full admin tokens to their Box filesystem. No authentication required, just a simple API call. I’ve personally witnessed security issues from inadequate AuthN/AuthZ controls and cost overruns exceeding hundreds of thousands of dollars, which are preventable with proper security and budget enforcement.
The good news is that MCP and A2A protocols provide the foundation to solve these problems. Most articles treat these as competing standards but they are complementary. In this guide, I’ll show you exactly how to combine MCP and A2A to build a system that handles real production concerns: multi-tenancy, authentication, cost control, and observability.
Reference Implementation
To demonstrate these concepts in action, I’ve built a reference implementation that showcases production-ready patterns.
Architecture Philosophy:
Three principles guided every decision:
Go for servers, Python for workflows – Use the right tool for each job. Go handles high-throughput protocol servers. Python handles AI workflows.
Database-level security – Multi-tenancy enforced via PostgreSQL row-level security (RLS), not application code. Impossible to bypass accidentally.
Stateless everything – Every service can scale horizontally. No sticky sessions, no shared state, no single points of failure.
All containerized, fully tested, and ready for production deployment.
But before we dive into the implementation, let’s understand the fundamental problem these protocols solve and why you need both.
Part 1: Understanding MCP and A2A
The Core Problem: Integration Chaos
Before the MCP protocol arrived in 2024, you had to build custom integrations with LLM providers, data sources, and AI frameworks. Every AI application had to reinvent authentication, data access, and orchestration, which doesn’t scale. MCP and A2A emerged to solve different aspects of this chaos:
The MCP Side: Standardized Tool Execution
Think of MCP as a standardized toolbox for AI models. Instead of every AI application writing custom integrations for databases, APIs, and file systems, MCP provides a JSON-RPC 2.0 protocol that models use to discover tools, invoke them, and read resources through a single, uniform interface.
“MCP excels at synchronous, stateless tool execution. It’s perfect when you need an AI model to retrieve information, execute a function, and return results immediately.”
The server executes the tool and returns results. Simple, stateless, fast.
Why JSON-RPC 2.0? Because it’s:
Language-agnostic – Works with any language that speaks HTTP
Batch-capable – Multiple requests in one HTTP call
Error-standardized – Consistent error codes across implementations
Widely adopted – 20+ years of production battle-testing
The A2A Side: Stateful Workflow Orchestration
A2A handles what MCP doesn’t: multi-step, stateful workflows where agents collaborate. From the A2A Protocol docs:
“A2A is designed for asynchronous, stateful orchestration of complex tasks that require multiple steps, agent coordination, and long-running processes.”
A2A provides:
Task creation and management with persistent state
Real-time streaming of progress updates (Server-Sent Events)
Agent coordination across multiple services
Artifact management for intermediate results
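To make that concrete, here is a small client sketch that creates a task and follows its progress stream. The endpoints (POST /tasks, GET /tasks/{taskId}/events) and the port are assumptions modeled on the SSE route used by the reference implementation later in this guide, not the canonical A2A API, so treat the shapes as illustrative.
// Hypothetical A2A client sketch: create a task, then follow its SSE stream.
// Endpoint shapes and the base URL are assumptions; consult the A2A server
// code in the repository for the real contract.
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

const a2aBase = "http://localhost:8081" // assumed A2A server address

func main() {
	// 1. Create a long-running research task.
	payload, _ := json.Marshal(map[string]interface{}{
		"workflow": "research",
		"input":    map[string]string{"company_name": "Acme Corp"},
	})
	resp, err := http.Post(a2aBase+"/tasks", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	var task struct {
		ID string `json:"id"`
	}
	json.NewDecoder(resp.Body).Decode(&task)
	resp.Body.Close()

	// 2. Stream progress events until the server closes the connection.
	events, err := http.Get(fmt.Sprintf("%s/tasks/%s/events", a2aBase, task.ID))
	if err != nil {
		panic(err)
	}
	defer events.Body.Close()
	scanner := bufio.NewScanner(events.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data:") {
			fmt.Println("update:", strings.TrimSpace(line[5:]))
		}
	}
}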
Why Both Protocols Matter
Here’s a real scenario from my fintech work that illustrates why you need both:
Use Case: Compliance analyst needs to research a company across 10,000 documents, verify regulatory compliance, cross-reference with SEC filings, and generate an audit-ready report.
“Use MCP when you need fast, stateless tool execution. Use A2A when you need complex, stateful orchestration. Use both when building production systems.”
Part 2: Architecture
System Overview
Key Design Decisions
Protocol Servers (Go):
MCP Server – Secure document retrieval with pgvector and hybrid search. Go’s concurrency model handles 5,000+ req/sec, and its type safety catches integration bugs at compile time (not at runtime).
A2A Server – Multi-step workflow orchestration with Server-Sent Events for real-time progress tracking. Stateless design enables horizontal scaling.
AI Workflows (Python):
LangGraph Workflows – RAG, research, and hybrid pipelines. Python was the right choice here because the AI ecosystem (LangChain, embeddings, model integrations) lives in Python.
PostgreSQL with pgvector – Multi-tenant document storage with row-level security policies enforced at the database level (not application level)
Ollama – Local LLM inference for development and testing (no OpenAI API keys required)
Database Security:
Application-level tenant filtering is not enough, so row-level security policies are enforced at the database level:
// BAD: Application-level filtering (can be bypassed)
func GetDocuments(tenantID string) ([]Document, error) {
query := "SELECT * FROM documents WHERE tenant_id = ?"
// What if someone forgets the WHERE clause?
// What if there's a SQL injection?
// What if a bug skips this check?
}
-- GOOD: Database-level Row-Level Security (impossible to bypass)
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON documents
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
Every query automatically filters by tenant so there is no way to accidentally leak data. Even if your application has a bug, the database enforces isolation.
JWT Authentication
The MCP server and the UI share an RSA key pair for token verification, which provides:
Asymmetric signing: the MCP server only needs the public key (so it cannot forge tokens)
Rotation: Rotate private key without redeploying services
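As a sketch of the verification side, here is roughly what the Go server’s check might look like using the github.com/golang-jwt/jwt/v5 library; the claim and audience names mirror the token generated by the Streamlit UI later in this guide, but the actual middleware in the repository may differ.
package auth

import (
	"crypto/rsa"
	"fmt"

	"github.com/golang-jwt/jwt/v5"
)

// VerifyToken checks an RS256 token with the shared public key and extracts
// the tenant_id claim that downstream code uses for row-level security.
func VerifyToken(tokenString string, publicKey *rsa.PublicKey) (string, error) {
	token, err := jwt.Parse(tokenString, func(t *jwt.Token) (interface{}, error) {
		// Reject anything other than RSA to prevent algorithm-confusion attacks.
		if _, ok := t.Method.(*jwt.SigningMethodRSA); !ok {
			return nil, fmt.Errorf("unexpected signing method: %v", t.Header["alg"])
		}
		return publicKey, nil
	}, jwt.WithAudience("mcp-server")) // exp/nbf are validated automatically when present
	if err != nil || !token.Valid {
		return "", fmt.Errorf("invalid token: %w", err)
	}
	claims, ok := token.Claims.(jwt.MapClaims)
	if !ok {
		return "", fmt.Errorf("unexpected claims type")
	}
	tenantID, _ := claims["tenant_id"].(string)
	if tenantID == "" {
		return "", fmt.Errorf("tenant_id claim missing")
	}
	return tenantID, nil
}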
Hybrid Search with Reciprocal Rank Fusion
The reference implementation (hybrid_search.go) uses PostgreSQL’s full-text search (BM25-like) combined with pgvector:
// Hybrid search query using Reciprocal Rank Fusion
query := `
WITH bm25_results AS (
SELECT
id,
ts_rank_cd(
to_tsvector('english', title || ' ' || content),
plainto_tsquery('english', $1)
) AS bm25_score,
ROW_NUMBER() OVER (ORDER BY ts_rank_cd(...) DESC) AS bm25_rank
FROM documents
WHERE to_tsvector('english', title || ' ' || content) @@ plainto_tsquery('english', $1)
),
vector_results AS (
SELECT
id,
1 - (embedding <=> $2) AS vector_score,
ROW_NUMBER() OVER (ORDER BY embedding <=> $2) AS vector_rank
FROM documents
WHERE embedding IS NOT NULL
),
combined AS (
SELECT
COALESCE(b.id, v.id) AS id,
-- Reciprocal Rank Fusion score
(
COALESCE(1.0 / (60 + b.bm25_rank), 0) * $3 +
COALESCE(1.0 / (60 + v.vector_rank), 0) * $4
) AS combined_score
FROM bm25_results b
FULL OUTER JOIN vector_results v ON b.id = v.id
)
SELECT * FROM combined
ORDER BY combined_score DESC
LIMIT $7
`
Why Reciprocal Rank Fusion (RRF)? Because:
Score normalization: BM25 scores and vector similarities aren’t comparable
Rank-based: Uses position, not raw scores
Research-backed: Used by search engines (Elasticsearch, Vespa)
Tunable: Adjust k parameter (60 in our case) for different behaviors
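The SQL computes RRF inside the database, but the formula itself is tiny: score(d) = sum of weight_i / (k + rank_i(d)) over each ranking. Here is a standalone sketch (not taken from the reference implementation) that makes the arithmetic explicit:
// Reciprocal Rank Fusion: score(d) = Σ weight_i / (k + rank_i(d)).
// k = 60 matches the constant used in the SQL query above.
package main

import "fmt"

const rrfK = 60.0

// fuse combines BM25 and vector rankings (document ID -> 1-based rank).
// A document missing from one ranking simply contributes 0 for that component.
func fuse(bm25Rank, vectorRank map[string]int, bm25Weight, vectorWeight float64) map[string]float64 {
	scores := make(map[string]float64)
	for id, rank := range bm25Rank {
		scores[id] += bm25Weight / (rrfK + float64(rank))
	}
	for id, rank := range vectorRank {
		scores[id] += vectorWeight / (rrfK + float64(rank))
	}
	return scores
}

func main() {
	bm25 := map[string]int{"doc-a": 1, "doc-b": 2}
	vector := map[string]int{"doc-b": 1, "doc-c": 2}
	// doc-b ranks well in both lists, so it wins even though it tops neither alone.
	fmt.Println(fuse(bm25, vector, 0.5, 0.5))
}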
Part 3: The MCP Server – Secure Document Retrieval
Understanding JSON-RPC 2.0
Before we dive into the implementation, recall why MCP chose JSON-RPC 2.0 (see Part 1) and look at what a request actually carries on the wire.
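Here is an illustrative client-side sketch of a tools/call request. The endpoint and envelope match the load-test command shown later in this guide (POST http://localhost:8080/mcp with a JSON-RPC body), and the argument names follow the hybrid_search tool, but this is a sketch rather than the repository’s client code.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

type rpcRequest struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      int         `json:"id"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params"`
}

func main() {
	body, _ := json.Marshal(rpcRequest{
		JSONRPC: "2.0",
		ID:      1,
		Method:  "tools/call",
		Params: map[string]interface{}{
			"name": "hybrid_search",
			"arguments": map[string]interface{}{
				"query":         "data breach notification requirements",
				"limit":         5,
				"bm25_weight":   0.6,
				"vector_weight": 0.4,
			},
		},
	})

	token := "<JWT from the Authentication page>" // placeholder, not a real token
	req, _ := http.NewRequest("POST", "http://localhost:8080/mcp", bytes.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token) // tenant comes from the token, never the payload

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Success responses carry {"jsonrpc":"2.0","id":1,"result":{...}};
	// failures carry an "error" object with a standard code and message.
	var reply map[string]interface{}
	json.NewDecoder(resp.Body).Decode(&reply)
	fmt.Println(reply["result"])
}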
Here’s the complete hybrid search tool (hybrid_search.go) implementation with detailed comments:
// mcp-server/internal/tools/hybrid_search.go
type HybridSearchTool struct {
db database.Store
}
func (t *HybridSearchTool) Execute(ctx context.Context, args map[string]interface{}) (protocol.ToolCallResult, error) {
// 1. AUTHENTICATION: Extract tenant from JWT claims
// This happens at middleware level, but we verify here
tenantID, ok := ctx.Value(auth.ContextKeyTenantID).(string)
if !ok {
return protocol.ToolCallResult{IsError: true}, fmt.Errorf("tenant ID not found in context")
}
// 2. PARAMETER PARSING: Extract and validate arguments
query, _ := args["query"].(string)
if query == "" {
return protocol.ToolCallResult{IsError: true}, fmt.Errorf("query is required")
}
limit, _ := args["limit"].(float64)
if limit <= 0 {
limit = 10 // default
}
if limit > 50 {
limit = 50 // max cap
}
bm25Weight, _ := args["bm25_weight"].(float64)
vectorWeight, _ := args["vector_weight"].(float64)
// 3. WEIGHT DEFAULTS: fall back to equal weighting when neither weight is provided
if bm25Weight == 0 && vectorWeight == 0 {
bm25Weight = 0.5
vectorWeight = 0.5
}
// 4. EMBEDDING GENERATION: Using Ollama for query embedding
var embedding []float32
if vectorWeight > 0 {
embedding = generateEmbedding(query) // Calls Ollama API
}
// 5. DATABASE QUERY: Execute hybrid search with RLS
params := database.HybridSearchParams{
Query: query,
Embedding: embedding,
Limit: int(limit),
BM25Weight: bm25Weight,
VectorWeight: vectorWeight,
}
results, err := t.db.HybridSearch(ctx, tenantID, params)
if err != nil {
return protocol.ToolCallResult{IsError: true}, err
}
// 6. RESPONSE FORMATTING: Convert to JSON for client
jsonData, _ := json.Marshal(results)
return protocol.ToolCallResult{
Content: []protocol.ContentBlock{{Type: "text", Text: string(jsonData)}},
IsError: false,
}, nil
}
The NULL Embedding Problem
Real-world data is messy. Not every document has an embedding. Here’s what happened:
Initial Implementation (Broken):
// BROKEN: this crashes with NULL embeddings
var embedding pgvector.Vector
err = tx.QueryRow(ctx, query, docID).Scan(
&doc.ID,
&doc.TenantID,
&doc.Title,
&doc.Content,
&doc.Metadata,
&embedding, // CRASH: can't scan <nil> into pgvector.Vector
&doc.CreatedAt,
&doc.UpdatedAt,
)
Error:
can't scan into dest[5]: unsupported data type: <nil>
The Fix (Correct):
// FIX: use pointer types for nullable fields
var embedding *pgvector.Vector // Pointer allows NULL
err = tx.QueryRow(ctx, query, docID).Scan(
&doc.ID,
&doc.TenantID,
&doc.Title,
&doc.Content,
&doc.Metadata,
&embedding, // Can be NULL now
&doc.CreatedAt,
&doc.UpdatedAt,
)
// Handle NULL embeddings gracefully
if embedding != nil && embedding.Slice() != nil {
doc.Embedding = embedding.Slice()
} else {
doc.Embedding = nil // Explicitly set to nil
}
return doc, nil
Hybrid search handles this elegantly—documents without embeddings get vector_score = 0 but still appear in results if they match BM25:
-- Hybrid search handles NULL embeddings gracefully
WITH bm25_results AS (
SELECT id, ts_rank(to_tsvector('english', content), query) AS bm25_score
FROM documents
WHERE to_tsvector('english', content) @@ query
),
vector_results AS (
SELECT id, 1 - (embedding <=> $1) AS vector_score
FROM documents
WHERE embedding IS NOT NULL -- skip NULL embeddings
)
SELECT
d.*,
COALESCE(b.bm25_score, 0) AS bm25_score,
COALESCE(v.vector_score, 0) AS vector_score,
($2 * COALESCE(b.bm25_score, 0) + $3 * COALESCE(v.vector_score, 0)) AS combined_score
FROM documents d
LEFT JOIN bm25_results b ON d.id = b.id
LEFT JOIN vector_results v ON d.id = v.id
WHERE COALESCE(b.bm25_score, 0) > 0 OR COALESCE(v.vector_score, 0) > 0
ORDER BY combined_score DESC
LIMIT $4;
Why this matters:
Documents without embeddings are still searchable (via BM25)
New documents are usable immediately (embeddings are generated asynchronously)
The system degrades gracefully (not all-or-nothing)
Zero downtime for embedding model updates
Tenant Isolation in Action
Every MCP request sets the tenant context at the database transaction level:
// mcp-server/internal/database/postgres.go
func (db *DB) SetTenantContext(ctx context.Context, tx pgx.Tx, tenantID string) error {
// Note: SET commands don't support parameter binding
// TenantID is validated as UUID by JWT validator, so this is safe
query := fmt.Sprintf("SET LOCAL app.current_tenant_id = '%s'", tenantID)
_, err := tx.Exec(ctx, query)
return err
}
Combined with RLS policies, this ensures complete tenant isolation at the database level.
Real-world security test:
// Integration test: Verify tenant isolation
func TestTenantIsolation(t *testing.T) {
// Create documents for two tenants
tenant1Doc := createDocument(t, db, "tenant-1", "Secret Data A")
tenant2Doc := createDocument(t, db, "tenant-2", "Secret Data B")
// Query as tenant-1
ctx1 := contextWithTenant(ctx, "tenant-1")
results1, _ := db.ListDocuments(ctx1, "tenant-1", ListParams{Limit: 100})
// Query as tenant-2
ctx2 := contextWithTenant(ctx, "tenant-2")
results2, _ := db.ListDocuments(ctx2, "tenant-2", ListParams{Limit: 100})
// Assertions
assert.Contains(t, results1, tenant1Doc)
assert.NotContains(t, results1, tenant2Doc) // cannot see the other tenant's data
assert.Contains(t, results2, tenant2Doc)
assert.NotContains(t, results2, tenant1Doc) // cannot see the other tenant's data
}
Part 4: The A2A Server – Workflow Orchestration
Task Lifecycle
A2A manages stateful tasks through their entire lifecycle:
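A compact sketch of the state machine captures this lifecycle. The state names below follow the task states defined by the A2A specification (submitted, working, input-required, completed, failed, canceled); the reference implementation may track only a subset.
// Sketch of an A2A-style task lifecycle. State names follow the A2A spec's
// task states; the reference implementation may use only a subset of them.
package task

import "fmt"

type State string

const (
	StateSubmitted     State = "submitted"      // accepted, not yet started
	StateWorking       State = "working"        // agent is executing steps
	StateInputRequired State = "input-required" // paused, waiting on the caller
	StateCompleted     State = "completed"      // terminal: artifacts ready
	StateFailed        State = "failed"         // terminal: unrecoverable error
	StateCanceled      State = "canceled"       // terminal: caller aborted
)

// Terminal reports whether no further updates will be streamed for the task.
func (s State) Terminal() bool {
	return s == StateCompleted || s == StateFailed || s == StateCanceled
}

// validNext encodes the allowed transitions of the lifecycle.
var validNext = map[State][]State{
	StateSubmitted:     {StateWorking, StateCanceled},
	StateWorking:       {StateInputRequired, StateCompleted, StateFailed, StateCanceled},
	StateInputRequired: {StateWorking, StateCanceled},
}

func Transition(from, to State) error {
	for _, next := range validNext[from] {
		if next == to {
			return nil
		}
	}
	return fmt.Errorf("invalid transition %s -> %s", from, to)
}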
Server-Sent Events for Real-Time Updates
Why SSE instead of WebSockets?
Direction: SSE is unidirectional (server → client); WebSocket is bidirectional
HTTP/2 multiplexing: SSE yes; WebSocket no
Automatic reconnection: SSE built-in; WebSocket manual
Firewall-friendly: SSE yes (plain HTTP); WebSocket sometimes blocked
Complexity: SSE simple; WebSocket complex
Browser support: all modern browsers support both
SSE is perfect for agent progress updates because:
One-way communication (server pushes updates)
Simple implementation
Automatic reconnection
Works through corporate firewalls
SSE provides real-time streaming without WebSocket complexity:
// a2a-server/internal/handlers/tasks.go
func (h *TaskHandler) StreamEvents(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
return
}
// Stream task events
for {
event := h.taskManager.GetNextEvent(taskID)
if event == nil {
break // Task complete
}
// Format as SSE event
data, _ := json.Marshal(event)
fmt.Fprintf(w, "event: task_update\n")
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
if event.Status == "completed" || event.Status == "failed" {
break
}
}
}
Client-side consumption is trivial:
# streamlit-ui/pages/3_?_A2A_Tasks.py
def stream_task_events(task_id: str):
url = f"{A2A_BASE_URL}/tasks/{task_id}/events"
with requests.get(url, stream=True) as response:
for line in response.iter_lines():
if line.startswith(b'data:'):
data = json.loads(line[5:])
st.write(f"Update: {data['message']}")
yield data
LangGraph Workflow Integration
LangGraph workflows call MCP tools through the A2A server:
# orchestration/workflows/rag_workflow.py
class RAGWorkflow:
def __init__(self, mcp_url: str):
self.mcp_client = MCPClient(mcp_url)
self.workflow = self.build_workflow()
def build_workflow(self) -> StateGraph:
workflow = StateGraph(RAGState)
# Define workflow steps
workflow.add_node("search", self.search_documents)
workflow.add_node("rank", self.rank_results)
workflow.add_node("generate", self.generate_answer)
workflow.add_node("verify", self.verify_sources)
# Define edges (workflow graph)
workflow.add_edge(START, "search")
workflow.add_edge("search", "rank")
workflow.add_edge("rank", "generate")
workflow.add_edge("generate", "verify")
workflow.add_edge("verify", END)
return workflow.compile()
def search_documents(self, state: RAGState) -> RAGState:
"""Search for relevant documents using MCP hybrid search"""
# This is where MCP and A2A integrate!
results = self.mcp_client.hybrid_search(
query=state["query"],
limit=10,
bm25_weight=0.5,
vector_weight=0.5
)
state["documents"] = results
state["progress"] = f"Found {len(results)} documents"
# Emit progress event via A2A
emit_progress_event(state["task_id"], "search_complete", state["progress"])
return state
def rank_results(self, state: RAGState) -> RAGState:
"""Rank results by combined score"""
docs = sorted(
state["documents"],
key=lambda x: x["score"],
reverse=True
)[:5]
state["ranked_docs"] = docs
state["progress"] = "Ranked top 5 documents"
emit_progress_event(state["task_id"], "ranking_complete", state["progress"])
return state
def generate_answer(self, state: RAGState) -> RAGState:
"""Generate answer using retrieved context"""
context = "\n\n".join([
f"Document: {doc['title']}\n{doc['content']}"
for doc in state["ranked_docs"]
])
prompt = f"""Based on the following documents, answer the question.
Context:
{context}
Question: {state['query']}
Answer:"""
# Call Ollama for local inference
response = ollama.generate(
model="llama3.2",
prompt=prompt
)
state["answer"] = response["response"]
state["progress"] = "Generated final answer"
emit_progress_event(state["task_id"], "generation_complete", state["progress"])
return state
def verify_sources(self, state: RAGState) -> RAGState:
"""Verify sources are accurately cited"""
# Check each cited document exists in ranked_docs
cited_docs = extract_citations(state["answer"])
verified = all(doc in state["ranked_docs"] for doc in cited_docs)
state["verified"] = verified
state["progress"] = "Verified sources" if verified else "Source verification failed"
emit_progress_event(state["task_id"], "verification_complete", state["progress"])
return state
The workflow executes as a multi-step pipeline, with each step:
Calling MCP tools for data access
Updating state
Emitting progress events via A2A
Handling errors gracefully
Part 5: Production-Grade Features
1. Authentication & Security
JWT Token Generation (Streamlit UI):
# streamlit-ui/pages/1_?_Authentication.py
def generate_jwt_token(tenant_id: str, user_id: str, ttl: int = 3600) -> str:
"""Generate RS256 JWT token with proper claims"""
now = datetime.now(timezone.utc)
payload = {
"tenant_id": tenant_id,
"user_id": user_id,
"iat": now, # Issued at
"exp": now + timedelta(seconds=ttl), # Expiration
"nbf": now, # Not before
"jti": str(uuid.uuid4()), # JWT ID (for revocation)
"iss": "mcp-demo-ui", # Issuer
"aud": "mcp-server" # Audience
}
# Sign with RSA private key
with open("/app/certs/private_key.pem", "rb") as f:
private_key = serialization.load_pem_private_key(
f.read(),
password=None
)
token = jwt.encode(payload, private_key, algorithm="RS256")
return token
2. Observability: OpenTelemetry + Langfuse
OpenTelemetry excels at infrastructure observability but lacks LLM-specific context. Langfuse provides deep LLM insights but doesn’t trace service-to-service calls. Together, they provide complete visibility.
Example: End-to-End Trace
Python Workflow (OpenTelemetry + Langfuse):
from opentelemetry import trace
from langfuse.decorators import observe
class RAGWorkflow:
def __init__(self):
# OTel for distributed tracing
self.tracer = setup_otel_tracing("rag-workflow")
# Langfuse for LLM tracking
self.langfuse = Langfuse(...)
@observe(name="search_documents") # Langfuse tracks this
def _search_documents(self, state):
# OTel: Create span for MCP call
with self.tracer.start_as_current_span("mcp.hybrid_search") as span:
span.set_attribute("search.query", state["query"])
span.set_attribute("search.top_k", 5)
# HTTP request auto-instrumented, propagates trace context
result = self.mcp_client.hybrid_search(
query=state["query"],
limit=5
)
span.set_attribute("search.result_count", len(documents))
return state
MCP Client (W3C Trace Context Propagation):
from opentelemetry.propagate import inject
def _make_request(self, method: str, params: Any = None):
headers = {'Content-Type': 'application/json'}
# Inject trace context into HTTP headers
inject(headers) # Adds 'traceparent' header
response = self.session.post(
f"{self.base_url}/mcp",
json=payload,
headers=headers # Trace continues in Go server
)
3. Testing: Unit and Integration Tests
# Unit tests (fast, no dependencies)
cd mcp-server
go test -v ./...
# Integration tests (requires PostgreSQL)
./scripts/run-integration-tests.sh
The integration test script:
Checks if PostgreSQL is running
Waits for database ready
Runs all integration tests
Reports coverage
Output:
Running MCP Server Integration Tests
========================================
PostgreSQL is ready
Running integration tests...
=== RUN TestGetDocument_WithNullEmbedding
--- PASS: TestGetDocument_WithNullEmbedding (0.05s)
=== RUN TestGetDocument_WithEmbedding
--- PASS: TestGetDocument_WithEmbedding (0.04s)
=== RUN TestHybridSearch_HandlesNullEmbeddings
--- PASS: TestHybridSearch_HandlesNullEmbeddings (0.12s)
=== RUN TestTenantIsolation
--- PASS: TestTenantIsolation (0.08s)
=== RUN TestConcurrentRetrievals
--- PASS: TestConcurrentRetrievals (2.34s)
PASS
coverage: 95.3% of statements
ok github.com/bhatti/mcp-a2a-go/mcp-server/internal/database 3.456s
Integration tests completed!
Part 7: Real-World Use Cases
Use Case 1: Enterprise RAG Search
Scenario: Consulting firm managing 50,000+ contract documents across multiple clients. Each client (tenant) must have complete data isolation. Legal team needs to:
Search with exact terms (case citations, contract clauses)
Find semantically similar clauses (non-obvious connections)
Track who accessed what (audit trail)
Enforce budget limits per client matter
Solution: Hybrid search combining BM25 (keywords) and vector similarity (semantics).
# Client code
results = mcp_client.hybrid_search(
query="data breach notification requirements GDPR Article 33",
limit=10,
bm25_weight=0.6, # Favor exact keyword matches for legal terms
vector_weight=0.4 # But include semantic similarity
)
for result in results:
print(f"Document: {result['title']}")
print(f"BM25 Score: {result['bm25_score']:.2f}")
print(f"Vector Score: {result['vector_score']:.2f}")
print(f"Combined: {result['score']:.2f}")
print(f"Tenant: {result['tenant_id']}")
print("---")
Finds documents with exact terms (“GDPR”, “Article 33”)
Surfaces semantically similar docs (“privacy breach”, “data protection”)
Tenant isolation ensures Client A can’t see Client B’s contracts
Audit trail via structured logging
Cost tracking per client matter
Use Case 2: Multi-Step Research Workflows
Scenario: Investment analyst needs to research a company across multiple data sources:
Company filings (10-K, 10-Q, 8-K)
Competitor analysis
Market trends
Financial metrics
Regulatory filings
News sentiment
Traditional RAG: Query each source separately, manually synthesize results.
With A2A + MCP: Orchestrate multi-step workflow with progress tracking.
# orchestration/workflows/research_workflow.py
class ResearchWorkflow:
def build_workflow(self):
workflow = StateGraph(ResearchState)
# Define research steps
workflow.add_node("search_company", self.search_company_docs)
workflow.add_node("search_competitors", self.search_competitors)
workflow.add_node("search_financials", self.search_financial_data)
workflow.add_node("analyze_trends", self.analyze_market_trends)
workflow.add_node("verify_facts", self.verify_with_sources)
workflow.add_node("generate_report", self.generate_final_report)
# Define workflow graph
workflow.add_edge(START, "search_company")
workflow.add_edge("search_company", "search_competitors")
workflow.add_edge("search_competitors", "search_financials")
workflow.add_edge("search_financials", "analyze_trends")
workflow.add_edge("analyze_trends", "verify_facts")
workflow.add_edge("verify_facts", "generate_report")
workflow.add_edge("generate_report", END)
return workflow.compile()
def search_company_docs(self, state: ResearchState) -> ResearchState:
"""Step 1: Search company documents via MCP"""
company = state["company_name"]
# Call MCP hybrid search
results = self.mcp_client.hybrid_search(
query=f"{company} business operations revenue products",
limit=20,
bm25_weight=0.5,
vector_weight=0.5
)
state["company_docs"] = results
state["progress"] = f"Found {len(results)} company documents"
# Emit progress via A2A SSE
emit_progress("search_company_complete", state["progress"])
return state
def search_competitors(self, state: ResearchState) -> ResearchState:
"""Step 2: Identify and search competitors"""
company = state["company_name"]
# Extract competitors from company docs
competitors = self.extract_competitors(state["company_docs"])
# Search each competitor
competitor_data = {}
for competitor in competitors:
results = self.mcp_client.hybrid_search(
query=f"{competitor} market share products revenue",
limit=10
)
competitor_data[competitor] = results
state["competitors"] = competitor_data
state["progress"] = f"Analyzed {len(competitors)} competitors"
emit_progress("search_competitors_complete", state["progress"])
return state
def search_financial_data(self, state: ResearchState) -> ResearchState:
"""Step 3: Extract financial metrics"""
company = state["company_name"]
# Search for financial documents
results = self.mcp_client.hybrid_search(
query=f"{company} revenue earnings profit margin cash flow",
limit=15,
bm25_weight=0.7, # Favor exact financial terms
vector_weight=0.3
)
# Extract key metrics
metrics = self.extract_financial_metrics(results)
state["financials"] = metrics
state["progress"] = f"Extracted {len(metrics)} financial metrics"
emit_progress("search_financials_complete", state["progress"])
return state
def verify_facts(self, state: ResearchState) -> ResearchState:
"""Step 5: Verify all facts with sources"""
# Check each claim has supporting document
claims = self.extract_claims(state["report_draft"])
verified_claims = []
for claim in claims:
sources = self.find_supporting_docs(claim, state)
if sources:
verified_claims.append({
"claim": claim,
"sources": sources,
"verified": True
})
state["verified_claims"] = verified_claims
state["progress"] = f"Verified {len(verified_claims)} claims"
emit_progress("verification_complete", state["progress"])
return state
Benefits:
Multi-step orchestration with state management
Real-time progress via SSE (the analyst sees each step)
Intermediate results saved as artifacts
Each step calls MCP tools for data retrieval
Final report with verified sources
Cost tracking across all steps
Use Case 3: Budget-Controlled AI Assistance
Scenario: A SaaS company (e.g., a document management platform) offers AI features to customers based on tiered subscriptions. Without budget control, a customer on the free tier can make 10,000 queries in a single day.
With budget control:
# Before each request
tier = get_user_tier(user_id)
budget = BUDGET_TIERS[tier]["monthly_budget"]
allowed, remaining = cost_tracker.check_budget(user_id, budget)
if not allowed:
raise BudgetExceededError(
f"Monthly budget of ${budget} exceeded. "
f"Upgrade to {next_tier} for higher limits."
)
# Track the request
response = llm.generate(prompt)
cost = cost_tracker.track_request(
user_id=user_id,
model="llama3.2",
input_tokens=len(prompt.split()),
output_tokens=len(response.split())
)
# Alert when approaching limit
if remaining < 5.0: # $5 remaining
send_alert(user_id, f"Budget alert: ${remaining:.2f} remaining")
Real-world budget enforcement:
# streamlit-ui/pages/4_?_Cost_Tracking.py
def enforce_budget_limits():
"""Check budget before task creation"""
user_tier = st.session_state.get("user_tier", "free")
budget = BUDGET_TIERS[user_tier]["monthly_budget"]
# Calculate current spend
spent = cost_tracker.get_total_cost(user_id)
remaining = budget - spent
# Display budget status
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Budget", f"${budget:.2f}")
with col2:
st.metric("Spent", f"${spent:.2f}",
delta=f"-${spent:.2f}", delta_color="inverse")
with col3:
progress = (spent / budget) * 100
st.metric("Remaining", f"${remaining:.2f}")
st.progress(progress / 100)
# Block if exceeded
if remaining <= 0:
st.error("? Monthly budget exceeded. Upgrade to continue.")
st.button("Upgrade to Pro ($25/month)", on_click=upgrade_tier)
return False
# Warn if close
if remaining < 5.0:
st.warning(f"?? Budget alert: Only ${remaining:.2f} remaining this month")
return True
Benefits:
Prevent cost overruns per customer
Fair usage enforcement across tiers
Export data for billing/accounting
Different limits per tier
Automatic alerts before limits are hit
Graceful degradation (local models for the free tier)
What this performance profile means in practice:
5,000+ req/sec per instance works out to roughly 432 million requests/day
Sub-100ms search keeps the UX interactive
~52MB of memory per instance makes horizontal scaling cost-effective
Load Testing Results
# Using hey (HTTP load generator)
hey -n 10000 -c 100 -m POST \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"hybrid_search","arguments":{"query":"machine learning","limit":10}}}' \
http://localhost:8080/mcp
Summary:
Total: 19.8421 secs
Slowest: 0.2847 secs
Fastest: 0.0089 secs
Average: 0.1974 secs
Requests/sec: 503.98
Status code distribution:
[200] 10000 responses
Latency distribution:
10% in 0.0234 secs
25% in 0.0456 secs
50% in 0.1842 secs
75% in 0.3123 secs
90% in 0.4234 secs
95% in 0.4867 secs
99% in 0.5634 secs
Scaling Strategy
Horizontal Scaling:
MCP and A2A servers are stateless—scale with container replicas
Database read replicas for read-heavy workloads (search queries)
Redis cache for frequently accessed queries (30-second TTL)
Load balancer distributes requests (sticky sessions not needed)
Vertical Scaling:
Increase PostgreSQL resources for larger datasets
Add pgvector HNSW indexes for faster vector search (see the sketch after this list)
Tune connection pool sizes (PgBouncer)
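As a sketch of the HNSW index mentioned above, using standard pgvector syntax (vector_cosine_ops matches the <=> cosine-distance operator used in the search queries); the exact index options in the repository’s migrations may differ:
// Sketch of a migration adding an HNSW index for pgvector.
// The m / ef_construction values here are illustrative defaults.
package migrations

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

func AddHNSWIndex(ctx context.Context, pool *pgxpool.Pool) error {
	_, err := pool.Exec(ctx, `
		CREATE INDEX IF NOT EXISTS documents_embedding_hnsw
		ON documents USING hnsw (embedding vector_cosine_ops)
		WITH (m = 16, ef_construction = 64);
	`)
	return err
}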
When to scale what:
High MCP server CPU → add more MCP replicas
Slow database queries → add read replicas
High memory on MCP → check for memory leaks, then add replicas
Cache misses → increase Redis memory, tune the TTL
Slow embeddings → deploy a dedicated embedding service
Part 10: Lessons Learned & Best Practices
1. Go for Protocol Servers
Go’s performance and type safety provide a solid foundation for running protocol servers in production AI deployments.
2. PostgreSQL Row-Level Security
Database-level tenant isolation is non-negotiable for enterprise. Application-level filtering is too easy to screw up. With RLS, even if your application has a bug, the database enforces isolation.
3. Integration Tests Against Real Databases
Unit tests with mocks didn’t catch the NULL embedding issues. Integration tests did. Test against production-like environments.
4. Optional Langfuse
Making Langfuse optional (try/except imports) lets developers run locally without complex setup while enabling full observability in production.
5. Comprehensive Documentation
Document your design and testing process from day one.
6. Layered Observability
Use both Langfuse and OpenTelemetry. OTel traces service flow; Langfuse tracks LLM behavior. They complement, rather than replace, each other.
OpenTelemetry for infrastructure: Trace context propagation across Python → Go → Database gave complete visibility into request flow. The traceparent header auto-propagation through requests/httpx made it seamless.
Langfuse for LLM calls: Token counts, costs, and prompt tracking. Essential for budget control and debugging LLM behavior.
Prometheus + Jaeger: Prometheus for metrics dashboards (query “What’s our P95 latency?”), Jaeger for debugging specific slow traces (“Why was this request slow?”).
These production concerns stack up in layers. Miss one, and you have a security incident waiting to happen.
Distributed Systems Lessons Apply Here
AI agents are distributed systems, so the failure modes familiar from microservices apply here, amplified by agents that make autonomous decisions with potentially unbounded costs. From my fault tolerance article, these patterns are essential:
Without timeouts:
embedding = ollama.embed(text)  # Ollama down → hangs forever → system freezes
Without rate limiting:
Tenant A sends 10,000 req/sec → the database crashes → ALL tenants go down
With rate limiting:
if !rateLimiter.Allow(tenantID) {
return ErrRateLimitExceeded // Other tenants unaffected
}
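A minimal sketch of both guards in Go, assuming golang.org/x/time/rate for the per-tenant limiter and a context deadline around the embedding call; the wiring in the reference implementation may differ:
// Sketch: bound every external call with a deadline, and rate-limit per tenant
// so one noisy tenant cannot take the database down for everyone.
package guard

import (
	"context"
	"errors"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

var ErrRateLimitExceeded = errors.New("rate limit exceeded for tenant")

type EmbeddingClient interface {
	Embed(ctx context.Context, text string) ([]float32, error)
}

// Embed calls the embedding service with a hard 5-second deadline so a hung
// Ollama instance fails fast instead of freezing the whole request.
func Embed(ctx context.Context, client EmbeddingClient, text string) ([]float32, error) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	return client.Embed(ctx, text)
}

// TenantLimiter hands each tenant its own token bucket (e.g. 50 req/sec, burst 100).
type TenantLimiter struct {
	mu       sync.Mutex
	limiters map[string]*rate.Limiter
	rps      rate.Limit
	burst    int
}

func NewTenantLimiter(rps rate.Limit, burst int) *TenantLimiter {
	return &TenantLimiter{limiters: map[string]*rate.Limiter{}, rps: rps, burst: burst}
}

// Allow reports whether this tenant still has budget in its bucket; other
// tenants keep their own buckets and stay unaffected.
func (t *TenantLimiter) Allow(tenantID string) bool {
	t.mu.Lock()
	l, ok := t.limiters[tenantID]
	if !ok {
		l = rate.NewLimiter(t.rps, t.burst)
		t.limiters[tenantID] = l
	}
	t.mu.Unlock()
	return l.Allow()
}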
The Bottom Line
MCP and A2A are excellent protocols. They solve real problems:
MCP standardizes tool execution
A2A standardizes agent coordination
But protocols are not products. Building on MCP/A2A is like building on HTTP—the protocol is solved, but you still need web servers, frameworks, security layers, and monitoring tools.
This repository shows the other 90%:
Real authentication (not “TODO: add auth”)
Real multi-tenancy (database RLS, not app filtering)
Real observability (Langfuse integration, not “we should add logging”)
Real testing (integration tests, not just mocks)
Real deployment (K8s manifests, not “works on my laptop”)
Get Started
git clone https://github.com/bhatti/mcp-a2a-go
cd mcp-a2a-go
docker compose up -d
./scripts/run-integration-tests.sh
open http://localhost:8501