Appearance
第47天:向量数据库对比
学习目标
- 了解主流向量数据库
- 掌握各数据库的特性
- 学习性能对比方法
- 理解成本对比
- 掌握选型方法
主流向量数据库
Pinecone
简介:
Pinecone是一个托管的向量数据库服务,提供高性能的向量搜索和存储。
核心特性:
- 完全托管服务
- 自动扩展
- 实时更新
- 元数据过滤
- 多种索引类型
安装:
bash
pip install pinecone-client
使用示例:
python
import pinecone
import numpy as np
from typing import List, Dict


class PineconeVectorDB:
    """Thin wrapper around the Pinecone managed vector database."""

    def __init__(self, api_key: str, environment: str):
        # Initialize the global client (legacy pinecone-client API).
        pinecone.init(api_key=api_key, environment=environment)
        self.index_name = "my-index"
        self.dimension = 1536  # embedding size, e.g. OpenAI ada-002

    def create_index(self):
        """Create the index if it does not exist, then open a handle to it."""
        if self.index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=self.index_name,
                dimension=self.dimension,
                metric="cosine",
                pod_type="p1",
            )
        self.index = pinecone.Index(self.index_name)

    def add_vectors(self, vectors: List[np.ndarray], metadata: List[Dict]):
        """Upsert vectors with positional string ids ("0", "1", ...)."""
        vectors_to_upsert = [
            {"id": str(i), "values": vector.tolist(), "metadata": metadata[i]}
            for i, vector in enumerate(vectors)
        ]
        self.index.upsert(vectors=vectors_to_upsert)

    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Dict]:
        """Return the top_k nearest matches with score and metadata."""
        results = self.index.query(
            vector=query_vector.tolist(),
            top_k=top_k,
            include_metadata=True,
        )
        return [
            {
                "id": match["id"],
                "score": match["score"],
                "metadata": match["metadata"],
            }
            for match in results["matches"]
        ]

    def delete(self, vector_ids: List[str]):
        """Delete vectors by id."""
        self.index.delete(ids=vector_ids)
Weaviate
简介:
Weaviate是一个开源的向量搜索引擎,支持多种数据类型和查询方式。
核心特性:
- 开源
- GraphQL API
- 模块化架构
- 多模态支持
- 实时更新
安装:
bash
pip install weaviate-client
使用示例:
python
import weaviate
import numpy as np
from typing import List, Dict


class WeaviateVectorDB:
    """Wrapper around a Weaviate instance using externally supplied vectors."""

    def __init__(self, url: str = "http://localhost:8080"):
        self.client = weaviate.Client(url)
        self.class_name = "Document"

    def create_class(self):
        """Create the Document class; vectorizer "none" means we supply vectors."""
        schema = {
            "class": self.class_name,
            "vectorizer": "none",
            "properties": [
                {"name": "content", "dataType": ["text"]},
                {"name": "metadata", "dataType": ["object"]},
            ],
        }
        self.client.schema.create_class(schema)

    def add_vectors(self, vectors: List[np.ndarray], metadata: List[Dict]):
        """Batch-insert objects, attaching each precomputed vector."""
        with self.client.batch as batch:
            for vector, meta in zip(vectors, metadata):
                batch.add_data_object(
                    data_object={
                        "content": meta.get("content", ""),
                        "metadata": meta,
                    },
                    class_name=self.class_name,
                    vector=vector.tolist(),
                )

    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Dict]:
        """Vector search via the GraphQL builder API.

        Note: the python client's query interface is a builder
        (.get().with_near_vector().with_limit().do()), not keyword
        arguments on get(); the original snippet would raise.
        """
        results = (
            self.client.query
            .get(self.class_name, ["content", "metadata"])
            .with_near_vector({
                "vector": query_vector.tolist(),
                "certainty": 0.7,  # minimum similarity threshold
            })
            .with_additional(["id", "certainty"])
            .with_limit(top_k)
            .do()
        )
        # GraphQL returns each object's properties flattened, with id and
        # certainty nested under "_additional".
        return [
            {
                "id": obj["_additional"]["id"],
                "score": obj["_additional"]["certainty"],
                "content": obj["content"],
                "metadata": obj["metadata"],
            }
            for obj in results["data"]["Get"][self.class_name]
        ]
Qdrant
简介:
Qdrant是一个高性能的开源向量搜索引擎,支持过滤和负载均衡。
核心特性:
- 高性能
- 过滤支持
- 负载均衡
- 易于部署
- REST API
安装:
bash
pip install qdrant-client
使用示例:
python
import numpy as np
from typing import List, Dict
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct


