跳转至

Flow 1:lead-tracking 写入详情

Repo: lead-tracking(独立 repo + CDK stack) · 触发: EventBridge 每 5 分钟 IMAP 轮询 · 架构: 2 Lambda(poller leads-only + lead-processor contacts/tasks/timeline)· Neon 双写: 同时写 prod + test

字段命名约定

accountId = RingCentral OAuth providerAccountId,对应 Neon 列 account_id(callytics-common 0.21.0 起从 siteId rename)。下文遗留的 siteId 字眼按 accountId 理解。storeId 是 store-level isolation 的唯一隔离键(UUID),accountId 仅作审计。


数据流概览

不调用 AI,所有值来自 email 解析 [EMAIL] 或系统硬编码 [SYSTEM]

  EventBridge 5min
  ┌─────────────────────────────────────────────────────┐
  │  Poller Lambda (src/poller.ts)                      │
  │                                                     │
  │  IMAP 轮询 → Email lead 数据 [EMAIL]                │
  │  StoresV2 mapping → franchiseId/accountId/storeId   │
  │                                                     │
  │  ┌──────────────────────────────────┐               │
  │  │ [DDB] 过渡期 — 将来整块删除     │               │
  │  │   PutCommand leads              │               │
  │  │   QueryCommand phone dedup      │               │
  │  └──────────────────────────────────┘               │
  │                                                     │
  │  ① UPSERT leads (prod Neon) — 阻塞                  │
  │  ① UPSERT leads (test Neon) — fire-and-forget       │
  │                                                     │
  │  ② 发布 EventBridge LeadCreated 事件                 │
  │     前置条件: phone + franchiseId + accountId + storeId │
  │     缺任一 → 跳过, 只写 leads                        │
  └─────────────────────────┬───────────────────────────┘
                            │ EventBridge
                            │ source='lead-tracking'
                            │ detail-type='LeadCreated'
                    ┌───────────────┐
                    │  SQS Queue    │
                    │  batchSize=1  │
                    │  maxRetry=3   │──► DLQ (14天)
                    └───────┬───────┘
  ┌─────────────────────────────────────────────────────┐
  │  Lead Processor Lambda (src/lead-processor.ts)      │
  │                                                     │
  │  db.batch() — 1 次 HTTP,原子:                       │
  │                                                     │
  │  ③ UPSERT contacts (轻量)                           │
  │     lifecycle='lead' · leadStatus='new'             │
  │     ON CONFLICT (phone, storeId): 不覆盖 lifecycle  │
  │                                                     │
  │  ④ INSERT tasks                                     │
  │     lead_outreach · high · dueAt=SLA(5min)          │
  │     ON CONFLICT DO NOTHING(dedup)                  │
  │                                                     │
  │  ⑤ INSERT timeline                                  │
  │     lead.created · occurredAt=receivedAt            │
  │     ON CONFLICT DO NOTHING (idempotencyKey)         │
  │                                                     │
  │  prod Neon — 阻塞(失败 → SQS 重试)                │
  │  test Neon — fire-and-forget(失败只 log)           │
  └─────────────────────────────────────────────────────┘

2 Lambda 拆分理由:leads 写入失败(email stays UNSEEN 重拉)和 downstream 写入失败(SQS retry)解耦,避免互相阻塞。原 1 Lambda 4 表 atomic batch 设计已被替换。


错误处理

阶段 失败时行为 重试机制
Poller: leads UPSERT throw → email 不 mark SEEN 下次 5min poll 重拉同一封 email
Poller: EventBridge publish throw → email 不 mark SEEN 同上
Lead-processor: ③④⑤ batch throw → SQS 重试 最多 3 次,之后进 DLQ(14 天保留)
Test Neon(两个 Lambda 都有) catch → console.warn 不重试,不影响 prod

读取数据

来源 内容 备注
IMAP 轮询 Email lead 原始数据 EventBridge 每 5 分钟触发
StoresV2 DynamoDB leadEmail → franchiseId/siteId/storeId 冷启动 Scan,warm start 复用缓存

