跳转至

Flow 5:message-processor 写入详情

Lambda: message-processor(callytics-infrastructure) · 触发: SQS(RingCentral SMS/VM webhook) · 错误处理: 阻塞(SQS batchItemFailures)

P0 漂移修复状态

2026-04-21 的 3 项 P0 漂移已全部修复:

  • franchiseIdparseFranchiseFromClientId(_client_id, _account_id) 派生,不硬编码(P0 #632 fix 防止把 _client_id 整串如 "orangeTheory-193365026" 误写为 franchiseId)
  • site_id 列改名为 account_id(common migration 0016)
  • store_id 已写入 messages(通过 resolvePhoneIdentity() 解析)

设计意图: message-processor 不触发 contacts-analyzer — SMS 不触发 AI 重分析(2026-03-27 设计决策,不是 bug)。


数据流概览

  SQS: RingCentral SMS/VM webhook [RC DATA]
    ╔═══════════════▼═══════════════════════════╗
    ║  db.batch() — 1 次 HTTP,原子             ║
    ║                                           ║
    ║  ① UPSERT messages                        ║
    ║     逐条 upsert(ON CONFLICT DO UPDATE)  ║
    ║                                           ║
    ║  ② UPSERT contacts (轻量)                 ║
    ║     lastActivityAt=NOW()                  ║
    ║     不写 names/lifecycle/AI 字段           ║
    ║                                           ║
    ║  ③ INSERT timeline                        ║
    ║     message.created                       ║
    ║     occurredAt = msg.creationTime         ║
    ╚═══════════════════════════════════════════╝
              不写 tasks (V1)
              SMS 数据由 Contact Analysis 综合分析时读取

不调用 AI。值来源:RingCentral 消息 [RC DATA] 或系统硬编码 [SYSTEM]。全部走 db.batch()(1 次 HTTP,原子);SQS batchItemFailures 在 Lambda 层重试;所有操作幂等(UPSERT / ON CONFLICT)。


读取数据

来源 内容 备注
SQS 消息 RC SMS/voicemail 事件 RingCentral webhook

Step ①:UPSERT messages 现有

逐条 upsert 到 messages 表(ON CONFLICT DO UPDATE)。


Step ②:UPSERT contacts(轻量) 新增

确保 contact 记录存在 — 解决 SMS-only 客户不在 contacts 中的 gap。

INSERT 时写入

PK 是 (phone, store_id)(common migration 0014 起)。

字段 说明
phone message.phone PK 一部分
franchiseId parseFranchiseFromClientId(webhookMessage._client_id, webhookMessage._account_id) 派生品牌名(如 "orangeTheory"),不硬编码;P0 #632 fix
accountId webhookMessage._account_id SQS 注入字段;migration 0016 把 site_id 列改名为 account_id
storeId resolvePhoneIdentity().storeId PK 一部分;PR #642 / #647 起写入
lastActivityAt lastActivityAtForward(message.creationTime) 用事件时间(message.creationTime)而不是 NOW();canonical pattern: COALESCE(GREATEST(existing, eventTime), eventTime)

反例 — 短码 SMS: 当发送方是短码(5-6 位数字,如 447267)时 customerPhone 解析不出,不写 contacts + timeline,只写 messages。短码不属于任何客户实体,只作通知/验证码归档。Flow 5 audit 24h 实测约 0.3% 触发(1/350),为合法跳过。

ON CONFLICT (phone, store_id) DO UPDATE

字段 说明
lastActivityAt lastActivityAtForward(message.creationTime) 同上 — 用事件时间,forward-only
accountId webhookMessage._account_id 覆盖保持新鲜
updatedAt NOW()

不写的字段

  • AI 字段 — Contact Analysis 负责
  • firstName / lastName — Per-Call Analysis 负责。RC API 的 from.name 是单个 string,已存在 messages 表 JSONB,未来可 COALESCE 补写到 contacts(#521)
  • lifecycleStage / leadStatus — lead-tracking 负责初始设置

Step ③:INSERT contact_timeline 新增

字段
contactPhone message.phone
franchiseId parseFranchiseFromClientId(webhookMessage._client_id, webhookMessage._account_id)
accountId webhookMessage._account_id(migration 0016 起列名 account_id)
storeId resolvePhoneIdentity().storeId(PR #642 / #647 起写入)
eventType 'message.created'
eventCategory 'message'
entityType 'message'
entityId String(message.id)(bigint → text)
newValue {direction, messageType}
occurredAt message.creationTime(源记录时间, NOW())
actorType 'system'

错误处理走 per-entry db.batch() 原子写入:失败 entry 加入 failedEntries → SQS batchItemFailures 重试。


不写 tasks(V1)

V1 不基于 SMS 创建 task;SMS 数据由 Contact Analysis 综合分析时读取。