class QdrantVectorDB:
    """Wrapper around a Qdrant collection with cosine similarity."""

    def __init__(self, url: str = "http://localhost:6333"):
        self.client = QdrantClient(url=url)
        self.collection_name = "documents"
        self.dimension = 1536

    def create_collection(self):
        """Create the collection configured for cosine distance."""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=self.dimension,
                distance=Distance.COSINE,
            ),
        )

    def add_vectors(self, vectors: List[np.ndarray], metadata: List[Dict]):
        """Upsert vectors with positional integer ids and metadata payloads.

        NOTE(review): ids restart at 0 on every call, so a second call
        overwrites earlier points — supply stable ids if that matters.
        """
        points = [
            PointStruct(id=i, vector=vector.tolist(), payload=metadata[i])
            for i, vector in enumerate(vectors)
        ]
        self.client.upsert(
            collection_name=self.collection_name,
            points=points,
        )

    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Dict]:
        """Return the top_k nearest points with score and payload."""
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector.tolist(),
            limit=top_k,
        )
        return [
            {"id": hit.id, "score": hit.score, "metadata": hit.payload}
            for hit in results
        ]

    def delete(self, points_ids: List[int]):
        """Delete points by id."""
        self.client.delete(
            collection_name=self.collection_name,
            points_selector=points_ids,
        )
Chroma
简介:
Chroma是一个轻量级的开源向量数据库,适合小到中等规模的应用。
核心特性:
- 轻量级
- 易于使用
- 本地部署
- Python原生
- 快速原型开发
安装:
bash
pip install chromadb
使用示例:
python
import chromadb
import numpy as np
from typing import List, Dict


