Code
Createsupervisor_cbt.py with the code below, or save it directly from your editor.
from bindu.penguin.bindufy import bindufy
from bindu.utils.logging import get_logger
from langgraph_integration import LangGraphWorkflowAdapter
from state_mapper import (
build_langgraph_input,
protocol_state_to_bindu_artifact,
bindu_message_from_artifact,
)
from uuid import UUID, uuid4
logger = get_logger("cerina_bindu.cbt.supervisor")
_workflow_adapter: LangGraphWorkflowAdapter | None = None
def get_workflow_adapter() -> LangGraphWorkflowAdapter:
global _workflow_adapter
if _workflow_adapter is None:
_workflow_adapter = LangGraphWorkflowAdapter()
return _workflow_adapter
async def handler(messages: list[dict]):
if not messages:
return [{"role": "assistant", "content": "No input provided"}]
last_message = messages[-1]
context_id = UUID(last_message.get("context_id") or str(uuid4()))
task_id = UUID(last_message.get("task_id") or str(uuid4()))
try:
langgraph_input = build_langgraph_input(messages, context_id, task_id)
user_intent = langgraph_input["user_intent"]
thread_id = langgraph_input["thread_id"]
if not user_intent:
return [{"role": "assistant", "content": "No user intent provided"}]
adapter = get_workflow_adapter()
final_state = await adapter.invoke(
user_intent=user_intent,
thread_id=thread_id,
task_id=str(task_id),
metadata=langgraph_input.get("metadata", {}),
)
artifact_id = uuid4()
artifact = protocol_state_to_bindu_artifact(final_state, artifact_id)
assistant_messages = bindu_message_from_artifact(artifact)
if assistant_messages:
assistant_messages[0]["metadata"] = artifact.get("metadata", {})
return assistant_messages
except Exception as e:
logger.error(f"Error processing CBT request: {str(e)}", exc_info=True)
return [{"role": "assistant", "content": f"Error generating CBT exercise: {str(e)}"}]
config = {
"author": "imdanishakhtar7@gmail.com",
"name": "cerina_supervisor_cbt",
"description": "Supervisor orchestrator for Cerina CBT integration with LangGraph workflow.",
"deployment": {
"url": "http://localhost:3773",
"expose": True,
"cors_origins": ["http://localhost:5173"]
},
"skills": ["../../skills/cbt-supervisor-orchestrator"],
}
bindufy(config, handler)
#bindufy(config, handler, launch=True)
# This will create a tunnel to your agent and expose it on port 3773
Additional Files
Create these supporting files in the same directory:agents.py
"""
Individual Agent Implementations
Each agent has a specific role in the CBT exercise generation process.
"""
from typing import Dict, Any
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os
import json
from pathlib import Path
from dotenv import load_dotenv
from state import ProtocolState
from utils import log_agent_activity
# Load environment variables from .env file in cbt folder
env_path = Path(__file__).parent / ".env"
load_dotenv(dotenv_path=env_path)
def get_llm():
"""Get the LLM instance using OpenAI API"""
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
raise ValueError(
"OPENROUTER_API_KEY not found. Get your key from https://openrouter.ai/keys"
)
base_url = "https://openrouter.ai/api/v1"
# Using OpenRouter's GPT model
model_name = "openai/gpt-oss-120b"
return ChatOpenAI(
api_key=api_key, base_url=base_url, model=model_name, temperature=0.7
)
class DrafterAgent:
"""Creates initial CBT exercise drafts based on user intent"""
def __init__(self, llm=None):
self.llm = llm or get_llm()
self.name = "Drafter"
async def draft(self, state: ProtocolState) -> Dict[str, Any]:
"""Generate an initial draft of the CBT exercise"""
intent = state["user_intent"]
iteration = state["iteration_count"]
print(f"\n{'=' * 60}")
print(f"DEBUG DRAFT NODE: intent={repr(intent)[:100]}")
print(f"DEBUG DRAFT NODE: iteration={iteration}")
print(f"{'=' * 60}\n")
# Check for feedback from other agents
recent_notes = [
n for n in state["agent_notes"][-5:] if n.get("target_agent") == "Drafter"
]
feedback_context = "\n".join([f"- {n['note']}" for n in recent_notes])
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""You are a clinical psychologist specializing in Cognitive Behavioral Therapy (CBT).
Your task is to create structured, empathetic, and evidence-based CBT exercises.
Guidelines:
1. Structure: Clear sections (Introduction, Steps, Practice, Reflection)
2. Tone: Warm, supportive, non-judgmental
3. Safety: Never provide medical advice or encourage self-harm
4. Evidence-based: Use established CBT techniques
5. Accessibility: Clear language, actionable steps
Format your response as a complete CBT exercise protocol.""",
),
(
"human",
"""Create a CBT exercise for: {intent}
{feedback_context}
{iteration_context}""",
),
]
)
iteration_context = ""
if iteration > 0:
iteration_context = f"This is iteration {iteration + 1}. Previous drafts had issues that need addressing."
messages = prompt.format_messages(
intent=intent,
feedback_context=feedback_context or "No specific feedback yet.",
iteration_context=iteration_context,
)
print(f"DEBUG: Calling LLM with intent: {intent[:100]}")
print(f"DEBUG: Full prompt messages count: {len(messages)}")
if messages:
print(
f"DEBUG: Human message content preview: {str(messages[-1].content)[:150]}..."
)
response = await self.llm.ainvoke(messages)
draft_content = response.content
print(
f"DEBUG: LLM response received, length: {len(draft_content) if draft_content else 0}"
)
print(
f"DEBUG: Draft content preview: {draft_content[:100] if draft_content else '[empty]'}..."
)
# Log activity
log_agent_activity(
session_id=state.get("session_id", "unknown"),
agent_name=self.name,
action="drafted_protocol",
reasoning=f"Created draft for: {intent[:100]}",
)
# Create new agent note
new_note = {
"agent_name": self.name,
"note": f"Created draft version {state['current_version'] + 1}",
"target_agent": None,
"timestamp": state.get("last_update"),
"priority": "info",
}
result = {
"current_draft": draft_content,
"active_agent": self.name,
"status": "drafting",
"agent_notes": [new_note], # Return as list to be appended by reducer
}
print(f"DEBUG: draft() returning: {list(result.keys())}\n")
return result
class SafetyGuardianAgent:
"""Validates content for safety risks and medical advice"""
def __init__(self, llm=None):
self.llm = llm or get_llm()
self.name = "SafetyGuardian"
async def review(self, state: ProtocolState) -> Dict[str, Any]:
"""Review draft for safety issues"""
draft = state.get("current_draft")
if not draft:
return {"status": "no_draft"}
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""You are a safety reviewer for clinical content.
Your job is to identify:
1. References to self-harm or suicide
2. Medical advice (diagnosis, medication, treatment)
3. Dangerous practices
4. Inappropriate content
Respond with a JSON object:
{{
"safe": true/false,
"safety_score": 0.0-1.0,
"issues": ["list of issues"],
"recommendations": ["how to fix"]
}}""",
),
("human", "Review this CBT exercise for safety:\n\n{draft}"),
]
)
messages = prompt.format_messages(draft=draft)
response = await self.llm.ainvoke(messages)
try:
# Try to parse JSON response
result = json.loads(response.content)
except:
# Fallback if not JSON
result = {
"safe": "safe" in response.content.lower(),
"safety_score": 0.8 if "safe" in response.content.lower() else 0.3,
"issues": [],
"recommendations": [response.content],
}
safety_score = result.get("safety_score", 0.5)
is_safe = result.get("safe", safety_score > 0.7)
# Update safety checks
safety_checks = state.get("safety_checks", {})
safety_checks["medical_advice"] = "medical" not in response.content.lower()
safety_checks["self_harm"] = (
"self-harm" not in response.content.lower()
and "suicide" not in response.content.lower()
)
safety_checks["overall"] = is_safe
# Create notes for issues if found
agent_notes_to_add = []
if not is_safe or safety_score < 0.8:
issues = result.get("issues", [])
for issue in issues:
agent_notes_to_add.append(
{
"agent_name": self.name,
"note": f"SAFETY ISSUE: {issue}",
"target_agent": "Drafter",
"timestamp": state.get("last_update"),
"priority": "critical",
}
)
# Log activity
log_agent_activity(
session_id=state.get("session_id", "unknown"),
agent_name=self.name,
action="safety_review",
reasoning=f"Safety score: {safety_score}, Safe: {is_safe}",
)
return {
"safety_checks": safety_checks,
"safety_score": safety_score,
"status": "reviewed",
"agent_notes": agent_notes_to_add, # Return as list to be appended by reducer
"active_agent": self.name,
}
class ClinicalCriticAgent:
"""Evaluates clinical appropriateness, tone, and empathy"""
def __init__(self, llm=None):
self.llm = llm or get_llm()
self.name = "ClinicalCritic"
async def critique(self, state: ProtocolState) -> Dict[str, Any]:
"""Critique draft for clinical quality"""
draft = state.get("current_draft")
if not draft:
return {"status": "no_draft"}
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""You are a senior clinical psychologist reviewing CBT exercises.
Evaluate:
1. Clinical appropriateness (evidence-based techniques)
2. Empathy and tone (warm, supportive, non-judgmental)
3. Structure and clarity
4. Actionability (can users actually do this?)
Respond with JSON:
{{
"empathy_score": 0.0-1.0,
"clinical_score": 0.0-1.0,
"overall_score": 0.0-1.0,
"strengths": ["list"],
"weaknesses": ["list"],
"recommendations": ["how to improve"]
}}""",
),
("human", "Critique this CBT exercise:\n\n{draft}"),
]
)
messages = prompt.format_messages(draft=draft)
response = await self.llm.ainvoke(messages)
try:
result = json.loads(response.content)
except:
result = {
"empathy_score": 0.7,
"clinical_score": 0.7,
"overall_score": 0.7,
"strengths": [],
"weaknesses": [],
"recommendations": [],
}
empathy_score = result.get("empathy_score", 0.7)
clinical_score = result.get("clinical_score", 0.7)
overall_score = result.get("overall_score", 0.7)
# Create feedback notes
agent_notes_to_add = []
weaknesses = result.get("weaknesses", [])
if weaknesses:
for weakness in weaknesses[:3]: # Limit to top 3
agent_notes_to_add.append(
{
"agent_name": self.name,
"note": f"IMPROVEMENT NEEDED: {weakness}",
"target_agent": "Drafter",
"timestamp": state.get("last_update"),
"priority": "warning",
}
)
# Log activity
log_agent_activity(
session_id=state.get("session_id", "unknown"),
agent_name=self.name,
action="clinical_critique",
reasoning=f"Empathy: {empathy_score}, Clinical: {clinical_score}, Overall: {overall_score}",
)
return {
"empathy_score": empathy_score,
"clinical_score": clinical_score,
"status": "critiqued",
"agent_notes": agent_notes_to_add, # Return as list to be appended by reducer
"active_agent": self.name,
}
class SupervisorAgent:
"""Orchestrates the workflow and decides when drafts are ready"""
def __init__(self, llm=None):
self.llm = llm or get_llm()
self.name = "Supervisor"
async def decide(self, state: ProtocolState) -> Dict[str, Any]:
"""Decide next action based on current state"""
iteration = state["iteration_count"]
max_iterations = state["max_iterations"]
safety_score = state.get("safety_score")
empathy_score = state.get("empathy_score")
clinical_score = state.get("clinical_score")
# Prepare notes to add
agent_notes_to_add = []
# Check if we've exceeded max iterations
if iteration >= max_iterations:
agent_notes_to_add.append(
{
"agent_name": self.name,
"note": f"Reached max iterations ({max_iterations}). Finalizing current draft.",
"target_agent": None,
"timestamp": state.get("last_update"),
"priority": "warning",
}
)
return {
"next_agent": "HALT",
"status": "max_iterations_reached",
"agent_notes": agent_notes_to_add,
"active_agent": self.name,
}
# Check if draft meets quality thresholds
quality_ok = (
safety_score is not None
and safety_score >= 0.8
and empathy_score is not None
and empathy_score >= 0.7
and clinical_score is not None
and clinical_score >= 0.7
)
if quality_ok and state.get("current_draft"):
# Ready for human review
agent_notes_to_add.append(
{
"agent_name": self.name,
"note": "Draft meets quality thresholds. Ready for human review.",
"target_agent": None,
"timestamp": state.get("last_update"),
"priority": "info",
}
)
# Log activity
log_agent_activity(
session_id=state.get("session_id", "unknown"),
agent_name=self.name,
action="approved_for_review",
reasoning=f"Quality thresholds met. Safety: {safety_score}, Empathy: {empathy_score}, Clinical: {clinical_score}",
)
return {
"next_agent": "finalize",
"status": "ready_for_review",
"halted": False,
"agent_notes": agent_notes_to_add,
"active_agent": self.name,
}
# Check if we need another iteration
needs_revision = (
safety_score is not None
and safety_score < 0.8
or empathy_score is not None
and empathy_score < 0.7
or clinical_score is not None
and clinical_score < 0.7
)
if needs_revision and iteration < max_iterations:
agent_notes_to_add.append(
{
"agent_name": self.name,
"note": f"Quality scores below threshold. Requesting revision (iteration {iteration + 1}/{max_iterations})",
"target_agent": None,
"timestamp": state.get("last_update"),
"priority": "warning",
}
)
return {
"next_agent": "Drafter",
"status": "needs_revision",
"agent_notes": agent_notes_to_add,
"active_agent": self.name,
}
# Default: continue with drafting if no draft exists
if not state.get("current_draft"):
return {
"next_agent": "Drafter",
"status": "initial_draft_needed",
"agent_notes": agent_notes_to_add,
"active_agent": self.name,
}
return {
"next_agent": "finalize",
"status": "awaiting_review",
"halted": False,
"agent_notes": agent_notes_to_add,
"active_agent": self.name,
}
async def supervise(self, state: ProtocolState) -> Dict[str, Any]:
"""Wrapper node that runs decide() and returns the decision."""
decision = await self.decide(state)
# Don't modify state in place, just return the decision
return decision
async def finalize(self, state: ProtocolState) -> Dict[str, Any]:
"""Convert final draft to final_response for Bindu output."""
draft = state.get("current_draft", "").strip()
return {
"final_response": draft or "[empty draft]",
"halted": True,
"status": "completed",
"active_agent": self.name,
}
database.py
"""
Database setup and utilities for persistence
"""
from sqlalchemy import (
create_engine,
Column,
Integer,
String,
Text,
DateTime,
Float,
JSON,
)
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker
import os
from datetime import datetime
Base = declarative_base()
class ProtocolSession(Base):
"""Database model for protocol generation sessions"""
__tablename__ = "protocol_sessions"
id = Column(Integer, primary_key=True, index=True)
session_id = Column(String, unique=True, index=True)
user_intent = Column(Text)
status = Column(String)
current_draft = Column(Text, nullable=True)
final_protocol = Column(Text, nullable=True)
safety_score = Column(Float, nullable=True)
empathy_score = Column(Float, nullable=True)
clinical_score = Column(Float, nullable=True)
iteration_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
meta_data = Column(
JSON, default=dict
) # Renamed from 'metadata' to avoid SQLAlchemy reserved word conflict
class AgentActivity(Base):
"""Log of agent activities for history tracking"""
__tablename__ = "agent_activities"
id = Column(Integer, primary_key=True, index=True)
session_id = Column(String, index=True)
agent_name = Column(String)
action = Column(String)
reasoning = Column(Text, nullable=True)
timestamp = Column(DateTime, default=datetime.now)
state_snapshot = Column(JSON, nullable=True)
# Database setup
engine = None
SessionLocal = None
def init_db():
"""Initialize database connection and create tables"""
global engine, SessionLocal
db_url = os.getenv("DATABASE_URL", "sqlite:///./cerina_foundry.db")
# Handle SQLite path
if db_url.startswith("sqlite:///"):
db_path = db_url.replace("sqlite:///", "")
os.makedirs(
os.path.dirname(db_path) if os.path.dirname(db_path) else ".", exist_ok=True
)
engine = create_engine(
db_url,
connect_args={"check_same_thread": False} if "sqlite" in db_url else {},
)
else:
engine = create_engine(db_url)
# Create tables
Base.metadata.create_all(bind=engine)
# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
return engine, SessionLocal
def get_db_session():
"""Get database session"""
if SessionLocal is None:
init_db()
return SessionLocal()
langgraph_integration.py
import os
from pathlib import Path
from typing import Any, Dict, Optional
from dotenv import load_dotenv
# Load .env from cbt folder
cbt_env_path = Path(__file__).parent / ".env"
load_dotenv(dotenv_path=cbt_env_path)
from langchain_openai import ChatOpenAI
from workflow import create_workflow
from state import create_initial_state
def get_llm_client(api_key: Optional[str] = None) -> ChatOpenAI:
api_key = api_key or os.getenv("OPENROUTER_API_KEY")
if not api_key:
raise ValueError("OPENROUTER_API_KEY not set")
base_url = "https://openrouter.ai/api/v1"
model_name = "openai/gpt-oss-120b"
return ChatOpenAI(
api_key=api_key,
base_url=base_url,
model=model_name,
temperature=0.7,
)
class LangGraphWorkflowAdapter:
def __init__(self):
self.workflow = None
self._initialized = False
def _ensure_initialized(self):
if not self._initialized:
llm = get_llm_client()
self.workflow = create_workflow(llm)
self._initialized = True
async def invoke(
self,
user_intent: str,
thread_id: str,
task_id: str,
metadata: Optional[Dict[str, Any]] = None,
):
self._ensure_initialized()
# Use Cerina's create_initial_state helper to ensure all required fields are present
input_state = create_initial_state(
user_intent=user_intent, session_id=thread_id, max_iterations=3
)
# Update with our metadata
input_state["metadata"] = metadata or {}
input_state["metadata"]["task_id"] = task_id
config = {"configurable": {"thread_id": thread_id}}
# Invoke the workflow and get the final state
try:
# Use ainvoke directly - simpler and more reliable than manual astream merging
final_state = await self.workflow.ainvoke(input_state, config)
except Exception as e:
import traceback
print(f"DEBUG: astream error: {e}")
print(f"DEBUG: Traceback: {traceback.format_exc()}")
# Fall back to ainvoke if astream fails
try:
final_state = await self.workflow.ainvoke(input_state, config)
except Exception as e2:
print(f"DEBUG: ainvoke also failed: {e2}")
raise
# Convert to dict if it's a Pydantic model, with fallback
try:
if hasattr(final_state, "dict"):
state_dict = final_state.dict()
elif hasattr(final_state, "model_dump"):
state_dict = final_state.model_dump()
elif isinstance(final_state, dict):
# Already a dict, use as-is
state_dict = final_state
else:
# Try to convert to dict
try:
state_dict = dict(final_state)
except:
# If that fails, try vars()
state_dict = (
vars(final_state) if hasattr(final_state, "__dict__") else {}
)
# Remove node execution metadata that shouldn't be in the state
node_names = {
"init",
"draft",
"safety_review",
"clinical_critique",
"supervisor",
"finalize",
"__pregel_pull__",
}
clean_state = {k: v for k, v in state_dict.items() if k not in node_names}
return clean_state if clean_state else state_dict
except Exception as e:
# If conversion fails, try to extract as much as we can
import json
try:
# Try JSON serialization as last resort
json_str = json.dumps(final_state, default=str)
parsed = json.loads(json_str)
# Remove node metadata
node_names = {
"init",
"draft",
"safety_review",
"clinical_critique",
"supervisor",
"finalize",
"__pregel_pull__",
}
return {k: v for k, v in parsed.items() if k not in node_names}
except:
# Return empty dict if all else fails
print(f"WARNING: Could not convert final_state to dict: {e}")
return {}
state.py
"""
Deep State Management - The "Blackboard" Pattern
Rich, structured state that agents share and collaborate on.
"""
from typing import TypedDict, List, Dict, Any, Optional, Annotated
from datetime import datetime
def merge_dicts(a: Optional[Dict], b: Optional[Dict]) -> Dict:
"""Merge two dictionaries, with b taking precedence"""
if a is None:
return b or {}
if b is None:
return a or {}
result = dict(a)
result.update(b)
return result
def append_list(a: List, b: List) -> List:
"""Append two lists"""
return (a or []) + (b or [])
def last_value_str(a: Optional[str], b: Optional[str]) -> Optional[str]:
"""Return the second (latest) non-None value"""
return b if b is not None else a
def last_value_int(a: Optional[int], b: Optional[int]) -> Optional[int]:
"""Return the second (latest) non-None value"""
return b if b is not None else a
def last_value_float(a: Optional[float], b: Optional[float]) -> Optional[float]:
"""Return the second (latest) non-None value"""
return b if b is not None else a
class AgentNote(TypedDict):
"""Notes agents leave for each other on the blackboard"""
agent_name: str
note: str
target_agent: Optional[str] # If None, note is for all agents
timestamp: datetime
priority: str # "info", "warning", "critical"
class ProtocolDraft(TypedDict):
"""A versioned draft of the CBT exercise"""
content: str
version: int
iteration: int
created_by: str
timestamp: datetime
safety_score: Optional[float]
empathy_score: Optional[float]
clinical_score: Optional[float]
feedback: List[str] # Feedback from other agents
class ProtocolState(TypedDict):
"""
The shared state (blackboard) that all agents read from and write to.
This is the central workspace for the multi-agent collaboration.
"""
# Core workflow state
user_intent: str
session_id: str
status: Annotated[
str, last_value_str
] # "initializing", "drafting", "reviewing", "awaiting_approval", "approved", "completed"
# Draft management
current_draft: Annotated[
Optional[str], last_value_str
] # The latest draft content - takes last write
draft_history: Annotated[List[ProtocolDraft], append_list] # All previous drafts
current_version: Annotated[int, last_value_int]
iteration_count: Annotated[int, last_value_int]
max_iterations: int # Safety limit
final_response: Annotated[Optional[str], last_value_str] # Final output for user
# Agent collaboration
agent_notes: Annotated[List[AgentNote], append_list] # The blackboard scratchpad
active_agent: Annotated[
Optional[str], last_value_str
] # Which agent is currently working
agent_decisions: Annotated[
Dict[str, Any], merge_dicts
] # Decisions made by each agent
# Safety and quality metrics
safety_checks: Annotated[
Dict[str, bool], merge_dicts
] # Results from safety guardian
safety_score: Annotated[Optional[float], last_value_float]
empathy_score: Annotated[Optional[float], last_value_float]
clinical_score: Annotated[Optional[float], last_value_float]
# Human-in-the-loop
halted: Annotated[
bool, last_value_int
] # Whether workflow is paused for human review
human_approved: Annotated[bool, last_value_int]
human_edits: Annotated[Optional[str], last_value_str] # Edits made by human
# Metadata
start_time: datetime
last_update: (
datetime # Not annotated - updated by nodes managing their own timestamps
)
metadata: Annotated[Dict[str, Any], merge_dicts] # Additional context
def create_initial_state(
user_intent: str, session_id: str, max_iterations: int = 5
) -> ProtocolState:
"""Create initial state for a new protocol generation"""
now = datetime.now()
return ProtocolState(
user_intent=user_intent,
session_id=session_id,
status="initializing",
current_draft=None,
draft_history=[],
current_version=0,
iteration_count=0,
max_iterations=max_iterations,
final_response=None,
agent_notes=[],
active_agent=None,
agent_decisions={},
safety_checks={},
safety_score=None,
empathy_score=None,
clinical_score=None,
halted=False,
human_approved=False,
human_edits=None,
start_time=now,
last_update=now,
metadata={},
)
def add_agent_note(
state: ProtocolState,
agent_name: str,
note: str,
target_agent: Optional[str] = None,
priority: str = "info",
) -> ProtocolState:
"""Helper to add a note to the blackboard"""
new_note = AgentNote(
agent_name=agent_name,
note=note,
target_agent=target_agent,
timestamp=datetime.now(),
priority=priority,
)
state["agent_notes"].append(new_note)
state["last_update"] = datetime.now()
return state
def add_draft_version(
state: ProtocolState,
content: str,
created_by: str,
safety_score: Optional[float] = None,
empathy_score: Optional[float] = None,
clinical_score: Optional[float] = None,
) -> ProtocolState:
"""Helper to add a new draft version to history"""
new_draft = ProtocolDraft(
content=content,
version=state["current_version"] + 1,
iteration=state["iteration_count"],
created_by=created_by,
timestamp=datetime.now(),
safety_score=safety_score,
empathy_score=empathy_score,
clinical_score=clinical_score,
feedback=[],
)
state["draft_history"].append(new_draft)
state["current_version"] = new_draft["version"]
state["current_draft"] = content
state["last_update"] = datetime.now()
return state
state_mapper.py
"""Map Cerina ProtocolState to/from Bindu artifacts and messages."""
from uuid import UUID
from typing import Any, Dict, List
from bindu.common.protocol.types import Artifact, Part
def extract_text_from_bindu_message(messages: List[Dict[str, Any]]) -> str:
"""Extract user input text from Bindu message format.
Args:
messages: List of Bindu message dictionaries
Returns:
User's text input
"""
if not messages:
return ""
last_message = messages[-1]
print(f"\nDEBUG extract_text: Full message: {last_message}")
print(f"DEBUG extract_text: Message keys: {list(last_message.keys())}")
content_val = last_message.get("content")
print(f"DEBUG extract_text: Type of 'content': {type(content_val)}")
print(f"DEBUG extract_text: Value of 'content': {content_val}")
# Try to extract text from different possible formats
# Format 1: "content" field (string - most common)
if "content" in last_message:
content = last_message["content"]
if isinstance(content, str):
text = content.strip()
print(f"DEBUG extract_text: Found in 'content' (string): {text[:80]}...")
return text
# If content is a dict, try to extract text from it
elif isinstance(content, dict):
if "text" in content:
text = content["text"]
if isinstance(text, str):
text = text.strip()
print(
f"DEBUG extract_text: Found in 'content.text': {text[:80]}..."
)
return text
# Try body or message field in content dict
for key in ["body", "message", "value"]:
if key in content and isinstance(content[key], str):
text = content[key].strip()
print(
f"DEBUG extract_text: Found in 'content.{key}': {text[:80]}..."
)
return text
# Format 2: "parts" field (alternative format)
parts = last_message.get("parts", [])
if parts and isinstance(parts, list):
text_parts = []
for p in parts:
if isinstance(p, dict):
if p.get("kind") == "text":
text_parts.append(p.get("text", ""))
elif "text" in p:
text_parts.append(p["text"])
text = " ".join(filter(None, text_parts)).strip()
if text:
print(f"DEBUG extract_text: Found in 'parts': {text[:80]}...")
return text
# Format 3: "message" field
if "message" in last_message:
text = last_message["message"]
if isinstance(text, str):
text = text.strip()
print(f"DEBUG extract_text: Found in 'message' (string): {text[:80]}...")
return text
# Format 4: "text" field
if "text" in last_message:
text = last_message["text"]
if isinstance(text, str):
text = text.strip()
print(f"DEBUG extract_text: Found in 'text': {text[:80]}...")
return text
print("DEBUG extract_text: No text found, returning empty string")
return ""
def build_langgraph_input(
bindu_messages: List[Dict[str, Any]],
context_id: UUID,
task_id: UUID,
) -> Dict[str, Any]:
"""Convert Bindu message to LangGraph workflow input.
Args:
bindu_messages: Bindu message history
context_id: Bindu context ID (maps to LangGraph thread_id)
task_id: Bindu task ID
Returns:
LangGraph input dictionary
"""
user_input = extract_text_from_bindu_message(bindu_messages)
# Map Bindu context_id to LangGraph thread_id
# Format: "bindu_{context_id}" allows multi-turn conversations
thread_id = f"bindu_{context_id}"
return {
"user_intent": user_input,
"thread_id": thread_id,
"task_id": str(task_id),
# Add any other metadata from Bindu messages
"metadata": {
"context_id": str(context_id),
"task_id": str(task_id),
},
}
def protocol_state_to_bindu_artifact(
final_state: Dict[str, Any], artifact_id: UUID
) -> Artifact:
# Convert ProtocolState → dict if needed
if hasattr(final_state, "dict"):
try:
final_state = final_state.dict()
except Exception as e:
# If dict() fails, try model_dump or vars
print(f"DEBUG: Failed to call .dict(): {e}")
if hasattr(final_state, "model_dump"):
try:
final_state = final_state.model_dump()
except Exception as e2:
print(f"DEBUG: Failed to call .model_dump(): {e2}")
final_state = (
vars(final_state) if hasattr(final_state, "__dict__") else {}
)
else:
final_state = (
vars(final_state) if hasattr(final_state, "__dict__") else {}
)
# Debug: print available keys
print(
f"DEBUG: Available keys in final_state: {list(final_state.keys()) if isinstance(final_state, dict) else 'Not a dict'}"
)
# Extract fields from the proper ProtocolState structure
final_response = final_state.get("current_draft", "")
draft_history = final_state.get("draft_history", [])
agent_notes = final_state.get("agent_notes", [])
safety_score = final_state.get("safety_score")
empathy_score = final_state.get("empathy_score")
clinical_score = final_state.get("clinical_score")
safety_checks = final_state.get("safety_checks", {})
agent_decisions = final_state.get("agent_decisions", {})
trace = final_state.get("trace", [])
parts: List[Part] = [
{"kind": "text", "text": final_response or "[No draft generated]"}
]
# Include structured debugging metadata
parts.append(
{
"kind": "data",
"data": {
"draft": final_response,
"draft_history": draft_history,
"safety_score": safety_score,
"empathy_score": empathy_score,
"clinical_score": clinical_score,
"safety_checks": safety_checks,
"agent_decisions": agent_decisions,
"agent_notes": agent_notes,
"trace": final_state.get("trace", []),
},
}
)
return {
"artifact_id": artifact_id,
"name": "cbt_exercise",
"description": "CBT exercise generated via Cerina Protocol Foundry (LangGraph)",
"parts": parts,
"metadata": {
"source": "cerina",
"pipeline": "drafter → safety → critic → supervisor",
"final_draft": final_response,
"draft_history": draft_history,
"safety_score": safety_score,
"empathy_score": empathy_score,
"clinical_score": clinical_score,
"safety_checks": safety_checks,
"agent_decisions": agent_decisions,
"agent_notes": agent_notes,
"trace": final_state.get("trace", []),
},
}
def bindu_message_from_artifact(artifact: Artifact) -> List[Dict[str, Any]]:
text_parts = [
p.get("text", "") for p in artifact.get("parts", []) if p.get("kind") == "text"
]
content = " ".join(text_parts).strip()
return [
{
"role": "assistant",
"content": content,
"artifact": artifact, # include entire artifact for UI or downstream agents
}
]
utils.py
"""
Utility functions for error handling and logging
"""
import logging
from typing import Any, Dict
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def log_agent_activity(
session_id: str,
agent_name: str,
action: str,
reasoning: str = None,
state_snapshot: Dict[str, Any] = None,
):
"""Log agent activity (in-memory logging)"""
logger.info(f"[{session_id}] {agent_name} - {action}: {reasoning}")
def safe_get_state_value(state: Dict[str, Any], key: str, default: Any = None):
"""Safely get value from state dictionary"""
try:
return state.get(key, default)
except (AttributeError, TypeError):
return default
def format_error_message(error: Exception) -> str:
"""Format error message for API responses"""
error_type = type(error).__name__
error_msg = str(error)
return f"{error_type}: {error_msg}"
workflow.py
"""
LangGraph Workflow - The Multi-Agent Orchestration
Implements a Supervisor-Worker pattern with autonomous decision-making.
"""
from typing import Literal
from langgraph.graph import StateGraph, END
from state import ProtocolState, add_draft_version
from agents import (
DrafterAgent,
SafetyGuardianAgent,
ClinicalCriticAgent,
SupervisorAgent,
)
async def draft_node(state: ProtocolState) -> ProtocolState:
"""Node: Drafter creates/revises draft"""
result = await drafter.draft(state)
# Update state with draft
state["current_draft"] = result.get("current_draft", state.get("current_draft"))
state["active_agent"] = result.get("active_agent", "Drafter")
state["status"] = result.get("status", "drafting")
state["iteration_count"] += 1
# Add draft to history
if state["current_draft"]:
state = add_draft_version(state, state["current_draft"], "Drafter")
return state
async def safety_review_node(state: ProtocolState) -> ProtocolState:
"""Node: Safety Guardian reviews for safety issues"""
result = await safety_guardian.review(state)
# Update safety metrics
if "safety_checks" in result:
state["safety_checks"].update(result["safety_checks"])
if "safety_score" in result:
state["safety_score"] = result["safety_score"]
state["active_agent"] = "SafetyGuardian"
state["status"] = result.get("status", "reviewing")
return state
async def clinical_critique_node(state: ProtocolState) -> ProtocolState:
"""Node: Clinical Critic evaluates quality"""
result = await clinical_critic.critique(state)
# Update quality metrics
if "empathy_score" in result:
state["empathy_score"] = result["empathy_score"]
if "clinical_score" in result:
state["clinical_score"] = result["clinical_score"]
state["active_agent"] = "ClinicalCritic"
state["status"] = result.get("status", "critiquing")
return state
async def finalize_node(state: ProtocolState) -> ProtocolState:
result = await supervisor.finalize(state)
state["status"] = "completed"
state.update(result)
return state
def should_continue(
state: ProtocolState,
) -> Literal[
"draft", "safety_review", "clinical_critique", "supervisor", "halt", "end"
]:
"""
Router function: Decides which node to execute next based on current state.
This implements the autonomous decision-making logic.
"""
active_agent = state.get("active_agent")
status = state.get("status", "")
current_draft = state.get("current_draft")
print(
f"DEBUG should_continue: active_agent={active_agent}, status={status}, draft_exists={bool(current_draft)}"
)
# If halted, wait for human approval
if state.get("halted"):
print("DEBUG: Halted, returning 'end'")
return "end"
# If human approved, continue to finalization
if state.get("human_approved"):
state["halted"] = False
print("DEBUG: Human approved, returning 'end'")
return "end"
# After drafting, always review safety
if active_agent == "Drafter":
print("DEBUG: After Drafter, routing to safety_review")
return "safety_review"
# After safety review, critique clinically
if active_agent == "SafetyGuardian":
print("DEBUG: After SafetyGuardian, routing to clinical_critique")
return "clinical_critique"
# After critique, supervisor decides
if active_agent == "ClinicalCritic":
print("DEBUG: After ClinicalCritic, routing to supervisor")
return "supervisor"
# Supervisor decides next action
if active_agent == "Supervisor":
print(f"DEBUG: Supervisor decision - status: {status}")
# Check supervisor's decision
if "needs_revision" in status:
print("DEBUG: Supervisor says needs_revision, routing back to draft")
return "draft" # Route back to drafter for revision
elif "ready_for_review" in status or "awaiting_review" in status:
print("DEBUG: Supervisor says ready, routing to finalize")
return "finalize"
else:
print("DEBUG: Supervisor status unclear, routing to end")
return "end"
# Default: if no draft exists, start with draft
if not state.get("current_draft"):
print("DEBUG: No draft exists, routing to draft")
return "draft"
# Default: supervisor decides
print("DEBUG: Default routing to supervisor")
return "supervisor"
def init_state(state: ProtocolState) -> ProtocolState:
print(f"\n{'=' * 60}")
print("INIT_STATE CALLED")
print(f" - state keys: {list(state.keys())}")
print(f" - user_intent: {state.get('user_intent', 'NOT SET')}")
print(f" - session_id: {state.get('session_id', 'NOT SET')}")
print(f" - status: {state.get('status', 'NOT SET')}")
print(f"{'=' * 60}\n")
state.setdefault("iteration_count", 0)
state.setdefault("max_iterations", 3)
state.setdefault("safety_score", None)
state.setdefault("empathy_score", None)
state.setdefault("clinical_score", None)
state.setdefault("current_draft", "")
state.setdefault("halted", False)
# Ensure list fields are initialized as lists
if "agent_notes" not in state:
state["agent_notes"] = []
if "draft_history" not in state:
state["draft_history"] = []
if "safety_checks" not in state:
state["safety_checks"] = {}
if "agent_decisions" not in state:
state["agent_decisions"] = {}
if "metadata" not in state:
state["metadata"] = {}
return state
def create_workflow(llm):
"""Create and compile the LangGraph workflow"""
drafter = DrafterAgent(llm)
safety_guardian = SafetyGuardianAgent(llm)
clinical_critic = ClinicalCriticAgent(llm)
supervisor = SupervisorAgent(llm)
# Create graph
workflow = StateGraph(ProtocolState)
# Add nodes - use the agent methods directly, they return partial state updates
workflow.add_node("init", init_state)
workflow.set_entry_point("init")
workflow.add_edge("init", "draft")
workflow.add_node("draft", drafter.draft)
workflow.add_node("safety_review", safety_guardian.review)
workflow.add_node("clinical_critique", clinical_critic.critique)
workflow.add_node("supervisor", supervisor.supervise)
workflow.add_node("finalize", supervisor.finalize)
# Set entry point
workflow.set_entry_point("draft")
# Add conditional edges (autonomous routing)
workflow.add_conditional_edges(
"draft",
should_continue,
{
"draft": "draft",
"safety_review": "safety_review",
"clinical_critique": "clinical_critique",
"supervisor": "supervisor",
"halt": END,
"end": END,
},
)
workflow.add_conditional_edges(
"safety_review",
should_continue,
{
"draft": "draft",
"clinical_critique": "clinical_critique",
"supervisor": "supervisor",
"halt": END,
"end": END,
},
)
workflow.add_conditional_edges(
"clinical_critique",
should_continue,
{"draft": "draft", "supervisor": "supervisor", "halt": END, "end": END},
)
workflow.add_conditional_edges(
"supervisor",
should_continue,
{
"draft": "draft",
"clinical_critique": "clinical_critique",
"supervisor": "supervisor",
"finalize": "finalize",
"halt": END,
"end": END,
},
)
workflow.add_conditional_edges("finalize", should_continue, {"end": END})
# Compile workflow (stateless, no persistence)
app = workflow.compile()
return app
Skill Configuration
Createskills/cbt-therapy-skill/skill.yaml:
# CBT Therapy Skill
# Cognitive Behavioral Therapy with multi-agent protocol generation
id: cbt-therapy-skill
name: cbt-therapy-skill
version: 1.0.0
author: bindu.builder@getbindu.com
description: |
Cognitive Behavioral Therapy (CBT) skill that provides personalized therapeutic exercises
and protocols based on user concerns and mental health needs.
Features:
- Multi-agent CBT protocol generation
- Clinical safety validation
- Therapeutic quality assessment
- Personalized exercise creation
- Evidence-based interventions
Uses a sophisticated multi-agent workflow with:
- Drafter Agent: Creates initial CBT exercise drafts
- Safety Guardian: Validates clinical safety and ethics
- Clinical Critic: Ensures therapeutic quality and effectiveness
All protocols are reviewed for safety and clinical appropriateness
before being provided to users.
tags:
- cbt
- therapy
- mental-health
- cognitive-behavioral
- psychological
- wellness
- self-help
input_modes:
- application/json
output_modes:
- application/json
examples:
- "I am overwhelmed with sleep problems"
- "Help me manage my anxiety about work"
- "I'm struggling with negative thoughts"
- "Can you help me with stress management?"
- "I need help coping with depression"
capabilities_detail:
cbt_protocol_generation:
supported: true
description: "Generates personalized CBT exercises and protocols"
safety_validation:
supported: true
description: "Clinical safety and ethics validation for all protocols"
quality_assessment:
supported: true
description: "Therapeutic quality and effectiveness assessment"
multi_agent_workflow:
supported: true
description: "Sophisticated multi-agent orchestration for protocol creation"
evidence_based:
supported: true
description: "Evidence-based CBT interventions and techniques"
How It Works
Multi-Agent Workflow- Drafter Agent: Creates initial CBT exercise draft
- Safety Guardian: Validates clinical safety and ethics
- Clinical Critic: Reviews therapeutic quality
- Uses LangGraph for workflow orchestration
- Stateless execution (one-shot invocation)
- Maps Bindu context to LangGraph thread
- Converts Bindu messages to LangGraph input
- Transforms ProtocolState to Bindu artifacts
- Includes safety scores and clinical feedback
- Extract user concern from Bindu message
- Map context_id to LangGraph thread_id
- Invoke multi-agent workflow (Drafter → Safety → Critic)
- Convert ProtocolState to Bindu artifact format
- Return structured CBT exercise with metadata
- Safety score (0-100) from Safety Guardian
- Clinical score (0-100) from Clinical Critic
- Safety verdict validation
- Quality feedback in metadata
Dependencies
uv init
uv add bindu langchain langchain-openai langgraph sqlalchemy python-dotenv
Environment Setup
Create.env file:
OPENROUTER_API_KEY=your_openrouter_api_key_here
Run
uv run supervisor_cbt.py
- “I am overwhelmed with sleep problems”
- “Can you help me create a coping strategy for anxiety?”
- “What are the key principles of CBT for depression?”
Example API Calls
Message Send Request
Message Send Request
{
"jsonrpc": "2.0",
"method": "message/send",
"params": {
"message": {
"role": "user",
"kind": "message",
"messageId": "9f11c870-5616-49ad-b187-d93cbb100001",
"contextId": "9f11c870-5616-49ad-b187-d93cbb100002",
"taskId": "9f11c870-5616-49ad-b187-d93cbb100003",
"parts": [
{
"kind": "text",
"text": "I am overwhelmed with sleep problems"
}
]
},
"skillId": "cbt-therapy-skill",
"configuration": {
"acceptedOutputModes": ["application/json"]
}
},
"id": "9f11c870-5616-49ad-b187-d93cbb100003"
}
Task get Request
Task get Request
{
"jsonrpc": "2.0",
"method": "tasks/get",
"params": {
"taskId": "9f11c870-5616-49ad-b187-d93cbb100003"
},
"id": "9f11c870-5616-49ad-b187-d93cbb100004"
}
Frontend Setup
# Clone the Bindu repository
git clone https://github.com/GetBindu/Bindu
# Navigate to frontend directory
cd frontend
# Install dependencies
npm install
# Start frontend development server
npm run dev