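Phase 1: Data Pipeline - From Information Silos to a Closed Data Loop
Status: ⚠️ Code Complete. The code is on a feature branch in each repo and is awaiting code review, merge, and deployment.
Feature Branches:
- lead-tracking/feat/phase1-e164-phone-dedup
- callytics-infrastructure/feat/phase1-customer-history-outcome
- studio-api/feat/phase1-lead-outcome-types
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Connect the data paths between lead-tracking, callytics-infrastructure, and studio-api: a unified phone format, CUSTOMER_HISTORY injection, a persisted leadId association, and automatic write-back of the AI outcome.
Architecture: No system is rewritten; this phase only adds connections on top of the existing architecture. lead-tracking standardizes on the E.164 phone format and adds phone-based dedup. The callytics ai-analysis-processor Lambda queries the LeadTracking table and past calls while building the prompt and injects CUSTOMER_HISTORY. After analysis it saves the leadId association. DynamoDB Streams then triggers a new Lambda that maps the AI outcome back to LeadTracking.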
Tech Stack: TypeScript, AWS CDK, DynamoDB, Lambda (Node.js 20/22), SQS, DynamoDB Streams, Bedrock
Repos:
- lead-tracking — /Users/maxwsy/workspace/lead-tracking
- callytics-infrastructure — /Users/maxwsy/workspace/callytics-infrastructure
- studio-api — /Users/maxwsy/workspace/studio-api
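1. Task 1: Normalize the phone format to E.164 (lead-tracking)
Context: normalizePhone() currently stores the phone as 10 digits (7328561597), while call-analysis stores E.164 (+17328561597), so the two tables cannot be matched reliably. call-history.ts in studio-api already converts at runtime with normalizeToE164(), but the format needs to be unified at the source, where lead-tracking stores the value.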
Files:
- Modify: /Users/maxwsy/workspace/lead-tracking/src/poller.ts:65-72
- Test: /Users/maxwsy/workspace/lead-tracking/test/handler.test.ts
Step 1: Update the normalizePhone tests
In /Users/maxwsy/workspace/lead-tracking/test/handler.test.ts, find the existing normalizePhone tests and update the expected values to E.164 format:
describe('normalizePhone', () => {
it('should return E.164 for 10-digit US number', () => {
expect(normalizePhone('7328561597')).toBe('+17328561597');
});
it('should return E.164 for 11-digit US number with leading 1', () => {
expect(normalizePhone('17328561597')).toBe('+17328561597');
});
it('should return E.164 for number with formatting', () => {
expect(normalizePhone('(732) 856-1597')).toBe('+17328561597');
});
it('should return E.164 for number with dashes', () => {
expect(normalizePhone('732-856-1597')).toBe('+17328561597');
});
it('should return E.164 for number with +1 prefix', () => {
expect(normalizePhone('+17328561597')).toBe('+17328561597');
});
it('should return empty string for undefined', () => {
expect(normalizePhone(undefined)).toBe('');
});
it('should return empty string for empty string', () => {
expect(normalizePhone('')).toBe('');
});
it('should return empty string for non-US number (not 10 or 11 digits)', () => {
expect(normalizePhone('123')).toBe('');
});
});
Step 2: Run the tests and confirm FAIL
cd /Users/maxwsy/workspace/lead-tracking && npx jest test/handler.test.ts --testNamePattern="normalizePhone" --verbose
Expected: FAIL. The current normalizePhone returns 7328561597 (10 digits), but the tests expect +17328561597.
Step 3: Change the normalizePhone() implementation
Modify /Users/maxwsy/workspace/lead-tracking/src/poller.ts:65-72:
function normalizePhone(phone: string | undefined): string {
if (!phone) return '';
const digits = phone.replace(/\D/g, '');
if (digits.length === 10) {
return `+1${digits}`;
}
if (digits.length === 11 && digits.startsWith('1')) {
return `+${digits}`;
}
// Non-US or invalid — return empty to avoid bad data in GSI
return '';
}
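Logic changes:
- 10 digits → prepend +1 to produce E.164
- 11 digits starting with 1 → prepend +
- Any other length → return an empty string (previously the input was returned unchanged, which could store invalid data)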
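Because the same rules are reused for the backfill in Task 7, it helps if normalizing an already normalized value is a no-op. The sketch below repeats the Step 3 function so it is self-contained and checks that property on a few inputs; isIdempotent and the sample list are illustrative names, not part of the repo.

```typescript
// Copy of the Step 3 implementation, repeated so the sketch runs on its own.
function normalizePhone(phone: string | undefined): string {
  if (!phone) return '';
  const digits = phone.replace(/\D/g, '');
  if (digits.length === 10) return `+1${digits}`;
  if (digits.length === 11 && digits.startsWith('1')) return `+${digits}`;
  return ''; // non-US or invalid
}

// Hypothetical helper: true when normalizing twice equals normalizing once.
function isIdempotent(input: string | undefined): boolean {
  const once = normalizePhone(input);
  return normalizePhone(once) === once;
}

// Inputs taken from the Step 1 test cases.
const samples: Array<string | undefined> = [
  '7328561597',
  '17328561597',
  '(732) 856-1597',
  '+17328561597',
  '',
  '123',
  undefined,
];

// True when every sample is stable under repeated normalization.
const allStable = samples.every(isIdempotent);
```

If allStable ever turned false after a rule change, re-running the poller or the backfill over already migrated rows could rewrite them.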
Step 4: Run the tests and confirm PASS
cd /Users/maxwsy/workspace/lead-tracking && npx jest test/handler.test.ts --testNamePattern="normalizePhone" --verbose
Expected: PASS
Step 5: Run the full test suite and confirm there are no regressions
Expected: all PASS
Step 6: Commit
cd /Users/maxwsy/workspace/lead-tracking
git add src/poller.ts test/handler.test.ts
git commit -m "feat: normalize phone to E.164 format for cross-system consistency"
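2. Task 2: Lead phone dedup (lead-tracking)
Context: Dedup is currently keyed on email#timestamp (poller.ts:159), so the same person submitting with different email addresses creates multiple Leads. The phone GSI (phone-receivedAt-index) already exists and can be used to check for duplicates.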
Files:
- Modify: /Users/maxwsy/workspace/lead-tracking/src/poller.ts:159 (dedup logic)
- Modify: /Users/maxwsy/workspace/lead-tracking/src/poller.ts:270-290 (save logic)
- Test: /Users/maxwsy/workspace/lead-tracking/test/handler.test.ts
Step 1: Write a test for the phone dedup helper
Add to the test file:
describe('findExistingLeadByPhone', () => {
it('should return null when phone is empty', async () => {
const result = await findExistingLeadByPhone('', 'table-name');
expect(result).toBeNull();
});
// Integration tests would mock DynamoDB — see Step 3
});
Step 2: Implement the phone dedup query function
In /Users/maxwsy/workspace/lead-tracking/src/poller.ts, add after the normalizePhone function:
async function findExistingLeadByPhone(
phone: string,
tableName: string
): Promise<string | null> {
if (!phone) return null;
const result = await docClient.send(
new QueryCommand({
TableName: tableName,
IndexName: 'phone-receivedAt-index',
KeyConditionExpression: 'phone = :phone',
ExpressionAttributeValues: { ':phone': phone },
ScanIndexForward: false, // newest first
Limit: 1,
})
);
if (result.Items && result.Items.length > 0) {
return result.Items[0].id as string;
}
return null;
}
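The Step 1 test only covers the empty-phone path and defers the rest to a DynamoDB mock. One possible way to test the hit and miss paths without any AWS dependency is to inject the query as a function. This is a sketch of that idea, not the repo's code: findExistingLeadByPhoneWith, QueryByPhone, and makeStub are hypothetical names, and the lead id below is made-up sample data.

```typescript
// Minimal shape of the query result that the lookup relies on.
interface QueryResult {
  Items?: Array<{ id: string }>;
}

// Hypothetical injected dependency standing in for docClient.send(new QueryCommand(...)).
type QueryByPhone = (phone: string) => Promise<QueryResult>;

// Same decision logic as findExistingLeadByPhone, with the query injected.
async function findExistingLeadByPhoneWith(
  query: QueryByPhone,
  phone: string
): Promise<string | null> {
  if (!phone) return null; // an empty phone never reaches the index
  const result = await query(phone);
  return result.Items && result.Items.length > 0 ? result.Items[0].id : null;
}

// In-memory stub: maps phone -> lead id and records every call it receives.
function makeStub(rows: Record<string, string>): { query: QueryByPhone; calls: string[] } {
  const calls: string[] = [];
  const query: QueryByPhone = async (phone) => {
    calls.push(phone);
    const id = rows[phone];
    return { Items: id ? [{ id }] : [] };
  };
  return { query, calls };
}
```

A test can then assert both the returned id and that the stub was never called for an empty phone.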
Step 3: Change the save logic to support merging
In the save logic of poller.ts (around line 270, before the PutCommand), add the phone duplicate check:
// Before PutCommand — check for existing lead with same phone
const existingLeadId = await findExistingLeadByPhone(
item.phone as string || '',
TABLE_NAME
);
if (existingLeadId && existingLeadId !== item.id) {
// Merge: record the duplicate on the existing lead (timestamp and counter) instead of creating a new record
await docClient.send(
new UpdateCommand({
TableName: TABLE_NAME,
Key: { id: existingLeadId },
UpdateExpression: 'SET lastDuplicateAt = :now, duplicateCount = if_not_exists(duplicateCount, :zero) + :one',
ExpressionAttributeValues: {
':now': new Date().toISOString(),
':zero': 0,
':one': 1,
},
})
);
logger.info(`Merged duplicate lead by phone: ${item.phone} → existing ${existingLeadId}`);
continue; // Skip creating new record
}
// Existing PutCommand with ConditionExpression stays as-is
Step 4: Run the tests
Expected: PASS
Step 5: Commit
cd /Users/maxwsy/workspace/lead-tracking
git add src/poller.ts test/handler.test.ts
git commit -m "feat: dedup leads by phone number using phone-receivedAt GSI"
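3. Task 3: CUSTOMER_HISTORY injection (callytics-infrastructure)
Context: The analysis document calls this "the largest feature that already exists but was never implemented". standard-v1.0.0.txt:38-47 contains the {CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE} placeholder and the prompt describes how to use the historical data, but injectBusinessInfo() at config-repository.ts:508-542 only injects STAFF_LIST and PRICING and never handles CUSTOMER_HISTORY.
3.1 Task 3A: Add read permission on the LeadTracking table (CDK)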
Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/lambda-stack.ts:134-191
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/iam-stack.ts (add read policy)
Step 1: Add LEAD_TRACKING_TABLE to the Lambda environment variables
In the resultProcessor Lambda definition in /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/lambda-stack.ts, add a LEAD_TRACKING_TABLE entry inside the environment object.
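The entry itself is not shown in this step. A minimal sketch follows, assuming the table name uses the same LeadTracking-v2-<region> pattern that appears in the IAM resources and in the Task 5C environment block of this plan. The envConfig object here is a stand-in for the stack's real config, and leadTrackingEnvironment is an illustrative name.

```typescript
// Hypothetical stand-in for the stack's envConfig; only `region` is used here.
const envConfig = { region: 'us-east-1' };

// Entry to merge into the resultProcessor `environment` object.
// The variable name matches what handler.ts reads via process.env['LEAD_TRACKING_TABLE'].
const leadTrackingEnvironment = {
  LEAD_TRACKING_TABLE: `LeadTracking-v2-${envConfig.region}`,
};
```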
Step 2: Add IAM read permission
In the IAM stack, grant resultProcessorRole read access to the LeadTracking table:
// Add to resultProcessorRole policies
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'dynamodb:Query',
'dynamodb:GetItem',
],
resources: [
`arn:aws:dynamodb:${envConfig.region}:${this.account}:table/LeadTracking-v2-${envConfig.region}`,
`arn:aws:dynamodb:${envConfig.region}:${this.account}:table/LeadTracking-v2-${envConfig.region}/index/*`,
],
})
Step 3: Commit the CDK changes
cd /Users/maxwsy/workspace/callytics-infrastructure
git add lib/stacks/lambda-stack.ts lib/stacks/iam-stack.ts
git commit -m "feat: grant ai-analysis-processor read access to LeadTracking table"
3.2 Task 3B: Implement CUSTOMER_HISTORY lookup and formatting
Files:
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/infrastructure/customer-history.ts
- Test: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/__tests__/customer-history.test.ts
Step 1: Write the formatCustomerHistory tests
// customer-history.test.ts
import { describe, it, expect } from 'vitest';
import { formatCustomerHistory } from '../infrastructure/customer-history';
describe('formatCustomerHistory', () => {
it('should return "No previous history" when no calls and no lead', () => {
const result = formatCustomerHistory([], null);
expect(result).toBe('No previous call history available for this phone number.');
});
it('should format lead info and call history', () => {
const calls = [
{
callStartTime: '2026-02-11T10:00:00Z',
primary_category: 'intro_booking',
primary_outcome: 'success',
executive_summary: 'Customer booked intro class',
},
{
callStartTime: '2026-02-08T14:30:00Z',
primary_category: 'membership_purchase_related',
primary_outcome: 'no_resolution',
executive_summary: 'Price inquiry, did not commit',
},
];
const lead = {
firstName: 'Jane',
lastName: 'Smith',
receivedAt: '2026-02-07T09:00:00Z',
leadType: 'Web Lead',
outcome: null,
};
const result = formatCustomerHistory(calls, lead);
expect(result).toContain('Jane Smith');
expect(result).toContain('Web Lead');
expect(result).toContain('2026-02-07');
expect(result).toContain('intro_booking');
expect(result).toContain('success');
expect(result).toContain('Customer booked intro class');
expect(result).toContain('Price inquiry');
});
it('should limit to 3 most recent calls', () => {
const calls = Array.from({ length: 5 }, (_, i) => ({
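// Zero-pad the day so every timestamp is valid ISO 8601 (2026-02-09, not 2026-02-9)
callStartTime: `2026-02-${String(10 - i).padStart(2, '0')}T10:00:00Z`,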
primary_category: 'service',
primary_outcome: 'resolved',
executive_summary: `Call ${i + 1}`,
}));
const result = formatCustomerHistory(calls, null);
// Should only include 3 most recent
expect(result).toContain('Call 1');
expect(result).toContain('Call 2');
expect(result).toContain('Call 3');
expect(result).not.toContain('Call 4');
});
});
Step 2: Run the tests and confirm FAIL
cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor && npx vitest run src/__tests__/customer-history.test.ts
Expected: FAIL — module not found
Step 3: Implement customer-history.ts
// /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/infrastructure/customer-history.ts
import { DynamoDBDocumentClient, QueryCommand } from '@aws-sdk/lib-dynamodb';
export interface CallHistoryRecord {
callStartTime: string;
primary_category?: string;
primary_outcome?: string;
executive_summary?: string;
follow_up_needed?: string;
}
export interface LeadRecord {
firstName?: string;
lastName?: string;
receivedAt?: string;
leadType?: string;
outcome?: string | null; // null until an outcome is set (the tests pass outcome: null)
bookedDate?: string;
}
const MAX_HISTORY_CALLS = 3;
/**
* Query recent calls for a phone number from call-analysis table.
*/
export async function queryRecentCalls(
docClient: DynamoDBDocumentClient,
analysisTable: string,
phone: string,
direction: 'from' | 'to'
): Promise<CallHistoryRecord[]> {
if (!phone) return [];
const indexName = direction === 'from'
? 'fromPhoneNumber-callStartTime-index'
: 'toPhoneNumber-callStartTime-index';
const keyName = direction === 'from' ? 'fromPhoneNumber' : 'toPhoneNumber';
const result = await docClient.send(
new QueryCommand({
TableName: analysisTable,
IndexName: indexName,
KeyConditionExpression: `${keyName} = :phone`,
ExpressionAttributeValues: { ':phone': phone },
ScanIndexForward: false, // newest first
Limit: MAX_HISTORY_CALLS,
ProjectionExpression: 'callStartTime, primary_category, primary_outcome, executive_summary, follow_up_needed',
})
);
return (result.Items || []) as CallHistoryRecord[];
}
/**
* Query lead info by phone from LeadTracking table.
*/
export async function queryLeadByPhone(
docClient: DynamoDBDocumentClient,
leadTable: string,
phone: string
): Promise<LeadRecord | null> {
if (!phone) return null;
const result = await docClient.send(
new QueryCommand({
TableName: leadTable,
IndexName: 'phone-receivedAt-index',
KeyConditionExpression: 'phone = :phone',
ExpressionAttributeValues: { ':phone': phone },
ScanIndexForward: false,
Limit: 1,
ProjectionExpression: 'firstName, lastName, receivedAt, leadType, outcome, bookedDate',
})
);
if (result.Items && result.Items.length > 0) {
return result.Items[0] as LeadRecord;
}
return null;
}
/**
* Format customer history for prompt injection.
*/
export function formatCustomerHistory(
calls: CallHistoryRecord[],
lead: LeadRecord | null
): string {
if (calls.length === 0 && !lead) {
return 'No previous call history available for this phone number.';
}
const parts: string[] = [];
// Lead info section
if (lead) {
const name = [lead.firstName, lead.lastName].filter(Boolean).join(' ') || 'Unknown';
parts.push(`**Lead Information:**`);
parts.push(`- Name: ${name}`);
parts.push(`- Lead Type: ${lead.leadType || 'Unknown'}`);
parts.push(`- Received: ${lead.receivedAt?.split('T')[0] || 'Unknown'}`);
if (lead.bookedDate) {
parts.push(`- Booked Date: ${lead.bookedDate}`);
}
if (lead.outcome) {
parts.push(`- Current Outcome: ${lead.outcome}`);
}
parts.push('');
}
// Recent calls section (max 3)
const recentCalls = [...calls] // copy first: sort() mutates the array in place
.sort((a, b) => new Date(b.callStartTime).getTime() - new Date(a.callStartTime).getTime())
.slice(0, MAX_HISTORY_CALLS);
if (recentCalls.length > 0) {
parts.push(`**Previous Calls (${recentCalls.length} most recent):**`);
for (const call of recentCalls) {
const date = call.callStartTime?.split('T')[0] || 'Unknown date';
const category = call.primary_category || 'unknown';
const outcome = call.primary_outcome || 'unknown';
const summary = call.executive_summary || 'No summary';
parts.push(`- ${date}: [${category}] Outcome: ${outcome} — ${summary}`);
}
}
return parts.join('\n');
}
/**
* Build complete customer history for a phone number.
* Queries both from/to directions and deduplicates.
*/
export async function buildCustomerHistory(
docClient: DynamoDBDocumentClient,
analysisTable: string,
leadTable: string,
phone: string
): Promise<string> {
if (!phone) {
return 'No phone number available — cannot look up customer history.';
}
// Query calls from both directions in parallel + lead info
const [fromCalls, toCalls, lead] = await Promise.all([
queryRecentCalls(docClient, analysisTable, phone, 'from'),
queryRecentCalls(docClient, analysisTable, phone, 'to'),
queryLeadByPhone(docClient, leadTable, phone),
]);
// Dedupe by callStartTime (same call appears in both from and to)
const allCalls = [...fromCalls, ...toCalls];
const unique = [...new Map(allCalls.map(c => [c.callStartTime, c])).values()];
return formatCustomerHistory(unique, lead);
}
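The merge step in buildCustomerHistory combines up to 3 calls from each direction and keys them by callStartTime. The sketch below isolates that step as a pure function so the behaviour is easy to check; mergeRecentCalls and CallSummary are hypothetical names, and the sketch assumes all timestamps are ISO 8601 UTC strings in the same format, so plain string comparison orders them chronologically.

```typescript
// Reduced record shape: only the fields the merge step needs.
interface CallSummary {
  callStartTime: string;
  executive_summary?: string;
}

// Merge both directions, drop duplicates by callStartTime, keep the newest `limit`.
function mergeRecentCalls(
  fromCalls: CallSummary[],
  toCalls: CallSummary[],
  limit = 3
): CallSummary[] {
  const byStart = new Map<string, CallSummary>();
  for (const call of [...fromCalls, ...toCalls]) {
    byStart.set(call.callStartTime, call); // a later entry with the same key overwrites the earlier one
  }
  return Array.from(byStart.values())
    .sort((a, b) => (a.callStartTime < b.callStartTime ? 1 : a.callStartTime > b.callStartTime ? -1 : 0))
    .slice(0, limit);
}

// Example: one call shows up in both query results.
const mergedCalls = mergeRecentCalls(
  [{ callStartTime: '2026-02-10T10:00:00Z' }, { callStartTime: '2026-02-08T10:00:00Z' }],
  [
    { callStartTime: '2026-02-10T10:00:00Z' },
    { callStartTime: '2026-02-09T10:00:00Z' },
    { callStartTime: '2026-02-07T10:00:00Z' },
  ]
);
```

Keying on callStartTime alone treats two different calls that start in the same instant as one. If the table's partition key (telephonySessionId) were added to the projection, it would be a safer dedup key.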
Step 4: Run the tests and confirm PASS
cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor && npx vitest run src/__tests__/customer-history.test.ts
Expected: PASS
Step 5: Commit
cd /Users/maxwsy/workspace/callytics-infrastructure
git add lambda/ai-analysis-processor/src/infrastructure/customer-history.ts lambda/ai-analysis-processor/src/__tests__/customer-history.test.ts
git commit -m "feat: add customer history query and formatting for prompt injection"
3.3 Task 3C: Inject CUSTOMER_HISTORY into the prompt
Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/infrastructure/config-repository.ts:508-542
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/handler.ts:289-716
Step 1: Add CUSTOMER_HISTORY injection to injectBusinessInfo in config-repository.ts
In the injectBusinessInfo method (config-repository.ts:508-542), add a third injection after the pricing injection:
// 3. Inject customer history (new)
if (customerHistory) {
enhancedPrompt = enhancedPrompt.replace(
'{CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE}',
customerHistory
);
} else {
enhancedPrompt = enhancedPrompt.replace(
'{CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE}',
'No previous call history available for this phone number.'
);
}
Update the injectBusinessInfo method signature to add a customerHistory?: string parameter.
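One detail worth considering for this injection: when String.prototype.replace receives a string as its second argument, sequences such as $& and $$ in that string are expanded. Call summaries are free text and could contain them. The sketch below shows a replacer-function variant that inserts the text verbatim; injectCustomerHistory is a hypothetical standalone helper, not the existing method.

```typescript
const PLACEHOLDER = '{CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE}';
const DEFAULT_HISTORY = 'No previous call history available for this phone number.';

// Replace the placeholder with the history text, or with the default when none is given.
function injectCustomerHistory(promptText: string, customerHistory?: string): string {
  const text = customerHistory || DEFAULT_HISTORY;
  // A replacer function returns the text as-is; a plain string would expand "$&", "$$", etc.
  return promptText.replace(PLACEHOLDER, () => text);
}

const promptTemplate = `History:\n${PLACEHOLDER}\nEnd`;
const riskySummary = 'Quoted $$ and $& in summary';

// Verbatim insertion via the replacer function.
const safeResult = injectCustomerHistory(promptTemplate, riskySummary);
// Same input with a string replacement, for comparison: "$$" collapses and "$&" expands.
const naiveResult = promptTemplate.replace(PLACEHOLDER, riskySummary);
```

Like the original snippet, this replaces only the first occurrence of the placeholder, which matches a template that contains it once.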
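Step 2: Call buildCustomerHistory from processSingleRecord in handler.ts
In the handler.ts processing pipeline, after loadConfig and before buildAnalysisPrompt (around lines 380-400), add the following. The import line belongs at the top of the file with the other imports: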
// After getting callRecord and config, before building prompt:
import { buildCustomerHistory } from './infrastructure/customer-history';
const LEAD_TRACKING_TABLE = process.env['LEAD_TRACKING_TABLE'] || '';
// Extract customer phone from call record
const customerPhone = callRecord.callDirection === 'Inbound'
? callRecord.fromPhoneNumber
: callRecord.toPhoneNumber;
// Build customer history (parallel query to LeadTracking + call-analysis)
let customerHistory = '';
if (customerPhone && LEAD_TRACKING_TABLE) {
try {
customerHistory = await buildCustomerHistory(
deps.persistenceRepo.docClient, // reuse existing DynamoDB client
ANALYSIS_TABLE,
LEAD_TRACKING_TABLE,
customerPhone
);
} catch (error) {
logger.warn('Failed to build customer history, proceeding without it', { error });
customerHistory = 'Customer history lookup failed — analyze without historical context.';
}
}
Then pass customerHistory through the prompt injection chain.
Step 3: Run the existing tests to confirm there are no regressions
Expected: PASS (existing tests should not break since CUSTOMER_HISTORY was previously unhandled)
Step 4: Commit
cd /Users/maxwsy/workspace/callytics-infrastructure
git add lambda/ai-analysis-processor/src/infrastructure/config-repository.ts lambda/ai-analysis-processor/src/handler.ts
git commit -m "feat: inject CUSTOMER_HISTORY into AI analysis prompt"
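4. Task 4: Save leadId to call-analysis (callytics-infrastructure)
Context: The call-analysis table currently has no leadId field, so every display relies on runtime matching in studio-api. Once AI analysis completes, the Lambda already knows the customer phone and can query LeadTracking for the matching leadId and persist it.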
Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/infrastructure/persistence-repository.ts
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lambda/ai-analysis-processor/src/handler.ts
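Step 1: Support the leadId field in saveAnalysis
In the saveAnalysis method of persistence-repository.ts (around lines 86-164), the metadata parameter is already an extensible object, so leadId only needs to be passed in at the call site:
// In handler.ts, after the customerHistory lookup:
// Look up the lead for this phone. Note: this repeats the LeadTracking query that buildCustomerHistory already ran in Task 3C.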
const leadRecord = customerPhone && LEAD_TRACKING_TABLE
? await queryLeadByPhone(deps.persistenceRepo.docClient, LEAD_TRACKING_TABLE, customerPhone)
: null;
// Extract leadId from the lead record (lead.id format is "email#timestamp")
const leadId = leadRecord ? (leadRecord as any).id : undefined;
// Pass leadId into saveAnalysis metadata
await deps.persistenceRepo.saveAnalysis({
callId: telephonySessionId,
analysis: analysisResult.analysis,
metadata: {
...saveMetadata,
s3AnalysisPath,
...costTrackingData,
// NEW: persist lead association
...(leadId && { leadId }),
...(customerPhone && { customerPhone }),
},
aiMapping: config.ai_mapping,
});
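Step 2: Return the id field from queryLeadByPhone
In queryLeadByPhone in customer-history.ts, add id to the ProjectionExpression so that it reads 'id, firstName, lastName, receivedAt, leadType, outcome, bookedDate'.
Step 3: Run the tests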
Expected: PASS
Step 4: Commit
cd /Users/maxwsy/workspace/callytics-infrastructure
git add lambda/ai-analysis-processor/src/handler.ts lambda/ai-analysis-processor/src/infrastructure/persistence-repository.ts lambda/ai-analysis-processor/src/infrastructure/customer-history.ts
git commit -m "feat: persist leadId and customerPhone in call-analysis records"
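5. Task 5: Write the AI outcome back to the Lead automatically (callytics-infrastructure)
Context: The AI analysis emits a category and a primary_outcome (for example intro_booking with success), but the Lead's outcome stays null until someone updates it manually in the Dashboard. This task uses DynamoDB Streams plus a new Lambda to map the AI outcome back to LeadTracking automatically.
5.1 Task 5A: Enable DynamoDB Streams (CDK)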
Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/storage-stack.ts:150-160
Step 1: Enable Streams on the call-analysis table
Add the stream property to the callAnalysisTable definition in storage-stack.ts:
this.callAnalysisTable = new dynamodb.Table(this, 'CallAnalysisTable', {
tableName: `${envConfig.resourcePrefix}-call-analysis-${envConfig.region}`,
partitionKey: { name: 'telephonySessionId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode[envConfig.config.dynamodbBilling],
pointInTimeRecoverySpecification: {
pointInTimeRecoveryEnabled: envConfig.config.enableBackups,
},
removalPolicy: cdk.RemovalPolicy.RETAIN,
stream: dynamodb.StreamViewType.NEW_IMAGE, // NEW: enable streams
});
Step 2: Verify the CDK diff
cd /Users/maxwsy/workspace/callytics-infrastructure && npx cdk diff --context env=test 2>&1 | head -40
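Expected: the diff shows a StreamSpecification change.
WARNING: Enabling Streams on an existing table is a non-destructive change (the table is not replaced), but verify it in the test environment first.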
Step 3: Commit
cd /Users/maxwsy/workspace/callytics-infrastructure
git add lib/stacks/storage-stack.ts
git commit -m "feat: enable DynamoDB Streams on call-analysis table for outcome writeback"
5.2 Task 5B: Create the Outcome Inference Lambda
Files:
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/src/handler.ts
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/src/outcome-mapper.ts
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/src/__tests__/outcome-mapper.test.ts
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/package.json
- Create: /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference/tsconfig.json
Step 1: Write the outcome mapping tests
// outcome-mapper.test.ts
import { describe, it, expect } from 'vitest';
import { mapAIOutcomeToLeadOutcome, shouldUpdateLeadOutcome } from '../outcome-mapper';
describe('mapAIOutcomeToLeadOutcome', () => {
it('should map intro_booking + success to INTRO_BOOKED', () => {
expect(mapAIOutcomeToLeadOutcome('intro_booking', 'success')).toBe('INTRO_BOOKED');
});
it('should map membership_purchase + success to INTRO_BOOKED', () => {
expect(mapAIOutcomeToLeadOutcome('membership_purchase_related', 'success')).toBe('INTRO_BOOKED');
});
it('should return null for non-mappable outcomes', () => {
expect(mapAIOutcomeToLeadOutcome('service_inquiry', 'resolved')).toBeNull();
});
it('should return null for intro_booking + no_resolution', () => {
expect(mapAIOutcomeToLeadOutcome('intro_booking', 'no_resolution')).toBeNull();
});
});
describe('shouldUpdateLeadOutcome', () => {
it('should update when current outcome is null', () => {
expect(shouldUpdateLeadOutcome(null, 'INTRO_BOOKED')).toBe(true);
});
it('should NOT downgrade from INTRO_BOOKED to anything', () => {
expect(shouldUpdateLeadOutcome('INTRO_BOOKED', 'INTRO_BOOKED')).toBe(false);
});
it('should NOT update when lead was manually set to CLOSED', () => {
expect(shouldUpdateLeadOutcome('CLOSED', 'INTRO_BOOKED')).toBe(false);
});
it('should NOT update when lead was manually set to INVALID_LEAD', () => {
expect(shouldUpdateLeadOutcome('INVALID_LEAD', 'INTRO_BOOKED')).toBe(false);
});
});
Step 2: Run the tests and confirm FAIL
Step 3: Implement outcome-mapper.ts
// outcome-mapper.ts
export type LeadOutcomeValue =
| 'INTRO_BOOKED'
| 'SELF_BOOKED'
| 'INVALID_LEAD'
| 'NO_CALL_LIST'
| 'CLOSED';
// Outcomes that were set manually and should NOT be overwritten by automation
const MANUAL_FINAL_OUTCOMES: LeadOutcomeValue[] = ['CLOSED', 'INVALID_LEAD', 'NO_CALL_LIST'];
/**
* Map AI analysis primary_subcategory + primary_outcome to LeadOutcomeValue.
* Returns null if no mapping applies (the call doesn't indicate a lead outcome change).
*/
export function mapAIOutcomeToLeadOutcome(
primarySubcategory: string | undefined,
primaryOutcome: string | undefined
): LeadOutcomeValue | null {
if (!primarySubcategory || !primaryOutcome) return null;
// Only successful intro bookings map to INTRO_BOOKED
if (
(primarySubcategory === 'intro_booking' || primarySubcategory === 'membership_purchase_related') &&
primaryOutcome === 'success'
) {
return 'INTRO_BOOKED';
}
return null;
}
/**
* Determine if the lead outcome should be updated.
* Rules:
* - Don't update if current outcome is a manual final state (CLOSED, INVALID_LEAD, NO_CALL_LIST)
* - Don't update if current outcome already matches proposed
* - Only update if current outcome is null (not yet set)
*/
export function shouldUpdateLeadOutcome(
currentOutcome: string | null | undefined,
proposedOutcome: LeadOutcomeValue
): boolean {
// Never overwrite manual final states
if (currentOutcome && MANUAL_FINAL_OUTCOMES.includes(currentOutcome as LeadOutcomeValue)) {
return false;
}
// Don't set same value
if (currentOutcome === proposedOutcome) {
return false;
}
// Only update if null (not yet set by anyone)
if (currentOutcome === null || currentOutcome === undefined) {
return true;
}
return false;
}
Step 4: Run the tests and confirm PASS
Step 5: Implement the DynamoDB Streams handler
// handler.ts
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, GetCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb';
import { Logger } from '@aws-lambda-powertools/logger';
import { mapAIOutcomeToLeadOutcome, shouldUpdateLeadOutcome } from './outcome-mapper';
const logger = new Logger({ serviceName: 'lead-outcome-inference' });
const LEAD_TRACKING_TABLE = process.env['LEAD_TRACKING_TABLE'] || '';
const LEAD_TRACKING_REGION = process.env['LEAD_TRACKING_REGION'] || 'us-east-1';
const docClient = DynamoDBDocumentClient.from(
new DynamoDBClient({ region: LEAD_TRACKING_REGION })
);
interface DynamoDBStreamEvent {
Records: Array<{
eventName: string;
dynamodb: {
NewImage?: Record<string, any>;
OldImage?: Record<string, any>;
};
}>;
}
export async function handler(event: DynamoDBStreamEvent): Promise<void> {
for (const record of event.Records) {
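// Only process INSERT and MODIFY events (note: the event source filter in Task 5C passes INSERT only, so MODIFY never arrives unless that filter is widened)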
if (record.eventName !== 'INSERT' && record.eventName !== 'MODIFY') continue;
const newImage = record.dynamodb.NewImage;
if (!newImage) continue;
// Extract relevant fields (DynamoDB Stream format uses AttributeValue)
const leadId = newImage.leadId?.S;
const primarySubcategory = newImage.primary_subcategory?.S;
const primaryOutcome = newImage.primary_outcome?.S;
const followUpNeeded = newImage.follow_up_needed?.S;
const telephonySessionId = newImage.telephonySessionId?.S;
if (!leadId) {
// No lead association — skip (call not matched to a lead)
continue;
}
// Map AI outcome to lead outcome
const proposedOutcome = mapAIOutcomeToLeadOutcome(primarySubcategory, primaryOutcome);
if (!proposedOutcome && followUpNeeded !== 'true') {
// No actionable outcome from this call
continue;
}
// Get current lead state
const leadResult = await docClient.send(
new GetCommand({
TableName: LEAD_TRACKING_TABLE,
Key: { id: leadId },
ProjectionExpression: 'id, outcome, outcomeUpdatedAt, outcomeUpdatedBy', // outcomeUpdatedAt is read below for the history entry
})
);
const lead = leadResult.Item;
if (!lead) {
logger.warn('Lead not found', { leadId });
continue;
}
const now = new Date().toISOString();
const updateParts: string[] = [];
const exprNames: Record<string, string> = {};
const exprValues: Record<string, any> = {};
// Update outcome if applicable
if (proposedOutcome && shouldUpdateLeadOutcome(lead.outcome, proposedOutcome)) {
// Save previous outcome to history
if (lead.outcome) {
updateParts.push('#outcomeHistory = list_append(if_not_exists(#outcomeHistory, :emptyList), :historyEntry)');
exprNames['#outcomeHistory'] = 'outcomeHistory';
exprValues[':emptyList'] = [];
exprValues[':historyEntry'] = [{
outcome: lead.outcome,
updatedAt: lead.outcomeUpdatedAt || now,
updatedBy: lead.outcomeUpdatedBy || 'unknown',
}];
}
updateParts.push('#outcome = :outcome');
updateParts.push('#outcomeUpdatedAt = :now');
updateParts.push('#outcomeUpdatedBy = :system');
exprNames['#outcome'] = 'outcome';
exprNames['#outcomeUpdatedAt'] = 'outcomeUpdatedAt';
exprNames['#outcomeUpdatedBy'] = 'outcomeUpdatedBy';
exprValues[':outcome'] = proposedOutcome;
exprValues[':now'] = now;
exprValues[':system'] = 'system:ai-inference';
}
// Always update lastContactTime
updateParts.push('#lastContactTime = :contactTime');
exprNames['#lastContactTime'] = 'lastContactTime';
exprValues[':contactTime'] = now;
// Update followUpNeeded flag
if (followUpNeeded === 'true') {
updateParts.push('#followUpNeeded = :fu');
exprNames['#followUpNeeded'] = 'followUpNeeded';
exprValues[':fu'] = true;
}
// Add call reference
updateParts.push('#lastCallRef = :callRef');
exprNames['#lastCallRef'] = 'lastCallRef';
exprValues[':callRef'] = telephonySessionId;
if (updateParts.length === 0) continue;
await docClient.send(
new UpdateCommand({
TableName: LEAD_TRACKING_TABLE,
Key: { id: leadId },
UpdateExpression: `SET ${updateParts.join(', ')}`,
ExpressionAttributeNames: exprNames,
ExpressionAttributeValues: exprValues,
})
);
logger.info('Updated lead from AI analysis', {
leadId,
proposedOutcome,
followUpNeeded,
telephonySessionId,
});
}
}
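The Task 5C event source sets reportBatchItemFailures: true, but the handler above returns void, so one throwing record fails the whole batch. A hedged sketch of a wrapper that reports the failing record instead follows. It uses the partial batch response shape Lambda expects for stream sources (batchItemFailures with the record's sequence number as itemIdentifier); handleBatch, ProcessRecord, and the reduced event types are illustrative names, not existing code.

```typescript
// Reduced stream event types: only the fields this sketch touches.
interface StreamRecord {
  eventName?: string;
  dynamodb?: { SequenceNumber?: string; NewImage?: Record<string, unknown> };
}
interface StreamEvent {
  Records: StreamRecord[];
}
interface BatchResponse {
  batchItemFailures: Array<{ itemIdentifier: string }>;
}

// Hypothetical per-record worker, e.g. the body of the loop in the handler above.
type ProcessRecord = (record: StreamRecord) => Promise<void>;

async function handleBatch(event: StreamEvent, processRecord: ProcessRecord): Promise<BatchResponse> {
  const batchItemFailures: BatchResponse['batchItemFailures'] = [];
  for (const record of event.Records) {
    try {
      await processRecord(record);
    } catch (error) {
      const sequenceNumber = record.dynamodb?.SequenceNumber;
      // Without a sequence number the failure cannot be reported precisely: fail the whole batch.
      if (!sequenceNumber) throw error;
      batchItemFailures.push({ itemIdentifier: sequenceNumber });
      // Stream records are ordered, so stop here; Lambda retries from the reported record onward.
      break;
    }
  }
  return { batchItemFailures };
}
```

An empty batchItemFailures array tells Lambda the batch succeeded. Records processed before the failure are not retried, which matters here because the update is not idempotent with respect to lastContactTime.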
Step 6: Create package.json
{
"name": "lead-outcome-inference",
"version": "1.0.0",
"private": true,
"type": "module",
"dependencies": {
"@aws-lambda-powertools/logger": "^2.30.2",
"@aws-sdk/client-dynamodb": "^3.966.0",
"@aws-sdk/lib-dynamodb": "^3.966.0"
},
"devDependencies": {
"vitest": "^3.0.0",
"typescript": "^5.6.0"
}
}
Step 7: Run all tests
cd /Users/maxwsy/workspace/callytics-infrastructure/lambda/lead-outcome-inference && npm install && npx vitest run
Expected: PASS
Step 8: Commit
cd /Users/maxwsy/workspace/callytics-infrastructure
git add lambda/lead-outcome-inference/
git commit -m "feat: create lead-outcome-inference Lambda for AI outcome writeback"
5.3 Task 5C: CDK - Register the Outcome Inference Lambda
Files:
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/lambda-stack.ts
- Modify: /Users/maxwsy/workspace/callytics-infrastructure/lib/stacks/iam-stack.ts
Step 1: Add the Lambda definition
In lambda-stack.ts, after the resultProcessor definition, add the new Lambda:
// Lead Outcome Inference Lambda — triggered by call-analysis DynamoDB Streams
this.leadOutcomeInference = new NodejsFunction(this, 'LeadOutcomeInference', {
functionName: `${envConfig.resourcePrefix}-lead-outcome-inference-${envConfig.region}`,
entry: 'lambda/lead-outcome-inference/src/handler.ts',
handler: 'handler',
runtime: lambda.Runtime.NODEJS_20_X,
timeout: cdk.Duration.seconds(30),
memorySize: 256,
description: 'Maps AI call analysis outcomes back to LeadTracking records',
bundling: {
minify: true,
sourceMap: true,
target: 'node20',
externalModules: ['@aws-sdk/*'],
},
environment: {
LEAD_TRACKING_TABLE: `LeadTracking-v2-${envConfig.region}`,
LEAD_TRACKING_REGION: envConfig.region,
},
});
// Trigger from DynamoDB Streams
this.leadOutcomeInference.addEventSource(
new lambdaEventSources.DynamoEventSource(storageStack.callAnalysisTable, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
batchSize: 10,
retryAttempts: 3,
bisectBatchOnError: true,
reportBatchItemFailures: true,
filters: [
lambda.FilterCriteria.filter({
eventName: lambda.FilterRule.isEqual('INSERT'),
}),
],
})
);
Step 2: Add IAM permissions
// LeadTracking table read/write for outcome inference
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'dynamodb:GetItem',
'dynamodb:UpdateItem',
'dynamodb:Query',
],
resources: [
`arn:aws:dynamodb:${envConfig.region}:${this.account}:table/LeadTracking-v2-${envConfig.region}`,
`arn:aws:dynamodb:${envConfig.region}:${this.account}:table/LeadTracking-v2-${envConfig.region}/index/*`,
],
})
Step 3: Verify the CDK diff
cd /Users/maxwsy/workspace/callytics-infrastructure && npx cdk diff --context env=test 2>&1 | head -60
Expected: a new Lambda function, a DynamoDB Streams event source mapping, and an IAM role
Step 4: Commit
cd /Users/maxwsy/workspace/callytics-infrastructure
git add lib/stacks/lambda-stack.ts lib/stacks/iam-stack.ts
git commit -m "feat: register lead-outcome-inference Lambda with DynamoDB Streams trigger"
6. Task 6: studio-api type compatibility (studio-api)
Context: The LeadTracking table will now receive automatically written fields (lastContactTime, followUpNeeded, lastCallRef, outcomeUpdatedBy: "system:ai-inference"). studio-api needs to surface this data correctly.
Files:
- Modify: /Users/maxwsy/workspace/studio-api/apps/api/src/routes/leads/types.ts
- Modify: /Users/maxwsy/workspace/studio-api/apps/api/src/routes/leads/list.ts
Step 1: Extend the LeadCallInfo type
Add the new fields to the LeadCallInfo interface in types.ts:
export interface LeadCallInfo {
// ... existing fields ...
outcome?: LeadOutcomeValue;
outcomeUpdatedAt?: string;
outcomeUpdatedBy?: string; // "system:ai-inference" or user email
outcomeHistory?: OutcomeHistoryEntry[];
// NEW fields from Phase 1:
lastContactTime?: string;
followUpNeeded?: boolean;
lastCallRef?: string; // telephonySessionId of last matched call
leadId?: string; // the lead's primary key
}
Step 2: Include the new fields in the list.ts response
In the logic in list.ts that maps a lead to LeadCallInfo (around line 219), make sure the new fields are passed through:
// Add to lead response mapping
lastContactTime: lead.lastContactTime,
followUpNeeded: lead.followUpNeeded,
lastCallRef: lead.lastCallRef,
Step 3: Run the tests
Expected: PASS
Step 4: Commit
cd /Users/maxwsy/workspace/studio-api
git add apps/api/src/routes/leads/types.ts apps/api/src/routes/leads/list.ts
git commit -m "feat: support auto-inferred outcome fields from AI analysis pipeline"
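7. Task 7: Data migration - backfill existing Lead phones to E.164
Context: Task 1 changes the format for new data, but existing data is still in the 10-digit format and needs to be backfilled.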
Files:
- Modify: /Users/maxwsy/workspace/lead-tracking/scripts/backfill-phones.ts
Step 1: Update the backfill script
The existing scripts/backfill-phones.ts already has similar logic. Update its normalizePhone function to produce E.164, then run an UpdateCommand against every existing record:
// Key logic for backfill:
// 1. Scan all items in LeadTracking-v2 table
// 2. For each item with a phone field:
// - If phone is 10 digits: update to +1XXXXXXXXXX
// - If phone is 11 digits starting with 1: update to +XXXXXXXXXXX
// 3. Issue one UpdateCommand per item; BatchWriteItem only supports put and delete requests, so it would replace whole items
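The classification in step 2 of the outline above can be kept as a pure function, which also gives the dry run its counts without touching the table. This is a sketch under the Task 1 rules, not the contents of the existing script; planBackfill, LeadItem, PhoneUpdate, and BackfillPlan are hypothetical names, and the sample rows are made up.

```typescript
interface LeadItem {
  id: string;
  phone?: string;
}
interface PhoneUpdate {
  id: string;
  from: string;
  to: string;
}
interface BackfillPlan {
  updates: PhoneUpdate[];
  alreadyE164: number;
  skipped: number;
}

// US E.164 as produced by the Task 1 normalizePhone: +1 followed by 10 digits.
const E164_US = /^\+1\d{10}$/;

// Classify scanned items: already migrated, needs update, or skipped for review.
function planBackfill(items: LeadItem[]): BackfillPlan {
  const plan: BackfillPlan = { updates: [], alreadyE164: 0, skipped: 0 };
  for (const item of items) {
    const phone = item.phone ?? '';
    if (E164_US.test(phone)) {
      plan.alreadyE164 += 1; // re-running the backfill leaves these rows alone
      continue;
    }
    const digits = phone.replace(/\D/g, '');
    if (digits.length === 10) {
      plan.updates.push({ id: item.id, from: phone, to: `+1${digits}` });
    } else if (digits.length === 11 && digits.startsWith('1')) {
      plan.updates.push({ id: item.id, from: phone, to: `+${digits}` });
    } else {
      plan.skipped += 1; // empty or non-US: left untouched for manual review
    }
  }
  return plan;
}

const samplePlan = planBackfill([
  { id: 'a', phone: '7328561597' },
  { id: 'b', phone: '17328561597' },
  { id: 'c', phone: '+17328561597' },
  { id: 'd', phone: '123' },
  { id: 'e' },
]);
```

In dry-run mode the script would print samplePlan.updates.length and the two counters; in write mode it would issue one UpdateCommand per entry in updates.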
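Step 2: Dry run (read-only count)
Expected: prints the number of records that need updating
Step 3: Run the backfill (requires confirmation)
WARNING: This is a write operation that modifies production data. Validate in the test environment before running against prod.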
Step 4: Commit
cd /Users/maxwsy/workspace/lead-tracking
git add scripts/backfill-phones.ts
git commit -m "feat: update backfill script for E.164 phone format migration"
8. Dependencies
Task 1 (E.164 phone) ──┐
├── Task 3 (CUSTOMER_HISTORY) ── Task 4 (leadId) ── Task 5 (outcome writeback)
Task 2 (phone dedup) ──┘ │
▼
Task 6 (studio-api types)
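Task 7 (data migration): can run in parallel, but is best run after Task 1
Key ordering:
1. Task 1 and Task 2 can run in parallel (same repo, different functions)
2. Task 3 depends on Task 1 (lookups are only reliable once the phone format is unified)
3. Task 4 depends on Task 3 (it reuses the lead lookup)
4. Task 5 depends on Task 4 (write-back needs leadId)
5. Task 6 depends on Task 5 (it needs the new field names)
6. Task 7 can run at any time after Task 1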
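9. Acceptance Criteria
Once Phase 1 is complete, the following should happen automatically:
- A Lead arrives → normalizePhone stores the phone as +17328561597 (E.164)
- The same phone is submitted a second time → no new Lead is created; the submission is merged into the existing record
- AI analyzes a call → {CUSTOMER_HISTORY_WILL_BE_INJECTED_HERE} in the prompt is replaced with summaries of the 3 most recent calls for that phone plus the lead info
- AI analysis completes → the call-analysis record contains the leadId and customerPhone fields
- AI determines intro_booking + success → the LeadTracking outcome automatically becomes INTRO_BOOKED, with outcomeUpdatedBy = "system:ai-inference"
- studio-api returns a lead → the response includes the new lastContactTime, followUpNeeded, and lastCallRef fields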
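10. Risks and Notes
| Risk | Mitigation |
|---|---|
| Enabling DynamoDB Streams might affect write performance | Stream capture is asynchronous to table writes, and NEW_IMAGE carries only the post-write item; verify in the test environment first |
| Cross-repo IAM permissions on the LeadTracking table | Same AWS account; only an IAM policy in CDK is needed |
| The backfill script modifies production data | Dry-run first, test environment first, and keep a backfill log |
| Automatic outcome write-back overwrites a manually set outcome | shouldUpdateLeadOutcome checks strictly: it never overwrites CLOSED, INVALID_LEAD, or NO_CALL_LIST, and only writes when the outcome is unset |
| CUSTOMER_HISTORY lookups add Lambda latency | 3 DynamoDB queries run in parallel; the plan estimates under 50 ms each, roughly 50-80 ms added in total (an estimate, not yet measured) |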
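On the overwrite risk in the table above: the Task 5B handler reads the lead and then writes it, so a manual update that lands between the two calls would not be seen by shouldUpdateLeadOutcome. One possible hardening is to repeat the "only when unset" rule as a DynamoDB ConditionExpression on the write itself. The sketch below only builds the UpdateCommand input as a plain object; buildGuardedOutcomeUpdate and OutcomeUpdateInput are hypothetical names, and the sketch assumes an unset outcome is either a missing attribute or a NULL-typed attribute.

```typescript
// Plain-object shape of the UpdateCommand input used in this sketch.
interface OutcomeUpdateInput {
  TableName: string;
  Key: { id: string };
  UpdateExpression: string;
  ConditionExpression: string;
  ExpressionAttributeNames: Record<string, string>;
  ExpressionAttributeValues: Record<string, unknown>;
}

// Build an update that only succeeds while the lead's outcome is still unset.
function buildGuardedOutcomeUpdate(
  tableName: string,
  leadId: string,
  outcome: string,
  nowIso: string
): OutcomeUpdateInput {
  return {
    TableName: tableName,
    Key: { id: leadId },
    UpdateExpression: 'SET #outcome = :outcome, #updatedAt = :now, #updatedBy = :system',
    // Write only if the attribute is missing or stored as DynamoDB NULL.
    ConditionExpression: 'attribute_not_exists(#outcome) OR attribute_type(#outcome, :nullType)',
    ExpressionAttributeNames: {
      '#outcome': 'outcome',
      '#updatedAt': 'outcomeUpdatedAt',
      '#updatedBy': 'outcomeUpdatedBy',
    },
    ExpressionAttributeValues: {
      ':outcome': outcome,
      ':now': nowIso,
      ':system': 'system:ai-inference',
      ':nullType': 'NULL',
    },
  };
}

const guardedUpdate = buildGuardedOutcomeUpdate(
  'LeadTracking-v2-us-east-1',
  'jane@example.com#1700000000',
  'INTRO_BOOKED',
  '2026-02-11T10:00:00.000Z'
);
```

If the condition fails, DynamoDB rejects the write with a ConditionalCheckFailedException, which the handler could catch and log as "outcome already set" rather than treat as an error.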