CBT therapy protocol generator for mental health applications.

Code

Create supervisor_cbt.py with the code below, or save it directly from your editor.
from bindu.penguin.bindufy import bindufy
from bindu.utils.logging import get_logger
from langgraph_integration import LangGraphWorkflowAdapter
from state_mapper import (
    build_langgraph_input,
    protocol_state_to_bindu_artifact,
    bindu_message_from_artifact,
)
from uuid import UUID, uuid4

logger = get_logger("cerina_bindu.cbt.supervisor")

_workflow_adapter: LangGraphWorkflowAdapter | None = None

def get_workflow_adapter() -> LangGraphWorkflowAdapter:
    """Return the module-wide workflow adapter, constructing it on first call."""
    global _workflow_adapter
    adapter = _workflow_adapter
    if adapter is None:
        adapter = LangGraphWorkflowAdapter()
        _workflow_adapter = adapter
    return adapter

async def handler(messages: list[dict]):
    """Bindu entry point: run the CBT LangGraph workflow for the latest message.

    Args:
        messages: Conversation history in Bindu message-dict format; the last
            entry is treated as the current request.

    Returns:
        A list of assistant message dicts: the generated protocol on success,
        or a fallback/error message otherwise.
    """
    if not messages:
        return [{"role": "assistant", "content": "No input provided"}]

    last_message = messages[-1]
    # Reuse ids supplied by the caller; mint fresh UUIDs otherwise so the
    # workflow always has stable context/task identifiers.
    context_id = UUID(last_message.get("context_id") or str(uuid4()))
    task_id = UUID(last_message.get("task_id") or str(uuid4()))

    try:
        langgraph_input = build_langgraph_input(messages, context_id, task_id)
        user_intent = langgraph_input["user_intent"]
        thread_id = langgraph_input["thread_id"]

        if not user_intent:
            return [{"role": "assistant", "content": "No user intent provided"}]

        adapter = get_workflow_adapter()
        final_state = await adapter.invoke(
            user_intent=user_intent,
            thread_id=thread_id,
            task_id=str(task_id),
            metadata=langgraph_input.get("metadata", {}),
        )

        # Package the final workflow state as a Bindu artifact, then unwrap it
        # into assistant messages for the caller.
        artifact_id = uuid4()
        artifact = protocol_state_to_bindu_artifact(final_state, artifact_id)
        assistant_messages = bindu_message_from_artifact(artifact)

        if assistant_messages:
            assistant_messages[0]["metadata"] = artifact.get("metadata", {})

        return assistant_messages

    except Exception as e:
        # Boundary handler: log with traceback (lazy %-args, not an eager
        # f-string) and degrade to an error reply instead of propagating
        # into the Bindu runtime.
        logger.error("Error processing CBT request: %s", e, exc_info=True)
        return [{"role": "assistant", "content": f"Error generating CBT exercise: {str(e)}"}]

# Bindu agent configuration consumed by bindufy() at the bottom of this file.
config = {
    "author": "imdanishakhtar7@gmail.com",
    "name": "cerina_supervisor_cbt",
    "description": "Supervisor orchestrator for Cerina CBT integration with LangGraph workflow.",
    "deployment": {
        # Local dev server; the Vite frontend origin below is allowed via CORS.
        "url": "http://localhost:3773",
        "expose": True,
        "cors_origins": ["http://localhost:5173"]
    },
    # Relative path to the skill definition — presumably resolved against this
    # file's directory by the Bindu runtime; TODO confirm.
    "skills": ["../../skills/cbt-supervisor-orchestrator"],
}

# Register the handler with the Bindu runtime using the config above.
bindufy(config, handler)

# Alternative: pass launch=True to also create a tunnel to your agent and
# expose it on port 3773.
#bindufy(config, handler, launch=True)

Additional Files

Create these supporting files in the same directory:

agents.py

"""
Individual Agent Implementations
Each agent has a specific role in the CBT exercise generation process.
"""

from typing import Dict, Any
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os
import json
from pathlib import Path
from dotenv import load_dotenv

from state import ProtocolState
from utils import log_agent_activity

# Load environment variables (e.g. OPENROUTER_API_KEY) from the .env file
# located next to this module in the cbt folder.
env_path = Path(__file__).parent / ".env"
load_dotenv(dotenv_path=env_path)


def get_llm():
    """Build a ChatOpenAI client pointed at the OpenRouter endpoint.

    Raises:
        ValueError: if OPENROUTER_API_KEY is not set in the environment.
    """
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        raise ValueError(
            "OPENROUTER_API_KEY not found. Get your key from https://openrouter.ai/keys"
        )

    # OpenRouter exposes an OpenAI-compatible API, so the stock ChatOpenAI
    # client works with just a base_url override.
    return ChatOpenAI(
        api_key=api_key,
        base_url="https://openrouter.ai/api/v1",
        model="openai/gpt-oss-120b",
        temperature=0.7,
    )


class DrafterAgent:
    """Creates initial CBT exercise drafts based on user intent."""

    def __init__(self, llm=None):
        # Default to the shared OpenRouter-backed client when none is injected.
        self.llm = llm or get_llm()
        self.name = "Drafter"

    async def draft(self, state: ProtocolState) -> Dict[str, Any]:
        """Generate a (re)draft of the CBT exercise.

        Reads the user intent plus recent blackboard notes addressed to the
        Drafter, asks the LLM for a complete protocol, and returns a partial
        state update (merged into the blackboard by the state reducers).

        Args:
            state: The shared ProtocolState blackboard.

        Returns:
            Partial state update with the new draft, status, and one note.
        """
        intent = state["user_intent"]
        iteration = state["iteration_count"]

        # Collect recent notes targeted at this agent so revisions actually
        # respond to reviewer feedback.
        recent_notes = [
            n for n in state["agent_notes"][-5:] if n.get("target_agent") == "Drafter"
        ]
        feedback_context = "\n".join(f"- {n['note']}" for n in recent_notes)

        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """You are a clinical psychologist specializing in Cognitive Behavioral Therapy (CBT).
Your task is to create structured, empathetic, and evidence-based CBT exercises.

Guidelines:
1. Structure: Clear sections (Introduction, Steps, Practice, Reflection)
2. Tone: Warm, supportive, non-judgmental
3. Safety: Never provide medical advice or encourage self-harm
4. Evidence-based: Use established CBT techniques
5. Accessibility: Clear language, actionable steps

Format your response as a complete CBT exercise protocol.""",
                ),
                (
                    "human",
                    """Create a CBT exercise for: {intent}

{feedback_context}

{iteration_context}""",
                ),
            ]
        )

        # Only mention earlier iterations when this is not the first pass.
        iteration_context = ""
        if iteration > 0:
            iteration_context = f"This is iteration {iteration + 1}. Previous drafts had issues that need addressing."

        messages = prompt.format_messages(
            intent=intent,
            feedback_context=feedback_context or "No specific feedback yet.",
            iteration_context=iteration_context,
        )

        response = await self.llm.ainvoke(messages)
        draft_content = response.content

        # Persist an audit-trail entry for this drafting step.
        log_agent_activity(
            session_id=state.get("session_id", "unknown"),
            agent_name=self.name,
            action="drafted_protocol",
            reasoning=f"Created draft for: {intent[:100]}",
        )

        # Leave a note on the blackboard announcing the new version.
        new_note = {
            "agent_name": self.name,
            "note": f"Created draft version {state['current_version'] + 1}",
            "target_agent": None,
            "timestamp": state.get("last_update"),
            "priority": "info",
        }

        return {
            "current_draft": draft_content,
            "active_agent": self.name,
            "status": "drafting",
            "agent_notes": [new_note],  # Return as list to be appended by reducer
        }


