feat: 全功能前后端联调完成,47/47 测试通过

前端:
- 新增 DietCapturePage 独立拍照识别页
- 5种消息卡片类型完整实现(数据确认/用药/饮食/报告/快捷选项)
- 任务卡片区:异常警告+数据摘要+自动折叠
- 侧滑抽屉:历史对话列表+对话管理
- 运动计划:进度卡片+创建计划+每日打卡
- 报告页:拍照/相册/PDF上传+分析
- 面板按钮补全血氧/体重录入
- UI 升级:紫色主题+动画+气泡样式
- 全部迁移 Riverpod 3.x API

后端:
- 新增 _UpdateMessageTypeAndMetadata,Tool Calling 自动映射消息类型
- SSE answer 事件携带 type 字段
- 提示词优化(移除"阿福",语气规则归位)
- 运动计划支持 AI 创建和打卡

测试:
- 新增 full_e2e_test.py 全流程测试(认证/数据CRUD/6个Agent对话/VLM/报告)
This commit is contained in:
MingNian
2026-06-02 20:31:22 +08:00
parent 498708e568
commit c6395ea9b4
12 changed files with 2631 additions and 126 deletions

View File

@@ -21,8 +21,7 @@ public sealed class PromptManager
};
private const string DefaultPrompt = """
AI "阿福"
怀
AI健康管家
1.
@@ -34,6 +33,7 @@ public sealed class PromptManager
-
-
- /
- 怀
""";
private const string ConsultationPrompt = """

View File

@@ -107,6 +107,8 @@ public static class AiChatEndpoints
var maxIterations = 5;
var fullResponse = "";
var completedNormally = false;
var messageType = "text";
var metadata = new Dictionary<string, object>();
for (int i = 0; i < maxIterations; i++)
{
@@ -129,7 +131,7 @@ public static class AiChatEndpoints
if (!string.IsNullOrEmpty(content))
{
fullResponse += content;
await SseWriteAsync(http, new { action = "answer", data = content }, ct);
await SseWriteAsync(http, new { action = "answer", data = content, type = messageType }, ct);
}
}
catch (JsonException) { /* 跳过解析失败的 chunk */ }
@@ -160,6 +162,8 @@ public static class AiChatEndpoints
}
await SseWriteAsync(http, new { action = "tool_result", tool = tc.Function.Name, data = toolResult }, ct);
_UpdateMessageTypeAndMetadata(tc.Function.Name, toolResult, ref messageType, ref metadata);
messages.Add(new ChatMessage { Role = "tool", Content = JsonSerializer.Serialize(toolResult, JsonOpts), ToolCallId = tc.Id });
}
}
@@ -597,6 +601,40 @@ public static class AiChatEndpoints
}
};
/// <summary>根据工具调用结果更新消息类型和元数据</summary>
private static void _UpdateMessageTypeAndMetadata(string toolName, object toolResult, ref string messageType, ref Dictionary<string, object> metadata)
{
switch (toolName)
{
case "record_health_data":
messageType = "data_confirm";
if (toolResult is IDictionary<string, object> resultDict)
{
if (resultDict.TryGetValue("type", out var type))
metadata["type"] = type.ToString();
if (resultDict.TryGetValue("success", out var success) && success is bool b && b)
metadata["success"] = true;
}
break;
case "manage_medication":
messageType = "medication_confirm";
if (toolResult is IDictionary<string, object> medDict)
{
if (medDict.TryGetValue("name", out var name))
metadata["name"] = name.ToString();
if (medDict.TryGetValue("dosage", out var dosage))
metadata["dosage"] = dosage.ToString();
}
break;
case "estimate_food_text":
messageType = "diet_analysis";
break;
case "analyze_report":
messageType = "report_analysis";
break;
}
}
/// <summary>压缩图片到合理大小供 VLM API 使用</summary>
private static void CompressImage(string inputPath, string outputPath, int maxWidth, long quality)
{

View File

@@ -0,0 +1,283 @@
"""
健康管家 - 全流程端到端测试
模拟真实用户操作注册→登录→各Agent对话→数据录入→查询验证
"""
import urllib.request, urllib.parse, json, sys, time, os
BASE = "http://localhost:5000"
PASSED = 0
FAILED = 0
TOKEN = None
def api(method, path, data=None, token=None, files=None):
"""调用后端 API"""
url = f"{BASE}{path}"
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
body = None
if data:
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
resp = urllib.request.urlopen(req, timeout=30)
return json.loads(resp.read().decode("utf-8"))
except Exception as e:
return {"error": str(e), "code": -1}
def check(name, condition, detail=""):
global PASSED, FAILED
if condition:
PASSED += 1
print(f" [PASS] {name}")
else:
FAILED += 1
print(f" [FAIL] {name} {detail}")
def login(phone="13800000001"):
"""发送验证码 + 登录,返回 token"""
sms = api("POST", "/api/auth/send-sms", {"phone": phone})
code = sms.get("data", {}).get("devCode", "")
if not code:
return None
result = api("POST", "/api/auth/login", {"phone": phone, "smsCode": code})
return result.get("data", {}).get("accessToken")
def sse_stream(token, agent_type, message):
"""连接 SSE 端点,返回所有事件"""
url = f"{BASE}/api/ai/{agent_type}/chat?message={urllib.parse.quote(message)}&token={urllib.parse.quote(token or '')}"
req = urllib.request.Request(url)
events = []
try:
resp = urllib.request.urlopen(req, timeout=60)
for line_bytes in resp:
line = line_bytes.decode("utf-8").strip()
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
try:
events.append(json.loads(data))
except:
pass
except Exception as e:
events.append({"error": str(e)})
return events
def section(title):
print(f"\n{'='*60}")
print(f" {title}")
print(f"{'='*60}")
# ============================================================
section("1. 认证流程")
# ============================================================
print(" 1.1 发送验证码...")
sms = api("POST", "/api/auth/send-sms", {"phone": "13800000001"})
check("发送验证码", sms.get("code") == 0, str(sms.get("message","")))
code = sms.get("data", {}).get("devCode", "")
print(" 1.2 验证码登录...")
login_result = api("POST", "/api/auth/login", {"phone": "13800000001", "smsCode": code})
check("登录成功", login_result.get("code") == 0)
TOKEN = login_result.get("data", {}).get("accessToken", "")
REFRESH = login_result.get("data", {}).get("refreshToken", "")
check("返回accessToken", len(TOKEN) > 50)
check("返回refreshToken", len(REFRESH) > 20)
print(" 1.3 刷新Token...")
refresh_result = api("POST", "/api/auth/refresh", {"refreshToken": REFRESH})
check("刷新Token成功", refresh_result.get("code") == 0)
check("下发新Token", len(refresh_result.get("data", {}).get("accessToken", "")) > 50)
print(" 1.4 登出...")
logout_result = api("POST", "/api/auth/logout", {"refreshToken": REFRESH})
check("登出成功", logout_result.get("code") == 0)
# 重新登录获取token
TOKEN = login()
# ============================================================
section("2. 用户与档案")
# ============================================================
profile = api("GET", "/api/user/profile", token=TOKEN)
check("获取个人信息", profile.get("code") == 0)
archive = api("GET", "/api/user/health-archive", token=TOKEN)
check("获取健康档案", archive.get("code") == 0)
check("档案有诊断信息", archive.get("data", {}).get("diagnosis") is not None if archive.get("data") else False,
"诊断=" + str(archive.get("data", {}).get("diagnosis", "")))
# ============================================================
section("3. 健康数据 CRUD")
# ============================================================
print(" 3.1 录入血压...")
bp = api("POST", "/api/health-records", token=TOKEN, data={
"type": "BloodPressure", "systolic": 128, "diastolic": 82,
"unit": "mmHg", "source": "Manual"
})
check("录入血压", bp.get("code") == 0, str(bp.get("message","")))
print(" 3.2 录入心率...")
hr = api("POST", "/api/health-records", token=TOKEN, data={
"type": "HeartRate", "value": 72, "unit": "次/分", "source": "Manual"
})
check("录入心率", hr.get("code") == 0, str(hr.get("message","")))
print(" 3.3 录入血糖...")
glu = api("POST", "/api/health-records", token=TOKEN, data={
"type": "Glucose", "value": 5.5, "unit": "mmol/L", "source": "Manual"
})
check("录入血糖", glu.get("code") == 0, str(glu.get("message","")))
print(" 3.4 录入血氧...")
spo2 = api("POST", "/api/health-records", token=TOKEN, data={
"type": "SpO2", "value": 98, "unit": "%", "source": "Manual"
})
check("录入血氧", spo2.get("code") == 0, str(spo2.get("message","")))
print(" 3.5 获取最新数据...")
latest = api("GET", "/api/health-records/latest", token=TOKEN)
check("获取最新数据", latest.get("code") == 0)
check("血压存在", latest.get("data", {}).get("BloodPressure") is not None)
print(" 3.6 获取趋势数据...")
trend = api("GET", "/api/health-records/trend?type=HeartRate&period=7", token=TOKEN)
check("获取趋势数据", trend.get("code") == 0)
# ============================================================
section("4. 用药管理")
# ============================================================
print(" 4.1 获取用药列表...")
meds = api("GET", "/api/medications", token=TOKEN)
check("获取用药列表", meds.get("code") == 0)
print(" 4.2 添加用药...")
new_med = api("POST", "/api/medications", token=TOKEN, data={
"name": "阿司匹林", "dosage": "100mg", "frequency": "Daily",
"timeOfDay": ["08:00"], "source": "Manual", "startDate": "2026-06-02"
})
check("添加用药", new_med.get("code") == 0, str(new_med.get("message","")))
med_id = new_med.get("data", {}).get("id", "")
print(" 4.3 服药打卡...")
if med_id:
confirm = api("POST", f"/api/medications/{med_id}/confirm", token=TOKEN)
check("服药打卡", confirm.get("code") == 0, str(confirm.get("message","")))
# ============================================================
section("5. 饮食记录")
# ============================================================
diet = api("GET", "/api/diet-records?date=2026-06-02", token=TOKEN)
check("查询饮食记录", diet.get("code") == 0)
# ============================================================
section("6. 运动计划")
# ============================================================
print(" 6.1 获取当前计划...")
plan = api("GET", "/api/exercise-plans/current", token=TOKEN)
check("获取当前计划", plan.get("code") == 0)
print(" 6.2 创建运动计划...")
new_plan = api("POST", "/api/exercise-plans", token=TOKEN, data={
"weekStartDate": "2026-06-02",
"items": [
{"dayOfWeek": 1, "exerciseType": "散步", "durationMinutes": 30, "isRestDay": False},
{"dayOfWeek": 3, "exerciseType": "太极", "durationMinutes": 40, "isRestDay": False},
{"dayOfWeek": 5, "exerciseType": "散步", "durationMinutes": 30, "isRestDay": False},
]
})
check("创建运动计划", new_plan.get("code") == 0, str(new_plan.get("message","")))
# ============================================================
section("7. 医生与问诊")
# ============================================================
print(" 7.1 医生列表...")
docs = api("GET", "/api/doctors", token=TOKEN)
check("获取医生列表", docs.get("code") == 0)
check("有医生数据", len(docs.get("data", [])) > 0, f"{len(docs.get('data',[]))}位医生")
print(" 7.2 问诊配额...")
quota = api("GET", "/api/user/consultation-quota", token=TOKEN)
check("获取问诊配额", quota.get("code") == 0)
# ============================================================
section("8. AI 智能体对话")
# ============================================================
agents_to_test = [
("default", "你好,介绍一下你自己"),
("health", "我血压128/82"),
("medication", "我现在在吃什么药"),
("consultation", "最近胸口有点不舒服"),
("diet", "我中午吃了红烧肉和米饭"),
("exercise", "帮我查询运动计划"),
]
for agent_name, msg in agents_to_test:
print(f" 8.{agents_to_test.index((agent_name,msg))+1} {agent_name} Agent: \"{msg[:30]}...\"")
TOKEN = login() # fresh token
events = sse_stream(TOKEN, agent_name, msg)
has_answer = any(e.get("action") == "answer" for e in events)
has_tool = any(e.get("action") == "tool_result" for e in events)
has_conv_id = any(e.get("action") == "conversation_id" for e in events)
errors = [e for e in events if e.get("action") == "error"]
check(f"{agent_name}: 对话建立", has_conv_id)
check(f"{agent_name}: 有回复", has_answer or has_tool,
f"(events: {len(events)}, tools: {has_tool}, answer: {has_answer})")
check(f"{agent_name}: 无错误", len(errors) == 0,
f"errors: {[e.get('message','') for e in errors]}" if errors else "")
# ============================================================
section("9. 对话历史")
# ============================================================
convs = api("GET", "/api/ai/conversations", token=TOKEN)
check("获取对话列表", convs.get("code") == 0)
check("有对话记录", len(convs.get("data", [])) > 0, f"{len(convs.get('data',[]))}")
# ============================================================
section("10. VLM 食物识别")
# ============================================================
# 尝试上传测试图片
test_img = "D:/health_project/食堂三菜一饭热量估算.png"
if os.path.exists(test_img):
# Use subprocess for multipart upload
import subprocess
cmd = [
'curl', '-s', '--max-time', '30', '-X', 'POST',
f'{BASE}/api/ai/analyze-food-image',
'-H', f'Authorization: Bearer {TOKEN}',
'-F', f'images=@{test_img}'
]
r = subprocess.run(cmd, capture_output=True, text=True)
try:
vlm = json.loads(r.stdout)
check("VLM食物识别调通", vlm.get("code") == 0, str(vlm.get("message","")))
has_data = bool(vlm.get("data", ""))
check("VLM返回数据", has_data, f"data长度: {len(str(vlm.get('data','')))}")
except:
check("VLM食物识别", False, "JSON解析失败")
else:
check("VLM测试图片存在", False, f"{test_img} 不存在")
# ============================================================
section("11. 报告列表")
# ============================================================
reports = api("GET", "/api/reports", token=TOKEN)
check("获取报告列表", reports.get("code") == 0)
# ============================================================
section("12. 通知偏好")
# ============================================================
notifs = api("GET", "/api/notifications/preferences", token=TOKEN)
check("获取通知偏好", notifs.get("code") == 0)
# ============================================================
print(f"\n{'='*60}")
print(f" 测试结果: PASS={PASSED} FAIL={FAILED} TOTAL={PASSED+FAILED}")
print(f"{'='*60}")
if FAILED > 0:
sys.exit(1)