Flow 1:lead-tracking 写入详情¶
Repo: lead-tracking(独立 repo + CDK stack) · 触发: EventBridge 每 5 分钟 IMAP 轮询 · 架构: 2 Lambda(poller leads-only + lead-processor contacts/tasks/timeline)· Neon 双写: 同时写 prod + test
字段命名约定
accountId = RingCentral OAuth providerAccountId,对应 Neon 列 account_id(callytics-common 0.21.0 起从 siteId rename)。下文遗留的 siteId 字眼按 accountId 理解。storeId 是 store-level isolation 的唯一隔离键(UUID),accountId 仅作审计。
数据流概览¶
不调用 AI,所有值来自 email 解析 [EMAIL] 或系统硬编码 [SYSTEM]。
EventBridge 5min
│
▼
┌─────────────────────────────────────────────────────┐
│ Poller Lambda (src/poller.ts) │
│ │
│ IMAP 轮询 → Email lead 数据 [EMAIL] │
│ StoresV2 mapping → franchiseId/accountId/storeId │
│ │
│ ┌──────────────────────────────────┐ │
│ │ [DDB] 过渡期 — 将来整块删除 │ │
│ │ PutCommand leads │ │
│ │ QueryCommand phone dedup │ │
│ └──────────────────────────────────┘ │
│ │
│ ① UPSERT leads (prod Neon) — 阻塞 │
│ ① UPSERT leads (test Neon) — fire-and-forget │
│ │
│ ② 发布 EventBridge LeadCreated 事件 │
│ 前置条件: phone + franchiseId + accountId + storeId │
│ 缺任一 → 跳过, 只写 leads │
└─────────────────────────┬───────────────────────────┘
│ EventBridge
│ source='lead-tracking'
│ detail-type='LeadCreated'
▼
┌───────────────┐
│ SQS Queue │
│ batchSize=1 │
│ maxRetry=3 │──► DLQ (14天)
└───────┬───────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ Lead Processor Lambda (src/lead-processor.ts) │
│ │
│ db.batch() — 1 次 HTTP,原子: │
│ │
│ ③ UPSERT contacts (轻量) │
│ lifecycle='lead' · leadStatus='new' │
│ ON CONFLICT (phone, storeId): 不覆盖 lifecycle │
│ │
│ ④ INSERT tasks │
│ lead_outreach · high · dueAt=SLA(5min) │
│ ON CONFLICT DO NOTHING(dedup) │
│ │
│ ⑤ INSERT timeline │
│ lead.created · occurredAt=receivedAt │
│ ON CONFLICT DO NOTHING (idempotencyKey) │
│ │
│ prod Neon — 阻塞(失败 → SQS 重试) │
│ test Neon — fire-and-forget(失败只 log) │
└─────────────────────────────────────────────────────┘
2 Lambda 拆分理由:leads 写入失败(email stays UNSEEN 重拉)和 downstream 写入失败(SQS retry)解耦,避免互相阻塞。原 1 Lambda 4 表 atomic batch 设计已被替换。
错误处理¶
| 阶段 | 失败时行为 | 重试机制 |
|---|---|---|
| Poller: leads UPSERT | throw → email 不 mark SEEN | 下次 5min poll 重拉同一封 email |
| Poller: EventBridge publish | throw → email 不 mark SEEN | 同上 |
| Lead-processor: ③④⑤ batch | throw → SQS 重试 | 最多 3 次,之后进 DLQ(14 天保留) |
| Test Neon(两个 Lambda 都有) | catch → console.warn | 不重试,不影响 prod |
读取数据¶
| 来源 | 内容 | 备注 |
|---|---|---|
| IMAP 轮询 | Email lead 原始数据 | EventBridge 每 5 分钟触发 |
| StoresV2 DynamoDB | leadEmail → franchiseId/siteId/storeId | 冷启动 Scan,warm start 复用缓存 |
Step ①:UPSERT leads¶
Poller Lambda 执行 Drizzle upsert,PK 是 composite dedup key({email}#{phone}#{date} 或 {email}#{receivedAt})。
INSERT 时写入¶
| 字段 | 值 | 说明 |
|---|---|---|
id |
composite dedup key | PK,防 IMAP 重复轮询 |
leadEmail |
email 发件人 | |
leadType |
email 解析 | Web Lead / Online Intro 等 |
firstName |
email body 解析(可空) | |
lastName |
email body 解析(可空) | |
phone |
email body 解析(可空) | 标准化为 E.164 格式(+1XXXXXXXXXX) |
bookedDate |
email body 解析(可空) | 已预约日期 |
bookedTime |
email body 解析(可空) | |
emailSubject |
原始 subject | |
emailFrom |
发件人地址 | |
emailRecipient |
收件人地址 | |
extractedTrackingId |
从收件人地址提取 | lead-tracking+{ID}@... → 映射 site |
franchiseId |
StoresV2 → parseBrandFromFranchise() | 解析 brand 名(如 "orangeTheory"),不含 store code |
siteId |
StoresV2.providerAccountId | RC 账户 ID |
storeId |
StoresV2.storeId (UUID) | 唯一隔离键(Store-Level Isolation) |
rawBody |
完整 email body | 调试用 |
isForwarded |
Boolean,默认 false | 是否转发 email |
forwardedOriginalFrom |
原始发件人(可空) | 转发 email 时 |
forwardedOriginalTo |
原始收件人(可空) | 转发 email 时 |
forwardedOriginalDate |
原始日期(可空) | 转发 email 时 |
processedBy |
'imap-poller' |
|
receivedAt |
email 接收时间 | |
syncedAt |
defaultNow() |
ON CONFLICT (id) DO UPDATE¶
| 字段 | 值 | 说明 |
|---|---|---|
leadType |
更新 | email 可能重新分类 |
franchiseId |
更新 | StoresV2 映射可能修正 |
siteId |
更新 | 同上 |
storeId |
COALESCE(leads.store_id, EXCLUDED.store_id) |
Forward-only:不用 NULL 覆盖已有值 |
syncedAt |
NOW() |
Step ②:发布 EventBridge LeadCreated 事件¶
leads UPSERT 成功后由 poller 发布。Event payload: {Source: 'lead-tracking', DetailType: 'LeadCreated', Detail: <LeadRow JSON>}。
前置条件(全部非 null 才发布):
| 字段 | 来源 | 为什么需要 |
|---|---|---|
phone |
email 解析 | contacts PK 的一部分 |
franchiseId |
StoresV2 映射 | tasks/审计需要 |
accountId |
StoresV2 映射(RC OAuth providerAccountId) |
tasks/审计需要 |
storeId |
StoresV2 映射(UUID) | contacts PK 的一部分,store-level isolation 唯一隔离键 |
缺任一字段 → 跳过 EventBridge 发布,只写 leads 表,log [EVENTBRIDGE] Skipping — missing required fields。
CDK 路由:LeadCreatedRule-${region} Rule → SQS → Lead Processor Lambda。
Step ③:UPSERT contacts(轻量)¶
确保 contact 记录存在 + 设置初始状态。Lead Processor 在 db.batch() 内与 ④⑤ 原子执行。
INSERT 时写入¶
| 字段 | 值 | 说明 |
|---|---|---|
phone |
lead.phone | 复合 PK |
storeId |
lead.storeId | 复合 PK(Store-Level Isolation 终态) |
franchiseId |
lead.franchiseId | |
accountId |
lead.accountId | RC providerAccountId(审计字段) |
lifecycleStage |
'lead' |
标记为 lead |
leadStatus |
'new' |
新 lead 初始状态。Contact Analysis 后续覆盖 |
firstName |
lead.firstName(可空) | Email 解析出的名字 |
lastName |
lead.lastName(可空) | 同上 |
lastActivityAt |
lead.receivedAt |
用 lead 接收时间,不是 NOW()(防 Lambda 执行时间漂移) |
ON CONFLICT (phone, storeId) DO UPDATE¶
ON CONFLICT 只补写身份和 activity 字段,不写 leadStatus 和 lifecycleStage(这两个已有 contact 的字段由 Contact Analysis 管理)。
| 字段 | 值 | 说明 |
|---|---|---|
lastActivityAt |
lastActivityAtForward(lead.receivedAt) |
forward-only 语义(COALESCE(GREATEST(existing, eventTime), eventTime)),乱序 lead 不能把已有 lastActivity 倒推。共享 helper 跨 5 个 contacts writer 一致 |
firstName |
COALESCE(contacts.first_name, lead.firstName) |
只补写,不覆盖已有值 |
lastName |
COALESCE(contacts.last_name, lead.lastName) |
同上 |
updatedAt |
now(lead-processor 当前时间) |
不写的字段¶
| 字段 | 为什么不写 |
|---|---|
acquiredAt |
已移除,timeline lead.created 的 occurredAt 已记录 |
leadTemperature |
已移除,是 NOW() - 到达时间 的派生值,timeline 已记录到达时间 |
customerSummary / purchaseIntent / goals 等 AI 字段 |
Contact Analysis 负责 |
leadStatusReason |
Contact Analysis 负责 |
firstAttemptedAt / firstConnectedAt |
Per-Call Analysis 负责(通话事件,不是 lead 到达) |
Step ④:INSERT tasks¶
Lead Processor 在 db.batch() 内与 ③⑤ 原子执行,创建 1 条 high-priority lead_outreach task。
| 字段 | 值 | 说明 |
|---|---|---|
taskId |
crypto.randomUUID() |
客户端预生成 |
contactPhone |
lead.phone | 通过 buildContactPhoneIdentityFields() 写入 4-field 身份块 |
franchiseId |
lead.franchiseId | 同上 helper |
accountId |
lead.accountId | 同上 helper |
storeId |
lead.storeId(已 guard 非空) | 同上 helper |
taskType |
'lead_outreach' |
固定 |
typeCategory |
'lead_outreach' |
固定 |
sourceType |
'lead' |
|
sourceLeadId |
lead.id | 幂等 key |
status |
'pending' |
|
priority |
'high' |
固定 |
actionNeeded |
true |
|
actionNeededReason |
New ${leadType} from ${firstName \|\| 'unknown'} ${lastName \|\| ''}(trim 后) |
动态生成 |
suggestedActions |
jsonb 模板 | 1 项 high-priority action,提示员工"5 分钟黄金窗口内回拨新 lead" |
dueAt |
computeDueAt('high', now, { slaMinutes: 5 }) |
排除 quiet hours |
去重保护(ON CONFLICT DO NOTHING)¶
撞任一 index 安全跳过,不报错。
| Index | 保护场景 |
|---|---|
uq_tasks_source_lead (sourceLeadId) |
同一封 email 的 SQS 重试 |
uq_tasks_pending_contact_category (contactPhone, franchiseId, siteId, typeCategory) WHERE status='pending' |
同一 contact 已有 pending lead_outreach |
Step ⑤:INSERT contact_timeline¶
Lead Processor 在 db.batch() 内与 ③④ 原子执行。所有 timeline writer 通过共享 buildTimelineValues() helper(@retaintive/common/db)写入,统一字段约定。
| 字段 | 值 |
|---|---|
contactPhone |
lead.phone |
franchiseId |
lead.franchiseId |
accountId |
lead.accountId |
storeId |
lead.storeId |
eventType |
'lead.created' |
eventCategory |
'lead' |
entityType |
'lead' |
entityId |
lead.id |
occurredAt |
lead.receivedAt(源记录时间,非 NOW()) |
actorType |
'lead_webhook' |
newValue |
{leadType, firstName, lastName} jsonb |
idempotencyKey |
lead.created:${lead.id} |
ON CONFLICT DO NOTHING 通过 idempotencyKey 去重,SQS 重试幂等。
CDK 基础设施¶
| 资源 | 名称 | 说明 |
|---|---|---|
| Lambda | ImapPollerFunction-${region} |
IMAP 轮询 + leads 写入 + EventBridge 发布 |
| Lambda | LeadProcessorFunction-${region} |
SQS 消费 → contacts/tasks/timeline |
| EventBridge Rule | ImapPollerSchedule-${region} |
每 5 分钟触发 poller |
| EventBridge Rule | LeadCreatedRule-${region} |
source=lead-tracking, detail-type=LeadCreated → SQS |
| SQS Queue | lead-processor-queue-${region} |
visibilityTimeout=300s, retention=4d, maxRetry=3 |
| SQS DLQ | lead-processor-dlq-${region} |
retention=14d |
| DynamoDB | LeadTracking-v2-${region} |
过渡期主表,将来移除 |
Test/Prod 双写模式¶
两个 Lambda 都同时写 prod 和 test Neon。
| Lambda | Prod Neon | Test Neon |
|---|---|---|
| Poller (leads) | 阻塞(失败 → email stays UNSEEN) | fire-and-forget(失败只 log) |
| Lead Processor (③④⑤) | 阻塞(失败 → SQS retry) | fire-and-forget(失败只 log) |
SSM 参数:
| 参数 | 用途 |
|---|---|
/lead-tracking/neon-database-url |
Prod Neon 连接串 |
/lead-tracking/neon-database-url-test |
Test Neon 连接串 |