
1. 项目概述为什么我们需要一个并发测试工具最近在做一个AI问答系统的测试项目客户的核心诉求很明确系统上线前必须验证在高并发用户提问场景下的稳定性和响应质量。简单用Postman或者单线程脚本跑一下根本模拟不出真实压力。市面上成熟的压测工具像JMeter、LoadRunner对这类需要模拟真实浏览器交互、处理动态Token、执行复杂前端逻辑的场景配置起来又异常繁琐脚本维护成本高。于是我们决定自己动手用 Python Playwright 造一个轮子。这个工具的目标不是取代专业的性能测试工具而是填补一个精准的缺口——针对需要复杂UI交互和状态保持的Web应用尤其是AI对话类应用进行高保真、可编程的并发压力测试。它不仅能模拟成百上千个“真实用户”同时打开网页、登录、连续提问还能精准收集每个请求的响应时间、成功率甚至对AI回答的内容进行基础的质量校验。如果你也在为类似的问题头疼——比如想测试你的ChatGPT套壳应用在100人同时使用时会不会卡顿、丢会话或者想验证一个智能客服系统在流量高峰时的表现——那么这套实践方案或许能给你提供一个清晰的解决路径。接下来我会从设计思路、关键技术拆解、代码实现到踩坑实录完整地分享这次开发实战。2. 工具整体设计与核心思路拆解2.1 核心需求与挑战分析在动手之前我们先明确要解决的具体问题和面临的挑战真实并发模拟需要模拟数十甚至上百个独立用户会话每个会话包含完整的登录、保持登录状态、连续多轮问答。这与简单的HTTP接口压测有本质区别因为涉及浏览器上下文Cookies, LocalStorage、WebSocket连接等状态的隔离。复杂交互的自动化AI问答界面通常不是简单的表单提交。它可能涉及动态元素等待如“正在思考”的加载动画消失。消息流的持续监听回答是逐字输出还是一次性返回。文件上传、代码块渲染等富交互组件的操作。数据采集与断言测试不仅要看系统是否“挂掉”还要关注业务指标性能指标单次问答响应时间Time to First Token, Time to Last Token、页面加载时间。业务指标问答是否成功网络请求成功且前端无报错、回答内容是否相关可设置简单关键词匹配或调用NLP服务做基础校验。可维护性与可配置性测试脚本需要易于根据测试场景并发用户数、提问内容、持续时间进行配置并且当被测系统前端发生UI变更时脚本能相对容易地适配。2.2 技术选型为什么是Python Playwright面对上述挑战我们评估了几种方案Selenium Grid经典方案但需要维护一个Selenium Hub和Node集群资源消耗大且对于大规模并发管理和调试比较麻烦。Puppeteer Cluster基于Node.js能力很强但团队Python技术栈更熟悉且希望测试逻辑能与后端的数据分析脚本更好地集成。Playwright for Python最终胜出。理由如下强大的异步支持Playwright Python API 原生支持asyncio这是实现高效并发的基石。我们可以用asyncio.gather轻松管理数百个并发的浏览器上下文Context和页面Page而无需自己管理复杂的线程池。自动等待与可靠性Playwright 的操作如click,fill内置了智能等待会等待元素可操作、网络空闲等大大减少了编写time.sleep的需求让测试脚本更健壮。多浏览器支持与无头模式一套脚本可跑 Chromium, Firefox, WebKit。无头Headless模式在服务器上运行资源占用低且现代无头模式已能完美执行几乎所有渲染和交互。丰富的调试工具playwright codegen可以录制脚本playwright inspector可以实时调试trace viewer可以录制并可视化整个操作过程这对编写和排查复杂交互脚本极其友好。网络拦截与Mock可以轻松拦截和修改网络请求这对于模拟异常场景如网络延迟、API失败或注入测试数据非常有用。因此技术栈确定为Python 3.8 Playwright asyncio pytest可选用于组织测试用例。2.3 架构设计一个轻量级并发测试运行器我们的工具不追求大而全的测试平台核心是一个并发的测试任务运行器。其架构可以抽象为以下几个模块配置管理模块读取YAML或JSON配置文件定义全局参数如基础URL、总并发用户数、每个用户的提问次数、提问间隔、提问语料库、登录账号列表等。用户会话池核心模块。负责创建和管理一批独立的“虚拟用户”。每个用户是一个独立的asyncio.Task绑定一个Playwright的BrowserContext。Context天然隔离了Cookies、本地存储完美模拟独立用户。业务流程封装将登录、提问、登出等操作封装成可重用的异步函数。每个虚拟用户任务就是循环执行这些业务流程。监控与数据收集器在每个关键操作步骤如页面加载、请求发送、响应接收打点记录耗时和结果状态。数据可以实时输出到控制台也可以写入CSV、数据库或时序数据库如InfluxDB供后续分析。结果汇总与报告生成器所有虚拟用户任务结束后汇总成功率、平均响应时间、百分位数P95, P99等指标生成HTML或Markdown格式的测试报告。这个架构的优点是清晰、轻量、易于扩展。你可以很容易地增加新的业务流程如“先搜索再提问”或者更换数据收集的后端。3. 核心细节解析与实操要点3.1 环境搭建与Playwright初始化工欲善其事必先利其器。第一步是搭建一个稳定可复现的测试环境。# 1. 创建项目并初始化虚拟环境强烈推荐 python -m venv venv source venv/bin/activate # Linux/Mac # venv\Scripts\activate # Windows # 2. 安装核心依赖 pip install playwright pip install pytest pytest-asyncio # 如果使用pytest组织用例 pip install pyyaml # 用于读取YAML配置 pip install pandas # 用于数据处理和报告生成 # 3. 安装Playwright所需的浏览器内核 playwright install chromium # 通常安装Chromium就够了更轻量关键点与避坑指南虚拟环境是必须的避免全局Python包污染尤其是Playwright会安装浏览器放在虚拟环境里管理更干净。选择Chromium对于自动化测试Chromium足够稳定且性能最好。Firefox和WebKit可能在渲染细节上有差异除非你有跨浏览器测试需求否则用Chromium即可。网络问题playwright install可能会因为网络问题失败。可以尝试设置环境变量使用国内镜像或者手动下载浏览器包。最稳的办法是提前在有网的环境下载好再拷贝到测试服务器。无头模式生产环境运行务必使用无头模式headlessTrue可以节省大量资源。调试时可以先设为False观察浏览器行为。3.2 实现高并发asyncio与Playwright Context的配合这是整个工具最核心的部分。目标是创建N个并发的虚拟用户每个用户互不干扰。import asyncio from playwright.async_api import async_playwright async def single_user_task(user_id, config): 单个虚拟用户的测试任务 :param user_id: 用户标识可用于区分数据和登录账号 :param config: 全局配置字典 async with async_playwright() as p: # 为每个用户启动一个独立的浏览器实例更彻底隔离但资源消耗大 # browser await p.chromium.launch(headlessTrue) # 更推荐共用一个Browser实例但为每个用户创建独立的Context资源利用率高且足够隔离 browser await p.chromium.launch(headlessTrue) context await browser.new_context() page await context.new_page() try: # 1. 登录 await login(page, user_id, config) # 2. 执行多轮问答 for i in range(config[questions_per_user]): question get_question(i, user_id, config) # 从语料库获取问题 start_time asyncio.get_event_loop().time() answer await ask_question(page, question) end_time asyncio.get_event_loop().time() record_metrics(user_id, i, question, answer, end_time-start_time) await asyncio.sleep(config[think_time]) # 模拟用户思考间隔 # 3. 登出可选 await logout(page) except Exception as e: print(fUser {user_id} 任务失败: {e}) record_failure(user_id, e) finally: await context.close() await browser.close() async def main(config): 主函数并发运行所有用户任务 tasks [] for i in range(config[concurrent_users]): task asyncio.create_task(single_user_task(i, config)) tasks.append(task) # 等待所有并发任务完成 await asyncio.gather(*tasks, return_exceptionsTrue) # return_exceptions防止一个任务失败导致整个程序崩溃 print(所有用户任务执行完毕) generate_report() if __name__ __main__: config load_config(config.yaml) asyncio.run(main(config))核心要点解析Browser vs Context vs PageBrowser对应一个浏览器进程。创建成本高。Context浏览器上下文。相当于一个独立的“隐身会话”拥有独立的Cookie、本地存储、缓存。这是我们实现用户隔离的核心。所有用户共享同一个Browser进程但各自拥有独立的Context这在资源消耗和隔离性上取得了最佳平衡。PageContext中的标签页。一个Context可以有多个Page。asyncio.create_task与asyncio.gather这是实现并发的关键。create_task将协程函数包装成任务放入事件循环中调度。gather则并发地运行所有这些任务并等待它们全部完成。return_exceptionsTrue参数至关重要它确保即使某个虚拟用户任务因为网络波动或页面元素未找到而抛出异常也不会影响其他用户任务的执行主程序会收集所有异常最后统一处理。这保证了测试的健壮性。资源限制并发数不是越高越好。每个Context都会占用内存和CPU。你需要根据测试机器的配置内存、CPU核心数来调整concurrent_users。一个经验值是在8GB内存的机器上稳定运行50-100个Chromium Context是可行的但需要监控内存使用情况防止OOM内存溢出。3.3 编写健壮的页面交互函数模拟用户操作是UI自动化的基本功但对于动态加载的AI问答页面需要格外小心。async def ask_question(page, question): 在页面上完成一次提问并获取回答 # 1. 定位输入框并输入问题 - 使用更稳定的定位方式 # 避免使用易变的文本或索引优先使用data-testid等测试属性或稳定的CSS选择器 input_selector textarea[placeholder*输入] # 使用属性包含匹配 # 或者input_selector [data-testidquestion-input] await page.wait_for_selector(input_selector, statevisible, timeout10000) await page.fill(input_selector, question) # 2. 点击发送按钮 # 同样优先寻找有明确标识的按钮 send_button_selector button:has-text(发送) # 文本匹配 # 或者send_button_selector button[typesubmit] await page.click(send_button_selector) # 3. 等待AI回答开始出现这是关键 # 策略一等待“正在思考”之类的加载元素出现再消失 thinking_selector .thinking-indicator, [aria-label*思考] try: await page.wait_for_selector(thinking_selector, statevisible, timeout5000) await page.wait_for_selector(thinking_selector, statehidden, timeout30000) # 等待其消失 except Exception as e: print(f未检测到明确的‘思考中’状态继续后续等待。) # 可能系统没有加载提示直接进入下一步 # 策略二等待回答区域出现新内容更通用 answer_container_selector .chat-message-bot:last-of-type .content # 等待新回答的容器出现 await page.wait_for_selector(answer_container_selector, timeout30000) # 4. 获取完整的回答文本 # 对于流式输出可能需要等待文本稳定 last_answer_text for _ in range(10): # 最多轮询10次每次间隔1秒 await asyncio.sleep(1) current_text await page.text_content(answer_container_selector) if current_text and current_text ! last_answer_text: last_answer_text current_text else: # 文本连续两次没有变化认为回答完毕 break # 5. 可选简单的内容断言 if len(last_answer_text.strip()) 5: raise AssertionError(f回答内容过短或为空: {last_answer_text}) # 可以加入关键词检查例如检查是否包含“错误”、“抱歉”等异常词汇 if error in last_answer_text.lower() or sorry in last_answer_text.lower(): print(f警告回答中可能包含错误信息: {last_answer_text[:100]}...) return last_answer_text避坑经验选择器策略绝对不要依赖元素的绝对CSS路径如div div:nth-child(3) span前端一个微小的改动就会导致脚本失效。优先与开发团队约定使用>async def ask_question_with_monitor(page, question): # 监听特定的API请求 response_promise page.wait_for_response(lambda response: /api/chat in response.url) # ... 执行提问操作 ... response await response_promise if response.status ! 200: raise Exception(f问答API请求失败: {response.status}) # 还可以解析response.json()来获取更精确的服务器响应时间4. 实操过程与核心环节实现4.1 配置化驱动测试将测试参数外置到配置文件使工具无需修改代码就能适应不同测试场景。config.yaml示例base_url: https://your-ai-app.com concurrent_users: 20 ramp_up_time: 10 # 并发用户启动的斜坡时间秒避免瞬间打满 questions_per_user: 10 think_time_range: [1, 5] # 用户每次提问后的随机等待时间范围秒 timeout_per_action: 30000 # 每个操作如等待回答的超时时间毫秒 login: enabled: true url: /login username_selector: #username password_selector: #password submit_selector: button[typesubmit] # 用户凭证池可以从外部文件读取 credentials: - {username: user1test.com, password: pass123} - {username: user2test.com, password: pass123} question_pool: # 问题列表可以是一个文件路径或直接写在配置里 file: ./data/questions.txt # 或者 # questions: # - 请解释一下量子计算的基本原理。 # - 用Python写一个快速排序函数。 metrics: output_format: csv # csv, json, influxdb csv_path: ./results/test_run_{{timestamp}}.csv在代码中使用pyyaml加载配置并使用string.Template或jinja2来处理配置中的动态变量如{{timestamp}}。4.2 实现数据收集与监控没有数据性能测试就失去了意义。我们需要在关键路径上埋点。import csv import time from datetime import datetime class MetricsCollector: def __init__(self, output_path): self.output_path output_path self._init_csv() def _init_csv(self): with open(self.output_path, w, newline, encodingutf-8) as f: writer csv.writer(f) writer.writerow([timestamp, user_id, question_id, question, answer_length, response_time_ms, status, error]) def record_success(self, user_id, q_id, question, answer, response_time): 记录一次成功的问答 with open(self.output_path, a, newline, encodingutf-8) as f: writer csv.writer(f) writer.writerow([ datetime.now().isoformat(), user_id, q_id, question[:100], # 截断防止过长 len(answer), round(response_time * 1000, 2), # 秒转毫秒 SUCCESS, ]) def record_failure(self, user_id, q_id, question, error_msg): 记录一次失败的问答 with open(self.output_path, a, newline, encodingutf-8) as f: writer csv.writer(f) writer.writerow([ datetime.now().isoformat(), user_id, q_id, question[:100], 0, 0, FAILURE, str(error_msg)[:200] # 截断错误信息 ]) # 在 single_user_task 中使用 collector MetricsCollector(config[metrics][csv_path]) # ... 在ask_question成功后 ... collector.record_success(user_id, i, question, answer, end_time-start_time)进阶监控对于更复杂的场景可以集成influxdb-client库将实时指标写入InfluxDB然后用Grafana制作实时监控看板观察测试过程中系统的TPS、响应时间曲线、错误率等。4.3 测试执行与资源管理直接运行main函数可能会因为并发数过高导致机器资源耗尽。我们需要引入一些控制机制。import asyncio import signal class ConcurrentTestRunner: def __init__(self, config): self.config config self.tasks [] self.stop_signal False async def _ramp_up_users(self): 斜坡式启动用户避免对系统造成瞬间冲击 users_per_batch max(1, self.config[concurrent_users] // 10) delay self.config[ramp_up_time] / (self.config[concurrent_users] / users_per_batch) for i in range(0, self.config[concurrent_users], users_per_batch): if self.stop_signal: break batch [] for j in range(users_per_batch): if i j self.config[concurrent_users]: user_id i j task asyncio.create_task(single_user_task(user_id, self.config)) batch.append(task) self.tasks.extend(batch) print(f已启动批次 {i//users_per_batch 1}, 当前活跃任务数: {len(self.tasks)}) await asyncio.sleep(delay) # 批次间等待 async def run(self): 运行测试 print(f开始测试计划并发用户数: {self.config[concurrent_users]}) await self._ramp_up_users() print(所有用户任务已启动等待执行完毕...) # 等待所有任务完成或收到停止信号 done, pending await asyncio.wait(self.tasks, return_whenasyncio.FIRST_EXCEPTION, timeoutself.config.get(test_duration, None)) # 处理结果... def stop(self): 优雅停止 self.stop_signal True for task in self.tasks: task.cancel() print(收到停止信号正在取消任务...) # 处理CtrlC信号优雅退出 def signal_handler(runner): def handler(signum, frame): print(\n检测到中断信号开始优雅停止...) runner.stop() return handler async def main(): config load_config(config.yaml) runner ConcurrentTestRunner(config) # 注册信号处理器 signal.signal(signal.SIGINT, signal_handler(runner)) try: await runner.run() except asyncio.CancelledError: print(测试被取消) finally: await generate_report(config) # 即使被中断也尝试生成部分报告 if __name__ __main__: asyncio.run(main())这个ConcurrentTestRunner类提供了两个重要特性斜坡启动分批启动虚拟用户模拟真实世界中用户逐渐涌入的场景比瞬间并发更能暴露一些资源竞争问题。优雅停止捕获CtrlC信号取消所有正在运行的任务并尝试生成一份包含已收集数据的测试报告避免测试数据丢失。5. 常见问题与排查技巧实录在实际开发和使用过程中我们踩过不少坑。这里总结几个最具代表性的问题及其解决方案。5.1 问题一并发数上去后出现大量超时或浏览器崩溃现象当设置concurrent_users100时运行几分钟后大量任务报TimeoutError或TargetClosedError浏览器页面崩溃。根因分析内存耗尽每个Playwright Context都会消耗内存几十到上百MB。100个Context可能吃掉数GB内存导致系统开始交换Swap性能急剧下降最终浏览器进程被OOM Killer终止。CPU过载浏览器渲染和JavaScript执行是CPU密集型任务。并发数超过CPU核心数太多会导致严重的上下文切换开销所有任务都变慢触发超时。端口耗尽每个网络请求都可能打开一个临时端口。超高并发下可能耗尽本地可用端口范围。解决方案限制并发数根据测试机器硬件配置设定一个合理的上限。一个粗略的公式最大并发数 ≈ (可用内存GB * 1024) / 每个Context预估内存MB。例如8GB内存预留2GB给系统每个Context预估80MB则(6*1024)/80 ≈ 76。建议从较低并发数如20开始逐步增加同时监控系统资源htop,nmon。优化Context配置创建Context时可以禁用不必要的功能来节省资源context await browser.new_context( viewport{width: 1280, height: 720}, # 固定视口大小 java_script_enabledTrue, # 必须为True ignore_https_errorsTrue, # 忽略HTTPS证书错误测试环境 # 禁用图片、视频、字体加载大幅提升速度并减少内存 bypass_cspTrue, # 绕过内容安全策略谨慎使用 ) # 更激进的资源拦截 await context.route(**/*.{png,jpg,jpeg,svg,gif,woff,woff2}, lambda route: route.abort())使用更轻量的浏览器Playwright的chromium.launch可以传递args参数来禁用更多功能如--disable-gpu,--disable-dev-shm-usage在Docker中常用--no-sandbox。分布式执行如果单机资源无法满足可以考虑将用户任务分发到多台机器上执行需要一个中心节点来协调任务分配和汇总结果。5.2 问题二元素定位失败脚本不稳定现象脚本有时能成功有时失败报错Error: locator.click: Timeout 30000ms exceeded。根因分析页面加载时间波动网络延迟或后端响应慢导致页面元素出现时间超过脚本设置的默认等待时间。动态内容导致选择器失效前端框架如React, Vue在数据更新时可能会改变DOM结构或属性使得之前稳定的选择器失效。竞争条件在元素还未完全可交互如禁用状态时就尝试点击。解决方案增加智能等待不要滥用page.wait_for_timeout(5000)这种固定等待。使用Playwright内置的等待条件# 等待元素可见并可交互 await page.locator(button.submit).wait_for(statevisible) await page.locator(button.submit).wait_for(stateattached) # 甚至可以直接在操作中等待 await page.locator(button.submit).click(timeout10000) # 给click操作本身设置更长超时使用更稳健的定位器文本定位page.locator(text登录)。但注意文本可能变化或国际化。CSS 文本page.locator(button:has-text(发送))。XPath虽然强大但易碎谨慎使用。page.locator(xpath//button[aria-labelSend message])。最佳实践与前端开发约定为关键测试元素添加>import functools import asyncio async def retry_on_failure(func, max_attempts3, delay1): for attempt in range(max_attempts): try: return await func() except Exception as e: if attempt max_attempts - 1: raise print(f尝试 {func.__name__} 失败 (第{attempt1}次){delay}秒后重试... 错误: {e}) await asyncio.sleep(delay) # 使用 await retry_on_failure(lambda: page.click(button.submit))5.3 问题三如何验证AI回答的内容质量挑战性能测试关注“快不快”但功能测试还要关注“对不对”。如何自动化判断AI的回答是否相关、准确解决方案根据投入成本分级基础校验低成本非空检查回答长度大于阈值。关键词匹配对于特定问题检查回答中是否包含预期的关键词。例如问“Python的创始人是谁”检查回答中是否包含“Guido van Rossum”。否定词检查检查是否包含“抱歉”、“我不知道”、“错误”等词汇这可能表示模型未能回答问题。中级校验中等成本嵌入向量相似度使用句子转换器如sentence-transformers将问题和回答都转换为向量计算余弦相似度。设定一个阈值低于阈值则认为不相关。这比关键词匹配更灵活。调用另一个LLM进行评估使用一个轻量级或免费的LLM API如OpenAI GPT-3.5-turbo、Claude Haiku设计Prompt让其判断“回答是否针对问题”。这种方法更智能但成本较高且速度慢。高级校验高成本离线进行将测试中收集的所有问答对保存下来由人工或更复杂的评估流程进行事后分析。这通常用于版本间的回归测试对比。在我们的工具中我们实现了基础校验和简单的嵌入相似度校验作为可选的断言模块。关键在于这些校验不能严重影响测试执行速度所以复杂的评估应异步进行或放在测试后分析阶段。5.4 问题四测试报告不够直观痛点CSV文件数据虽然详细但不够直观难以快速发现问题。解决方案使用pandasmatplotlib或plotly生成可视化报告。import pandas as pd import matplotlib.pyplot as plt def generate_visual_report(csv_path): df pd.read_csv(csv_path) # 计算总体指标 total_requests len(df) success_rate (df[status] SUCCESS).sum() / total_requests * 100 avg_response_time df[df[status]SUCCESS][response_time_ms].mean() p95_response_time df[df[status]SUCCESS][response_time_ms].quantile(0.95) print(f总请求数: {total_requests}) print(f成功率: {success_rate:.2f}%) print(f平均响应时间: {avg_response_time:.2f} ms) print(fP95响应时间: {p95_response_time:.2f} ms) # 绘制响应时间分布直方图 plt.figure(figsize(10, 6)) success_df df[df[status]SUCCESS] plt.hist(success_df[response_time_ms], bins50, edgecolorblack, alpha0.7) plt.axvline(avg_response_time, colorr, linestyle--, labelf平均 ({avg_response_time:.0f}ms)) plt.axvline(p95_response_time, colorg, linestyle--, labelfP95 ({p95_response_time:.0f}ms)) plt.xlabel(响应时间 (ms)) plt.ylabel(频次) plt.title(AI问答响应时间分布) plt.legend() plt.grid(True, alpha0.3) plt.savefig(./results/response_time_distribution.png, dpi150) plt.close() # 绘制随时间变化的成功率折线图如果数据有时间戳 if timestamp in df.columns: df[timestamp] pd.to_datetime(df[timestamp]) df.set_index(timestamp, inplaceTrue) # 按分钟聚合成功率 success_series df[status].resample(1min).apply(lambda x: (xSUCCESS).sum() / len(x) * 100) plt.figure(figsize(12, 5)) success_series.plot() plt.axhline(95, colorr, linestyle--, labelSLA 95%) plt.ylabel(成功率 (%)) plt.title(每分钟成功率趋势) plt.legend() plt.grid(True, alpha0.3) plt.tight_layout() plt.savefig(./results/success_rate_trend.png, dpi150) plt.close()将生成的图表嵌入到HTML报告中可以一目了然地看到性能趋势和瓶颈点。开发这个工具的过程是一个典型的“用自动化解决自动化测试中的痛点”的案例。它可能没有商业压测工具那么功能全面但贵在高度定制、贴合业务、成本可控。通过Python和Playwright的组合我们能够以相对低的复杂度构建出一个能模拟真实用户行为、提供深入洞察的并发测试利器。最关键的是整个技术栈对测试团队非常友好易于维护和扩展。下次当你需要对一个复杂的Web应用进行压力测试时不妨考虑一下这个思路。