跳转至

Lambda 写入矩阵 — 总览

每个 Lambda 对每张 Neon 表的字段级写入职责清单,给开发者改 Lambda 写入逻辑前对照。字段所有权是硬约束 — 违反会导致 last-write-wins 竞态或数据不一致。

实施溯源 + 落地状态(2026-05-04)

设计文档:

  • Pipeline 设计: .claude/specs/2026-03-25-task-pipeline-write-flow.md
  • Flow 3(AI-driven task decisions): .claude/specs/2026-03-28-flow3-contact-analysis-design.md

Schema 状态(callytics-common 0.34.0 + migration 0014-0023):

  • 主业务表 contacts / calls / messages / leads / contact_timeline / tasks / staff,辅助表 rc-stores / store-config / user-stores 均具备 store_id
  • task_events 仍只有 account_id,无 store_id — audit 表暂保留 account-level isolation,store-level 终态后续补
  • contacts 复合主键 = (phone, store_id),store_id NOT NULL(migration 0014)
  • staff.store_id NOT NULL(migration 0015)
  • site_idaccount_id rename(migration 0016)
  • 其余表 store_id 仍 nullable(calls / leads / messages / contact_timeline / tasks),writer 全写但保留 NULL 兼容历史行,各表均有 idx_<table>_store_id WHERE store_id IS NOT NULL partial index

Lambda writer 当前 store_id 写入状态(2026-05-04):

  • transcribe-processorcalls.store_id + contact_timeline.store_id(call.status_changed,Phase 1 起 dual writer)
  • ai-analysis-processorcontacts/calls/contact_timeline.store_id
  • contacts-analyzercontacts/tasks/contact_timeline.store_id
  • message-processormessages/contacts/contact_timeline.store_id
  • lead-tracking(独立 repo)→ leads/contacts/tasks/contact_timeline.store_id

