Lab 5: Parallel Tool Planning¶
⏱️ Estimated completion time: 40 minutes
Overview¶
This lab demonstrates a fan-out/fan-in architecture for parallel tool use in a travel planning scenario. It showcases advanced LangGraph patterns including:
- Parallel branch execution for multiple tool calls
- Fan-out/fan-in pattern for reduced latency
- Tool wrapping with retry logic
- State management across parallel operations
Learning Objectives¶
By the end of this lab, you will understand:
- How to implement parallel tool execution in LangGraph
- Fan-out/fan-in architectural patterns
- Error handling and retry logic for external API calls
- State aggregation from multiple parallel operations
Prerequisites¶
- Python 3.8+
- LangGraph installed (pip install langgraph)
- Tenacity for retry logic (pip install tenacity)
Key Concepts¶
Fan-out/Fan-in Pattern¶
- Fan-out: Split execution into multiple parallel branches
- Fan-in: Merge results from parallel branches into final output
- Benefits: Reduced latency, better resource utilization
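The pattern can be sketched without LangGraph, using only the standard library. The two lookup functions below are hypothetical stand-ins, not part of the lab code; the point is only to show where the fan-out and the fan-in happen.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for independent lookups (not part of the lab code).
def lookup_flights(dest: str) -> list:
    return [f"flight to {dest}"]

def lookup_hotels(dest: str) -> list:
    return [f"hotel in {dest}"]

def fan_out_fan_in(dest: str) -> dict:
    branches = {"flights": lookup_flights, "hotels": lookup_hotels}
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        # Fan-out: submit every branch without waiting for the others.
        futures = {name: pool.submit(fn, dest) for name, fn in branches.items()}
        # Fan-in: block until each branch finishes, then merge by key.
        return {name: fut.result() for name, fut in futures.items()}

result = fan_out_fan_in("PAR")
print(result)
```

Each branch owns one key of the merged dictionary, which is the same convention the lab's graph nodes follow with flights, hotels, and activities.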
Parallel Tool Execution¶
- Multiple external APIs called simultaneously
- Independent operation execution
- Coordinated result aggregation
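As a rough illustration of these three points, the sketch below uses asyncio.gather from the standard library. The fetch coroutine and its delays are assumptions made for the example; asyncio.sleep stands in for a real network call.

```python
import asyncio

async def fetch(name: str, delay: float) -> tuple:
    # asyncio.sleep stands in for a network round trip.
    await asyncio.sleep(delay)
    return name, f"{name} results"

async def gather_all() -> dict:
    # gather runs the coroutines concurrently and returns their results
    # in argument order, regardless of which one finishes first.
    pairs = await asyncio.gather(
        fetch("flights", 0.03),
        fetch("hotels", 0.01),
        fetch("activities", 0.02),
    )
    return dict(pairs)

aggregated = asyncio.run(gather_all())
print(aggregated)
```

The calls are independent of one another, and the aggregation step only runs once all of them have returned.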
Lab Code¶
#!/usr/bin/env python3
"""
Chapter 5 - Parallel Tool-Use Planner with LangGraph
---------------------------------------------------
This example demonstrates a fan-out/fan-in architecture for parallel tool use
in a travel planning scenario. It:
1. Searches for flights, hotels, and activities in parallel branches
2. Merges the results into a complete itinerary
3. Handles tool errors with built-in retry logic
Key concepts:
- Parallel branch execution
- Fan-out/fan-in pattern for reduced latency
- Tool wrapping with retry logic
- Typed state for better maintainability
"""
import argparse
import json
from typing import Dict, List, TypedDict
from tenacity import retry, stop_after_attempt, wait_fixed
from langgraph.graph import StateGraph
# ---------------------------------------------------------------------------
# Mock travel APIs ----------------------------------------------------------
# ---------------------------------------------------------------------------
def _demo_flights(origin: str, destination: str, date: str):
    """Mock flight API - in real life this would call an external service."""
    print(f"🔍 Searching flights: {origin} → {destination} on {date}")
    return [
        {"airline": "BudgetAir", "price": 320, "departure": "07:30", "arrival": "09:45"},
        {"airline": "ComfortJet", "price": 480, "departure": "10:15", "arrival": "12:30"},
        {"airline": "LuxAir", "price": 750, "departure": "13:00", "arrival": "15:15"},
    ]


def _demo_hotels(location: str, check_in: str, check_out: str):
    """Mock hotel API - in real life this would call an external service."""
    print(f"🔍 Searching hotels in {location} from {check_in} to {check_out}")
    return [
        {"name": "City Budget Inn", "price": 120, "rating": 3.5, "amenities": ["WiFi", "Breakfast"]},
        {"name": "Central Hotel", "price": 220, "rating": 4.2, "amenities": ["WiFi", "Pool", "Gym"]},
        {"name": "Luxury Suites", "price": 450, "rating": 4.8, "amenities": ["WiFi", "Pool", "Spa", "Restaurant"]},
    ]


def _demo_activities(location: str, date_range: str):
    """Mock activities API - in real life this would call an external service."""
    print(f"🔍 Searching activities in {location} during {date_range}")
    return [
        {"name": "City Walking Tour", "price": 25, "duration": "2 hours", "rating": 4.5},
        {"name": "Museum Pass", "price": 40, "duration": "All day", "rating": 4.7},
        {"name": "Food & Wine Tour", "price": 85, "duration": "3 hours", "rating": 4.8},
    ]
# ---------------------------------------------------------------------------
# Retry wrappers for API calls ----------------------------------------------
# ---------------------------------------------------------------------------
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def search_flights(origin: str, destination: str, date: str):
    """Wrap flight search with retry logic."""
    return _demo_flights(origin, destination, date)


@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def find_hotels(location: str, check_in: str, check_out: str):
    """Wrap hotel search with retry logic."""
    return _demo_hotels(location, check_in, check_out)


@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def find_activities(location: str, date_range: str):
    """Wrap activities search with retry logic."""
    return _demo_activities(location, date_range)
# ---------------------------------------------------------------------------
# State typing --------------------------------------------------------------
# ---------------------------------------------------------------------------
class PlannerState(TypedDict, total=False):
    origin: str
    destination: str
    date: str
    check_in: str
    check_out: str
    flights: List[Dict]
    hotels: List[Dict]
    activities: List[Dict]
    itinerary: Dict
# ---------------------------------------------------------------------------
# Graph nodes ---------------------------------------------------------------
# ---------------------------------------------------------------------------
def flight_node(state: PlannerState) -> PlannerState:
    """Search for flights and return them as a partial state update."""
    flights = search_flights(
        state["origin"],
        state["destination"],
        state["date"]
    )
    print(f"✓ Found {len(flights)} flight options")
    # Return only the key this branch owns. If each parallel branch returned
    # the whole state, all three would write the same keys in the same step,
    # which LangGraph rejects for keys that have no reducer.
    return {"flights": flights}


def hotel_node(state: PlannerState) -> PlannerState:
    """Search for hotels and return them as a partial state update."""
    hotels = find_hotels(
        state["destination"],
        state["check_in"],
        state["check_out"]
    )
    print(f"✓ Found {len(hotels)} hotel options")
    return {"hotels": hotels}


def activity_node(state: PlannerState) -> PlannerState:
    """Search for activities and return them as a partial state update."""
    date_range = f"{state['check_in']} to {state['check_out']}"
    activities = find_activities(
        state["destination"],
        date_range
    )
    print(f"✓ Found {len(activities)} activity options")
    return {"activities": activities}
def merge_itinerary(state: PlannerState) -> PlannerState:
    """Combine all search results into a final itinerary."""
    # Find cheapest flight
    cheapest_flight = min(
        state.get("flights", []),
        key=lambda f: f["price"],
        default={}
    )
    # Find best hotel (highest rating per dollar)
    hotels = state.get("hotels", [])
    if hotels:
        best_hotel = max(
            hotels,
            key=lambda h: h["rating"] / max(1, h["price"] / 100),
            default={}
        )
    else:
        best_hotel = {}
    # Select top-rated activities
    activities = state.get("activities", [])
    top_activities = sorted(
        activities,
        key=lambda a: a["rating"],
        reverse=True
    )[:2]  # Top 2 activities
    # Create the complete itinerary
    state["itinerary"] = {
        "destination": state.get("destination", ""),
        "dates": f"{state.get('check_in', '')} to {state.get('check_out', '')}",
        "flight": cheapest_flight,
        "hotel": best_hotel,
        "activities": top_activities,
        "estimated_total": (
            cheapest_flight.get("price", 0) +
            (best_hotel.get("price", 0) *
             _days_between(state.get("check_in", ""), state.get("check_out", ""))) +
            sum(a.get("price", 0) for a in top_activities)
        )
    }
    print("✓ Created complete itinerary")
    return state


# Helper function for calculating stay duration
def _days_between(check_in: str, check_out: str) -> int:
    """Simple helper to calculate days between dates for demo purposes."""
    # In a real app, we'd use datetime to calculate this
    # For demo, we'll just return a fixed value
    return 3
# ---------------------------------------------------------------------------
# Graph construction --------------------------------------------------------
# ---------------------------------------------------------------------------
def build_planner_graph() -> StateGraph:
    """Build a graph with parallel branches for each search operation."""
    g = StateGraph(PlannerState)
    # Add a starting node
    g.add_node("start", lambda s: s)  # no-op seed node
    g.set_entry_point("start")
    # Add search nodes
    g.add_node("flights", flight_node)
    g.add_node("hotels", hotel_node)
    g.add_node("activities", activity_node)
    # Connect start node to all search nodes (fan-out)
    for branch in ("flights", "hotels", "activities"):
        g.add_edge("start", branch)
    # Add merge node and connect all search nodes to it (fan-in)
    g.add_node("merge", merge_itinerary)
    for branch in ("flights", "hotels", "activities"):
        g.add_edge(branch, "merge")
    # Set the finish point
    g.set_finish_point("merge")
    return g
# ---------------------------------------------------------------------------
# Main function -------------------------------------------------------------
# ---------------------------------------------------------------------------
def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Parallel Travel Planner")
    parser.add_argument("--origin", default="SFO", help="Origin airport code")
    parser.add_argument("--destination", default="PAR", help="Destination city code")
    parser.add_argument("--date", default="2025-06-15", help="Departure date")
    parser.add_argument("--checkin", default="2025-06-15", help="Hotel check-in date")
    parser.add_argument("--checkout", default="2025-06-18", help="Hotel check-out date")
    args = parser.parse_args()
    # Print header
    print("\n=== Parallel Travel Planner ===\n")
    print(f"Planning a trip from {args.origin} to {args.destination}")
    print(f"Travel date: {args.date}")
    print(f"Stay: {args.checkin} to {args.checkout}")
    # Build and compile the graph
    graph = build_planner_graph().compile()
    # Create initial state with user inputs
    init_state: PlannerState = {
        "origin": args.origin,
        "destination": args.destination,
        "date": args.date,
        "check_in": args.checkin,
        "check_out": args.checkout,
    }
    print("\nExecuting parallel search...\n")
    # Run the graph with initial state
    final_state = graph.invoke(init_state)
    # Display the results
    print("\n=== Travel Itinerary ===\n")
    itinerary = final_state.get("itinerary", {})
    print(f"Trip to {itinerary.get('destination')}")
    print(f"Dates: {itinerary.get('dates')}")
    flight = itinerary.get("flight", {})
    print(f"\nFlight: {flight.get('airline')}")
    print(f" Price: ${flight.get('price')}")
    print(f" Departure: {flight.get('departure')}")
    hotel = itinerary.get("hotel", {})
    print(f"\nHotel: {hotel.get('name')}")
    print(f" Price: ${hotel.get('price')} per night")
    print(f" Rating: {hotel.get('rating')}/5.0")
    print(f" Amenities: {', '.join(hotel.get('amenities', []))}")
    print("\nActivities:")
    for idx, activity in enumerate(itinerary.get("activities", [])):
        print(f" {idx+1}. {activity.get('name')}")
        print(f" Price: ${activity.get('price')}")
        print(f" Duration: {activity.get('duration')}")
    print(f"\nEstimated Total: ${itinerary.get('estimated_total')}")


if __name__ == "__main__":
    main()
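The lab's _days_between helper returns a fixed value and notes that a real app would use datetime. One possible version is sketched below. The name nights_between and the choice to reject a check-out earlier than the check-in are assumptions of this example, and dates are assumed to be in YYYY-MM-DD form as in the lab's defaults.

```python
from datetime import date

def nights_between(check_in: str, check_out: str) -> int:
    """Number of nights between two ISO dates (YYYY-MM-DD)."""
    nights = (date.fromisoformat(check_out) - date.fromisoformat(check_in)).days
    if nights < 0:
        raise ValueError("check_out is before check_in")
    return nights

print(nights_between("2025-06-15", "2025-06-18"))  # 3
```

For the default arguments this gives the same value as the fixed helper, so the estimated total for the default run would not change.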
How to Run¶
- Save the code above as 05_parallel_planning.py
- Install dependencies: pip install langgraph tenacity
- Run the script: python 05_parallel_planning.py
- Try with custom parameters: python 05_parallel_planning.py --origin LAX --destination NYC --date 2025-07-01
Expected Output¶
The three search branches run concurrently, so the order of the 🔍 and ✓ lines may differ between runs.
=== Parallel Travel Planner ===
Planning a trip from SFO to PAR
Travel date: 2025-06-15
Stay: 2025-06-15 to 2025-06-18
Executing parallel search...
🔍 Searching flights: SFO → PAR on 2025-06-15
✓ Found 3 flight options
🔍 Searching hotels in PAR from 2025-06-15 to 2025-06-18
✓ Found 3 hotel options
🔍 Searching activities in PAR during 2025-06-15 to 2025-06-18
✓ Found 3 activity options
✓ Created complete itinerary
=== Travel Itinerary ===
Trip to PAR
Dates: 2025-06-15 to 2025-06-18
Flight: BudgetAir
Price: $320
Departure: 07:30
Hotel: City Budget Inn
Price: $120 per night
Rating: 3.5/5.0
Amenities: WiFi, Breakfast
Activities:
1. Food & Wine Tour
Price: $85
Duration: 3 hours
2. Museum Pass
Price: $40
Duration: All day
Estimated Total: $805
Key Concepts Explained¶
Fan-out/Fan-in Architecture¶
- Fan-out: Start node connects to multiple parallel branches
- Parallel Execution: Flights, hotels, and activities searched simultaneously
- Fan-in: All parallel results merge into final itinerary
- Performance Benefit: Total search time approaches that of the slowest branch rather than the sum of all three
Retry Logic Implementation¶
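- Automatic retry for transient failures: each wrapper re-runs the call when it raises an exception
- Configurable retry attempts and delays: this lab uses stop_after_attempt(3) and wait_fixed(1), i.e. up to three attempts, one second apart
- Once the attempts are exhausted, tenacity raises RetryError; the lab code does not catch it, so graceful degradation when a service is unavailable requires a fallback around the wrapped call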
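To make the retry behaviour concrete without depending on an installed package, the sketch below hand-rolls a fixed-wait retry decorator and pairs it with a simple degradation wrapper. It is not tenacity's implementation; retry_fixed, flaky_search, dead_search, and search_or_empty are hypothetical names introduced for illustration.

```python
import time
from functools import wraps

def retry_fixed(attempts: int, wait_seconds: float):
    """Hand-rolled stand-in for a fixed-wait retry policy (not tenacity itself)."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(wait_seconds)
        return wrapper
    return decorator

calls = {"count": 0}

@retry_fixed(attempts=3, wait_seconds=0.01)
def flaky_search() -> list:
    # Hypothetical service that fails twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return ["BudgetAir"]

@retry_fixed(attempts=3, wait_seconds=0.01)
def dead_search() -> list:
    # Hypothetical service that never recovers.
    raise ConnectionError("service down")

def search_or_empty(fn) -> list:
    """Graceful degradation: an exhausted retry yields an empty result list."""
    try:
        return fn()
    except Exception:
        return []

flights = flaky_search()
hotels = search_or_empty(dead_search)
print(flights, hotels, calls["count"])
```

Returning an empty list fits the lab's merge node, which already reads each result list with state.get and a default, so a missing branch result would produce an itinerary with that part left empty.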
State Management Across Parallel Operations¶
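- Each parallel branch stores its results under its own key in the shared state (flights, hotels, activities)
- Merge node processes accumulated results
- The TypedDict state definition documents the expected keys and enables static type checking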
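The rule that matters for parallel branches can be modelled in plain Python. The sketch below is a simplified illustration, not LangGraph's implementation: it assumes that a key without a reducer accepts at most one write per step, and that a reducer (here operator.add) combines several writes to the same key. The function apply_updates is a hypothetical name.

```python
import operator
from typing import Any, Callable, Dict, List, Optional

def apply_updates(
    state: Dict[str, Any],
    updates: List[Dict[str, Any]],
    reducers: Optional[Dict[str, Callable[[Any, Any], Any]]] = None,
) -> Dict[str, Any]:
    """Merge partial updates produced by branches that ran in the same step."""
    reducers = reducers or {}
    merged = dict(state)
    written = set()
    for update in updates:
        for key, value in update.items():
            if key in reducers:
                # A reducer combines every write to the key.
                merged[key] = reducers[key](merged[key], value) if key in merged else value
            elif key in written:
                raise ValueError(f"key {key!r} written by more than one branch")
            else:
                merged[key] = value
                written.add(key)
    return merged

base = {"destination": "PAR"}
ok = apply_updates(base, [{"flights": ["BudgetAir"]}, {"hotels": ["City Budget Inn"]}])
print(ok)

try:
    apply_updates(base, [{"destination": "PAR"}, {"destination": "PAR"}])
    conflict = False
except ValueError:
    conflict = True

combined = apply_updates({"log": []}, [{"log": ["a"]}, {"log": ["b"]}], {"log": operator.add})
print(conflict, combined)
```

Under this model, branches that each write a distinct key merge cleanly, while two branches writing the same key need a reducer to say how their values combine.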
Intelligent Result Selection¶
- Flights: Cheapest option selected
- Hotels: Best value chosen (rating divided by the nightly price in hundreds of dollars, with the divisor floored at 1)
- Activities: Top-rated activities prioritized
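Applying the hotel rule to the lab's mock data shows which option wins. The snippet reuses the scoring expression from merge_itinerary; value_score is a name introduced here for readability, and the total uses the fixed three-night stay from the lab.

```python
hotels = [
    {"name": "City Budget Inn", "price": 120, "rating": 3.5},
    {"name": "Central Hotel", "price": 220, "rating": 4.2},
    {"name": "Luxury Suites", "price": 450, "rating": 4.8},
]

def value_score(hotel: dict) -> float:
    # Rating divided by the price in hundreds of dollars, floored at 1.
    return hotel["rating"] / max(1, hotel["price"] / 100)

scores = {h["name"]: round(value_score(h), 2) for h in hotels}
best = max(hotels, key=value_score)

# Cheapest flight (320) + 3 nights + top two activities (85 + 40).
total = 320 + best["price"] * 3 + 85 + 40

print(scores)
print(best["name"], total)
```

City Budget Inn scores about 2.92 against 1.91 and 1.07 for the other two, so the cheapest hotel is selected despite its lower rating.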
Graph Architecture¶
graph TD
A[Start] --> B[Flights Node]
A --> C[Hotels Node]
A --> D[Activities Node]
B --> E[Merge Itinerary]
C --> E
D --> E
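One way to read the diagram is as a list of edges from which the execution order follows. The sketch below groups nodes into waves with a layered topological sort; it illustrates the ordering implied by the edges and is not a description of LangGraph's scheduler. The function execution_waves is a hypothetical helper.

```python
from typing import Dict, List

edges = [
    ("start", "flights"), ("start", "hotels"), ("start", "activities"),
    ("flights", "merge"), ("hotels", "merge"), ("activities", "merge"),
]

def execution_waves(edge_list) -> List[List[str]]:
    """Group nodes into waves; a node runs once all its predecessors have run."""
    nodes = {n for edge in edge_list for n in edge}
    preds: Dict[str, set] = {n: set() for n in nodes}
    for src, dst in edge_list:
        preds[dst].add(src)
    done: set = set()
    waves = []
    while len(done) < len(nodes):
        # Nodes whose predecessors are all finished can run together.
        ready = sorted(n for n in nodes - done if preds[n] <= done)
        if not ready:
            raise ValueError("cycle detected")
        waves.append(ready)
        done.update(ready)
    return waves

waves = execution_waves(edges)
print(waves)
```

The middle wave contains the three search nodes, which is where the fan-out happens; the merge node sits alone in the last wave.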
Advanced Patterns¶
Dynamic Branch Creation¶
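def create_dynamic_branches(destinations: List[str]) -> StateGraph:
    """Create parallel branches for multiple destinations."""
    g = StateGraph(PlannerState)
    # The branches need the same start and merge nodes as the main graph
    g.add_node("start", lambda s: s)  # no-op seed node
    g.set_entry_point("start")
    g.add_node("merge", merge_itinerary)
    for dest in destinations:
        node_name = f"search_{dest}"
        # d=dest binds the current value; search_destination is a placeholder
        # that should return an update for a key owned by this branch only
        g.add_node(node_name, lambda s, d=dest: search_destination(s, d))
        g.add_edge("start", node_name)
        g.add_edge(node_name, "merge")
    g.set_finish_point("merge")
    return g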
Error Handling with Fallbacks¶
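def search_with_fallback(state: PlannerState) -> PlannerState:
    """Search with fallback to alternative providers."""
    # primary_search and fallback_search are placeholders for provider-specific nodes
    try:
        return primary_search(state)
    except Exception:
        # Any failure of the primary provider switches to the fallback
        return fallback_search(state)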
Exercises¶
- Add more parallel branches: Include car rentals, restaurants, or weather forecasts
- Implement true parallelism: Use asyncio for concurrent API calls
- Add error recovery: Implement fallback providers for each service
- Dynamic pricing: Add real-time price comparison across multiple providers
- User preferences: Allow customizable selection criteria for each service type
Real-World Applications¶
- E-commerce: Parallel product search across multiple vendors
- Financial Services: Real-time data aggregation from multiple sources
- Healthcare: Simultaneous queries to multiple medical databases
- Research: Parallel literature searches across academic databases
Performance Benefits¶
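- Latency Reduction: Total time approaches that of the slowest branch instead of the sum of all branches, so up to about 3x faster for three calls of similar duration (the mock APIs in this lab return instantly, so no speedup is visible here)
- Resource Efficiency: Better utilization of I/O wait time
- Scalability: Easy to add new parallel branches
- Fault Tolerance: A branch that catches its own errors and returns an empty result does not block the others; an uncaught exception in any branch still aborts the run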
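The latency claim can be checked with a small timing experiment. The sketch below replaces the network calls with time.sleep and compares a sequential loop to a thread pool; slow_call and the 0.1 second delay are assumptions of the example, and the exact timings will vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_call(name: str, seconds: float = 0.1) -> str:
    time.sleep(seconds)  # stands in for network latency
    return name

names = ["flights", "hotels", "activities"]

# Sequential: each call waits for the previous one to finish.
t0 = time.perf_counter()
sequential = [slow_call(n) for n in names]
sequential_time = time.perf_counter() - t0

# Parallel: all three calls wait at the same time.
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(names)) as pool:
    parallel = list(pool.map(slow_call, names))
parallel_time = time.perf_counter() - t0

print(f"sequential: {sequential_time:.2f}s, parallel: {parallel_time:.2f}s")
```

With equal delays the sequential run takes about three times as long as the parallel one; with unequal delays the parallel time tracks the slowest call.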