
1. 项目概述为什么是Locust如果你正在寻找一个能模拟成千上万用户、用代码定义用户行为、并且结果报告清晰直观的性能测试工具那么Locust很可能就是你的答案。它不是那种需要你在界面上点点划划的“录制-回放”式工具而是一个完全基于Python代码的开源框架。这意味着你的测试脚本就是普通的Python文件你可以用你熟悉的任何Python库来构造复杂的业务场景比如从数据库读取测试数据、对响应进行复杂的断言、或者集成到你的CI/CD流水线中。对于开发者和测试工程师来说这种“代码即脚本”的方式提供了极大的灵活性和控制力。Locust的核心思想很巧妙它用“蝗虫”来比喻并发用户。每个虚拟用户蝗虫都是一个独立的协程它们按照你定义的“任务”去执行操作比如访问网页、提交表单。这些用户的行为是随机的、并发的更贴近真实世界的场景。你只需要定义好用户会做什么以及他们做这些事的权重Locust就会帮你管理所有并发、收集数据并生成实时的Web UI报告。相比于JMeter这类工具Locust在模拟高并发、分布式压测以及测试脚本的维护性上有着独特的优势。这篇实战攻略就是带你从零开始深入掌握Locust的核心用法、高级技巧以及那些官方文档里不会写的“踩坑”经验。2. 环境准备与核心概念解析2.1 搭建你的Locust测试环境上手Locust的第一步是准备环境。由于Locust是Python包所以你需要一个Python环境。我强烈建议使用Python 3.7或更高版本并且使用虚拟环境来管理依赖避免污染系统环境。# 1. 创建并进入一个项目目录 mkdir locust_demo cd locust_demo # 2. 创建虚拟环境以venv为例 python -m venv venv # 3. 激活虚拟环境 # 在Windows上 venv\Scripts\activate # 在Linux/macOS上 source venv/bin/activate # 4. 安装Locust pip install locust安装完成后可以通过locust -V命令来验证安装是否成功。这里有一个关键点Locust的安装会附带一个轻量级的HTTP客户端但对于生产级压测我们通常会用更强大的locust-plugins库或者直接使用requests库。后续我们会详细讨论客户端的选型。2.2 理解Locust的四大核心组件在动手写脚本之前必须理解Locust的四个核心类它们构成了你测试脚本的骨架HttpUser或User类这是你的虚拟用户蓝图。每个模拟用户都是这个类的一个实例。HttpUser是Locust内置的、最常用的类它自带了一个client属性这是一个HttpSession的实例用于发送HTTP请求。你可以继承HttpUser来定义你的用户。TaskSet类任务集。用于将一系列任务API调用、页面访问组织在一起。你可以把它理解为用户的“行为模式”或“业务场景”。一个HttpUser可以包含一个TaskSet。task装饰器这是定义“任务”的关键。你用这个装饰器来标记一个类方法告诉Locust这是一个虚拟用户可能执行的操作。你可以通过task(权重)来设置不同任务被执行的相对概率。on_start和on_stop方法这两个是生命周期方法。on_start在每个虚拟用户开始执行任务前调用一次常用于登录、获取令牌等初始化操作。on_stop则在用户停止运行后调用用于清理工作。理解这四者的关系至关重要Locust启动后会生成指定数量的HttpUser实例。每个实例在启动时会运行一次on_start然后进入一个循环从其tasks属性定义的任务列表中按照权重随机选取并执行被task装饰的方法。如果任务定义在TaskSet中则用户会进入该任务集执行。3. 编写你的第一个Locust性能测试脚本理论说再多不如动手。我们来创建一个最经典的例子测试一个待办事项TodoAPI服务的性能。假设这个服务有列出待办项和创建待办项两个接口。3.1 基础脚本结构创建一个名为locustfile.py的文件这是Locust默认寻找的入口文件并写入以下内容from locust import HttpUser, task, between class TodoUser(HttpUser): # wait_time 定义了用户在执行每个任务后等待的时间范围秒 # between(1, 5) 表示等待1到5秒之间的一个随机时间模拟用户思考时间 wait_time between(1, 5) # 每个用户实例启动时执行用于初始化如登录 def on_start(self): # 这里假设登录接口并保存token # response self.client.post(/login, json{username:test, password:test}) # self.token response.json().get(token) print(A new user is starting its tasks...) # 任务1获取待办事项列表 # task 装饰器不带参数时权重默认为1 task(3) # 权重为3意味着执行频率是任务2的3倍 def get_todo_list(self): # 使用self.client发起HTTP请求其API与requests库高度相似 with self.client.get(/api/todos, catch_responseTrue) as response: # catch_responseTrue 允许我们手动控制请求的成功/失败判定 if response.status_code 200: response.success() # 可以进一步检查响应内容 # if “error” in response.text: # response.failure(“Found error in response body”) else: response.failure(fFailed with status code: {response.status_code}) # 任务2创建一个新的待办事项 task(1) # 权重为1 def create_todo(self): todo_data { title: fLocust Test Task at {time.time()}, completed: False } headers {Content-Type: application/json} # 如果需要认证可以添加headers: {Authorization: fBearer {self.token}} with self.client.post(/api/todos, jsontodo_data, headersheaders, catch_responseTrue) as response: if response.status_code 201: response.success() # 有时需要从创建响应中提取ID供后续任务使用 # self.created_id response.json().get(“id”) else: response.failure(fCreate failed: {response.status_code}) # 可选用户停止时执行 def on_stop(self): # 例如登出操作 # self.client.post(“/logout”) print(“User is stopping.”)注意catch_responseTrue和with语句的搭配是Locust中处理响应判定的推荐方式。它确保请求耗时会被正确记录并且允许你基于响应内容而不仅仅是状态码来定义请求成功与否。这是做业务正确性校验的关键。3.2 运行并观察测试保存好脚本后打开终端在你的项目目录下运行locust默认情况下Locust会使用当前目录下的locustfile.py并启动一个Web界面。访问http://localhost:8089你会看到Locust的Web UI。Number of users (peak concurrency)你需要模拟的最大并发用户数。Locust会以一定的速率见下一条逐步增加到这个数量。Spawn rate (users started/second)孵化率即每秒启动多少个新用户直到达到“Number of users”设置的最大值。Host被测试系统的根地址例如http://localhost:3000。注意我们在脚本中写的接口路径如/api/todos会拼接到这个Host后面。填写好参数后点击 “Start swarming”测试就开始了。Web UI会实时展示RPS每秒请求数、响应时间、失败率等关键指标。实操心得一关于“孵化率”的理解很多新手会把“并发用户数”和“RPS”混淆。Number of users是同时“活着”的虚拟用户数量。Spawn rate控制这些用户多快达到目标并发数。例如设置用户数为100孵化率为10意味着Locust会用10秒时间100/10每秒启动10个新用户最终稳定在100个并发用户执行任务。最终的RPS取决于这些用户的wait_time和任务执行耗时。如果你想直接冲击一个高RPS可能需要设置较短的wait_time和较高的并发用户数。4. 进阶技巧让测试脚本更贴近真实场景基础的脚本只能跑通流程但要做一个有说服力的性能测试必须让虚拟用户的行为更“像”真人。4.1 使用TaskSet组织复杂场景当你的业务逻辑包含多个步骤时例如浏览商品-加入购物车-下单使用TaskSet可以让代码结构更清晰。from locust import HttpUser, task, TaskSet, between class BrowseBehavior(TaskSet): # 这个TaskSet内的任务权重是独立的 task(2) def view_homepage(self): self.client.get(“/”) task(1) def view_product(self): # 假设有一个产品ID列表 product_id self.user.product_ids.pop() self.client.get(f”/product/{product_id}”, name”/product/[id]”) # 使用‘name’参数将动态URL归类在统计中不会为每个ID生成单独条目 task(1) def stop_browsing(self): # 调用self.interrupt()可以中断当前TaskSet的执行返回到父级 self.interrupt() class ShoppingCartBehavior(TaskSet): def on_start(self): # 假设进入购物车需要先添加一个物品 self.client.post(“/cart/add”, json{“item”: “sample”}) task def view_cart(self): self.client.get(“/cart”) task(1) def checkout(self): self.client.post(“/checkout”) # 结账后这个用户的任务可能就结束了或者跳转到其他行为 self.interrupt() class WebsiteUser(HttpUser): wait_time between(2, 10) # tasks 属性可以是一个列表包含 (TaskSet类, 权重) 的元组 tasks [ (BrowseBehavior, 3), # 权重3浏览行为更频繁 (ShoppingCartBehavior, 1), # 权重1购物车行为 ] def on_start(self): # 初始化一些用户数据比如产品ID列表 self.product_ids [1001, 1002, 1003, 1004, 1005]关键点解析TaskSet可以嵌套。self.user可以访问到顶层的HttpUser实例从而共享数据。self.interrupt()是控制流程的关键用于主动退出当前TaskSet。name参数在统计中非常重要它将相似的请求如不同ID的产品页归类避免报表被大量不同的URL淹没。4.2 参数化与测试数据管理真实的用户不会都用同样的数据。我们需要从外部文件如CSV、JSON中读取数据或者动态生成数据。方法一CSV文件参数化适用于登录用户等创建一个users.csv文件username,password user1,pass1 user2,pass2 user3,pass3在Locust脚本中读取import csv import random from locust import HttpUser, task, between class LoginUser(HttpUser): wait_time between(1, 3) # 类属性在测试开始前加载所有数据 user_data [] with open(‘users.csv’, newline‘’) as f: reader csv.DictReader(f) for row in reader: user_data.append(row) def on_start(self): # 每个用户实例启动时从数据池中随机取一条或顺序取作为其身份 if self.user_data: self.credentials random.choice(self.user_data) # 执行登录保存会话或token response self.client.post(“/login”, jsonself.credentials) if response.ok: self.auth_header {“Authorization”: f”Bearer {response.json()[‘token’]}“} else: # 登录失败可以标记此用户停止运行 self.stop(forceTrue) task def access_protected_resource(self): self.client.get(“/api/profile”, headersself.auth_header)方法二使用Faker库动态生成数据对于不需要持久关联的数据如创建内容的标题、正文使用动态生成更灵活。from locust import HttpUser, task, between from faker import Faker class DataDrivenUser(HttpUser): wait_time between(1, 5) def on_start(self): # 每个用户实例有自己的Faker生成器避免多线程问题 self.fake Faker() task def create_post(self): post_data { “title”: self.fake.sentence(), “content”: self.fake.text(), “author”: self.fake.name() } self.client.post(“/api/posts”, jsonpost_data)实操心得二数据池的线程安全如果你使用全局列表来存储共享的测试数据如未使用的用户凭证并在多用户并发环境下进行pop()操作可能会遇到数据竞争问题。一个简单的解决方案是使用队列queue.Queue它是线程安全的。或者为每个HttpUser类预先分配好独立的数据切片也是常见的做法。4.3 自定义客户端与请求断言Locust默认的HttpSession基于geventhttpclient性能很好。但有时你需要requests库更丰富的功能如自动处理重定向、更完善的Session管理。你可以轻松替换客户端。from locust import User, task, between, events import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry class RequestsUser(User): # 注意这里继承的是基类 User不是 HttpUser wait_time between(1, 3) def __init__(self, environment): super().__init__(environment) # 创建一个带重试机制的requests Session session requests.Session() retry_strategy Retry( total3, backoff_factor1, status_forcelist[429, 500, 502, 503, 504], ) adapter HTTPAdapter(max_retriesretry_strategy) session.mount(“http://”, adapter) session.mount(“https://”, adapter) self.client session # 将client替换为requests session task def custom_request(self): # 现在可以使用requests session的所有功能 url f”{self.host}/api/endpoint” # 但我们需要手动触发Locust的事件来记录请求耗时和结果 start_time time.time() try: response self.client.get(url) # 计算耗时毫秒 request_time int((time.time() - start_time) * 1000) # 触发请求成功事件 events.request.fire( request_type“GET”, name“/api/endpoint”, response_timerequest_time, response_lengthlen(response.content), contextself.environment, exceptionNone, ) # 业务断言 if response.status_code ! 200: # 触发请求失败事件 events.request.fire( request_type“GET”, name“/api/endpoint”, response_timerequest_time, response_length0, contextself.environment, exceptionException(f”Unexpected status: {response.status_code}”), ) except Exception as e: request_time int((time.time() - start_time) * 1000) events.request.fire( request_type“GET”, name“/api/endpoint”, response_timerequest_time, response_length0, contextself.environment, exceptione, )为什么这么做默认客户端为了极致性能牺牲了一些功能。如果你的测试场景需要严格的Cookie管理、复杂的重定向、或者使用requests特有的认证方式如OAuth1自定义客户端是必要的。但请注意这会在一定程度上影响性能因为requests是同步库而Locust的协程在等待IO时能更高效地切换。对于纯粹的高并发HTTP压测建议优先使用默认客户端。5. 分布式执行与资源监控单机Locust能模拟的用户数受限于CPU和网络。要产生更大的压力需要分布式运行。5.1 主从模式部署Locust采用主从Master-Worker架构进行分布式压测。启动Master节点Master负责分发任务、收集汇总数据。locust -f locustfile.py --master --web-host0.0.0.0--web-host0.0.0.0允许其他机器访问Web UI。启动Worker节点Worker负责执行任务生成负载。可以在同一台机器的不同端口或者不同的机器上启动。locust -f locustfile.py --worker --master-hostMASTER_IP将MASTER_IP替换为Master节点的IP地址。启动多个WorkerMaster的Web UI上会显示连接的Worker数量。关键配置与坑点网络与防火墙确保Master节点默认端口8089和Worker节点默认端口5557之间的端口是通的。云服务器需要注意安全组设置。时钟同步所有Master和Worker机器的系统时间必须同步使用NTP否则汇总的统计数据时间戳会混乱。测试数据一致性如果测试脚本中使用了共享的外部数据文件如CSV需要确保所有Worker节点都能访问到相同的数据源或者使用中央数据库。更常见的做法是在on_start中为每个用户独立生成数据。单机多Worker在一台性能强劲的机器上启动多个Worker进程可以更充分地利用多核CPU。但要注意单个机器网络带宽和端口数可能成为瓶颈。5.2 集成资源监控Master/Worker节点状态Locust本身只报告应用层的性能数据响应时间、RPS。我们还需要监控压测机即运行Locust的机器本身的资源使用情况避免压测机成为瓶颈。一个简单有效的方法是使用locust-plugins库的监听器功能将系统指标CPU、内存、网络发送到时序数据库如InfluxDB或直接打印到日志。pip install locust-plugins修改你的locustfile.py添加资源监控from locust import HttpUser, task, between, events from locust_plugins import listeners import psutil # 需要安装 psutil events.init.add_listener def on_locust_init(environment, **kwargs): # 初始化一个定时器每隔5秒收集一次系统指标 if not environment.runner: return def monitor_system_stats(): cpu_percent psutil.cpu_percent(intervalNone) memory_info psutil.virtual_memory() # 这里可以打印或者发送到外部监控系统 print(f”[System Monitor] CPU: {cpu_percent}%, Memory: {memory_info.percent}%”) # 例如使用locust_plugins的influxdb监听器发送 # environment.events.report_to_influxdb(…) # 每5秒执行一次 environment.runner.greenlet.spawn_later(5, monitor_system_stats)注意监控压测机资源至关重要。如果压测期间Locust Worker节点的CPU持续高于80%或者内存使用率不断增长说明单机模拟的用户数可能已达上限你需要增加Worker节点数量或者优化你的Locust脚本例如检查是否有内存泄漏是否使用了过重的第三方库。6. 结果分析与性能瓶颈定位测试完成后Locust的Web UI提供了丰富的图表但如何从中读出问题6.1 核心指标解读响应时间Response Times中位数Median50%的请求比这个时间快。它受异常值影响小能较好反映“典型”体验。95分位/99分位95th/99th percentile这是关键95%的请求响应时间低于此值。它反映了长尾延迟。如果99分位值突然飙升说明系统在某些请求上出现了严重延迟可能是缓存失效、数据库锁、或某个依赖服务变慢。RPSRequests per Second系统每秒处理的请求数。在并发用户数增加时观察RPS曲线的变化理想情况RPS随用户数线性增长资源充足。常见情况RPS增长逐渐变缓最后达到平台期系统达到瓶颈。异常情况RPS在达到某个点后开始下降系统过载性能劣化。失败率Failures任何非2xx/3xx的HTTP状态码或者你在代码中手动调用response.failure()都会计入失败。高失败率通常意味着被测服务返回错误5xx内部错误4xx客户端错误。连接超时、连接被拒绝网络问题或服务崩溃。你的测试断言逻辑过于严格。6.2 通过图表定位瓶颈响应时间与用户数曲线在“Charts”标签页下观察平均响应时间随并发用户数增加的变化。如果曲线出现明显的“拐点”即响应时间开始指数级上升那个拐点对应的用户数可能就是系统的最佳并发容量。RPS与用户数曲线同样观察RPS曲线。如果RPS不再增长甚至下降而响应时间急剧上升说明系统已经过载可能出现了线程池耗尽、数据库连接池满、或内存交换swapping等情况。失败请求详情点击“Failures”标签查看具体的失败请求URL和错误信息。如果是连接超时可能是网络或服务端处理能力不足如果是HTTP 500需要查看服务端日志如果是HTTP 429请求过多说明触发了限流。6.3 导出数据进行深入分析Web UI的数据在测试停止后可能会丢失除非设置了--csv参数。更专业的做法是在启动测试时指定导出CSVlocust -f locustfile.py --csvreport --headless -u 100 -r 10 -t 5m--csvreport会生成report_stats.csv,report_failures.csv等文件。--headless无头模式不启动Web UI。-u 100最大用户数。-r 10孵化率。-t 5m运行时间5分钟。你可以将CSV数据导入到Excel、Grafana或专业的APM工具中进行更复杂的关联分析比如将响应时间的变化与服务器当时的CPU/内存监控图进行时间轴对齐。7. 常见问题排查与实战避坑指南在实际使用中你一定会遇到各种问题。这里记录了几个最典型的“坑”和解决方法。7.1 “Socket” 相关错误问题出现大量ConnectionRefusedError,ConnectionResetError, 或OSError: [Errno 24] Too many open files。原因端口耗尽单个机器作为客户端可用的临时端口通常范围是32768-60999是有限的。当模拟数万并发且请求频繁时可能快速耗尽。系统限制Linux系统对单个进程打开文件数包括Socket有限制。服务端拒绝被测服务达到最大连接数限制拒绝新的连接。解决方案增加本地端口范围(Linux):sudo sysctl -w net.ipv4.ip_local_port_range“1024 65535”增加文件描述符限制(Linux):ulimit -n 65535并在Locust启动脚本中设置。使用连接池确保使用Locust默认客户端或正确配置的requests.Session它们会复用HTTP连接。分布式压测将负载分散到多台Worker机器上。检查服务端配置如Nginx的worker_connections后端应用服务器的最大线程数/进程数。7.2 结果中出现预料之外的“0ms”请求问题在统计报告中发现少量请求的响应时间为0毫秒。原因这通常不是请求真的瞬间完成而是发生了请求未真正发出就失败的情况。最常见的原因是DNS解析失败、或者试图连接一个无法访问的主机。Locust在建立TCP连接之前就遇到了错误因此记录耗时为0。排查查看“Failures”标签确认错误信息。如果是Failed to establish a new connection或Name or service not known检查你的Host地址是否正确以及DNS解析是否正常。在生产环境压测公网服务时可以考虑在Host中使用IP地址而非域名以排除DNS问题。7.3 内存使用量不断增长内存泄漏问题Locust进程的内存占用在长时间运行后持续增加。原因测试脚本中缓存了过多数据例如在全局或用户类中不断向一个列表追加数据且从未清理。Python/第三方库的内存泄漏相对少见但也有可能。解决方案审查脚本检查是否有全局的列表、字典在无限增长。确保测试数据是循环使用或及时清理的。使用对象池对于需要频繁创建的重对象如Faker实例考虑在on_start中初始化而不是在每个任务中创建。分阶段测试不要一次性运行长达数小时的测试。可以分阶段进行每阶段结束后重启Locust进程。使用内存分析工具如memory_profiler来定位脚本中的内存热点。7.4 如何模拟WebSocket或其它协议Locust的核心是事件驱动它不限于HTTP。你可以继承User类使用任何Python库来模拟任何协议。对于WebSocket可以使用websocket-client库。关键是在Locust的协程环境中正确使用。通常需要将同步的websocket-client调用用gevent的猴子补丁或异步包装起来或者寻找基于gevent或asyncio的WebSocket客户端库。通用模式在你的任务方法中记录开始时间执行你的协议操作发消息、收消息记录结束时间然后手动触发events.request事件来向Locust报告这次“请求”的耗时和成功与否。这给了你极大的灵活性。7.5 测试启动后RPS始终很低问题设置了高并发用户数但RPS远远达不到预期。排查步骤检查wait_time这是最常见的“坑”。如果你设置了wait_time between(5, 10)意味着每个用户在完成一个任务后会等待5到10秒。这极大地限制了单个用户发送请求的频率。对于纯粹的压力测试可以设置为constant(0)或一个很小的值。检查任务逻辑任务中是否有耗时的同步操作比如在任务中直接读写大文件、进行复杂的CPU计算这些操作会阻塞整个协程。应该将这些操作移到on_start中预先完成或者使用异步方式。检查网络延迟使用ping或curl简单测试到被测服务的网络延迟。如果延迟本身就高达几百毫秒那么RPS上限会受到限制。检查压测机资源按照5.2节的方法监控压测机的CPU和网络带宽。如果CPU已满或网络打满说明压测机本身已是瓶颈。检查服务端是否开启限流观察失败请求中是否有大量429状态码。性能测试本身就是一个“测-调-测”的循环过程。Locust给了你强大的武器来生成负载和收集数据但如何设计场景、分析结果、定位瓶颈更需要你对被测系统架构和业务逻辑的深入理解。记住性能测试的目标不是用数字“击垮”系统而是发现系统的能力边界和脆弱点为优化和容量规划提供数据支撑。在下一篇中我们将深入探讨如何将Locust集成到CI/CD流水线中以及如何编写可维护、可复用的性能测试代码库。