跳转至

系统架构总览

1. 架构图

系统架构图

源文件:backend-data-pipeline.drawio(用 draw.io 打开编辑)

1.1 图例

节点颜色

属性 颜色 对应组件
外部服务 蓝色 RingCentral API、Lark Suite / Gmail、Deepgram、Amazon Bedrock
自有代码(部署在 AWS) 橙色 + 绿色粗边框 所有 Lambda 函数、Prompt 配置文件 —— 代码存放在 GitHub,通过 CDK 部署到 AWS 运行
自有代码(未部署到 AWS) 橙色(无绿边) studio-website-monorepo 的 apps/web(部署在 Cloudflare)
共享库 灰色 callytics-common
AWS 托管服务 绿色 / 青色 DynamoDB(绿色圆柱)、SQS(绿色长方体)、S3(青色圆柱)
文档 蓝灰色 docs

连线颜色

数据流 颜色 路径
通话语音流 浅蓝色粗实线 RingCentral → Webhook接收 → SQS队列 → 下载录音 → S3
AI 分析流 深蓝色粗实线 Deepgram转录 → SQS → 结果处理 → Bedrock AI分析 → DynamoDB
修复管道 紫色线 每日凌晨检测遗漏通话 → 修复任务队列 → 修复Worker → 注入回主队列重新处理
SMS 数据流 品红色粗实线 RingCentral → apps/api → SMS缓存
线索数据流 绿色粗实线 邮箱 → lead-tracking → 线索数据库
应用层查询 黑色粗实线 数据库/S3 → apps/api → apps/web
依赖/配置 灰色虚线 服务间引用、OAuth认证、Prompt配置、Lambda调用

2. 数据线路

系统当前有 5 条活跃数据线路 + 1 条观测线。早期设计图例中的 "线路五"(neon-sync CDC)和 "线路六"(Entity Routing fan-out)从未实现,见 §2.5 / §2.6 备注。

架构方向(2026-04-23 复核)

Neon PostgreSQL 已从 "CQRS 读层" 升级为 contacts / messages / leads 的 primary SoT。DynamoDB 继续承载 call-analysis,并将逐步退役 message-system 孤儿表。详见 callytics-infrastructure CLAUDE.md "数据库" 演进表。

2.1 线路一:通话分析(主动推送) 浅蓝色 + 深蓝色

对应图例:通话语音流 浅蓝色)+ AI 分析流 深蓝色)

客户打电话 → RingCentral 记录 → Webhook 通知
→ ringcentralSubscriptionService 贴标签
→ SQS transcribe-queue(等5分钟)
→ TranscribeProcessor:下载录音 → S3 → Deepgram / AWS Transcribe
→ S3 Object Created → EventBridge TranscriptionResultRule → SQS ai-analysis-queue
→ ai-analysis-processor:Grok 4.1 via OneRouter (test/prod) 或 Bedrock (pre + code 默认)
→ 原子 db.batch() 双写:DynamoDB call-analysis + Neon calls / contacts / contact_timeline(每条 row 都带 `store_id` + `account_id`,infra #642 / #647 已 ship)
→ SQS fan-out 到 dailyBatchQueue.fifo → ContactsAnalyzer(per-call 触发,batchSize=1 / maxConcurrency=2 / reportBatchItemFailures=true)做 contact-level AI 分析(任务生成、lead outcome)
→ apps/api 查询 DDB + Neon → apps/web 展示

SQS 队列的完整命名

本 doc 使用简称(transcribe-queue / ai-analysis-queue 等)方便阅读。CDK 实际命名遵循 {env-prefix}-{queue-name}-{region} 模板,例如 call-analytics-test-ai-analysis-queue-us-west-2。详见 lib/stacks/sqs-stack.ts

per-client AI provider 覆盖

ai-client-factory.ts 的 provider 选择顺序 = configProvider || envProvider || 'bedrock'。env var USE_ONEROUTER='true' 仅在 test / prod 环境 set(lib/stacks/lambda-stack.ts:222-224);每个 client 的 DDB config ai_provider 可以 override。Bedrock 是 code 默认兜底。

CDK 内部变量命名 — resultProcessor = ai-analysis-processor

lib/stacks/lambda-stack.ts:729addEventSourcetranscriptionResultQueue 接到 CDK 变量 resultProcessor 上。这是 legacy 命名 —— resultProcessor 就是 ai-analysis-processor(entry: lambda/ai-analysis-processor/src/handler.ts),不要被变量名误导。SQS → Lambda 绑定的 SoT:addEventSource 第 718 行(callLogQueue → transcribe-processor)+ 729 行(transcriptionResultQueue → ai-analysis-processor / resultProcessor)。

4 个 Lambda writer 全部写 Neon store_id(2026-04-26 verified)

Pipeline 落库的 4 个 writer 已统一带 store_id(infra #642 / #647 shipped): - transcribe-processor — 写 calls 基础字段 + store_id - ai-analysis-processor — 写 calls AI 字段 + contacts + contact_timeline,全部带 store_id;同时 dual-write DDB call-analysis(legacy 兼容) - message-processor — 写 messages + contacts + contact_timeline,全部带 store_id - contacts-analyzer — 写 contacts + tasks,全部带 store_id;同时 dual-write DDB(legacy)

Lead pipeline(lead-tracking repo)也已写 leads.store_id(lead-tracking #88)。

2.2 线路二:修复管道(定时检测,当前 DISABLED) 紫色

对应图例:修复管道 紫色)

当前状态:EventBridge 调度 state=DISABLED(所有环境)

lib/stacks/eventbridge-stack.ts:100 显式 state: 'DISABLED',注释 "Disabled for all environments - enable manually via AWS Console when needed"。频率配置是 每 3 小时(cron 0 */3 * * ? *),不是 "每日凌晨"。需要修复时必须手动 Console 触发或改 state。

当 ENABLED 时:

每 3 小时触发 → reconciliationOrchestrator
→ 扫描 DynamoDB call-analysis 找遗漏/失败的通话记录
→ 生成修复任务 → SQS reconciliation-queue
→ reconciliationWorker 逐条处理
→ 注入回 transcribe-queue(线路一)重新转录 + AI 分析
→ 补全 DynamoDB + Neon 中缺失的分析结果

2.3 线路三:线索采集(定时轮询) 绿色

对应图例:线索数据流 绿色)

潜在客户在健身房网站填表
→ 系统发通知邮件到指定邮箱
→ lead-tracking 每5分钟轮询邮箱(IMAP)
→ 解析邮件提取:姓名、电话、邮箱、预约时间
→ DynamoDB LeadTracking-v2 表存储
→ apps/api 按电话号码关联通话记录
→ apps/web 展示线索跟踪

2.4 线路四:SMS 短信(Webhook 持久化,Neon-primary) 品红色

对应图例:SMS 数据流 品红色)