franchise_id 来源(读源代码核对):

  • message-processor:parseFranchiseFromClientId(_client_id, _account_id) 解析,不是硬编码
  • ai-analysis-processor:从 SQS 上游 metadata 传入
  • contacts-analyzer:从 contact 行的 franchiseId 读取
  • lead-tracking:StoresV2 leadEmail 映射 → parseBrandFromFranchise()
  • 当前所有 writer 都从外部数据派生 franchise_id,不硬编码(历史 P0 #632 fix:阻止"_client_id 整串含 site 后缀直接写入"的 bug)。

全局数据流

%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#f5f5f5', 'lineColor': '#666'}, 'themeCSS': '.label { font-family: sans-serif; }'}}%%
flowchart TB
    subgraph entry["入口层"]
        LEAD["📧 Email Lead"]
        CALL["📞 通话结束"]
        SMS["💬 SMS/VM"]
        UI["👤 员工 UI\n(DEFERRED)"]
    end

    subgraph lt["Lead Processing"]
        LT["lead-tracking\n(独立 repo)\n非阻塞写入"]
    end

    subgraph p1["Per-Call Analysis"]
        AAP["ai-analysis-processor\n(Prompt A · Kimi/Claude)\n非阻塞写入"]
    end

    subgraph mp["Message Processing"]
        MSG["message-processor\n非阻塞写入"]
    end

    subgraph sa["Studio API (部分实施)"]
        STUDIO["studio-api\nClose ✅ · Postpone ❌\ntimeline / lastActivityAt / SQS gap"]
    end

    subgraph p2triggers["Contact Analysis 触发源\nV1 无 Cooldown — DB index 保护数据完整性"]
        CRON["❶ Cron 06:00 UTC ✅ ENABLED"]
        DEMAND["❷ On-demand\n后端 ✅ · 前端 DEFERRED"]
        FUN["❸ Per-Call Analysis SQS\n每通电话无条件触发(ai-analysis 非阻塞 send)\n✅ 已实施"]
        TASKCLOSE["❹ Task close ⚠️ 部分实施"]
        T5["T5 showed *"]
        T6["T6 trialed *"]
    end

    subgraph p2["Contact Analysis"]
        P2READ["读取\ncontacts(summary) · 增量 calls/msgs/leads\n+ tasks"]
        P2AI["① AI 分析 (Kimi/Claude)"]
        P2TEMP["② 温度计算 (纯规则)\ncomputeTemperature()\n⚠️ 待移除(从 timeline 时间戳计算)"]
        P2ID["②b 预生成 ID\ntaskId = randomUUID()\ncontactAnalysisRunId = sqsRecord.messageId"]
    end

    subgraph tx["db.batch() — 1 次 HTTP,原子\n⚠️ updatedAt=NOW() 必须显式传\n幂等: uq_tasks_analysis_run_category\n撞 unique → 整个 batch 回滚 → SQS 重试"]
        TX3["③ UPDATE contacts\ncustomerSummary · leadStatus\nlifecycleStage\nlastContactAnalysisAt=NOW()\n⚠️ temperature 待移除"]
        TX4["④ AI-driven task decisions 🆕\nclose: closeResult(11值) + closeNote\nupdate: priority/suggestedActions\ncreate: new tasks per AI"]
        TX5["⑤ taskDecisions 写入 🆕\nclose→UPDATE+timeline\ncreate→INSERT+timeline\nupdate→UPDATE+timeline"]
        TX6["⑥ INSERT contact_timeline 🆕\ntask.created · task.status_changed\ntask.updated · contact.lifecycle_changed"]
    end

    LEADS[("leads")]
    CALLS[("calls")]
    CONTACTS[("contacts ★\nSSoT")]
    MESSAGES[("messages")]
    TASKS[("tasks 🆕")]
    TIMELINE[("contact_timeline 🆕")]
    STAFF[("staff\nDEFERRED · 已建 · 零写入")]

    %% Entry → Lambda
    LEAD --> LT
    CALL --> AAP
    SMS --> MSG
    UI -.-> STUDIO

    %% lead-tracking writes
    LT -->|"INSERT"| LEADS
    LT -->|"UPSERT 🆕 轻量\nON CONFLICT → updated_at\n不覆盖 AI 字段"| CONTACTS
    LT -->|"INSERT 🆕\nlead_outreach · high\n双层 dedup: per-lead + per-contact\ndueAt=SLA(5min)-quiet hours"| TASKS
    LT -->|"INSERT 🆕\nlead.created"| TIMELINE

    %% Per-Call Analysis writes
    AAP -->|"33 AI 字段"| CALLS
    AAP -->|"UPSERT 基础字段\nfirstName · lastName\nlastActivityAt"| CONTACTS
    AAP -->|"INSERT 🆕\ncall.created"| TIMELINE
    AAP -->|"每通电话\n无条件触发"| FUN

    %% message-processor writes
    MSG -->|"UPSERT"| MESSAGES
    MSG -->|"UPSERT 🆕 轻量\nlastActivityAt + updated_at"| CONTACTS
    MSG -->|"INSERT 🆕\nmessage.created"| TIMELINE

    %% studio-api writes
    STUDIO -->|"close ✅\npostpone ❌"| TASKS
    STUDIO -.->|"lastActivityAt\n(gap) *"| CONTACTS
    STUDIO -.->|"task.status_changed\ntask.due_at_changed (gap) *"| TIMELINE
    STUDIO -.->|"SQS → Contact Analysis\n(gap) *"| TASKCLOSE

    %% Contact Analysis triggers
    CRON --> P2READ
    DEMAND --> P2READ
    FUN --> P2READ
    TASKCLOSE -.-> P2READ
    T5 -.-> P2READ
    T6 -.-> P2READ

    %% Contact Analysis reads
    CALLS -.->|"增量"| P2READ
    MESSAGES -.->|"增量"| P2READ
    LEADS -.->|"增量"| P2READ
    CONTACTS -.->|"summary"| P2READ
    TASKS -.->|"全量"| P2READ

    %% Contact Analysis internal flow
    P2READ --> P2AI --> P2TEMP --> P2ID --> TX3

    %% Transaction internal flow
    TX3 --> TX4 --> TX5 --> TX6

    %% Transaction writes to DB
    TX3 -->|"AI 综合字段"| CONTACTS
    TX4 -->|"status=closed"| TASKS
    TX5 -->|"status=pending"| TASKS
    TX6 -->|"事件记录"| TIMELINE

    %% Colors
    style entry fill:#f5f5f5,stroke:#bdbdbd
    style lt fill:#e8f5e9,stroke:#388e3c,color:#1b5e20
    style p1 fill:#e3f2fd,stroke:#1565c0,color:#0d47a1
    style mp fill:#ede7f6,stroke:#512da8,color:#311b92
    style sa fill:#efebe9,stroke:#795548,color:#3e2723
    style p2triggers fill:#fff8e1,stroke:#f9a825,color:#e65100
    style p2 fill:#fff3e0,stroke:#ef6c00,color:#bf360c
    style tx fill:#fce4ec,stroke:#c62828,color:#b71c1c
    style CONTACTS fill:#c8e6c9,stroke:#2e7d32,color:#1b5e20
    style TASKS fill:#f3e5f5,stroke:#7b1fa2,color:#4a148c
    style CALLS fill:#e0f2f1,stroke:#00796b,color:#004d40
    style TIMELINE fill:#e8eaf6,stroke:#283593,color:#1a237e
    style LEADS fill:#fff9c4,stroke:#f57f17,color:#e65100
    style MESSAGES fill:#fce4ec,stroke:#880e4f,color:#880e4f
    style STAFF fill:#eceff1,stroke:#607d8b,color:#37474f

图例:实线 = 写入 · 虚线 = 读取 / DEFERRED · 🆕 = Task 系统新增 · * = DEFERRED(V1 不实现)


一、写入模式总览

1.1 所有 Lambda 统一用 Neon HTTP driver + Drizzle ORM

连接方式:@neondatabase/serverless HTTP driver → drizzle-orm/neon-http。每条 SQL 是一个 HTTP POST。

1.2 多表写入:db.batch() vs 分开 await

写法 HTTP 请求数 原子性 用在哪
await db.batch([stmt1, stmt2, stmt3]) 1 ✅ 全成功 or 全回滚 写入之间有因果依赖
分开 await db.insert(...) N ❌ 各自独立 有明确"主记录",其他是补充
db.transaction(async tx => {...}) neon-http driver 不支持,会 THROW

db.batch() 底层调用 Neon 的 sql.transaction() — 所有 SQL 打包成 1 个 HTTP 请求,DB 内执行 BEGIN/COMMIT/ROLLBACK。

1.3 五个 Flow 写入模式

所有 Flow 的 Neon 写入统一用 db.batch() 打包(2026-03-28 Neon-primary 决策)。DDB 写入是过渡期冗余,独立标记不参与 batch;SQS 发送在 batch 外,batch 成功后才发,非阻塞。

Flow Lambda Neon 写入模式 原因
1 lead-tracking 2-Lambda 拆分:poller UPSERT leads(独立 await) → EventBridge → SQS → lead-processor db.batch([contacts, tasks, timeline]) leads 写入失败重试(email 不 mark SEEN)和 downstream 失败重试(SQS retry)解耦,避免互相阻塞
2 Per-Call Analysis db.batch([calls, contacts, timeline]) calls+contacts+timeline 原子;DDB calls 过渡期独立写
3 Contact Analysis db.batch([contacts, ...taskDecisions, timeline]) 关旧 task + 建新 task 必须原子(否则"幽灵分析")
4a studio-api Close raw SQL × 2(UPDATE tasks → INSERT task_events) 当前未原子化,前者成功后者失败 → task 已关但无 audit;后续可改用 db.batch()
4b studio-api Postpone ❌ 未实施 暂无端点;终态用 db.batch()
5 message-processor per-entry db.batch([messages, contacts, timeline]) 每条 SQS record 一个 batch,失败 → 该 entry 进 batchItemFailures 重试

1.4 踩坑注意事项

db.batch() 生产注意(社区踩坑总结)

  1. db.batch() 内不能 Promise.all — 并行语句会丢 transaction scope(Drizzle #2200
  2. $onUpdate(() => new Date()) 在 batch 内不生效 — Drizzle 的 $onUpdate 是 query builder hook,不是 DB trigger。batch 内所有 UPDATE 必须显式写 updatedAt: sql\NOW()``
  3. ON CONFLICT DO NOTHING 不导致 batch 回滚 — PostgreSQL 的 conflict 处理是静默跳过,不抛错。dedup index 在 batch 内安全工作
  4. Pin driver 版本@neondatabase/serverlessdrizzle-orm 之间有版本兼容性问题,社区报告升级后运行时报错(Drizzle #5208
  5. SQS 提供 Lambda 层重试db.batch() 解决 SQL 原子性,SQS batchItemFailures 解决 Lambda crash 重试。两者配合 = 幂等 + 原子

二、总览矩阵

Lambda 触发输入(原始数据从哪来) AI 分析前从 Neon 读什么 写入哪些表 详情
lead-tracking IMAP email(EventBridge 5min 轮询) leads ✅ · contacts 🆕 · tasks 🆕 · timeline 🆕 详情
Per-Call Analysis SQS(transcript,来自 transcribe-processor) contacts(customerSummary)+ calls(24h) + messages(24h) calls ✅ · contacts ✅ · timeline 🆕 详情
Contact Analysis SQS {phone, franchiseId, siteId} contacts(summary)+ 增量 calls/msgs/leads + tasks contacts ✅ · tasks 🆕 · timeline 🆕 详情
message-processor SQS(RingCentral SMS/VM webhook) messages ✅ · contacts 🆕 · timeline 🆕 详情
studio-api API request(员工 UI 操作) tasks(校验 status=pending)* tasks* · contacts* · timeline* 详情

✅ = 现有 🆕 = V1 新增 * = DEFERRED — = 不读 Neon(数据全在触发输入里)

Per-Call Analysis 读取:当前代码从 DDB 读(customer-history.ts),计划迁移到 Neon。如果 customerSummary 存在就读,不存在就跳过(graceful degradation)。

Contact Analysis 读取:customerSummary 是 compressed memory。lastContactAnalysisAt 为 NULL(首次)→ 读 30d 全量建立基线。有值 → 读 lastContactAnalysisAt 之后的增量。详见 contact-analysis-writes.md


三、Contact Analysis 触发源

6 个触发源,统一发 SQS 消息到同一 schema(Zod-validated by SqsMessageBodySchema):

{
  phone: string,        // 必填
  storeId: string,      // 必填 — contacts PK 组件(post-migration 0014)
  source?: 'cron' | 'on_demand' | 'per_call_analysis' | 'task_close' | 'showed' | 'trialed',
  // optional observability trace 字段 — 老 producer 不传也能解析
  callId?: string,
  telephonySessionId?: string,
  analysisCompletedAt?: number,
  queuedAt?: number,
}

franchiseId / accountId 不再传 — Consumer(contacts-analyzer)从 contact 行通过 PK lookup 自行读取。上游 producer 只需要 phone + storeId

技术约束:SQS ESM filtering 只支持 body key(不支持 MessageAttributes)。

FIFO Queue 已上线 — dailyBatchQueue 从 Standard 切换为 FIFO 并迁移到 SqsStack。触发源发送消息必须提供 MessageGroupIdMessageDeduplicationId,例如 ai-analysis-processor 用 ${phone}#${storeId} 作 group key、per_call_analysis-${phone}-${callId} 作 dedup id。

# 触发源 SQS Queue source 状态
Cron 每天 06:00 UTC dailyBatchQueue (FIFO) 'cron' ✅ ENABLED in CDK
On-demand(前端 Refresh AI) dailyBatchQueue(V1)/ expressQueue* 'on_demand' ✅ 后端现有,前端 DEFERRED
Per-Call Analysis(每通电话) dailyBatchQueue(V1)/ expressQueue* 'per_call_analysis' ✅ 已实施(ai-analysis-processor non-blocking SQS send)
Task close(员工关闭 Task) dailyBatchQueue (FIFO) / expressQueue* 'task_close' ⚠️ 部分实施 — studio-api 已能关 task + 写 audit,timeline / contacts.lastActivityAt / SQS 触发暂未补齐(详见 studio-api-writes.md
T5 showed(4h 黄金窗口) expressQueue* 'showed' 🆕 DEFERRED
T6 trialed(2-4h 黄金窗口) expressQueue* 'trialed' 🆕 DEFERRED

* expressQueue 尚未创建(Phase D5)。V1 期间 ❷❸❹ 用 dailyBatchQueue。 ❷ On-demand 后端 API 已存在,但前端 Refresh AI 按钮随员工 UI 一起 DEFERRED。

字段来源

触发源 phone / storeId 从哪来
❶ Cron contacts 表遍历(dispatch fan-out 时发 per-contact 消息)
❷ On-demand 前端 API request payload
❸ Per-Call Analysis ai-analysis-processor 解析 storeId 后发送(contactPhone + storeId 都解析才触发)
❹ Task close tasks 表的 contact_phone + store_id
T5/T6 Contact Analysis 自身检测到 leadStatus 变化后发送

V1 不做 Cooldown

不做应用层 cooldown,重复分析由 DB index 保护(uq_tasks_pending_contact_category 防重复 task,uq_tasks_analysis_run_category 防 SQS 重试)。成本 ~200 calls/day × $0.005 ≈ $1/day,可接受。lastContactAnalysisAt 每次分析仍写 NOW(),未来加 cooldown 时可直接读取。

T5/T6 推荐实现(DEFERRED)

Contact Analysis 自检测 leadStatus 变化 → EventBridge Scheduler 延迟触发(showed=4h, trialed=2-4h) — SQS DelaySeconds 最大 15min 不够用。Loop guard:source 已是 showed/trialed 时不再检测。


四、contacts 表写入详情

contacts 是中心化客户档案,4 个 Lambda 各写不同字段。

字段所有权

Per-Call Analysis 只写"活动标记"(什么时候发生了什么),Contact Analysis 只写"AI 判断"(客户状态是什么) — 两者字段互不重叠

分类 字段 所有者
身份 firstName, lastName Per-Call Analysis(初始值由 lead-tracking 设)
活动时间 lastActivityAt Per-Call Analysis + lead-tracking + message-processor + studio-api
业务数据 hasCardOnFile Per-Call Analysis(credit_card_captured AI 检测 → contacts 直写)
运营 notes 员工(studio-api,DEFERRED)
初始状态 lifecycleStage='lead', leadStatus='new' lead-tracking(仅 INSERT)
AI 画像 customerSummary, leadStatus, leadStatusReason, lifecycleStage, lifecycleState, purchaseIntent, purchaseIntentReason, goals, leadObjections, leadRejectionReasons Contact Analysis
AI 行动 actionNeeded, actionNeededReason, suggestedActions Contact Analysis
AI 风险 doNotContact, doNotContactUpdatedBy, hasOpenComplaint Contact Analysis
AI 追踪 lastContactAnalysisAt Contact Analysis
系统 createdAt, updatedAt DB / 所有 Lambda

三个 UPSERT contacts 对比

lead-tracking / Per-Call Analysis / message-processor 都 UPSERT contacts,但写的字段不同 — 数据源决定能写什么。

INSERT 时写入:

字段 lead-tracking Per-Call Analysis message-processor 差异原因
phone, franchiseId, siteId PK,三者都需要
lifecycleStage='lead' 只有 lead-tracking 知道这是 lead
leadStatus='new' 同上
firstName ✅(email 解析) ✅(call AI 提取) SMS webhook 不含名字(#521 未来可从 RC API 补写)
lastName 同上
lastActivityAt NOW() NOW() NOW() 三者一致 — 任何互动都是活动
firstAttemptedAt 待移除 通话才算"尝试联系"
firstConnectedAt 待移除 接通才算"首次接通"

ON CONFLICT 时更新:

字段 lead-tracking Per-Call Analysis message-processor 差异原因
lastActivityAt NOW()(覆盖) NOW()(覆盖) NOW()(覆盖) 三者一致
firstName COALESCE(只补写) 覆盖(call AI 更准) lead email 名字不如通话准,用 COALESCE 保留已有值
lastName COALESCE 覆盖 同上
lifecycleStage 不覆盖 已有 contact 由 Contact Analysis 管理
leadStatus 不覆盖 同上
updatedAt NOW() NOW() NOW() 三者一致

详见各 per-Lambda 文件的 Step ② section。


五、contact_timeline 表写入详情

contact_timeline 记录所有客户互动和状态变更事件,5 个 Lambda 共写 9 种事件类型。每条都包含公共必填字段(contactPhone / franchiseId / siteId / eventType / eventCategory / occurredAt / actorType)。

Lambda(文件) Step eventType entityType entityId oldValue newValue occurred_at actorType
lead-tracking(Step ④ lead.created lead lead.id lead.receivedAt lead_webhook
Per-Call Analysis(Step ④ call.created call call.telephonySessionId {callDirection, staffName, duration} call.startTime call_analysis
message-processor(Step ③ message.created message String(message.id) {direction, messageType} message.creationTime system
Contact Analysis(Step ⑥ task.created task String(taskId) {taskType, typeCategory, priority} NOW()(tx) contact_analysis
Contact Analysis(Step ⑥ task.status_changed task String(closedtaskid) {status: pending} {status: closed, closeType: auto_closed, closeResult: AI(11值)} NOW()(tx) contact_analysis
Contact Analysis(Step ⑥ task.updated task String(taskId) {priority: old, suggestedActions: old} {priority: new, suggestedActions: new, reason} NOW()(tx) contact_analysis
Contact Analysis(Step ⑥ contact.lifecycle_changed contact {lifecycleStage: old, lifecycleState: old} {lifecycleStage: new, lifecycleState: new} NOW()(tx) contact_analysis
Contact Analysis(Step ⑥ contact.temperature_changed contact {leadTemperature: old} {leadTemperature: new, reason} NOW()(tx) contact_analysis
studio-api(4a Step ③)* task.status_changed task String(task.taskId) {status: pending} {status: closed, closeType: manual_closed, closeResult} NOW() staff
studio-api(4b Step ②)* task.due_at_changed task String(task.taskId) {dueAt: old} {dueAt: new, reason} NOW() staff

* = DEFERRED

occurred_at 规则:非 transaction 写入(lead / call / SMS)用源记录时间防排序漂移;transaction 内(Contact Analysis Step ⑥)用 NOW(),所有语句共享同一时间。

错误处理:5 个 Lambda 的 timeline INSERT 都在 db.batch() 内,batch 阻塞(失败 → 整个 batch 回滚 → SQS 重试)。lead-tracking 的 prod Neon 写入阻塞,test Neon fire-and-forget;其他 Lambda 的 SQS send / DDB 旁路写入是非阻塞。studio-api Close 当前是 raw SQL × 2(UPDATE tasks + INSERT taskevents)未原子化;不写 contacttimeline。


六、字段清理计划

有了 contact_timeline 后,contacts 表上部分字段可从 timeline 派生 → 计划移除。temperature 相关字段和 timeline 事件也将一并移除。

已移除的 contacts 字段

字段 替代方案 PR
leadStatusChangelog timeline contact.lifecycle_changed 事件 callytics-common#33

待移除的 contacts 字段(需 engineer review,callytics-common#32)

Phase 1:timeline 有数据后可移除

移除字段 替代方案
acquiredAt MIN(occurred_at) FROM contact_timeline WHERE event_type='lead.created'
firstAttemptedAt MIN(occurred_at) FROM contact_timeline WHERE event_type='call.status_changed' AND new_value->>'status'='Setup' AND new_value->>'direction'='Outbound'
Phase 1 起(infra#592 / #681 merged 2026-04-29):用 call.status_changed Setup 事件 — 拨号瞬间记录,不等 AI 分析(原 call.created 是 Disconnected 后回填,失去实时性)
firstConnectedAt MIN(occurred_at) FROM contact_timeline WHERE event_type='call.status_changed' AND new_value->>'status'='Answered'
Phase 1 起:RC Answered = 路由层接通(不一定是 human conversation,见 SoT § 5)。如果只算"真说话",仍需读 calls.callState='human_conversation'(AI 分析填)
lastComplaintAt timeline complaint 事件
leadTemperature / leadTemperatureReason 从 timeline 到达时间实时计算(NOW() - occurredAt),无需存储或中间函数

Phase 2:Task Pipeline 完成 + 前端迁移后可移除

移除字段 替代方案 为什么等
actionNeeded EXISTS(SELECT 1 FROM tasks WHERE status='pending') 有了 tasks 表后这是 stale snapshot — 员工关了 task 但 contacts.actionNeeded 仍为 true
actionNeededReason tasks 表同名字段 跟随 actionNeeded
suggestedActions tasks 表同名字段(更新鲜) 同上。需删 idx_contacts_action_needed

待移除的 timeline 事件

contact.temperature_changed — 随 temperature 字段一起移除。

为什么 timeline 比 contacts 列更好

重复进入场景(客户 3 月来 → 冷却 → 9 月再来)时,contacts 列的 acquiredAt 永远冻结在 3 月。Timeline 可查当前周期的 lead.created,准确算 Speed to Lead。

相关决策变更

  • typeCategory 门控:temperature≠cold → 改为 AI prompt 指令("lastActivityAt > 5d 不建 leadfollowup")
  • Stale override:>30d → neglected → 改为"30 天无活动 + 无 pending task"查询

当前状态:新代码(lead-tracking / message-processor)不写这些字段。Contact Analysis 现有代码仍写,等 timeline 有数据 + 验证后停止。


七、已知问题与代码隐患

Expert Panel 代码验证发现 10 个待修复问题,按严重度排序。

# 问题 严重度 当前状态 GH Issue
1 getRecentLeadsfranchiseId 过滤 P0 ✅ 已修复 — 查询已加 franchiseId 过滤 #518
2 Per-Call Analysis 当前不触发 Contact Analysis P0 ✅ 已修复 — ai-analysis-processor 在 batch 写完后 non-blocking 发 SQS #519
3 lastContactAnalysisAt 从未写入 P1 ✅ 已修复 — Contact Analysis 每次写 NOW() #519
4 ContactData interface 缺字段 P1 ✅ 已修复 #519
5 Cooldown 未实现时 duplicate task V1 设计不做 Cooldown — DB index uq_tasks_pending_contact_category 保护数据完整性
6 冗余 contacts 字段清理(9 个字段) P2 待 timeline 有数据后移除 6 个 + Phase B 后移除 actionNeeded 三件套 callytics-common#32
7 writeAnalysis() 不在 transaction P1 ✅ 已修复 — 唯一路径是 writeAnalysisWithTasks()db.batch() 原子写入
8 Reserved concurrency=2 P2 多触发源时升到 5
9 Per-Call Analysis 从 DDB 读历史 → 迁移到 Neon P2 customer-history.ts 改为读 Neon contacts + calls(24h) + messages(24h)
10 Contact Analysis 30 天全量读 → 增量优化 P2 customerSummary + 增量(NULL → 30d 首次,有值 → since lastContactAnalysisAt)

八、与其他文档的关系

文档 关系
Task Pipeline 写入流程(.claude/specs/2026-03-25-task-pipeline-write-flow.md) Pipeline 级流程设计(本目录是字段级补充)
Expert Panel 审查报告(.claude/specs/2026-03-25-task-pipeline-expert-review.md) 技术决策背景和争议记录
Task 字段设计 tasks 表业务字段定义
Contacts 数据来源与字段设计 contacts 表字段设计来源
后端数据管道 数据管道总体架构