跳转至

Flow 2:Per-Call Analysis(ai-analysis-processor)写入详情

Lambda: ai-analysis-processor(callytics-infrastructure) · 触发: SQS(transcribe-processor 完成后) · 错误处理: Neon batch 阻塞,SQS send 非阻塞

性能预期(SLA)

End-to-end p95 ≤ 5 分钟(recording_available → AI 分析完成、Neon 落库)。Runtime 24h 实测(2026-05-06 audit):95.7% < 5min,p95 = 4.48 min,max 19.33 min(异常 outlier ~1%)。Lambda compute 本身 p95 = 61s,主要 latency 来自 OneRouter AI 调用 + S3/SSM round-trip。

⚠️ 已知 gap — DDB / Neon 双写 reconciliation 缺失

DDB write(saveAnalysis / updateAnalysisFlags)在 Neon db.batch() 之前且独立。若 DDB 成功 + Neon 失败:SQS retry 时 checkExistingAnalysis 命中 DDB 已有 record → skip 整段 → Neon 永远拿不到这通通话。详见 infra#782。Migration 期间双写 reconciliation 决策见 #454 epic owner。

实施溯源(2026-05-04)

写入模式:全部 db.batch()(2026-03-28 Neon-primary 决策)。DDB 写入(call-analysis 表)是过渡期冗余,独立代码块,不参与 batch。

关键事实校对:

  • franchiseId 从 SQS 上游 metadata 传入,不硬编码。P0 #632 修复防止把 _client_id 整串当 franchiseId 写入
  • resolvePhoneIdentity 经 PhoneStoreAssignments DDB 返回 { storePhone, contactPhone, storeId }
  • 下游 Contact Analysis trigger:non-blocking SQS send,需 contactPhone + storeId 都解析成功才触发,失败由 daily cron 兜底
  • store_id 已写入 contacts / calls / contact_timeline
  • site_idaccount_id rename 已完成(common migration 0016)

