Long-Running Task Notification System

Overview

The Long-Running Task Notification System enables real-time webhook notifications for tasks that run longer than typical request timeouts (minutes, hours, or days). This feature allows clients to receive push notifications about task state changes and artifact generation without maintaining persistent connections or polling. This implementation follows the A2A Protocol specification for push notifications.

Features

Persistent Webhook Storage - Webhook configurations survive server restarts
Dual Registration Paths - Inline (messages/send) or separate RPC endpoint
Global Webhook Fallback - Agent-level default webhook for all tasks
Comprehensive Events - Status updates and artifact notifications
A2A Protocol Compliant - Follows official specification
Secure by Design - Token-based authentication and validation

Architecture

Components

┌─────────────────────────────────────────────────────────────┐
│                        Client Application                    │
│  ┌────────────────┐              ┌─────────────────────┐   │
│  │ Task Requester │              │ Webhook Receiver    │   │
│  └────────┬───────┘              └──────────▲──────────┘   │
│           │                                  │               │
└───────────┼──────────────────────────────────┼──────────────┘
            │                                  │
            │ 1. POST /messages/send           │ 4. POST webhook
            │    (with webhook config)         │    (notifications)
            ▼                                  │
┌─────────────────────────────────────────────┼──────────────┐
│                    Bindu Server              │              │
│  ┌──────────────────────────────────────────┴───────────┐  │
│  │              MessageHandlers                          │  │
│  │  • Registers webhook inline                           │  │
│  │  • Persists if long_running=true                      │  │
│  └──────────────────┬────────────────────────────────────┘  │
│                     │                                        │
│                     │ 2. Schedule task                       │
│                     ▼                                        │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              ManifestWorker                           │  │
│  │  • Executes task                                      │  │
│  │  • Generates artifacts                                │  │
│  │  • Calls notification methods                         │  │
│  └──────────────────┬────────────────────────────────────┘  │
│                     │                                        │
│                     │ 3. Notify events                       │
│                     ▼                                        │
│  ┌──────────────────────────────────────────────────────┐  │
│  │         PushNotificationManager                       │  │
│  │  • Manages webhook configs                            │  │
│  │  • Sends HTTP notifications                           │  │
│  │  • Handles global fallback                            │  │
│  └──────────────────┬────────────────────────────────────┘  │
│                     │                                        │
│                     │ Persists to                            │
│                     ▼                                        │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              Storage (PostgreSQL/Memory)              │  │
│  │  • Stores webhook configs                             │  │
│  │  • Survives server restarts                           │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

Data Flow

  1. Task Creation: Client sends task with webhook configuration
  2. Registration: MessageHandlers registers webhook (persists if long_running=true)
  3. Execution: ManifestWorker executes task
  4. Notification: PushNotificationManager sends events to webhook
  5. Persistence: For long-running tasks, the webhook config is written to the database at registration (step 2) so it survives restarts

Quick Start

1. Enable Push Notifications in Agent Manifest

from bindu.common.models import AgentManifest

manifest = AgentManifest(
    name="Data Processor",
    capabilities={
        "push_notifications": True  # Required!
    },
    # Optional: Global webhook for all tasks
    global_webhook_url="https://myapp.com/webhooks/global",
    global_webhook_token="global_secret_token",
    # ... other config
)

2. Send Task with Webhook (Inline Registration)

import requests
from uuid import uuid4

response = requests.post("http://localhost:3773/messages/send", json={
    "jsonrpc": "2.0",
    "id": "req-1",
    "method": "messages/send",
    "params": {
        "message": {
            "message_id": str(uuid4()),
            "task_id": str(uuid4()),
            "context_id": str(uuid4()),
            "kind": "message",
            "role": "user",
            "parts": [{"kind": "text", "text": "Process large dataset"}]
        },
        "configuration": {
            "accepted_output_modes": ["application/json"],
            "long_running": True,  # Persist webhook across restarts
            "push_notification_config": {
                "id": str(uuid4()),
                "url": "https://myapp.com/webhooks/task-updates",
                "token": "secret_abc123"
            }
        }
    }
})

task = response.json()["result"]
print(f"Task created: {task['id']}")

3. Implement Webhook Receiver

from fastapi import FastAPI, Request, Header, HTTPException

app = FastAPI()

@app.post("/webhooks/task-updates")
async def handle_task_update(
    request: Request,
    authorization: str = Header(None)
):
    # Verify token
    if authorization != "Bearer secret_abc123":
        raise HTTPException(status_code=401, detail="Unauthorized")
    
    event = await request.json()
    
    # Handle different event types
    if event["kind"] == "status-update":
        print(f"Task {event['task_id']} state: {event['status']['state']}")
        
        if event["final"]:
            # "final" marks the last event; the state may be completed, failed, or canceled
            print("Task reached a final state")
    
    elif event["kind"] == "artifact-update":
        print(f"Artifact generated: {event['artifact']['name']}")
    
    return {"status": "received"}

Configuration

Agent Manifest Configuration

from dataclasses import dataclass, field

@dataclass
class AgentManifest:
    # Enable push notifications
    capabilities: dict = field(default_factory=lambda: {
        "push_notifications": True
    })
    
    # Global webhook (optional)
    global_webhook_url: str | None = None
    global_webhook_token: str | None = None

Message Send Configuration

class MessageSendConfiguration(TypedDict):
    accepted_output_modes: Required[list[str]]
    blocking: NotRequired[bool]
    history_length: NotRequired[int]
    
    # Push notification configuration
    push_notification_config: NotRequired[PushNotificationConfig]
    
    # Long-running flag (enables persistence)
    long_running: NotRequired[bool]  # Default: False

Push Notification Configuration

class PushNotificationConfig(TypedDict):
    id: Required[UUID]
    url: Required[str]  # HTTPS webhook URL
    token: NotRequired[str]  # Bearer token for authentication
    authentication: NotRequired[dict]  # Advanced auth schemes

Webhook Registration

Method 1: Inline Registration

Register webhook when creating the task:
POST /messages/send
{
  "params": {
    "message": {...},
    "configuration": {
      "accepted_output_modes": ["application/json"],
      "long_running": true,
      "push_notification_config": {
        "id": "webhook-123",
        "url": "https://myapp.com/webhook",
        "token": "secret_token"
      }
    }
  }
}
Advantages:
  • ✅ Single API call
  • ✅ No race conditions
  • ✅ Webhook ready before task starts

Method 2: Separate RPC Registration

Register webhook after task creation:
# Step 1: Create task
POST /messages/send
{
  "params": {
    "message": {...},
    "configuration": {
      "accepted_output_modes": ["application/json"]
    }
  }
}
# Returns: {"result": {"id": "task-123", ...}}

# Step 2: Register webhook
POST /rpc
{
  "jsonrpc": "2.0",
  "method": "tasks/pushNotification/set",
  "params": {
    "id": "task-123",
    "long_running": true,
    "push_notification_config": {
      "id": "webhook-456",
      "url": "https://myapp.com/webhook",
      "token": "secret_token"
    }
  }
}
Advantages:
  • ✅ Can update webhook mid-task
  • ✅ Useful for dynamic workflows
Disadvantages:
  • ⚠️ Two API calls
  • ⚠️ Possible race condition for fast tasks

Notification Events

Status Update Event

Sent when task state changes (submitted → working → completed/failed).
{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "sequence": 1,
  "timestamp": "2025-12-26T08:00:00Z",
  "kind": "status-update",
  "task_id": "123e4567-e89b-12d3-a456-426614174000",
  "context_id": "789e0123-e89b-12d3-a456-426614174000",
  "status": {
    "state": "working",
    "timestamp": "2025-12-26T08:00:00Z"
  },
  "final": false
}
Task States:
  • submitted - Task created
  • working - Task executing
  • input-required - Waiting for user input
  • auth-required - Waiting for authentication
  • completed - Task finished successfully
  • failed - Task failed with error
  • canceled - Task canceled by user
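
A receiver usually needs to know whether a state ends the task or only pauses it. The following is a minimal sketch, assuming that completed, failed, and canceled are terminal and that input-required and auth-required wait on the client. The helper names are hypothetical, and the `final` flag on the event remains the authoritative signal.

```python
# Hypothetical helpers; the state names come from the list above.
TERMINAL_STATES = frozenset({"completed", "failed", "canceled"})
PAUSED_STATES = frozenset({"input-required", "auth-required"})


def is_terminal(state: str) -> bool:
    """Return True if no further state changes are expected (assumption)."""
    return state in TERMINAL_STATES


def needs_client_action(state: str) -> bool:
    """Return True if the task is waiting on input or authentication."""
    return state in PAUSED_STATES
```

A handler could call `is_terminal(event["status"]["state"])` to decide whether to release resources it holds for the task.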

Artifact Update Event

Sent when artifacts are generated (currently only on task completion).
{
  "event_id": "550e8400-e29b-41d4-a716-446655440001",
  "sequence": 2,
  "timestamp": "2025-12-26T08:05:00Z",
  "kind": "artifact-update",
  "task_id": "123e4567-e89b-12d3-a456-426614174000",
  "context_id": "789e0123-e89b-12d3-a456-426614174000",
  "artifact": {
    "artifact_id": "456e7890-e89b-12d3-a456-426614174000",
    "name": "results.json",
    "parts": [
      {
        "kind": "data",
        "data": {"status": "success", "records_processed": 10000}
      }
    ]
  }
}
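
To show how a receiver might read this payload, here is a small sketch that collects the contents of every part with kind "data". The function name `extract_data_parts` is hypothetical, and the sketch assumes the event has the shape shown above.

```python
from typing import Any


def extract_data_parts(event: dict[str, Any]) -> list[dict[str, Any]]:
    """Collect the payloads of all "data" parts in an artifact-update event."""
    if event.get("kind") != "artifact-update":
        return []
    parts = event.get("artifact", {}).get("parts", [])
    return [p["data"] for p in parts if p.get("kind") == "data" and "data" in p]


# Trimmed copy of the example event above
example_event = {
    "kind": "artifact-update",
    "artifact": {
        "name": "results.json",
        "parts": [
            {"kind": "data", "data": {"status": "success", "records_processed": 10000}}
        ],
    },
}
data_parts = extract_data_parts(example_event)
```

Events of any other kind yield an empty list, so the helper is safe to call before dispatching on `kind`.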

Persistence & Server Restarts

How It Works

When long_running=true:
  1. Registration: Webhook config saved to database
  2. Server Restart: TaskManager loads all persisted configs on startup
  3. Task Continues: Notifications resume automatically
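
The three steps above can be sketched with an in-memory cache backed by a table. This is an illustration only: it uses SQLite from the standard library as a stand-in for PostgreSQL, and the names `WebhookStore`, `register`, and `load_persisted` are hypothetical, not Bindu's API.

```python
import json
import sqlite3


class WebhookStore:
    """Hypothetical store: in-memory cache plus a table for long-running tasks."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.cache: dict[str, dict] = {}
        conn.execute(
            "CREATE TABLE IF NOT EXISTS webhook_configs "
            "(task_id TEXT PRIMARY KEY, config TEXT NOT NULL)"
        )

    def register(self, task_id: str, config: dict, long_running: bool = False) -> None:
        self.cache[task_id] = config
        if long_running:  # only long-running tasks are written to the database
            self.conn.execute(
                "INSERT OR REPLACE INTO webhook_configs (task_id, config) VALUES (?, ?)",
                (task_id, json.dumps(config)),
            )
            self.conn.commit()

    def load_persisted(self) -> int:
        """Reload persisted configs into the cache, as on server startup."""
        rows = self.conn.execute("SELECT task_id, config FROM webhook_configs").fetchall()
        for task_id, raw in rows:
            self.cache[task_id] = json.loads(raw)
        return len(rows)


conn = sqlite3.connect(":memory:")
store = WebhookStore(conn)
store.register("task-123", {"url": "https://myapp.com/webhook"}, long_running=True)
store.register("task-456", {"url": "https://myapp.com/other"})

# Simulate a restart: a fresh store on the same database starts with an
# empty cache and recovers only the persisted config.
restarted = WebhookStore(conn)
restarted.load_persisted()
```

After the simulated restart, `restarted.cache` holds task-123 but not task-456, which matches the behavior described for tasks registered without long_running.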

Database Schema

CREATE TABLE webhook_configs (
    task_id UUID PRIMARY KEY,
    config JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
);

Example Flow

# Before restart
POST /messages/send (long_running=true)
→ Task created: task-123
→ Webhook persisted to database
→ Server crashes/restarts

# After restart
→ TaskManager.initialize() loads webhook for task-123
→ Task continues executing
→ Notifications still work! ✅

Global Webhook Fallback

Configuration

manifest = AgentManifest(
    name="My Agent",
    capabilities={"push_notifications": True},
    global_webhook_url="https://myapp.com/webhooks/global",
    global_webhook_token="global_secret"
)

Priority Order

  1. Task-specific webhook (highest priority)
  2. Global webhook (fallback)
  3. No webhook (no notifications)
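
The priority order can be expressed as a small lookup. This is a sketch under the assumption that a task-specific config is a dict with a "url" key; `resolve_webhook` is a hypothetical name, not part of the documented API.

```python
from typing import Optional


def resolve_webhook(
    task_config: Optional[dict],
    global_url: Optional[str],
    global_token: Optional[str] = None,
) -> Optional[dict]:
    """Pick the webhook for a task: task-specific first, then global, else None."""
    if task_config and task_config.get("url"):
        return task_config
    if global_url:
        config = {"url": global_url}
        if global_token:
            config["token"] = global_token
        return config
    return None  # no webhook configured: no notifications are sent
```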

Example

# Task 1: Has explicit webhook
POST /messages/send
{
  "configuration": {
    "push_notification_config": {
      "url": "https://task-specific.com/webhook"
    }
  }
}
# → Uses task-specific webhook

# Task 2: No explicit webhook
POST /messages/send
{
  "configuration": {
    "accepted_output_modes": ["application/json"]
  }
}
# → Uses global webhook (if configured)

Security Considerations

Server-Side (Sending Notifications)

1. Webhook URL Validation

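Risk: SSRF attacks, DDoS amplification
Mitigation:
# Validate webhook URLs
import ipaddress
from urllib.parse import urlparse

ALLOWED_DOMAINS = ["myapp.com", "trusted-service.com"]

def is_internal_ip(hostname: str) -> bool:
    # True for "localhost" and for literal loopback, private, or link-local IPs.
    # Hostnames are not resolved here; a production check should also resolve DNS.
    if hostname == "localhost":
        return True
    try:
        ip = ipaddress.ip_address(hostname)
    except ValueError:
        return False
    return ip.is_private or ip.is_loopback or ip.is_link_local

def validate_webhook_url(url: str) -> bool:
    parsed = urlparse(url)

    # Require HTTPS
    if parsed.scheme != "https":
        return False

    # Reject URLs without a host and hosts outside the allowlist
    if parsed.hostname is None or parsed.hostname not in ALLOWED_DOMAINS:
        return False

    # Block internal IPs
    if is_internal_ip(parsed.hostname):
        return False

    return True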

2. Authentication to Client Webhook

Current: Bearer token in Authorization header
headers = {
    "Authorization": f"Bearer {config['token']}",
    "Content-Type": "application/json"
}
Future: Support for HMAC signatures, JWT, mTLS

Client-Side (Receiving Notifications)

1. Verify Token

@app.post("/webhook")
async def handle_webhook(request: Request, authorization: str = Header(None)):
    expected_token = "Bearer secret_abc123"
    
    if authorization != expected_token:
        raise HTTPException(status_code=401)
    
    # Process event
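
A plain `!=` comparison can leak timing information about the expected token. As a hedged alternative, the check can use `hmac.compare_digest` from the standard library, which compares in constant time. The helper name `token_matches` is hypothetical.

```python
import hmac
from typing import Optional


def token_matches(authorization: Optional[str], expected_token: str) -> bool:
    """Compare the Authorization header with the expected bearer token in constant time."""
    if authorization is None:
        return False
    expected = f"Bearer {expected_token}"
    return hmac.compare_digest(authorization.encode(), expected.encode())
```

Inside the handler, `if not token_matches(authorization, "secret_abc123"): raise HTTPException(status_code=401)` would replace the direct comparison.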

2. Prevent Replay Attacks

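from datetime import datetime, timezone

# Check timestamp
event = await request.json()
# Python < 3.11 cannot parse a trailing "Z", so normalize it to an explicit offset
event_time = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
now = datetime.now(timezone.utc)

if (now - event_time).total_seconds() > 300:  # 5 minutes
    raise HTTPException(status_code=400, detail="Event too old")

# Check event_id uniqueness (store in Redis/DB); `redis` is an already-connected client.
# SET with nx=True records the ID atomically with a 10 min TTL and returns a falsy
# value if the key already exists, which avoids a race between exists() and setex().
if not redis.set(f"event:{event['event_id']}", "1", nx=True, ex=600):
    raise HTTPException(status_code=400, detail="Duplicate event")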

3. Validate Event Structure

from typing import Literal

from pydantic import BaseModel, ValidationError

class StatusUpdateEvent(BaseModel):
    event_id: str
    sequence: int
    timestamp: str
    kind: Literal["status-update"]
    task_id: str
    context_id: str
    status: dict
    final: bool

try:
    event = StatusUpdateEvent(**await request.json())
except ValidationError as e:
    raise HTTPException(status_code=400, detail=str(e))

API Reference

MessageSendConfiguration

class MessageSendConfiguration(TypedDict):
    accepted_output_modes: Required[list[str]]
    blocking: NotRequired[bool]
    history_length: NotRequired[int]
    push_notification_config: NotRequired[PushNotificationConfig]
    long_running: NotRequired[bool]

PushNotificationConfig

class PushNotificationConfig(TypedDict):
    id: Required[UUID]
    url: Required[str]
    token: NotRequired[str]
    authentication: NotRequired[dict]

RPC Methods

tasks/pushNotification/set

Register or update webhook for a task. Request:
{
  "jsonrpc": "2.0",
  "method": "tasks/pushNotification/set",
  "params": {
    "id": "task-123",
    "long_running": true,
    "push_notification_config": {
      "id": "webhook-456",
      "url": "https://myapp.com/webhook",
      "token": "secret"
    }
  }
}
Response:
{
  "jsonrpc": "2.0",
  "result": {
    "id": "task-123",
    "push_notification_config": {
      "id": "webhook-456",
      "url": "https://myapp.com/webhook"
    }
  }
}
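
For clients that register webhooks programmatically, the request body can be assembled by a small helper. This sketch mirrors the request shown above. The function name is hypothetical, and the top-level "id" field is an assumption added so the response can be correlated with the request; it does not appear in the example request.

```python
from typing import Optional
from uuid import uuid4


def build_set_webhook_request(
    task_id: str,
    url: str,
    token: Optional[str] = None,
    long_running: bool = False,
) -> dict:
    """Build a JSON-RPC 2.0 body for tasks/pushNotification/set."""
    config = {"id": str(uuid4()), "url": url}
    if token is not None:
        config["token"] = token
    return {
        "jsonrpc": "2.0",
        "id": str(uuid4()),  # assumption: request id for response correlation
        "method": "tasks/pushNotification/set",
        "params": {
            "id": task_id,
            "long_running": long_running,
            "push_notification_config": config,
        },
    }


request_body = build_set_webhook_request(
    "task-123", "https://myapp.com/webhook", token="secret", long_running=True
)
```

The resulting dict can be passed as the `json=` argument of an HTTP POST to the RPC endpoint.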

tasks/pushNotification/get

Get webhook configuration for a task. Request:
{
  "jsonrpc": "2.0",
  "method": "tasks/pushNotification/get",
  "params": {
    "task_id": "task-123"
  }
}

tasks/pushNotificationConfig/delete

Delete webhook configuration for a task. Request:
{
  "jsonrpc": "2.0",
  "method": "tasks/pushNotificationConfig/delete",
  "params": {
    "task_id": "task-123"
  }
}

Examples

Example 1: Data Processing Pipeline

import requests
from uuid import uuid4

# Start long-running data processing
response = requests.post("http://localhost:3773/messages/send", json={
    "jsonrpc": "2.0",
    "id": "req-1",
    "method": "messages/send",
    "params": {
        "message": {
            "message_id": str(uuid4()),
            "task_id": str(uuid4()),
            "context_id": str(uuid4()),
            "kind": "message",
            "role": "user",
            "parts": [{"kind": "text", "text": "Process 1M records from dataset.csv"}]
        },
        "configuration": {
            "accepted_output_modes": ["application/json"],
            "long_running": True,
            "push_notification_config": {
                "id": str(uuid4()),
                "url": "https://myapp.com/webhooks/data-pipeline",
                "token": "pipeline_secret_123"
            }
        }
    }
})

task_id = response.json()["result"]["id"]
print(f"Data processing started: {task_id}")
print("You will receive webhook notifications as processing progresses")

Example 2: Mobile App with Serverless Webhook

# Mobile app sends task
POST /messages/send
{
  "configuration": {
    "long_running": true,
    "push_notification_config": {
      "url": "https://us-central1-myapp.cloudfunctions.net/taskWebhook",
      "token": "mobile_token_456"
    }
  }
}

# AWS Lambda / Cloud Function webhook handler
# (the event['body'] shape shown here is the AWS Lambda proxy format)
import json

def task_webhook(event, context):
    body = json.loads(event['body'])

    if body['kind'] == 'status-update' and body['final']:
        # Send push notification to mobile device
        send_fcm_notification(
            device_token=get_device_token(body['task_id']),
            title="Task Complete",
            body="Your task has finished"
        )

    return {'statusCode': 200}

Example 3: Multi-Tenant with Global Webhook

# Configure agent with global webhook
manifest = AgentManifest(
    name="Multi-Tenant Processor",
    capabilities={"push_notifications": True},
    global_webhook_url="https://api.myplatform.com/webhooks/tasks",
    global_webhook_token="platform_master_token"
)

# Tenant A: Uses global webhook
POST /messages/send
{
  "message": {"parts": [{"text": "Process tenant A data"}]},
  "configuration": {
    "accepted_output_modes": ["application/json"]
  }
}
# → Notifications sent to global webhook

# Tenant B: Uses custom webhook
POST /messages/send
{
  "message": {"parts": [{"text": "Process tenant B data"}]},
  "configuration": {
    "push_notification_config": {
      "url": "https://tenantb.com/webhook",
      "token": "tenant_b_token"
    }
  }
}
# → Notifications sent to tenant B's webhook

Troubleshooting

Webhook Not Receiving Notifications

Check 1: Push notifications enabled?
# Verify in agent manifest
assert manifest.capabilities.get("push_notifications") is True
Check 2: Webhook registered?
# Query webhook config
POST /rpc
{
  "method": "tasks/pushNotification/get",
  "params": {"task_id": "task-123"}
}
Check 3: Webhook URL accessible?
# Test webhook endpoint
curl -X POST https://myapp.com/webhook \
  -H "Authorization: Bearer secret_token" \
  -H "Content-Type: application/json" \
  -d '{"test": "event"}'
Check 4: Check server logs
# Look for notification delivery errors
grep "notification delivery failed" logs/bindu.log
grep "webhook" logs/bindu.log

Webhooks Lost After Server Restart

Problem: long_running flag not set
Solution:
# Ensure long_running=true for persistence
"configuration": {
    "long_running": True,  # Required!
    "push_notification_config": {...}
}
Verify persistence:
-- Check database
SELECT task_id, config FROM webhook_configs;

Duplicate Notifications

Problem: Webhook receiver not idempotent
Solution: Track event_id to deduplicate
processed_events = set()

@app.post("/webhook")
async def handle_webhook(request: Request):
    event = await request.json()
    event_id = event["event_id"]
    
    if event_id in processed_events:
        return {"status": "duplicate"}
    
    processed_events.add(event_id)
    # Process event...
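
An unbounded in-memory set grows for as long as the process runs. One hedged alternative is to remember event IDs only for a limited time. The class below is a hypothetical single-process sketch; a deployment with several workers would need a shared store such as Redis or a database.

```python
import time
from typing import Optional


class SeenEvents:
    """Remember event IDs for `ttl` seconds (hypothetical helper, single process only)."""

    def __init__(self, ttl: float = 600.0) -> None:
        self.ttl = ttl
        self._seen: dict[str, float] = {}

    def check_and_add(self, event_id: str, now: Optional[float] = None) -> bool:
        """Return True if event_id is new, False if it is a duplicate."""
        now = time.monotonic() if now is None else now
        # Drop expired entries so the cache does not grow without bound.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.ttl}
        if event_id in self._seen:
            return False
        self._seen[event_id] = now
        return True
```

A handler would call `check_and_add(event["event_id"])` and return early when it gets False. The `now` parameter exists only to make the expiry behavior easy to test.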

Authentication Failures

Problem: Token mismatch
Solution: Verify token format
# Server sends
Authorization: Bearer secret_token

# Client expects
if authorization == "Bearer secret_token":
    ...  # ✅ Match

Best Practices

1. Use long_running for Tasks > 30 seconds

# Short task (< 30s): No persistence needed
"configuration": {
    "push_notification_config": {...}
}

# Long task (> 30s): Enable persistence
"configuration": {
    "long_running": True,
    "push_notification_config": {...}
}

2. Implement Webhook Retry Logic

import logging

logger = logging.getLogger(__name__)

@app.post("/webhook")
async def handle_webhook(request: Request):
    try:
        event = await request.json()
        await process_event(event)
        return {"status": "success"}
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")
        # Return 500 so that a sender which retries on 5xx responses redelivers the event
        raise HTTPException(status_code=500)

3. Use Global Webhook for Simplicity

# Instead of configuring webhook per-task
manifest = AgentManifest(
    global_webhook_url="https://myapp.com/webhook",
    global_webhook_token="shared_token"
)

# All tasks automatically get notifications
POST /messages/send
{
  "configuration": {
    "accepted_output_modes": ["application/json"]
  }
}

4. Monitor Webhook Delivery

# Track delivery metrics
from prometheus_client import Counter

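# Counter requires a description as its second argument
webhook_success = Counter('webhook_delivery_success_total', 'Webhook deliveries that succeeded')
webhook_failure = Counter('webhook_delivery_failure_total', 'Webhook deliveries that failed')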

# In notification service
try:
    await send_webhook(config, event)
    webhook_success.inc()
except Exception:
    webhook_failure.inc()

5. Secure Webhook URLs

# Use HTTPS only
"url": "https://myapp.com/webhook"  # ✅
"url": "http://myapp.com/webhook"   # ❌

# Avoid exposing internal services
"url": "https://localhost:8080/webhook"      # ❌
"url": "https://192.168.1.100/webhook"       # ❌
"url": "https://public.myapp.com/webhook"    # ✅

Migration Guide

From Polling to Push Notifications

Before (Polling):
import time

# Create task
task = create_task(...)

# Poll until the task reaches a terminal state
while True:
    task = get_task(task["id"])
    if task["status"]["state"] in ["completed", "failed", "canceled"]:
        break
    time.sleep(5)  # Poll every 5 seconds
After (Push Notifications):
# Create task with webhook
task = create_task(
    ...,
    configuration={
        "long_running": True,
        "push_notification_config": {
            "url": "https://myapp.com/webhook",
            "token": "secret"
        }
    }
)

# Webhook receives notification when complete
# No polling needed!

Performance Considerations

Webhook Delivery Latency

  • Target: < 1 second from event to webhook delivery
  • Factors: Network latency, webhook endpoint response time
  • Optimization: Use async HTTP client, connection pooling

Database Load

  • Webhook configs: Loaded once on startup, cached in memory
  • Impact: Minimal - only write on registration, read on startup
  • Scaling: Supports 10,000+ concurrent long-running tasks

Memory Usage

  • Per webhook config: ~200 bytes (in-memory)
  • 10,000 webhooks: ~2 MB memory
  • Negligible impact on server resources
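
The memory estimate follows directly from the per-config figure. As a quick check, using the approximate 200 bytes quoted above and decimal megabytes:

```python
BYTES_PER_CONFIG = 200   # approximate in-memory size quoted above
NUM_WEBHOOKS = 10_000

total_bytes = BYTES_PER_CONFIG * NUM_WEBHOOKS
total_mb = total_bytes / 1_000_000  # decimal megabytes

print(f"{total_bytes} bytes = {total_mb:.1f} MB")
```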

Last Updated: December 26, 2025
Version: 1.0.0