客户发短信到 RingCentral 号码
→ RingCentral Webhook → ringcentralSubscriptionService(external repo)→ SQS message-processing-queue
→ message-processor Lambda
→ 从 RC API fetch message + S3 保存 MMS 附件
→ 原子 db.batch() 写入 Neon PostgreSQL:messages + contacts + contact_timeline(per-entry atomic)
→ apps/api 通过 SQL 查询 Neon
→ apps/web 展示(聊天气泡、已读/未读、MMS图片)

MessageStore / Conversations DDB 是孤儿表

lib/constructs/message-system/data-store.ts:34,74 仍 CDK 配置这两张 DDB 表,但 message-processor/src/handler.ts:6 明确声明 "No DynamoDB dependency — Neon is the sole persistence target for messages"。2026-04-23 verify:两张表 7 天 WCU=0,全工作区无写入者,apps/api /v2/messages 直接 hit RC API 不读这两张表。属于 CDK-provisioned zombie,团队需决定删除或保留(用于 PITR 查历史 stale 数据)。

2.5 线路五:Contacts → Neon 同步(neon-sync) — 从未实现

本线路从未落地;neon-sync 是 dead code

旧设计用 DDB Streams → neon-sync Lambda 把 Contacts 同步到 Neon。实际:

  • lambda/neon-sync/ 源码存在于 workspace,但没有任何 CDK stack 引用(grep neon-sync lib/ 返回空)
  • AWS 任何 region 没有 live Lambda 叫 *neon-sync*
  • 没有 DDB 表启用 Streams
  • pr-validation.yml:78,89 仍在 CI build + test 这坨 dead code(追踪 issue:infra#657

实际的 Contacts 写入路径ai-analysis-processormessage-processor 直接原子 db.batch() 写 Neon contacts + contact_timeline(无 intermediary queue / Stream)。详见 §2.1 + §2.4。设计依据:lib/stacks/contacts-stack.ts:28 — "Architecture: direct write — contactsAnalyzer reads Neon tables and writes AI results directly back to Neon contacts. No DDB Streams or intermediary queue needed."

2.6 线路六:Entity Routing(stream-router fan-out) — 从未实现

本线路从未落地;3 个 Lambda 都不存在

旧设计用 stream-router / contacts-updater / lead-outcome-mapper 3 个 Lambda 从 DDB Streams fan-out 路由。实际:

  • grep "stream-router\|contacts-updater\|lead-outcome-mapper" lib/ lambda/ 返回空
  • lambda/ 目录中没有任何对应子目录

实际的 fan-out 路径ai-analysis-processor 写完 DDB + Neon 后,emit SQS message 到 dailyBatchQueue(FIFO)→ ContactsAnalyzer 消费做 per-call contact-level AI 分析 + task 决策。Lead outcome 更新目前通过 lead-tracking Lambda 内部处理,不经 fan-out Lambda 链。

2.7 线路七:观测线(storeId 覆盖率监控)

2026-04-22 新增(PR #651)。取代 CloudWatch custom metric + SNS 模式(无订阅者黑洞),改走 logger.error 直通 Discord/Lark/Sentry。

部署状态:目前仅 test 环境

2026-04-23 aws lambda list-functions 确认:Lambda call-analytics-test-storeid-coverage-monitor-us-west-2 已 deploy(LastModified 2026-04-22)。pre / prod 尚未部署。Rollout 到 prod 需等 test 运行稳定后推进(参考 PR #651 rollout notes + PR #652 review round-2 fixes)。

每小时定时 rate(1 hour) → storeid-coverage-monitor Lambda(test us-west-2)
→ 单次 UNION ALL 查询 Neon 6 张表的 NULL storeId 写入率(过去 1 小时窗口):
  calls / messages / contact_timeline / contacts / tasks / leads
→ 任意表超阈值(>2% NULL 且 total >= 20)触发 logger.error(...)
→ shared/utils/logger.ts fan-out → Discord webhook + Lark webhook + Sentry captureException

设计原理:优先复用现有 logger.error 通知通道而不是重建 CloudWatch alarm → SNS → email,成本从 ~\$3.70/月 降到 \$0/月(无新订阅者基础设施)。MINVOLUMEFLOOR=20 过滤低样本误报(例:leads 在 test 因 lead-tracking 未 deploy 写入 ~0/小时,永远低于 floor 不会 alert)。详见 callytics-infrastructure CLAUDE.md "新增 periodic observability Lambda 的 7 步 checklist"。

2.8 Daily 分析报告(Python Lambda)

每日 America/New_York 7:00 AM(EST/EDT 自动切换):

EventBridge analytics-schedule(state=ENABLED)
→ AnalyticsGenerator Lambda(Python 3.13)
→ 汇总前一日 call-analysis 数据
→ 生成 PDF/HTML 报告
→ 发送邮件到指定收件人

force-refresh Python 3.12 Lambda 为手动触发类,不在定时管道中。)

六条活跃线路在 studio-monorepo 汇合——后端通过电话号码把"谁打了电话"、"谁是网站来的线索"、"谁发过短信"关联起来。Neon 是 contacts / messages / leads 的 primary SoT,DynamoDB 承载 call-analysis,studio-api 混合 SQL + DDB 查询以支撑前端排序/筛选/搜索。

3. 仓库说明

仓库 角色 说明
studio-website-monorepo 前端 + 后端 Turborepo monorepo——apps/web 是用户看到的网页,apps/api 是后端服务,负责查询数据库并关联通话与线索
callytics-infrastructure AI 工厂 录音进去,分析结果出来。Prompt 文件在这里驱动 AI 分析
ringcentralSubscriptionService 接线员 接收 RingCentral 电话通知,贴标签后丢进队列
lead-tracking 线索采集员 每5分钟轮询邮箱,解析潜在客户表单邮件,提取姓名/电话/预约信息存入数据库
callytics-common 公共工具箱 错误处理、日志、重试等公共能力,被后端和 AI 工厂共同依赖
docs 内部知识库 产品设计、架构文档、业务背景知识

4. 技术栈总览

层级 技术
前端 Vue 3 + Vite + Tailwind CSS + shadcn-vue + ECharts
前端部署 Cloudflare Pages
后端 Hono + TypeScript (Lambda Node.js 20) + Python 3.12/3.13(AnalyticsGenerator / ForceRefresh)
数据库 DynamoDB(call-analysis 主库 + 10+ 张其他表)+ Neon PostgreSQL(contacts / messages / leads 的 primary SoT)
存储 S3(录音 + 配置 + MMS 附件)
消息队列 SQS(含 FIFO daily-batch-queue + 5+ 个 standard queue;全部带 DLQ)
AI 分析 Grok 4.1 via OneRouter(test/prod 启用)+ AWS Bedrock(pre 环境 + code 默认兜底;per-client DDB config 可 override)
语音转录 Deepgram(primary) + AWS Transcribe(fallback)
邮件采集 IMAP (Lark Suite) + Google Apps Script
认证 AWS Cognito + RingCentral OAuth
基础设施 AWS CDK (TypeScript),多 region:test=us-west-2 / pre=us-east-2 / prod=us-east-1
监控 CloudWatch + Sentry + Discord/飞书通知(统一走 logger.error fan-out)

5. 外部依赖

服务 用途
RingCentral 电话数据来源(通话记录、录音、SMS)
AWS 全套云服务(Lambda、DynamoDB、S3、SQS、Cognito、Bedrock)
Deepgram 语音转文字
Lark Suite 邮箱服务(线索邮件接收)
Cloudflare 前端托管 + 文档托管
Sentry 错误追踪
Discord / 飞书 告警通知