Lambda 写入矩阵 — 总览¶
每个 Lambda 对每张 Neon 表的字段级写入职责清单,给开发者改 Lambda 写入逻辑前对照。字段所有权是硬约束 — 违反会导致 last-write-wins 竞态或数据不一致。
实施溯源 + 落地状态(2026-05-04)
设计文档:
- Pipeline 设计:
.claude/specs/2026-03-25-task-pipeline-write-flow.md - Flow 3(AI-driven task decisions):
.claude/specs/2026-03-28-flow3-contact-analysis-design.md
Schema 状态(callytics-common 0.34.0 + migration 0014-0023):
- 主业务表
contacts/calls/messages/leads/contact_timeline/tasks/staff,辅助表rc-stores/store-config/user-stores均具备store_id列 task_events仍只有account_id,无store_id— audit 表暂保留 account-level isolation,store-level 终态后续补contacts复合主键 =(phone, store_id),store_id NOT NULL(migration 0014)staff.store_idNOT NULL(migration 0015)site_id→account_idrename(migration 0016)- 其余表
store_id仍 nullable(calls/leads/messages/contact_timeline/tasks),writer 全写但保留 NULL 兼容历史行,各表均有idx_<table>_store_id WHERE store_id IS NOT NULLpartial index
Lambda writer 当前 store_id 写入状态(2026-05-04):
transcribe-processor→calls.store_id+contact_timeline.store_id(call.status_changed,Phase 1 起 dual writer)ai-analysis-processor→contacts/calls/contact_timeline.store_idcontacts-analyzer→contacts/tasks/contact_timeline.store_idmessage-processor→messages/contacts/contact_timeline.store_idlead-tracking(独立 repo)→leads/contacts/tasks/contact_timeline.store_id
franchise_id 来源(读源代码核对):
message-processor:parseFranchiseFromClientId(_client_id, _account_id)解析,不是硬编码ai-analysis-processor:从 SQS 上游 metadata 传入contacts-analyzer:从 contact 行的franchiseId读取lead-tracking:StoresV2 leadEmail 映射 →parseBrandFromFranchise()- 当前所有 writer 都从外部数据派生
franchise_id,不硬编码(历史 P0 #632 fix:阻止"_client_id整串含 site 后缀直接写入"的 bug)。
全局数据流¶
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#f5f5f5', 'lineColor': '#666'}, 'themeCSS': '.label { font-family: sans-serif; }'}}%%
flowchart TB
subgraph entry["入口层"]
LEAD["📧 Email Lead"]
CALL["📞 通话结束"]
SMS["💬 SMS/VM"]
UI["👤 员工 UI\n(DEFERRED)"]
end
subgraph lt["Lead Processing"]
LT["lead-tracking\n(独立 repo)\n非阻塞写入"]
end
subgraph p1["Per-Call Analysis"]
AAP["ai-analysis-processor\n(Prompt A · Kimi/Claude)\n非阻塞写入"]
end
subgraph mp["Message Processing"]
MSG["message-processor\n非阻塞写入"]
end
subgraph sa["Studio API (部分实施)"]
STUDIO["studio-api\nClose ✅ · Postpone ❌\ntimeline / lastActivityAt / SQS gap"]
end
subgraph p2triggers["Contact Analysis 触发源\nV1 无 Cooldown — DB index 保护数据完整性"]
CRON["❶ Cron 06:00 UTC ✅ ENABLED"]
DEMAND["❷ On-demand\n后端 ✅ · 前端 DEFERRED"]
FUN["❸ Per-Call Analysis SQS\n每通电话无条件触发(ai-analysis 非阻塞 send)\n✅ 已实施"]
TASKCLOSE["❹ Task close ⚠️ 部分实施"]
T5["T5 showed *"]
T6["T6 trialed *"]
end
subgraph p2["Contact Analysis"]
P2READ["读取\ncontacts(summary) · 增量 calls/msgs/leads\n+ tasks"]
P2AI["① AI 分析 (Kimi/Claude)"]
P2TEMP["② 温度计算 (纯规则)\ncomputeTemperature()\n⚠️ 待移除(从 timeline 时间戳计算)"]
P2ID["②b 预生成 ID\ntaskId = randomUUID()\ncontactAnalysisRunId = sqsRecord.messageId"]
end
subgraph tx["db.batch() — 1 次 HTTP,原子\n⚠️ updatedAt=NOW() 必须显式传\n幂等: uq_tasks_analysis_run_category\n撞 unique → 整个 batch 回滚 → SQS 重试"]
TX3["③ UPDATE contacts\ncustomerSummary · leadStatus\nlifecycleStage\nlastContactAnalysisAt=NOW()\n⚠️ temperature 待移除"]
TX4["④ AI-driven task decisions 🆕\nclose: closeResult(11值) + closeNote\nupdate: priority/suggestedActions\ncreate: new tasks per AI"]
TX5["⑤ taskDecisions 写入 🆕\nclose→UPDATE+timeline\ncreate→INSERT+timeline\nupdate→UPDATE+timeline"]
TX6["⑥ INSERT contact_timeline 🆕\ntask.created · task.status_changed\ntask.updated · contact.lifecycle_changed"]
end
LEADS[("leads")]
CALLS[("calls")]
CONTACTS[("contacts ★\nSSoT")]
MESSAGES[("messages")]
TASKS[("tasks 🆕")]
TIMELINE[("contact_timeline 🆕")]
STAFF[("staff\nDEFERRED · 已建 · 零写入")]
%% Entry → Lambda
LEAD --> LT
CALL --> AAP
SMS --> MSG
UI -.-> STUDIO
%% lead-tracking writes
LT -->|"INSERT"| LEADS
LT -->|"UPSERT 🆕 轻量\nON CONFLICT → updated_at\n不覆盖 AI 字段"| CONTACTS
LT -->|"INSERT 🆕\nlead_outreach · high\n双层 dedup: per-lead + per-contact\ndueAt=SLA(5min)-quiet hours"| TASKS
LT -->|"INSERT 🆕\nlead.created"| TIMELINE
%% Per-Call Analysis writes
AAP -->|"33 AI 字段"| CALLS
AAP -->|"UPSERT 基础字段\nfirstName · lastName\nlastActivityAt"| CONTACTS
AAP -->|"INSERT 🆕\ncall.created"| TIMELINE
AAP -->|"每通电话\n无条件触发"| FUN
%% message-processor writes
MSG -->|"UPSERT"| MESSAGES
MSG -->|"UPSERT 🆕 轻量\nlastActivityAt + updated_at"| CONTACTS
MSG -->|"INSERT 🆕\nmessage.created"| TIMELINE
%% studio-api writes
STUDIO -->|"close ✅\npostpone ❌"| TASKS
STUDIO -.->|"lastActivityAt\n(gap) *"| CONTACTS
STUDIO -.->|"task.status_changed\ntask.due_at_changed (gap) *"| TIMELINE
STUDIO -.->|"SQS → Contact Analysis\n(gap) *"| TASKCLOSE
%% Contact Analysis triggers
CRON --> P2READ
DEMAND --> P2READ
FUN --> P2READ
TASKCLOSE -.-> P2READ
T5 -.-> P2READ
T6 -.-> P2READ
%% Contact Analysis reads
CALLS -.->|"增量"| P2READ
MESSAGES -.->|"增量"| P2READ
LEADS -.->|"增量"| P2READ
CONTACTS -.->|"summary"| P2READ
TASKS -.->|"全量"| P2READ
%% Contact Analysis internal flow
P2READ --> P2AI --> P2TEMP --> P2ID --> TX3
%% Transaction internal flow
TX3 --> TX4 --> TX5 --> TX6
%% Transaction writes to DB
TX3 -->|"AI 综合字段"| CONTACTS
TX4 -->|"status=closed"| TASKS
TX5 -->|"status=pending"| TASKS
TX6 -->|"事件记录"| TIMELINE
%% Colors
style entry fill:#f5f5f5,stroke:#bdbdbd
style lt fill:#e8f5e9,stroke:#388e3c,color:#1b5e20
style p1 fill:#e3f2fd,stroke:#1565c0,color:#0d47a1
style mp fill:#ede7f6,stroke:#512da8,color:#311b92
style sa fill:#efebe9,stroke:#795548,color:#3e2723
style p2triggers fill:#fff8e1,stroke:#f9a825,color:#e65100
style p2 fill:#fff3e0,stroke:#ef6c00,color:#bf360c
style tx fill:#fce4ec,stroke:#c62828,color:#b71c1c
style CONTACTS fill:#c8e6c9,stroke:#2e7d32,color:#1b5e20
style TASKS fill:#f3e5f5,stroke:#7b1fa2,color:#4a148c
style CALLS fill:#e0f2f1,stroke:#00796b,color:#004d40
style TIMELINE fill:#e8eaf6,stroke:#283593,color:#1a237e
style LEADS fill:#fff9c4,stroke:#f57f17,color:#e65100
style MESSAGES fill:#fce4ec,stroke:#880e4f,color:#880e4f
style STAFF fill:#eceff1,stroke:#607d8b,color:#37474f
图例:实线 = 写入 · 虚线 = 读取 / DEFERRED · 🆕 = Task 系统新增 · * = DEFERRED(V1 不实现)
一、写入模式总览¶
1.1 所有 Lambda 统一用 Neon HTTP driver + Drizzle ORM¶
连接方式:@neondatabase/serverless HTTP driver → drizzle-orm/neon-http。每条 SQL 是一个 HTTP POST。
1.2 多表写入:db.batch() vs 分开 await¶
| 写法 | HTTP 请求数 | 原子性 | 用在哪 |
|---|---|---|---|
await db.batch([stmt1, stmt2, stmt3]) |
1 | ✅ 全成功 or 全回滚 | 写入之间有因果依赖 |
分开 await db.insert(...) |
N | ❌ 各自独立 | 有明确"主记录",其他是补充 |
db.transaction(async tx => {...}) |
— | — | ❌ neon-http driver 不支持,会 THROW |
db.batch()底层调用 Neon 的sql.transaction()— 所有 SQL 打包成 1 个 HTTP 请求,DB 内执行 BEGIN/COMMIT/ROLLBACK。
1.3 五个 Flow 写入模式¶
所有 Flow 的 Neon 写入统一用 db.batch() 打包(2026-03-28 Neon-primary 决策)。DDB 写入是过渡期冗余,独立标记不参与 batch;SQS 发送在 batch 外,batch 成功后才发,非阻塞。
| Flow | Lambda | Neon 写入模式 | 原因 |
|---|---|---|---|
| 1 | lead-tracking | 2-Lambda 拆分:poller UPSERT leads(独立 await) → EventBridge → SQS → lead-processor db.batch([contacts, tasks, timeline]) |
leads 写入失败重试(email 不 mark SEEN)和 downstream 失败重试(SQS retry)解耦,避免互相阻塞 |
| 2 | Per-Call Analysis | db.batch([calls, contacts, timeline]) |
calls+contacts+timeline 原子;DDB calls 过渡期独立写 |
| 3 | Contact Analysis | db.batch([contacts, ...taskDecisions, timeline]) |
关旧 task + 建新 task 必须原子(否则"幽灵分析") |
| 4a | studio-api Close | raw SQL × 2(UPDATE tasks → INSERT task_events) | 当前未原子化,前者成功后者失败 → task 已关但无 audit;后续可改用 db.batch() |
| 4b | studio-api Postpone | ❌ 未实施 | 暂无端点;终态用 db.batch() |
| 5 | message-processor | per-entry db.batch([messages, contacts, timeline]) |
每条 SQS record 一个 batch,失败 → 该 entry 进 batchItemFailures 重试 |
1.4 踩坑注意事项¶
db.batch() 生产注意(社区踩坑总结)
db.batch()内不能Promise.all— 并行语句会丢 transaction scope(Drizzle #2200)$onUpdate(() => new Date())在 batch 内不生效 — Drizzle 的$onUpdate是 query builder hook,不是 DB trigger。batch 内所有 UPDATE 必须显式写updatedAt: sql\NOW()``ON CONFLICT DO NOTHING不导致 batch 回滚 — PostgreSQL 的 conflict 处理是静默跳过,不抛错。dedup index 在 batch 内安全工作- Pin driver 版本 —
@neondatabase/serverless和drizzle-orm之间有版本兼容性问题,社区报告升级后运行时报错(Drizzle #5208) - SQS 提供 Lambda 层重试 —
db.batch()解决 SQL 原子性,SQSbatchItemFailures解决 Lambda crash 重试。两者配合 = 幂等 + 原子
二、总览矩阵¶
| Lambda | 触发输入(原始数据从哪来) | AI 分析前从 Neon 读什么 | 写入哪些表 | 详情 |
|---|---|---|---|---|
| lead-tracking | IMAP email(EventBridge 5min 轮询) | — | leads ✅ · contacts 🆕 · tasks 🆕 · timeline 🆕 | 详情 |
| Per-Call Analysis | SQS(transcript,来自 transcribe-processor) | contacts(customerSummary)+ calls(24h) + messages(24h) | calls ✅ · contacts ✅ · timeline 🆕 | 详情 |
| Contact Analysis | SQS {phone, franchiseId, siteId} |
contacts(summary)+ 增量 calls/msgs/leads + tasks | contacts ✅ · tasks 🆕 · timeline 🆕 | 详情 |
| message-processor | SQS(RingCentral SMS/VM webhook) | — | messages ✅ · contacts 🆕 · timeline 🆕 | 详情 |
| studio-api | API request(员工 UI 操作) | tasks(校验 status=pending)* | tasks* · contacts* · timeline* | 详情 |
✅ = 现有 🆕 = V1 新增 * = DEFERRED — = 不读 Neon(数据全在触发输入里)
Per-Call Analysis 读取:当前代码从 DDB 读(
customer-history.ts),计划迁移到 Neon。如果customerSummary存在就读,不存在就跳过(graceful degradation)。Contact Analysis 读取:customerSummary 是 compressed memory。
lastContactAnalysisAt为 NULL(首次)→ 读 30d 全量建立基线。有值 → 读lastContactAnalysisAt之后的增量。详见 contact-analysis-writes.md。
三、Contact Analysis 触发源¶
6 个触发源,统一发 SQS 消息到同一 schema(Zod-validated by SqsMessageBodySchema):
{
phone: string, // 必填
storeId: string, // 必填 — contacts PK 组件(post-migration 0014)
source?: 'cron' | 'on_demand' | 'per_call_analysis' | 'task_close' | 'showed' | 'trialed',
// optional observability trace 字段 — 老 producer 不传也能解析
callId?: string,
telephonySessionId?: string,
analysisCompletedAt?: number,
queuedAt?: number,
}
franchiseId/accountId不再传 — Consumer(contacts-analyzer)从 contact 行通过 PK lookup 自行读取。上游 producer 只需要phone + storeId。技术约束:SQS ESM filtering 只支持 body key(不支持 MessageAttributes)。
FIFO Queue 已上线 — dailyBatchQueue 从 Standard 切换为 FIFO 并迁移到
SqsStack。触发源发送消息必须提供MessageGroupId和MessageDeduplicationId,例如 ai-analysis-processor 用${phone}#${storeId}作 group key、per_call_analysis-${phone}-${callId}作 dedup id。
| # | 触发源 | SQS Queue | source |
状态 |
|---|---|---|---|---|
| ❶ | Cron 每天 06:00 UTC | dailyBatchQueue (FIFO) | 'cron' |
✅ ENABLED in CDK |
| ❷ | On-demand(前端 Refresh AI) | dailyBatchQueue(V1)/ expressQueue* | 'on_demand' |
✅ 后端现有,前端 DEFERRED |
| ❸ | Per-Call Analysis(每通电话) | dailyBatchQueue(V1)/ expressQueue* | 'per_call_analysis' |
✅ 已实施(ai-analysis-processor non-blocking SQS send) |
| ❹ | Task close(员工关闭 Task) | dailyBatchQueue (FIFO) / expressQueue* | 'task_close' |
⚠️ 部分实施 — studio-api 已能关 task + 写 audit,timeline / contacts.lastActivityAt / SQS 触发暂未补齐(详见 studio-api-writes.md) |
| T5 | showed(4h 黄金窗口) | expressQueue* | 'showed' |
🆕 DEFERRED |
| T6 | trialed(2-4h 黄金窗口) | expressQueue* | 'trialed' |
🆕 DEFERRED |
* expressQueue 尚未创建(Phase D5)。V1 期间 ❷❸❹ 用 dailyBatchQueue。 ❷ On-demand 后端 API 已存在,但前端 Refresh AI 按钮随员工 UI 一起 DEFERRED。
字段来源¶
| 触发源 | phone / storeId 从哪来 |
|---|---|
| ❶ Cron | contacts 表遍历(dispatch fan-out 时发 per-contact 消息) |
| ❷ On-demand | 前端 API request payload |
| ❸ Per-Call Analysis | ai-analysis-processor 解析 storeId 后发送(contactPhone + storeId 都解析才触发) |
| ❹ Task close | tasks 表的 contact_phone + store_id |
| T5/T6 | Contact Analysis 自身检测到 leadStatus 变化后发送 |
V1 不做 Cooldown¶
不做应用层 cooldown,重复分析由 DB index 保护(uq_tasks_pending_contact_category 防重复 task,uq_tasks_analysis_run_category 防 SQS 重试)。成本 ~200 calls/day × $0.005 ≈ $1/day,可接受。lastContactAnalysisAt 每次分析仍写 NOW(),未来加 cooldown 时可直接读取。
T5/T6 推荐实现(DEFERRED)¶
Contact Analysis 自检测 leadStatus 变化 → EventBridge Scheduler 延迟触发(showed=4h, trialed=2-4h) — SQS DelaySeconds 最大 15min 不够用。Loop guard:source 已是 showed/trialed 时不再检测。
四、contacts 表写入详情¶
contacts 是中心化客户档案,4 个 Lambda 各写不同字段。
字段所有权¶
Per-Call Analysis 只写"活动标记"(什么时候发生了什么),Contact Analysis 只写"AI 判断"(客户状态是什么) — 两者字段互不重叠。
| 分类 | 字段 | 所有者 |
|---|---|---|
| 身份 | firstName, lastName | Per-Call Analysis(初始值由 lead-tracking 设) |
| 活动时间 | lastActivityAt | Per-Call Analysis + lead-tracking + message-processor + studio-api |
| 业务数据 | hasCardOnFile | Per-Call Analysis(credit_card_captured AI 检测 → contacts 直写) |
| 运营 | notes | 员工(studio-api,DEFERRED) |
| 初始状态 | lifecycleStage='lead', leadStatus='new' | lead-tracking(仅 INSERT) |
| AI 画像 | customerSummary, leadStatus, leadStatusReason, lifecycleStage, lifecycleState, purchaseIntent, purchaseIntentReason, goals, leadObjections, leadRejectionReasons | Contact Analysis |
| AI 行动 | actionNeeded, actionNeededReason, suggestedActions | Contact Analysis |
| AI 风险 | doNotContact, doNotContactUpdatedBy, hasOpenComplaint | Contact Analysis |
| AI 追踪 | lastContactAnalysisAt | Contact Analysis |
| 系统 | createdAt, updatedAt | DB / 所有 Lambda |
三个 UPSERT contacts 对比¶
lead-tracking / Per-Call Analysis / message-processor 都 UPSERT contacts,但写的字段不同 — 数据源决定能写什么。
INSERT 时写入:
| 字段 | lead-tracking | Per-Call Analysis | message-processor | 差异原因 |
|---|---|---|---|---|
| phone, franchiseId, siteId | ✅ | ✅ | ✅ | PK,三者都需要 |
| lifecycleStage='lead' | ✅ | — | — | 只有 lead-tracking 知道这是 lead |
| leadStatus='new' | ✅ | — | — | 同上 |
| firstName | ✅(email 解析) | ✅(call AI 提取) | — | SMS webhook 不含名字(#521 未来可从 RC API 补写) |
| lastName | ✅ | ✅ | — | 同上 |
| lastActivityAt | ✅ NOW() |
✅ NOW() |
✅ NOW() |
三者一致 — 任何互动都是活动 |
| — | — | 通话才算"尝试联系" | ||
| — | — | 接通才算"首次接通" |
ON CONFLICT 时更新:
| 字段 | lead-tracking | Per-Call Analysis | message-processor | 差异原因 |
|---|---|---|---|---|
| lastActivityAt | NOW()(覆盖) |
NOW()(覆盖) |
NOW()(覆盖) |
三者一致 |
| firstName | COALESCE(只补写) |
覆盖(call AI 更准) | — | lead email 名字不如通话准,用 COALESCE 保留已有值 |
| lastName | COALESCE |
覆盖 | — | 同上 |
| lifecycleStage | 不覆盖 | — | — | 已有 contact 由 Contact Analysis 管理 |
| leadStatus | 不覆盖 | — | — | 同上 |
| updatedAt | NOW() |
NOW() |
NOW() |
三者一致 |
详见各 per-Lambda 文件的 Step ② section。
五、contact_timeline 表写入详情¶
contact_timeline 记录所有客户互动和状态变更事件,5 个 Lambda 共写 9 种事件类型。每条都包含公共必填字段(contactPhone / franchiseId / siteId / eventType / eventCategory / occurredAt / actorType)。
| Lambda(文件) | Step | eventType | entityType | entityId | oldValue | newValue | occurred_at | actorType |
|---|---|---|---|---|---|---|---|---|
| lead-tracking(Step ④) | ④ | lead.created |
lead | lead.id | — | — | lead.receivedAt | lead_webhook |
| Per-Call Analysis(Step ④) | ④ | call.created |
call | call.telephonySessionId | — | {callDirection, staffName, duration} | call.startTime | call_analysis |
| message-processor(Step ③) | ③ | message.created |
message | String(message.id) | — | {direction, messageType} | message.creationTime | system |
| Contact Analysis(Step ⑥) | ⑥ | task.created |
task | String(taskId) | — | {taskType, typeCategory, priority} | NOW()(tx) |
contact_analysis |
| Contact Analysis(Step ⑥) | ⑥ | task.status_changed |
task | String(closedtaskid) | {status: pending} | {status: closed, closeType: auto_closed, closeResult: AI(11值)} | NOW()(tx) |
contact_analysis |
| Contact Analysis(Step ⑥) | ⑥ | task.updated |
task | String(taskId) | {priority: old, suggestedActions: old} | {priority: new, suggestedActions: new, reason} | NOW()(tx) |
contact_analysis |
| Contact Analysis(Step ⑥) | ⑥ | contact.lifecycle_changed |
contact | — | {lifecycleStage: old, lifecycleState: old} | {lifecycleStage: new, lifecycleState: new} | NOW()(tx) |
contact_analysis |
| ⑥ | contact.temperature_changed |
contact | — | {leadTemperature: old} | {leadTemperature: new, reason} | NOW()(tx) |
contact_analysis |
|
| studio-api(4a Step ③)* | ③ | task.status_changed |
task | String(task.taskId) | {status: pending} | {status: closed, closeType: manual_closed, closeResult} | NOW() |
staff |
| studio-api(4b Step ②)* | ② | task.due_at_changed |
task | String(task.taskId) | {dueAt: old} | {dueAt: new, reason} | NOW() |
staff |
* = DEFERRED
occurred_at 规则:非 transaction 写入(lead / call / SMS)用源记录时间防排序漂移;transaction 内(Contact Analysis Step ⑥)用
NOW(),所有语句共享同一时间。错误处理:5 个 Lambda 的 timeline INSERT 都在
db.batch()内,batch 阻塞(失败 → 整个 batch 回滚 → SQS 重试)。lead-tracking 的 prod Neon 写入阻塞,test Neon fire-and-forget;其他 Lambda 的 SQS send / DDB 旁路写入是非阻塞。studio-api Close 当前是 raw SQL × 2(UPDATE tasks + INSERT taskevents)未原子化;不写 contacttimeline。
六、字段清理计划¶
有了 contact_timeline 后,contacts 表上部分字段可从 timeline 派生 → 计划移除。temperature 相关字段和 timeline 事件也将一并移除。
已移除的 contacts 字段¶
| 字段 | 替代方案 | PR |
|---|---|---|
leadStatusChangelog |
timeline contact.lifecycle_changed 事件 |
callytics-common#33 ✅ |
待移除的 contacts 字段(需 engineer review,callytics-common#32)¶
Phase 1:timeline 有数据后可移除
| 移除字段 | 替代方案 |
|---|---|
acquiredAt |
MIN(occurred_at) FROM contact_timeline WHERE event_type='lead.created' |
firstAttemptedAt |
MIN(occurred_at) FROM contact_timeline WHERE event_type='call.status_changed' AND new_value->>'status'='Setup' AND new_value->>'direction'='Outbound' Phase 1 起(infra#592 / #681 merged 2026-04-29):用 call.status_changed Setup 事件 — 拨号瞬间记录,不等 AI 分析(原 call.created 是 Disconnected 后回填,失去实时性) |
firstConnectedAt |
MIN(occurred_at) FROM contact_timeline WHERE event_type='call.status_changed' AND new_value->>'status'='Answered' Phase 1 起:RC Answered = 路由层接通(不一定是 human conversation,见 SoT § 5)。如果只算"真说话",仍需读 calls.callState='human_conversation'(AI 分析填) |
lastComplaintAt |
timeline complaint 事件 |
leadTemperature / leadTemperatureReason |
从 timeline 到达时间实时计算(NOW() - occurredAt),无需存储或中间函数 |
Phase 2:Task Pipeline 完成 + 前端迁移后可移除
| 移除字段 | 替代方案 | 为什么等 |
|---|---|---|
actionNeeded |
EXISTS(SELECT 1 FROM tasks WHERE status='pending') |
有了 tasks 表后这是 stale snapshot — 员工关了 task 但 contacts.actionNeeded 仍为 true |
actionNeededReason |
tasks 表同名字段 | 跟随 actionNeeded |
suggestedActions |
tasks 表同名字段(更新鲜) | 同上。需删 idx_contacts_action_needed |
待移除的 timeline 事件¶
— 随 temperature 字段一起移除。contact.temperature_changed
为什么 timeline 比 contacts 列更好¶
重复进入场景(客户 3 月来 → 冷却 → 9 月再来)时,contacts 列的 acquiredAt 永远冻结在 3 月。Timeline 可查当前周期的 lead.created,准确算 Speed to Lead。
相关决策变更¶
- typeCategory 门控:
temperature≠cold→ 改为 AI prompt 指令("lastActivityAt > 5d 不建 leadfollowup") - Stale override:>30d → neglected → 改为"30 天无活动 + 无 pending task"查询
当前状态:新代码(lead-tracking / message-processor)不写这些字段。Contact Analysis 现有代码仍写,等 timeline 有数据 + 验证后停止。
七、已知问题与代码隐患¶
Expert Panel 代码验证发现 10 个待修复问题,按严重度排序。
| # | 问题 | 严重度 | 当前状态 | GH Issue |
|---|---|---|---|---|
| 1 | getRecentLeads 缺 franchiseId 过滤 |
✅ 已修复 — 查询已加 franchiseId 过滤 |
#518 | |
| 2 | ✅ 已修复 — ai-analysis-processor 在 batch 写完后 non-blocking 发 SQS | #519 | ||
| 3 | lastContactAnalysisAt 从未写入 |
✅ 已修复 — Contact Analysis 每次写 NOW() |
#519 | |
| 4 | ContactData interface 缺字段 |
✅ 已修复 | #519 | |
| 5 | Cooldown 未实现时 duplicate task | — | V1 设计不做 Cooldown — DB index uq_tasks_pending_contact_category 保护数据完整性 |
— |
| 6 | 冗余 contacts 字段清理(9 个字段) | P2 | 待 timeline 有数据后移除 6 个 + Phase B 后移除 actionNeeded 三件套 | callytics-common#32 |
| 7 | writeAnalysis() 不在 transaction |
✅ 已修复 — 唯一路径是 writeAnalysisWithTasks() 用 db.batch() 原子写入 |
— | |
| 8 | Reserved concurrency=2 | P2 | 多触发源时升到 5 | — |
| 9 | Per-Call Analysis 从 DDB 读历史 → 迁移到 Neon | P2 | customer-history.ts 改为读 Neon contacts + calls(24h) + messages(24h) |
— |
| 10 | Contact Analysis 30 天全量读 → 增量优化 | P2 | customerSummary + 增量(NULL → 30d 首次,有值 → since lastContactAnalysisAt) |
— |
八、与其他文档的关系¶
| 文档 | 关系 |
|---|---|
Task Pipeline 写入流程(.claude/specs/2026-03-25-task-pipeline-write-flow.md) |
Pipeline 级流程设计(本目录是字段级补充) |
Expert Panel 审查报告(.claude/specs/2026-03-25-task-pipeline-expert-review.md) |
技术决策背景和争议记录 |
| Task 字段设计 | tasks 表业务字段定义 |
| Contacts 数据来源与字段设计 | contacts 表字段设计来源 |
| 后端数据管道 | 数据管道总体架构 |