timeline 双 writer 共存(Phase 1):2026-04-29 起 transcribe-processor 也写 timeline(call.status_changed),与本 Flow 的 call.created 互补。Phase 2(infra#683,≥4 周后)删 transcribe 的 call.created writer。详见 call-lifecycle-tracking.md § 6.3


数据流概览

db.batch() 一次 HTTP 原子写 calls UPDATE + contacts UPSERT + timeline INSERT。DDB 写入在 batch 之外,SQS 发送在 batch 之后。

  SQS: transcript + call metadata [CALL DATA]
    ┌───────────────▼───────────────┐
    │ [DB READ] Neon (可失败)        │
    │   contacts → customerSummary   │
    │   calls    → 24h summary       │
    │   messages → 24h SMS           │
    └───────────────┬───────────────┘
    ┌───────────────▼───────────────┐
    │ [AI] Kimi/Claude (1 次调用)    │
    │ → 33 字段 per-call 输出        │
    └───────────────┬───────────────┘
    ┌───────────────▼───────────────────┐
    │ [DDB] 过渡期 — 将来整块删除       │
    │   DDB call-analysis UPDATE        │
    │   DDB 其他 writes                 │
    └───────────────┬───────────────────┘
    ╔═══════════════▼═══════════════════════════════════╗
    ║  db.batch() — 1 次 HTTP,原子                     ║
    ║                                                   ║
    ║  ① UPDATE calls (33 AI 字段)                      ║
    ║  ② UPSERT contacts (基础字段)                     ║
    ║     phone/names/lastActivityAt                    ║
    ║  ③ 不写 tasks (Per-Call 没有全量 context)          ║
    ║  ④ INSERT timeline (call.created) 🆕              ║
    ║     occurredAt = call.startTime                   ║
    ╚═══════════════╤═══════════════════════════════════╝
    ┌───────────────▼───────────────┐
    │ ⑤ [SYSTEM] SQS → P2  🆕       │
    │   batch 外 — 成功后才发        │
    │   无条件触发 Contact Analysis  │
    │   source='per_call_analysis'    │
    │   错误处理:非阻塞             │
    └───────────────────────────────┘

读取数据

触发输入

来源 内容
SQS 消息 通话 transcript + call metadata(来自 transcribe-processor)

AI 分析前从 Neon 读取

从 Neon 读以下 3 张表(2026-05 起 customer-history reader 已切到 Neon),包含 customerSummary(DDB 没有)。

查询 读取字段 为什么读
contacts getContact(phone, franchiseId, siteId) customerSummary, lifecycleStage, leadStatus, firstName, lastName 客户画像 + 阶段。customerSummary 是 Contact Analysis 综合分析的精华,有则读,无则跳过
calls 最近 24 小时,按 contactPhone + franchiseId + siteId 过滤 executiveSummary, direction, primaryCategory 补上 customerSummary 发布后到现在的通话(Contact Analysis 还没覆盖的部分)
messages 最近 24 小时,按 contactPhone + franchiseId + siteId 过滤 direction, subject SMS 上下文,例如"客户昨天回了 I'll come Thursday"

读取失败不阻塞 AI 分析(graceful degradation,跟当前 DDB 模式一致)。

AI Prompt 注入方式

历史上下文放 user message,不放 system prompt,保持 system prompt 可缓存:

CUSTOMER PROFILE:
  Stage: lead | Status: attempted | Name: John Smith
  Summary: "This lead has called 3 times about pricing..."

RECENT CALLS (last 24h):
  - Mar 25 14:30 (inbound): "Customer asked about membership pricing"
  - Mar 24 10:15 (outbound): "Left voicemail about trial class"

RECENT MESSAGES (last 24h):
  - Mar 25 09:00 (inbound): "I'll come Thursday for the trial"

TRANSCRIPT:
  [当前通话 transcript]

Per-Call Analysis AI 输出

Per-Call Analysis 分析单通电话(含上下文),输出 33 个字段写入 calls 表(不是 contacts 表):staffName / callState / customerType / confidence / topics / summary / follow-up / coaching feedback 等。

calls 表另有 2 个 AI 相关字段(original_staff_name + amendment_history)由 studio-api 在用户手动 amend 时写入,不属于 ai-analysis-processor 路径。详见 Step ①.5


Step ①:UPDATE calls(33 AI 字段)

写入 33 个 AI 分析字段到 calls 表。


Step ①.5:UPDATE calls(2 个 staff amendment 字段)

由 studio-api 在用户手动修正 staff 归属时写入,不属于 ai-analysis-processor 路径。callytics-common PR #85 加列,studio-website-monorepo PR #289 / #290 加双写逻辑。

字段 类型 写入时机 说明
original_staff_name text (nullable) 首次 amend 时写入 修改前的初始 staff 名,审计快照
amendment_history jsonb (nullable) 每次 amend 追加 entry 完整 audit trail:[{from, to, amendedBy, amendedAt, reason}]

端点 PUT /v3/calls/:id/staff 双写 DDB + Neon。PUT /v2/calls/:id 已恢复纯 DDB,不再写 Neon(v2/v3 解耦)。


Step ②:UPSERT contacts(基础字段)

只写基础字段,不写任何 AI 判断字段(AI 判断由 Contact Analysis 负责,见本节末尾)。

INSERT 时写入

字段 说明
phone call.customerPhone PK 一部分。PK 改为 (phone, store_id),common migration 0014
franchiseId 从 SQS 上游 metadata 传入 不硬编码。P0 #632 fix 阻止把 _client_id 整串当 franchiseId 写入
accountId SQS metadata.siteid,写入时映射到 accountid 列 旧名 siteId,common migration 0016 改名 account_id
storeId resolvePhoneIdentity().storeId,PR #642 / #647 起写入 PK 一部分
lastActivityAt NOW()
firstName AI 提取(可空) 仅当 AI 识别到名字
lastName AI 提取(可空)

firstAttemptedAt / firstConnectedAt 字段已从 schema 移除(callytics-common#32 完成),由 contact_timeline.call.created 事件替代。code 不再写。

ON CONFLICT (phone, store_id) DO UPDATE

PK 由 (phone, franchise_id, site_id) 改为 (phone, store_id),common migration 0014。

字段 说明
lastActivityAt NOW() 无条件覆盖,每次互动都更新
firstName 新值(如果有) 覆盖
lastName 新值(如果有) 覆盖
accountId SQS metadata.siteid,写入时映射到 accountid 列 覆盖,保持新鲜
updatedAt NOW()

Per-Call Analysis 不写的字段(Contact Analysis 所有权)

以下字段全部由 Contact Analysis 负责,Per-Call Analysis 不碰:

  • customerSummary — Contact Analysis 综合分析
  • leadStatus, leadStatusReason — Contact Analysis AI 判断
  • leadTemperature, leadTemperatureReason — Contact Analysis 规则计算,待移除(#32)
  • lifecycleStage, lifecycleState — Contact Analysis AI 判断
  • purchaseIntent, goals, leadObjections, leadRejectionReasons — Contact Analysis AI 判断
  • actionNeeded, actionNeededReason, suggestedActions — Contact Analysis AI 判断
  • lastContactAnalysisAt — Contact Analysis 追踪字段

Step ③:不写 tasks

Per-Call Analysis 只看单通电话 + 24h 上下文,没有全量 context,不应创建 task。Task 创建由 Contact Analysis(综合分析后)或 lead-tracking(新 Lead SLA)负责。


Step ④:INSERT contact_timeline

字段
contactPhone call.customerPhone
franchiseId 从 SQS 上游 metadata 传入(同 Step ② contacts)
accountId SQS metadata.siteid,写入时映射到 accountid 列(旧名 site_id)
storeId resolvePhoneIdentity().storeId,PR #642 / #647 起写入
eventType 'call.created'
eventCategory 'call'
entityType 'call'
entityId call.telephonySessionId
newValue {callDirection, staffName, duration}
occurredAt call.startTime(事件时间,非 NOW())— 与 transcribe-processorcall.created writer 一致
actorType 'call_analysis'

Phase 2 (infra#683) 完成后:transcribe-processor 删除其 call.created writer,留下 ai-analysis 这一行作为唯一 call.created 事件,occurredAt = call.startTime 不变(用真实通话开始时间,不是 AI 完成时间)。

错误处理:在 db.batch() 内原子写入。失败 → 整个 batch 回滚 → SQS batchItemFailures 重试。


Step ⑤:SQS → Contact Analysis

每通电话无条件触发 Contact Analysis re-analysis。dailyBatchQueue 是 FIFO,SendMessageCommand 必须带 MessageGroupIdMessageDeduplicationId

消息体字段:

字段 说明
phone contacts PK 组件
storeId contacts PK 组件(post-migration 0014)
source 'per_call_analysis'
callId / telephonySessionId observability 追踪字段
analysisCompletedAt / queuedAt 时序 metric 字段

MessageGroupId${phone}#${storeId},同一 contact 串行处理。MessageDeduplicationIdper_call_analysis-${phone}-${callId},不同通话不被合并。

Body schema 约定:消息体只携带 phone + storeId,不再传 franchiseId / accountId。Consumer(contacts-analyzer)从 contact 行通过 PK lookup 自行读取 franchiseId / accountId。

触发条件:contactPhone + storeId 都解析成功才触发。storeId 未解析时跳过本次触发,daily cron 兜底。SQS send 失败 log warn,不阻塞主流程。

V1 不做 Cooldown。uq_tasks_pending_contact_category DB index 保护数据完整性,duplicate task 在 DB 层拦截。