Flow 3:Contact Analysis(contacts-analyzer)写入详情¶
contacts-analyzer Lambda 在客户 contact 累积新通话/消息/lead 后,跑 1 次 AI 调用得到客户状态 + per-task 决策(close/create/update),原子写入 contacts + tasks + contact_timeline。本文给写入字段映射 + 安全网 + 待办,供改 Lambda 行为或追溯字段所有权时参考。
触发: 6 个来源(见 总览) · 错误处理: 阻塞(SQS batchItemFailures,最多 3 次后进 DLQ) · 代码 SoT: callytics-infrastructure repo 的 lambda/contacts-analyzer/
实施溯源(2026-05-04 verified against source code)
权威设计: .claude/specs/2026-03-28-flow3-contact-analysis-design.md(AI-driven task decision 完整设计)
关键事实:
storeId已写入 contacts + tasks + contact_timelineaccount_idrename 完成(common migration 0016);下方设计中残留siteId字眼按accountId理解franchiseId从 contact 行读取,不硬编码writeAnalysisWithTasks()是唯一写入路径;旧writeAnalysis()+TASK_ENABLED_SITESfeature flag 已移除- Prompt 默认传 pending + closed tasks 给 AI
一、数据流概览¶
1 次 AI 调用 + 1 次 transaction 写入。AI 输出客户状态字段 + per-task 决策(close/create/update),系统执行写入。
6 触发源 → SQS {phone, storeId, source?}(FIFO,MessageGroupId = `phone#storeId`;PR #676 SQS contract 已从 `{phone, franchiseId, accountId}` 迁到 `{phone, storeId}`)
│
┌───────────▼────────────────────────────────────────┐
│ 读取 5 张表 │
│ contacts → summary + lifecycle + lastAnalysisAt │
│ calls/messages/leads → 增量数据 │
│ tasks → pending + closed (全量) │
│ │
│ 增量窗口: │
│ lastContactAnalysisAt = NULL → 30d 首次全量 │
│ 有值 → since lastContactAnalysisAt │
└───────────┬────────────────────────────────────────┘
│
┌───────────▼────────────────────────────────────────┐
│ 构建 Prompt → AI 调用 (1 次) │
│ → 客户状态字段 + taskDecisions[] (Zod 验证) │
└───────────┬────────────────────────────────────────┘
│
╔═══════════▼═══════════════════════════════════════╗
║ db.batch() — 1 次 HTTP,全成功 or 全回滚 ║
║ ║
║ ③ UPDATE contacts ← AI 字段 + system 字段 ║
║ ④ AI-driven task decisions: ║
║ 遍历 taskDecisions[],per-task 执行: ║
║ - close → UPDATE task (status+closeResult) ║
║ - create → INSERT 新 task(可多个) ║
║ - update → UPDATE task (priority/actions) ║
║ ⑤ INSERT timeline ← ④ 执行结果的审计记录 ║
╚═══════════════════════════════════════════════════╝
│
⑥ CloudWatch 日志
二、输入:什么数据喂给 AI¶
读 5 张表(contacts / calls / messages / leads / tasks),拼成 prompt 给 AI。增量窗口由 contacts 行的 lastContactAnalysisAt 决定。
2.1 contacts 表¶
| 字段 | 为什么读 |
|---|---|
customerSummary |
compressed memory — 上次分析的精华,AI 用它作为历史上下文 |
lastContactAnalysisAt |
增量读取的时间边界(NULL = 首次分析) |
leadStatus |
当前漏斗阶段 |
lifecycleStage |
lead / member / churned |
firstName, lastName |
客户身份 |
2.2 增量数据:calls + messages + leads¶
if (lastContactAnalysisAt === null) {
// 首次分析:读 30d 全量,建立基线 customerSummary
时间边界 = NOW() - 30d
} else {
// 后续:只读上次分析后的新数据
时间边界 = lastContactAnalysisAt
}
| 表 | 查询条件 | 读取字段 |
|---|---|---|
| calls | customerPhone=? AND franchiseId=? AND startTime > 时间边界(PR #676 起 reader 不再带 accountId) |
startTime, direction, duration, executiveSummary, primaryCategory, callState |
| messages | fromPhoneNumber=? AND franchiseId=? AND creationTime > 时间边界 |
creationTime, direction, type, subject |
| leads | phone=? AND franchiseId=? AND receivedAt > 时间边界(infra#518 已修复) |
firstName, lastName, receivedAt, leadType |
2.3 tasks 表¶
读取该 contact 的所有 tasks(per-contact 通常 1-5 条,数据量极小):
| 状态 | 读取字段 | AI 用来做什么 |
|---|---|---|
| pending | typeCategory, priority, dueAt, suggestedActions | 知道已有什么 task → 不重复建 |
| closed | typeCategory, closeResult, closeNote, closedAt | closeResult 最关键 — 员工说 "booked" → AI 建 bookednotconverted 跟进 |
2.4 Prompt 结构¶
System Prompt(静态,可缓存)
└─ 角色定义 + 输出格式要求
User Message(动态,per-contact)
├─ CUSTOMER PROFILE: phone, siteId, lifecycleStage, leadStatus
├─ PREVIOUS SUMMARY: customerSummary
├─ PENDING TASKS:
│ - [LEAD_OUTREACH] high priority, due in 2h
│ Suggested: "Call within SLA window"
├─ RECENTLY CLOSED TASKS:
│ - [LEAD_FOLLOW_UP] closed Mar 25 by Sarah
│ Result: booked | Note: "Customer said Thursday 3pm for trial"
├─ NEW CALLS (since last analysis):
│ - Mar 26 10:00 (inbound): "Customer called back, ready to sign up"
├─ NEW MESSAGES (since last analysis):
│ - Mar 25 16:00 (inbound): "I'll come Thursday for the trial"
└─ NEW LEADS (since last analysis):
(none)
当前 prompt 默认传入
tasks: { pending, closed },渲染为 PENDING TASKS / RECENTLY CLOSED TASKS section。TASK_ENABLED_SITESfeature flag 已移除。
2.5 跳过条件¶
- 如果 calls + messages + leads 全部为空 且 无 customerSummary → skip(无数据可分析)
2.6 customerSummary 准确性 — Prompt 要求¶
customerSummary 是 compressed memory:AI 每次读旧 summary + 新数据 → 重写(不追加)。准确度靠 prompt,system prompt 必须包含 3 条:
- Previous summary 是 context,不是绝对真理 — 新数据可以 override
- 保留关键细节 — dates / commitments / objections / complaints
- 区分确认 vs 可能 — "customer confirmed Thursday 3pm" vs "customer mentioned possibly coming"
为什么不保存历史 summary:
contact_timeline已记录精确事件历史,customerSummary 只需要"当前快照"。
三、AI 输出字段路由表 ★¶
AI 输出落到 contacts / tasks / contact_timeline 三张表;系统补充 timestamp / id / store 隔离字段。
3.1 AI 输出 → 3 张表¶
字段集由 ContactsAnalysisSchema Zod schema 定义。
客户状态字段(写入 contacts 表):
| # | AI 字段 | 类型 | → contacts | → timeline | 决定/触发 | Zod |
|---|---|---|---|---|---|---|
| 1 | customerSummary | text | ✅ | ✅ | ||
| 2 | leadStatus | text enum (12) | ✅ | ✅ | ||
| 3 | leadStatusReason | text | ✅ | ✅ | ||
| 4 | lifecycleStage | text enum (4) | ✅ | ✅ 6c new | 变了 → 写 6c 事件 | ✅ |
| 5 | lifecycleState | text enum (3) | ✅ | ✅ 6c new | 变了 → 写 6c 事件 | ✅ |
| 6 | purchaseIntent | text enum (3) | ✅ | ✅ | ||
| 7 | purchaseIntentReason | text | ✅ | ✅ | ||
| 8 | goals | jsonb | ✅ | ✅ | ||
| 9 | leadObjections | jsonb | ✅ | ✅ | ||
| 10 | leadRejectionReasons | jsonb | ✅ | ✅ | ||
| 11 | actionNeeded | boolean | ✅ | 信号字段,不直接触发 task | ✅ | |
| 12 | actionNeededReason | text | ✅ | ✅ | ||
| 13 | suggestedActions | jsonb | ✅ | contacts 级别建议(legacy 兼容) | ✅ | |
| 14 | typeCategory | text enum (9) | legacy 兼容,task 操作由 taskDecisions 驱动 | ✅ | ||
| 15 | priority | text enum (3) | legacy 兼容 | ✅ | ||
| 16 | doNotContact | boolean | ✅ | ✅ | ||
| 17 | hasOpenComplaint | boolean | ✅ | ✅ |
taskDecisions[](AI-driven task 操作,写入 tasks + timeline):
| action | → tasks | → timeline | 说明 |
|---|---|---|---|
close |
✅ UPDATE status→closed + closeResult + closeNote | ✅ task.status_changed | AI 从 11 值 closeResult 枚举选,reason → closeNote |
create |
✅ INSERT 新 task(可多个,不同 typeCategory) | ✅ task.created | onConflictDoNothing dedup |
update |
✅ UPDATE priority / suggestedActions / dueAt | ✅ task.updated | 同件事,情况变了 |
taskDecisions 驱动所有 task 操作:
actionNeeded留在 contacts 当信号字段,但 task 的 close / create / update 完全由taskDecisions[]决定,不再由actionNeeded=true触发。AI 能关任何 sourceType 的 task:不限
sourceType='contact_analysis',也能关sourceType='lead'的 lead_outreach task。typeCategory 9 值 + closeResult 11 值:见 tasks-field-design.md;Follow-up Task 生命周期:见 task-lifecycle.md。
3.2 System 补充字段(非 AI 输出)¶
| 字段 | 来源 | → contacts | → tasks | → timeline |
|---|---|---|---|---|
lastContactAnalysisAt |
NOW() |
✅ | ||
updatedAt |
NOW()(显式,$onUpdate 在 tx 内不生效) |
✅ | ✅ | |
taskId |
crypto.randomUUID()(create action) |
✅ | ✅ entityId | |
contactAnalysisRunId |
sqsRecord.messageId(SQS envelope 天然 UUID,直接使用) |
✅ | ||
taskType |
从 decision.typeCategory 派生:lead_outreach→自身,其他→follow_up |
✅ | ✅ newValue | |
sourceType |
'contact_analysis' 硬编码 |
✅ | ||
dueAt |
computeDueAt(priority)(high=4h / med=24h / low=72h) |
✅ | ||
status='closed' |
close action | ✅ | ✅ 6a newValue | |
closeType='auto_closed' |
close action | ✅ | ✅ 6a newValue | |
closedByStaffName='system' |
close action | ✅ | ||
closeResult |
AI 从 11 值枚举选(close action 的 decision.closeResult) |
✅ | ✅ 6a newValue | |
closeNote |
AI 写原因(close action 的 decision.reason) |
✅ | ✅ 6a newValue | |
storeId |
resolvePhoneIdentity(calls[0].from, calls[0].to) → storeId(fallback messages[0]) |
✅ | ✅ | ✅ |
storePhone |
同上,resolvePhoneIdentity 返回的 storePhone |
— contacts 无 store_phone 列 |
✅ | |
predecessorTaskId |
close→create 同 typeCategory 时记录前任 taskId | ✅ newValue | ||
leadTemperature |
computeTemperature() |
✅ | ⚠️ 待移除 #32 |
store_id 覆盖现状:8 张 Neon 表(contacts / calls / messages / leads / contacttimeline / staff / tasks / contactanalysis_runs)均具备
store_id列。contactsPK =(phone, store_id);staff.store_idNOT NULL;其余 6 表 nullable,writer 全写,老数据靠 backfill 脚本补齐。相关文档:store-level-isolation.md(终态 SoT)· infra#628(实施审计)。
3.3 Timeline 事件路由(Step ⑤)¶
Timeline 记录 ④ 中 taskDecisions 的执行结果(audit records),不是 AI 决策本身。
| 事件 | 触发条件 | oldValue | newValue |
|---|---|---|---|
6a task.status_changed |
taskDecisions[].action='close'(AI per-task 决策) | {status: 'pending'} |
{status: 'closed', closeType: 'auto_closed', closeResult: '<AI选择>', closeNote: '<AI原因>'} |
6b task.created |
taskDecisions[].action='create'(可多个,不同 typeCategory) | — | {taskType, typeCategory, priority} |
6b+ task.updated |
taskDecisions[].action='update'(优先级/建议更新) | {priority: old, suggestedActions: old} |
{priority: new, suggestedActions: new, reason: '<AI原因>'} |
6c contact.lifecycle_changed |
AI 输出 ≠ DB 旧值(AI vs DB diff) | {stage, state} DB READ |
{stage, state} AI |
6d contact.temperature_changed |
规则计算 ≠ DB 旧值(⚠️ 待移除) | {temp} DB READ |
{temp, reason} RULE |
四、执行流程¶
唯一路径是 writeAnalysisWithTasks(),所有触发走同一原子写入。底层用 db.batch() 不是 db.transaction() —— neon-http driver 的 db.transaction() 会直接 THROW;db.batch() 把所有 statement 打包成 1 次 HTTP,在 DB 内 BEGIN/COMMIT,全成功或全回滚。
4.1 AI 之后、batch 之前(后处理)¶
| 步骤 | 动作 |
|---|---|
派生 contactAnalysisRunId |
直接用 SQS envelope 的 messageId,天然 UUID |
派生 leadTemperature |
规则计算 ⚠️ 待移除 #32 |
| Pre-batch guard | 比对 AI 的 create 决策和现有 pending tasks,已有 typeCategory 不生成 INSERT statement(避免无意义的 onConflictDoNothing) |
4.2 遍历 taskDecisions[] 生成 statements¶
| action | 生成的 statement | 关键字段 |
|---|---|---|
close |
UPDATE tasks + INSERT timeline(task.status_changed) |
closeResult = AI 从 11 值枚举选;closeNote = AI 原因;closedByStaffName='system';WHERE 带 status='pending' guard 防关已关 |
create |
INSERT tasks + INSERT timeline(task.created) |
taskId = 新 UUID;taskType 从 typeCategory 派生(lead_outreach→自身,其他→follow_up);onConflictDoNothing 兜底 unique index |
update |
UPDATE tasks + INSERT timeline(task.updated) |
只 set 提供的字段(priority / suggestedActions / dueAt);priority 变了会重算 dueAt;WHERE 同样 pending guard |
4.3 batch 内 statement 顺序¶
- ③ UPDATE contacts(AI 字段 +
lastContactAnalysisAt=NOW()+updatedAt=NOW()) - ④ 所有 close/create/update statements + 对应 timeline INSERT
- ⑤ lifecycle 变了 → INSERT timeline(
contact.lifecycle_changed),没变就不加这条 statement
batch 后 ⑥ 输出 CloudWatch 日志:tasksClosedCount / tasksCreatedCount / tasksUpdatedCount / timelineEventsCount / contactAnalysisRunId / transactionDurationMs / source / isFirstAnalysis。
Statement 数量:最少 1 条(只有 ③,taskDecisions 为空);典型 3-5 条;最多 ~15 条(多个 close/create/update + 对应 timeline + lifecycle)。
⚠️ batch 内 3 条硬约束:
updatedAt必须显式sql\NOW()`($onUpdate在 batch 内不生效);ON CONFLICT DO NOTHING不导致 batch 回滚;不能在 batch 内用Promise.all`。
五、安全网¶
9 层保护防 AI 误决策 / 并发 / 重试 / 数据损坏。任一保护被绕过都需要 incident review。
| 保护层 | 机制 | 防什么 |
|---|---|---|
| Zod validation | AI 输出 → ContactsAnalysisSchema.parse() + TaskDecision discriminated union |
非法类型/枚举值/action 类型 |
uq_tasks_analysis_run_category |
(contactAnalysisRunId, typeCategory) WHERE NOT NULL |
SQS 重试重复 |
uq_tasks_pending_contact_category |
(contactPhone, franchiseId, accountId, typeCategory) WHERE pending ⚠️ 仍是老 3-key,store-level isolation 后未升级到 (phone, storeId, typeCategory) — 见 infra#776 |
并发重复(多 store 共享 phone 场景失效) |
chk_tasks_closed_integrity |
(pending → no closeType) OR (closed → has closeType) |
状态不一致 |
| Close/update WHERE guard | eq(tasks.status, 'pending') 在每个 close/update 的 WHERE 中 |
防止关/改已关的 task |
| AI 引用不存在的 taskId | UPDATE 影响 0 行,batch 继续 | 安全(无数据损坏),log warning 监控 |
| Create 预检查 | batch 前比对 AI 的 create 和现有 pending,已存在的 typeCategory 不生成 statements | 避免生成无意义的 onConflictDoNothing |
| Transaction rollback | 撞任一 index → 整个 tx 回滚 → SQS ack 成功 | 幂等保证 |
| FIFO SQS | 同一 contact 串行(MessageGroupId = phone#storeId,PR #676 起;cron + per-call 一致) |
防同 contact 并发分析 |
六、待加入 / 待移除¶
| 项目 | 状态 |
|---|---|
| Prompt 加 pending + closed tasks 数据 | ✅ 已落地,渲染为 PENDING TASKS / RECENTLY CLOSED TASKS section |
| Zod schema 加 taskDecisions + doNotContact + hasOpenComplaint | ✅ 已落地(TaskDecision discriminated union) |
writeAnalysisWithTasks() 新路径(AI-driven task decisions) |
✅ 已落地(唯一写入路径) |
| FIFO SQS queue(dailyBatchQueue Standard → FIFO) | ✅ 已落地,同 contact 串行,不同 contact 并行 |
| SMS 正文内容传给 AI | 未来 — 当前只传 direction/type/subject |
| ⚠️ 待移除 #32 — 从 timeline 到达时间实时计算 | |
| ⚠️ 随 temperature 一起移除 |