Files
AI-Health/backend/tests/full_e2e_test.py
MingNian c6395ea9b4 feat: 全功能前后端联调完成,47/47 测试通过
前端:
- 新增 DietCapturePage 独立拍照识别页
- 5种消息卡片类型完整实现(数据确认/用药/饮食/报告/快捷选项)
- 任务卡片区:异常警告+数据摘要+自动折叠
- 侧滑抽屉:历史对话列表+对话管理
- 运动计划:进度卡片+创建计划+每日打卡
- 报告页:拍照/相册/PDF上传+分析
- 面板按钮补全血氧/体重录入
- UI 升级:紫色主题+动画+气泡样式
- 全部迁移 Riverpod 3.x API

后端:
- 新增 _UpdateMessageTypeAndMetadata,Tool Calling 自动映射消息类型
- SSE answer 事件携带 type 字段
- 提示词优化(移除"阿福",语气规则归位)
- 运动计划支持 AI 创建和打卡

测试:
- 新增 full_e2e_test.py 全流程测试(认证/数据CRUD/6个Agent对话/VLM/报告)
2026-06-02 20:31:22 +08:00

284 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
健康管家 - 全流程端到端测试
模拟真实用户操作注册→登录→各Agent对话→数据录入→查询验证
"""
import urllib.request, urllib.parse, json, sys, time, os
BASE = "http://localhost:5000"
PASSED = 0
FAILED = 0
TOKEN = None
def api(method, path, data=None, token=None, files=None):
"""调用后端 API"""
url = f"{BASE}{path}"
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
body = None
if data:
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
resp = urllib.request.urlopen(req, timeout=30)
return json.loads(resp.read().decode("utf-8"))
except Exception as e:
return {"error": str(e), "code": -1}
def check(name, condition, detail=""):
global PASSED, FAILED
if condition:
PASSED += 1
print(f" [PASS] {name}")
else:
FAILED += 1
print(f" [FAIL] {name} {detail}")
def login(phone="13800000001"):
"""发送验证码 + 登录,返回 token"""
sms = api("POST", "/api/auth/send-sms", {"phone": phone})
code = sms.get("data", {}).get("devCode", "")
if not code:
return None
result = api("POST", "/api/auth/login", {"phone": phone, "smsCode": code})
return result.get("data", {}).get("accessToken")
def sse_stream(token, agent_type, message):
"""连接 SSE 端点,返回所有事件"""
url = f"{BASE}/api/ai/{agent_type}/chat?message={urllib.parse.quote(message)}&token={urllib.parse.quote(token or '')}"
req = urllib.request.Request(url)
events = []
try:
resp = urllib.request.urlopen(req, timeout=60)
for line_bytes in resp:
line = line_bytes.decode("utf-8").strip()
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
try:
events.append(json.loads(data))
except:
pass
except Exception as e:
events.append({"error": str(e)})
return events
def section(title):
print(f"\n{'='*60}")
print(f" {title}")
print(f"{'='*60}")
# ============================================================
section("1. 认证流程")
# ============================================================
print(" 1.1 发送验证码...")
sms = api("POST", "/api/auth/send-sms", {"phone": "13800000001"})
check("发送验证码", sms.get("code") == 0, str(sms.get("message","")))
code = sms.get("data", {}).get("devCode", "")
print(" 1.2 验证码登录...")
login_result = api("POST", "/api/auth/login", {"phone": "13800000001", "smsCode": code})
check("登录成功", login_result.get("code") == 0)
TOKEN = login_result.get("data", {}).get("accessToken", "")
REFRESH = login_result.get("data", {}).get("refreshToken", "")
check("返回accessToken", len(TOKEN) > 50)
check("返回refreshToken", len(REFRESH) > 20)
print(" 1.3 刷新Token...")
refresh_result = api("POST", "/api/auth/refresh", {"refreshToken": REFRESH})
check("刷新Token成功", refresh_result.get("code") == 0)
check("下发新Token", len(refresh_result.get("data", {}).get("accessToken", "")) > 50)
print(" 1.4 登出...")
logout_result = api("POST", "/api/auth/logout", {"refreshToken": REFRESH})
check("登出成功", logout_result.get("code") == 0)
# 重新登录获取token
TOKEN = login()
# ============================================================
section("2. 用户与档案")
# ============================================================
profile = api("GET", "/api/user/profile", token=TOKEN)
check("获取个人信息", profile.get("code") == 0)
archive = api("GET", "/api/user/health-archive", token=TOKEN)
check("获取健康档案", archive.get("code") == 0)
check("档案有诊断信息", archive.get("data", {}).get("diagnosis") is not None if archive.get("data") else False,
"诊断=" + str(archive.get("data", {}).get("diagnosis", "")))
# ============================================================
section("3. 健康数据 CRUD")
# ============================================================
print(" 3.1 录入血压...")
bp = api("POST", "/api/health-records", token=TOKEN, data={
"type": "BloodPressure", "systolic": 128, "diastolic": 82,
"unit": "mmHg", "source": "Manual"
})
check("录入血压", bp.get("code") == 0, str(bp.get("message","")))
print(" 3.2 录入心率...")
hr = api("POST", "/api/health-records", token=TOKEN, data={
"type": "HeartRate", "value": 72, "unit": "次/分", "source": "Manual"
})
check("录入心率", hr.get("code") == 0, str(hr.get("message","")))
print(" 3.3 录入血糖...")
glu = api("POST", "/api/health-records", token=TOKEN, data={
"type": "Glucose", "value": 5.5, "unit": "mmol/L", "source": "Manual"
})
check("录入血糖", glu.get("code") == 0, str(glu.get("message","")))
print(" 3.4 录入血氧...")
spo2 = api("POST", "/api/health-records", token=TOKEN, data={
"type": "SpO2", "value": 98, "unit": "%", "source": "Manual"
})
check("录入血氧", spo2.get("code") == 0, str(spo2.get("message","")))
print(" 3.5 获取最新数据...")
latest = api("GET", "/api/health-records/latest", token=TOKEN)
check("获取最新数据", latest.get("code") == 0)
check("血压存在", latest.get("data", {}).get("BloodPressure") is not None)
print(" 3.6 获取趋势数据...")
trend = api("GET", "/api/health-records/trend?type=HeartRate&period=7", token=TOKEN)
check("获取趋势数据", trend.get("code") == 0)
# ============================================================
section("4. 用药管理")
# ============================================================
print(" 4.1 获取用药列表...")
meds = api("GET", "/api/medications", token=TOKEN)
check("获取用药列表", meds.get("code") == 0)
print(" 4.2 添加用药...")
new_med = api("POST", "/api/medications", token=TOKEN, data={
"name": "阿司匹林", "dosage": "100mg", "frequency": "Daily",
"timeOfDay": ["08:00"], "source": "Manual", "startDate": "2026-06-02"
})
check("添加用药", new_med.get("code") == 0, str(new_med.get("message","")))
med_id = new_med.get("data", {}).get("id", "")
print(" 4.3 服药打卡...")
if med_id:
confirm = api("POST", f"/api/medications/{med_id}/confirm", token=TOKEN)
check("服药打卡", confirm.get("code") == 0, str(confirm.get("message","")))
# ============================================================
section("5. 饮食记录")
# ============================================================
diet = api("GET", "/api/diet-records?date=2026-06-02", token=TOKEN)
check("查询饮食记录", diet.get("code") == 0)
# ============================================================
section("6. 运动计划")
# ============================================================
print(" 6.1 获取当前计划...")
plan = api("GET", "/api/exercise-plans/current", token=TOKEN)
check("获取当前计划", plan.get("code") == 0)
print(" 6.2 创建运动计划...")
new_plan = api("POST", "/api/exercise-plans", token=TOKEN, data={
"weekStartDate": "2026-06-02",
"items": [
{"dayOfWeek": 1, "exerciseType": "散步", "durationMinutes": 30, "isRestDay": False},
{"dayOfWeek": 3, "exerciseType": "太极", "durationMinutes": 40, "isRestDay": False},
{"dayOfWeek": 5, "exerciseType": "散步", "durationMinutes": 30, "isRestDay": False},
]
})
check("创建运动计划", new_plan.get("code") == 0, str(new_plan.get("message","")))
# ============================================================
section("7. 医生与问诊")
# ============================================================
print(" 7.1 医生列表...")
docs = api("GET", "/api/doctors", token=TOKEN)
check("获取医生列表", docs.get("code") == 0)
check("有医生数据", len(docs.get("data", [])) > 0, f"{len(docs.get('data',[]))}位医生")
print(" 7.2 问诊配额...")
quota = api("GET", "/api/user/consultation-quota", token=TOKEN)
check("获取问诊配额", quota.get("code") == 0)
# ============================================================
section("8. AI 智能体对话")
# ============================================================
agents_to_test = [
("default", "你好,介绍一下你自己"),
("health", "我血压128/82"),
("medication", "我现在在吃什么药"),
("consultation", "最近胸口有点不舒服"),
("diet", "我中午吃了红烧肉和米饭"),
("exercise", "帮我查询运动计划"),
]
for agent_name, msg in agents_to_test:
print(f" 8.{agents_to_test.index((agent_name,msg))+1} {agent_name} Agent: \"{msg[:30]}...\"")
TOKEN = login() # fresh token
events = sse_stream(TOKEN, agent_name, msg)
has_answer = any(e.get("action") == "answer" for e in events)
has_tool = any(e.get("action") == "tool_result" for e in events)
has_conv_id = any(e.get("action") == "conversation_id" for e in events)
errors = [e for e in events if e.get("action") == "error"]
check(f"{agent_name}: 对话建立", has_conv_id)
check(f"{agent_name}: 有回复", has_answer or has_tool,
f"(events: {len(events)}, tools: {has_tool}, answer: {has_answer})")
check(f"{agent_name}: 无错误", len(errors) == 0,
f"errors: {[e.get('message','') for e in errors]}" if errors else "")
# ============================================================
section("9. 对话历史")
# ============================================================
convs = api("GET", "/api/ai/conversations", token=TOKEN)
check("获取对话列表", convs.get("code") == 0)
check("有对话记录", len(convs.get("data", [])) > 0, f"{len(convs.get('data',[]))}")
# ============================================================
section("10. VLM 食物识别")
# ============================================================
# 尝试上传测试图片
test_img = "D:/health_project/食堂三菜一饭热量估算.png"
if os.path.exists(test_img):
# Use subprocess for multipart upload
import subprocess
cmd = [
'curl', '-s', '--max-time', '30', '-X', 'POST',
f'{BASE}/api/ai/analyze-food-image',
'-H', f'Authorization: Bearer {TOKEN}',
'-F', f'images=@{test_img}'
]
r = subprocess.run(cmd, capture_output=True, text=True)
try:
vlm = json.loads(r.stdout)
check("VLM食物识别调通", vlm.get("code") == 0, str(vlm.get("message","")))
has_data = bool(vlm.get("data", ""))
check("VLM返回数据", has_data, f"data长度: {len(str(vlm.get('data','')))}")
except:
check("VLM食物识别", False, "JSON解析失败")
else:
check("VLM测试图片存在", False, f"{test_img} 不存在")
# ============================================================
section("11. 报告列表")
# ============================================================
reports = api("GET", "/api/reports", token=TOKEN)
check("获取报告列表", reports.get("code") == 0)
# ============================================================
section("12. 通知偏好")
# ============================================================
notifs = api("GET", "/api/notifications/preferences", token=TOKEN)
check("获取通知偏好", notifs.get("code") == 0)
# ============================================================
print(f"\n{'='*60}")
print(f" 测试结果: PASS={PASSED} FAIL={FAILED} TOTAL={PASSED+FAILED}")
print(f"{'='*60}")
if FAILED > 0:
sys.exit(1)