Milvus 架构设计:从向量索引到分布式检索,AI 原生存储引擎的内部机制 Milvus 架构设计从向量索引到分布式检索AI 原生存储引擎的内部机制一、十亿向量检索的延迟困境传统数据库为何无法承载相似度查询在大模型应用落地的浪潮中向量检索已成为 AI 系统的基础设施。RAG检索增强生成场景下系统需要从数百万甚至数十亿条文本向量中在毫秒级找到与查询向量最相似的 Top-K 结果。这种高维近似最近邻ANN查询与传统数据库的精确匹配查询在计算模型上存在根本差异。传统数据库的 B 树索引基于精确比较和范围扫描对向量相似度查询完全失效。原因在于维度灾难——在高维空间中所有点之间的距离趋于均匀基于空间划分的索引结构如 R 树的剪枝效率急剧下降。一张包含 1000 万条 768 维向量的表使用暴力扫描计算余弦相似度的延迟在秒级远超在线服务的容忍范围。Milvus 正是为解决这一问题而设计的 AI 原生向量数据库。它的架构从底层开始就围绕向量检索的 I/O 特性和计算模式优化而非在传统数据库上叠加向量索引插件。二、存算分离与段式存储Milvus 的分布式架构内核Milvus 2.x 采用了云原生的存算分离架构核心组件包括 Coordinator、Worker Node 和 Storage Layer 三个层次。理解其架构需要从数据流的角度切入。flowchart TB Client[客户端请求] -- Proxy[Proxybr/请求路由与结果聚合] Proxy -- QCoord[Query Coordinatorbr/查询节点调度] Proxy -- DCoord[Data Coordinatorbr/数据节点调度] Proxy -- ICoord[Index Coordinatorbr/索引节点调度] QCoord -- QNode[Query Nodebr/向量检索执行] DCoord -- DNode[Data Nodebr/数据写入与持久化] ICoord -- INode[Index Nodebr/向量索引构建] DNode -- WAL[Write-Ahead Logbr/消息队列 Kafka/Pulsar] WAL -- QNode DNode -- ObjStore[对象存储 S3/MinIObr/段文件持久化] INode -- ObjStore QNode -- ObjStore ObjStore -- SegData[段文件br/binlog 格式] SegData -- GrowSeg[增长段br/可变在内存中] SegData -- SealSeg[密封段br/不可变已刷盘] QNode -- Search[向量检索br/IVF_FLAT / HNSW / SCANN] QNode -- Retrieve[标量过滤br/Prune ANN] style QNode fill:#e1f5fe style INode fill:#e8f5e9 style DNode fill:#fff3e0段式存储Segment是 Milvus 数据管理的核心单元。每个 Collection 的数据被划分为多个 Segment每个 Segment 包含一组向量数据及其对应的标量字段。Segment 分为两种状态增长段Growing Segment正在接收写入的段数据驻留在 Query Node 的内存中。写入路径为客户端 - Proxy - WAL消息队列- Data Node 持久化到对象存储 - Query Node 订阅 WAL 并在内存中构建增长段。密封段Sealed Segment增长段达到阈值后被密封数据不可变。Index Node 为密封段构建向量索引索引构建完成后替换原始数据Query Node 加载索引文件提供检索服务。这种设计的关键优势在于写入路径和查询路径完全解耦。写入只涉及 WAL 追加和对象存储写入不影响在线检索查询只涉及从对象存储加载索引文件到内存不受写入流量的干扰。向量索引的选型机制是 Milvus 性能的关键。Milvus 支持多种索引类型每种索引在召回率、延迟和内存占用之间做出不同的权衡索引类型召回率查询延迟内存占用构建速度适用场景FLAT100%高暴力扫描最高无需构建小数据集、精确结果IVF_FLAT90%-95%中中快通用场景IVF_PQ85%-92%低低中大规模、内存受限HNSW95%-99%极低高慢低延迟要求SCANN93%-97%低中中Google 推荐方案HNSWHierarchical Navigable Small World是目前工业界最常用的高性能向量索引。其核心思想是构建多层导航图——底层包含所有向量节点上层逐层稀疏采样。查询时从最稀疏的顶层开始逐层向下逼近目标类似于跳表的查找逻辑。HNSW 的查询复杂度为 O(log N)但内存占用约为原始向量的 1.5-2 倍因为需要存储图邻接表。三、生产级向量检索服务Milvus 集群部署与调优实践以下代码展示了一个基于 Milvus Python SDK 的生产级向量检索服务封装包含连接池管理、索引构建、混合检索和性能监控import time import logging import numpy as np from typing import List, Dict, Optional, Tuple from dataclasses import dataclass from contextlib import contextmanager from pymilvus import ( connections, Collection, FieldSchema, CollectionSchema, DataType, utility, AnnSearchRequest, WeightedRanker, ) from pymilvus.orm.types import CONSISTENCY_STRONG logger logging.getLogger(milvus_service) dataclass class VectorSearchResult: 向量检索结果 ids: List[int] # 结果 ID 列表 distances: List[float] # 距离/相似度列表 scores: List[float] # 归一化分数 [0, 1] latency_ms: float # 查询延迟毫秒 recall_hint: Optional[float] # 召回率估算仅 FLAT 对比时可用 class MilvusVectorService: Milvus 向量检索服务封装 支持连接池、索引管理、混合检索向量 标量过滤 # 连接池配置 _connections {} _pool_lock __import__(threading).Lock() def __init__( self, alias: str default, host: str localhost, port: int 19530, collection_name: str embeddings, dim: int 768, ): self.alias alias self.host host self.port port self.collection_name collection_name self.dim dim self._connect() def _connect(self) - None: 建立 Milvus 连接支持连接复用 with self._pool_lock: if self.alias not in self._connections: try: connections.connect( aliasself.alias, hostself.host, portself.port, timeout10, ) self._connections[self.alias] True logger.info(Milvus 连接成功: %s:%d, self.host, self.port) except Exception as e: logger.error(Milvus 连接失败: %s, e) raise contextmanager def _get_collection(self) - Collection: 获取 Collection 对象的上下文管理器 if not utility.has_collection(self.collection_name): self._create_collection() collection Collection(self.collection_name) try: yield collection finally: # 释放 Collection 引用不释放连接 pass def _create_collection(self) - None: 创建 Collection定义向量字段与标量字段 fields [ FieldSchema( nameid, dtypeDataType.INT64, is_primaryTrue, auto_idTrue, ), FieldSchema( nameembedding, dtypeDataType.FLOAT_VECTOR, dimself.dim, # 向量字段不支持默认值 ), FieldSchema( namesource_id, dtypeDataType.INT64, description原始数据源 ID用于标量过滤, ), FieldSchema( namecategory, dtypeDataType.VARCHAR, max_length64, description数据分类标签, ), FieldSchema( namecreated_at, dtypeDataType.INT64, description创建时间戳, ), ] schema CollectionSchema( fieldsfields, description向量嵌入存储集合, enable_dynamic_fieldFalse, # 禁用动态字段减少存储开销 ) collection Collection( nameself.collection_name, schemaschema, # 使用强一致性确保写入后立即可查 consistency_levelCONSISTENCY_STRONG, ) logger.info(Collection 创建成功: %s, self.collection_name) def create_index( self, index_type: str HNSW, metric_type: str COSINE, params: Optional[Dict] None, ) - None: 为向量字段创建索引 HNSW 默认参数M16, efConstruction256 default_params { HNSW: {M: 16, efConstruction: 256}, IVF_FLAT: {nlist: 1024}, IVF_PQ: {nlist: 1024, m: 48, nbits: 8}, } if params is None: params default_params.get(index_type, {}) with self._get_collection() as collection: index_params { index_type: index_type, metric_type: metric_type, params: params, } collection.create_index( field_nameembedding, index_paramsindex_params, ) logger.info( 索引创建完成: type%s, metric%s, params%s, index_type, metric_type, params, ) def search( self, query_vector: np.ndarray, top_k: int 10, filter_expr: Optional[str] None, search_params: Optional[Dict] None, output_fields: Optional[List[str]] None, ) - VectorSearchResult: 执行向量检索 支持标量过滤混合检索和自定义搜索参数 if query_vector.ndim 1: query_vector query_vector.reshape(1, -1) # HNSW 搜索参数ef 越大召回率越高但延迟越大 if search_params is None: search_params {metric_type: COSINE, params: {ef: 128}} default_output [source_id, category, created_at] if output_fields is None: output_fields default_output with self._get_collection() as collection: # 确保 Collection 已加载到内存 if not collection.is_loaded: collection.load() start_time time.monotonic() try: results collection.search( dataquery_vector.tolist(), anns_fieldembedding, paramsearch_params, limittop_k, exprfilter_expr, # 标量过滤表达式 output_fieldsoutput_fields, consistency_levelCONSISTENCY_STRONG, ) except Exception as e: logger.error(向量检索失败: %s, e) raise latency_ms (time.monotonic() - start_time) * 1000 # 解析检索结果 ids [] distances [] scores [] if results and len(results) 0: for hit in results[0]: ids.append(hit.id) distances.append(hit.distance) # 余弦相似度直接作为分数 scores.append(float(hit.distance)) logger.info( 向量检索完成: top_k%d, 返回%d, 延迟%.2fms, top_k, len(ids), latency_ms, ) return VectorSearchResult( idsids, distancesdistances, scoresscores, latency_mslatency_ms, recall_hintNone, ) def hybrid_search( self, query_vectors: List[np.ndarray], weights: List[float], top_k: int 10, filter_expr: Optional[str] None, ) - VectorSearchResult: 多向量混合检索多路召回 加权融合 典型场景文本向量 图像向量联合检索 search_requests [] for i, vec in enumerate(query_vectors): if vec.ndim 1: vec vec.reshape(1, -1) req AnnSearchRequest( datavec.tolist(), anns_fieldembedding, param{metric_type: COSINE, params: {ef: 128}}, limittop_k * 2, # 多召回融合后截断 exprfilter_expr, ) search_requests.append(req) with self._get_collection() as collection: if not collection.is_loaded: collection.load() start_time time.monotonic() try: results collection.hybrid_search( reqssearch_requests, rankerWeightedRanker(*weights), limittop_k, output_fields[source_id, category], ) except Exception as e: logger.error(混合检索失败: %s, e) raise latency_ms (time.monotonic() - start_time) * 1000 ids, distances, scores [], [], [] if results and len(results) 0: for hit in results[0]: ids.append(hit.id) distances.append(hit.distance) scores.append(float(hit.distance)) return VectorSearchResult( idsids, distancesdistances, scoresscores, latency_mslatency_ms, recall_hintNone, ) def insert( self, embeddings: np.ndarray, source_ids: List[int], categories: List[str], timestamps: Optional[List[int]] None, ) - List[int]: 批量插入向量数据 返回插入的 ID 列表 if timestamps is None: timestamps [int(time.time())] * len(source_ids) data [ embeddings.tolist(), source_ids, categories, timestamps, ] with self._get_collection() as collection: try: insert_result collection.insert(data) # 触发 Flush 确保数据持久化生产环境按需调整频率 collection.flush() logger.info( 向量插入完成: count%d, insert_result.insert_count, ) return insert_result.primary_keys except Exception as e: logger.error(向量插入失败: %s, e) raise # 使用示例 def demo(): 演示 Milvus 向量检索服务的完整工作流 service MilvusVectorService( hostlocalhost, port19530, collection_namedemo_embeddings, dim768, ) # 创建 HNSW 索引 service.create_index(index_typeHNSW, metric_typeCOSINE) # 批量插入向量 num_vectors 10000 embeddings np.random.randn(num_vectors, 768).astype(np.float32) # 归一化向量余弦相似度要求 embeddings embeddings / np.linalg.norm(embeddings, axis1, keepdimsTrue) source_ids list(range(num_vectors)) categories [tech] * (num_vectors // 2) [science] * (num_vectors - num_vectors // 2) service.insert(embeddings, source_ids, categories) # 单向量检索 query np.random.randn(768).astype(np.float32) query query / np.linalg.norm(query) result service.search( query_vectorquery, top_k10, filter_exprcategory tech, # 标量过滤 ) print(f检索结果: {len(result.ids)} 条, 延迟: {result.latency_ms:.2f}ms) if __name__ __main__: logging.basicConfig(levellogging.INFO) demo()关键设计决策连接池使用类级别的字典管理避免每次操作重建连接的开销。hybrid_search方法支持多路召回加权融合这是 RAG 系统中常见的文本向量 关键词向量双路检索模式。搜索参数中ef128是 HNSW 的搜索宽度——ef 越大召回率越高但延迟越大128 是 95% 召回率与 10ms 延迟的平衡点基于 1000 万 768 维向量的基准测试。四、Milvus 的性能边界与架构权衡向量数据库的架构选择本质上是在召回率、延迟和资源成本之间做权衡Milvus 也不例外召回率与延迟的不可调和矛盾。HNSW 的 ef 参数控制搜索宽度ef64 时 Top-10 召回率约 90%ef256 时约 98%但延迟增加 3-4 倍。在生产环境中必须根据业务对召回率的实际需求设定 ef 值——推荐系统通常 90%-95% 即可而法律合规检索可能要求 99% 以上。内存消耗的硬约束。HNSW 索引需要全部加载到内存中1 亿条 768 维 float32 向量的原始数据约 286GB加上 HNSW 邻接表约需 500GB 内存。对于超大规模数据集必须使用 IVF_PQ 等压缩索引但压缩会带来 5%-10% 的召回率损失。存算分离架构虽然降低了存储成本但查询时仍需将索引加载到 Query Node 内存内存成本是 Milvus 集群最大的运营支出。标量过滤的执行顺序。Milvus 支持在向量检索时附加标量过滤条件但过滤的执行时机影响性能。Pre-filter 模式先过滤再检索适用于过滤后数据量大幅减少的场景Post-filter 模式先检索再过滤适用于过滤条件宽松的场景。Milvus 2.x 默认使用 Post-filter当过滤条件过于严格时如过滤掉 99% 的数据可能返回不足 Top-K 的结果。需要根据业务场景调整过滤策略。一致性级别的选择。Milvus 支持强一致、有界一致、会话一致和最终一致四个级别。强一致保证写入后立即可查但延迟增加 10-20ms需要等待 WAL 同步。对于 RAG 场景通常使用会话一致即可——同一客户端写入后可以立即查到自己的数据但不保证其他客户端立即可见。五、总结Milvus 的架构设计体现了 AI 原生存储的核心理念存算分离、段式管理、索引可插拔。HNSW 索引在低延迟场景下的表现优于 IVF 系列但内存消耗是其硬约束IVF_PQ 适用于大规模内存受限场景但需要接受召回率损失。生产部署的关键决策在于根据数据规模和召回率需求选择索引类型根据查询 QPS 规划 Query Node 数量根据标量过滤的严格程度选择过滤策略。向量数据库不是传统数据库的替代品而是 AI 系统中专为高维近似检索优化的存储引擎——理解其 I/O 模型和计算特性才能做出正确的架构选型。