class ChromaVectorDB:
    """Wrapper around a persistent local Chroma collection."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection_name = "documents"

    def create_collection(self):
        """Get or create the collection, configured for cosine distance."""
        self.collection = self.client.get_or_create_collection(
            name=self.collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    def add_vectors(self, vectors: List[np.ndarray], metadata: List[Dict]):
        """Add vectors with positional string ids ("0", "1", ...).

        NOTE(review): ids restart at "0" on every call, so repeated calls
        collide with earlier entries — supply stable external ids if needed.
        """
        self.collection.add(
            embeddings=[v.tolist() for v in vectors],
            metadatas=metadata,
            ids=[str(i) for i in range(len(vectors))],
        )

    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Dict]:
        """Return up to top_k nearest entries (Chroma reports distances;
        smaller means more similar)."""
        results = self.collection.query(
            query_embeddings=[query_vector.tolist()],
            n_results=top_k,
        )
        # Chroma returns one result list per query vector; we issued a
        # single query, hence the [0] indexing throughout.
        return [
            {
                "id": results["ids"][0][i],
                "distance": results["distances"][0][i],
                "metadata": results["metadatas"][0][i],
            }
            for i in range(len(results["ids"][0]))
        ]

    def delete(self, ids: List[str]):
        """Delete entries by id."""
        self.collection.delete(ids=ids)
Milvus
简介:
Milvus是一个开源的向量数据库,专为大规模向量搜索设计。
核心特性:
- 大规模支持
- 高性能
- 多种索引类型
- 云原生
- GPU加速
安装:
bash
pip install pymilvus
使用示例:
python
import numpy as np
from typing import List, Dict
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType


class MilvusVectorDB:
    """Wrapper around a Milvus collection with a JSON metadata field."""

    def __init__(self, host: str = "localhost", port: int = 19530):
        connections.connect(host=host, port=port)
        self.collection_name = "documents"
        self.dimension = 1536

    def create_collection(self):
        """Create the collection and build a vector index.

        The original version created no index, but Milvus requires one on
        the vector field before the collection can be loaded and searched.
        """
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.dimension),
            FieldSchema(name="metadata", dtype=DataType.JSON),
        ]
        schema = CollectionSchema(fields, self.collection_name)
        self.collection = Collection(self.collection_name, schema)
        # Index metric must match the metric used at search time ("IP").
        self.collection.create_index(
            field_name="embedding",
            index_params={
                "metric_type": "IP",
                "index_type": "IVF_FLAT",
                "params": {"nlist": 128},
            },
        )

    def add_vectors(self, vectors: List[np.ndarray], metadata: List[Dict]):
        """Insert vectors column-wise: [ids, embeddings, metadata]."""
        ids = list(range(len(vectors)))
        embeddings = [v.tolist() for v in vectors]
        self.collection.insert([ids, embeddings, metadata])
        self.collection.flush()  # seal segments so inserts become searchable

    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Dict]:
        """Return top_k hits with id, distance and metadata payload."""
        self.collection.load()
        search_params = {"metric_type": "IP", "params": {"nprobe": 10}}
        results = self.collection.search(
            data=[query_vector.tolist()],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            expr=None,
            # Without output_fields the hits carry no payload and
            # entity.get("metadata") would return nothing.
            output_fields=["metadata"],
        )
        return [
            {
                "id": hit.id,
                "distance": hit.distance,
                "metadata": hit.entity.get("metadata"),
            }
            for hit in results[0]
        ]
FAISS
简介:
FAISS是Facebook AI Research开发的向量相似度搜索库,专注于性能优化。
核心特性:
- 高性能
- CPU/GPU支持
- 多种索引类型
- 无服务器依赖
- 适合大规模数据
安装:
bash
pip install faiss-cpu
使用示例:
python
import faiss
import numpy as np
from typing import List, Dict


class FAISSVectorDB:
    """In-process FAISS index with a parallel Python list for metadata."""

    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.index = None
        self.metadata = []  # metadata[i] belongs to FAISS internal id i

    def create_index(self, index_type: str = "IVF"):
        """Create an IVF, HNSW or exact (flat L2) index."""
        if index_type == "IVF":
            quantizer = faiss.IndexFlatL2(self.dimension)
            # 100 inverted lists; an IVF index must be trained before adding.
            self.index = faiss.IndexIVFFlat(quantizer, self.dimension, 100)
        elif index_type == "HNSW":
            self.index = faiss.IndexHNSWFlat(self.dimension, 32)
        else:
            self.index = faiss.IndexFlatL2(self.dimension)

    def add_vectors(self, vectors: List[np.ndarray], metadata: List[Dict]):
        """Add vectors, training the IVF index on the first batch if needed."""
        # FAISS requires contiguous float32 input.
        vectors_array = np.asarray([v.tolist() for v in vectors], dtype=np.float32)
        # Train only once; the original retrained on every call.
        if isinstance(self.index, faiss.IndexIVFFlat) and not self.index.is_trained:
            self.index.train(vectors_array)
        self.index.add(vectors_array)
        self.metadata.extend(metadata)

    def search(self, query_vector: np.ndarray, top_k: int = 10) -> List[Dict]:
        """Return up to top_k nearest neighbours.

        FAISS pads missing results with id -1; those entries are skipped.
        (The original indexed metadata with -1, silently returning the
        last element.)
        """
        query_array = np.asarray([query_vector.tolist()], dtype=np.float32)
        distances, indices = self.index.search(query_array, top_k)
        hits = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx < 0:  # placeholder for "no result found"
                continue
            hits.append({
                "id": int(idx),
                "distance": float(dist),
                "metadata": self.metadata[int(idx)],
            })
        return hits
性能对比
查询延迟
python
import time
from typing import List, Dict
class PerformanceBenchmark:
def __init__(self):
self.databases = {}
def add_database(self, name: str, database):
self.databases[name] = database
def benchmark_query_latency(self, n_queries: int = 100) -> Dict:
results = {}
for name, db in self.databases.items():
latencies = []
for _ in range(n_queries):
query_vector = np.random.randn(1536)
start_time = time.time()
db.search(query_vector, top_k=10)
end_time = time.time()
latencies.append(end_time - start_time)
results[name] = {
"mean": np.mean(latencies),
"median": np.median(latencies),
"p95": np.percentile(latencies, 95),
"p99": np.percentile(latencies, 99)
}
return results
def print_results(self, results: Dict):
print("Query Latency Benchmark (ms)")
print("-" * 60)
for name, metrics in results.items():
print(f"\n{name}:")
print(f" Mean: {metrics['mean']*1000:.2f}")
print(f" Median: {metrics['median']*1000:.2f}")
print(f" P95: {metrics['p95']*1000:.2f}")
print(f" P99: {metrics['p99']*1000:.2f}")吞吐量
python
class ThroughputBenchmark:
def __init__(self):
self.databases = {}
def add_database(self, name: str, database):
self.databases[name] = database
def benchmark_throughput(self, duration: int = 60) -> Dict:
results = {}
for name, db in self.databases.items():
start_time = time.time()
query_count = 0
while time.time() - start_time < duration:
query_vector = np.random.randn(1536)
db.search(query_vector, top_k=10)
query_count += 1
throughput = query_count / duration
results[name] = throughput
return results
def print_results(self, results: Dict):
print("Throughput Benchmark (queries/second)")
print("-" * 60)
for name, throughput in results.items():
print(f"{name}: {throughput:.2f}")成本对比
托管服务成本
python
class CostCalculator:
def __init__(self):
self.pricing = {
"pinecone": {
"starter": {"price": 70, "vectors": 100000},
"production": {"price": 70, "vectors": 1000000}
},
"weaviate": {
"cloud": {"price": 100, "vectors": 1000000}
}
}
def calculate_monthly_cost(self, service: str,
n_vectors: int) -> float:
if service not in self.pricing:
return 0.0
pricing = self.pricing[service]
for tier, info in pricing.items():
if n_vectors <= info["vectors"]:
return info["price"]
return self._calculate_custom_cost(service, n_vectors)
def _calculate_custom_cost(self, service: str,
n_vectors: int) -> float:
base_tier = list(self.pricing[service].values())[0]
price_per_vector = base_tier["price"] / base_tier["vectors"]
return price_per_vector * n_vectors
def compare_costs(self, n_vectors: int) -> Dict:
costs = {}
for service in self.pricing:
costs[service] = self.calculate_monthly_cost(
service,
n_vectors
)
return costs自建成本
python
class SelfHostedCostCalculator:
def __init__(self):
self.hardware_costs = {
"cpu": {"price": 50, "vectors": 1000000},
"gpu": {"price": 200, "vectors": 10000000}
}
def calculate_monthly_cost(self, hardware: str,
n_vectors: int) -> float:
if hardware not in self.hardware_costs:
return 0.0
cost_info = self.hardware_costs[hardware]
instances = max(1, n_vectors / cost_info["vectors"])
return cost_info["price"] * instances选型指南
决策矩阵
python
class SelectionGuide:
def __init__(self):
self.criteria = {
"performance": 0.3,
"cost": 0.2,
"ease_of_use": 0.2,
"scalability": 0.15,
"features": 0.15
}
self.database_scores = {
"pinecone": {
"performance": 0.9,
"cost": 0.6,
"ease_of_use": 0.9,
"scalability": 0.9,
"features": 0.8
},
"weaviate": {
"performance": 0.8,
"cost": 0.7,
"ease_of_use": 0.7,
"scalability": 0.8,
"features": 0.9
},
"qdrant": {
"performance": 0.85,
"cost": 0.8,
"ease_of_use": 0.8,
"scalability": 0.85,
"features": 0.85
},
"chroma": {
"performance": 0.6,
"cost": 0.95,
"ease_of_use": 0.95,
"scalability": 0.5,
"features": 0.6
},
"milvus": {
"performance": 0.95,
"cost": 0.85,
"ease_of_use": 0.6,
"scalability": 0.95,
"features": 0.9
},
"faiss": {
"performance": 0.95,
"cost": 0.95,
"ease_of_use": 0.5,
"scalability": 0.9,
"features": 0.7
}
}
def recommend(self, requirements: Dict) -> List[str]:
recommendations = []
for db_name, scores in self.database_scores.items():
total_score = 0.0
for criterion, weight in self.criteria.items():
if criterion in requirements:
importance = requirements[criterion]
total_score += scores[criterion] * weight * importance
recommendations.append((db_name, total_score))
recommendations.sort(key=lambda x: x[1], reverse=True)
return [db_name for db_name, _ in recommendations]场景推荐
python
class ScenarioRecommender:
def recommend_for_scenario(self, scenario: str) -> str:
recommendations = {
"startup": "pinecone",
"enterprise": "milvus",
"research": "faiss",
"prototype": "chroma",
"production": "qdrant",
"multimodal": "weaviate"
}
return recommendations.get(scenario, "qdrant")
def get_recommendation_reason(self, scenario: str) -> str:
reasons = {
"startup": "Pinecone offers managed service with good performance and reasonable pricing",
"enterprise": "Milvus provides excellent scalability and performance for large-scale deployments",
"research": "FAISS offers maximum performance and flexibility for research workloads",
"prototype": "Chroma is lightweight and easy to use for rapid prototyping",
"production": "Qdrant balances performance, cost, and features for production use",
"multimodal": "Weaviate has excellent support for multimodal data"
}
return reasons.get(scenario, "")实践练习
练习1:对比向量数据库性能
python
def compare_databases():
    """Benchmark Pinecone and Qdrant query latency side by side."""
    benchmark = PerformanceBenchmark()
    candidates = {
        "Pinecone": PineconeVectorDB("api_key", "environment"),
        "Qdrant": QdrantVectorDB(),
    }
    for label, db in candidates.items():
        benchmark.add_database(label, db)
    latency_stats = benchmark.benchmark_query_latency()
    benchmark.print_results(latency_stats)
练习2:选择合适的向量数据库
python
def select_database():
    """Rank databases for a sample requirements profile and print them."""
    guide = SelectionGuide()
    requirements = {
        "performance": 1.0,
        "cost": 0.8,
        "ease_of_use": 0.9,
        "scalability": 0.7,
        "features": 0.8,
    }
    ranked = guide.recommend(requirements)
    print("Recommended databases (in order):")
    for rank, db_name in enumerate(ranked, start=1):
        print(f"{rank}. {db_name}")
总结
本节我们学习了向量数据库对比:
- 主流向量数据库(Pinecone、Weaviate、Qdrant、Chroma、Milvus、FAISS)
- 各数据库的核心特性和使用方法
- 性能对比方法(查询延迟、吞吐量)
- 成本对比(托管服务、自建)
- 选型指南(决策矩阵、场景推荐)
选择合适的向量数据库对于构建高效的RAG系统至关重要。
