Flow 5:message-processor 写入详情¶
Lambda: message-processor(callytics-infrastructure) · 触发: SQS(RingCentral SMS/VM webhook) · 错误处理: 阻塞(SQS batchItemFailures)
P0 漂移修复状态
2026-04-21 的 3 项 P0 漂移已全部修复:
franchiseId由parseFranchiseFromClientId(_client_id, _account_id)派生,不硬编码(P0 #632 fix 防止把_client_id整串如"orangeTheory-193365026"误写为 franchiseId)site_id列改名为account_id(common migration 0016)store_id已写入 messages(通过resolvePhoneIdentity()解析)
设计意图: message-processor 不触发 contacts-analyzer — SMS 不触发 AI 重分析(2026-03-27 设计决策,不是 bug)。
数据流概览¶
SQS: RingCentral SMS/VM webhook [RC DATA]
│
╔═══════════════▼═══════════════════════════╗
║ db.batch() — 1 次 HTTP,原子 ║
║ ║
║ ① UPSERT messages ║
║ 逐条 upsert(ON CONFLICT DO UPDATE) ║
║ ║
║ ② UPSERT contacts (轻量) ║
║ lastActivityAt=NOW() ║
║ 不写 names/lifecycle/AI 字段 ║
║ ║
║ ③ INSERT timeline ║
║ message.created ║
║ occurredAt = msg.creationTime ║
╚═══════════════════════════════════════════╝
│
不写 tasks (V1)
SMS 数据由 Contact Analysis 综合分析时读取
不调用 AI。值来源:RingCentral 消息 [RC DATA] 或系统硬编码 [SYSTEM]。全部走 db.batch()(1 次 HTTP,原子);SQS batchItemFailures 在 Lambda 层重试;所有操作幂等(UPSERT / ON CONFLICT)。
读取数据¶
| 来源 | 内容 | 备注 |
|---|---|---|
| SQS 消息 | RC SMS/voicemail 事件 | RingCentral webhook |
Step ①:UPSERT messages 现有¶
逐条 upsert 到 messages 表(ON CONFLICT DO UPDATE)。
Step ②:UPSERT contacts(轻量) 新增¶
确保 contact 记录存在 — 解决 SMS-only 客户不在 contacts 中的 gap。
INSERT 时写入¶
PK 是 (phone, store_id)(common migration 0014 起)。
| 字段 | 值 | 说明 |
|---|---|---|
phone |
message.phone | PK 一部分 |
franchiseId |
parseFranchiseFromClientId(webhookMessage._client_id, webhookMessage._account_id) |
派生品牌名(如 "orangeTheory"),不硬编码;P0 #632 fix |
accountId |
webhookMessage._account_id |
SQS 注入字段;migration 0016 把 site_id 列改名为 account_id |
storeId |
resolvePhoneIdentity().storeId |
PK 一部分;PR #642 / #647 起写入 |
lastActivityAt |
lastActivityAtForward(message.creationTime) |
用事件时间(message.creationTime)而不是 NOW();canonical pattern: COALESCE(GREATEST(existing, eventTime), eventTime) |
反例 — 短码 SMS: 当发送方是短码(5-6 位数字,如
447267)时customerPhone解析不出,不写 contacts + timeline,只写 messages。短码不属于任何客户实体,只作通知/验证码归档。Flow 5 audit 24h 实测约 0.3% 触发(1/350),为合法跳过。
ON CONFLICT (phone, store_id) DO UPDATE¶
| 字段 | 值 | 说明 |
|---|---|---|
lastActivityAt |
lastActivityAtForward(message.creationTime) |
同上 — 用事件时间,forward-only |
accountId |
webhookMessage._account_id |
覆盖保持新鲜 |
updatedAt |
NOW() |
不写的字段¶
- AI 字段 — Contact Analysis 负责
firstName/lastName— Per-Call Analysis 负责。RC API 的from.name是单个 string,已存在 messages 表 JSONB,未来可 COALESCE 补写到 contacts(#521)lifecycleStage/leadStatus— lead-tracking 负责初始设置
Step ③:INSERT contact_timeline 新增¶
| 字段 | 值 |
|---|---|
contactPhone |
message.phone |
franchiseId |
parseFranchiseFromClientId(webhookMessage._client_id, webhookMessage._account_id) |
accountId |
webhookMessage._account_id(migration 0016 起列名 account_id) |
storeId |
resolvePhoneIdentity().storeId(PR #642 / #647 起写入) |
eventType |
'message.created' |
eventCategory |
'message' |
entityType |
'message' |
entityId |
String(message.id)(bigint → text) |
newValue |
{direction, messageType} |
occurredAt |
message.creationTime(源记录时间,非 NOW()) |
actorType |
'system' |
错误处理走 per-entry db.batch() 原子写入:失败 entry 加入 failedEntries → SQS batchItemFailures 重试。
不写 tasks(V1)¶
V1 不基于 SMS 创建 task;SMS 数据由 Contact Analysis 综合分析时读取。