Skip to content

第47天:向量数据库对比

学习目标

  • 了解主流向量数据库
  • 掌握各数据库的特性
  • 学习性能对比方法
  • 理解成本对比
  • 掌握选型方法

主流向量数据库

Pinecone

简介

Pinecone是一个托管的向量数据库服务,提供高性能的向量搜索和存储。

核心特性

  • 完全托管服务
  • 自动扩展
  • 实时更新
  • 元数据过滤
  • 多种索引类型

安装

bash
pip install pinecone-client

使用示例

python
import pinecone

class PineconeVectorDB:
    """Thin wrapper around a Pinecone index for upsert, search and delete.

    NOTE(review): this uses the legacy ``pinecone.init`` / ``pinecone.Index``
    API (pinecone-client < 3.x) — confirm the installed client version.
    """

    def __init__(self, api_key: str, environment: str):
        # Establish the global client session before any index operations.
        pinecone.init(api_key=api_key, environment=environment)
        self.index_name = "my-index"
        self.dimension = 1536

    def create_index(self):
        """Create the index if it does not exist, then bind a handle to it."""
        existing = pinecone.list_indexes()
        if self.index_name not in existing:
            pinecone.create_index(
                name=self.index_name,
                dimension=self.dimension,
                metric="cosine",
                pod_type="p1",
            )
        self.index = pinecone.Index(self.index_name)

    def add_vectors(self, vectors: List[np.ndarray],
                    metadata: List[Dict]):
        """Upsert vectors under positional string ids ("0", "1", ...)."""
        payload = []
        for position, vec in enumerate(vectors):
            payload.append({
                "id": str(position),
                "values": vec.tolist(),
                "metadata": metadata[position],
            })
        self.index.upsert(vectors=payload)

    def search(self, query_vector: np.ndarray,
              top_k: int = 10) -> List[Dict]:
        """Return the top_k nearest matches as id/score/metadata dicts."""
        response = self.index.query(
            vector=query_vector.tolist(),
            top_k=top_k,
            include_metadata=True,
        )
        hits = []
        for match in response["matches"]:
            hits.append({
                "id": match["id"],
                "score": match["score"],
                "metadata": match["metadata"],
            })
        return hits

    def delete(self, vector_ids: List[str]):
        """Remove the given vector ids from the index."""
        self.index.delete(ids=vector_ids)

Weaviate

简介

Weaviate是一个开源的向量搜索引擎,支持多种数据类型和查询方式。

核心特性

  • 开源
  • GraphQL API
  • 模块化架构
  • 多模态支持
  • 实时更新

安装

bash
pip install weaviate-client

使用示例

python
import weaviate

class WeaviateVectorDB:
    """Weaviate class wrapper that stores externally-computed vectors.

    The schema disables the built-in vectorizer because embeddings are
    supplied by the caller on insert and query.
    """

    def __init__(self, url: str = "http://localhost:8080"):
        self.client = weaviate.Client(url)
        self.class_name = "Document"

    def create_class(self):
        """Register the "Document" class with content/metadata properties."""
        schema = {
            "class": self.class_name,
            "vectorizer": "none",  # vectors are provided explicitly
            "properties": [
                {
                    "name": "content",
                    "dataType": ["text"]
                },
                {
                    # NOTE(review): the ["object"] dataType requires
                    # nestedProperties on recent Weaviate versions — verify
                    # against the deployed server version.
                    "name": "metadata",
                    "dataType": ["object"]
                }
            ]
        }

        self.client.schema.create_class(schema)

    def add_vectors(self, vectors: List[np.ndarray],
                    metadata: List[Dict]):
        """Batch-insert objects, attaching each caller-supplied vector."""
        with self.client.batch as batch:
            for vector, meta in zip(vectors, metadata):
                batch.add_data_object(
                    data_object={
                        "content": meta.get("content", ""),
                        "metadata": meta
                    },
                    class_name=self.class_name,
                    vector=vector.tolist()
                )

    def search(self, query_vector: np.ndarray,
              top_k: int = 10) -> List[Dict]:
        """Vector search returning id/certainty/content/metadata dicts.

        FIX: the v3 Python client's ``query.get()`` takes (class_name,
        properties); near-vector filtering and limits are applied via the
        builder chain (``.with_near_vector`` / ``.with_limit`` / ``.do()``)
        — the original passed them as keyword arguments, which the client
        does not accept.
        FIX: requested fields appear at the top level of each hit and
        ``_additional`` must be requested explicitly; the original read a
        non-existent "properties" key.
        """
        response = (
            self.client.query
            .get(self.class_name, ["content", "metadata"])
            .with_near_vector({
                "vector": query_vector.tolist(),
                "certainty": 0.7  # minimum similarity threshold
            })
            .with_additional(["id", "certainty"])
            .with_limit(top_k)
            .do()
        )

        hits = response["data"]["Get"][self.class_name]
        return [
            {
                "id": hit["_additional"]["id"],
                "score": hit["_additional"]["certainty"],
                "content": hit["content"],
                "metadata": hit["metadata"]
            }
            for hit in hits
        ]