class SafetyGuardianAgent:
    """Validates content for safety risks and medical advice."""

    def __init__(self, llm=None):
        # Default to the shared OpenRouter-backed client when none is injected.
        self.llm = llm or get_llm()
        self.name = "SafetyGuardian"

    async def review(self, state: ProtocolState) -> Dict[str, Any]:
        """Review the current draft for safety issues.

        Args:
            state: The shared ProtocolState blackboard.

        Returns:
            Partial state update with safety checks, the safety score, and
            any critical notes addressed to the Drafter.
        """
        draft = state.get("current_draft")
        if not draft:
            # Nothing to review yet.
            return {"status": "no_draft"}

        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """You are a safety reviewer for clinical content.
Your job is to identify:
1. References to self-harm or suicide
2. Medical advice (diagnosis, medication, treatment)
3. Dangerous practices
4. Inappropriate content

Respond with a JSON object:
{{
    "safe": true/false,
    "safety_score": 0.0-1.0,
    "issues": ["list of issues"],
    "recommendations": ["how to fix"]
}}""",
                ),
                ("human", "Review this CBT exercise for safety:\n\n{draft}"),
            ]
        )

        messages = prompt.format_messages(draft=draft)
        response = await self.llm.ainvoke(messages)
        # response.content is normally a string; coerce defensively so the
        # JSON parse and keyword checks below cannot raise on other shapes.
        text = (
            response.content
            if isinstance(response.content, str)
            else str(response.content)
        )

        try:
            result = json.loads(text)
        except ValueError:
            # Model did not return valid JSON; fall back to keyword heuristics.
            result = {
                "safe": "safe" in text.lower(),
                "safety_score": 0.8 if "safe" in text.lower() else 0.3,
                "issues": [],
                "recommendations": [text],
            }
        if not isinstance(result, dict):
            # Valid JSON but not an object (e.g. a bare list/number); treat as
            # unparseable and let the conservative defaults below apply.
            result = {}

        safety_score = result.get("safety_score", 0.5)
        is_safe = result.get("safe", safety_score > 0.7)

        # Keyword-level checks on the raw response, merged into the shared map.
        safety_checks = state.get("safety_checks", {})
        safety_checks["medical_advice"] = "medical" not in text.lower()
        safety_checks["self_harm"] = (
            "self-harm" not in text.lower() and "suicide" not in text.lower()
        )
        safety_checks["overall"] = is_safe

        # Escalate each reported issue to the Drafter as a critical note.
        agent_notes_to_add = []
        if not is_safe or safety_score < 0.8:
            issues = result.get("issues", [])
            for issue in issues:
                agent_notes_to_add.append(
                    {
                        "agent_name": self.name,
                        "note": f"SAFETY ISSUE: {issue}",
                        "target_agent": "Drafter",
                        "timestamp": state.get("last_update"),
                        "priority": "critical",
                    }
                )

        # Persist an audit-trail entry for this review.
        log_agent_activity(
            session_id=state.get("session_id", "unknown"),
            agent_name=self.name,
            action="safety_review",
            reasoning=f"Safety score: {safety_score}, Safe: {is_safe}",
        )

        return {
            "safety_checks": safety_checks,
            "safety_score": safety_score,
            "status": "reviewed",
            "agent_notes": agent_notes_to_add,  # Return as list to be appended by reducer
            "active_agent": self.name,
        }


class ClinicalCriticAgent:
    """Evaluates clinical appropriateness, tone, and empathy."""

    def __init__(self, llm=None):
        # Default to the shared OpenRouter-backed client when none is injected.
        self.llm = llm or get_llm()
        self.name = "ClinicalCritic"

    async def critique(self, state: ProtocolState) -> Dict[str, Any]:
        """Critique the current draft for clinical quality.

        Args:
            state: The shared ProtocolState blackboard.

        Returns:
            Partial state update with empathy/clinical scores and improvement
            notes addressed to the Drafter (top 3 weaknesses).
        """
        draft = state.get("current_draft")
        if not draft:
            # Nothing to critique yet.
            return {"status": "no_draft"}

        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """You are a senior clinical psychologist reviewing CBT exercises.
Evaluate:
1. Clinical appropriateness (evidence-based techniques)
2. Empathy and tone (warm, supportive, non-judgmental)
3. Structure and clarity
4. Actionability (can users actually do this?)

Respond with JSON:
{{
    "empathy_score": 0.0-1.0,
    "clinical_score": 0.0-1.0,
    "overall_score": 0.0-1.0,
    "strengths": ["list"],
    "weaknesses": ["list"],
    "recommendations": ["how to improve"]
}}""",
                ),
                ("human", "Critique this CBT exercise:\n\n{draft}"),
            ]
        )

        messages = prompt.format_messages(draft=draft)
        response = await self.llm.ainvoke(messages)

        try:
            result = json.loads(response.content)
        except (TypeError, ValueError):
            result = None
        if not isinstance(result, dict):
            # Unparseable (or non-object) response: assume middling scores so
            # the supervisor neither approves nor hard-rejects on noise.
            result = {
                "empathy_score": 0.7,
                "clinical_score": 0.7,
                "overall_score": 0.7,
                "strengths": [],
                "weaknesses": [],
                "recommendations": [],
            }

        empathy_score = result.get("empathy_score", 0.7)
        clinical_score = result.get("clinical_score", 0.7)
        overall_score = result.get("overall_score", 0.7)

        # Turn the top weaknesses into actionable notes for the Drafter.
        agent_notes_to_add = []
        weaknesses = result.get("weaknesses", [])
        if weaknesses:
            for weakness in weaknesses[:3]:  # Limit to top 3
                agent_notes_to_add.append(
                    {
                        "agent_name": self.name,
                        "note": f"IMPROVEMENT NEEDED: {weakness}",
                        "target_agent": "Drafter",
                        "timestamp": state.get("last_update"),
                        "priority": "warning",
                    }
                )

        # Persist an audit-trail entry for this critique.
        log_agent_activity(
            session_id=state.get("session_id", "unknown"),
            agent_name=self.name,
            action="clinical_critique",
            reasoning=f"Empathy: {empathy_score}, Clinical: {clinical_score}, Overall: {overall_score}",
        )

        return {
            "empathy_score": empathy_score,
            "clinical_score": clinical_score,
            "status": "critiqued",
            "agent_notes": agent_notes_to_add,  # Return as list to be appended by reducer
            "active_agent": self.name,
        }


