Skip to content

向量数据库实战

4.1 向量数据库作用与选型

4.1.1 向量数据库的作用

  • 高效存储:专门优化向量数据的存储结构
  • 快速检索:支持相似度搜索和近似最近邻搜索
  • 可扩展性:处理大规模向量数据
  • 多样化查询:支持多种相似度度量和查询方式
  • 元数据过滤:结合结构化数据进行复合查询
  • 高可用性:支持分布式部署和故障恢复

4.1.2 常用向量数据库对比

向量数据库类型特点适用场景
Milvus开源分布式高性能,可扩展性强企业级应用,大规模数据
Chroma开源轻量级易于使用,内存存储原型开发,小规模应用
FAISS开源库速度快,内存占用高离线应用,批量处理
PGVector开源插件集成PostgreSQL已有PostgreSQL基础的项目
Pinecone云服务托管服务,易用性高快速部署,无需维护
Qdrant开源高速向量搜索,过滤能力强实时应用,需要复杂过滤
Weaviate开源语义搜索,知识图谱需要知识图谱能力的场景

4.1.3 选型考虑因素

  • 数据规模:向量数量和维度
  • 查询性能:响应时间和吞吐量
  • 部署方式:本地部署 vs 云服务
  • 成本预算:硬件成本和维护成本
  • 技术栈:与现有系统的集成
  • 功能需求:是否需要元数据过滤、复杂查询等
  • 团队能力:维护和优化的技术实力

4.2 向量存储、索引、检索机制

4.2.1 向量存储

存储结构

  • 原始向量:存储完整的向量数据
  • 压缩向量:使用量化技术减少存储空间
  • 元数据:存储与向量相关的附加信息

存储优化

  • 分块存储:将向量数据分块存储,提高读写效率
  • 缓存机制:热门向量放入缓存,加快访问速度
  • 数据压缩:使用各种压缩算法减少存储空间

4.2.2 向量索引

索引类型

  1. FLAT:暴力搜索,准确性100%,速度慢
  2. IVF:倒排文件索引,平衡速度和准确性
  3. HNSW:层次化导航小世界图,查询速度快
  4. ANNOY:近似最近邻,内存占用低
  5. FAISS Indexes:多种索引类型,适应不同场景

索引参数调优

  • nlist:IVF索引的聚类数量
  • nprobe:查询时访问的聚类数量
  • M:HNSW索引的最大连接数
  • efConstruction:HNSW构建时的搜索范围
  • efSearch:HNSW查询时的搜索范围

4.2.3 检索机制

相似度搜索

  • KNN搜索:查找最相似的K个向量
  • 范围搜索:查找指定相似度范围内的向量
  • 混合搜索:结合关键词和向量相似度

检索流程

  1. 向量化:将查询文本转换为向量
  2. 索引查找:使用索引快速定位候选向量
  3. 相似度计算:计算查询向量与候选向量的相似度
  4. 结果排序:按相似度排序返回结果
  5. 过滤处理:应用元数据过滤条件

4.3 百万级数据向量库优化思路

4.3.1 数据优化

  • 数据去重:去除重复文本,减少向量数量
  • 数据压缩:使用量化技术压缩向量
  • 数据分片:按时间、主题等维度分片存储

4.3.2 索引优化

  • 选择合适的索引类型:根据数据特点选择索引
  • 调优索引参数:根据查询需求调整索引参数
  • 多级索引:结合多种索引类型
  • 增量索引:支持动态数据的增量索引

4.3.3 查询优化

  • 批量查询:合并多个查询,减少网络开销
  • 缓存机制:缓存热门查询结果
  • 异步查询:使用异步API提高并发性能
  • 查询过滤:提前过滤不符合条件的数据

4.3.4 硬件优化

  • 内存优化:增加内存,确保索引完全加载
  • 存储优化:使用SSD存储,提高IO性能
  • 计算优化:使用GPU加速相似度计算
  • 网络优化:确保网络带宽充足

4.4 本地 + 云端向量库部署

4.4.1 本地部署

FAISS 本地部署

python
# 安装依赖
pip install faiss-cpu  # 或 faiss-gpu

# 使用示例
import faiss
import numpy as np

# 生成随机向量
dimension = 768
n_vectors = 10000
vectors = np.random.random((n_vectors, dimension)).astype('float32')

# 构建索引
index = faiss.IndexFlatL2(dimension)  # 暴力搜索索引
index.add(vectors)  # 添加向量

# 搜索
query_vector = np.random.random((1, dimension)).astype('float32')
k = 5  # 返回5个最相似的向量
distances, indices = index.search(query_vector, k)

