Day 46: Vector Database Fundamentals

Learning Objectives

  • Understand the basic concepts of vector databases
  • Understand how embeddings work
  • Learn the common similarity metrics
  • Get familiar with vector indexing techniques
  • Understand ANN search algorithms

Introduction to Vector Databases

What Is a Vector Database

A vector database is a database system purpose-built for storing, indexing, and querying high-dimensional vectors. It is a core component of RAG systems, used to quickly retrieve the most similar document chunks.

Core Capabilities

  1. Vector storage: efficiently store high-dimensional vectors
  2. Vector indexing: build indexes to accelerate queries
  3. Similarity search: quickly find the most similar vectors
  4. Metadata filtering: filter results based on metadata (see the interface sketch after this list)
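
A minimal sketch of what these four operations might look like as a Python interface is shown below. The class and method names are illustrative only and do not correspond to any particular vector database product; Exercises 1 and 2 later in this lesson implement concrete versions of add and search.

python
import numpy as np
from typing import Any, Dict, List, Optional

class VectorStoreInterface:
    # Illustrative interface only, not any specific product's API.

    def add(self, vector_id: int, vector: np.ndarray,
            metadata: Dict[str, Any]) -> None:
        # Vector storage: persist a high-dimensional vector with its metadata.
        raise NotImplementedError

    def build_index(self) -> None:
        # Vector indexing: build an index (e.g. HNSW or IVF) to speed up queries.
        raise NotImplementedError

    def search(self, query: np.ndarray, top_k: int = 5,
               metadata_filter: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        # Similarity search, optionally narrowed by a metadata filter.
        raise NotImplementedError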

Differences from Traditional Databases

| Feature           | Traditional Database   | Vector Database          |
|-------------------|------------------------|--------------------------|
| Data type         | Structured data        | High-dimensional vectors |
| Query method      | Exact match            | Similarity search        |
| Index types       | B-tree, Hash           | HNSW, IVF                |
| Typical use cases | Transaction processing | AI/ML applications       |

Embedding Fundamentals

What Is an Embedding

An embedding maps text, images, and other data into a high-dimensional vector space, where similar content ends up closer together.

Example

python
import numpy as np
from typing import List

# Toy embedding generator: it mimics the interface of a real embedding model
# (e.g. text-embedding-ada-002, 1536 dimensions) but builds character-based
# vectors with no real semantics; it is for demonstration only.
class EmbeddingGenerator:
    def __init__(self, model_name: str = "text-embedding-ada-002"):
        self.model_name = model_name
        self.dimension = 1536
    
    def generate(self, text: str) -> np.ndarray:
        # Embed each word, then average the word vectors into one text vector.
        words = text.lower().split()
        
        word_vectors = []
        for word in words:
            vector = self._word_to_vector(word)
            word_vectors.append(vector)
        
        if word_vectors:
            return np.mean(word_vectors, axis=0)
        else:
            return np.zeros(self.dimension)
    
    def _word_to_vector(self, word: str) -> np.ndarray:
        vector = np.zeros(self.dimension)
        
        for i, char in enumerate(word[:self.dimension]):
            vector[i] = ord(char) / 256.0
        
        return vector
    
    def generate_batch(self, texts: List[str]) -> np.ndarray:
        return np.array([self.generate(text) for text in texts])
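
The generator above is only a character-level stand-in with no real semantics. In practice the vectors come from a trained embedding model. As one hedged example, assuming the openai Python SDK (v1 or later) is installed and an OPENAI_API_KEY is set in the environment, a real text-embedding-ada-002 vector could be fetched roughly like this:

python
# Assumption: `pip install openai` (v1+) and OPENAI_API_KEY set in the environment.
from openai import OpenAI

client = OpenAI()

def embed_with_openai(text: str) -> list:
    # text-embedding-ada-002 returns a 1536-dimensional embedding.
    response = client.embeddings.create(
        model="text-embedding-ada-002",
        input=text
    )
    return response.data[0].embedding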

Embedding Properties

Semantic Similarity

python
def test_semantic_similarity():
    generator = EmbeddingGenerator()
    
    texts = [
        "artificial intelligence",
        "machine learning",
        "deep learning",
        "apple",
        "banana"
    ]
    
    embeddings = generator.generate_batch(texts)
    
    for i, text1 in enumerate(texts):
        for j, text2 in enumerate(texts):
            if i < j:
                similarity = cosine_similarity(embeddings[i], embeddings[j])
                print(f"{text1} vs {text2}: {similarity:.3f}")

def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    dot_product = np.dot(vec1, vec2)
    norm1 = np.linalg.norm(vec1)
    norm2 = np.linalg.norm(vec2)
    
    if norm1 == 0 or norm2 == 0:
        return 0.0
    
    return dot_product / (norm1 * norm2)

Effect of Dimensionality

python
def analyze_embedding_dimensions():
    dimensions = [128, 256, 512, 1024, 1536]
    
    for dim in dimensions:
        generator = EmbeddingGenerator()
        generator.dimension = dim
        
        embedding = generator.generate("test text")
        
        print(f"Dimension: {dim}")
        print(f"  - Norm: {np.linalg.norm(embedding):.3f}")
        print(f"  - Mean: {np.mean(embedding):.3f}")
        print(f"  - Std: {np.std(embedding):.3f}")

Similarity Metrics

Cosine Similarity

python
class CosineSimilarity:
    @staticmethod
    def calculate(vec1: np.ndarray, vec2: np.ndarray) -> float:
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        
        if norm1 == 0 or norm2 == 0:
            return 0.0
        
        return dot_product / (norm1 * norm2)
    
    @staticmethod
    def calculate_batch(query: np.ndarray, 
                       vectors: np.ndarray) -> np.ndarray:
        norms = np.linalg.norm(vectors, axis=1)
        query_norm = np.linalg.norm(query)
        
        if query_norm == 0:
            return np.zeros(len(vectors))
        
        dot_products = np.dot(vectors, query)
        # Zero-norm rows get similarity 0 instead of a division-by-zero error.
        similarities = np.divide(
            dot_products,
            norms * query_norm,
            out=np.zeros_like(dot_products, dtype=float),
            where=norms > 0
        )
        
        return similarities

Euclidean Distance

python
class EuclideanDistance:
    @staticmethod
    def calculate(vec1: np.ndarray, vec2: np.ndarray) -> float:
        return np.linalg.norm(vec1 - vec2)
    
    @staticmethod
    def calculate_batch(query: np.ndarray, 
                       vectors: np.ndarray) -> np.ndarray:
        distances = np.linalg.norm(vectors - query, axis=1)
        return distances

Dot Product

python
class DotProduct:
    @staticmethod
    def calculate(vec1: np.ndarray, vec2: np.ndarray) -> float:
        return np.dot(vec1, vec2)
    
    @staticmethod
    def calculate_batch(query: np.ndarray, 
                       vectors: np.ndarray) -> np.ndarray:
        return np.dot(vectors, query)
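
A useful relationship between these metrics: if every vector is L2-normalized up front, the dot product equals the cosine similarity, and the squared Euclidean distance equals 2 - 2 * cosine. Many systems therefore normalize vectors at insert time and use the cheaper dot product at query time. A quick check using the classes defined above:

python
def normalize(vec: np.ndarray) -> np.ndarray:
    # Scale a vector to unit length (L2 norm of 1).
    norm = np.linalg.norm(vec)
    return vec / norm if norm > 0 else vec

a = normalize(np.array([1.0, 2.0, 3.0]))
b = normalize(np.array([1.1, 2.1, 3.1]))

cos = CosineSimilarity.calculate(a, b)
dot = DotProduct.calculate(a, b)
dist = EuclideanDistance.calculate(a, b)

print(f"cosine:      {cos:.6f}")
print(f"dot product: {dot:.6f}")        # equals the cosine for unit vectors
print(f"dist^2:      {dist ** 2:.6f}")  # equals 2 - 2 * cosine
print(f"2 - 2*cos:   {2 - 2 * cos:.6f}")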

Comparing the Metrics

python
def compare_similarity_metrics():
    vec1 = np.array([1.0, 2.0, 3.0])
    vec2 = np.array([1.1, 2.1, 3.1])
    vec3 = np.array([2.0, 4.0, 6.0])
    
    print("vec1 vs vec2:")
    print(f"  Cosine: {CosineSimilarity.calculate(vec1, vec2):.3f}")
    print(f"  Euclidean: {EuclideanDistance.calculate(vec1, vec2):.3f}")
    print(f"  Dot Product: {DotProduct.calculate(vec1, vec2):.3f}")
    
    print("\nvec1 vs vec3:")
    print(f"  Cosine: {CosineSimilarity.calculate(vec1, vec3):.3f}")
    print(f"  Euclidean: {EuclideanDistance.calculate(vec1, vec3):.3f}")
    print(f"  Dot Product: {DotProduct.calculate(vec1, vec3):.3f}")

Vector Indexes

HNSW Index

python
import heapq
from typing import Dict, List, Tuple

class HNSWIndex:
    # Simplified, single-layer HNSW-style graph index. Real HNSW maintains a
    # hierarchy of layers; this sketch keeps a single layer to show the core idea.
    def __init__(self, dimension: int, m: int = 16, ef: int = 50):
        self.dimension = dimension
        self.m = m    # maximum number of neighbours per node
        self.ef = ef  # breadth of the greedy graph search
        self.entry_point = None
        self.graph = {}
    
    def add(self, vector_id: int, vector: np.ndarray):
        if len(vector) != self.dimension:
            raise ValueError(
                f"Vector dimension mismatch: expected {self.dimension}, got {len(vector)}"
            )
        
        node = {
            "id": vector_id,
            "vector": vector,
            "neighbors": []
        }
        
        if self.entry_point is None:
            self.entry_point = node
            self.graph[vector_id] = node
            return
        
        self._search_and_insert(node)
    
    def _search_and_insert(self, node: Dict):
        # Link the new node to up to m nearby nodes with bidirectional edges.
        candidates = self._search_knn(
            node["vector"],
            self.ef
        )
        
        for candidate in candidates:
            if len(candidate["neighbors"]) < self.m:
                candidate["neighbors"].append(node)
                node["neighbors"].append(candidate)
        
        self.graph[node["id"]] = node
    
    def _search_knn(self, query: np.ndarray, 
                     k: int) -> List[Dict]:
        if self.entry_point is None:
            return []
        
        visited = {self.entry_point["id"]}
        # Min-heap of (distance, id, node); the id breaks ties so two dicts
        # are never compared directly.
        candidates = [(
            self._distance(query, self.entry_point["vector"]),
            self.entry_point["id"],
            self.entry_point
        )]
        explored = []
        
        # Greedy best-first expansion: pop the closest unexplored node,
        # record it, and push its unvisited neighbours onto the heap.
        while candidates and len(visited) < max(self.ef, k * 2):
            entry = heapq.heappop(candidates)
            explored.append(entry)
            
            for neighbor in entry[2]["neighbors"]:
                if neighbor["id"] not in visited:
                    visited.add(neighbor["id"])
                    dist = self._distance(query, neighbor["vector"])
                    heapq.heappush(candidates, (dist, neighbor["id"], neighbor))
        
        # Merge explored nodes with the remaining candidates and keep the k nearest.
        explored.extend(candidates)
        explored.sort(key=lambda item: item[0])
        return [item[2] for item in explored[:k]]
    
    def _distance(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        return np.linalg.norm(vec1 - vec2)
    
    def search(self, query: np.ndarray, k: int) -> List[Tuple[int, float]]:
        candidates = self._search_knn(query, k)
        
        results = []
        for candidate in candidates[:k]:
            dist = self._distance(query, candidate["vector"])
            results.append((candidate["id"], dist))
        
        return results
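
A quick usage sketch of the index above, with random vectors standing in for real embeddings:

python
np.random.seed(42)

index = HNSWIndex(dimension=128)

# Insert 1,000 random vectors; the id of each vector is its insertion order.
for i in range(1000):
    index.add(i, np.random.randn(128))

# Retrieve the 5 approximate nearest neighbours of a random query.
query = np.random.randn(128)
for vector_id, distance in index.search(query, k=5):
    print(f"id={vector_id}, distance={distance:.3f}")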

IVF Index

python
from sklearn.cluster import KMeans

class IVFIndex:
    def __init__(self, dimension: int, n_clusters: int = 100):
        self.dimension = dimension
        self.n_clusters = n_clusters
        self.clusters = [[] for _ in range(n_clusters)]
        self.centroids = None
    
    def train(self, vectors: np.ndarray):
        # Clamp the cluster count so KMeans never asks for more clusters
        # than there are training vectors.
        n_clusters = min(self.n_clusters, len(vectors))
        kmeans = KMeans(n_clusters=n_clusters)
        labels = kmeans.fit_predict(vectors)
        
        self.n_clusters = n_clusters
        self.centroids = kmeans.cluster_centers_
        self.clusters = [[] for _ in range(n_clusters)]
        
        for i, (vector, cluster_id) in enumerate(zip(vectors, labels)):
            self.clusters[cluster_id].append((i, vector))
    
    def add(self, vector_id: int, vector: np.ndarray):
        # The index must be trained on a representative sample before
        # individual vectors can be assigned to a cluster.
        if self.centroids is None:
            raise RuntimeError("IVFIndex must be trained before adding vectors")
        
        cluster_id = self._assign_cluster(vector)
        self.clusters[cluster_id].append((vector_id, vector))
    
    def _assign_cluster(self, vector: np.ndarray) -> int:
        distances = [
            np.linalg.norm(vector - centroid)
            for centroid in self.centroids
        ]
        return np.argmin(distances)
    
    def search(self, query: np.ndarray, 
              k: int, nprobe: int = 5) -> List[Tuple[int, float]]:
        cluster_distances = [
            (i, np.linalg.norm(query - centroid))
            for i, centroid in enumerate(self.centroids)
        ]
        
        cluster_distances.sort(key=lambda x: x[1])
        
        searched_clusters = [
            cluster_id for cluster_id, _ in cluster_distances[:nprobe]
        ]
        
        candidates = []
        for cluster_id in searched_clusters:
            candidates.extend(self.clusters[cluster_id])
        
        if not candidates:
            return []
        
        vectors = np.array([vec for _, vec in candidates])
        vector_ids = [vec_id for vec_id, _ in candidates]
        
        distances = np.linalg.norm(vectors - query, axis=1)
        
        top_indices = np.argsort(distances)[:k]
        
        return [
            (vector_ids[i], float(distances[i]))
            for i in top_indices
        ]
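
A usage sketch for the IVF index: train the coarse quantizer on a representative sample first, then add new vectors and search with a small nprobe:

python
np.random.seed(0)

dimension = 64
base_vectors = np.random.randn(2000, dimension)

ivf = IVFIndex(dimension=dimension, n_clusters=50)
ivf.train(base_vectors)  # builds the centroids and assigns the training vectors

# Vectors added after training continue the id sequence of the training set.
for i in range(2000, 2100):
    ivf.add(i, np.random.randn(dimension))

# nprobe controls how many clusters are scanned: higher means better recall but slower queries.
query = np.random.randn(dimension)
for vector_id, distance in ivf.search(query, k=5, nprobe=5):
    print(f"id={vector_id}, distance={distance:.3f}")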

ANN Search Algorithms

Brute-Force Search

python
class BruteForceSearch:
    def __init__(self, vectors: np.ndarray):
        self.vectors = vectors
    
    def search(self, query: np.ndarray, k: int) -> List[Tuple[int, float]]:
        distances = np.linalg.norm(self.vectors - query, axis=1)
        
        top_indices = np.argsort(distances)[:k]
        
        return [
            (int(idx), float(distances[idx]))
            for idx in top_indices
        ]

Approximate Search

python
class ApproximateSearch:
    def __init__(self, index: HNSWIndex):
        self.index = index
    
    def search(self, query: np.ndarray, k: int) -> List[Tuple[int, float]]:
        return self.index.search(query, k)

Search Performance Comparison

python
import time

def compare_search_performance():
    dimension = 1536
    n_vectors = 10000
    n_queries = 100
    
    vectors = np.random.randn(n_vectors, dimension)
    queries = np.random.randn(n_queries, dimension)
    
    brute_force = BruteForceSearch(vectors)
    hnsw_index = HNSWIndex(dimension)
    
    for i, vector in enumerate(vectors):
        hnsw_index.add(i, vector)
    
    print("Brute Force Search:")
    start_time = time.time()
    for query in queries:
        brute_force.search(query, 10)
    brute_time = time.time() - start_time
    print(f"  Time: {brute_time:.3f}s")
    
    print("\nHNSW Search:")
    start_time = time.time()
    for query in queries:
        hnsw_index.search(query, 10)
    hnsw_time = time.time() - start_time
    print(f"  Time: {hnsw_time:.3f}s")
    
    print(f"\nSpeedup: {brute_time / hnsw_time:.2f}x")

Hands-On Exercises

Exercise 1: Implement a Simple Vector Database

python
class SimpleVectorDatabase:
    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.vectors = []
        self.metadata = []
    
    def add(self, vector: np.ndarray, metadata: Dict):
        if len(vector) != self.dimension:
            raise ValueError(
                f"Vector dimension mismatch: expected {self.dimension}, got {len(vector)}"
            )
        
        self.vectors.append(vector)
        self.metadata.append(metadata)
    
    def search(self, query: np.ndarray, top_k: int = 5):
        if not self.vectors:
            return []
        
        similarities = []
        for i, vector in enumerate(self.vectors):
            similarity = cosine_similarity(query, vector)
            similarities.append((i, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        return [
            {
                "id": idx,
                "similarity": sim,
                "metadata": self.metadata[idx]
            }
            for idx, sim in similarities[:top_k]
        ]
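
A short end-to-end check of the exercise, reusing the toy EmbeddingGenerator from earlier in this lesson:

python
generator = EmbeddingGenerator()
db = SimpleVectorDatabase(dimension=generator.dimension)

documents = [
    {"text": "Vector databases store high-dimensional embeddings", "source": "notes"},
    {"text": "Bananas are a good source of potassium", "source": "notes"},
]

# Embed each document and store it together with its metadata.
for doc in documents:
    db.add(generator.generate(doc["text"]), metadata=doc)

# Embed the query the same way and retrieve the closest documents.
query_vector = generator.generate("How do vector databases work?")
for result in db.search(query_vector, top_k=2):
    print(f'{result["similarity"]:.3f}  {result["metadata"]["text"]}')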

Exercise 2: Implement an Indexed Vector Database

python
class IndexedVectorDatabase:
    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.vectors = []
        self.metadata = []
        self.index = HNSWIndex(dimension)
    
    def add(self, vector: np.ndarray, metadata: Dict):
        if len(vector) != self.dimension:
            raise ValueError(
                f"Vector dimension mismatch: expected {self.dimension}, got {len(vector)}"
            )
        
        vector_id = len(self.vectors)
        self.vectors.append(vector)
        self.metadata.append(metadata)
        
        self.index.add(vector_id, vector)
    
    def search(self, query: np.ndarray, top_k: int = 5):
        results = self.index.search(query, top_k)
        
        return [
            {
                "id": vector_id,
                "distance": distance,
                "metadata": self.metadata[vector_id]
            }
            for vector_id, distance in results
        ]

Summary

In this lesson we covered the fundamentals of vector databases:

  1. The basic concepts of vector databases
  2. How embeddings work and their key properties
  3. Similarity metrics (cosine, Euclidean, dot product)
  4. Vector indexing techniques (HNSW, IVF)
  5. ANN search algorithms (brute-force and approximate search)

Vector databases are at the core of RAG systems; understanding these fundamentals is essential for building efficient retrieval pipelines.

References