跳转至

Flow 3:Contact Analysis(contacts-analyzer)写入详情

contacts-analyzer Lambda 在客户 contact 累积新通话/消息/lead 后,跑 1 次 AI 调用得到客户状态 + per-task 决策(close/create/update),原子写入 contacts + tasks + contact_timeline。本文给写入字段映射 + 安全网 + 待办,供改 Lambda 行为或追溯字段所有权时参考。

触发: 6 个来源(见 总览) · 错误处理: 阻塞(SQS batchItemFailures,最多 3 次后进 DLQ) · 代码 SoT: callytics-infrastructure repo 的 lambda/contacts-analyzer/

实施溯源(2026-05-04 verified against source code)

权威设计: .claude/specs/2026-03-28-flow3-contact-analysis-design.md(AI-driven task decision 完整设计)

关键事实:

  • storeId 已写入 contacts + tasks + contact_timeline
  • account_id rename 完成(common migration 0016);下方设计中残留 siteId 字眼按 accountId 理解
  • franchiseId 从 contact 行读取,不硬编码
  • writeAnalysisWithTasks() 是唯一写入路径;旧 writeAnalysis() + TASK_ENABLED_SITES feature flag 已移除
  • Prompt 默认传 pending + closed tasks 给 AI

一、数据流概览