class SupervisorAgent:
    """Orchestrates the workflow and decides when drafts are ready."""

    def __init__(self, llm=None):
        # The supervisor decides from scores alone, but keeps an LLM handle
        # for parity with the other agents.
        self.llm = llm or get_llm()
        self.name = "Supervisor"

    async def decide(self, state: ProtocolState) -> Dict[str, Any]:
        """Choose the next step: halt, finalize, or request another draft.

        Decision order:
        1. Halt when the iteration budget is exhausted.
        2. Finalize when all quality scores clear their thresholds.
        3. Ask the Drafter to revise when any known score is below threshold.
        4. Ask the Drafter for an initial draft when none exists yet.
        5. Otherwise fall through to finalize.
        """
        iteration = state["iteration_count"]
        max_iterations = state["max_iterations"]
        safety_score = state.get("safety_score")
        empathy_score = state.get("empathy_score")
        clinical_score = state.get("clinical_score")

        agent_notes_to_add = []

        # 1. Iteration budget exhausted: stop with whatever draft exists.
        if iteration >= max_iterations:
            agent_notes_to_add.append(
                {
                    "agent_name": self.name,
                    "note": f"Reached max iterations ({max_iterations}). Finalizing current draft.",
                    "target_agent": None,
                    "timestamp": state.get("last_update"),
                    "priority": "warning",
                }
            )
            return {
                "next_agent": "HALT",
                "status": "max_iterations_reached",
                "agent_notes": agent_notes_to_add,
                "active_agent": self.name,
            }

        # 2. All three scores must be present AND above their thresholds.
        quality_ok = (
            safety_score is not None
            and safety_score >= 0.8
            and empathy_score is not None
            and empathy_score >= 0.7
            and clinical_score is not None
            and clinical_score >= 0.7
        )

        if quality_ok and state.get("current_draft"):
            # Ready for human review.
            agent_notes_to_add.append(
                {
                    "agent_name": self.name,
                    "note": "Draft meets quality thresholds. Ready for human review.",
                    "target_agent": None,
                    "timestamp": state.get("last_update"),
                    "priority": "info",
                }
            )
            log_agent_activity(
                session_id=state.get("session_id", "unknown"),
                agent_name=self.name,
                action="approved_for_review",
                reasoning=f"Quality thresholds met. Safety: {safety_score}, Empathy: {empathy_score}, Clinical: {clinical_score}",
            )
            return {
                "next_agent": "finalize",
                "status": "ready_for_review",
                "halted": False,
                "agent_notes": agent_notes_to_add,
                "active_agent": self.name,
            }

        # 3. Any KNOWN score below its threshold triggers a revision pass.
        #    (Parentheses make the and/or grouping explicit; semantics are
        #    unchanged from operator precedence.)
        needs_revision = (
            (safety_score is not None and safety_score < 0.8)
            or (empathy_score is not None and empathy_score < 0.7)
            or (clinical_score is not None and clinical_score < 0.7)
        )

        if needs_revision and iteration < max_iterations:
            agent_notes_to_add.append(
                {
                    "agent_name": self.name,
                    "note": f"Quality scores below threshold. Requesting revision (iteration {iteration + 1}/{max_iterations})",
                    "target_agent": None,
                    "timestamp": state.get("last_update"),
                    "priority": "warning",
                }
            )
            return {
                "next_agent": "Drafter",
                "status": "needs_revision",
                "agent_notes": agent_notes_to_add,
                "active_agent": self.name,
            }

        # 4. No draft at all yet: kick off the first drafting pass.
        if not state.get("current_draft"):
            return {
                "next_agent": "Drafter",
                "status": "initial_draft_needed",
                "agent_notes": agent_notes_to_add,
                "active_agent": self.name,
            }

        # 5. Draft exists but scores are incomplete; hand off for review.
        return {
            "next_agent": "finalize",
            "status": "awaiting_review",
            "halted": False,
            "agent_notes": agent_notes_to_add,
            "active_agent": self.name,
        }

    async def supervise(self, state: ProtocolState) -> Dict[str, Any]:
        """Graph node wrapper: delegate to decide() without mutating state."""
        return await self.decide(state)

    async def finalize(self, state: ProtocolState) -> Dict[str, Any]:
        """Convert the final draft into final_response for Bindu output."""
        # BUGFIX: current_draft is initialized to None (not missing), so
        # state.get("current_draft", "") could return None and the original
        # .strip() call raised AttributeError. Coalesce None to "" first.
        draft = (state.get("current_draft") or "").strip()
        return {
            "final_response": draft or "[empty draft]",
            "halted": True,
            "status": "completed",
            "active_agent": self.name,
        }

database.py

"""
Database setup and utilities for persistence
"""

from sqlalchemy import (
    create_engine,
    Column,
    Integer,
    String,
    Text,
    DateTime,
    Float,
    JSON,
)
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker
import os
from datetime import datetime

Base = declarative_base()


class ProtocolSession(Base):
    """Database model for protocol generation sessions"""

    __tablename__ = "protocol_sessions"

    id = Column(Integer, primary_key=True, index=True)
    session_id = Column(String, unique=True, index=True)  # external session/thread id
    user_intent = Column(Text)  # the user's original request text
    status = Column(String)  # workflow status string (e.g. "drafting", "completed")
    current_draft = Column(Text, nullable=True)  # latest draft content, if any
    final_protocol = Column(Text, nullable=True)  # finalized exercise text
    safety_score = Column(Float, nullable=True)  # scores set by reviewer agents
    empathy_score = Column(Float, nullable=True)
    clinical_score = Column(Float, nullable=True)
    iteration_count = Column(Integer, default=0)  # number of drafting passes
    # NOTE(review): datetime.now is timezone-naive — confirm whether UTC
    # (datetime.now(timezone.utc)) was intended for these timestamps.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
    meta_data = Column(
        JSON, default=dict
    )  # Renamed from 'metadata' to avoid SQLAlchemy reserved word conflict


class AgentActivity(Base):
    """Log of agent activities for history tracking"""

    __tablename__ = "agent_activities"

    id = Column(Integer, primary_key=True, index=True)
    session_id = Column(String, index=True)  # links activity to its session
    agent_name = Column(String)  # e.g. "Drafter", "SafetyGuardian"
    action = Column(String)  # short action tag, e.g. "drafted_protocol"
    reasoning = Column(Text, nullable=True)  # free-text explanation
    timestamp = Column(DateTime, default=datetime.now)  # NOTE: timezone-naive
    state_snapshot = Column(JSON, nullable=True)  # optional state dump at log time


# Database setup: engine and session factory are module-level singletons,
# populated lazily by init_db() below (both remain None until then).
engine = None
SessionLocal = None