print(f"最相似的5个向量索引: {indices[0]}")
print(f"距离: {distances[0]}")

Chroma 本地部署

python
# 安装依赖
pip install chromadb

# 使用示例
import chromadb

# 创建客户端
client = chromadb.Client()

# 创建集合
collection = client.create_collection(name="knowledge_base")

# 添加文档
collection.add(
    documents=["企业私有知识库", "RAG 技术原理", "向量数据库"],
    metadatas=[{"source": "doc1"}, {"source": "doc2"}, {"source": "doc3"}],
    ids=["id1", "id2", "id3"]
)

# 查询
results = collection.query(
    query_texts=["什么是 RAG?"],
    n_results=2
)

print(results)

Milvus 本地部署(使用Docker)

bash
# 拉取Milvus镜像
docker pull milvusdb/milvus:latest

# 启动Milvus
docker run -d --name milvus -p 19530:19530 -p 9091:9091 milvusdb/milvus:latest
python
# 安装依赖
pip install pymilvus

# 使用示例
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# 连接Milvus
connections.connect(host="localhost", port="19530")

# 定义模式
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=100)
]
schema = CollectionSchema(fields, "Knowledge base collection")

# 创建集合
collection = Collection("knowledge_base", schema)

# 创建索引
index_params = {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 128}
}
collection.create_index("embedding", index_params)

# 插入数据
import numpy as np
data = [
    [np.random.random(768).tolist() for _ in range(100)],  # 嵌入向量
    [f"Text {i}" for i in range(100)],  # 文本
    [f"Source {i}" for i in range(100)]  # 来源
]
collection.insert(data)

# 加载集合到内存
collection.load()

# 搜索
query_vector = np.random.random(768).tolist()
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=5,
    output_fields=["text", "source"]
)

for result in results[0]:
    print(f"ID: {result.id}, Distance: {result.distance}, Text: {result.entity.get('text')}")

4.4.2 云端部署

Pinecone 云服务

python
# 安装依赖
pip install pinecone-client

# 使用示例
import pinecone

# 初始化Pinecone
pinecone.init(api_key="YOUR_API_KEY", environment="YOUR_ENVIRONMENT")

# 创建索引
index_name = "knowledge-base"
if index_name not in pinecone.list_indexes():
    pinecone.create_index(
        name=index_name,
        dimension=768,
        metric="cosine"
    )

# 连接索引
index = pinecone.Index(index_name)

# 插入向量
vectors = [
    ("id1", [0.1, 0.2, 0.3, ...], {"source": "doc1"}),
    ("id2", [0.4, 0.5, 0.6, ...], {"source": "doc2"}),
    ("id3", [0.7, 0.8, 0.9, ...], {"source": "doc3"})
]
index.upsert(vectors)

# 查询
query_vector = [0.1, 0.2, 0.3, ...]
results = index.query(
    vector=query_vector,
    top_k=5,
    include_metadata=True
)

print(results)

AWS OpenSearch Service

python
# 安装依赖
pip install boto3 opensearch-py

# 使用示例
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3

# 配置AWS认证
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, 
                   'us-east-1', 'es')

# 连接OpenSearch
host = 'your-opensearch-domain.us-east-1.es.amazonaws.com'
port = 443

client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# 创建索引
index_name = 'knowledge-base'
if not client.indices.exists(index=index_name):
    client.indices.create(
        index=index_name,
        body={
            'settings': {
                'index': {
                    'knn': True
                }
            },
            'mappings': {
                'properties': {
                    'embedding': {
                        'type': 'knn_vector',
                        'dimension': 768
                    },
                    'text': {
                        'type': 'text'
                    },
                    'source': {
                        'type': 'keyword'
                    }
                }
            }
        }
    )

# 插入文档
document = {
    'embedding': [0.1, 0.2, 0.3, ...],
    'text': '企业私有知识库',
    'source': 'doc1'
}
client.index(index=index_name, body=document, id='1')

# 查询
query = {
    'size': 5,
    'query': {
        'knn': {
            'embedding': {
                'vector': [0.1, 0.2, 0.3, ...],
                'k': 5
            }
        }
    }
}
response = client.search(index=index_name, body=query)
print(response)

4.5 本章小结

  • 学习了向量数据库的作用与选型考虑因素
  • 掌握了向量存储、索引、检索的核心机制
  • 了解了百万级数据向量库的优化思路
  • 熟悉了本地和云端向量库的部署方法

这些知识为我们后续构建高性能的 RAG 系统打下了基础。在接下来的章节中,我们将学习 RAG 核心流程的实现,包括数据加载、切片、向量化、入库、检索和生成的完整流程。