Pydantic + Bedrock Tool-Based Validation: Technical Flow Documentation

⚠️ CRITICAL API REQUIREMENT: AWS Bedrock Converse API requires system prompts to be passed via the system parameter, NOT in the messages array. Messages can only contain "user" or "assistant" roles. This differs from OpenAI and other LLM APIs. All examples in this document have been updated to reflect this requirement.
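The difference is easiest to see when converting from an OpenAI-style message list. The following is a minimal sketch, assuming each input message is a dict with string `role` and `content` keys; `to_converse_kwargs` is a hypothetical helper name, not part of boto3.

```python
from typing import Any, Dict, List


def to_converse_kwargs(chat_messages: List[Dict[str, str]]) -> Dict[str, Any]:
    """Split OpenAI-style messages into Converse `system` and `messages`."""
    system: List[Dict[str, str]] = []
    messages: List[Dict[str, Any]] = []
    for msg in chat_messages:
        role, text = msg["role"], msg["content"]
        if role == "system":
            # System text goes in the top-level `system` parameter
            system.append({"text": text})
        elif role in ("user", "assistant"):
            messages.append({"role": role, "content": [{"text": text}]})
        else:
            raise ValueError(f"Unsupported role for Converse: {role!r}")
    return {"system": system, "messages": messages}


kwargs = to_converse_kwargs([
    {"role": "system", "content": "You are a call analyst."},
    {"role": "user", "content": "Transcript: Hi, this is Drew..."},
])
```

The resulting dict can be unpacked into `converse(modelId=..., **kwargs)`.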

Executive Summary

This document explains how CloudAudioAI uses Pydantic models with AWS Bedrock's tool-calling feature to get structured, validated JSON output from AI analysis. Tool calling removes free-text parsing errors, and Pydantic validation rejects any response that does not comply with the schema before it reaches downstream code.

The Problem We're Solving

Traditional Approach (Unreliable)

# Just asking the AI nicely
prompt = "Please return JSON with these fields: category, staff_name, etc."
response = bedrock.invoke(prompt)

# Problems:
# - AI might return: "Here's the JSON you requested: {...}" (extra text)
# - Missing required fields
# - Wrong data types
# - Invalid enum values
# - Inconsistent structure
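To make the first failure mode concrete, here is a small sketch using only the standard library. The reply text is an invented example, and `parse_with_salvage` is a hypothetical workaround of the kind this approach tends to accumulate.

```python
import json

# Invented example of a reply that wraps the JSON in extra text
raw_reply = 'Here\'s the JSON you requested: {"category": "service", "staff_name": "Drew"}'


def parse_with_salvage(text: str) -> dict:
    """Best effort: parse the substring from the first '{' to the last '}'."""
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object found")
    return json.loads(text[start:end + 1])


try:
    json.loads(raw_reply)
    strict_ok = True
except json.JSONDecodeError:
    # The leading sentence makes the whole reply invalid JSON
    strict_ok = False

salvaged = parse_with_salvage(raw_reply)
```

Even when salvaging works, nothing has checked required fields, data types, or enum values yet.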

Our Solution (Guaranteed Structure)

# Force AI to fill a structured form
tool = create_tool_from_pydantic(CallAnalysisModel)
response = bedrock.invoke_with_tool(prompt, tool)

# Guaranteed:
# ✓ No extra text
# ✓ All required fields present
# ✓ Correct data types
# ✓ Valid enum values
# ✓ Consistent structure every time
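One possible shape for `create_tool_from_pydantic` is sketched below, assuming Pydantic v2. The trimmed `CallAnalysisModel`, the default tool name, and the use of the docstring as the description are illustrative assumptions rather than the project's actual implementation.

```python
from typing import Literal, Type

from pydantic import BaseModel


class CallAnalysisModel(BaseModel):
    """Record the structured analysis of the call"""
    staff_name: str
    category: Literal["revenue_impacting", "service", "scheduling", "other"]


def create_tool_from_pydantic(model: Type[BaseModel],
                              name: str = "record_call_analysis") -> dict:
    """Wrap a model's JSON Schema in the Converse `toolSpec` envelope."""
    return {
        "toolSpec": {
            "name": name,
            "description": model.__doc__ or name,
            "inputSchema": {"json": model.model_json_schema()},
        }
    }


tool = create_tool_from_pydantic(CallAnalysisModel)
schema = tool["toolSpec"]["inputSchema"]["json"]
```

The returned dict is one entry of `toolConfig["tools"]` in a `converse` call.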

Architecture Overview

┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
│                 │      │                 │      │                 │
│   models.py     │─────▶│ bedrock_call.py │─────▶│  AWS Bedrock    │
│                 │      │                 │      │                 │
│  Pydantic       │      │  Converts to    │      │  Fills the      │
│  Schema         │      │  Tool Spec      │      │  Form           │
│  Definition     │      │                 │      │                 │
└─────────────────┘      └─────────────────┘      └─────────────────┘
        │                         │                         │
        │                         │                         │
        ▼                         ▼                         ▼
   WHAT fields              HOW to call               SMART filling
   are required             Bedrock                   based on prompt

┌─────────────────────────────────────────────────────────────────┐
│                         prompt-v6.txt                           │
│                                                                 │
│  Business Logic:                                                │
│  - WHEN to use "revenue_impacting" vs "service"               │
│  - HOW to classify cancellations                              │
│  - WHAT keywords indicate intro booking                        │
└─────────────────────────────────────────────────────────────────┘

Component Responsibilities

1. models.py - The Form Structure

Purpose: Define WHAT data we want, not HOW to get it

from typing import Literal

from pydantic import BaseModel, model_validator


class PrimaryTopic(BaseModel):
    """Nested topic details (only the field used below is shown; assumed shape)"""
    subcategory: str


class CallAnalysis(BaseModel):
    """The exact structure of our expected output"""

    # Simple fields
    staff_name: str  # Must be a string

    # Enum fields (restricted values)
    category: Literal["revenue_impacting", "service", "scheduling", "other"]

    # Nested structures
    primary_topic: PrimaryTopic

    # Validation rules (Pydantic v2 style, matching model_validate / model_json_schema)
    @model_validator(mode="after")
    def validate_category(self):
        # Business rule: if subcategory is "intro_booking",
        # category MUST be "revenue_impacting"
        if (self.primary_topic.subcategory == "intro_booking"
                and self.category != "revenue_impacting"):
            raise ValueError("Intro booking must be revenue_impacting")
        return self

What models.py does NOT do:

- Doesn't know WHY a call is "revenue_impacting"
- Doesn't know HOW to identify staff names
- Doesn't contain decision logic
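What the model does do is reject payloads that break the structure or the cross-field rule. A minimal sketch, assuming Pydantic v2 and a `PrimaryTopic` that carries `subcategory` (an assumed shape); `problems` is a hypothetical helper for illustration.

```python
from typing import Literal

from pydantic import BaseModel, ValidationError, model_validator


class PrimaryTopic(BaseModel):
    subcategory: str


class CallAnalysis(BaseModel):
    staff_name: str
    category: Literal["revenue_impacting", "service", "scheduling", "other"]
    primary_topic: PrimaryTopic

    @model_validator(mode="after")
    def intro_booking_is_revenue(self):
        if (self.primary_topic.subcategory == "intro_booking"
                and self.category != "revenue_impacting"):
            raise ValueError("Intro booking must be revenue_impacting")
        return self


def problems(payload: dict) -> list:
    """Return Pydantic's error messages for a payload ([] when valid)."""
    try:
        CallAnalysis.model_validate(payload)
        return []
    except ValidationError as exc:
        return [err["msg"] for err in exc.errors()]


good = {"staff_name": "Drew", "category": "service",
        "primary_topic": {"subcategory": "booking_confirmation"}}
bad_enum = {**good, "category": "sales"}  # not one of the allowed values
bad_rule = {**good, "primary_topic": {"subcategory": "intro_booking"}}
```

Here `problems(good)` is empty, while `bad_enum` and `bad_rule` each produce one error message.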

2. bedrock_call.py - The Orchestrator

Purpose: Force Bedrock to use structured output

import boto3
import json
from models import CallAnalysis

def ask_claude_for_analysis(transcript: str, prompt_text: str) -> dict:
    """
    Forces Bedrock to respond with structured data

    Parameters:
    -----------
    transcript: The call transcript to analyze
    prompt_text: Business logic and instructions

    Returns:
    --------
    Validated, structured JSON matching CallAnalysis model
    """

    # Step 1: Convert Pydantic to Tool Specification
    tool_spec = {
        "name": "record_call_analysis",
        "description": "Record the structured analysis of the call",
        "inputSchema": {
            "json": CallAnalysis.model_json_schema()
        }
    }

    # Step 2: Call Bedrock with Tool Mode
    bedrock = boto3.client('bedrock-runtime')
    response = bedrock.converse(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',  # Full Bedrock model ID, including version suffix
        system=[{"text": prompt_text}],  # Business logic in system parameter
        messages=[
            {
                "role": "user",
                "content": [{"text": f"Transcript: {transcript}"}]
            }
        ],
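        toolConfig={
            "tools": [{"toolSpec": tool_spec}],  # THE FORM
            # Force this tool so the reply always contains a toolUse block
            "toolChoice": {"tool": {"name": tool_spec["name"]}}
        }
    )

    # Step 3: Extract Tool Response
    # With toolChoice set, the response contains a toolUse block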
    content = response['output']['message']['content']
    tool_use = next(c for c in content if 'toolUse' in c)

    # Step 4: Validate with Pydantic
    result_data = tool_use['toolUse']['input']
    validated = CallAnalysis.model_validate(result_data)

    return validated.model_dump()
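Step 3 can be factored into a helper that fails with a readable message when no matching tool call is present. This is a sketch against a canned response in the Converse shape; `extract_tool_input` and the `toolUseId` value are hypothetical.

```python
from typing import Any, Dict


def extract_tool_input(response: Dict[str, Any], tool_name: str) -> Dict[str, Any]:
    """Return the `input` of the first toolUse block for `tool_name`."""
    for block in response["output"]["message"]["content"]:
        tool_use = block.get("toolUse")
        if tool_use and tool_use["name"] == tool_name:
            return tool_use["input"]
    raise RuntimeError(
        f"No toolUse block for {tool_name!r} "
        f"(stopReason={response.get('stopReason')!r})"
    )


# Canned response: a text block followed by a toolUse block
sample = {
    "stopReason": "tool_use",
    "output": {"message": {"role": "assistant", "content": [
        {"text": "Recording the analysis."},
        {"toolUse": {"toolUseId": "example-id", "name": "record_call_analysis",
                     "input": {"staff_name": "Drew", "category": "service"}}},
    ]}},
}
analysis = extract_tool_input(sample, "record_call_analysis")
```

Scanning all content blocks matters because the model may emit a text block before the tool call.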

3. prompt-v6.txt - The Business Logic

Purpose: Tell AI HOW to make decisions

DECISION LOGIC FOR CATEGORIZATION:

Priority Order (MUST follow):
1. If ANY revenue discussion → category = "revenue_impacting"
2. If customer service issue → category = "service"
3. If scheduling only → category = "scheduling"
4. Everything else → category = "other"

SPECIAL RULES:

Intro Booking Detection:
- MUST have "$12" or "first class" or "intro" mentioned
- MUST involve credit card capture
- Exception: If staff calling about online booking → "service" (not revenue)

Cancellation Outcome Logic:
- outcome = "success" IF customer keeps ANY membership
- outcome = "attempted" IF customer cancels despite retention
- outcome = "na" IF no retention attempt made

Staff Name Detection:
Look for patterns:
- "This is [NAME] from..."
- "My name is [NAME]"
- Check against known staff list: {STAFF_LIST}

The Complete Flow

Step 1: Lambda Loads Components

# In TranscriptionResultProcessor
def process_transcript(transcript):
    # Load prompt with business logic
    prompt = load_from_s3("prompts/v6/prompt.txt")

    # Load staff list
    staff = load_from_s3("staff/staff-registry.json")
    prompt = prompt.replace("{STAFF_LIST}", json.dumps(staff))

    # Get the structured output
    result = ask_claude_for_analysis(transcript, prompt)
    return result
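The placeholder substitution can be isolated and checked on its own. A minimal sketch follows; `render_prompt` is a hypothetical helper and the staff names are invented examples.

```python
import json
from typing import List

PLACEHOLDER = "{STAFF_LIST}"


def render_prompt(template: str, staff: List[str]) -> str:
    """Insert the staff list, failing loudly if the placeholder is absent."""
    if PLACEHOLDER not in template:
        raise ValueError(f"Prompt template is missing {PLACEHOLDER}")
    return template.replace(PLACEHOLDER, json.dumps(staff))


template = "Check against known staff list: {STAFF_LIST}"
rendered = render_prompt(template, ["Drew", "Sam"])
```

Using `str.replace` rather than `str.format` keeps any other braces in the prompt text (for example JSON snippets) untouched.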

Step 2: Bedrock Receives Request

What Bedrock Sees:

{
  "system": [
    {"text": "You are a call analyst. [BUSINESS LOGIC HERE]..."}
  ],
  "messages": [
    {
      "role": "user",
      "content": [{"text": "Transcript: Hi, this is Drew from Orange Theory..."}]
    }
  ],
  "toolConfig": {
    "tools": [{
      "toolSpec": {
        "name": "record_call_analysis",
        "description": "Record the structured analysis of the call",
        "inputSchema": {
          "json": {
            "type": "object",
            "properties": {
              "staff_name": {"type": "string"},
              "category": {"enum": ["revenue_impacting", "service", "scheduling", "other"]}
            },
            "required": ["staff_name", "category"]
          }
        }
      }
    }]
  }
}

Step 3: AI Processes with Constraints

AI's Internal Process:

1. Read transcript: "Hi, this is Drew from Orange Theory, I see you booked online..."
2. Apply business logic from prompt:
   - Staff introduced as "Drew" ✓
   - Mentions "booked online" → This is booking_confirmation
   - Booking confirmation → category = "service" (per special rule)
3. Fill the tool form with these values

Step 4: Structured Response

Bedrock MUST Respond With:

{
  "toolUse": {
    "name": "record_call_analysis",
    "input": {
      "staff_name": "Drew",
      "category": "service",
      "subcategory": "booking_confirmation",
      "customer_type": "prospective_client",
      "outcome": {
        "result": "success"
      }
    }
  }
}

Not allowed to respond with:

- Free text explanation
- Partial data
- Different structure
- Missing required fields

Step 5: Automatic Validation

from pydantic import ValidationError

# Pydantic validates automatically
# tool_response is the toolUse block extracted from the Bedrock response (Step 3)
try:
    validated = CallAnalysis.model_validate(tool_response['input'])
    # ✓ All required fields present
    # ✓ Correct data types
    # ✓ Valid enum values
    # ✓ Business rules pass
except ValidationError as e:
    # Retry with error feedback (prompt is the system prompt from the first attempt)
    error_prompt = f"{prompt}\n\nFix these errors:\n{e.errors()}"
    retry_result = ask_claude_for_analysis(transcript, error_prompt)

Why This Architecture Works

Separation of Concerns

| Component | Responsibility | Changes When |
|---|---|---|
| models.py | Data structure (WHAT) | New fields needed |
| prompt-v6.txt | Business logic (HOW/WHEN) | Rules change |
| bedrock_call.py | Orchestration (plumbing) | Rarely changes |

Benefits

1. 100% Structured Output
   - No parsing errors
   - No missing fields
   - No type mismatches

2. Business Logic Flexibility
   - Update prompts without code changes
   - Different logic per client
   - A/B test different rules

3. Type Safety
   - Catch errors at validation time
   - IDE autocomplete
   - Clear contracts

4. Multi-Industry Support
   - Same structure, different prompts
   - Fitness: "membership_cancel"
   - Healthcare: "appointment_cancel"
   - Banking: "account_closure"

Common Patterns

Pattern 1: Adding New Field

# 1. Add to models.py
from typing import Optional

class CallAnalysis(BaseModel):
    existing_field: str
    new_field: Optional[str] = None  # New! The None default makes it optional in Pydantic v2

# 2. Add to prompt
# "Identify the new_field by looking for..."

# 3. No changes to bedrock_call.py needed!
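The effect of the default value can be checked directly. In Pydantic v2 an `Optional[str]` annotation without a default is still a required field, so the sketch below (a trimmed stand-in model, assuming Pydantic v2) gives the new field a default of `None`.

```python
from typing import Optional

from pydantic import BaseModel


class CallAnalysis(BaseModel):
    existing_field: str
    new_field: Optional[str] = None  # the default is what makes it optional


schema = CallAnalysis.model_json_schema()

# A payload produced before the field existed still validates
old_payload = CallAnalysis.model_validate({"existing_field": "x"})
```

Because `new_field` is absent from the schema's `required` list, the tool spec generated from the model does not force the AI to invent a value.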

Pattern 2: Changing Business Logic

# Only update prompt-v6.txt
OLD: "If discussing membership → revenue_impacting"
NEW: "If discussing membership AND >$100 → revenue_impacting"

# No code changes needed!

Pattern 3: Client-Specific Rules

# Different prompts per client
orange-theory/prompt.txt: "Intro class = $12"
planet-fitness/prompt.txt: "Intro class = $1 down"
chase-bank/prompt.txt: "New account = checking or savings"

# Same models.py for all!

Error Handling

Validation Failure Flow

1st Attempt → Validation Error → Include errors in prompt → 2nd Attempt
                                                    Still fails? → DLQ

Example Retry Prompt

Original prompt +

VALIDATION ERRORS TO FIX:
- Field 'category' must be one of [revenue_impacting, service, scheduling, other]
- Field 'staff_name' is required
Please call the tool again with corrected values.
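The retry-then-DLQ flow can be sketched with the Bedrock call and the validation step passed in as callables, which keeps the example runnable without AWS. All function names here are hypothetical, and the error dicts only mimic the `loc` and `msg` keys of Pydantic's `errors()` output.

```python
from typing import Callable, Dict, List, Optional


def format_retry_prompt(original_prompt: str, errors: List[Dict]) -> str:
    """Append validation errors (dicts with 'loc' and 'msg') to the prompt."""
    lines = []
    for err in errors:
        field = ".".join(str(part) for part in err["loc"])
        lines.append(f"- Field '{field}': {err['msg']}")
    return (
        f"{original_prompt}\n\nVALIDATION ERRORS TO FIX:\n"
        + "\n".join(lines)
        + "\nPlease call the tool again with corrected values."
    )


def analyze_with_retry(
    attempt: Callable[[str], Dict],
    validate: Callable[[Dict], List[Dict]],
    prompt: str,
    send_to_dlq: Callable[[Dict], None],
    max_attempts: int = 2,
) -> Optional[Dict]:
    """Try up to max_attempts times; hand the last failure to the DLQ callback."""
    current_prompt = prompt
    last_result: Dict = {}
    for _ in range(max_attempts):
        last_result = attempt(current_prompt)
        errors = validate(last_result)
        if not errors:
            return last_result
        current_prompt = format_retry_prompt(prompt, errors)
    send_to_dlq(last_result)
    return None


# Stand-ins for the Bedrock call and the Pydantic check
replies = iter([{"category": "service"},
                {"category": "service", "staff_name": "Drew"}])
seen_prompts: List[str] = []
dlq: List[Dict] = []


def fake_attempt(prompt_text: str) -> Dict:
    seen_prompts.append(prompt_text)
    return next(replies)


def fake_validate(data: Dict) -> List[Dict]:
    if "staff_name" in data:
        return []
    return [{"loc": ("staff_name",), "msg": "Field required"}]


result = analyze_with_retry(fake_attempt, fake_validate, "Original prompt", dlq.append)
```

In this run the first reply fails validation, the second prompt carries the error list, and nothing reaches the DLQ.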

Testing the System

Unit Test Example

def test_call_analysis():
    # Test transcript
    transcript = "Hi, this is Drew from Orange Theory..."

    # Load the same prompt the Lambda uses
    prompt = load_from_s3("prompts/v6/prompt.txt")

    # Expected structure (guaranteed!)
    result = ask_claude_for_analysis(transcript, prompt)

    # Assertions pass because structure is guaranteed
    assert isinstance(result['staff_name'], str)
    assert result['category'] in ['revenue_impacting', 'service', 'scheduling', 'other']
    assert 'primary_topic' in result
    assert 'subcategory' in result['primary_topic']
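The test above calls Bedrock for real. For a fast offline test, the client can be replaced with a mock that returns a canned response. The sketch below assumes the Bedrock client is injectable; `analyze` is a simplified stand-in for `ask_claude_for_analysis`, not the project's function.

```python
from unittest.mock import MagicMock


def analyze(client, transcript: str, system_prompt: str, tool_spec: dict) -> dict:
    """Simplified stand-in for ask_claude_for_analysis with an injectable client."""
    response = client.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        system=[{"text": system_prompt}],
        messages=[{"role": "user",
                   "content": [{"text": f"Transcript: {transcript}"}]}],
        toolConfig={"tools": [{"toolSpec": tool_spec}]},
    )
    content = response["output"]["message"]["content"]
    return next(c["toolUse"]["input"] for c in content if "toolUse" in c)


def test_analyze_uses_system_parameter():
    client = MagicMock()
    client.converse.return_value = {"output": {"message": {
        "role": "assistant",
        "content": [{"toolUse": {
            "toolUseId": "example-id",
            "name": "record_call_analysis",
            "input": {"staff_name": "Drew", "category": "service"},
        }}],
    }}}
    tool_spec = {"name": "record_call_analysis",
                 "inputSchema": {"json": {"type": "object"}}}

    result = analyze(client, "Hi, this is Drew...", "You are a call analyst.", tool_spec)

    assert result["staff_name"] == "Drew"
    # The system prompt must travel in `system`, never as a message role
    kwargs = client.converse.call_args.kwargs
    assert kwargs["system"] == [{"text": "You are a call analyst."}]
    assert all(m["role"] in ("user", "assistant") for m in kwargs["messages"])
```

This checks the request shape and the extraction logic; it says nothing about how well the model follows the prompt.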

Handling Multiple Prompts with Different Business Logic

Question: What if we have multiple prompts with different business logic?

You have several architectural options for handling multiple prompts (prompt1 with logic A, prompt2 with logic B, etc.):

Option 1: Simple Concatenation

from typing import List

def ask_claude_for_analysis(transcript: str, prompts: List[str]) -> dict:
    """
    Combine multiple prompt modules into a single system message
    """
    # Concatenate all prompts with clear sections
    combined_prompt = "\n\n".join([
        "=== CORE INSTRUCTIONS ===",
        prompts[0],  # 01-core.txt
        "\n=== DOMAIN KNOWLEDGE ===",
        prompts[1],  # 02-knowledge.txt
        "\n=== COACHING RULES ===",
        prompts[2],  # 03-coaching.txt
        "\n=== OUTPUT FORMAT ===",
        prompts[3]   # 04-output-instructions.txt
    ])

    # Single system message with all logic
    # (bedrock and tool_spec are created as in bedrock_call.py)
    response = bedrock.converse(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        system=[{"text": combined_prompt}],  # ALL business logic in system parameter
        messages=[
            {
                "role": "user",
                "content": [{"text": f"Transcript: {transcript}"}]
            }
        ],
        toolConfig={
            "tools": [{"toolSpec": tool_spec}]
        }
    )

    # Extract and validate the tool call as in bedrock_call.py
    content = response['output']['message']['content']
    tool_use = next(c for c in content if 'toolUse' in c)
    return CallAnalysis.model_validate(tool_use['toolUse']['input']).model_dump()

Pros: Simple, deterministic, single inference call

Cons: All logic evaluated even if not needed
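The concatenation step is plain string handling and can be tested without Bedrock. A minimal sketch, assuming exactly four modules in the order used above; `combine_prompts` and `SECTIONS` are hypothetical names.

```python
from typing import List

SECTIONS = ["CORE INSTRUCTIONS", "DOMAIN KNOWLEDGE", "COACHING RULES", "OUTPUT FORMAT"]


def combine_prompts(modules: List[str]) -> str:
    """Join prompt modules under labelled section headers."""
    if len(modules) != len(SECTIONS):
        raise ValueError(f"Expected {len(SECTIONS)} modules, got {len(modules)}")
    parts = [f"=== {title} ===\n{body.strip()}"
             for title, body in zip(SECTIONS, modules)]
    return "\n\n".join(parts)


combined = combine_prompts(["core text", "knowledge text", "coaching text", "output text"])
```

Checking the module count up front avoids an `IndexError` deep inside the request-building code when a prompt file is missing.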

Option 2: Multi-Turn Conversation (Progressive Refinement) [NOT FOR v1.0 - FUTURE CONSIDERATION]

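import json
from typing import Dict

MODEL_ID = 'anthropic.claude-3-sonnet-20240229-v1:0'

def ask_claude_with_refinement(transcript: str, prompts: Dict[str, str]) -> dict:
    """
    Use conversation turns to progressively apply different logic
    Future consideration when onboarding clients from various industries
    (bedrock, basic_tool_spec and detailed_tool_spec are assumed to exist at module level)
    """
    messages = []

    # Initial user message with transcript
    messages.append({
        "role": "user",
        "content": [{"text": f"Transcript: {transcript}"}]
    })

    # First pass - basic categorization
    response1 = bedrock.converse(
        modelId=MODEL_ID,
        system=[{"text": prompts["core"]}],  # Core prompt in system parameter
        messages=messages,
        toolConfig={"tools": [{"toolSpec": basic_tool_spec}]}
    )

    # Add response to conversation
    assistant_message = response1['output']['message']
    messages.append(assistant_message)

    # Inspect the first-pass tool input (not the keys of the raw response dict)
    first_call = next(c['toolUse'] for c in assistant_message['content'] if 'toolUse' in c)
    first_pass = json.dumps(first_call['input'])

    # Converse expects a toolResult for the previous toolUse in the next user turn
    follow_up = [{
        "toolResult": {
            "toolUseId": first_call['toolUseId'],
            "content": [{"text": "Categorization recorded"}]
        }
    }]

    # Second pass - detailed analysis based on category
    if "cancellation" in first_pass:
        follow_up.append({"text": prompts["cancellation_logic"]})
    elif "sales" in first_pass:
        follow_up.append({"text": prompts["sales_logic"]})
    messages.append({"role": "user", "content": follow_up})

    # Final tool call with refined logic
    response2 = bedrock.converse(
        modelId=MODEL_ID,
        system=[{"text": prompts["core"]}],  # System prompt remains same
        messages=messages,
        toolConfig={"tools": [{"toolSpec": detailed_tool_spec}]}
    )

    final_content = response2['output']['message']['content']
    final_call = next(c['toolUse'] for c in final_content if 'toolUse' in c)
    return final_call['input']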

Pros: Conditional logic, lower token usage

Cons: Multiple API calls, more complex orchestration

When to use: When we have multiple industries and need AI to determine call type first

Option 3: Multiple Tools for Different Logic (PREFERRED FUTURE DESIGN)

def ask_claude_with_multiple_tools(transcript: str, industry: str) -> dict:
    """
    Define different tools for different business logic
    THIS IS OUR PREFERRED FUTURE ARCHITECTURE
    """

    # Tool 1: Fitness Sales Analysis
    # (inputSchema must wrap the JSON Schema in a "json" key)
    fitness_sales_tool = {
        "name": "analyze_fitness_sales",
        "description": "Use for gym intro bookings and memberships",
        "inputSchema": {"json": FitnessSalesAnalysis.model_json_schema()}  # Different schema!
    }

    # Tool 2: Fitness Retention Analysis
    fitness_retention_tool = {
        "name": "analyze_fitness_retention",
        "description": "Use for membership cancellations and freezes",
        "inputSchema": {"json": FitnessRetentionAnalysis.model_json_schema()}  # Different schema!
    }

    # Tool 3: Banking Service Analysis
    banking_service_tool = {
        "name": "analyze_banking_service",
        "description": "Use for account services and complaints",
        "inputSchema": {"json": BankingServiceAnalysis.model_json_schema()}  # Totally different!
    }

    # Tool 4: Healthcare Appointment
    healthcare_appt_tool = {
        "name": "analyze_healthcare_appointment",
        "description": "Use for medical appointment scheduling",
        "inputSchema": {"json": HealthcareAppointment.model_json_schema()}  # Industry-specific!
    }

    # Select tools based on industry
    if industry == "fitness":
        tools = [fitness_sales_tool, fitness_retention_tool]
    elif industry == "banking":
        tools = [banking_service_tool]
    elif industry == "healthcare":
        tools = [healthcare_appt_tool]
    else:
        raise ValueError(f"Unsupported industry: {industry}")

    # Let AI choose which tool based on content
    response = bedrock.converse(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',  # modelId is required by converse
        system=[{"text": f"You are analyzing a {industry} call. Choose the appropriate tool."}],
        messages=[
            {
                "role": "user",
                "content": [{"text": f"Transcript: {transcript}"}]
            }
        ],
        toolConfig={
            "tools": [{"toolSpec": tool} for tool in tools]
        }
    )

    # IMPORTANT: How Multiple Tools Work
    # - AI is presented with multiple tools in toolConfig
    # - AI analyzes the transcript and selects ONE tool to call
    # - Each tool has completely different schema/fields
    # - Only the selected tool's schema is populated and returned

    # Example: If AI determines this is a cancellation call:
    # - It will call ONLY "analyze_fitness_retention" tool
    # - Returns FitnessRetentionAnalysis schema fields
    # - Does NOT call or return data for sales tool

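    # The toolUse block is not always first; the model may emit a text block before it
    content = response['output']['message']['content']
    tool_response = next(c['toolUse'] for c in content if 'toolUse' in c)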
    tool_name = tool_response['name']
    tool_data = tool_response['input']

    return {
        "tool_used": tool_name,  # e.g., "analyze_fitness_retention"
        "analysis": tool_data     # Only fields from that specific tool's schema!
    }

    # Example Response Structures:
    # If fitness_sales_tool was called:
    # {
    #     "tool_used": "analyze_fitness_sales",
    #     "analysis": {
    #         "intro_type": "first_class",
    #         "membership_offered": "elite",
    #         "price_quoted": "$179",
    #         "credit_card_captured": "yes"
    #     }
    # }
    #
    # If fitness_retention_tool was called (DIFFERENT fields!):
    # {
    #     "tool_used": "analyze_fitness_retention",
    #     "analysis": {
    #         "cancellation_reason": "moving",
    #         "retention_attempted": "yes",
    #         "save_successful": "no",
    #         "final_date": "2025-02-15"
    #     }
    # }

Pros:

- Clean separation of concerns
- Each industry/call type has its own schema
- AI intelligently selects the right tool
- Scales well to many industries

Cons:

- Need to maintain multiple Pydantic models
- More complex to validate different schemas
- Need to handle different output structures

Why this is preferred for future:

- Each tool can have completely different fields (fitness has "membership_type", banking has "account_number")
- No wasted tokens analyzing irrelevant fields
- Clear separation between industries
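The "validate different schemas" cost can be contained with a registry that maps each tool name to its model. The following is a sketch assuming Pydantic v2; the two models are trimmed stand-ins whose field names are borrowed from the example responses above, and `validate_tool_call` is a hypothetical helper.

```python
from typing import Dict, Type

from pydantic import BaseModel


class FitnessSalesAnalysis(BaseModel):
    intro_type: str
    credit_card_captured: str


class FitnessRetentionAnalysis(BaseModel):
    cancellation_reason: str
    save_successful: str


# One entry per tool offered in toolConfig
MODEL_BY_TOOL: Dict[str, Type[BaseModel]] = {
    "analyze_fitness_sales": FitnessSalesAnalysis,
    "analyze_fitness_retention": FitnessRetentionAnalysis,
}


def validate_tool_call(tool_use: dict) -> BaseModel:
    """Validate a toolUse block against the model registered for its name."""
    try:
        model = MODEL_BY_TOOL[tool_use["name"]]
    except KeyError:
        raise ValueError(f"Unexpected tool: {tool_use['name']!r}") from None
    return model.model_validate(tool_use["input"])


validated = validate_tool_call({
    "name": "analyze_fitness_retention",
    "input": {"cancellation_reason": "moving", "save_successful": "no"},
})
```

Downstream code can then branch on the type of the returned model instead of inspecting raw dict keys.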

Option 4: Industry-Based Prompt Selection (CORRECTED - Simple v1.0)

def select_and_combine_prompts(client_config: dict) -> str:
    """
    Load prompts based on client's industry and business type
    v1.0 keeps it simple - just concatenate all for the industry
    """
    franchise = client_config.get("franchise")  # e.g., "orange-theory"
    site_id = client_config.get("site_id")      # e.g., "5736520"
    industry = client_config.get("industry", "fitness")  # From client-configurations DynamoDB

    # Load industry-specific prompts
    prompts = []

    if industry == "fitness":
        # Load fitness-specific modules
        prompts.append(load_from_s3(f"{franchise}/{site_id}/prompts/v6/01-core-fitness.txt"))
        prompts.append(load_from_s3(f"{franchise}/{site_id}/prompts/v6/02-knowledge-fitness.txt"))
        prompts.append(load_from_s3(f"{franchise}/{site_id}/prompts/v6/03-coaching-fitness.txt"))
        prompts.append(load_from_s3(f"{franchise}/{site_id}/prompts/v6/04-output-fitness.txt"))
    elif industry == "banking":
        # Future: Load banking-specific modules
        prompts.append(load_from_s3(f"{franchise}/{site_id}/prompts/v6/01-core-banking.txt"))
        prompts.append(load_from_s3(f"{franchise}/{site_id}/prompts/v6/02-knowledge-banking.txt"))
        # Banking doesn't need coaching module
        prompts.append(load_from_s3(f"{franchise}/{site_id}/prompts/v6/04-output-banking.txt"))

    # Simple concatenation - AI determines everything from transcript
    return "\n\n".join(prompts)

# Use the combined prompt - still single pass for v1.0
combined_prompt = select_and_combine_prompts(client_config)
result = ask_claude_for_analysis(transcript, combined_prompt)

Key Points:

- We know the industry from the client-configurations DynamoDB table
- We DON'T know if it's a sales/retention/service call; AI determines this
- v1.0: Single pass with all prompts concatenated
- Future: Multiple tools with different schemas per industry
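The per-industry branches can also be expressed as data, so that adding an industry means adding one dictionary entry. A minimal sketch that only builds the S3 keys, following the path layout shown above; `MODULES_BY_INDUSTRY` and `prompt_keys` are hypothetical names.

```python
from typing import Dict, List

MODULES_BY_INDUSTRY: Dict[str, List[str]] = {
    "fitness": ["01-core", "02-knowledge", "03-coaching", "04-output"],
    "banking": ["01-core", "02-knowledge", "04-output"],  # no coaching module
}


def prompt_keys(franchise: str, site_id: str, industry: str,
                version: str = "v6") -> List[str]:
    """Return the S3 keys of the prompt modules for one client."""
    try:
        modules = MODULES_BY_INDUSTRY[industry]
    except KeyError:
        raise ValueError(f"No prompt modules configured for {industry!r}") from None
    prefix = f"{franchise}/{site_id}/prompts/{version}"
    return [f"{prefix}/{module}-{industry}.txt" for module in modules]


keys = prompt_keys("orange-theory", "5736520", "fitness")
```

An unknown industry raises an error here instead of silently producing an empty prompt.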

Recommendation for CloudAudioAI Evolution

Current State (v1.0): Option 1 - Simple Concatenation

- Single client type (fitness/gym studios)
- Concatenate all 4 prompt files
- Single Pydantic schema (CallAnalysis)
- Single pass AI call

Future State (v2.0+): Option 3 - Multiple Tools (PREFERRED)

- Multiple industries (fitness, banking, healthcare)
- Multiple tools with different schemas
- AI selects appropriate tool based on transcript
- Each tool returns industry-specific fields

Why NOT Multi-Turn for now:

- We have sufficient context in prompts
- Single pass is simpler and deterministic
- Multi-turn might be considered for complex industries later

Evolution Path:

1. v1.0: Simple concatenation (current)
2. v1.1: Industry-based prompt selection (still single schema)
3. v2.0: Multiple tools with different schemas per industry (preferred future)
4. v3.0: Consider multi-turn only if needed for complex analysis

This aligns with the "minimum changes for maximum flexibility" philosophy.

Migration Path for Multiple Industries

Current State (Fitness Only)

class CallAnalysis(BaseModel):
    category: Literal["revenue_impacting", "service", "scheduling", "other"]
    # Fitness-specific fields

Future State (Multi-Industry)

# Dynamic model based on industry
def get_model_for_industry(industry: str):
    if industry == "fitness":
        return FitnessAnalysis
    elif industry == "healthcare":
        return HealthcareAnalysis
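    elif industry == "banking":
        return BankingAnalysis
    # Fail fast instead of returning None for an unknown industry
    raise ValueError(f"Unsupported industry: {industry}")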

Configuration-Driven

# Load from client-configurations DynamoDB
client_config = get_client_config(client_id)
industry = client_config['industry']
AnalysisModel = get_model_for_industry(industry)
tool_spec = create_tool_spec(AnalysisModel)

Summary

The Magic Formula:

- Pydantic = Defines the form (WHAT fields)
- Tool Calling = Forces AI to fill the form (MUST comply)
- Prompt = Contains business logic (HOW to decide)
- Result = Guaranteed structured, validated output

This architecture ensures that regardless of how complex the business logic gets, the output is always structured, validated, and consistent.