def init_db():
    """Initialize the database engine, create tables, and build the session factory.

    Reads DATABASE_URL from the environment, defaulting to a local SQLite
    file. Safe to call more than once (re-runs create_all, which is a no-op
    for existing tables).

    Returns:
        Tuple of (engine, SessionLocal session factory).
    """
    global engine, SessionLocal

    db_url = os.getenv("DATABASE_URL", "sqlite:///./cerina_foundry.db")

    if db_url.startswith("sqlite:///"):
        # Ensure the directory holding the SQLite file exists.
        db_path = db_url.replace("sqlite:///", "")
        os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
        # Inside this branch the URL is always SQLite, so the original
        # `... if "sqlite" in db_url else {}` conditional was dead code.
        # check_same_thread=False allows the connection to be used from
        # threads other than the one that created it.
        engine = create_engine(
            db_url, connect_args={"check_same_thread": False}
        )
    else:
        engine = create_engine(db_url)

    # Create tables for all models registered on Base.
    Base.metadata.create_all(bind=engine)

    # Create session factory.
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

    return engine, SessionLocal


def get_db_session():
    """Return a fresh ORM session, lazily initializing the database first."""
    factory = SessionLocal
    if factory is None:
        _, factory = init_db()
    return factory()

langgraph_integration.py

import os
from pathlib import Path
from typing import Any, Dict, Optional

from dotenv import load_dotenv

# Load .env from the cbt folder so OPENROUTER_API_KEY is available before
# get_llm_client() is first called.
cbt_env_path = Path(__file__).parent / ".env"
load_dotenv(dotenv_path=cbt_env_path)

from langchain_openai import ChatOpenAI
from workflow import create_workflow
from state import create_initial_state


def get_llm_client(api_key: Optional[str] = None) -> ChatOpenAI:
    """Return a ChatOpenAI client configured for the OpenRouter endpoint.

    Args:
        api_key: Explicit key; falls back to the OPENROUTER_API_KEY env var.

    Raises:
        ValueError: when no API key is available from either source.
    """
    key = api_key or os.getenv("OPENROUTER_API_KEY")
    if not key:
        raise ValueError("OPENROUTER_API_KEY not set")

    # OpenRouter is OpenAI-compatible, so only the base_url differs.
    return ChatOpenAI(
        api_key=key,
        base_url="https://openrouter.ai/api/v1",
        model="openai/gpt-oss-120b",
        temperature=0.7,
    )


