Lab 6: Nested Agent Graphs¶
⏱️ Estimated completion time: 45 minutes
Overview¶
This lab demonstrates the Coordinator/Worker/Delegator (CWD) pattern using LangGraph, modeling a hierarchical organizational structure. It showcases:
- Hierarchical organization in a graph structure
- Parallel execution of specialized worker nodes
- State management across organizational layers
- Advanced patterns with nested subgraphs
Learning Objectives¶
By the end of this lab, you will understand:
- How to implement organizational hierarchies in LangGraph
- Coordinator-Worker-Delegator design patterns
- Nested subgraph architectures
- Complex state management across multiple layers
Prerequisites¶
- Python 3.8+
- LangGraph installed (pip install langgraph)
- Tenacity for retry logic (pip install tenacity)
Key Concepts¶
Coordinator/Worker/Delegator Pattern¶
- Coordinator: Creates high-level plans from user requests
- Delegator: Distributes work to specialized workers
- Workers: Handle specific tasks in parallel
- Assembly: Combines results into final output
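Before the LangGraph version, the four roles can be sketched in plain Python. This is a minimal illustration using only the standard library; the names `coordinate`, `delegate`, `assemble`, and the two stub workers are assumptions made for this sketch and do not appear in the lab code.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict


def coordinate(request: str) -> Dict[str, str]:
    """Coordinator: turn a request into a plan (hard-coded for the sketch)."""
    return {"origin": "London", "destination": "Paris"}


# Hypothetical stub workers; each one handles a single specialized task.
def flights_stub(plan: Dict[str, str]) -> str:
    return f"flight {plan['origin']} -> {plan['destination']}"


def hotels_stub(plan: Dict[str, str]) -> str:
    return f"hotel in {plan['destination']}"


WORKERS: Dict[str, Callable[[Dict[str, str]], str]] = {
    "flights": flights_stub,
    "hotels": hotels_stub,
}


def delegate(plan: Dict[str, str]) -> Dict[str, str]:
    """Delegator: submit the plan to every worker and collect the results."""
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(fn, plan) for name, fn in WORKERS.items()}
        return {name: future.result() for name, future in futures.items()}


def assemble(plan: Dict[str, str], results: Dict[str, str]) -> Dict[str, object]:
    """Assembly: combine the plan and the worker results into one output."""
    return {"plan": plan, **results}


plan = coordinate("I want to travel from London to Paris")
itinerary = assemble(plan, delegate(plan))
print(itinerary)
```

The lab code below expresses the same flow as graph nodes and edges, which lets LangGraph handle scheduling and state merging instead of a hand-written thread pool.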
Hierarchical Graph Organization¶
- Multiple layers of decision-making and execution
- Clear separation of concerns between roles
- Scalable architecture for complex workflows
Lab Code¶
#!/usr/bin/env python3
"""
Chapter 6 - Coordinator/Worker/Delegator with Nested Graphs
----------------------------------------------------------
This example demonstrates how to implement the Coordinator/Worker/Delegator (CWD)
pattern using LangGraph. It models an organizational hierarchy where:
1. A coordinator creates a high-level plan from a user request
2. A delegator distributes work to specialized workers
3. Workers handle specific tasks in parallel
4. Results are assembled into a final output
Key concepts:
- Hierarchical organization in a graph
- Parallel execution of worker nodes
- State management across organization layers
- Nested subgraphs
"""
import argparse
import json
from typing import Dict, List, TypedDict
from tenacity import retry, stop_after_attempt, wait_fixed
from langgraph.graph import StateGraph
# ---------------------------------------------------------------------------
# Mock external APIs --------------------------------------------------------
# ---------------------------------------------------------------------------
def _mock_flights_api(origin: str, destination: str):
    """Mock flight search API."""
    print(f"Worker: Searching flights from {origin} to {destination}")
    return [
        {"airline": "EconoFly", "price": 350, "duration": "2h 15m"},
        {"airline": "LuxAir", "price": 720, "duration": "1h 55m"},
    ]

def _mock_hotels_api(location: str, max_price: int):
    """Mock hotel search API."""
    print(f"Worker: Finding hotels in {location} under ${max_price}")
    return [
        {"name": "Downtown Inn", "price": 175, "rating": 3.8},
        {"name": "Grand Hotel", "price": 340, "rating": 4.5},
    ]

def _mock_activities_api(location: str, preference: str):
    """Mock activities search API."""
    print(f"Worker: Finding {preference} activities in {location}")
    return [
        {"name": "Guided City Tour", "price": 35, "duration": "3 hours"},
        {"name": "Local Food Experience", "price": 75, "duration": "4 hours"},
    ]
# ---------------------------------------------------------------------------
# API wrappers with retry logic ---------------------------------------------
# ---------------------------------------------------------------------------
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def search_flights(origin: str, destination: str):
    """Wrap flight search with retry logic."""
    return _mock_flights_api(origin, destination)

@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def find_hotels(location: str, max_price: int):
    """Wrap hotel search with retry logic."""
    return _mock_hotels_api(location, max_price)

@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def find_activities(location: str, preference: str):
    """Wrap activities search with retry logic."""
    return _mock_activities_api(location, preference)
# ---------------------------------------------------------------------------
# State definition ----------------------------------------------------------
# ---------------------------------------------------------------------------
class CWDState(TypedDict, total=False):
    request_text: str         # User's request
    coordinator_plan: Dict    # Plan created by coordinator
    flights: List[Dict]       # Flight options found by worker
    hotels: List[Dict]        # Hotel options found by worker
    activities: List[Dict]    # Activity options found by worker
    complete_itinerary: Dict  # Final assembled result
# ---------------------------------------------------------------------------
# Coordinator node ----------------------------------------------------------
# ---------------------------------------------------------------------------
def coordinator_node(state: CWDState) -> CWDState:
    """
    The coordinator analyzes the user request and creates a high-level plan.
    In a real system, this would use an LLM to extract information from natural
    language. For simplicity in this example, we'll use basic text matching.
    """
    request = state["request_text"].lower()
    print("\nCoordinator: Analyzing request and creating plan")
    # Extract destination
    destination = None
    if "paris" in request:
        destination = "Paris"
    elif "tokyo" in request:
        destination = "Tokyo"
    elif "new york" in request:
        destination = "New York"
    # Extract origin
    origin = None
    if "from london" in request:
        origin = "London"
    elif "from berlin" in request:
        origin = "Berlin"
    elif "from san francisco" in request:
        origin = "San Francisco"
    # Set defaults if not found
    if not destination:
        destination = "Paris"  # Default destination
    if not origin:
        origin = "London"  # Default origin
    # Determine budget preferences
    max_price_hotel = 400  # Default
    if "luxury" in request:
        max_price_hotel = 800
    elif "budget" in request:
        max_price_hotel = 200
    # Determine activity preferences
    preference = "general"  # Default
    # "cultur" matches both "culture" and "cultural"; the default request
    # says "cultural activities", which the substring "culture" would miss.
    if "museum" in request or "cultur" in request or "art" in request:
        preference = "cultural"
    elif "food" in request or "cuisine" in request:
        preference = "culinary"
    elif "outdoor" in request or "adventure" in request:
        preference = "outdoor"
    # Create the plan
    plan = {
        "origin": origin,
        "destination": destination,
        "max_price_hotel": max_price_hotel,
        "preference": preference,
    }
    print(f"Coordinator Plan: {json.dumps(plan, indent=2)}")
    # Return only the key this node updates; LangGraph merges it into the state
    return {"coordinator_plan": plan}
# ---------------------------------------------------------------------------
# Delegator node ------------------------------------------------------------
# ---------------------------------------------------------------------------
def delegator_prepare(state: CWDState) -> CWDState:
    """
    The delegator takes the coordinator's plan and prepares it for workers.
    In more complex systems, this would involve breaking down tasks, assigning
    priorities, and managing worker selection.
    """
    plan = state["coordinator_plan"]
    print("\nDelegator: Distributing tasks to specialized workers")
    # In this simple example, we just pass through the state
    # A more complex delegator might transform the plan into worker-specific tasks
    print(f" • Assigned flight search: {plan['origin']} → {plan['destination']}")
    print(f" • Assigned hotel search: {plan['destination']} (max ${plan['max_price_hotel']})")
    print(f" • Assigned activity search: {plan['preference']} in {plan['destination']}")
    return state
# ---------------------------------------------------------------------------
# Worker nodes --------------------------------------------------------------
# ---------------------------------------------------------------------------
# The three workers run in the same graph step. Each returns only the key it
# owns: if every worker returned the whole state, they would all write
# request_text and coordinator_plan in one step, which LangGraph is expected
# to reject for keys that have no reducer.
def flights_worker(state: CWDState) -> CWDState:
    """Worker responsible for finding flights."""
    plan = state["coordinator_plan"]
    # Call the flight search API with parameters from the plan
    flights = search_flights(plan["origin"], plan["destination"])
    return {"flights": flights}

def hotels_worker(state: CWDState) -> CWDState:
    """Worker responsible for finding hotels."""
    plan = state["coordinator_plan"]
    # Call the hotel search API with parameters from the plan
    hotels = find_hotels(plan["destination"], plan["max_price_hotel"])
    return {"hotels": hotels}

def activities_worker(state: CWDState) -> CWDState:
    """Worker responsible for finding activities."""
    plan = state["coordinator_plan"]
    # Call the activities search API with parameters from the plan
    activities = find_activities(plan["destination"], plan["preference"])
    return {"activities": activities}
# ---------------------------------------------------------------------------
# Result assembly -----------------------------------------------------------
# ---------------------------------------------------------------------------
def assemble_itinerary(state: CWDState) -> CWDState:
    """
    Combine all worker results into a complete itinerary.
    This represents the final integration of all parallel work streams.
    """
    plan = state["coordinator_plan"]
    print("\nAssembling final itinerary from worker results")
    # For this demo, we'll choose the cheapest flight, highest rated hotel,
    # and include all activities
    # Select cheapest flight
    flights = state.get("flights", [])
    selected_flight = min(flights, key=lambda f: f["price"]) if flights else None
    # Select highest rated hotel
    hotels = state.get("hotels", [])
    selected_hotel = max(hotels, key=lambda h: h["rating"]) if hotels else None
    # Include all activities found
    activities = state.get("activities", [])
    # Return the complete itinerary as this node's state update
    return {
        "complete_itinerary": {
            "destination": plan.get("destination"),
            "origin": plan.get("origin"),
            "flight": selected_flight,
            "hotel": selected_hotel,
            "activities": activities,
        }
    }
# ---------------------------------------------------------------------------
# Graph construction --------------------------------------------------------
# ---------------------------------------------------------------------------
def build_cwd_graph() -> StateGraph:
    """Build the graph representing CWD organizational structure."""
    g = StateGraph(CWDState)
    # Add the coordinator node as entry point
    g.add_node("coordinator", coordinator_node)
    g.set_entry_point("coordinator")
    # Add delegator node
    g.add_node("delegator", delegator_prepare)
    g.add_edge("coordinator", "delegator")
    # Add worker nodes. The names carry a _worker suffix because some
    # LangGraph versions reject a node whose name matches a state key
    # (flights, hotels, activities).
    workers = ("flights_worker", "hotels_worker", "activities_worker")
    g.add_node("flights_worker", flights_worker)
    g.add_node("hotels_worker", hotels_worker)
    g.add_node("activities_worker", activities_worker)
    # Connect delegator to all workers (fan-out)
    for worker in workers:
        g.add_edge("delegator", worker)
    # Connect all workers to assembly (fan-in)
    g.add_node("assemble", assemble_itinerary)
    for worker in workers:
        g.add_edge(worker, "assemble")
    # Set assembly as finish point
    g.set_finish_point("assemble")
    return g
# ---------------------------------------------------------------------------
# Advanced nested subgraphs example -----------------------------------------
# ---------------------------------------------------------------------------
def create_nested_subgraph_example():
    """
    Example of using nested subgraphs for even more complex organization.
    This function is not called in the main demo but illustrates how
    subgraphs can be nested for complex hierarchies.
    """
    # This is a conceptual example showing how nested graphs would work
    # but not executed in the main flow.
    # Every layer reuses the CWDState schema so parent and subgraphs share
    # the same declared state keys.
    # Main graph representing the entire organization
    main_graph = StateGraph(CWDState)
    # Create strategy team subgraph
    strategy_team = StateGraph(CWDState)
    strategy_team.add_node("analyze", lambda s: s)
    strategy_team.add_node("plan", lambda s: s)
    strategy_team.add_edge("analyze", "plan")
    strategy_team.set_entry_point("analyze")
    strategy_team.set_finish_point("plan")
    # Create operations team subgraph
    operations_team = StateGraph(CWDState)
    operations_team.add_node("logistics", lambda s: s)
    operations_team.add_node("execution", lambda s: s)
    operations_team.add_edge("logistics", "execution")
    operations_team.set_entry_point("logistics")
    operations_team.set_finish_point("execution")
    # Add team subgraphs to main graph; a compiled subgraph acts as one node
    main_graph.add_node("strategy", strategy_team.compile())
    main_graph.add_node("operations", operations_team.compile())
    main_graph.add_edge("strategy", "operations")
    # The main graph also needs its own entry and finish points to compile
    main_graph.set_entry_point("strategy")
    main_graph.set_finish_point("operations")
    # This pattern mirrors real organizational charts
    return main_graph
# ---------------------------------------------------------------------------
# Main function -------------------------------------------------------------
# ---------------------------------------------------------------------------
def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Coordinator-Worker-Delegator Demo")
    parser.add_argument(
        "--request",
        type=str,
        default="I want to travel from London to Paris, stay in a nice hotel, and enjoy some cultural activities.",
        help="User request text",
    )
    args = parser.parse_args()
    # Print header
    print("\n=== Coordinator-Worker-Delegator Pattern Demo ===\n")
    print(f"User Request: \"{args.request}\"")
    # Build the CWD graph
    graph = build_cwd_graph().compile()
    # Create initial state with user request
    initial_state: CWDState = {"request_text": args.request}
    # Execute the graph with organizational workflow
    final_state = graph.invoke(initial_state)
    # Display the results
    print("\n=== Final Travel Package ===\n")
    itinerary = final_state["complete_itinerary"]
    print(f"Trip from {itinerary['origin']} to {itinerary['destination']}")
    # Flight information
    flight = itinerary.get("flight", {})
    if flight:
        print(f"\nFlight: {flight['airline']}")
        print(f" Price: ${flight['price']}")
        print(f" Duration: {flight['duration']}")
    # Hotel information
    hotel = itinerary.get("hotel", {})
    if hotel:
        print(f"\nHotel: {hotel['name']}")
        print(f" Price: ${hotel['price']} per night")
        print(f" Rating: {hotel['rating']}/5.0")
    # Activities
    print("\nRecommended Activities:")
    for idx, activity in enumerate(itinerary.get("activities", []), start=1):
        print(f" {idx}. {activity['name']}")
        print(f" ${activity['price']} - {activity['duration']}")

if __name__ == "__main__":
    main()
How to Run¶
- Save the code above as 06_nested_graphs.py
- Install dependencies: pip install langgraph tenacity
- Run the script: python 06_nested_graphs.py
- Try with custom requests: python 06_nested_graphs.py --request "I want a luxury trip from Berlin to Tokyo with outdoor activities"
Expected Output¶
=== Coordinator-Worker-Delegator Pattern Demo ===

User Request: "I want to travel from London to Paris, stay in a nice hotel, and enjoy some cultural activities."

Coordinator: Analyzing request and creating plan
Coordinator Plan: {
  "origin": "London",
  "destination": "Paris",
  "max_price_hotel": 400,
  "preference": "cultural"
}

Delegator: Distributing tasks to specialized workers
 • Assigned flight search: London → Paris
 • Assigned hotel search: Paris (max $400)
 • Assigned activity search: cultural in Paris
Worker: Searching flights from London to Paris
Worker: Finding hotels in Paris under $400
Worker: Finding cultural activities in Paris

Assembling final itinerary from worker results

=== Final Travel Package ===

Trip from London to Paris

Flight: EconoFly
 Price: $350
 Duration: 2h 15m

Hotel: Grand Hotel
 Price: $340 per night
 Rating: 4.5/5.0

Recommended Activities:
 1. Guided City Tour
 $35 - 3 hours
 2. Local Food Experience
 $75 - 4 hours
Key Concepts Explained¶
Organizational Hierarchy¶
- Coordinator: Strategic planning and requirement analysis
- Delegator: Task distribution and worker management
- Workers: Specialized execution units
- Assembly: Result integration and synthesis
Graph Architecture¶
graph TD
A[User Request] --> B[Coordinator]
B --> C[Delegator]
C --> D[Flights Worker]
C --> E[Hotels Worker]
C --> F[Activities Worker]
D --> G[Assemble Results]
E --> G
F --> G
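The diagram also determines which nodes can run at the same time. As a rough illustration, the sketch below groups the nodes into execution steps using the standard-library `graphlib` module (Python 3.9 or later, which is newer than the 3.8 minimum listed in the prerequisites). The `dependencies` mapping and the `execution_layers` helper are assumptions written for this sketch; LangGraph does its own scheduling internally.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List, Set

# Each node maps to the set of nodes that must finish before it can start.
dependencies: Dict[str, Set[str]] = {
    "coordinator": set(),
    "delegator": {"coordinator"},
    "flights": {"delegator"},
    "hotels": {"delegator"},
    "activities": {"delegator"},
    "assemble": {"flights", "hotels", "activities"},
}


def execution_layers(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into steps; nodes in the same step have no mutual dependency."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    layers = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        layers.append(ready)
        sorter.done(*ready)                 # mark the whole step as finished
    return layers


layers = execution_layers(dependencies)
for number, layer in enumerate(layers, start=1):
    print(f"step {number}: {layer}")
```

The third step contains all three workers, which is the fan-out shown in the diagram; the assembly node only becomes ready once every worker has finished.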
State Flow Management¶
- State flows through hierarchical layers
- Each layer adds its contribution to the state
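- Workers run in parallel on the shared state, and each one writes a different key (flights, hotels, activities), so their results do not overwrite one another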
- Assembly combines all worker outputs
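Why it matters that each parallel worker writes a different key can be shown with a small standard-library sketch. The helper `merge_step_updates` is hypothetical and is not LangGraph API; it only imitates the rule that, within one parallel step, a key without a reducer should receive a single write.

```python
from typing import Any, Dict, List


def merge_step_updates(
    state: Dict[str, Any], updates: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """Apply the updates produced in one parallel step; reject key collisions."""
    written = set()
    merged = dict(state)
    for update in updates:
        clash = written.intersection(update)
        if clash:
            raise ValueError(f"multiple writers for keys: {sorted(clash)}")
        written.update(update.keys())
        merged.update(update)
    return merged


state = {"coordinator_plan": {"destination": "Paris"}}

# Each worker returns only the key it owns, so the merge succeeds.
ok = merge_step_updates(
    state, [{"flights": ["EconoFly"]}, {"hotels": ["Grand Hotel"]}]
)
print(sorted(ok))

# Two workers that both return the whole state collide on the shared key.
try:
    merge_step_updates(state, [dict(state, flights=[]), dict(state, hotels=[])])
except ValueError as err:
    conflict = str(err)
    print(conflict)
```

If several workers genuinely need to write the same key, the usual alternative is to declare how the values combine (for example, appending to a list) instead of letting the last writer win.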
Natural Language Processing¶
- Coordinator extracts structured data from natural language
- Simple keyword matching in this demo
- Real implementations would use LLMs for understanding
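Plain substring checks are easy to write but fragile: "art" also occurs inside "depart", and a request that names two cities is resolved by whichever check runs first. The sketch below is one possible middle ground between substring matching and an LLM, using regular expressions from the standard library. The helpers `extract_route` and `has_keyword` are assumptions for this illustration and are not part of the lab code.

```python
import re
from typing import Optional, Tuple

CITIES = ("London", "Berlin", "San Francisco", "Paris", "Tokyo", "New York")
_CITY_ALTERNATION = "|".join(re.escape(city) for city in CITIES)
_CANONICAL = {city.lower(): city for city in CITIES}


def _find_city(prefix: str, request: str) -> Optional[str]:
    """Find a known city that directly follows the given preposition."""
    pattern = rf"\b{prefix}\s+({_CITY_ALTERNATION})\b"
    match = re.search(pattern, request, re.IGNORECASE)
    return _CANONICAL[match.group(1).lower()] if match else None


def extract_route(request: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (origin, destination) based on 'from X' and 'to Y' phrases."""
    return _find_city("from", request), _find_city("to", request)


def has_keyword(request: str, word: str) -> bool:
    """Match a keyword as a whole word, not as part of a longer word."""
    return re.search(rf"\b{re.escape(word)}\b", request, re.IGNORECASE) is not None


request = "I want to depart from Paris to Tokyo"
route = extract_route(request)
print(route)                          # origin and destination by position
print("art" in request.lower())       # substring check fires on "depart"
print(has_keyword(request, "art"))    # whole-word check does not
```

For this request the lab's coordinator would choose Paris as the destination, because it tests for "paris" before "tokyo"; anchoring on the prepositions avoids that ordering dependence.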
Advanced Patterns¶
Nested Subgraphs¶
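def create_complex_organization():
    """Example of deeply nested organizational structure.

    The create_*_subgraph() helpers are hypothetical; each is assumed to
    return an uncompiled StateGraph with its own entry and finish points.
    """
    main_graph = StateGraph(CWDState)
    # Department-level subgraphs
    marketing_dept = create_marketing_subgraph()
    engineering_dept = create_engineering_subgraph()
    # Team-level subgraphs within departments
    frontend_team = create_frontend_subgraph()
    backend_team = create_backend_subgraph()
    # Nest teams within departments
    engineering_dept.add_node("frontend", frontend_team.compile())
    engineering_dept.add_node("backend", backend_team.compile())
    # Attach the departments to the top-level graph
    main_graph.add_node("marketing", marketing_dept.compile())
    main_graph.add_node("engineering", engineering_dept.compile())
    return main_graph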
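The nesting idea itself does not depend on LangGraph. As a rough analogy, the sketch below bundles a list of steps into a single callable, much as a compiled subgraph is added to a parent graph as one node. `make_pipeline` and `record` are hypothetical helpers written for this illustration.

```python
from typing import Callable, Dict, List

State = Dict[str, List[str]]
Step = Callable[[State], State]


def make_pipeline(name: str, steps: List[Step]) -> Step:
    """Bundle steps into one callable so a whole team behaves like one step."""
    def run(state: State) -> State:
        for step in steps:
            state = step(state)
        return state
    run.__name__ = name
    return run


def record(label: str) -> Step:
    """Leaf step that appends its label to a trace, leaving the input untouched."""
    def step(state: State) -> State:
        return {**state, "trace": state.get("trace", []) + [label]}
    return step


# Teams are pipelines of leaf steps; departments are pipelines of teams.
frontend_team = make_pipeline("frontend", [record("design"), record("build_ui")])
backend_team = make_pipeline("backend", [record("build_api")])
engineering_dept = make_pipeline("engineering", [frontend_team, backend_team])
marketing_dept = make_pipeline("marketing", [record("campaign")])
organization = make_pipeline("organization", [engineering_dept, marketing_dept])

result = organization({})
print(result["trace"])
```

Each level only knows about the level directly beneath it, which is the property that keeps deep hierarchies manageable.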
Dynamic Worker Assignment¶
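def smart_delegator(state: CWDState) -> CWDState:
    """Intelligently assign tasks based on worker capabilities and load.

    Pseudocode: get_available_workers(), analyze_task_complexity(),
    select_optimal_worker(), and assign_task() are hypothetical helpers,
    and a "tasks" key would need to be added to CWDState.
    """
    available_workers = get_available_workers()
    # Could feed into worker selection, e.g. to prefer stronger workers
    task_complexity = analyze_task_complexity(state["coordinator_plan"])
    # Select best workers for each task
    for task in state["tasks"]:
        best_worker = select_optimal_worker(task, available_workers)
        assign_task(task, best_worker)
    return state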
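One simple way to fill in the selection step is to pick, among the workers that have the required skill, the one with the fewest active tasks. The sketch below is a self-contained illustration under that assumption; the `Worker` dataclass, its fields, and the function signatures are invented for this example.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set


@dataclass
class Worker:
    name: str
    skills: Set[str]
    active_tasks: int = 0


def select_optimal_worker(task_type: str, workers: List[Worker]) -> Optional[Worker]:
    """Pick the least-loaded worker that has the required skill."""
    capable = [w for w in workers if task_type in w.skills]
    return min(capable, key=lambda w: w.active_tasks) if capable else None


def assign_tasks(task_types: List[str], workers: List[Worker]) -> Dict[str, str]:
    """Assign each task in order, updating worker load as we go."""
    assignments = {}
    for task_type in task_types:
        worker = select_optimal_worker(task_type, workers)
        if worker is None:
            raise LookupError(f"no worker can handle {task_type!r}")
        worker.active_tasks += 1
        assignments[task_type] = worker.name
    return assignments


workers = [
    Worker("w1", {"flights", "hotels"}),
    Worker("w2", {"hotels", "activities"}),
]
assignments = assign_tasks(["flights", "hotels", "activities"], workers)
print(assignments)
```

Because load is updated after every assignment, the hotel task goes to `w2` once `w1` has taken the flight task.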
Error Handling and Recovery¶
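def resilient_worker(state: CWDState) -> CWDState:
    """Worker with fallback strategies and error recovery.

    Pseudocode: the three *_function helpers and PrimaryServiceError are
    hypothetical placeholders.
    """
    try:
        return primary_work_function(state)
    except PrimaryServiceError:
        # Known failure of the primary service: switch to the fallback
        return fallback_work_function(state)
    except Exception as e:
        # Anything unexpected: hand over to a generic recovery routine
        return error_recovery_function(state, e)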
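A runnable variant of the same idea is sketched below for the flights worker. Everything here is illustrative: `PrimaryServiceError`, `primary_flights`, `cached_flights`, the "CachedAir" record, and the extra `flights_source` key are assumptions, and `flights_source` would have to be added to `CWDState` before a graph could use it.

```python
from typing import Any, Dict, List


class PrimaryServiceError(Exception):
    """Raised when the (hypothetical) primary flight service is unavailable."""


def primary_flights(plan: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Simulate an outage of the primary service.
    raise PrimaryServiceError("primary flight API unavailable")


def cached_flights(plan: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Made-up fallback data standing in for a cache or secondary provider.
    return [{"airline": "CachedAir", "price": 400}]


def resilient_flights_worker(state: Dict[str, Any]) -> Dict[str, Any]:
    """Try the primary source, then the fallback, then degrade to no results."""
    plan = state["coordinator_plan"]
    try:
        flights, source = primary_flights(plan), "primary"
    except PrimaryServiceError:
        flights, source = cached_flights(plan), "fallback"
    except Exception as exc:  # unexpected failure: return an empty result
        flights, source = [], f"error: {exc}"
    # Return only the keys this worker owns.
    return {"flights": flights, "flights_source": source}


update = resilient_flights_worker(
    {"coordinator_plan": {"origin": "London", "destination": "Paris"}}
)
print(update)
```

Recording where the data came from lets the assembly step decide whether a degraded result is acceptable.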
Exercises¶
- Add more worker types: Implement workers for weather, transportation, or currency exchange
- Intelligent delegation: Create a smarter delegator that assigns tasks based on worker availability and expertise
- Nested hierarchies: Implement deeper organizational structures with teams within departments
- Dynamic scaling: Add the ability to spawn additional workers based on workload
- Monitoring and metrics: Track worker performance and success rates
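As a possible starting point for the monitoring exercise, the sketch below counts calls and failures per function with a decorator. The names `tracked`, `metrics`, `success_rate`, and `flaky_worker` are assumptions made for this illustration.

```python
import functools
from collections import defaultdict
from typing import Callable, Optional

# Per-function counters, keyed by function name.
metrics = defaultdict(lambda: {"calls": 0, "failures": 0})


def tracked(fn: Callable) -> Callable:
    """Count calls and failures of the wrapped function, then re-raise errors."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        stats = metrics[fn.__name__]
        stats["calls"] += 1
        try:
            return fn(*args, **kwargs)
        except Exception:
            stats["failures"] += 1
            raise
    return wrapper


def success_rate(name: str) -> Optional[float]:
    """Fraction of successful calls, or None if the function was never called."""
    stats = metrics[name]
    if stats["calls"] == 0:
        return None
    return 1 - stats["failures"] / stats["calls"]


attempts = {"count": 0}


@tracked
def flaky_worker() -> str:
    """Hypothetical worker that fails on its first call only."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("temporary failure")
    return "ok"


for _ in range(2):
    try:
        flaky_worker()
    except RuntimeError:
        pass

rate = success_rate("flaky_worker")
print(dict(metrics["flaky_worker"]), rate)
```

If combined with the tenacity wrappers from the lab, the order of the decorators decides whether each retry attempt or only the final outcome is counted.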
Real-World Applications¶
- Enterprise Workflow Management: Model complex business processes
- Microservices Orchestration: Coordinate distributed service calls
- Data Pipeline Management: Orchestrate ETL processes with multiple stages
- DevOps Automation: Coordinate deployment across multiple environments
- Customer Service: Route requests through specialized support teams
Performance Considerations¶
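- Parallel Execution: The three workers fan out from the delegator and run in the same graph step, so end-to-end latency is roughly that of the slowest worker rather than the sum of all three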
- Resource Management: Each worker can be optimized for its specific task
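- Fault Tolerance: Retries, and optionally fallbacks, keep a transient worker failure from aborting the whole run; in the lab code, an error that survives all three retry attempts still propagates and stops the graph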
- Scalability: Easy to add new workers or scale existing ones