1 次 AI 调用 + 1 次 transaction 写入。AI 输出客户状态字段 + per-task 决策(close/create/update),系统执行写入。

  6 触发源 → SQS {phone, storeId, source?}(FIFO,MessageGroupId = `phone#storeId`;PR #676 SQS contract 已从 `{phone, franchiseId, accountId}` 迁到 `{phone, storeId}`)
    ┌───────────▼────────────────────────────────────────┐
    │  读取 5 张表                                        │
    │    contacts → summary + lifecycle + lastAnalysisAt  │
    │    calls/messages/leads → 增量数据                   │
    │    tasks → pending + closed (全量)                   │
    │                                                     │
    │  增量窗口:                                           │
    │    lastContactAnalysisAt = NULL → 30d 首次全量       │
    │    有值 → since lastContactAnalysisAt               │
    └───────────┬────────────────────────────────────────┘
    ┌───────────▼────────────────────────────────────────┐
    │  构建 Prompt → AI 调用 (1 次)                       │
    │  → 客户状态字段 + taskDecisions[] (Zod 验证)        │
    └───────────┬────────────────────────────────────────┘
    ╔═══════════▼═══════════════════════════════════════╗
    ║  db.batch() — 1 次 HTTP,全成功 or 全回滚          ║
    ║                                                   ║
    ║  ③ UPDATE contacts       ← AI 字段 + system 字段  ║
    ║  ④ AI-driven task decisions:                      ║
    ║     遍历 taskDecisions[],per-task 执行:           ║
    ║     - close  → UPDATE task (status+closeResult)   ║
    ║     - create → INSERT 新 task(可多个)             ║
    ║     - update → UPDATE task (priority/actions)     ║
    ║  ⑤ INSERT timeline       ← ④ 执行结果的审计记录   ║
    ╚═══════════════════════════════════════════════════╝
    ⑥ CloudWatch 日志

二、输入:什么数据喂给 AI

读 5 张表(contacts / calls / messages / leads / tasks),拼成 prompt 给 AI。增量窗口由 contacts 行的 lastContactAnalysisAt 决定。

2.1 contacts 表

字段 为什么读
customerSummary compressed memory — 上次分析的精华,AI 用它作为历史上下文
lastContactAnalysisAt 增量读取的时间边界(NULL = 首次分析)
leadStatus 当前漏斗阶段
lifecycleStage lead / member / churned
firstName, lastName 客户身份

2.2 增量数据:calls + messages + leads

if (lastContactAnalysisAt === null) {
  // 首次分析:读 30d 全量,建立基线 customerSummary
  时间边界 = NOW() - 30d
} else {
  // 后续:只读上次分析后的新数据
  时间边界 = lastContactAnalysisAt
}
查询条件 读取字段
calls customerPhone=? AND franchiseId=? AND startTime > 时间边界(PR #676 起 reader 不再带 accountId) startTime, direction, duration, executiveSummary, primaryCategory, callState
messages fromPhoneNumber=? AND franchiseId=? AND creationTime > 时间边界 creationTime, direction, type, subject
leads phone=? AND franchiseId=? AND receivedAt > 时间边界(infra#518 已修复) firstName, lastName, receivedAt, leadType

2.3 tasks 表

读取该 contact 的所有 tasks(per-contact 通常 1-5 条,数据量极小):

状态 读取字段 AI 用来做什么
pending typeCategory, priority, dueAt, suggestedActions 知道已有什么 task → 不重复建
closed typeCategory, closeResult, closeNote, closedAt closeResult 最关键 — 员工说 "booked" → AI 建 bookednotconverted 跟进

2.4 Prompt 结构

System Prompt(静态,可缓存)
  └─ 角色定义 + 输出格式要求

User Message(动态,per-contact)
  ├─ CUSTOMER PROFILE: phone, siteId, lifecycleStage, leadStatus
  ├─ PREVIOUS SUMMARY: customerSummary
  ├─ PENDING TASKS:
  │     - [LEAD_OUTREACH] high priority, due in 2h
  │       Suggested: "Call within SLA window"
  ├─ RECENTLY CLOSED TASKS:
  │     - [LEAD_FOLLOW_UP] closed Mar 25 by Sarah
  │       Result: booked | Note: "Customer said Thursday 3pm for trial"
  ├─ NEW CALLS (since last analysis):
  │     - Mar 26 10:00 (inbound): "Customer called back, ready to sign up"
  ├─ NEW MESSAGES (since last analysis):
  │     - Mar 25 16:00 (inbound): "I'll come Thursday for the trial"
  └─ NEW LEADS (since last analysis):
        (none)

当前 prompt 默认传入 tasks: { pending, closed },渲染为 PENDING TASKS / RECENTLY CLOSED TASKS section。TASK_ENABLED_SITES feature flag 已移除。

2.5 跳过条件

  • 如果 calls + messages + leads 全部为空 无 customerSummary → skip(无数据可分析)

2.6 customerSummary 准确性 — Prompt 要求

customerSummary 是 compressed memory:AI 每次读旧 summary + 新数据 → 重写(不追加)。准确度靠 prompt,system prompt 必须包含 3 条:

  1. Previous summary 是 context,不是绝对真理 — 新数据可以 override
  2. 保留关键细节 — dates / commitments / objections / complaints
  3. 区分确认 vs 可能 — "customer confirmed Thursday 3pm" vs "customer mentioned possibly coming"

为什么不保存历史 summary:contact_timeline 已记录精确事件历史,customerSummary 只需要"当前快照"。


三、AI 输出字段路由表 ★

AI 输出落到 contacts / tasks / contact_timeline 三张表;系统补充 timestamp / id / store 隔离字段。

3.1 AI 输出 → 3 张表

字段集由 ContactsAnalysisSchema Zod schema 定义。

客户状态字段(写入 contacts 表):

# AI 字段 类型 → contacts → timeline 决定/触发 Zod
1 customerSummary text
2 leadStatus text enum (12)
3 leadStatusReason text
4 lifecycleStage text enum (4) ✅ 6c new 变了 → 写 6c 事件
5 lifecycleState text enum (3) ✅ 6c new 变了 → 写 6c 事件
6 purchaseIntent text enum (3)
7 purchaseIntentReason text
8 goals jsonb
9 leadObjections jsonb
10 leadRejectionReasons jsonb
11 actionNeeded boolean 信号字段,不直接触发 task
12 actionNeededReason text
13 suggestedActions jsonb contacts 级别建议(legacy 兼容)
14 typeCategory text enum (9) legacy 兼容,task 操作由 taskDecisions 驱动
15 priority text enum (3) legacy 兼容
16 doNotContact boolean
17 hasOpenComplaint boolean

taskDecisions[](AI-driven task 操作,写入 tasks + timeline):

action → tasks → timeline 说明
close ✅ UPDATE status→closed + closeResult + closeNote ✅ task.status_changed AI 从 11 值 closeResult 枚举选,reason → closeNote
create ✅ INSERT 新 task(可多个,不同 typeCategory) ✅ task.created onConflictDoNothing dedup
update ✅ UPDATE priority / suggestedActions / dueAt ✅ task.updated 同件事,情况变了

taskDecisions 驱动所有 task 操作:actionNeeded 留在 contacts 当信号字段,但 task 的 close / create / update 完全由 taskDecisions[] 决定,不再由 actionNeeded=true 触发。

AI 能关任何 sourceType 的 task:不限 sourceType='contact_analysis',也能关 sourceType='lead' 的 lead_outreach task。

typeCategory 9 值 + closeResult 11 值:见 tasks-field-design.md;Follow-up Task 生命周期:见 task-lifecycle.md

3.2 System 补充字段(非 AI 输出)

字段 来源 → contacts → tasks → timeline
lastContactAnalysisAt NOW()
updatedAt NOW()(显式,$onUpdate 在 tx 内不生效)
taskId crypto.randomUUID()(create action) ✅ entityId
contactAnalysisRunId sqsRecord.messageId(SQS envelope 天然 UUID,直接使用)
taskType decision.typeCategory 派生:lead_outreach→自身,其他→follow_up ✅ newValue
sourceType 'contact_analysis' 硬编码
dueAt computeDueAt(priority)(high=4h / med=24h / low=72h)
status='closed' close action ✅ 6a newValue
closeType='auto_closed' close action ✅ 6a newValue
closedByStaffName='system' close action
closeResult AI 从 11 值枚举选(close action 的 decision.closeResult) ✅ 6a newValue
closeNote AI 写原因(close action 的 decision.reason) ✅ 6a newValue
storeId resolvePhoneIdentity(calls[0].from, calls[0].to) → storeId(fallback messages[0])
storePhone 同上,resolvePhoneIdentity 返回的 storePhone — contacts 无 store_phone
predecessorTaskId close→create 同 typeCategory 时记录前任 taskId ✅ newValue
leadTemperature computeTemperature() ⚠️ 待移除 #32

store_id 覆盖现状:8 张 Neon 表(contacts / calls / messages / leads / contacttimeline / staff / tasks / contactanalysis_runs)均具备 store_id 列。contacts PK = (phone, store_id);staff.store_id NOT NULL;其余 6 表 nullable,writer 全写,老数据靠 backfill 脚本补齐。

相关文档:store-level-isolation.md(终态 SoT)· infra#628(实施审计)。

3.3 Timeline 事件路由(Step ⑤)

Timeline 记录 ④ 中 taskDecisions 的执行结果(audit records),不是 AI 决策本身。

事件 触发条件 oldValue newValue
6a task.status_changed taskDecisions[].action='close'(AI per-task 决策) {status: 'pending'} {status: 'closed', closeType: 'auto_closed', closeResult: '<AI选择>', closeNote: '<AI原因>'}
6b task.created taskDecisions[].action='create'(可多个,不同 typeCategory) {taskType, typeCategory, priority}
6b+ task.updated taskDecisions[].action='update'(优先级/建议更新) {priority: old, suggestedActions: old} {priority: new, suggestedActions: new, reason: '<AI原因>'}
6c contact.lifecycle_changed AI 输出 ≠ DB 旧值(AI vs DB diff) {stage, state} DB READ {stage, state} AI
6d contact.temperature_changed 规则计算 ≠ DB 旧值(⚠️ 待移除) {temp} DB READ {temp, reason} RULE

四、执行流程

唯一路径是 writeAnalysisWithTasks(),所有触发走同一原子写入。底层用 db.batch() 不是 db.transaction() —— neon-http driver 的 db.transaction() 会直接 THROW;db.batch() 把所有 statement 打包成 1 次 HTTP,在 DB 内 BEGIN/COMMIT,全成功或全回滚。

4.1 AI 之后、batch 之前(后处理)

步骤 动作
派生 contactAnalysisRunId 直接用 SQS envelope 的 messageId,天然 UUID
派生 leadTemperature 规则计算 ⚠️ 待移除 #32
Pre-batch guard 比对 AI 的 create 决策和现有 pending tasks,已有 typeCategory 不生成 INSERT statement(避免无意义的 onConflictDoNothing)

4.2 遍历 taskDecisions[] 生成 statements

action 生成的 statement 关键字段
close UPDATE tasks + INSERT timeline(task.status_changed) closeResult = AI 从 11 值枚举选;closeNote = AI 原因;closedByStaffName='system';WHERE 带 status='pending' guard 防关已关
create INSERT tasks + INSERT timeline(task.created) taskId = 新 UUID;taskTypetypeCategory 派生(lead_outreach→自身,其他→follow_up);onConflictDoNothing 兜底 unique index
update UPDATE tasks + INSERT timeline(task.updated) 只 set 提供的字段(priority / suggestedActions / dueAt);priority 变了会重算 dueAt;WHERE 同样 pending guard

4.3 batch 内 statement 顺序

  1. UPDATE contacts(AI 字段 + lastContactAnalysisAt=NOW() + updatedAt=NOW())
  2. 所有 close/create/update statements + 对应 timeline INSERT
  3. lifecycle 变了 → INSERT timeline(contact.lifecycle_changed),没变就不加这条 statement

batch 后 ⑥ 输出 CloudWatch 日志:tasksClosedCount / tasksCreatedCount / tasksUpdatedCount / timelineEventsCount / contactAnalysisRunId / transactionDurationMs / source / isFirstAnalysis

Statement 数量:最少 1 条(只有 ③,taskDecisions 为空);典型 3-5 条;最多 ~15 条(多个 close/create/update + 对应 timeline + lifecycle)。

⚠️ batch 内 3 条硬约束:updatedAt 必须显式 sql\NOW()`($onUpdate在 batch 内不生效);ON CONFLICT DO NOTHING不导致 batch 回滚;不能在 batch 内用Promise.all`。


五、安全网

9 层保护防 AI 误决策 / 并发 / 重试 / 数据损坏。任一保护被绕过都需要 incident review。

保护层 机制 防什么
Zod validation AI 输出 → ContactsAnalysisSchema.parse() + TaskDecision discriminated union 非法类型/枚举值/action 类型
uq_tasks_analysis_run_category (contactAnalysisRunId, typeCategory) WHERE NOT NULL SQS 重试重复
uq_tasks_pending_contact_category (contactPhone, franchiseId, accountId, typeCategory) WHERE pending ⚠️ 仍是老 3-key,store-level isolation 后未升级到 (phone, storeId, typeCategory) — 见 infra#776 并发重复(多 store 共享 phone 场景失效)
chk_tasks_closed_integrity (pending → no closeType) OR (closed → has closeType) 状态不一致
Close/update WHERE guard eq(tasks.status, 'pending') 在每个 close/update 的 WHERE 中 防止关/改已关的 task
AI 引用不存在的 taskId UPDATE 影响 0 行,batch 继续 安全(无数据损坏),log warning 监控
Create 预检查 batch 前比对 AI 的 create 和现有 pending,已存在的 typeCategory 不生成 statements 避免生成无意义的 onConflictDoNothing
Transaction rollback 撞任一 index → 整个 tx 回滚 → SQS ack 成功 幂等保证
FIFO SQS 同一 contact 串行(MessageGroupId = phone#storeId,PR #676 起;cron + per-call 一致) 防同 contact 并发分析

六、待加入 / 待移除

项目 状态
Prompt 加 pending + closed tasks 数据 ✅ 已落地,渲染为 PENDING TASKS / RECENTLY CLOSED TASKS section
Zod schema 加 taskDecisions + doNotContact + hasOpenComplaint ✅ 已落地(TaskDecision discriminated union)
writeAnalysisWithTasks() 新路径(AI-driven task decisions) ✅ 已落地(唯一写入路径)
FIFO SQS queue(dailyBatchQueue Standard → FIFO) ✅ 已落地,同 contact 串行,不同 contact 并行
SMS 正文内容传给 AI 未来 — 当前只传 direction/type/subject
leadTemperature / leadTemperatureReason ⚠️ 待移除 #32 — 从 timeline 到达时间实时计算
contact.temperature_changed timeline ⚠️ 随 temperature 一起移除