检索增强生成RAG项目tools_03:mysql➕redis➕milvus

张开发
2026/4/13 15:29:04 15 分钟阅读

分享文章

检索增强生成RAG项目tools_03:mysql➕redis➕milvus
前面我们介绍了Docker部署➕ollama➕logging➕bm25等RAG项目中各个必不可少的tools,本篇主要讲的是mysql➕redis➕milvus!!!我总结一下,整个RAG就是点多面广,全面开火/(ㄒoㄒ)/~~前言在RAG系统的数据架构中单一数据库无法满足所有需求。今天我们聚焦数据存储的三驾马车工具类型在RAG中的角色MySQL关系型数据库存储业务数据、文档元数据、用户信息Redis键值缓存缓存检索结果、会话管理、去重Milvus向量数据库向量存储与相似性检索这三者不是竞争关系而是互补协作的关系。一、MySQL结构化数据存储1.1 为什么RAG需要MySQL在RAG系统中向量数据库负责语义检索但业务数据仍然需要关系型数据库# RAG系统中的数据分工 数据分工 { MySQL: [ 文档元数据标题、作者、时间, 用户信息账号、权限, 问答日志记录用户查询历史, 业务配置知识库配置、模型参数 ], Milvus: [ 文档向量语义表示, 查询向量实时检索 各种参数的变换囊括了相似度算法的各种应用场景 ], Redis: [ 热点查询缓存, 会话状态, 频率限制计数器 ] }1.2 基础操作代码import pymysql import pandas as pd def getConnection(): connection pymysql.connect( host127.0.0.1, userroot, password123456, databasetest ) return connection重难点分析pymysql是纯Python实现不需要额外安装MySQL驱动每次操作后要commit()否则数据不会真正写入必须close()释放连接否则会导致连接池耗尽1.3 创建数据表def createTable(): connection getConnection() cursor connection.cursor() # ➕游标:给我们的mysql创建一只执行的手 create_table_query CREATE TABLE IF NOT EXISTS jpkb ( id INT AUTO_INCREMENT PRIMARY KEY, subject_name VARCHAR(20), question VARCHAR(1000), answer VARCHAR(1000)) cursor.execute(create_table_query) connection.commit() connection.close()字段设计分析字段类型长度说明idINTAUTO_INCREMENT自增主键subject_nameVARCHAR20学科名称questionVARCHAR1000问题检索的关键answerVARCHAR1000答案生成的依据1.4 批量插入数据def insertData(): data pd.read_csv(../data/JP学科知识问答.csv) print(data.head()) connection getConnection() cursor connection.cursor() for _, row in data.iterrows(): insert_query INSERT INTO jpkb (subject_name, question, answer) VALUES (%s, %s, %s) cursor.execute(insert_query, (row[学科名称], row[问题], row[答案])) connection.commit() connection.close()思考问为什么用pd.read_csv读CSV而不是逐行INSERT答pandas可以批量处理数据还提供了数据清洗功能。但注意iterrows()逐行插入效率不高数据量大时应该用executemany()批量插入。优化版本def insertDataBatch(): data pd.read_csv(../data/JP学科知识问答.csv) connection getConnection() cursor connection.cursor() # 准备批量数据 batch_data [(row[学科名称], row[问题], row[答案]) for _, row in data.iterrows()] # 批量插入效率提升10倍 insert_query INSERT INTO jpkb (subject_name, question, answer) VALUES (%s, %s, %s) cursor.executemany(insert_query, batch_data) connection.commit() connection.close()1.5 查询数据def getData(): connection getConnection() cursor connection.cursor() cursor.execute(SELECT subject_name, question, answer FROM jpkb limit 10) results cursor.fetchall() for result in results: print(result) connection.close()重难点分析fetchall()一次性获取所有结果数据量大时可能内存溢出替代方案fetchmany(size)分批获取二、Redis高速缓存2.1 Redis在RAG中的价值Redis是内存数据库读写速度极快微秒级适合以下场景Redis使用场景 { 缓存热点查询: 相同问题直接返回缓存结果避免重复检索和LLM调用, 去重机制: 防止短时间内重复处理相同请求, 会话管理: 存储多轮对话的上下文, 频率限制: 防止API被滥用, 任务队列: 异步处理耗时任务 }2.2 基础操作import redis import json def getConnection(): client redis.Redis( host127.0.0.1, port6379, password1234, charsetutf-8 ) return client def insertData(key, value): client getConnection() client.set(key, value) def getData(key): client getConnection() data client.get(key) if data is not None: print(data.decode(utf-8)) else: print(data) def delData(key): client getConnection() client.delete(key)2.3 存储JSON数据if __name__ __main__: key 123 # 存储JSON对象 insertData(key, json.dumps({姓名: 张程, 年龄: 27}, ensure_asciiFalse)) getData(key) # {姓名: 张程, 年龄: 27} delData(key)重难点分析Redis只能存储字符串对象必须序列化为JSONensure_asciiFalse保证中文正常显示读取时decode(utf-8)将bytes转为字符串再用json.loads()解析2.4 RAG中的缓存策略class RAGCache: def __init__(self): self.redis getConnection() self.ttl 3600 # 缓存1小时 def get_cached_answer(self, question): 获取缓存的答案 key fqa:{hash(question)} # 使用问题哈希作为key cached self.redis.get(key) if cached: return json.loads(cached.decode(utf-8)) return None def cache_answer(self, question, answer): 缓存答案 key fqa:{hash(question)} self.redis.setex(key, self.ttl, json.dumps(answer, ensure_asciiFalse)) def rate_limit(self, user_id, max_requests10, window60): 频率限制每分钟最多10次请求 key frate:{user_id} current self.redis.get(key) if current is None: self.redis.setex(key, window, 1) return True elif int(current) max_requests: self.redis.incr(key) return True else: return False # 超过限制思考问Redis的key应该怎么设计答遵循业务:标识符:属性的模式。例如qa:user:123:history、cache:question:hash。这种设计便于管理和调试。qa:user:123:history │ │ │ │ │ │ │ └── 具体属性这是什么数据 │ │ └────── 唯一标识哪个用户 │ └─────────── 实体类型用户相关 └────────────── 业务模块问答系统qa:user:123:history就是在问答系统qa中用户123user:123的浏览历史记录history里面存储的是用户看过的问题ID列表。cache:question:hash │ │ │ │ │ └── 具体属性什么类型的缓存 │ └────────── 实体类型问题相关 └─────────────── 业务模块缓存层cache:question:hash是用问题的哈希值作为 key缓存预计算结果如 AI 答案、embedding 向量避免相同问题重复计算提升响应速度。场景1qa:user:123:history真实场景你在刷知乎要看你最近看过的10个问题# 用户123 看了问题 1001 LPUSH qa:user:123:history 1001 # 又看了 1002、1003 LPUSH qa:user:123:history 1002 1003 # 现在历史记录[1003, 1002, 1001] # 获取最近3条 LRANGE qa:user:123:history 0 2 # 返回[1003, 1002, 1001]一句话存用户看过的问题ID列表用来展示最近浏览。场景2cache:question:hash真实场景用户问什么是RedisAI回答要2秒相同问题别再问AIimport hashlib text 什么是Redis hash_val hashlib.md5(text.encode()).hexdigest()[:16] key fcache:question:hash:{hash_val} # cache:question:hash:5d41402abc4b2a76 # 第一次查缓存 → 没有 result r.get(key) # None # 调用AI2秒然后存起来 answer ai_call(什么是Redis) r.setex(key, 3600, answer) # 缓存1小时 # 第二次同一个问题 → 直接命中 result r.get(key) # 秒回不用再调AI一句话用问题的MD5当key缓存AI答案相同问题不再重复计算。三、Milvus向量数据库3.1 Milvus的核心概念Milvus层级结构 { Database: 数据库类似MySQL的database, Collection: 集合类似MySQL的table, Partition: 分区逻辑分割提升查询性能, Index: 索引加速向量检索, Entity: 实体一条记录包含向量标量 }3.2 数据库操作from pymilvus import MilvusClient, DataType def getConnection(): client MilvusClient(urihttp://localhost:19530) return client def createDataBase(): client getConnection() databases client.list_databases() if test not in databases: client.create_database(db_nametest) else: client.using_database(db_nametest)3.3 创建集合与Schemadef createCollection(): client getConnection() client.using_database(db_nametest) # 定义schema schema client.create_schema(auto_idFalse, enable_dynamic_fieldTrue) schema.add_field(field_nameid, datatypeDataType.INT64, is_primaryTrue) schema.add_field(field_namevector, datatypeDataType.FLOAT_VECTOR, dim5) schema.add_field(field_namescalar, datatypeDataType.VARCHAR, max_length256) # 创建集合 client.create_collection(collection_namedemo_v1, schemaschema) # 创建向量索引 prepare_indexs client.prepare_index_params() prepare_indexs.add_index(field_namevector, index_type, metric_typeCOSINE, index_namevector_index) client.create_index(collection_namedemo_v1, index_paramsprepare_indexs) # 加载集合到内存必须 client.load_collection(collection_namedemo_v1)repare_indexs.add_index( field_namevector,index_type,metric_typeCOSINE,index_namevector_index)参数解析1.field_namevector 指定对哪个字段创建索引作用告诉数据库我要给vector这个字段加速检索。2.index_type这里是空的索引类型用哪种算法加速搜索不填的话, 标量会AUTOINDEX自动选择, 向量的话会报错,不建议不填性能对比真实数据测试条件100万条、128维、COSINE相似度索引类型查询耗时内存占用召回率(99%目标)构建时间FLAT1000ms512MB100%0IVF_FLAT(nlist1024)50ms520MB95%30秒IVF_SQ845ms130MB92%35秒IVF_PQ(m8)30ms65MB88%60秒HNSW(M16)10ms800MB98%5分钟HNSW_SQ88ms200MB96%5分钟选择决策树 你的数据量多大 │ ├── 10万 → FLAT简单、精确 │ ├── 10万 ~ 500万 │ ├── 内存充足 → IVF_FLAT │ └── 内存紧张 → IVF_SQ8 │ ├── 500万 ~ 5000万 │ ├── 追求速度 → HNSW │ ├── 平衡方案 → HNSW_SQ8 │ └── 内存极度紧张 → IVF_PQ │ └── 5000万 ├── 有SSD → DiskANN └── 纯内存 → HNSW_PQ 完整的选择逻辑小数据用 FLAT中等数据用 IVF_FLAT/SQ8大数据用 HNSW超大数据用 DiskANN内存、速度、精度三者不可兼得需要 trade-off (取舍)。3.metric_typeCOSINE 距离度量方式怎么判断两个向量相似值公式范围适用场景COSINE余弦相似度[-1, 1]文本相似度最常用L2欧氏距离[0, ∞)图像、音频特征IP内积(-∞, ∞)需归一化的向量这里COSINE是对的适合文本 embedding。建索引时的metric_type 告诉系统我会用什么方式检索系统按这个方式组织数据检索时的metric_type 告诉系统请用这个方式计算相似度两者必须一致否则结果错误或报错。4.index_namevector_index 索引的名字方便管理和删除索引field_name 对哪个字段建索引index_type 用什么算法HNSW最快metric_type 怎么算相似度文本用COSINEindex_name 索引的名字重难点分析参数说明注意事项auto_idFalse主键不自动生成插入时必须提供idenable_dynamic_fieldTrue允许动态字段动态字段存入$metametric_typeCOSINE距离计算方式必须与查询时一致load_collection()加载到内存不加载无法搜索3.4 插入数据def insertData(): client getConnection() client.using_database(db_nametest) # 简化创建自动创建集合 client.create_collection(collection_namedemo_v2, dimension5, metric_typeIP) data [ {id: 0, vector: [0.358, -0.602, 0.184, -0.262, 0.902], color: pink_8682}, # ... 更多数据 ] client.insert(collection_namedemo_v2, datadata) client.upsert(collection_namedemo_v2, datadata) # 去重更新insert vs upsert操作行为适用场景insert()直接插入可能重复初次导入upsert()存在则更新不存在则插入增量更新3.5 分区操作def insertDataToPatition(): client getConnection() client.using_database(db_nametest) # 创建分区 client.create_partition(collection_namedemo_v2, partition_namepartitionA) # 向分区插入数据 client.upsert(collection_namedemo_v2, datadata, partition_namepartitionA)分区的作用思考问partition_namepartitionA分区的作用就是让查询和操作变得更快对吧答是的分区的主要作用是通过缩小数据操作的范围来提升性能。删除一个分区比删除数百万条实体快得多非常适合时序数据和多租户场景。3.6 向量检索def getData(): # 单一向量查询 result1 client.search( collection_namedemo_v2, data[[0.1988, 0.0602, 0.6977, 0.2614, 0.8387]], limit2, search_params{metric_type: IP}, output_fields[id, vector, color] ) # 分区搜索 res3 client.search( collection_namedemo_v2, data[[0.0217, 0.0586, 0.6169, -0.7944, 0.5555]], limit5, search_params{metric_type: IP}, partition_names[partitionA] # 只搜索指定分区 ) # 带过滤的搜索 res5 client.search( collection_namedemo_v2, data[[0.3580, -0.6023, 0.1841, -0.2629, 0.9029]], limit5, filtercolor like red%, # 先过滤后搜索 output_fields[color] )3.7 范围搜索精确控制# 范围搜索只返回相似度在 [0.8, 1.0] 的结果 search_params { metric_type: IP, params: { radius: 0.8, # 相似度下限 range_filter: 1 # 相似度上限 } } res6 client.search( collection_namedemo_v2, data[[0.3580, -0.6023, 0.1841, -0.2629, 0.9029]], limit6, search_paramssearch_params, output_fields[color], )思考问用radius和range_filter这种方式感觉才是精确搜索的正确姿势。答完全正确使用范围搜索是精确搜索的正确姿势。TopK搜索是不管怎样都要凑够K个可能会返回不相关结果而范围搜索是质量不达标就不返回保证结果质量。四、三库协同RAG完整数据流class RAGSystem: def __init__(self): self.mysql MySQLClient() self.redis RedisClient() self.milvus MilvusClient() self.cache_ttl 3600 def answer_question(self, user_id, question): # 1. 频率限制Redis if not self.redis.rate_limit(user_id): return {error: 请求过于频繁请稍后再试} # 2. 检查缓存Redis cache_key fqa:{hash(question)} cached self.redis.get(cache_key) if cached: return json.loads(cached) # 3. 生成查询向量假设有embedding模型 query_vector self.get_embedding(question) # 4. 向量检索Milvus search_results self.milvus.search( collection_nameknowledge_base, data[query_vector], limit5, output_fields[doc_id, chunk_text] ) # 5. 获取完整文档MySQL doc_ids [r[entity][doc_id] for r in search_results[0]] documents self.mysql.get_documents_by_ids(doc_ids) # 6. 构建Prompt并调用LLM answer self.generate_answer(question, documents) # 7. 缓存结果Redis self.redis.setex(cache_key, self.cache_ttl, json.dumps(answer)) # 8. 记录日志MySQL self.mysql.log_query(user_id, question, answer) return {answer: answer, sources: documents}五、清理资源def deleteDataBase(): client getConnection() client.using_database(db_nametest) # 必须按顺序清理 client.release_collection(collection_namedemo_v1) # 1. 释放集合 client.release_collection(collection_namedemo_v2) client.drop_index(collection_namedemo_v1, index_namevector_index) # 2. 删除索引 client.drop_index(collection_namedemo_v2, index_namevector_index) client.drop_collection(collection_namedemo_v1) # 3. 删除集合 client.drop_collection(collection_namedemo_v2) client.drop_database(db_nametest) # 4. 删除数据库重难点分析删除顺序释放集合 → 删除索引 → 删除集合 → 删除数据库必须先release_collection才能删除索引六、总结对比6.1 三种数据库的特点特性MySQLRedisMilvus存储介质磁盘内存磁盘内存读写速度毫秒级微秒级毫秒级检索数据模型表格键值对向量标量查询方式SQL命令向量相似度在RAG中的角色业务数据缓存语义检索6.2 选择策略数据存储选择策略 { 需要持久化、关联查询、事务: MySQL, 需要极速读写、临时存储、缓存: Redis, 需要语义检索、相似度匹配: Milvus }6.3 最佳实践MySQL存元数据Milvus存向量通过公共ID关联Redis缓存热点查询减少重复计算分区提升Milvus性能按时序或类别分区混合检索BM25向量兼顾关键词和语义附录常见问题Q1: 为什么不用MongoDB代替MySQLMongoDB适合文档存储但MySQL的关系查询和事务支持更成熟。RAG系统中元数据之间的关联查询如用户-文档-权限用MySQL更合适。Q2: Redis数据丢了怎么办Redis默认不持久化生产环境应开启RDB或AOF持久化。缓存数据丢失不影响核心功能只是会重新计算。Q3: Milvus的dimension怎么选取决于Embedding模型BGE-large是1024维OpenAI ada是1536维Qwen是896维。保持一致即可。写在最后MySQL、Redis、Milvus各司其职共同构成了RAG系统的数据底座。理解它们的特点和使用场景是构建生产级RAG系统的关键。

更多文章