Qdrant

简介

Qdrant是一个高性能的开源向量搜索引擎,支持过滤和负载均衡。

核心特性

  • 高性能
  • 过滤支持
  • 负载均衡
  • 易于部署
  • REST API

安装

bash
pip install qdrant-client

使用示例

python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

class QdrantVectorDB:
    """Qdrant collection wrapper using cosine-distance vectors."""

    def __init__(self, url: str = "http://localhost:6333"):
        self.client = QdrantClient(url=url)
        self.collection_name = "documents"
        self.dimension = 1536

    def create_collection(self):
        """Create the collection with fixed-size cosine vectors."""
        vector_config = VectorParams(
            size=self.dimension,
            distance=Distance.COSINE
        )
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=vector_config
        )

    def add_vectors(self, vectors: List[np.ndarray],
                    metadata: List[Dict]):
        """Upsert points with positional integer ids; metadata becomes payload."""
        points = []
        for point_id, vec in enumerate(vectors):
            points.append(PointStruct(
                id=point_id,
                vector=vec.tolist(),
                payload=metadata[point_id]
            ))
        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )

    def search(self, query_vector: np.ndarray,
              top_k: int = 10) -> List[Dict]:
        """Return the top_k scored points as id/score/metadata dicts."""
        hits = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector.tolist(),
            limit=top_k
        )
        return [
            {"id": hit.id, "score": hit.score, "metadata": hit.payload}
            for hit in hits
        ]

    def delete(self, points_ids: List[int]):
        """Delete points by id."""
        self.client.delete(
            collection_name=self.collection_name,
            points_selector=points_ids
        )

Chroma

简介

Chroma是一个轻量级的开源向量数据库,适合小到中等规模的应用。

核心特性

  • 轻量级
  • 易于使用
  • 本地部署
  • Python原生
  • 快速原型开发

安装

bash
pip install chromadb

使用示例

python
import chromadb