Step ①:UPSERT leads

Poller Lambda 执行 Drizzle upsert,PK 是 composite dedup key({email}#{phone}#{date}{email}#{receivedAt})。

INSERT 时写入

字段 说明
id composite dedup key PK,防 IMAP 重复轮询
leadEmail email 发件人
leadType email 解析 Web Lead / Online Intro 等
firstName email body 解析(可空)
lastName email body 解析(可空)
phone email body 解析(可空) 标准化为 E.164 格式(+1XXXXXXXXXX)
bookedDate email body 解析(可空) 已预约日期
bookedTime email body 解析(可空)
emailSubject 原始 subject
emailFrom 发件人地址
emailRecipient 收件人地址
extractedTrackingId 从收件人地址提取 lead-tracking+{ID}@... → 映射 site
franchiseId StoresV2 → parseBrandFromFranchise() 解析 brand 名(如 "orangeTheory"),不含 store code
siteId StoresV2.providerAccountId RC 账户 ID
storeId StoresV2.storeId (UUID) 唯一隔离键(Store-Level Isolation)
rawBody 完整 email body 调试用
isForwarded Boolean,默认 false 是否转发 email
forwardedOriginalFrom 原始发件人(可空) 转发 email 时
forwardedOriginalTo 原始收件人(可空) 转发 email 时
forwardedOriginalDate 原始日期(可空) 转发 email 时
processedBy 'imap-poller'
receivedAt email 接收时间
syncedAt defaultNow()

ON CONFLICT (id) DO UPDATE

字段 说明
leadType 更新 email 可能重新分类
franchiseId 更新 StoresV2 映射可能修正
siteId 更新 同上
storeId COALESCE(leads.store_id, EXCLUDED.store_id) Forward-only:不用 NULL 覆盖已有值
syncedAt NOW()

Step ②:发布 EventBridge LeadCreated 事件

leads UPSERT 成功后由 poller 发布。Event payload: {Source: 'lead-tracking', DetailType: 'LeadCreated', Detail: <LeadRow JSON>}

前置条件(全部非 null 才发布):

字段 来源 为什么需要
phone email 解析 contacts PK 的一部分
franchiseId StoresV2 映射 tasks/审计需要
accountId StoresV2 映射(RC OAuth providerAccountId) tasks/审计需要
storeId StoresV2 映射(UUID) contacts PK 的一部分,store-level isolation 唯一隔离键

缺任一字段 → 跳过 EventBridge 发布,只写 leads 表,log [EVENTBRIDGE] Skipping — missing required fields

CDK 路由:LeadCreatedRule-${region} Rule → SQS → Lead Processor Lambda。


Step ③:UPSERT contacts(轻量)

确保 contact 记录存在 + 设置初始状态。Lead Processor 在 db.batch() 内与 ④⑤ 原子执行。

INSERT 时写入

字段 说明
phone lead.phone 复合 PK
storeId lead.storeId 复合 PK(Store-Level Isolation 终态)
franchiseId lead.franchiseId
accountId lead.accountId RC providerAccountId(审计字段)
lifecycleStage 'lead' 标记为 lead
leadStatus 'new' 新 lead 初始状态。Contact Analysis 后续覆盖
firstName lead.firstName(可空) Email 解析出的名字
lastName lead.lastName(可空) 同上
lastActivityAt lead.receivedAt 用 lead 接收时间,不是 NOW()(防 Lambda 执行时间漂移)

ON CONFLICT (phone, storeId) DO UPDATE

ON CONFLICT 只补写身份和 activity 字段,不写 leadStatuslifecycleStage(这两个已有 contact 的字段由 Contact Analysis 管理)。

字段 说明
lastActivityAt lastActivityAtForward(lead.receivedAt) forward-only 语义(COALESCE(GREATEST(existing, eventTime), eventTime)),乱序 lead 不能把已有 lastActivity 倒推。共享 helper 跨 5 个 contacts writer 一致
firstName COALESCE(contacts.first_name, lead.firstName) 只补写,不覆盖已有值
lastName COALESCE(contacts.last_name, lead.lastName) 同上
updatedAt now(lead-processor 当前时间)

不写的字段

字段 为什么不写
acquiredAt 已移除,timeline lead.createdoccurredAt 已记录
leadTemperature 已移除,是 NOW() - 到达时间 的派生值,timeline 已记录到达时间
customerSummary / purchaseIntent / goals 等 AI 字段 Contact Analysis 负责
leadStatusReason Contact Analysis 负责
firstAttemptedAt / firstConnectedAt Per-Call Analysis 负责(通话事件,不是 lead 到达)

Step ④:INSERT tasks

Lead Processor 在 db.batch() 内与 ③⑤ 原子执行,创建 1 条 high-priority lead_outreach task。

字段 说明
taskId crypto.randomUUID() 客户端预生成
contactPhone lead.phone 通过 buildContactPhoneIdentityFields() 写入 4-field 身份块
franchiseId lead.franchiseId 同上 helper
accountId lead.accountId 同上 helper
storeId lead.storeId(已 guard 非空) 同上 helper
taskType 'lead_outreach' 固定
typeCategory 'lead_outreach' 固定
sourceType 'lead'
sourceLeadId lead.id 幂等 key
status 'pending'
priority 'high' 固定
actionNeeded true
actionNeededReason New ${leadType} from ${firstName \|\| 'unknown'} ${lastName \|\| ''}(trim 后) 动态生成
suggestedActions jsonb 模板 1 项 high-priority action,提示员工"5 分钟黄金窗口内回拨新 lead"
dueAt computeDueAt('high', now, { slaMinutes: 5 }) 排除 quiet hours

去重保护(ON CONFLICT DO NOTHING)

撞任一 index 安全跳过,不报错。

Index 保护场景
uq_tasks_source_lead (sourceLeadId) 同一封 email 的 SQS 重试
uq_tasks_pending_contact_category (contactPhone, franchiseId, siteId, typeCategory) WHERE status='pending' 同一 contact 已有 pending lead_outreach

Step ⑤:INSERT contact_timeline

Lead Processor 在 db.batch() 内与 ③④ 原子执行。所有 timeline writer 通过共享 buildTimelineValues() helper(@retaintive/common/db)写入,统一字段约定。

字段
contactPhone lead.phone
franchiseId lead.franchiseId
accountId lead.accountId
storeId lead.storeId
eventType 'lead.created'
eventCategory 'lead'
entityType 'lead'
entityId lead.id
occurredAt lead.receivedAt(源记录时间, NOW())
actorType 'lead_webhook'
newValue {leadType, firstName, lastName} jsonb
idempotencyKey lead.created:${lead.id}

ON CONFLICT DO NOTHING 通过 idempotencyKey 去重,SQS 重试幂等。


CDK 基础设施

资源 名称 说明
Lambda ImapPollerFunction-${region} IMAP 轮询 + leads 写入 + EventBridge 发布
Lambda LeadProcessorFunction-${region} SQS 消费 → contacts/tasks/timeline
EventBridge Rule ImapPollerSchedule-${region} 每 5 分钟触发 poller
EventBridge Rule LeadCreatedRule-${region} source=lead-tracking, detail-type=LeadCreated → SQS
SQS Queue lead-processor-queue-${region} visibilityTimeout=300s, retention=4d, maxRetry=3
SQS DLQ lead-processor-dlq-${region} retention=14d
DynamoDB LeadTracking-v2-${region} 过渡期主表,将来移除

Test/Prod 双写模式

两个 Lambda 都同时写 prod 和 test Neon。

Lambda Prod Neon Test Neon
Poller (leads) 阻塞(失败 → email stays UNSEEN) fire-and-forget(失败只 log)
Lead Processor (③④⑤) 阻塞(失败 → SQS retry) fire-and-forget(失败只 log)

SSM 参数:

参数 用途
/lead-tracking/neon-database-url Prod Neon 连接串
/lead-tracking/neon-database-url-test Test Neon 连接串