通话分析 Pipeline — 跨 Repo 全流程¶
读者: AI Agent、后端工程师 — 了解从 RingCentral 录音到 Dashboard 展示的完整数据流,以及每一步涉及哪个 repo 和哪些文件。 这是整个系统中跨 repo 依赖最多的核心流程。
涉及的 Repo¶
| Repo | 角色 |
|---|---|
ringcentralSubscriptionService |
第 1 步:接收 RC Webhook,路由到 SQS |
callytics-infrastructure |
第 2-5 步:转录 → AI 分析 → 存储 → 联系人分析 |
studio-website-monorepo |
第 6 步:API + Dashboard 展示 |
完整流程图¶
sequenceDiagram
participant RC as RingCentral
participant WH as WebhookReceiver<br/>(ringcentralSubscriptionService)
participant SQS as SQS<br/>call-analytics-{env}-transcribe-queue
participant TP as TranscribeProcessor<br/>(callytics-infrastructure)
participant S3 as S3<br/>call-analytics-{env}-recordings
participant DG as Deepgram API
participant RP as ResultProcessor<br/>(callytics-infrastructure)
participant AI as Kimi K2 / Claude
participant DDB as DynamoDB<br/>call-analysis table
participant Neon as Neon PostgreSQL<br/>calls table
participant CA as ContactsAnalyzer<br/>(callytics-infrastructure, 每日批量)
participant API as studio-api<br/>(studio-website-monorepo)
RC->>WH: Webhook 事件 (telephony callSessionId)
WH->>WH: 注入租户信息 (_client_id, _account_id)
WH->>SQS: 发送消息(5 分钟延迟,等待录音就绪)
SQS->>TP: 消费消息 (concurrency=2)
TP->>RC: 下载录音文件
TP->>S3: 保存录音 ({franchise}/{siteId}/...)
TP->>DG: 语音 → 文字(主)
Note over TP,DG: 备选:AWS Transcribe
TP->>DDB: 写入 call-events (原始事件)
TP->>SQS: 转发到 result-processor 队列
SQS->>RP: 消费转录结果 (concurrency=20)
RP->>DDB: 查询 call-analysis + LeadTracking-v2(客户历史)
RP->>AI: 注入客户历史 + 发送 Prompt
Note over RP,AI: standard-v1.0.0.txt prompt<br/>含 CUSTOMER_HISTORY 占位符
AI->>RP: 返回结构化分析结果
RP->>DDB: 写入 call-analysis(AI 分析字段)
RP->>Neon: 双写 calls 表(33 个 AI 字段,非阻塞)
Note over RP,Neon: 失败 → neon-retry-queue,最多 5 次重试
Note over CA: EventBridge 每日触发,或 SQS 即时触发
CA->>DDB: 分页读取 call-analysis + LeadTracking-v2
CA->>AI: 客户综合分析(温度评分、跟进优先级)
CA->>Neon: 更新 contacts 表
API->>DDB: 读取 call-analysis / LeadTracking-v2
API->>Neon: 读取 contacts / calls(复杂查询)
API->>RC: 实时读取 (token 从 DDB orangetheory-UserConnections)
每步关键信息¶
第 1 步:Webhook 接收(ringcentralSubscriptionService)¶
文件: src/handlers/webhook-receiver.ts
输入: RingCentral Telephony Webhook(callSessionId, accountId, extensionId)
输出: SQS 消息,附加租户字段:
{
"_client_id": "orangetheory-xxx",
"_account_id": "RC_ACCOUNT_ID",
"_user_id": "RC_USER_ID",
"callSessionId": "...",
"recordingId": "..."
}
注意: 5 分钟 SQS 延迟是有意为之——等待 RC 录音处理完成。
第 2-3 步:转录处理(callytics-infrastructure)¶
Lambda: TranscribeProcessorTS(concurrency=2,防 RC API 限流)
文件: lambda/transcribe-processor/
处理链:
1. 读 SQS 消息 → 用 _client_id 查 DDB 获取 RC token
2. 调 RC API 下载录音
3. 上传到 S3(路径:{franchise}/{siteId}/{callSessionId}.mp3)
4. 调 Deepgram 转文字
5. 结果发到下游 SQS
第 4-5 步:AI 分析(callytics-infrastructure)¶
Lambda: TranscriptionResultProcessorTS(concurrency=20)
文件: lambda/ai-analysis-processor/
客户历史注入(GAP #3 修复):
customer-history.ts 在发送 Prompt 前,先查询:
- call-analysis 表:该客户过去 30 天的通话分析结果
- LeadTracking-v2 表:该客户的 Lead 记录
组装成摘要注入 {CUSTOMER_HISTORY} 占位符。
AI 模型选择:
| 情况 | 使用模型 |
|---|---|
| 默认 | Kimi K2 |
| Kimi 不可用 | Claude(Bedrock) |
DynamoDB 写入字段(call-analysis 表关键字段):
| 字段 | 含义 |
|---|---|
callCategory |
通话分类(咨询/预约/投诉等) |
leadStatus |
AI 推断的 Lead 状态 |
purchaseIntent |
购买意向评分 |
coachingNotes |
给员工的 coaching 建议 |
followUpRequired |
是否需要跟进 |
outcome |
通话结果(成交/未成交等) |
第 6 步:Dashboard 展示(studio-website-monorepo)¶
文件: apps/api/src/routes/, apps/web/src/pages/
数据来源:
- DynamoDB call-analysis:通话列表、分析详情
- DynamoDB LeadTracking-v2:Lead 追踪
- Neon contacts:客户档案(aggregated)
- RingCentral API(实时):部分页面直接调 RC
改动影响速查¶
| 要改什么 | 需要动哪个 repo | 风险 |
|---|---|---|
| Webhook 格式 / 路由逻辑 | ringcentralSubscriptionService |
影响所有后续处理 |
| 转录参数(Deepgram 配置) | callytics-infrastructure/lambda/transcribe-processor/ |
影响转录质量 |
| AI Prompt / 分析逻辑 | callytics-infrastructure/lambda/ai-analysis-processor/ |
影响所有 AI 分析字段 |
| AI Prompt 文件(无需部署) | callytics-infrastructure/config/schemas/orangeTheory/prompts/ |
改完即生效,无需 CDK deploy |
| call-analysis DDB schema | callytics-infrastructure/lib/stacks/ |
⚠️ 表变更需要 migration |
| Dashboard UI | studio-website-monorepo/apps/web/ |
仅前端影响 |
| API 响应格式 | studio-website-monorepo/apps/api/ |
影响前端 |
| 联系人分析逻辑 | callytics-infrastructure/lambda/contacts-analyzer/ |
影响客户档案每日更新 |