跳转至

通话分析 Pipeline — 跨 Repo 全流程

读者: AI Agent、后端工程师 — 了解从 RingCentral 录音到 Dashboard 展示的完整数据流,以及每一步涉及哪个 repo 和哪些文件。 这是整个系统中跨 repo 依赖最多的核心流程。


涉及的 Repo

Repo 角色
ringcentralSubscriptionService 第 1 步:接收 RC Webhook,路由到 SQS
callytics-infrastructure 第 2-5 步:转录 → AI 分析 → 存储 → 联系人分析
studio-website-monorepo 第 6 步:API + Dashboard 展示

完整流程图

sequenceDiagram
    participant RC as RingCentral
    participant WH as WebhookReceiver<br/>(ringcentralSubscriptionService)
    participant SQS as SQS<br/>call-analytics-{env}-transcribe-queue
    participant TP as TranscribeProcessor<br/>(callytics-infrastructure)
    participant S3 as S3<br/>call-analytics-{env}-recordings
    participant DG as Deepgram API
    participant RP as ResultProcessor<br/>(callytics-infrastructure)
    participant AI as Kimi K2 / Claude
    participant DDB as DynamoDB<br/>call-analysis table
    participant Neon as Neon PostgreSQL<br/>calls table
    participant CA as ContactsAnalyzer<br/>(callytics-infrastructure, 每日批量)
    participant API as studio-api<br/>(studio-website-monorepo)

    RC->>WH: Webhook 事件 (telephony callSessionId)
    WH->>WH: 注入租户信息 (_client_id, _account_id)
    WH->>SQS: 发送消息(5 分钟延迟,等待录音就绪)

    SQS->>TP: 消费消息 (concurrency=2)
    TP->>RC: 下载录音文件
    TP->>S3: 保存录音 ({franchise}/{siteId}/...)
    TP->>DG: 语音 → 文字(主)
    Note over TP,DG: 备选:AWS Transcribe
    TP->>DDB: 写入 call-events (原始事件)
    TP->>SQS: 转发到 result-processor 队列

    SQS->>RP: 消费转录结果 (concurrency=20)
    RP->>DDB: 查询 call-analysis + LeadTracking-v2(客户历史)
    RP->>AI: 注入客户历史 + 发送 Prompt
    Note over RP,AI: standard-v1.0.0.txt prompt<br/>含 CUSTOMER_HISTORY 占位符
    AI->>RP: 返回结构化分析结果
    RP->>DDB: 写入 call-analysis(AI 分析字段)
    RP->>Neon: 双写 calls 表(33 个 AI 字段,非阻塞)
    Note over RP,Neon: 失败 → neon-retry-queue,最多 5 次重试

    Note over CA: EventBridge 每日触发,或 SQS 即时触发
    CA->>DDB: 分页读取 call-analysis + LeadTracking-v2
    CA->>AI: 客户综合分析(温度评分、跟进优先级)
    CA->>Neon: 更新 contacts 表

    API->>DDB: 读取 call-analysis / LeadTracking-v2
    API->>Neon: 读取 contacts / calls(复杂查询)
    API->>RC: 实时读取 (token 从 DDB orangetheory-UserConnections)

每步关键信息

第 1 步:Webhook 接收(ringcentralSubscriptionService)

文件: src/handlers/webhook-receiver.ts

输入: RingCentral Telephony Webhook(callSessionId, accountId, extensionId)

输出: SQS 消息,附加租户字段:

{
  "_client_id": "orangetheory-xxx",
  "_account_id": "RC_ACCOUNT_ID",
  "_user_id": "RC_USER_ID",
  "callSessionId": "...",
  "recordingId": "..."
}

注意: 5 分钟 SQS 延迟是有意为之——等待 RC 录音处理完成。


第 2-3 步:转录处理(callytics-infrastructure)

Lambda: TranscribeProcessorTS(concurrency=2,防 RC API 限流)

文件: lambda/transcribe-processor/

处理链: 1. 读 SQS 消息 → 用 _client_id 查 DDB 获取 RC token 2. 调 RC API 下载录音 3. 上传到 S3(路径:{franchise}/{siteId}/{callSessionId}.mp3) 4. 调 Deepgram 转文字 5. 结果发到下游 SQS


第 4-5 步:AI 分析(callytics-infrastructure)

Lambda: TranscriptionResultProcessorTS(concurrency=20)

文件: lambda/ai-analysis-processor/

客户历史注入(GAP #3 修复):

customer-history.ts 在发送 Prompt 前,先查询: - call-analysis 表:该客户过去 30 天的通话分析结果 - LeadTracking-v2 表:该客户的 Lead 记录

组装成摘要注入 {CUSTOMER_HISTORY} 占位符。

AI 模型选择:

情况 使用模型
默认 Kimi K2
Kimi 不可用 Claude(Bedrock)

DynamoDB 写入字段(call-analysis 表关键字段):

字段 含义
callCategory 通话分类(咨询/预约/投诉等)
leadStatus AI 推断的 Lead 状态
purchaseIntent 购买意向评分
coachingNotes 给员工的 coaching 建议
followUpRequired 是否需要跟进
outcome 通话结果(成交/未成交等)

第 6 步:Dashboard 展示(studio-website-monorepo)

文件: apps/api/src/routes/, apps/web/src/pages/

数据来源: - DynamoDB call-analysis:通话列表、分析详情 - DynamoDB LeadTracking-v2:Lead 追踪 - Neon contacts:客户档案(aggregated) - RingCentral API(实时):部分页面直接调 RC


改动影响速查

要改什么 需要动哪个 repo 风险
Webhook 格式 / 路由逻辑 ringcentralSubscriptionService 影响所有后续处理
转录参数(Deepgram 配置) callytics-infrastructure/lambda/transcribe-processor/ 影响转录质量
AI Prompt / 分析逻辑 callytics-infrastructure/lambda/ai-analysis-processor/ 影响所有 AI 分析字段
AI Prompt 文件(无需部署) callytics-infrastructure/config/schemas/orangeTheory/prompts/ 改完即生效,无需 CDK deploy
call-analysis DDB schema callytics-infrastructure/lib/stacks/ ⚠️ 表变更需要 migration
Dashboard UI studio-website-monorepo/apps/web/ 仅前端影响
API 响应格式 studio-website-monorepo/apps/api/ 影响前端
联系人分析逻辑 callytics-infrastructure/lambda/contacts-analyzer/ 影响客户档案每日更新

相关文档