class LangGraphWorkflowAdapter:
    """Thin async wrapper around the Cerina LangGraph workflow.

    Lazily builds the compiled workflow on first use and normalizes the
    final state into a plain dict for the Bindu handler.
    """

    # Keys LangGraph may leave in the result mapping; they are node-execution
    # metadata, not protocol state, and are stripped from the returned dict.
    _NODE_NAMES = frozenset(
        {
            "init",
            "draft",
            "safety_review",
            "clinical_critique",
            "supervisor",
            "finalize",
            "__pregel_pull__",
        }
    )

    def __init__(self):
        self.workflow = None
        self._initialized = False

    def _ensure_initialized(self):
        """Build the workflow exactly once, on first use."""
        if not self._initialized:
            llm = get_llm_client()
            self.workflow = create_workflow(llm)
            self._initialized = True

    async def invoke(
        self,
        user_intent: str,
        thread_id: str,
        task_id: str,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Run the workflow to completion and return the final state as a dict.

        Args:
            user_intent: The user's request text.
            thread_id: Session id, also used as the LangGraph checkpoint
                thread id.
            task_id: Bindu task id, recorded in the state metadata.
            metadata: Extra context merged into the initial state.

        Returns:
            The final workflow state as a plain dict with node metadata
            removed (empty dict if conversion fails entirely).
        """
        self._ensure_initialized()

        # Cerina's helper guarantees every required state field is present.
        input_state = create_initial_state(
            user_intent=user_intent, session_id=thread_id, max_iterations=3
        )
        input_state["metadata"] = metadata or {}
        input_state["metadata"]["task_id"] = task_id

        config = {"configurable": {"thread_id": thread_id}}

        try:
            final_state = await self.workflow.ainvoke(input_state, config)
        except Exception as e:
            import traceback

            # The original logged this as an "astream error", but astream is
            # never used here — the message now names the actual call.
            print(f"DEBUG: workflow ainvoke error: {e}")
            print(f"DEBUG: Traceback: {traceback.format_exc()}")
            # Retry once in case the failure was transient; a second failure
            # propagates to the caller.
            final_state = await self.workflow.ainvoke(input_state, config)

        return self._normalize_state(final_state)

    def _normalize_state(self, final_state) -> Dict[str, Any]:
        """Best-effort conversion of the workflow result into a clean dict."""
        try:
            if isinstance(final_state, dict):
                state_dict = final_state
            elif hasattr(final_state, "model_dump"):
                # Pydantic v2 — preferred over the deprecated .dict().
                state_dict = final_state.model_dump()
            elif hasattr(final_state, "dict"):
                # Pydantic v1 fallback.
                state_dict = final_state.dict()
            else:
                try:
                    state_dict = dict(final_state)
                except (TypeError, ValueError):
                    state_dict = getattr(final_state, "__dict__", {})

            clean_state = {
                k: v for k, v in state_dict.items() if k not in self._NODE_NAMES
            }
            # If stripping left nothing, return the raw mapping unchanged.
            return clean_state if clean_state else state_dict

        except Exception as e:
            # Last resort: round-trip through JSON, stringifying unknown types.
            import json

            try:
                parsed = json.loads(json.dumps(final_state, default=str))
                return {
                    k: v for k, v in parsed.items() if k not in self._NODE_NAMES
                }
            except (TypeError, ValueError):
                print(f"WARNING: Could not convert final_state to dict: {e}")
                return {}

state.py

"""
Deep State Management - The "Blackboard" Pattern
Rich, structured state that agents share and collaborate on.
"""

from typing import TypedDict, List, Dict, Any, Optional, Annotated
from datetime import datetime


def merge_dicts(a: Optional[Dict], b: Optional[Dict]) -> Dict:
    """Combine two dicts; keys in *b* win. None on either side acts as {}."""
    if a is None:
        return b or {}
    if b is None:
        return a or {}
    return {**a, **b}


def append_list(a: List, b: List) -> List:
    """Concatenate two lists into a new list, treating None as empty."""
    merged = list(a) if a else []
    merged.extend(b or [])
    return merged


def last_value_str(a: Optional[str], b: Optional[str]) -> Optional[str]:
    """Reducer: the most recent non-None value wins."""
    if b is None:
        return a
    return b


def last_value_int(a: Optional[int], b: Optional[int]) -> Optional[int]:
    """Reducer: the most recent non-None value wins."""
    if b is None:
        return a
    return b


def last_value_float(a: Optional[float], b: Optional[float]) -> Optional[float]:
    """Reducer: the most recent non-None value wins."""
    if b is None:
        return a
    return b


class AgentNote(TypedDict):
    """Notes agents leave for each other on the blackboard"""

    agent_name: str  # Name of the authoring agent, e.g. "Drafter"
    note: str  # Free-text message body
    target_agent: Optional[str]  # If None, note is for all agents
    timestamp: datetime  # When the note was written
    priority: str  # "info", "warning", "critical"


class ProtocolDraft(TypedDict):
    """A versioned draft of the CBT exercise"""

    content: str  # Full draft text
    version: int  # Monotonic version number (1-based)
    iteration: int  # Workflow iteration that produced this draft
    created_by: str  # Authoring agent name
    timestamp: datetime  # When the draft was recorded
    safety_score: Optional[float]  # Reviewer scores; None until reviewed
    empathy_score: Optional[float]
    clinical_score: Optional[float]
    feedback: List[str]  # Feedback from other agents


class ProtocolState(TypedDict):
    """
    The shared state (blackboard) that all agents read from and write to.
    This is the central workspace for the multi-agent collaboration.

    Fields wrapped in Annotated carry a reducer (second argument) that
    controls how concurrent writes from different nodes are merged;
    unannotated fields are plain values.
    """

    # Core workflow state
    user_intent: str
    session_id: str
    status: Annotated[
        str, last_value_str
    ]  # "initializing", "drafting", "reviewing", "awaiting_approval", "approved", "completed"

    # Draft management
    current_draft: Annotated[
        Optional[str], last_value_str
    ]  # The latest draft content - takes last write
    draft_history: Annotated[List[ProtocolDraft], append_list]  # All previous drafts
    current_version: Annotated[int, last_value_int]
    iteration_count: Annotated[int, last_value_int]
    max_iterations: int  # Safety limit
    final_response: Annotated[Optional[str], last_value_str]  # Final output for user

    # Agent collaboration
    agent_notes: Annotated[List[AgentNote], append_list]  # The blackboard scratchpad
    active_agent: Annotated[
        Optional[str], last_value_str
    ]  # Which agent is currently working
    agent_decisions: Annotated[
        Dict[str, Any], merge_dicts
    ]  # Decisions made by each agent

    # Safety and quality metrics
    safety_checks: Annotated[
        Dict[str, bool], merge_dicts
    ]  # Results from safety guardian
    safety_score: Annotated[Optional[float], last_value_float]
    empathy_score: Annotated[Optional[float], last_value_float]
    clinical_score: Annotated[Optional[float], last_value_float]

    # Human-in-the-loop
    # NOTE(review): halted/human_approved reuse last_value_int as a generic
    # "last non-None wins" reducer on booleans — functionally fine, but
    # confirm a bool-specific reducer wasn't intended.
    halted: Annotated[
        bool, last_value_int
    ]  # Whether workflow is paused for human review
    human_approved: Annotated[bool, last_value_int]
    human_edits: Annotated[Optional[str], last_value_str]  # Edits made by human

    # Metadata
    start_time: datetime
    last_update: (
        datetime  # Not annotated - updated by nodes managing their own timestamps
    )
    metadata: Annotated[Dict[str, Any], merge_dicts]  # Additional context


def create_initial_state(
    user_intent: str, session_id: str, max_iterations: int = 5
) -> ProtocolState:
    """Build a fresh blackboard state for a new protocol-generation run."""
    now = datetime.now()
    state: ProtocolState = {
        "user_intent": user_intent,
        "session_id": session_id,
        "status": "initializing",
        "current_draft": None,
        "draft_history": [],
        "current_version": 0,
        "iteration_count": 0,
        "max_iterations": max_iterations,
        "final_response": None,
        "agent_notes": [],
        "active_agent": None,
        "agent_decisions": {},
        "safety_checks": {},
        "safety_score": None,
        "empathy_score": None,
        "clinical_score": None,
        "halted": False,
        "human_approved": False,
        "human_edits": None,
        "start_time": now,
        "last_update": now,
        "metadata": {},
    }
    return state


def add_agent_note(
    state: ProtocolState,
    agent_name: str,
    note: str,
    target_agent: Optional[str] = None,
    priority: str = "info",
) -> ProtocolState:
    """Append a note to the blackboard in place and refresh last_update."""
    state["agent_notes"].append(
        AgentNote(
            agent_name=agent_name,
            note=note,
            target_agent=target_agent,
            timestamp=datetime.now(),
            priority=priority,
        )
    )
    state["last_update"] = datetime.now()
    return state


def add_draft_version(
    state: ProtocolState,
    content: str,
    created_by: str,
    safety_score: Optional[float] = None,
    empathy_score: Optional[float] = None,
    clinical_score: Optional[float] = None,
) -> ProtocolState:
    """Record a new draft in the history and promote it to the current draft."""
    next_version = state["current_version"] + 1
    entry = ProtocolDraft(
        content=content,
        version=next_version,
        iteration=state["iteration_count"],
        created_by=created_by,
        timestamp=datetime.now(),
        safety_score=safety_score,
        empathy_score=empathy_score,
        clinical_score=clinical_score,
        feedback=[],
    )
    state["draft_history"].append(entry)
    state["current_version"] = next_version
    state["current_draft"] = content
    state["last_update"] = datetime.now()
    return state

state_mapper.py

"""Map Cerina ProtocolState to/from Bindu artifacts and messages."""

from uuid import UUID
from typing import Any, Dict, List

from bindu.common.protocol.types import Artifact, Part


def extract_text_from_bindu_message(messages: List[Dict[str, Any]]) -> str:
    """Extract the user's input text from Bindu message format.

    Inspects only the *last* message and tries, in order:
    1. ``content`` as a plain string
    2. ``content`` as a dict with a ``text``, ``body``, ``message`` or
       ``value`` string entry
    3. ``parts`` — a list of part dicts (``kind == "text"`` or a ``text`` key)
    4. a top-level ``message`` string
    5. a top-level ``text`` string

    Args:
        messages: List of Bindu message dictionaries.

    Returns:
        The stripped user text, or "" when nothing extractable is found.
    """
    if not messages:
        return ""

    last_message = messages[-1]

    # NOTE: the previous debug prints were removed — they echoed raw user
    # input (sensitive mental-health content) to stdout.

    # Format 1: "content" field (string - most common)
    if "content" in last_message:
        content = last_message["content"]
        if isinstance(content, str):
            return content.strip()
        # If content is a dict, probe its common text-bearing keys.
        if isinstance(content, dict):
            text = content.get("text")
            if isinstance(text, str):
                return text.strip()
            for key in ("body", "message", "value"):
                if isinstance(content.get(key), str):
                    return content[key].strip()

    # Format 2: "parts" field (alternative format)
    parts = last_message.get("parts", [])
    if parts and isinstance(parts, list):
        text_parts = []
        for p in parts:
            if isinstance(p, dict):
                if p.get("kind") == "text":
                    text_parts.append(p.get("text", ""))
                elif "text" in p:
                    text_parts.append(p["text"])
        text = " ".join(filter(None, text_parts)).strip()
        if text:
            return text

    # Formats 3 and 4: flat "message" or "text" fields
    for key in ("message", "text"):
        value = last_message.get(key)
        if isinstance(value, str):
            return value.strip()

    return ""


def build_langgraph_input(
    bindu_messages: List[Dict[str, Any]],
    context_id: UUID,
    task_id: UUID,
) -> Dict[str, Any]:
    """Convert Bindu message history into a LangGraph workflow input dict.

    Args:
        bindu_messages: Bindu message history.
        context_id: Bindu context ID; mapped onto the LangGraph thread ID
            (prefix ``"bindu_"``) so multi-turn conversations share a thread.
        task_id: Bindu task ID.

    Returns:
        Dict with ``user_intent``, ``thread_id``, ``task_id`` and a
        ``metadata`` sub-dict carrying the string-form IDs.
    """
    task_id_str = str(task_id)
    return {
        "user_intent": extract_text_from_bindu_message(bindu_messages),
        "thread_id": f"bindu_{context_id}",
        "task_id": task_id_str,
        "metadata": {
            "context_id": str(context_id),
            "task_id": task_id_str,
        },
    }


def _state_to_mapping(state: Any) -> Dict[str, Any]:
    """Best-effort conversion of the workflow result into a plain dict.

    Accepts a dict (returned unchanged), a pydantic model (v2 ``model_dump``
    preferred, v1 ``dict`` as fallback), or any object with ``__dict__``.
    """
    if isinstance(state, dict):
        return state
    # Try pydantic v2 first; the original probed the deprecated v1 .dict()
    # first, which triggers deprecation warnings on v2 models.
    for attr in ("model_dump", "dict"):
        method = getattr(state, attr, None)
        if callable(method):
            try:
                return method()
            except Exception:
                continue
    return vars(state) if hasattr(state, "__dict__") else {}


def protocol_state_to_bindu_artifact(
    final_state: Dict[str, Any], artifact_id: UUID
) -> Artifact:
    """Convert the final workflow state into a Bindu ``Artifact``.

    Args:
        final_state: Workflow output — a dict, or a model/object coercible
            to one (see ``_state_to_mapping``).
        artifact_id: ID to assign to the produced artifact.

    Returns:
        An artifact dict containing a text part (the draft), a structured
        data part, and review metadata (scores, checks, notes, trace).
    """
    final_state = _state_to_mapping(final_state)

    # Extract fields once; the same values feed the data part and metadata.
    # (The original assigned `trace` and then re-fetched it twice.)
    final_response = final_state.get("current_draft", "")
    draft_history = final_state.get("draft_history", [])
    agent_notes = final_state.get("agent_notes", [])
    safety_score = final_state.get("safety_score")
    empathy_score = final_state.get("empathy_score")
    clinical_score = final_state.get("clinical_score")
    safety_checks = final_state.get("safety_checks", {})
    agent_decisions = final_state.get("agent_decisions", {})
    trace = final_state.get("trace", [])

    parts: List[Part] = [
        {"kind": "text", "text": final_response or "[No draft generated]"},
        # Structured inspection payload for UIs and downstream agents.
        {
            "kind": "data",
            "data": {
                "draft": final_response,
                "draft_history": draft_history,
                "safety_score": safety_score,
                "empathy_score": empathy_score,
                "clinical_score": clinical_score,
                "safety_checks": safety_checks,
                "agent_decisions": agent_decisions,
                "agent_notes": agent_notes,
                "trace": trace,
            },
        },
    ]

    return {
        "artifact_id": artifact_id,
        "name": "cbt_exercise",
        "description": "CBT exercise generated via Cerina Protocol Foundry (LangGraph)",
        "parts": parts,
        "metadata": {
            "source": "cerina",
            "pipeline": "drafter → safety → critic → supervisor",
            "final_draft": final_response,
            "draft_history": draft_history,
            "safety_score": safety_score,
            "empathy_score": empathy_score,
            "clinical_score": clinical_score,
            "safety_checks": safety_checks,
            "agent_decisions": agent_decisions,
            "agent_notes": agent_notes,
            "trace": trace,
        },
    }


def bindu_message_from_artifact(artifact: Artifact) -> List[Dict[str, Any]]:
    """Wrap an artifact as a single assistant message.

    The message content is the space-joined text of all ``kind == "text"``
    parts; the full artifact rides along for UIs or downstream agents.
    """
    pieces = []
    for part in artifact.get("parts", []):
        if part.get("kind") == "text":
            pieces.append(part.get("text", ""))

    message = {
        "role": "assistant",
        "content": " ".join(pieces).strip(),
        "artifact": artifact,  # include entire artifact for UI or downstream agents
    }
    return [message]

utils.py

"""
Utility functions for error handling and logging
"""

import logging
from typing import Any, Dict

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def log_agent_activity(
    session_id: str,
    agent_name: str,
    action: str,
    reasoning: str | None = None,
    state_snapshot: Dict[str, Any] | None = None,
):
    """Log an agent's activity for a session (in-memory logging only).

    Args:
        session_id: Session/conversation identifier used for correlation.
        agent_name: Name of the agent performing the action.
        action: Short action label.
        reasoning: Optional free-text rationale (``None`` renders as "None",
            matching the previous f-string behavior).
        state_snapshot: Accepted for API compatibility; currently unused.
    """
    # Lazy %-style args: formatting is skipped entirely when INFO is disabled.
    logger.info("[%s] %s - %s: %s", session_id, agent_name, action, reasoning)


def safe_get_state_value(state: Dict[str, Any], key: str, default: Any = None):
    """Return ``state.get(key, default)``, tolerating non-dict inputs.

    Any ``AttributeError``/``TypeError`` raised by the lookup (e.g. when
    ``state`` is ``None`` or a list) yields ``default`` instead of raising.
    """
    try:
        value = state.get(key, default)
    except (AttributeError, TypeError):
        value = default
    return value


def format_error_message(error: Exception) -> str:
    """Render an exception as ``"TypeName: message"`` for API responses."""
    return f"{type(error).__name__}: {error}"

workflow.py

"""
LangGraph Workflow - The Multi-Agent Orchestration
Implements a Supervisor-Worker pattern with autonomous decision-making.
"""

from typing import Literal
from langgraph.graph import StateGraph, END

from state import ProtocolState, add_draft_version
from agents import (
    DrafterAgent,
    SafetyGuardianAgent,
    ClinicalCriticAgent,
    SupervisorAgent,
)


async def draft_node(state: ProtocolState) -> ProtocolState:
    """Node: Drafter creates/revises draft and records it in history.

    NOTE(review): ``drafter`` is not defined at module scope — agents are
    created inside ``create_workflow``, which wires ``drafter.draft`` into
    the graph directly. Calling this wrapper as-is raises NameError; it
    appears to be dead code. TODO: remove it or pass the agent explicitly.
    """
    result = await drafter.draft(state)

    # Update state with draft; fall back to the existing draft/defaults
    # when the agent returned a partial update.
    state["current_draft"] = result.get("current_draft", state.get("current_draft"))
    state["active_agent"] = result.get("active_agent", "Drafter")
    state["status"] = result.get("status", "drafting")
    state["iteration_count"] += 1

    # Add draft to history (only when a non-empty draft exists)
    if state["current_draft"]:
        state = add_draft_version(state, state["current_draft"], "Drafter")

    return state


async def safety_review_node(state: ProtocolState) -> ProtocolState:
    """Node: Safety Guardian reviews the current draft for safety issues.

    NOTE(review): ``safety_guardian`` is not defined at module scope —
    ``create_workflow`` builds the agents locally and registers
    ``safety_guardian.review`` directly, so this wrapper is unreachable
    dead code (NameError if invoked). TODO: remove or inject the agent.
    """
    result = await safety_guardian.review(state)

    # Update safety metrics only for keys the agent actually returned
    if "safety_checks" in result:
        state["safety_checks"].update(result["safety_checks"])
    if "safety_score" in result:
        state["safety_score"] = result["safety_score"]
    state["active_agent"] = "SafetyGuardian"
    state["status"] = result.get("status", "reviewing")

    return state


async def clinical_critique_node(state: ProtocolState) -> ProtocolState:
    """Node: Clinical Critic evaluates therapeutic quality of the draft.

    NOTE(review): ``clinical_critic`` is not defined at module scope —
    ``create_workflow`` registers ``clinical_critic.critique`` directly,
    so this wrapper is unreachable dead code (NameError if invoked).
    TODO: remove or inject the agent.
    """
    result = await clinical_critic.critique(state)

    # Update quality metrics only for keys the agent actually returned
    if "empathy_score" in result:
        state["empathy_score"] = result["empathy_score"]
    if "clinical_score" in result:
        state["clinical_score"] = result["clinical_score"]
    state["active_agent"] = "ClinicalCritic"
    state["status"] = result.get("status", "critiquing")

    return state


async def finalize_node(state: ProtocolState) -> ProtocolState:
    """Node: Supervisor finalizes the protocol and marks the run completed.

    NOTE(review): ``supervisor`` is not defined at module scope —
    ``create_workflow`` registers ``supervisor.finalize`` directly, so this
    wrapper is unreachable dead code (NameError if invoked).
    """
    result = await supervisor.finalize(state)
    state["status"] = "completed"
    # NOTE(review): the merge happens AFTER status is set, so a "status" key
    # in result silently overrides "completed" — confirm that is intended.
    state.update(result)
    return state


def should_continue(
    state: ProtocolState,
) -> Literal[
    "draft",
    "safety_review",
    "clinical_critique",
    "supervisor",
    "finalize",
    "halt",
    "end",
]:
    """Router: decide which node to execute next based on current state.

    Implements the autonomous decision-making logic: halted/human-approved
    states end the run; otherwise the pipeline advances
    Drafter → SafetyGuardian → ClinicalCritic → Supervisor, and the
    Supervisor's reported status selects revision, finalization, or end.

    Fix: the return annotation previously omitted ``"finalize"`` even
    though the Supervisor branch returns it.
    """
    active_agent = state.get("active_agent")
    status = state.get("status", "")
    current_draft = state.get("current_draft")

    print(
        f"DEBUG should_continue: active_agent={active_agent}, status={status}, draft_exists={bool(current_draft)}"
    )

    # If halted, wait for human approval
    if state.get("halted"):
        print("DEBUG: Halted, returning 'end'")
        return "end"

    # If human approved, clear the halt flag and finish the run
    if state.get("human_approved"):
        state["halted"] = False
        print("DEBUG: Human approved, returning 'end'")
        return "end"

    # After drafting, always review safety
    if active_agent == "Drafter":
        print("DEBUG: After Drafter, routing to safety_review")
        return "safety_review"

    # After safety review, critique clinically
    if active_agent == "SafetyGuardian":
        print("DEBUG: After SafetyGuardian, routing to clinical_critique")
        return "clinical_critique"

    # After critique, supervisor decides
    if active_agent == "ClinicalCritic":
        print("DEBUG: After ClinicalCritic, routing to supervisor")
        return "supervisor"

    # Supervisor decides next action based on its reported status
    if active_agent == "Supervisor":
        print(f"DEBUG: Supervisor decision - status: {status}")
        if "needs_revision" in status:
            print("DEBUG: Supervisor says needs_revision, routing back to draft")
            return "draft"  # Route back to drafter for revision
        if "ready_for_review" in status or "awaiting_review" in status:
            print("DEBUG: Supervisor says ready, routing to finalize")
            return "finalize"
        print("DEBUG: Supervisor status unclear, routing to end")
        return "end"

    # Default: if no draft exists yet, start with draft
    if not current_draft:
        print("DEBUG: No draft exists, routing to draft")
        return "draft"

    # Default: supervisor decides
    print("DEBUG: Default routing to supervisor")
    return "supervisor"


def init_state(state: ProtocolState) -> ProtocolState:
    """Ensure all optional state fields exist before the workflow runs.

    Fills in scalar defaults and empty containers in place, never
    overwriting values the caller already provided. Returns the same
    (mutated) state object.
    """
    print(f"\n{'=' * 60}")
    print("INIT_STATE CALLED")
    print(f"  - state keys: {list(state.keys())}")
    print(f"  - user_intent: {state.get('user_intent', 'NOT SET')}")
    print(f"  - session_id: {state.get('session_id', 'NOT SET')}")
    print(f"  - status: {state.get('status', 'NOT SET')}")
    print(f"{'=' * 60}\n")

    # Scalar defaults
    state.setdefault("iteration_count", 0)
    state.setdefault("max_iterations", 3)
    state.setdefault("safety_score", None)
    state.setdefault("empathy_score", None)
    state.setdefault("clinical_score", None)
    state.setdefault("current_draft", "")
    state.setdefault("halted", False)
    # Container defaults — setdefault keeps this consistent with the scalars
    # above (the original mixed setdefault with `if key not in state` checks).
    state.setdefault("agent_notes", [])
    state.setdefault("draft_history", [])
    state.setdefault("safety_checks", {})
    state.setdefault("agent_decisions", {})
    state.setdefault("metadata", {})
    return state


def create_workflow(llm):
    """Create and compile the LangGraph workflow.

    Builds the supervisor-worker graph:
    ``init → draft → safety_review → clinical_critique → supervisor``,
    with the supervisor either looping back to ``draft`` (revision) or
    routing to ``finalize``, all driven by ``should_continue``.

    Fixes over the original:
    - ``set_entry_point`` was called twice ("init" then "draft"), so the
      "init" state-initialization node was silently bypassed; the entry
      point is now "init" only.
    - Nodes are registered before any edge references them.
    - "finalize" previously had a conditional-edge map containing only
      "end", which would raise if the router returned any other label;
      it now terminates unconditionally.

    Args:
        llm: Language model instance shared by all agents.

    Returns:
        The compiled (stateless, no persistence) LangGraph application.
    """
    drafter = DrafterAgent(llm)
    safety_guardian = SafetyGuardianAgent(llm)
    clinical_critic = ClinicalCriticAgent(llm)
    supervisor = SupervisorAgent(llm)

    # Create graph
    workflow = StateGraph(ProtocolState)

    # Register nodes first, then wire edges. The agent methods are used
    # directly; they return partial state updates.
    workflow.add_node("init", init_state)
    workflow.add_node("draft", drafter.draft)
    workflow.add_node("safety_review", safety_guardian.review)
    workflow.add_node("clinical_critique", clinical_critic.critique)
    workflow.add_node("supervisor", supervisor.supervise)
    workflow.add_node("finalize", supervisor.finalize)

    # Single entry point: initialize state, then draft.
    workflow.set_entry_point("init")
    workflow.add_edge("init", "draft")

    # Add conditional edges (autonomous routing)
    workflow.add_conditional_edges(
        "draft",
        should_continue,
        {
            "draft": "draft",
            "safety_review": "safety_review",
            "clinical_critique": "clinical_critique",
            "supervisor": "supervisor",
            "halt": END,
            "end": END,
        },
    )

    workflow.add_conditional_edges(
        "safety_review",
        should_continue,
        {
            "draft": "draft",
            "clinical_critique": "clinical_critique",
            "supervisor": "supervisor",
            "halt": END,
            "end": END,
        },
    )

    workflow.add_conditional_edges(
        "clinical_critique",
        should_continue,
        {"draft": "draft", "supervisor": "supervisor", "halt": END, "end": END},
    )

    workflow.add_conditional_edges(
        "supervisor",
        should_continue,
        {
            "draft": "draft",
            "clinical_critique": "clinical_critique",
            "supervisor": "supervisor",
            "finalize": "finalize",
            "halt": END,
            "end": END,
        },
    )

    # Finalize always terminates the run.
    workflow.add_edge("finalize", END)

    # Compile workflow (stateless, no persistence)
    return workflow.compile()

Skill Configuration

Create skills/cbt-therapy-skill/skill.yaml:
# CBT Therapy Skill
# Cognitive Behavioral Therapy with multi-agent protocol generation

id: cbt-therapy-skill
name: cbt-therapy-skill
version: 1.0.0
author: bindu.builder@getbindu.com

description: |
  Cognitive Behavioral Therapy (CBT) skill that provides personalized therapeutic exercises
  and protocols based on user concerns and mental health needs.

  Features:
  - Multi-agent CBT protocol generation
  - Clinical safety validation
  - Therapeutic quality assessment
  - Personalized exercise creation
  - Evidence-based interventions

  Uses a sophisticated multi-agent workflow with:
  - Drafter Agent: Creates initial CBT exercise drafts
  - Safety Guardian: Validates clinical safety and ethics
  - Clinical Critic: Ensures therapeutic quality and effectiveness

  All protocols are reviewed for safety and clinical appropriateness
  before being provided to users.
tags:
  - cbt
  - therapy
  - mental-health
  - cognitive-behavioral
  - psychological
  - wellness
  - self-help
input_modes:
  - application/json
output_modes:
  - application/json
examples:
  - "I am overwhelmed with sleep problems"
  - "Help me manage my anxiety about work"
  - "I'm struggling with negative thoughts"
  - "Can you help me with stress management?"
  - "I need help coping with depression"
capabilities_detail:
  cbt_protocol_generation:
    supported: true
    description: "Generates personalized CBT exercises and protocols"
  safety_validation:
    supported: true
    description: "Clinical safety and ethics validation for all protocols"
  quality_assessment:
    supported: true
    description: "Therapeutic quality and effectiveness assessment"
  multi_agent_workflow:
    supported: true
    description: "Sophisticated multi-agent orchestration for protocol creation"
  evidence_based:
    supported: true
    description: "Evidence-based CBT interventions and techniques"

How It Works

Multi-Agent Workflow
  • Drafter Agent: Creates initial CBT exercise draft
  • Safety Guardian: Validates clinical safety and ethics
  • Clinical Critic: Reviews therapeutic quality
  • Supervisor Agent: Routes between the agents and finalizes the protocol
LangGraph Integration
  • Uses LangGraph for workflow orchestration
  • Stateless execution (one-shot invocation)
  • Maps Bindu context to LangGraph thread
State Mapping
  • Converts Bindu messages to LangGraph input
  • Transforms ProtocolState to Bindu artifacts
  • Includes safety scores and clinical feedback
Workflow Flow
  1. Extract user concern from Bindu message
  2. Map context_id to LangGraph thread_id
  3. Invoke multi-agent workflow (Drafter → Safety → Critic)
  4. Convert ProtocolState to Bindu artifact format
  5. Return structured CBT exercise with metadata
Safety & Quality
  • Safety score (0-100) from Safety Guardian
  • Clinical score (0-100) from Clinical Critic
  • Safety verdict validation
  • Quality feedback in metadata

Dependencies

uv init
uv add bindu langchain langchain-openai langgraph sqlalchemy python-dotenv

Environment Setup

Create .env file:
OPENROUTER_API_KEY=your_openrouter_api_key_here

Run

uv run supervisor_cbt.py
Examples:
  • “I am overwhelmed with sleep problems”
  • “Can you help me create a coping strategy for anxiety?”
  • “What are the key principles of CBT for depression?”

Example API Calls

{
  "jsonrpc": "2.0",
  "method": "message/send",
  "params": {
    "message": {
      "role": "user",
      "kind": "message",
      "messageId": "9f11c870-5616-49ad-b187-d93cbb100001",
      "contextId": "9f11c870-5616-49ad-b187-d93cbb100002",
      "taskId": "9f11c870-5616-49ad-b187-d93cbb100003",
      "parts": [
        {
          "kind": "text",
          "text": "I am overwhelmed with sleep problems"
        }
      ]
    },
    "skillId": "cbt-therapy-skill",
    "configuration": {
      "acceptedOutputModes": ["application/json"]
    }
  },
  "id": "9f11c870-5616-49ad-b187-d93cbb100003"
}
{
  "jsonrpc": "2.0",
  "method": "tasks/get",
  "params": {
    "taskId": "9f11c870-5616-49ad-b187-d93cbb100003"
  },
  "id": "9f11c870-5616-49ad-b187-d93cbb100004"
}

Frontend Setup

# Clone the Bindu repository
git clone https://github.com/GetBindu/Bindu

# Navigate to frontend directory
cd frontend

# Install dependencies
npm install

# Start frontend development server
npm run dev
Open http://localhost:5173 and chat with the CBT therapy agent.