Appearance
向量数据库实战
4.1 向量数据库作用与选型
4.1.1 向量数据库的作用
- 高效存储:专门优化向量数据的存储结构
- 快速检索:支持相似度搜索和近似最近邻搜索
- 可扩展性:处理大规模向量数据
- 多样化查询:支持多种相似度度量和查询方式
- 元数据过滤:结合结构化数据进行复合查询
- 高可用性:支持分布式部署和故障恢复
4.1.2 常用向量数据库对比
| 向量数据库 | 类型 | 特点 | 适用场景 |
|---|---|---|---|
| Milvus | 开源分布式 | 高性能,可扩展性强 | 企业级应用,大规模数据 |
| Chroma | 开源轻量级 | 易于使用,内存存储 | 原型开发,小规模应用 |
| FAISS | 开源库 | 速度快,内存占用高 | 离线应用,批量处理 |
| PGVector | 开源插件 | 集成PostgreSQL | 已有PostgreSQL基础的项目 |
| Pinecone | 云服务 | 托管服务,易用性高 | 快速部署,无需维护 |
| Qdrant | 开源 | 高速向量搜索,过滤能力强 | 实时应用,需要复杂过滤 |
| Weaviate | 开源 | 语义搜索,知识图谱 | 需要知识图谱能力的场景 |
4.1.3 选型考虑因素
- 数据规模:向量数量和维度
- 查询性能:响应时间和吞吐量
- 部署方式:本地部署 vs 云服务
- 成本预算:硬件成本和维护成本
- 技术栈:与现有系统的集成
- 功能需求:是否需要元数据过滤、复杂查询等
- 团队能力:维护和优化的技术实力
4.2 向量存储、索引、检索机制
4.2.1 向量存储
存储结构
- 原始向量:存储完整的向量数据
- 压缩向量:使用量化技术减少存储空间
- 元数据:存储与向量相关的附加信息
存储优化
- 分块存储:将向量数据分块存储,提高读写效率
- 缓存机制:热门向量放入缓存,加快访问速度
- 数据压缩:使用各种压缩算法减少存储空间
4.2.2 向量索引
索引类型
- FLAT:暴力搜索,准确性100%,速度慢
- IVF:倒排文件索引,平衡速度和准确性
- HNSW:层次化导航小世界图,查询速度快
- ANNOY:近似最近邻,内存占用低
- FAISS Indexes:多种索引类型,适应不同场景
索引参数调优
- nlist:IVF索引的聚类数量
- nprobe:查询时访问的聚类数量
- M:HNSW索引的最大连接数
- efConstruction:HNSW构建时的搜索范围
- efSearch:HNSW查询时的搜索范围
4.2.3 检索机制
相似度搜索
- KNN搜索:查找最相似的K个向量
- 范围搜索:查找指定相似度范围内的向量
- 混合搜索:结合关键词和向量相似度
检索流程
- 向量化:将查询文本转换为向量
- 索引查找:使用索引快速定位候选向量
- 相似度计算:计算查询向量与候选向量的相似度
- 结果排序:按相似度排序返回结果
- 过滤处理:应用元数据过滤条件
4.3 百万级数据向量库优化思路
4.3.1 数据优化
- 数据去重:去除重复文本,减少向量数量
- 数据压缩:使用量化技术压缩向量
- 数据分片:按时间、主题等维度分片存储
4.3.2 索引优化
- 选择合适的索引类型:根据数据特点选择索引
- 调优索引参数:根据查询需求调整索引参数
- 多级索引:结合多种索引类型
- 增量索引:支持动态数据的增量索引
4.3.3 查询优化
- 批量查询:合并多个查询,减少网络开销
- 缓存机制:缓存热门查询结果
- 异步查询:使用异步API提高并发性能
- 查询过滤:提前过滤不符合条件的数据
4.3.4 硬件优化
- 内存优化:增加内存,确保索引完全加载
- 存储优化:使用SSD存储,提高IO性能
- 计算优化:使用GPU加速相似度计算
- 网络优化:确保网络带宽充足
4.4 本地 + 云端向量库部署
4.4.1 本地部署
FAISS 本地部署
python
# 安装依赖
pip install faiss-cpu # 或 faiss-gpu
# 使用示例
import faiss
import numpy as np
# 生成随机向量
dimension = 768
n_vectors = 10000
vectors = np.random.random((n_vectors, dimension)).astype('float32')
# 构建索引
index = faiss.IndexFlatL2(dimension) # 暴力搜索索引
index.add(vectors) # 添加向量
# 搜索
query_vector = np.random.random((1, dimension)).astype('float32')
k = 5 # 返回5个最相似的向量
distances, indices = index.search(query_vector, k)
print(f"最相似的5个向量索引: {indices[0]}")
print(f"距离: {distances[0]}")Chroma 本地部署
python
# 安装依赖
pip install chromadb
# 使用示例
import chromadb
# 创建客户端
client = chromadb.Client()
# 创建集合
collection = client.create_collection(name="knowledge_base")
# 添加文档
collection.add(
documents=["企业私有知识库", "RAG 技术原理", "向量数据库"],
metadatas=[{"source": "doc1"}, {"source": "doc2"}, {"source": "doc3"}],
ids=["id1", "id2", "id3"]
)
# 查询
results = collection.query(
query_texts=["什么是 RAG?"],
n_results=2
)
print(results)Milvus 本地部署(使用Docker)
bash
# 拉取Milvus镜像
docker pull milvusdb/milvus:latest
# 启动Milvus
docker run -d --name milvus -p 19530:19530 -p 9091:9091 milvusdb/milvus:latestpython
# 安装依赖
pip install pymilvus
# 使用示例
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
# 连接Milvus
connections.connect(host="localhost", port="19530")
# 定义模式
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=100)
]
schema = CollectionSchema(fields, "Knowledge base collection")
# 创建集合
collection = Collection("knowledge_base", schema)
# 创建索引
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128}
}
collection.create_index("embedding", index_params)
# 插入数据
import numpy as np
data = [
[np.random.random(768).tolist() for _ in range(100)], # 嵌入向量
[f"Text {i}" for i in range(100)], # 文本
[f"Source {i}" for i in range(100)] # 来源
]
collection.insert(data)
# 加载集合到内存
collection.load()
# 搜索
query_vector = np.random.random(768).tolist()
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
data=[query_vector],
anns_field="embedding",
param=search_params,
limit=5,
output_fields=["text", "source"]
)
for result in results[0]:
print(f"ID: {result.id}, Distance: {result.distance}, Text: {result.entity.get('text')}")4.4.2 云端部署
Pinecone 云服务
python
# 安装依赖
pip install pinecone-client
# 使用示例
import pinecone
# 初始化Pinecone
pinecone.init(api_key="YOUR_API_KEY", environment="YOUR_ENVIRONMENT")
# 创建索引
index_name = "knowledge-base"
if index_name not in pinecone.list_indexes():
pinecone.create_index(
name=index_name,
dimension=768,
metric="cosine"
)
# 连接索引
index = pinecone.Index(index_name)
# 插入向量
vectors = [
("id1", [0.1, 0.2, 0.3, ...], {"source": "doc1"}),
("id2", [0.4, 0.5, 0.6, ...], {"source": "doc2"}),
("id3", [0.7, 0.8, 0.9, ...], {"source": "doc3"})
]
index.upsert(vectors)
# 查询
query_vector = [0.1, 0.2, 0.3, ...]
results = index.query(
vector=query_vector,
top_k=5,
include_metadata=True
)
print(results)AWS OpenSearch Service
python
# 安装依赖
pip install boto3 opensearch-py
# 使用示例
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
# 配置AWS认证
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
'us-east-1', 'es')
# 连接OpenSearch
host = 'your-opensearch-domain.us-east-1.es.amazonaws.com'
port = 443
client = OpenSearch(
hosts=[{'host': host, 'port': port}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
# 创建索引
index_name = 'knowledge-base'
if not client.indices.exists(index=index_name):
client.indices.create(
index=index_name,
body={
'settings': {
'index': {
'knn': True
}
},
'mappings': {
'properties': {
'embedding': {
'type': 'knn_vector',
'dimension': 768
},
'text': {
'type': 'text'
},
'source': {
'type': 'keyword'
}
}
}
}
)
# 插入文档
document = {
'embedding': [0.1, 0.2, 0.3, ...],
'text': '企业私有知识库',
'source': 'doc1'
}
client.index(index=index_name, body=document, id='1')
# 查询
query = {
'size': 5,
'query': {
'knn': {
'embedding': {
'vector': [0.1, 0.2, 0.3, ...],
'k': 5
}
}
}
}
response = client.search(index=index_name, body=query)
print(response)4.5 本章小结
- 学习了向量数据库的作用与选型考虑因素
- 掌握了向量存储、索引、检索的核心机制
- 了解了百万级数据向量库的优化思路
- 熟悉了本地和云端向量库的部署方法
这些知识为我们后续构建高性能的 RAG 系统打下了基础。在接下来的章节中,我们将学习 RAG 核心流程的实现,包括数据加载、切片、向量化、入库、检索和生成的完整流程。
