Flow 2:Per-Call Analysis(ai-analysis-processor)写入详情¶
Lambda: ai-analysis-processor(callytics-infrastructure) · 触发: SQS(transcribe-processor 完成后) · 错误处理: Neon batch 阻塞,SQS send 非阻塞
性能预期(SLA)¶
End-to-end p95 ≤ 5 分钟(recording_available → AI 分析完成、Neon 落库)。Runtime 24h 实测(2026-05-06 audit):95.7% < 5min,p95 = 4.48 min,max 19.33 min(异常 outlier ~1%)。Lambda compute 本身 p95 = 61s,主要 latency 来自 OneRouter AI 调用 + S3/SSM round-trip。
⚠️ 已知 gap — DDB / Neon 双写 reconciliation 缺失¶
DDB write(saveAnalysis / updateAnalysisFlags)在 Neon db.batch() 之前且独立。若 DDB 成功 + Neon 失败:SQS retry 时 checkExistingAnalysis 命中 DDB 已有 record → skip 整段 → Neon 永远拿不到这通通话。详见 infra#782。Migration 期间双写 reconciliation 决策见 #454 epic owner。
实施溯源(2026-05-04)
写入模式:全部 db.batch()(2026-03-28 Neon-primary 决策)。DDB 写入(call-analysis 表)是过渡期冗余,独立代码块,不参与 batch。
关键事实校对:
franchiseId从 SQS 上游 metadata 传入,不硬编码。P0 #632 修复防止把_client_id整串当 franchiseId 写入resolvePhoneIdentity经 PhoneStoreAssignments DDB 返回{ storePhone, contactPhone, storeId }- 下游 Contact Analysis trigger:non-blocking SQS send,需
contactPhone + storeId都解析成功才触发,失败由 daily cron 兜底 store_id已写入 contacts / calls / contact_timelinesite_id→account_idrename 已完成(common migration 0016)
timeline 双 writer 共存(Phase 1):2026-04-29 起 transcribe-processor 也写 timeline(call.status_changed),与本 Flow 的 call.created 互补。Phase 2(infra#683,≥4 周后)删 transcribe 的 call.created writer。详见 call-lifecycle-tracking.md § 6.3。
数据流概览¶
db.batch() 一次 HTTP 原子写 calls UPDATE + contacts UPSERT + timeline INSERT。DDB 写入在 batch 之外,SQS 发送在 batch 之后。
SQS: transcript + call metadata [CALL DATA]
│
┌───────────────▼───────────────┐
│ [DB READ] Neon (可失败) │
│ contacts → customerSummary │
│ calls → 24h summary │
│ messages → 24h SMS │
└───────────────┬───────────────┘
│
┌───────────────▼───────────────┐
│ [AI] Kimi/Claude (1 次调用) │
│ → 33 字段 per-call 输出 │
└───────────────┬───────────────┘
│
┌───────────────▼───────────────────┐
│ [DDB] 过渡期 — 将来整块删除 │
│ DDB call-analysis UPDATE │
│ DDB 其他 writes │
└───────────────┬───────────────────┘
│
╔═══════════════▼═══════════════════════════════════╗
║ db.batch() — 1 次 HTTP,原子 ║
║ ║
║ ① UPDATE calls (33 AI 字段) ║
║ ② UPSERT contacts (基础字段) ║
║ phone/names/lastActivityAt ║
║ ③ 不写 tasks (Per-Call 没有全量 context) ║
║ ④ INSERT timeline (call.created) 🆕 ║
║ occurredAt = call.startTime ║
╚═══════════════╤═══════════════════════════════════╝
│
┌───────────────▼───────────────┐
│ ⑤ [SYSTEM] SQS → P2 🆕 │
│ batch 外 — 成功后才发 │
│ 无条件触发 Contact Analysis │
│ source='per_call_analysis' │
│ 错误处理:非阻塞 │
└───────────────────────────────┘
读取数据¶
触发输入¶
| 来源 | 内容 |
|---|---|
| SQS 消息 | 通话 transcript + call metadata(来自 transcribe-processor) |
AI 分析前从 Neon 读取¶
从 Neon 读以下 3 张表(2026-05 起 customer-history reader 已切到 Neon),包含 customerSummary(DDB 没有)。
| 表 | 查询 | 读取字段 | 为什么读 |
|---|---|---|---|
contacts |
getContact(phone, franchiseId, siteId) |
customerSummary, lifecycleStage, leadStatus, firstName, lastName | 客户画像 + 阶段。customerSummary 是 Contact Analysis 综合分析的精华,有则读,无则跳过 |
calls |
最近 24 小时,按 contactPhone + franchiseId + siteId 过滤 | executiveSummary, direction, primaryCategory | 补上 customerSummary 发布后到现在的通话(Contact Analysis 还没覆盖的部分) |
messages |
最近 24 小时,按 contactPhone + franchiseId + siteId 过滤 | direction, subject | SMS 上下文,例如"客户昨天回了 I'll come Thursday" |
读取失败不阻塞 AI 分析(graceful degradation,跟当前 DDB 模式一致)。
AI Prompt 注入方式¶
历史上下文放 user message,不放 system prompt,保持 system prompt 可缓存:
CUSTOMER PROFILE:
Stage: lead | Status: attempted | Name: John Smith
Summary: "This lead has called 3 times about pricing..."
RECENT CALLS (last 24h):
- Mar 25 14:30 (inbound): "Customer asked about membership pricing"
- Mar 24 10:15 (outbound): "Left voicemail about trial class"
RECENT MESSAGES (last 24h):
- Mar 25 09:00 (inbound): "I'll come Thursday for the trial"
TRANSCRIPT:
[当前通话 transcript]
Per-Call Analysis AI 输出¶
Per-Call Analysis 分析单通电话(含上下文),输出 33 个字段写入 calls 表(不是 contacts 表):staffName / callState / customerType / confidence / topics / summary / follow-up / coaching feedback 等。
calls 表另有 2 个 AI 相关字段(original_staff_name + amendment_history)由 studio-api 在用户手动 amend 时写入,不属于 ai-analysis-processor 路径。详见 Step ①.5。
Step ①:UPDATE calls(33 AI 字段)¶
写入 33 个 AI 分析字段到 calls 表。
Step ①.5:UPDATE calls(2 个 staff amendment 字段)¶
由 studio-api 在用户手动修正 staff 归属时写入,不属于 ai-analysis-processor 路径。callytics-common PR #85 加列,studio-website-monorepo PR #289 / #290 加双写逻辑。
| 字段 | 类型 | 写入时机 | 说明 |
|---|---|---|---|
original_staff_name |
text (nullable) |
首次 amend 时写入 | 修改前的初始 staff 名,审计快照 |
amendment_history |
jsonb (nullable) |
每次 amend 追加 entry | 完整 audit trail:[{from, to, amendedBy, amendedAt, reason}] |
端点 PUT /v3/calls/:id/staff 双写 DDB + Neon。PUT /v2/calls/:id 已恢复纯 DDB,不再写 Neon(v2/v3 解耦)。
Step ②:UPSERT contacts(基础字段)¶
只写基础字段,不写任何 AI 判断字段(AI 判断由 Contact Analysis 负责,见本节末尾)。
INSERT 时写入¶
| 字段 | 值 | 说明 |
|---|---|---|
phone |
call.customerPhone | PK 一部分。PK 改为 (phone, store_id),common migration 0014 |
franchiseId |
从 SQS 上游 metadata 传入 | 不硬编码。P0 #632 fix 阻止把 _client_id 整串当 franchiseId 写入 |
accountId |
SQS metadata.siteid,写入时映射到 accountid 列 | 旧名 siteId,common migration 0016 改名 account_id |
storeId |
resolvePhoneIdentity().storeId,PR #642 / #647 起写入 | PK 一部分 |
lastActivityAt |
NOW() |
|
firstName |
AI 提取(可空) | 仅当 AI 识别到名字 |
lastName |
AI 提取(可空) |
firstAttemptedAt/firstConnectedAt字段已从 schema 移除(callytics-common#32 完成),由contact_timeline.call.created事件替代。code 不再写。
ON CONFLICT (phone, store_id) DO UPDATE¶
PK 由 (phone, franchise_id, site_id) 改为 (phone, store_id),common migration 0014。
| 字段 | 值 | 说明 |
|---|---|---|
lastActivityAt |
NOW() |
无条件覆盖,每次互动都更新 |
firstName |
新值(如果有) | 覆盖 |
lastName |
新值(如果有) | 覆盖 |
accountId |
SQS metadata.siteid,写入时映射到 accountid 列 | 覆盖,保持新鲜 |
updatedAt |
NOW() |
Per-Call Analysis 不写的字段(Contact Analysis 所有权)¶
以下字段全部由 Contact Analysis 负责,Per-Call Analysis 不碰:
customerSummary— Contact Analysis 综合分析leadStatus,leadStatusReason— Contact Analysis AI 判断— Contact Analysis 规则计算,待移除(#32)leadTemperature,leadTemperatureReasonlifecycleStage,lifecycleState— Contact Analysis AI 判断purchaseIntent,goals,leadObjections,leadRejectionReasons— Contact Analysis AI 判断actionNeeded,actionNeededReason,suggestedActions— Contact Analysis AI 判断lastContactAnalysisAt— Contact Analysis 追踪字段
Step ③:不写 tasks¶
Per-Call Analysis 只看单通电话 + 24h 上下文,没有全量 context,不应创建 task。Task 创建由 Contact Analysis(综合分析后)或 lead-tracking(新 Lead SLA)负责。
Step ④:INSERT contact_timeline¶
| 字段 | 值 |
|---|---|
contactPhone |
call.customerPhone |
franchiseId |
从 SQS 上游 metadata 传入(同 Step ② contacts) |
accountId |
SQS metadata.siteid,写入时映射到 accountid 列(旧名 site_id) |
storeId |
resolvePhoneIdentity().storeId,PR #642 / #647 起写入 |
eventType |
'call.created' |
eventCategory |
'call' |
entityType |
'call' |
entityId |
call.telephonySessionId |
newValue |
{callDirection, staffName, duration} |
occurredAt |
call.startTime(事件时间,非 NOW())— 与 transcribe-processor 的 call.created writer 一致 |
actorType |
'call_analysis' |
Phase 2 (infra#683) 完成后:
transcribe-processor删除其call.createdwriter,留下ai-analysis这一行作为唯一call.created事件,occurredAt = call.startTime不变(用真实通话开始时间,不是 AI 完成时间)。
错误处理:在 db.batch() 内原子写入。失败 → 整个 batch 回滚 → SQS batchItemFailures 重试。
Step ⑤:SQS → Contact Analysis¶
每通电话无条件触发 Contact Analysis re-analysis。dailyBatchQueue 是 FIFO,SendMessageCommand 必须带 MessageGroupId 和 MessageDeduplicationId。
消息体字段:
| 字段 | 说明 |
|---|---|
phone |
contacts PK 组件 |
storeId |
contacts PK 组件(post-migration 0014) |
source |
'per_call_analysis' |
callId / telephonySessionId |
observability 追踪字段 |
analysisCompletedAt / queuedAt |
时序 metric 字段 |
MessageGroupId 用 ${phone}#${storeId},同一 contact 串行处理。MessageDeduplicationId 用 per_call_analysis-${phone}-${callId},不同通话不被合并。
Body schema 约定:消息体只携带 phone + storeId,不再传 franchiseId / accountId。Consumer(contacts-analyzer)从 contact 行通过 PK lookup 自行读取 franchiseId / accountId。
触发条件:contactPhone + storeId 都解析成功才触发。storeId 未解析时跳过本次触发,daily cron 兜底。SQS send 失败 log warn,不阻塞主流程。
V1 不做 Cooldown。uq_tasks_pending_contact_category DB index 保护数据完整性,duplicate task 在 DB 层拦截。