class ChromaVectorDB:
    """Wrapper around a local, on-disk Chroma collection."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        # PersistentClient writes to disk, so data survives restarts.
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection_name = "documents"

    def create_collection(self):
        """Get or create the collection, configured for cosine distance."""
        self.collection = self.client.get_or_create_collection(
            name=self.collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def add_vectors(self, vectors: List[np.ndarray],
                    metadata: List[Dict]):
        """Add vectors under positional string ids.

        NOTE(review): ids restart at "0" on every call, so a second call
        overwrites earlier entries — confirm this is intended.
        """
        embeddings = [vec.tolist() for vec in vectors]
        ids = [str(position) for position in range(len(vectors))]
        self.collection.add(
            embeddings=embeddings,
            metadatas=metadata,
            ids=ids
        )

    def search(self, query_vector: np.ndarray,
              top_k: int = 10) -> List[Dict]:
        """Return the top_k nearest entries as id/distance/metadata dicts."""
        raw = self.collection.query(
            query_embeddings=[query_vector.tolist()],
            n_results=top_k
        )
        hits = []
        for position, doc_id in enumerate(raw["ids"][0]):
            hits.append({
                "id": doc_id,
                "distance": raw["distances"][0][position],
                "metadata": raw["metadatas"][0][position]
            })
        return hits

    def delete(self, ids: List[str]):
        """Delete entries by id."""
        self.collection.delete(ids=ids)

Milvus

简介

Milvus是一个开源的向量数据库,专为大规模向量搜索设计。

核心特性

  • 大规模支持
  • 高性能
  • 多种索引类型
  • 云原生
  • GPU加速

安装

bash
pip install pymilvus

使用示例

python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

class MilvusVectorDB:
    """Milvus collection wrapper (int64 primary key, float vector, JSON payload)."""

    def __init__(self, host: str = "localhost", port: int = 19530):
        connections.connect(host=host, port=port)
        self.collection_name = "documents"
        self.dimension = 1536

    def create_collection(self):
        """Define the schema and bind a Collection handle to it."""
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.dimension),
            FieldSchema(name="metadata", dtype=DataType.JSON)
        ]

        schema = CollectionSchema(fields, self.collection_name)
        self.collection = Collection(self.collection_name, schema)

    def add_vectors(self, vectors: List[np.ndarray],
                    metadata: List[Dict]):
        """Column-ordered insert (ids, embeddings, metadata) with flush.

        NOTE(review): ids restart at 0 on every call — repeated calls reuse
        primary keys; confirm this is intended.
        """
        ids = list(range(len(vectors)))
        embeddings = [vec.tolist() for vec in vectors]

        self.collection.insert([
            ids,
            embeddings,
            metadata
        ])

        # flush() seals the segment so the inserted data is persisted/searchable.
        self.collection.flush()

    def search(self, query_vector: np.ndarray,
              top_k: int = 10) -> List[Dict]:
        """ANN search returning up to top_k id/distance/metadata dicts.

        FIX: ``output_fields=["metadata"]`` is required — without it Milvus
        returns only id/distance and ``entity.get("metadata")`` is None.
        NOTE(review): metric_type "IP" assumes the index was built with
        inner-product distance — confirm against the index definition.
        """
        self.collection.load()

        search_params = {"metric_type": "IP", "params": {"nprobe": 10}}

        results = self.collection.search(
            data=[query_vector.tolist()],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            expr=None,
            output_fields=["metadata"]
        )

        return [
            {
                "id": hit.id,
                "distance": hit.distance,
                "metadata": hit.entity.get("metadata")
            }
            for hit in results[0]
        ]

FAISS

简介

FAISS是Facebook AI Research开发的向量相似度搜索库,专注于性能优化。

核心特性

  • 高性能
  • CPU/GPU支持
  • 多种索引类型
  • 无服务器依赖
  • 适合大规模数据

安装

bash
pip install faiss-cpu

使用示例

python
import faiss
import numpy as np

class FAISSVectorDB:
    """In-process FAISS index with a parallel Python list for metadata.

    FAISS stores only vectors; metadata lives in ``self.metadata``, aligned
    by insertion order with the index's sequential internal ids.
    """

    def __init__(self, dimension: int = 1536):
        self.dimension = dimension  # expected vector length
        self.index = None           # set by create_index()
        self.metadata = []          # metadata[i] belongs to vector id i

    def create_index(self, index_type: str = "IVF"):
        """Build an empty index: "IVF" (requires training), "HNSW", or flat L2."""
        if index_type == "IVF":
            quantizer = faiss.IndexFlatL2(self.dimension)
            # 100 = number of inverted-list clusters (nlist)
            self.index = faiss.IndexIVFFlat(quantizer, self.dimension, 100)
        elif index_type == "HNSW":
            # 32 = HNSW graph connectivity (M)
            self.index = faiss.IndexHNSWFlat(self.dimension, 32)
        else:
            self.index = faiss.IndexFlatL2(self.dimension)

    def add_vectors(self, vectors: List[np.ndarray],
                    metadata: List[Dict]):
        """Add vectors and their metadata to the index.

        FIX: FAISS requires float32 input; the original built a float64
        array (np.array over .tolist()), which FAISS rejects.
        FIX: only train IVF once — the original re-trained the quantizer on
        every call, discarding the centroids learned from earlier batches.
        """
        vectors_array = np.ascontiguousarray(
            np.asarray(vectors, dtype=np.float32)
        )

        if isinstance(self.index, faiss.IndexIVFFlat) and not self.index.is_trained:
            self.index.train(vectors_array)

        self.index.add(vectors_array)
        self.metadata.extend(metadata)

    def search(self, query_vector: np.ndarray,
              top_k: int = 10) -> List[Dict]:
        """Return up to top_k nearest neighbours with distances and metadata.

        FIX: FAISS pads missing results with id -1; the original used -1 to
        index ``self.metadata`` (silently returning the LAST element) and
        always emitted exactly top_k rows even when fewer vectors exist.
        """
        query_array = np.ascontiguousarray(
            np.asarray([query_vector], dtype=np.float32)
        )

        distances, indices = self.index.search(query_array, top_k)

        results = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx < 0:  # -1 marks "no more neighbours"
                continue
            results.append({
                "id": int(idx),
                "distance": float(dist),
                "metadata": self.metadata[int(idx)]
            })
        return results

性能对比

查询延迟

python
import time
from typing import List, Dict

class PerformanceBenchmark:
    """Measure per-query search latency across registered vector databases."""

    def __init__(self):
        # name -> database instance exposing search(query_vector, top_k=...)
        self.databases = {}

    def add_database(self, name: str, database):
        """Register a database under a display name."""
        self.databases[name] = database

    def benchmark_query_latency(self, n_queries: int = 100) -> Dict:
        """Run n_queries random 1536-dim searches against every database.

        Returns {name: {"mean", "median", "p95", "p99"}} in seconds.
        FIX: uses time.perf_counter() instead of time.time() — time.time()
        has coarse resolution and is affected by wall-clock adjustments,
        which skews sub-millisecond latency measurements.
        """
        results = {}

        for name, db in self.databases.items():
            latencies = []

            for _ in range(n_queries):
                query_vector = np.random.randn(1536)

                start = time.perf_counter()
                db.search(query_vector, top_k=10)
                latencies.append(time.perf_counter() - start)

            results[name] = {
                "mean": np.mean(latencies),
                "median": np.median(latencies),
                "p95": np.percentile(latencies, 95),
                "p99": np.percentile(latencies, 99)
            }

        return results

    def print_results(self, results: Dict):
        """Pretty-print latency statistics converted to milliseconds."""
        print("Query Latency Benchmark (ms)")
        print("-" * 60)

        for name, metrics in results.items():
            print(f"\n{name}:")
            print(f"  Mean:   {metrics['mean']*1000:.2f}")
            print(f"  Median: {metrics['median']*1000:.2f}")
            print(f"  P95:    {metrics['p95']*1000:.2f}")
            print(f"  P99:    {metrics['p99']*1000:.2f}")

吞吐量

python
class ThroughputBenchmark:
    """Measure sustained search throughput for registered vector databases."""

    def __init__(self):
        # name -> database instance exposing search(query_vector, top_k=...)
        self.databases = {}

    def add_database(self, name: str, database):
        """Register a database under a display name."""
        self.databases[name] = database

    def benchmark_throughput(self, duration: int = 60) -> Dict:
        """Issue random queries for `duration` seconds against each database.

        Returns {name: queries_per_second}.
        FIX: uses time.perf_counter() instead of time.time() — the monotonic
        clock cannot jump backwards/forwards on wall-clock adjustment, so
        the measurement window stays accurate.
        """
        results = {}

        for name, db in self.databases.items():
            deadline = time.perf_counter() + duration
            query_count = 0

            while time.perf_counter() < deadline:
                query_vector = np.random.randn(1536)
                db.search(query_vector, top_k=10)
                query_count += 1

            results[name] = query_count / duration

        return results

    def print_results(self, results: Dict):
        """Print queries-per-second for each database."""
        print("Throughput Benchmark (queries/second)")
        print("-" * 60)

        for name, throughput in results.items():
            print(f"{name}: {throughput:.2f}")

成本对比

托管服务成本

python
class CostCalculator:
    """Estimate monthly cost of managed vector-database services."""

    def __init__(self):
        # Tier tables: each tier covers up to `vectors` stored vectors
        # for a flat monthly `price` (USD).
        self.pricing = {
            "pinecone": {
                "starter": {"price": 70, "vectors": 100000},
                "production": {"price": 70, "vectors": 1000000}
            },
            "weaviate": {
                "cloud": {"price": 100, "vectors": 1000000}
            }
        }

    def calculate_monthly_cost(self, service: str,
                           n_vectors: int) -> float:
        """Return the flat price of the first tier that fits n_vectors,
        or an extrapolated price when every tier is exceeded.

        Unknown services cost 0.0.
        """
        tiers = self.pricing.get(service)
        if tiers is None:
            return 0.0

        for tier_info in tiers.values():
            if n_vectors <= tier_info["vectors"]:
                return tier_info["price"]

        return self._calculate_custom_cost(service, n_vectors)

    def _calculate_custom_cost(self, service: str,
                              n_vectors: int) -> float:
        """Linearly extrapolate per-vector pricing from the first tier."""
        first_tier = next(iter(self.pricing[service].values()))
        unit_price = first_tier["price"] / first_tier["vectors"]

        return unit_price * n_vectors

    def compare_costs(self, n_vectors: int) -> Dict:
        """Monthly cost of every known service at the given scale."""
        return {
            service: self.calculate_monthly_cost(service, n_vectors)
            for service in self.pricing
        }

自建成本

python
class SelfHostedCostCalculator:
    """Estimate monthly hardware cost of self-hosting a vector database."""

    def __init__(self):
        # Monthly price per instance and the vector capacity of one instance.
        self.hardware_costs = {
            "cpu": {"price": 50, "vectors": 1000000},
            "gpu": {"price": 200, "vectors": 10000000}
        }

    def calculate_monthly_cost(self, hardware: str,
                           n_vectors: int) -> float:
        """Return the monthly cost of hosting n_vectors on the given hardware.

        Unknown hardware types cost 0.0.
        FIX: instance count must be a whole number — the original used a
        fractional count (n_vectors / capacity), under-charging whenever
        the load spills onto a partially used machine.
        """
        if hardware not in self.hardware_costs:
            return 0.0

        cost_info = self.hardware_costs[hardware]
        capacity = cost_info["vectors"]
        # Ceiling division: a partially filled instance is still a full one.
        instances = max(1, -(-n_vectors // capacity))

        return float(cost_info["price"] * instances)

选型指南

决策矩阵

python
class SelectionGuide:
    """Rank candidate vector databases with a weighted decision matrix."""

    def __init__(self):
        # Criterion weights; they sum to 1.0.
        self.criteria = {
            "performance": 0.3,
            "cost": 0.2,
            "ease_of_use": 0.2,
            "scalability": 0.15,
            "features": 0.15
        }

        # Per-database scores in [0, 1] for each criterion.
        self.database_scores = {
            "pinecone": {
                "performance": 0.9,
                "cost": 0.6,
                "ease_of_use": 0.9,
                "scalability": 0.9,
                "features": 0.8
            },
            "weaviate": {
                "performance": 0.8,
                "cost": 0.7,
                "ease_of_use": 0.7,
                "scalability": 0.8,
                "features": 0.9
            },
            "qdrant": {
                "performance": 0.85,
                "cost": 0.8,
                "ease_of_use": 0.8,
                "scalability": 0.85,
                "features": 0.85
            },
            "chroma": {
                "performance": 0.6,
                "cost": 0.95,
                "ease_of_use": 0.95,
                "scalability": 0.5,
                "features": 0.6
            },
            "milvus": {
                "performance": 0.95,
                "cost": 0.85,
                "ease_of_use": 0.6,
                "scalability": 0.95,
                "features": 0.9
            },
            "faiss": {
                "performance": 0.95,
                "cost": 0.95,
                "ease_of_use": 0.5,
                "scalability": 0.9,
                "features": 0.7
            }
        }

    def recommend(self, requirements: Dict) -> List[str]:
        """Return database names best-first.

        Each criterion contributes score * weight * importance; criteria
        absent from `requirements` contribute nothing.
        """
        def weighted_score(scores: Dict) -> float:
            return sum(
                scores[criterion] * weight * requirements[criterion]
                for criterion, weight in self.criteria.items()
                if criterion in requirements
            )

        return sorted(
            self.database_scores,
            key=lambda db_name: weighted_score(self.database_scores[db_name]),
            reverse=True,
        )

场景推荐

python
class ScenarioRecommender:
    """Suggest a vector database for common deployment scenarios."""

    def recommend_for_scenario(self, scenario: str) -> str:
        """Return the suggested database name; defaults to "qdrant"."""
        scenario_to_db = {
            "enterprise": "milvus",
            "multimodal": "weaviate",
            "production": "qdrant",
            "prototype": "chroma",
            "research": "faiss",
            "startup": "pinecone"
        }

        return scenario_to_db.get(scenario, "qdrant")

    def get_recommendation_reason(self, scenario: str) -> str:
        """Return a one-line justification; empty string for unknown scenarios."""
        justifications = {
            "enterprise": "Milvus provides excellent scalability and performance for large-scale deployments",
            "multimodal": "Weaviate has excellent support for multimodal data",
            "production": "Qdrant balances performance, cost, and features for production use",
            "prototype": "Chroma is lightweight and easy to use for rapid prototyping",
            "research": "FAISS offers maximum performance and flexibility for research workloads",
            "startup": "Pinecone offers managed service with good performance and reasonable pricing"
        }

        return justifications.get(scenario, "")

实践练习

练习1:对比向量数据库性能

python
def compare_databases():
    """Run the query-latency benchmark against Pinecone and Qdrant instances."""
    bench = PerformanceBenchmark()

    # NOTE(review): placeholder credentials — replace with real values.
    bench.add_database("Pinecone", PineconeVectorDB("api_key", "environment"))
    bench.add_database("Qdrant", QdrantVectorDB())

    bench.print_results(bench.benchmark_query_latency())

练习2:选择合适的向量数据库

python
def select_database():
    """Rank the databases for a sample requirements profile and print them."""
    requirements = {
        "performance": 1.0,
        "cost": 0.8,
        "ease_of_use": 0.9,
        "scalability": 0.7,
        "features": 0.8
    }

    ranking = SelectionGuide().recommend(requirements)

    print("Recommended databases (in order):")
    for rank, db_name in enumerate(ranking, 1):
        print(f"{rank}. {db_name}")

总结

本节我们学习了向量数据库对比:

  1. 主流向量数据库(Pinecone、Weaviate、Qdrant、Chroma、Milvus、FAISS)
  2. 各数据库的核心特性和使用方法
  3. 性能对比方法(查询延迟、吞吐量)
  4. 成本对比(托管服务、自建)
  5. 选型指南(决策矩阵、场景推荐)

选择合适的向量数据库对于构建高效的RAG系统至关重要。

参考资源