
Phase 1: Data Pipeline (From Information Silos to a Closed Data Loop)

Status: ⚠️ Code Complete. The code has been written to a feature branch in each repository and is awaiting code review, merge, and deployment.

Feature Branches: lead-tracking/feat/phase1-e164-phone-dedup · callytics-infrastructure/feat/phase1-customer-history-outcome · studio-api/feat/phase1-lead-outcome-types

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

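Goal: Connect the data paths across three systems (lead-tracking, callytics-infrastructure, and studio-api) to deliver a unified phone format, CUSTOMER_HISTORY injection, a persisted leadId association, and automatic AI outcome writeback.

Architecture: No system is rewritten; this phase only adds connections on top of the existing architecture. lead-tracking standardizes on the E.164 phone format and adds phone-based deduplication. The callytics ai-analysis-processor Lambda queries the LeadTracking table and past calls while building the prompt and injects CUSTOMER_HISTORY. After analysis it saves the leadId association. DynamoDB Streams then trigger a new Lambda that maps the AI outcome back to LeadTracking automatically.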

Tech Stack: TypeScript, AWS CDK, DynamoDB, Lambda (Node.js 20/22), SQS, DynamoDB Streams, Bedrock

Repos:
- lead-tracking: /Users/maxwsy/workspace/lead-tracking
- callytics-infrastructure: /Users/maxwsy/workspace/callytics-infrastructure
- studio-api: /Users/maxwsy/workspace/studio-api


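1. Task 1: Unify the Phone Format as E.164 (lead-tracking)

Context: normalizePhone() currently stores phone numbers as 10 digits (7328561597), while call-analysis stores them in E.164 (+17328561597), so records in the two tables cannot be matched reliably. call-history.ts in studio-api already converts at runtime with normalizeToE164(), but the format also needs to be unified at the storage source in lead-tracking.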

Files:
- Modify: /Users/maxwsy/workspace/lead-tracking/src/poller.ts:65-72
- Test: /Users/maxwsy/workspace/lead-tracking/test/handler.test.ts

Step 1: Update the normalizePhone tests

In /Users/maxwsy/workspace/lead-tracking/test/handler.test.ts, find the existing normalizePhone tests and update the expected values to E.164 format:

describe('normalizePhone', () => {
  it('should return E.164 for 10-digit US number', () => {
    expect(normalizePhone('7328561597')).toBe('+17328561597');
  });

  it('should return E.164 for 11-digit US number with leading 1', () => {
    expect(normalizePhone('17328561597')).toBe('+17328561597');
  });

  it('should return E.164 for number with formatting', () => {
    expect(normalizePhone('(732) 856-1597')).toBe('+17328561597');
  });

  it('should return E.164 for number with dashes', () => {
    expect(normalizePhone('732-856-1597')).toBe('+17328561597');
  });

  it('should return E.164 for number with +1 prefix', () => {
    expect(normalizePhone('+17328561597')).toBe('+17328561597');
  });

  it('should return empty string for undefined', () => {
    expect(normalizePhone(undefined)).toBe('');
  });

  it('should return empty string for empty string', () => {
    expect(normalizePhone('')).toBe('');
  });

  it('should return empty string for non-US number (not 10 or 11 digits)', () => {
    expect(normalizePhone('123')).toBe('');
  });
});

Step 2: Run the tests and confirm they FAIL

cd /Users/maxwsy/workspace/lead-tracking && npx jest test/handler.test.ts --testNamePattern="normalizePhone" --verbose

Expected: FAIL. The existing normalizePhone returns 7328561597 (10 digits), but the tests expect +17328561597.

Step 3: Update the normalizePhone() implementation

Modify /Users/maxwsy/workspace/lead-tracking/src/poller.ts:65-72:

function normalizePhone(phone: string | undefined): string {
  if (!phone) return '';
  const digits = phone.replace(/\D/g, '');
  if (digits.length === 10) {
    return `+1${digits}`;
  }
  if (digits.length === 11 && digits.startsWith('1')) {
    return `+${digits}`;
  }
  // Non-US or invalid — return empty to avoid bad data in GSI
  return '';
}

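Logic changes:
- 10 digits: prepend +1 to produce E.164.
- 11 digits starting with 1: prepend +.
- Anything else: return an empty string (previously the input was returned as-is, which could store invalid data).

Step 4: Run the tests and confirm they PASS

cd /Users/maxwsy/workspace/lead-tracking && npx jest test/handler.test.ts --testNamePattern="normalizePhone" --verbose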

Expected: PASS

Step 5: Run the full test suite and confirm there are no regressions

cd /Users/maxwsy/workspace/lead-tracking && npx jest --verbose

Expected: all tests PASS

Step 6: Commit

cd /Users/maxwsy/workspace/lead-tracking
git add src/poller.ts test/handler.test.ts
git commit -m "feat: normalize phone to E.164 format for cross-system consistency"

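2. Task 2: Deduplicate Leads by Phone (lead-tracking)

Context: Deduplication is currently based on email#timestamp (poller.ts:159), so the same person submitting with different email addresses creates multiple leads. The phone GSI (phone-receivedAt-index) already exists and can be used to check for duplicates.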
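The gap described above can be illustrated with a small in-memory sketch. The IncomingLead shape, the sample data, and the helper names are assumptions made for illustration only; the real lead item in lead-tracking has more fields.

```typescript
// Hypothetical, simplified lead shape (not the real lead-tracking item).
interface IncomingLead {
  email: string;
  phone: string; // assumed to be normalized to E.164 already
  receivedAt: string;
}

// The existing dedup key: unique per (email, timestamp),
// so a different email always produces a different key.
function emailDedupKey(lead: IncomingLead): string {
  return `${lead.email}#${lead.receivedAt}`;
}

// Group leads by phone; leads without a phone are never merged.
function groupByPhone(leads: IncomingLead[]): Map<string, IncomingLead[]> {
  const groups = new Map<string, IncomingLead[]>();
  for (const lead of leads) {
    if (!lead.phone) continue;
    const bucket = groups.get(lead.phone) ?? [];
    bucket.push(lead);
    groups.set(lead.phone, bucket);
  }
  return groups;
}

// Same person, two email addresses, one phone number.
const sampleLeads: IncomingLead[] = [
  { email: 'jane@example.com', phone: '+17328561597', receivedAt: '2026-02-07T09:00:00Z' },
  { email: 'jane.smith@example.org', phone: '+17328561597', receivedAt: '2026-02-09T12:00:00Z' },
];

const distinctEmailKeys = new Set(sampleLeads.map(emailDedupKey)).size;
const distinctPhones = groupByPhone(sampleLeads).size;
```

With this sample, the email-based key sees two distinct leads while the phone grouping sees one, which is the case the phone GSI lookup is meant to catch.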

Files:
- Modify: /Users/maxwsy/workspace/lead-tracking/src/poller.ts:159 (dedup logic)
- Modify: /Users/maxwsy/workspace/lead-tracking/src/poller.ts:270-290 (save logic)
- Test: /Users/maxwsy/workspace/lead-tracking/test/handler.test.ts

Step 1: Write a test for the phone dedup helper

Add to the test file:

describe('findExistingLeadByPhone', () => {
  it('should return null when phone is empty', async () => {
    const result = await findExistingLeadByPhone('', 'table-name');
    expect(result).toBeNull();
  });

  // Integration tests would mock DynamoDB — see Step 3
});
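One way the DynamoDB-backed cases could be tested without AWS is to pass the client in and substitute a fake. The sketch below is an assumption about how that might look, not the plan's actual test setup: the QueryClient interface and the findExistingLeadByPhoneWith and makeFakeClient names are hypothetical, and the fake receives the query input directly rather than a QueryCommand.

```typescript
// Minimal stand-in for the document client: only `send` is modeled.
interface QueryInput {
  TableName: string;
  IndexName: string;
  KeyConditionExpression: string;
  ExpressionAttributeValues: Record<string, unknown>;
  ScanIndexForward: boolean;
  Limit: number;
}
interface QueryOutput {
  Items?: Array<Record<string, unknown>>;
}
interface QueryClient {
  send(input: QueryInput): Promise<QueryOutput>;
}

// Same lookup logic as the planned helper, with the client injected (an assumption).
async function findExistingLeadByPhoneWith(
  client: QueryClient,
  phone: string,
  tableName: string
): Promise<string | null> {
  if (!phone) return null; // never query with an empty key

  const result = await client.send({
    TableName: tableName,
    IndexName: 'phone-receivedAt-index',
    KeyConditionExpression: 'phone = :phone',
    ExpressionAttributeValues: { ':phone': phone },
    ScanIndexForward: false, // newest first
    Limit: 1,
  });

  const first = result.Items?.[0];
  return first ? (first.id as string) : null;
}

// Fake client that records every call and returns canned items.
function makeFakeClient(items: Array<Record<string, unknown>>) {
  const calls: QueryInput[] = [];
  const client: QueryClient = {
    async send(input: QueryInput): Promise<QueryOutput> {
      calls.push(input);
      return { Items: items };
    },
  };
  return { client, calls };
}
```

The recorded calls make it possible to assert both the returned id and that an empty phone never reaches the table.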

Step 2: Implement the phone dedup query function

In /Users/maxwsy/workspace/lead-tracking/src/poller.ts, add the following after the normalizePhone function:

async function findExistingLeadByPhone(
  phone: string,
  tableName: string
): Promise<string | null> {
  if (!phone) return null;

  const result = await docClient.send(
    new QueryCommand({
      TableName: tableName,
      IndexName: 'phone-receivedAt-index',
      KeyConditionExpression: 'phone = :phone',
      ExpressionAttributeValues: { ':phone': phone },
      ScanIndexForward: false, // newest first
      Limit: 1,
    })
  );

  if (result.Items && result.Items.length > 0) {
    return result.Items[0].id as string;
  }
  return null;
}

Step 3: Update the save logic to support merging

In the save logic of poller.ts (around line 270, before the PutCommand), add the phone duplicate check:

// Before PutCommand — check for existing lead with same phone
const existingLeadId = await findExistingLeadByPhone(
  item.phone as string || '',
  TABLE_NAME
);

if (existingLeadId && existingLeadId !== item.id) {
  // Merge: record the duplicate on the existing lead (timestamp and counter) instead of creating a new one
  await docClient.send(
    new UpdateCommand({
      TableName: TABLE_NAME,
      Key: { id: existingLeadId },
      UpdateExpression: 'SET lastDuplicateAt = :now, duplicateCount = if_not_exists(duplicateCount, :zero) + :one',
      ExpressionAttributeValues: {
        ':now': new Date().toISOString(),
        ':zero': 0,
        ':one': 1,
      },
    })
  );
  logger.info(`Merged duplicate lead by phone: ${item.phone} → existing ${existingLeadId}`);
  continue; // Skip creating new record
}

// Existing PutCommand with ConditionExpression stays as-is

Step 4: Run the tests

cd /Users/maxwsy/workspace/lead-tracking && npx jest --verbose

Expected: PASS

Step 5: Commit

cd /Users/maxwsy/workspace/lead-tracking
git add src/poller.ts test/handler.test.ts
git commit -m "feat: dedup leads by phone number using phone-receivedAt GSI"

3. Task 3: CUSTOMER_HISTORY Injection (callytics-infrastructure)

Context: The analysis document calls this "the largest feature that already exists but was never implemented." standard-v1.0.0.txt:38-47 contains the {CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE} placeholder and the prompt describes how to use the historical data, but injectBusinessInfo() at config-repository.ts:508-542 injects only STAFF_LIST and PRICING and never handles CUSTOMER_HISTORY.

3.1 Task 3A: Add Read Access to the LeadTracking Table (CDK)

Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/lambda-stack.ts:134-191
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/iam-stack.ts (add read policy)

Step 1: Add LEAD_TRACKING_TABLE to the Lambda environment variables

In the resultProcessor Lambda definition in /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/lambda-stack.ts (inside the environment object), add:

LEAD_TRACKING_TABLE: `LeadTracking-v2-${envConfig.region}`,
LEAD_TRACKING_REGION: envConfig.region,

Step 2: Add IAM read permissions

In the IAM stack, grant resultProcessorRole read access to the LeadTracking table:

// Add to resultProcessorRole policies
new iam.PolicyStatement({
  effect: iam.Effect.ALLOW,
  actions: [
    'dynamodb:Query',
    'dynamodb:GetItem',
  ],
  resources: [
    `arn:aws:dynamodb:${envConfig.region}:${this.account}:table/LeadTracking-v2-${envConfig.region}`,
    `arn:aws:dynamodb:${envConfig.region}:${this.account}:table/LeadTracking-v2-${envConfig.region}/index/*`,
  ],
})

Step 3: Commit the CDK changes

cd /Users/maxwsy/workspace/callytics-infrastructure
git add lib/stacks/lambda-stack.ts lib/stacks/iam-stack.ts
git commit -m "feat: grant ai-analysis-processor read access to LeadTracking table"

3.2 Task 3B: Implement the CUSTOMER_HISTORY Query and Formatting

Files:
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/infrastructure/customer-history.ts
- Test: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/__tests__/customer-history.test.ts

Step 1: Write tests for formatCustomerHistory

// customer-history.test.ts
import { describe, it, expect } from 'vitest';
import { formatCustomerHistory } from '../infrastructure/customer-history';

describe('formatCustomerHistory', () => {
  it('should return "No previous history" when no calls and no lead', () => {
    const result = formatCustomerHistory([], null);
    expect(result).toBe('No previous call history available for this phone number.');
  });

  it('should format lead info and call history', () => {
    const calls = [
      {
        callStartTime: '2026-02-11T10:00:00Z',
        primary_category: 'intro_booking',
        primary_outcome: 'success',
        executive_summary: 'Customer booked intro class',
      },
      {
        callStartTime: '2026-02-08T14:30:00Z',
        primary_category: 'membership_purchase_related',
        primary_outcome: 'no_resolution',
        executive_summary: 'Price inquiry, did not commit',
      },
    ];
    const lead = {
      firstName: 'Jane',
      lastName: 'Smith',
      receivedAt: '2026-02-07T09:00:00Z',
      leadType: 'Web Lead',
      outcome: null,
    };

    const result = formatCustomerHistory(calls, lead);

    expect(result).toContain('Jane Smith');
    expect(result).toContain('Web Lead');
    expect(result).toContain('2026-02-07');
    expect(result).toContain('intro_booking');
    expect(result).toContain('success');
    expect(result).toContain('Customer booked intro class');
    expect(result).toContain('Price inquiry');
  });

  it('should limit to 3 most recent calls', () => {
    const calls = Array.from({ length: 5 }, (_, i) => ({
      callStartTime: `2026-02-${String(10 - i).padStart(2, '0')}T10:00:00Z`, // zero-pad the day so the ISO timestamp stays valid
      primary_category: 'service',
      primary_outcome: 'resolved',
      executive_summary: `Call ${i + 1}`,
    }));

    const result = formatCustomerHistory(calls, null);

    // Should only include 3 most recent
    expect(result).toContain('Call 1');
    expect(result).toContain('Call 2');
    expect(result).toContain('Call 3');
    expect(result).not.toContain('Call 4');
  });
});

Step 2: Run the tests and confirm they FAIL

cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor && npx vitest run src/__tests__/customer-history.test.ts

Expected: FAIL — module not found

Step 3: Implement customer-history.ts

// /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/infrastructure/customer-history.ts

import { DynamoDBDocumentClient, QueryCommand } from '@aws-sdk/lib-dynamodb';

export interface CallHistoryRecord {
  callStartTime: string;
  primary_category?: string;
  primary_outcome?: string;
  executive_summary?: string;
  follow_up_needed?: string;
}

export interface LeadRecord {
  firstName?: string;
  lastName?: string;
  receivedAt?: string;
  leadType?: string;
  outcome?: string;
  bookedDate?: string;
}

const MAX_HISTORY_CALLS = 3;

/**
 * Query recent calls for a phone number from call-analysis table.
 */
export async function queryRecentCalls(
  docClient: DynamoDBDocumentClient,
  analysisTable: string,
  phone: string,
  direction: 'from' | 'to'
): Promise<CallHistoryRecord[]> {
  if (!phone) return [];

  const indexName = direction === 'from'
    ? 'fromPhoneNumber-callStartTime-index'
    : 'toPhoneNumber-callStartTime-index';
  const keyName = direction === 'from' ? 'fromPhoneNumber' : 'toPhoneNumber';

  const result = await docClient.send(
    new QueryCommand({
      TableName: analysisTable,
      IndexName: indexName,
      KeyConditionExpression: `${keyName} = :phone`,
      ExpressionAttributeValues: { ':phone': phone },
      ScanIndexForward: false, // newest first
      Limit: MAX_HISTORY_CALLS,
      ProjectionExpression: 'callStartTime, primary_category, primary_outcome, executive_summary, follow_up_needed',
    })
  );

  return (result.Items || []) as CallHistoryRecord[];
}

/**
 * Query lead info by phone from LeadTracking table.
 */
export async function queryLeadByPhone(
  docClient: DynamoDBDocumentClient,
  leadTable: string,
  phone: string
): Promise<LeadRecord | null> {
  if (!phone) return null;

  const result = await docClient.send(
    new QueryCommand({
      TableName: leadTable,
      IndexName: 'phone-receivedAt-index',
      KeyConditionExpression: 'phone = :phone',
      ExpressionAttributeValues: { ':phone': phone },
      ScanIndexForward: false,
      Limit: 1,
      ProjectionExpression: 'firstName, lastName, receivedAt, leadType, outcome, bookedDate',
    })
  );

  if (result.Items && result.Items.length > 0) {
    return result.Items[0] as LeadRecord;
  }
  return null;
}

/**
 * Format customer history for prompt injection.
 */
export function formatCustomerHistory(
  calls: CallHistoryRecord[],
  lead: LeadRecord | null
): string {
  if (calls.length === 0 && !lead) {
    return 'No previous call history available for this phone number.';
  }

  const parts: string[] = [];

  // Lead info section
  if (lead) {
    const name = [lead.firstName, lead.lastName].filter(Boolean).join(' ') || 'Unknown';
    parts.push(`**Lead Information:**`);
    parts.push(`- Name: ${name}`);
    parts.push(`- Lead Type: ${lead.leadType || 'Unknown'}`);
    parts.push(`- Received: ${lead.receivedAt?.split('T')[0] || 'Unknown'}`);
    if (lead.bookedDate) {
      parts.push(`- Booked Date: ${lead.bookedDate}`);
    }
    if (lead.outcome) {
      parts.push(`- Current Outcome: ${lead.outcome}`);
    }
    parts.push('');
  }

  // Recent calls section (max 3)
  const recentCalls = [...calls] // copy first so sort() does not mutate the caller's array
    .sort((a, b) => new Date(b.callStartTime).getTime() - new Date(a.callStartTime).getTime())
    .slice(0, MAX_HISTORY_CALLS);

  if (recentCalls.length > 0) {
    parts.push(`**Previous Calls (${recentCalls.length} most recent):**`);
    for (const call of recentCalls) {
      const date = call.callStartTime?.split('T')[0] || 'Unknown date';
      const category = call.primary_category || 'unknown';
      const outcome = call.primary_outcome || 'unknown';
      const summary = call.executive_summary || 'No summary';
      parts.push(`- ${date}: [${category}] Outcome: ${outcome} - ${summary}`);
    }
  }

  return parts.join('\n');
}

/**
 * Build complete customer history for a phone number.
 * Queries both from/to directions and deduplicates.
 */
export async function buildCustomerHistory(
  docClient: DynamoDBDocumentClient,
  analysisTable: string,
  leadTable: string,
  phone: string
): Promise<string> {
  if (!phone) {
    return 'No phone number available — cannot look up customer history.';
  }

  // Query calls from both directions in parallel + lead info
  const [fromCalls, toCalls, lead] = await Promise.all([
    queryRecentCalls(docClient, analysisTable, phone, 'from'),
    queryRecentCalls(docClient, analysisTable, phone, 'to'),
    queryLeadByPhone(docClient, leadTable, phone),
  ]);

  // Dedupe by callStartTime (same call appears in both from and to)
  const allCalls = [...fromCalls, ...toCalls];
  const unique = [...new Map(allCalls.map(c => [c.callStartTime, c])).values()];

  return formatCustomerHistory(unique, lead);
}

Step 4: Run the tests and confirm they PASS

cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor && npx vitest run src/__tests__/customer-history.test.ts

Expected: PASS

Step 5: Commit

cd /Users/maxwsy/workspace/callytics-infrastructure
git add lambda/ai-analysis-processor/src/infrastructure/customer-history.ts lambda/ai-analysis-processor/src/__tests__/customer-history.test.ts
git commit -m "feat: add customer history query and formatting for prompt injection"

3.3 Task 3C: Inject CUSTOMER_HISTORY into the Prompt

Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/infrastructure/config-repository.ts:508-542
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/handler.ts:289-716

Step 1: Add CUSTOMER_HISTORY injection to injectBusinessInfo in config-repository.ts

In the injectBusinessInfo method (config-repository.ts:508-542), add a third injection after the pricing injection:

// 3. Inject customer history (new)
if (customerHistory) {
  enhancedPrompt = enhancedPrompt.replace(
    '{CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE}',
    customerHistory
  );
} else {
  enhancedPrompt = enhancedPrompt.replace(
    '{CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE}',
    'No previous call history available for this phone number.'
  );
}

Update the injectBusinessInfo method signature to add a customerHistory?: string parameter.
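The placeholder substitution can also be sketched as a standalone helper. This is only an illustration: injectCustomerHistory and the two constants are hypothetical names, and in the plan the logic lives inside injectBusinessInfo. The sketch passes a replacer function to String.prototype.replace, because a plain replacement string treats sequences such as $$ and $& as special patterns, and call summaries may well contain dollar signs.

```typescript
const CUSTOMER_HISTORY_PLACEHOLDER = '{CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE}';
const NO_HISTORY_TEXT = 'No previous call history available for this phone number.';

// Hypothetical standalone version of the injection step.
function injectCustomerHistory(prompt: string, customerHistory?: string): string {
  // Fall back to the default text when the history is missing or empty.
  const replacement = customerHistory || NO_HISTORY_TEXT;
  // A replacer function inserts its return value verbatim, so "$$" or "$&"
  // inside a summary is not interpreted as a replacement pattern.
  return prompt.replace(CUSTOMER_HISTORY_PLACEHOLDER, () => replacement);
}

const template = `History:\n${CUSTOMER_HISTORY_PLACEHOLDER}\nEnd.`;
const withHistory = injectCustomerHistory(template, '- 2026-02-11: quoted $$20 off');
const withoutHistory = injectCustomerHistory(template);
```

Like the snippet in Step 1, this replaces only the first occurrence of the placeholder, which is sufficient if the template contains it once.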

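Step 2: Call buildCustomerHistory from processSingleRecord in handler.ts

In the handler.ts processing pipeline, after loadConfig and before buildAnalysisPrompt (around lines 380-400), add:

// Module scope of handler.ts: the import and the environment lookup.
// The remaining statements go after callRecord and config are loaded, before the prompt is built.
import { buildCustomerHistory } from './infrastructure/customer-history';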

const LEAD_TRACKING_TABLE = process.env['LEAD_TRACKING_TABLE'] || '';

// Extract customer phone from call record
const customerPhone = callRecord.callDirection === 'Inbound'
  ? callRecord.fromPhoneNumber
  : callRecord.toPhoneNumber;

// Build customer history (parallel query to LeadTracking + call-analysis)
let customerHistory = '';
if (customerPhone && LEAD_TRACKING_TABLE) {
  try {
    customerHistory = await buildCustomerHistory(
      deps.persistenceRepo.docClient,  // reuse existing DynamoDB client
      ANALYSIS_TABLE,
      LEAD_TRACKING_TABLE,
      customerPhone
    );
  } catch (error) {
    logger.warn('Failed to build customer history, proceeding without it', { error });
    customerHistory = 'Customer history lookup failed — analyze without historical context.';
  }
}

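Then pass customerHistory through the prompt injection chain so that it reaches injectBusinessInfo.

Step 3: Run the existing tests to confirm there are no regressions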

cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor && npx vitest run

Expected: PASS (existing tests should not break since CUSTOMER_HISTORY was previously unhandled)

Step 4: Commit

cd /Users/maxwsy/workspace/callytics-infrastructure
git add lambda/ai-analysis-processor/src/infrastructure/config-repository.ts lambda/ai-analysis-processor/src/handler.ts
git commit -m "feat: inject CUSTOMER_HISTORY into AI analysis prompt"

4. Task 4: Save leadId to call-analysis (callytics-infrastructure)

Context: The call-analysis table currently has no leadId field, so every display relies on runtime matching in studio-api. Once AI analysis completes, the Lambda already knows the customer phone and can look up the matching leadId in LeadTracking and persist it.

Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/infrastructure/persistence-repository.ts
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/handler.ts

Step 1: Support the leadId field in saveAnalysis

In the saveAnalysis method of persistence-repository.ts (around lines 86-164), the metadata parameter is already an extensible object, so it is enough to pass leadId at the call site:

// In handler.ts, after the customerHistory lookup.
// buildCustomerHistory returns only the formatted string, so the lead is queried again here
// (queryLeadByPhone must also be imported from './infrastructure/customer-history').
const leadRecord = customerPhone && LEAD_TRACKING_TABLE
  ? await queryLeadByPhone(deps.persistenceRepo.docClient, LEAD_TRACKING_TABLE, customerPhone)
  : null;

// Extract leadId from the lead record (lead.id format is "email#timestamp")
const leadId = leadRecord ? (leadRecord as any).id : undefined;

// Pass leadId into saveAnalysis metadata
await deps.persistenceRepo.saveAnalysis({
  callId: telephonySessionId,
  analysis: analysisResult.analysis,
  metadata: {
    ...saveMetadata,
    s3AnalysisPath,
    ...costTrackingData,
    // NEW: persist lead association
    ...(leadId && { leadId }),
    ...(customerPhone && { customerPhone }),
  },
  aiMapping: config.ai_mapping,
});

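Step 2: Return the id field from queryLeadByPhone

In queryLeadByPhone in customer-history.ts, add id to the ProjectionExpression. Adding id?: string to the LeadRecord interface as well would remove the need for the any cast in Step 1.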

ProjectionExpression: 'id, firstName, lastName, receivedAt, leadType, outcome, bookedDate',
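Step 1 issues a second LeadTracking query for the same phone number. One possible alternative, sketched below under stated assumptions, is a variant of buildCustomerHistory that returns the lead id together with the formatted history. The name buildCustomerContext, the LeadSummary and CustomerContext types, and the injected lookup functions are all hypothetical and are not part of the plan.

```typescript
// Hypothetical, reduced lead shape; only the fields this sketch needs.
interface LeadSummary {
  id?: string;
  firstName?: string;
  lastName?: string;
}

interface CustomerContext {
  history: string;
  leadId: string | undefined;
}

// Runs the lead lookup and the call lookup once, in parallel, and returns
// both the formatted history and the lead id from the same lead record.
async function buildCustomerContext(
  phone: string,
  lookupLead: (phone: string) => Promise<LeadSummary | null>,
  lookupCalls: (phone: string) => Promise<string[]>,
  format: (calls: string[], lead: LeadSummary | null) => string
): Promise<CustomerContext> {
  const [lead, calls] = await Promise.all([lookupLead(phone), lookupCalls(phone)]);
  return { history: format(calls, lead), leadId: lead?.id };
}
```

The handler could then use the history for prompt injection and the leadId for saveAnalysis from a single result, at the cost of changing the return type that Task 3B's tests exercise.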

Step 3: Run the tests

cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor && npx vitest run

Expected: PASS

Step 4: Commit

cd /Users/maxwsy/workspace/callytics-infrastructure
git add lambda/ai-analysis-processor/src/handler.ts lambda/ai-analysis-processor/src/infrastructure/persistence-repository.ts lambda/ai-analysis-processor/src/infrastructure/customer-history.ts
git commit -m "feat: persist leadId and customerPhone in call-analysis records"

5. Task 5: Write the AI Outcome Back to the Lead Automatically (callytics-infrastructure)

Context: The AI analysis outputs primary_outcome (for example, intro_booking + success), but the lead's outcome stays null and has to be updated manually in the Dashboard. This task uses DynamoDB Streams and a new Lambda to map the AI outcome back to LeadTracking automatically.

5.1 Task 5A: Enable DynamoDB Streams (CDK)

Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/storage-stack.ts:150-160

Step 1: Enable Streams on the call-analysis table

In the callAnalysisTable definition in storage-stack.ts, add the stream property:

this.callAnalysisTable = new dynamodb.Table(this, 'CallAnalysisTable', {
  tableName: `${envConfig.resourcePrefix}-call-analysis-${envConfig.region}`,
  partitionKey: { name: 'telephonySessionId', type: dynamodb.AttributeType.STRING },
  billingMode: dynamodb.BillingMode[envConfig.config.dynamodbBilling],
  pointInTimeRecoverySpecification: {
    pointInTimeRecoveryEnabled: envConfig.config.enableBackups,
  },
  removalPolicy: cdk.RemovalPolicy.RETAIN,
  stream: dynamodb.StreamViewType.NEW_IMAGE, // NEW: enable streams
});

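Step 2: Verify the CDK diff

cd /Users/maxwsy/workspace/callytics-infrastructure && npx cdk diff --context env=test 2>&1 | head -40

Expected: the diff shows a StreamSpecification change.

WARNING: Enabling Streams on an existing table is a non-destructive change (the table is not replaced), but verify it in the test environment first.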

Step 3: Commit

cd /Users/maxwsy/workspace/callytics-infrastructure
git add lib/stacks/storage-stack.ts
git commit -m "feat: enable DynamoDB Streams on call-analysis table for outcome writeback"

5.2 Task 5B: Create the Outcome Inference Lambda

Files:
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/src/handler.ts
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/src/outcome-mapper.ts
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/src/__tests__/outcome-mapper.test.ts
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/package.json
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/tsconfig.json

Step 1: Write the outcome mapping tests

// outcome-mapper.test.ts
import { describe, it, expect } from 'vitest';
import { mapAIOutcomeToLeadOutcome, shouldUpdateLeadOutcome } from '../outcome-mapper';

describe('mapAIOutcomeToLeadOutcome', () => {
  it('should map intro_booking + success to INTRO_BOOKED', () => {
    expect(mapAIOutcomeToLeadOutcome('intro_booking', 'success')).toBe('INTRO_BOOKED');
  });

  it('should map membership_purchase + success to INTRO_BOOKED', () => {
    expect(mapAIOutcomeToLeadOutcome('membership_purchase_related', 'success')).toBe('INTRO_BOOKED');
  });

  it('should return null for non-mappable outcomes', () => {
    expect(mapAIOutcomeToLeadOutcome('service_inquiry', 'resolved')).toBeNull();
  });

  it('should return null for intro_booking + no_resolution', () => {
    expect(mapAIOutcomeToLeadOutcome('intro_booking', 'no_resolution')).toBeNull();
  });
});

describe('shouldUpdateLeadOutcome', () => {
  it('should update when current outcome is null', () => {
    expect(shouldUpdateLeadOutcome(null, 'INTRO_BOOKED')).toBe(true);
  });

  it('should NOT downgrade from INTRO_BOOKED to anything', () => {
    expect(shouldUpdateLeadOutcome('INTRO_BOOKED', 'INTRO_BOOKED')).toBe(false);
  });

  it('should NOT update when lead was manually set to CLOSED', () => {
    expect(shouldUpdateLeadOutcome('CLOSED', 'INTRO_BOOKED')).toBe(false);
  });

  it('should NOT update when lead was manually set to INVALID_LEAD', () => {
    expect(shouldUpdateLeadOutcome('INVALID_LEAD', 'INTRO_BOOKED')).toBe(false);
  });
});

Step 2: Run the tests and confirm they FAIL

cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference && npx vitest run

Step 3: Implement outcome-mapper.ts

// outcome-mapper.ts
export type LeadOutcomeValue =
  | 'INTRO_BOOKED'
  | 'SELF_BOOKED'
  | 'INVALID_LEAD'
  | 'NO_CALL_LIST'
  | 'CLOSED';

// Outcomes that were set manually and should NOT be overwritten by automation
const MANUAL_FINAL_OUTCOMES: LeadOutcomeValue[] = ['CLOSED', 'INVALID_LEAD', 'NO_CALL_LIST'];

/**
 * Map AI analysis primary_subcategory + primary_outcome to LeadOutcomeValue.
 * Returns null if no mapping applies (the call doesn't indicate a lead outcome change).
 */
export function mapAIOutcomeToLeadOutcome(
  primarySubcategory: string | undefined,
  primaryOutcome: string | undefined
): LeadOutcomeValue | null {
  if (!primarySubcategory || !primaryOutcome) return null;

  // Only successful intro bookings map to INTRO_BOOKED
  if (
    (primarySubcategory === 'intro_booking' || primarySubcategory === 'membership_purchase_related') &&
    primaryOutcome === 'success'
  ) {
    return 'INTRO_BOOKED';
  }

  return null;
}

/**
 * Determine if the lead outcome should be updated.
 * Rules:
 * - Don't update if current outcome is a manual final state (CLOSED, INVALID_LEAD, NO_CALL_LIST)
 * - Don't update if current outcome already matches proposed
 * - Only update if current outcome is null (not yet set)
 */
export function shouldUpdateLeadOutcome(
  currentOutcome: string | null | undefined,
  proposedOutcome: LeadOutcomeValue
): boolean {
  // Never overwrite manual final states
  if (currentOutcome && MANUAL_FINAL_OUTCOMES.includes(currentOutcome as LeadOutcomeValue)) {
    return false;
  }

  // Don't set same value
  if (currentOutcome === proposedOutcome) {
    return false;
  }

  // Only update if null (not yet set by anyone)
  if (currentOutcome === null || currentOutcome === undefined) {
    return true;
  }

  return false;
}

Step 4: Run the tests and confirm they PASS

cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference && npx vitest run

Step 5: Implement the DynamoDB Streams handler

// handler.ts
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, GetCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb';
import { Logger } from '@aws-lambda-powertools/logger';
import { mapAIOutcomeToLeadOutcome, shouldUpdateLeadOutcome } from './outcome-mapper';

const logger = new Logger({ serviceName: 'lead-outcome-inference' });

const LEAD_TRACKING_TABLE = process.env['LEAD_TRACKING_TABLE'] || '';
const LEAD_TRACKING_REGION = process.env['LEAD_TRACKING_REGION'] || 'us-east-1';

const docClient = DynamoDBDocumentClient.from(
  new DynamoDBClient({ region: LEAD_TRACKING_REGION })
);

interface DynamoDBStreamEvent {
  Records: Array<{
    eventName: string;
    dynamodb: {
      NewImage?: Record<string, any>;
      OldImage?: Record<string, any>;
    };
  }>;
}

export async function handler(event: DynamoDBStreamEvent): Promise<void> {
  for (const record of event.Records) {
    // Only process INSERT and MODIFY events
    if (record.eventName !== 'INSERT' && record.eventName !== 'MODIFY') continue;

    const newImage = record.dynamodb.NewImage;
    if (!newImage) continue;

    // Extract relevant fields (DynamoDB Stream format uses AttributeValue)
    const leadId = newImage.leadId?.S;
    const primarySubcategory = newImage.primary_subcategory?.S;
    const primaryOutcome = newImage.primary_outcome?.S;
    const followUpNeeded = newImage.follow_up_needed?.S;
    const telephonySessionId = newImage.telephonySessionId?.S;

    if (!leadId) {
      // No lead association — skip (call not matched to a lead)
      continue;
    }

    // Map AI outcome to lead outcome
    const proposedOutcome = mapAIOutcomeToLeadOutcome(primarySubcategory, primaryOutcome);

    if (!proposedOutcome && followUpNeeded !== 'true') {
      // No actionable outcome from this call
      continue;
    }

    // Get current lead state
    const leadResult = await docClient.send(
      new GetCommand({
        TableName: LEAD_TRACKING_TABLE,
        Key: { id: leadId },
        ProjectionExpression: 'id, outcome, outcomeUpdatedAt, outcomeUpdatedBy', // outcomeUpdatedAt is read below for the history entry
      })
    );

    const lead = leadResult.Item;
    if (!lead) {
      logger.warn('Lead not found', { leadId });
      continue;
    }

    const now = new Date().toISOString();
    const updateParts: string[] = [];
    const exprNames: Record<string, string> = {};
    const exprValues: Record<string, any> = {};

    // Update outcome if applicable
    if (proposedOutcome && shouldUpdateLeadOutcome(lead.outcome, proposedOutcome)) {
      // Save previous outcome to history
      if (lead.outcome) {
        updateParts.push('#outcomeHistory = list_append(if_not_exists(#outcomeHistory, :emptyList), :historyEntry)');
        exprNames['#outcomeHistory'] = 'outcomeHistory';
        exprValues[':emptyList'] = [];
        exprValues[':historyEntry'] = [{
          outcome: lead.outcome,
          updatedAt: lead.outcomeUpdatedAt || now,
          updatedBy: lead.outcomeUpdatedBy || 'unknown',
        }];
      }

      updateParts.push('#outcome = :outcome');
      updateParts.push('#outcomeUpdatedAt = :now');
      updateParts.push('#outcomeUpdatedBy = :system');
      exprNames['#outcome'] = 'outcome';
      exprNames['#outcomeUpdatedAt'] = 'outcomeUpdatedAt';
      exprNames['#outcomeUpdatedBy'] = 'outcomeUpdatedBy';
      exprValues[':outcome'] = proposedOutcome;
      exprValues[':now'] = now;
      exprValues[':system'] = 'system:ai-inference';
    }

    // Always update lastContactTime
    updateParts.push('#lastContactTime = :contactTime');
    exprNames['#lastContactTime'] = 'lastContactTime';
    exprValues[':contactTime'] = now;

    // Update followUpNeeded flag
    if (followUpNeeded === 'true') {
      updateParts.push('#followUpNeeded = :fu');
      exprNames['#followUpNeeded'] = 'followUpNeeded';
      exprValues[':fu'] = true;
    }

    // Add call reference
    updateParts.push('#lastCallRef = :callRef');
    exprNames['#lastCallRef'] = 'lastCallRef';
    exprValues[':callRef'] = telephonySessionId;

    if (updateParts.length === 0) continue;

    await docClient.send(
      new UpdateCommand({
        TableName: LEAD_TRACKING_TABLE,
        Key: { id: leadId },
        UpdateExpression: `SET ${updateParts.join(', ')}`,
        ExpressionAttributeNames: exprNames,
        ExpressionAttributeValues: exprValues,
      })
    );

    logger.info('Updated lead from AI analysis', {
      leadId,
      proposedOutcome,
      followUpNeeded,
      telephonySessionId,
    });
  }
}
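The handler above reads each field through the .S accessor, which only matches string attributes. How the analysis processor stores follow_up_needed is not shown in this plan, so the sketch below assumes it could arrive either as a BOOL or as the string "true". The StreamAttribute type and the readString and readFlag helpers are hypothetical and model only a subset of the AttributeValue shape.

```typescript
// Subset of the DynamoDB AttributeValue JSON shape found in stream records.
type StreamAttribute = { S?: string; BOOL?: boolean; N?: string; NULL?: boolean };
type StreamImage = Record<string, StreamAttribute | undefined>;

// Returns the string value, or undefined when the attribute is absent or not a string.
function readString(image: StreamImage, key: string): string | undefined {
  return image[key]?.S;
}

// Accepts either a BOOL attribute or the strings "true"/"false" (case-insensitive).
function readFlag(image: StreamImage, key: string): boolean {
  const attr = image[key];
  if (!attr) return false;
  if (typeof attr.BOOL === 'boolean') return attr.BOOL;
  return attr.S?.toLowerCase() === 'true';
}

const sampleImage: StreamImage = {
  leadId: { S: 'jane@example.com#2026-02-07T09:00:00Z' },
  primary_outcome: { S: 'success' },
  follow_up_needed: { BOOL: true },
};

const sampleLeadId = readString(sampleImage, 'leadId');
const sampleFollowUp = readFlag(sampleImage, 'follow_up_needed');
```

If the stored type turns out to be a plain string everywhere, the simpler .S access in the handler is enough and this helper is unnecessary.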

Step 6: Create package.json

{
  "name": "lead-outcome-inference",
  "version": "1.0.0",
  "private": true,
  "type": "module",
  "dependencies": {
    "@aws-lambda-powertools/logger": "^2.30.2",
    "@aws-sdk/client-dynamodb": "^3.966.0",
    "@aws-sdk/lib-dynamodb": "^3.966.0"
  },
  "devDependencies": {
    "vitest": "^3.0.0",
    "typescript": "^5.6.0"
  }
}

Step 7: Run all tests

cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference && npm install && npx vitest run

Expected: PASS

Step 8: Commit

cd /Users/maxwsy/workspace/callytics-infrastructure
git add lambda/lead-outcome-inference/
git commit -m "feat: create lead-outcome-inference Lambda for AI outcome writeback"

5.3 Task 5C: CDK: Register the Outcome Inference Lambda

Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/lambda-stack.ts
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/iam-stack.ts

Step 1: Add the Lambda definition

In lambda-stack.ts, add the new Lambda after the resultProcessor definition:

// Lead Outcome Inference Lambda — triggered by call-analysis DynamoDB Streams
this.leadOutcomeInference = new NodejsFunction(this, 'LeadOutcomeInference', {
  functionName: `${envConfig.resourcePrefix}-lead-outcome-inference-${envConfig.region}`,
  entry: 'lambda/lead-outcome-inference/src/handler.ts',
  handler: 'handler',
  runtime: lambda.Runtime.NODEJS_20_X,
  timeout: cdk.Duration.seconds(30),
  memorySize: 256,
  description: 'Maps AI call analysis outcomes back to LeadTracking records',
  bundling: {
    minify: true,
    sourceMap: true,
    target: 'node20',
    externalModules: ['@aws-sdk/*'],
  },
  environment: {
    LEAD_TRACKING_TABLE: `LeadTracking-v2-${envConfig.region}`,
    LEAD_TRACKING_REGION: envConfig.region,
  },
});

// Trigger from DynamoDB Streams
this.leadOutcomeInference.addEventSource(
  new lambdaEventSources.DynamoEventSource(storageStack.callAnalysisTable, {
    startingPosition: lambda.StartingPosition.TRIM_HORIZON,
    batchSize: 10,
    retryAttempts: 3,
    bisectBatchOnError: true,
    reportBatchItemFailures: true,
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual('INSERT'),
      }),
    ],
  })
);
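
Because the event source above sets reportBatchItemFailures: true, Lambda expects the handler to return the sequence numbers of any records that failed, so that only those records are retried. The following is a minimal sketch of that response shape, not the handler from Task 5B. The local interfaces are simplified stand-ins for the DynamoDB stream event types, and processRecord is a hypothetical placeholder for the per-record writeback logic.

```typescript
// Simplified stand-ins for the DynamoDB Streams event types.
interface StreamRecord {
  eventName?: string;
  dynamodb?: { SequenceNumber?: string };
}
interface StreamEvent {
  Records: StreamRecord[];
}
interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// processRecord is hypothetical: it stands in for the real writeback logic.
async function handleBatch(
  event: StreamEvent,
  processRecord: (record: StreamRecord) => Promise<void>,
): Promise<BatchResponse> {
  for (const record of event.Records) {
    try {
      await processRecord(record);
    } catch (err) {
      const sequenceNumber = record.dynamodb?.SequenceNumber;
      // Without an identifier a partial failure cannot be reported,
      // so rethrow and let the whole batch be retried.
      if (sequenceNumber === undefined) throw err;
      // Stop at the first failure: stream records are ordered, and Lambda
      // retries from the reported sequence number onward.
      return { batchItemFailures: [{ itemIdentifier: sequenceNumber }] };
    }
  }
  // An empty list tells Lambda the whole batch succeeded.
  return { batchItemFailures: [] };
}
```

Returning at the first failure keeps records for the same lead in order, at the cost of reprocessing later records in the batch, so the per-record logic should be safe to run twice.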

Step 2: Add IAM permissions

// LeadTracking table read/write for outcome inference
new iam.PolicyStatement({
  effect: iam.Effect.ALLOW,
  actions: [
    'dynamodb:GetItem',
    'dynamodb:UpdateItem',
    'dynamodb:Query',
  ],
  resources: [
    `arn:aws:dynamodb:${envConfig.region}:${this.account}:table/LeadTracking-v2-${envConfig.region}`,
    `arn:aws:dynamodb:${envConfig.region}:${this.account}:table/LeadTracking-v2-${envConfig.region}/index/*`,
  ],
})

Step 3: Verify the CDK diff

cd /Users/maxwsy/workspace/callytics-infrastructure && npx cdk diff --context env=test 2>&1 | head -60

Expected: a new Lambda function, a DynamoDB Streams event source mapping, and an IAM role

Step 4: Commit

cd /Users/maxwsy/workspace/callytics-infrastructure
git add lib/stacks/lambda-stack.ts lib/stacks/iam-stack.ts
git commit -m "feat: register lead-outcome-inference Lambda with DynamoDB Streams trigger"

6. Task 6: studio-api type compatibility (studio-api)

Context: The LeadTracking table will now receive automatically written fields (lastContactTime, followUpNeeded, lastCallRef, outcomeUpdatedBy: "system:ai-inference"). studio-api needs to surface this data correctly.

Files:
- Modify: /Users/maxwsy/workspace/studio-api/apps/api/src/routes/leads/types.ts
- Modify: /Users/maxwsy/workspace/studio-api/apps/api/src/routes/leads/list.ts

Step 1: Extend the LeadCallInfo type

In types.ts, add the new fields to the LeadCallInfo interface:

export interface LeadCallInfo {
  // ... existing fields ...
  outcome?: LeadOutcomeValue;
  outcomeUpdatedAt?: string;
  outcomeUpdatedBy?: string;       // "system:ai-inference" or user email
  outcomeHistory?: OutcomeHistoryEntry[];
  // NEW fields from Phase 1:
  lastContactTime?: string;
  followUpNeeded?: boolean;
  lastCallRef?: string;             // telephonySessionId of last matched call
  leadId?: string;                  // the lead's primary key
}

Step 2: Include the new fields in the list.ts response

In list.ts, in the logic that maps a lead to LeadCallInfo (around line 219), make sure the new fields are passed through:

// Add to lead response mapping
lastContactTime: lead.lastContactTime,
followUpNeeded: lead.followUpNeeded,
lastCallRef: lead.lastCallRef,
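
Leads created before Phase 1 will not carry the new attributes, so the mapping has to tolerate their absence. The sketch below illustrates this with a hypothetical LeadItem shape and a trimmed-down response type. The real LeadCallInfo has more fields, and toLeadCallInfo is an illustrative name, not a function from list.ts.

```typescript
// Hypothetical shape of a raw LeadTracking item; only Phase 1 fields are shown.
interface LeadItem {
  id: string;
  lastContactTime?: string;
  followUpNeeded?: boolean;
  lastCallRef?: string;
}

// Trimmed-down response type for illustration only.
interface LeadCallInfoSubset {
  leadId: string;
  lastContactTime?: string;
  followUpNeeded?: boolean;
  lastCallRef?: string;
}

function toLeadCallInfo(lead: LeadItem): LeadCallInfoSubset {
  return {
    leadId: lead.id,
    // Missing attributes stay undefined and are omitted by JSON.stringify,
    // so older leads serialize exactly as they did before Phase 1.
    lastContactTime: lead.lastContactTime,
    followUpNeeded: lead.followUpNeeded,
    lastCallRef: lead.lastCallRef,
  };
}

const legacyLead = toLeadCallInfo({ id: 'lead-1' });
const updatedLead = toLeadCallInfo({
  id: 'lead-2',
  lastContactTime: '2025-01-01T00:00:00.000Z',
  followUpNeeded: true,
  lastCallRef: 'session-abc',
});
console.log(JSON.stringify(legacyLead));
console.log(JSON.stringify(updatedLead));
```

Clients that do not know the new fields are unaffected, because all of them are optional in the response.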

Step 3: Run the tests

cd /Users/maxwsy/workspace/studio-api && bun run test

Expected: PASS

Step 4: Commit

cd /Users/maxwsy/workspace/studio-api
git add apps/api/src/routes/leads/types.ts apps/api/src/routes/leads/list.ts
git commit -m "feat: support auto-inferred outcome fields from AI analysis pipeline"

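7. Task 7: Data migration (backfill existing lead phones to E.164)

Context: Task 1 changes the format for new data, but existing records still use the 10-digit format and need to be backfilled.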

Files: - Modify: /Users/maxwsy/workspace/lead-tracking/scripts/backfill-phones.ts

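Step 1: Update the backfill script

The existing scripts/backfill-phones.ts already contains similar logic. Update its normalizePhone function to produce E.164, then run an UpdateCommand against every existing record that needs it: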

// Key logic for backfill:
// 1. Scan all items in LeadTracking-v2 table
// 2. For each item with a phone field:
//    - If phone is 10 digits: update to +1XXXXXXXXXX
//    - If phone is 11 digits starting with 1: update to +XXXXXXXXXXX
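// 3. Issue one UpdateCommand per changed item (BatchWriteItem only supports
//    put and delete, so it cannot update a single attribute in place)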
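
The key logic above can be sketched as a pure planning step that a dry run could print and an execute run could turn into UpdateCommand calls. This is an illustration under assumptions, not the contents of backfill-phones.ts: toE164, planBackfill, and the LeadRow shape are hypothetical names, and only US numbers are handled.

```typescript
interface LeadRow {
  id: string;
  phone?: string;
}
interface PhoneUpdate {
  id: string;
  from: string;
  to: string;
}
interface BackfillPlan {
  updates: PhoneUpdate[];
  unchanged: number;      // already in E.164, nothing to do
  unparseable: string[];  // ids left for manual review
}

// Returns the E.164 form for a US number, or null for any other shape.
function toE164(phone: string): string | null {
  const digits = phone.replace(/\D/g, '');
  if (digits.length === 10) return `+1${digits}`;
  if (digits.length === 11 && digits.startsWith('1')) return `+${digits}`;
  return null;
}

// Classifies every scanned row without writing anything.
function planBackfill(rows: LeadRow[]): BackfillPlan {
  const plan: BackfillPlan = { updates: [], unchanged: 0, unparseable: [] };
  for (const row of rows) {
    if (!row.phone) continue; // rows without a phone field are skipped
    const e164 = toE164(row.phone);
    if (e164 === null) plan.unparseable.push(row.id);
    else if (e164 === row.phone) plan.unchanged += 1;
    else plan.updates.push({ id: row.id, from: row.phone, to: e164 });
  }
  return plan;
}
```

Because rows that are already in E.164 count as unchanged, the script can be re-run safely after a partial failure.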

Step 2: Dry run (read-only statistics)

cd /Users/maxwsy/workspace/lead-tracking && npx ts-node scripts/backfill-phones.ts --dry-run

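Expected: prints the number of records that need updating

Step 3: Run the backfill (requires confirmation)

WARNING: This is a write operation that modifies production data. Validate it in the test environment before running it against prod.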

cd /Users/maxwsy/workspace/lead-tracking && npx ts-node scripts/backfill-phones.ts --execute

Step 4: Commit

cd /Users/maxwsy/workspace/lead-tracking
git add scripts/backfill-phones.ts
git commit -m "feat: update backfill script for E.164 phone format migration"

8. Dependencies

Task 1 (E.164 phone) ──┐
                       ├── Task 3 (CUSTOMER_HISTORY) ── Task 4 (leadId) ── Task 5 (outcome writeback)
Task 2 (phone dedup) ──┘                                                         │
                                                                           Task 6 (studio-api types)
Task 7 (data migration): can run in parallel, but preferably after Task 1

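Key ordering:

  1. Task 1 and Task 2 can run in parallel (same repo, different functions)
  2. Task 3 depends on Task 1 (lookups are only reliable once the phone format is unified)
  3. Task 4 depends on Task 3 (it reuses the lead lookup result)
  4. Task 5 depends on Task 4 (the leadId is required for writeback)
  5. Task 6 depends on Task 5 (it needs the new field names)
  6. Task 7 can run at any time after Task 1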


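9. Acceptance criteria

Once Phase 1 is complete, the following should happen automatically:

  1. A lead arrives → normalizePhone stores the phone as +17328561597 (E.164)
  2. The same phone is submitted a second time → no new lead is created; the submission is merged into the existing record
  3. AI analyzes a call → the {CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE} placeholder in the prompt is replaced with summaries of the 3 most recent calls for that phone plus the lead information
  4. AI analysis completes → the call-analysis record contains the leadId and customerPhone fields
  5. AI determines intro_booking + success → the LeadTracking outcome automatically becomes INTRO_BOOKED, with outcomeUpdatedBy = "system:ai-inference"
  6. studio-api returns a lead → the response includes the new lastContactTime, followUpNeeded, and lastCallRef fields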

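10. Risks and caveats

| Risk | Mitigation |
| --- | --- |
| Enabling DynamoDB Streams may affect write performance | NEW_IMAGE mode has the lowest overhead; validate in the test environment first |
| Cross-repo IAM permissions for the LeadTracking table | Same AWS account; only an IAM policy in CDK is needed |
| The backfill script modifies production data | Dry-run first, test environment first, and keep a backfill log |
| Automatic outcome writeback overwrites a manually set outcome | shouldUpdateLeadOutcome checks strictly: it never overwrites CLOSED/INVALID_LEAD |
| CUSTOMER_HISTORY queries add Lambda latency | 3 parallel DynamoDB queries, each <50 ms, adding roughly 50-80 ms in total |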
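
The overwrite guard named in the risk table can be illustrated with a small sketch. It is not the shouldUpdateLeadOutcome implementation from Task 5B, which may apply further rules. proposeOutcome and canAutoUpdate are hypothetical names, only the intro_booking + success mapping comes from the acceptance criteria, and skipping no-op writes is an added assumption.

```typescript
// Outcomes the plan says must never be replaced automatically.
const PROTECTED_OUTCOMES: ReadonlySet<string> = new Set(['CLOSED', 'INVALID_LEAD']);

// Hypothetical mapping: only the intro_booking + success pair is taken
// from the plan; every other combination yields no proposal here.
function proposeOutcome(callType: string, result: string): string | null {
  if (callType === 'intro_booking' && result === 'success') return 'INTRO_BOOKED';
  return null;
}

// Decides whether an AI-proposed outcome may replace the current one.
function canAutoUpdate(current: string | undefined, proposed: string | null): boolean {
  if (proposed === null) return false; // nothing to write
  if (current !== undefined && PROTECTED_OUTCOMES.has(current)) return false;
  return current !== proposed; // assumption: skip writes that change nothing
}

const proposal = proposeOutcome('intro_booking', 'success');
console.log(canAutoUpdate(undefined, proposal));
console.log(canAutoUpdate('CLOSED', proposal));
```

Keeping the decision in a pure function makes the guard easy to unit test separately from the DynamoDB update.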