
AI 推理任务调度基于优先级队列与 GPU 资源感知的调度策略实战一、GPU 资源争抢的困局从排队饥饿到低效利用AI 推理任务与传统的 CPU 任务调度有本质差异。CPU 任务可以细粒度分时复用一个 4 核 CPU 可以同时运行数十个线程。但 GPU 资源不可分时复用——一个推理任务加载模型后独占整张 GPU 显存其他任务必须等待模型卸载后才能使用。当多个推理任务争抢有限的 GPU 资源时会出现两类问题。第一类是排队饥饿低优先级的批量推理任务如文档摘要、向量嵌入占用了大量 GPU 时间高优先级的在线推理请求如实时对话、搜索排序被迫排队等待影响用户体验。第二类是低效利用不同模型对 GPU 资源的需求差异巨大7B 模型需要 14GB 显存70B 模型需要 140GB 显存。如果调度器不考虑模型大小将小模型和大模型分配到同一张 GPU 上小模型任务完成后 GPU 剩余大量显存闲置而大模型任务却因显存不足无法启动。解决这两类问题需要构建一个感知 GPU 资源和任务优先级的智能调度器。二、调度器架构优先级队列、资源感知与动态抢占调度器由三个核心组件构成优先级队列负责按任务紧急程度排序资源感知模块负责追踪 GPU 显存和算力使用情况调度决策引擎负责在优先级和资源约束下做出最优分配。flowchart TD subgraph 任务提交 T1[在线推理请求br/优先级: P0] T2[实时排序请求br/优先级: P1] T3[批量嵌入任务br/优先级: P2] T4[离线训练任务br/优先级: P3] end subgraph 优先级队列 PQ0[P0 队列br/在线推理] PQ1[P1 队列br/实时排序] PQ2[P2 队列br/批量嵌入] PQ3[P3 队列br/离线训练] end T1 -- PQ0 T2 -- PQ1 T3 -- PQ2 T4 -- PQ3 subgraph 调度决策引擎 SCHED[调度器] RES_TABLE[GPU 资源表br/显存/算力/模型状态] PREEMPT[抢占决策模块] end PQ0 -- SCHED PQ1 -- SCHED PQ2 -- SCHED PQ3 -- SCHED RES_TABLE -- SCHED SCHED -- PREEMPT subgraph GPU 集群 GPU1[GPU 1br/A100 80GBbr/运行: 7B模型] GPU2[GPU 2br/A100 80GBbr/运行: 7B模型] GPU3[GPU 3br/A100 80GBbr/空闲] GPU4[GPU 4br/A100 80GBbr/运行: 70B模型br/4卡并行] end SCHED --|分配任务| GPU1 SCHED --|分配任务| GPU2 SCHED --|分配任务| GPU3 PREEMPT --|抢占低优先级| GPU1 style PQ0 fill:#e74c3c,color:#fff style PQ3 fill:#95a5a6,color:#fff style SCHED fill:#3498db,color:#fff style PREEMPT fill:#e67e22,color:#fff优先级队列设计任务按业务紧急程度分为 4 个优先级。P0 为在线推理实时对话、搜索延迟要求 200msP1 为近线推理推荐排序、内容审核延迟要求 1sP2 为批量推理文档处理、向量嵌入延迟要求 5minP3 为离线训练无延迟要求。调度器严格按优先级从高到低分配资源只有高优先级队列为空时才调度低优先级任务。GPU 资源感知资源表实时追踪每张 GPU 的显存使用量、当前加载的模型、推理队列深度。关键指标是有效可用显存——总显存减去已加载模型的显存占用再减去 KV Cache 预留空间。调度器根据任务所需模型大小匹配有效可用显存充足的 GPU。动态抢占当 P0 任务到达但所有 GPU 都被低优先级任务占用时调度器触发抢占。抢占策略是选择预计完成时间最短的 P2/P3 任务等待其完成后再调度 P0 任务而非直接杀掉正在推理的任务。这种优雅抢占避免了中间结果丢失但增加了 P0 任务的等待时间。三、生产级调度器实现3.1 优先级队列与任务定义 AI 推理任务调度器 核心设计多级优先级队列 GPU 资源感知 优雅抢占 import asyncio import time from dataclasses import dataclass, field from enum import IntEnum from typing import Dict, List, Optional import logging logger logging.getLogger(__name__) class TaskPriority(IntEnum): 任务优先级数值越小优先级越高 ONLINE_INFERENCE 0 # 在线推理延迟 200ms NEARLINE_INFERENCE 1 # 近线推理延迟 1s BATCH_INFERENCE 2 # 批量推理延迟 5min OFFLINE_TRAINING 3 # 离线训练无延迟要求 dataclass class InferenceTask: 推理任务定义 task_id: str model_name: str # 模型所需显存GB调度器据此匹配 GPU required_vram_gb: float priority: TaskPriority # 预估推理时长秒用于抢占决策 estimated_duration_sec: float # 提交时间同优先级下先到先得 submit_time: float field(default_factorytime.time) # 输入数据 input_data: dict field(default_factorydict) # 是否可被抢占 preemptible: bool True dataclass class GPUResource: GPU 资源状态 gpu_id: str total_vram_gb: float # 当前已用显存 used_vram_gb: float 0.0 # 当前加载的模型名称 current_model: Optional[str] None # 当前运行的任务 running_task: Optional[InferenceTask] None # 任务预计完成时间 task_estimated_end: Optional[float] None property def available_vram_gb(self) - float: 有效可用显存总显存 - 已用显存 - KV Cache 预留 kv_cache_reserve 5.0 # 预留 5GB 给 KV Cache return max(0, self.total_vram_gb - self.used_vram_gb - kv_cache_reserve) property def is_idle(self) - bool: return self.running_task is None3.2 GPU 资源感知调度器class GPUAwareScheduler: GPU 资源感知调度器 调度策略 1. 严格按优先级从高到低调度 2. 优先分配已加载目标模型的 GPU零冷启动 3. 其次分配空闲 GPU需加载模型 4. 最后考虑抢占低优先级任务 def __init__(self, gpus: List[GPUResource]): self.gpus {gpu.gpu_id: gpu for gpu in gpus} # 每个优先级一个队列 self.queues: Dict[TaskPriority, List[InferenceTask]] { p: [] for p in TaskPriority } self._lock asyncio.Lock() async def submit_task(self, task: InferenceTask): 提交任务到优先级队列 async with self._lock: self.queues[task.priority].append(task) logger.info( f任务提交: id{task.task_id}, fpriority{task.priority.name}, fmodel{task.model_name} ) # 触发调度 await self.schedule() async def schedule(self): 执行一轮调度 async with self._lock: for priority in TaskPriority: queue self.queues[priority] while queue: task queue[0] gpu self._find_best_gpu(task) if gpu: queue.pop(0) await self._dispatch_task(task, gpu) else: # 没有可用 GPU尝试抢占 if priority.value TaskPriority.BATCH_INFERENCE.value: preempted await self._try_preempt(task) if preempted: continue # 无法调度等待下次触发 break def _find_best_gpu( self, task: InferenceTask ) - Optional[GPUResource]: 为任务寻找最优 GPU 策略优先级 1. 已加载目标模型的空闲 GPU零冷启动 2. 空闲且显存足够的 GPU需加载模型 3. 显存不足的 GPU 不考虑 # 第一优先已加载目标模型的空闲 GPU for gpu in self.gpus.values(): if (gpu.is_idle and gpu.current_model task.model_name and gpu.available_vram_gb 0): return gpu # 第二优先空闲且显存足够的 GPU # 按可用显存升序排列优先填满小 GPU大 GPU 留给大模型 idle_gpus sorted( [g for g in self.gpus.values() if g.is_idle and g.available_vram_gb task.required_vram_gb], keylambda g: g.available_vram_gb ) return idle_gpus[0] if idle_gpus else None async def _try_preempt( self, task: InferenceTask ) - bool: 尝试抢占低优先级任务的 GPU 策略选择预计完成时间最早的、优先级低于当前任务的可抢占任务 优雅抢占等待低优先级任务完成而非直接杀掉 # 找到所有可抢占的运行中任务 preemptible_tasks [ (gpu_id, gpu) for gpu_id, gpu in self.gpus.items() if (gpu.running_task is not None and gpu.running_task.preemptible and gpu.running_task.priority task.priority) ] if not preemptible_tasks: return False # 按预计完成时间排序选择最快完成的任务 preemptible_tasks.sort( keylambda x: x[1].task_estimated_end or float(inf) ) target_gpu_id, target_gpu preemptible_tasks[0] wait_time max(0, target_gpu.task_estimated_end - time.time()) if wait_time 30: # 等待时间超过 30 秒抢占代价过大放弃 logger.warning( f抢占等待时间过长: {wait_time:.0f}s, f放弃抢占, taskId{task.task_id} ) return False logger.info( f触发优雅抢占: gpu{target_gpu_id}, f等待{wait_time:.0f}s, f被抢占任务{target_gpu.running_task.task_id} ) # 等待低优先级任务完成 await asyncio.sleep(wait_time) target_gpu.running_task None target_gpu.task_estimated_end None return True async def _dispatch_task( self, task: InferenceTask, gpu: GPUResource ): 将任务分配到 GPU 执行 gpu.running_task task gpu.used_vram_gb task.required_vram_gb gpu.current_model task.model_name gpu.task_estimated_end ( time.time() task.estimated_duration_sec ) logger.info( f任务调度: id{task.task_id}, fgpu{gpu.gpu_id}, fmodel{task.model_name} )3.3 调度指标监控 调度器指标采集 核心指标队列深度、调度延迟、GPU 利用率、抢占次数 from prometheus_client import Gauge, Counter, Histogram # 各优先级队列深度 QUEUE_DEPTH Gauge( scheduler_queue_depth, 当前队列深度, [priority] ) # 任务调度延迟从提交到开始执行的等待时间 SCHEDULE_LATENCY Histogram( scheduler_latency_seconds, 任务调度延迟, [priority], buckets[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120] ) # GPU 显存利用率 GPU_VRAM_USAGE Gauge( gpu_vram_usage_ratio, GPU 显存利用率, [gpu_id] ) # 抢占次数 PREEMPTION_COUNT Counter( scheduler_preemption_total, 任务抢占次数, [preempted_priority] ) class SchedulerMetrics: 调度器指标采集器 staticmethod def update_queue_depth( queues: Dict[TaskPriority, List[InferenceTask]] ): for priority, queue in queues.items(): QUEUE_DEPTH.labels( prioritypriority.name ).set(len(queue)) staticmethod def record_schedule_latency( task: InferenceTask, dispatch_time: float ): latency dispatch_time - task.submit_time SCHEDULE_LATENCY.labels( prioritytask.priority.name ).observe(latency) staticmethod def update_gpu_metrics(gpus: Dict[str, GPUResource]): for gpu_id, gpu in gpus.items(): usage_ratio gpu.used_vram_gb / gpu.total_vram_gb GPU_VRAM_USAGE.labels(gpu_idgpu_id).set(usage_ratio) staticmethod def record_preemption(preempted_priority: TaskPriority): PREEMPTION_COUNT.labels( preempted_prioritypreempted_priority.name ).inc()四、智能调度的代价抢占开销、模型切换成本与调度延迟抢占开销优雅抢占需要等待低优先级任务完成P0 任务的调度延迟可能达到数十秒。对于延迟要求极严的场景 200ms抢占策略无法满足要求必须预留专用 GPU 给在线推理服务但这会降低 GPU 利用率。模型切换成本当 GPU 需要从模型 A 切换到模型 B 时需要卸载模型 A 并加载模型 B耗时约 30-60 秒。频繁的模型切换会严重降低吞吐量。解决方案是按模型维度分组调度将同一模型的请求集中到同一 GPU 上减少切换次数。调度延迟调度器本身的决策时间虽然很短毫秒级但从任务提交到 GPU 实际开始推理中间经过队列等待、资源匹配、模型加载等多个环节端到端延迟可能达到分钟级。对于离线批量任务这是可接受的但对在线推理不可接受。五、总结AI 推理任务调度的核心挑战是 GPU 资源的不可分时复用性。优先级队列确保高优先级的在线推理请求优先获得 GPU 资源资源感知调度根据模型大小匹配 GPU 显存优雅抢占在资源紧张时为高优先级任务腾出空间。但抢占的等待时间、模型切换的加载成本、以及调度决策本身的延迟都是调度器必须面对的代价。落地路线建议第一步建立任务优先级分类体系明确在线推理、近线推理、批量推理的优先级和 SLA第二步实现 GPU 资源感知调度器追踪每张 GPU 的显存使用和模型加载状态第三步为在线推理服务预留专用 GPU避免与批量任务争抢资源第四步实现优雅抢占机制在资源紧张时优先保障高优先级任务第五步建立调度延迟和 GPU 利用率的监控看板持续优化调度策略。