Appearance
第46天:向量数据库基础
学习目标
- 理解向量数据库的基本概念
- 掌握Embedding原理
- 学习相似度计算方法
- 了解向量索引技术
- 掌握ANN搜索算法
向量数据库简介
什么是向量数据库
向量数据库是专门用于存储、索引和查询高维向量的数据库系统。它是RAG系统的核心组件,用于快速检索相似的文档片段。
核心功能:
- 向量存储:高效存储高维向量
- 向量索引:构建索引加速查询
- 相似度搜索:快速找到最相似的向量
- 元数据过滤:支持基于元数据的过滤
与传统数据库的区别:
| 特性 | 传统数据库 | 向量数据库 |
|---|---|---|
| 数据类型 | 结构化数据 | 高维向量 |
| 查询方式 | 精确匹配 | 相似度搜索 |
| 索引类型 | B-tree, Hash | HNSW, IVF |
| 应用场景 | 事务处理 | AI/ML应用 |
Embedding原理
什么是Embedding
Embedding是将文本、图像等数据映射到高维向量空间的技术。相似的内容在向量空间中距离更近。
示例:
python
import numpy as np
class EmbeddingGenerator:
    """Toy text-embedding generator for teaching purposes.

    Produces deterministic pseudo-embeddings from character codes — this is
    NOT a real embedding model; it only mimics the interface of one
    (e.g. OpenAI's text-embedding-ada-002).
    """

    def __init__(self, model_name: str = "text-embedding-ada-002",
                 dimension: int = 1536):
        self.model_name = model_name
        # Output vector length; the default 1536 matches ada-002.
        # (Was hard-coded; now a parameter so other sizes can be simulated.)
        self.dimension = dimension

    def generate(self, text: str) -> np.ndarray:
        """Embed one text as the mean of its per-word vectors.

        Returns a zero vector for empty / whitespace-only input.
        """
        word_vectors = [self._word_to_vector(word)
                        for word in text.lower().split()]
        if not word_vectors:
            return np.zeros(self.dimension)
        return np.mean(word_vectors, axis=0)

    def _word_to_vector(self, word: str) -> np.ndarray:
        """Map a word to a vector from its character codes (toy scheme)."""
        vector = np.zeros(self.dimension)
        # One slot per character; words longer than the dimension are truncated.
        for i, char in enumerate(word[:self.dimension]):
            vector[i] = ord(char) / 256.0
        return vector

    def generate_batch(self, texts: List[str]) -> np.ndarray:
        """Embed each text; returns an (n_texts, dimension) array."""
        return np.array([self.generate(text) for text in texts])

## Embedding特性
语义相似性:
python
def test_semantic_similarity():
    """Print pairwise cosine similarities for a few sample texts."""
    generator = EmbeddingGenerator()
    texts = [
        "人工智能",
        "机器学习",
        "深度学习",
        "苹果",
        "香蕉",
    ]
    embeddings = generator.generate_batch(texts)
    # Visit each unordered pair exactly once.
    for i in range(len(texts)):
        for j in range(i + 1, len(texts)):
            score = cosine_similarity(embeddings[i], embeddings[j])
            print(f"{texts[i]} vs {texts[j]}: {score:.3f}")
def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """Cosine similarity of two vectors; 0.0 if either has zero norm."""
    len1 = np.linalg.norm(vec1)
    len2 = np.linalg.norm(vec2)
    if len1 == 0 or len2 == 0:
        # Undefined for a zero vector; treat as completely dissimilar.
        return 0.0
    return np.dot(vec1, vec2) / (len1 * len2)

## 维度影响:
python
def analyze_embedding_dimensions():
    """Show basic statistics of the toy embedding at several sizes."""
    for dim in (128, 256, 512, 1024, 1536):
        generator = EmbeddingGenerator()
        generator.dimension = dim  # override the default output size
        embedding = generator.generate("test text")
        print(f"Dimension: {dim}")
        print(f" - Norm: {np.linalg.norm(embedding):.3f}")
        print(f" - Mean: {np.mean(embedding):.3f}")
        print(f" - Std: {np.std(embedding):.3f}")

## 相似度计算
余弦相似度
python
class CosineSimilarity:
    """Cosine-similarity helpers for single vectors and batches."""

    @staticmethod
    def calculate(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Similarity of two vectors; 0.0 when either norm is zero."""
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return np.dot(vec1, vec2) / (norm1 * norm2)

    @staticmethod
    def calculate_batch(query: np.ndarray,
                        vectors: np.ndarray) -> np.ndarray:
        """Similarity of `query` against each row of `vectors`.

        Returns all zeros when the query or any stored row has zero norm,
        mirroring the scalar convention above.
        """
        query_norm = np.linalg.norm(query)
        norms = np.linalg.norm(vectors, axis=1)
        if query_norm == 0 or np.any(norms == 0):
            return np.zeros(len(vectors))
        return (vectors @ query) / (norms * query_norm)

## 欧氏距离
python
class EuclideanDistance:
    """L2-distance helpers for single vectors and batches."""

    @staticmethod
    def calculate(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Straight-line (L2) distance between two vectors."""
        return np.linalg.norm(vec1 - vec2)

    @staticmethod
    def calculate_batch(query: np.ndarray,
                        vectors: np.ndarray) -> np.ndarray:
        """Distance from `query` to each row of `vectors`."""
        return np.linalg.norm(vectors - query, axis=1)

## 点积
python
class DotProduct:
    """Inner-product helpers for single vectors and batches."""

    @staticmethod
    def calculate(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Dot product of two vectors."""
        return vec1 @ vec2

    @staticmethod
    def calculate_batch(query: np.ndarray,
                        vectors: np.ndarray) -> np.ndarray:
        """Dot product of `query` with each row of `vectors`."""
        return vectors @ query

## 相似度对比
python
def compare_similarity_metrics():
    """Contrast cosine / Euclidean / dot product on hand-picked vectors.

    vec3 is vec1 scaled by 2: cosine similarity is scale-invariant,
    while Euclidean distance and dot product both change with magnitude.
    """
    vec1 = np.array([1.0, 2.0, 3.0])
    vec2 = np.array([1.1, 2.1, 3.1])
    vec3 = np.array([2.0, 4.0, 6.0])
    pairs = [("vec1 vs vec2:", vec1, vec2), ("\nvec1 vs vec3:", vec1, vec3)]
    for header, a, b in pairs:
        print(header)
        print(f" Cosine: {CosineSimilarity.calculate(a, b):.3f}")
        print(f" Euclidean: {EuclideanDistance.calculate(a, b):.3f}")
        print(f" Dot Product: {DotProduct.calculate(a, b):.3f}")

## 向量索引
HNSW索引
python
import heapq
from typing import List, Tuple
class HNSWIndex:
    """Simplified single-layer HNSW-style graph index for approximate k-NN.

    Real HNSW uses multiple layers; this teaching version keeps one
    navigable graph and runs a best-first search over it.
    """

    def __init__(self, dimension: int, m: int = 16, ef: int = 50):
        self.dimension = dimension   # expected vector length
        self.m = m                   # max neighbors linked per node
        self.ef = ef                 # candidate-list size during insert
        self.entry_point = None      # first node ever inserted
        self.graph = {}              # vector_id -> node dict

    def add(self, vector_id: int, vector: np.ndarray):
        """Insert a vector and wire it to nearby existing nodes.

        Raises:
            ValueError: if the vector length differs from self.dimension.
        """
        if len(vector) != self.dimension:
            raise ValueError(
                f"Vector dimension mismatch: expected {self.dimension}, "
                f"got {len(vector)}"
            )
        node = {"id": vector_id, "vector": vector, "neighbors": []}
        if self.entry_point is None:
            self.entry_point = node
            self.graph[vector_id] = node
            return
        self._search_and_insert(node)

    def _search_and_insert(self, node: dict):
        """Link the new node to up to `ef` nearby nodes found by graph search."""
        candidates = self._search_knn(node["vector"], self.ef)
        for candidate in candidates:
            # Cap the degree of existing nodes at m to bound graph size.
            if len(candidate["neighbors"]) < self.m:
                candidate["neighbors"].append(node)
                node["neighbors"].append(candidate)
        self.graph[node["id"]] = node

    def _search_knn(self, query: np.ndarray,
                    k: int) -> List[dict]:
        """Best-first graph traversal; returns up to k nearest node dicts.

        Fixes over the original: heap entries carry the node id as a
        tie-breaker so heapq never compares node dicts (TypeError on
        distance ties), and the final selection uses true distances so
        the k NEAREST nodes are returned (negated distances previously
        made nsmallest pick the k farthest).
        """
        if self.entry_point is None:
            return []
        start = (self._distance(query, self.entry_point["vector"]),
                 self.entry_point["id"], self.entry_point)
        visited = {self.entry_point["id"]}
        frontier = [start]       # min-heap: nearest unexpanded node first
        seen = [start]           # everything scored so far
        while frontier and len(visited) < k * 2:
            _, _, current = heapq.heappop(frontier)
            for neighbor in current["neighbors"]:
                if neighbor["id"] not in visited:
                    visited.add(neighbor["id"])
                    dist = self._distance(query, neighbor["vector"])
                    entry = (dist, neighbor["id"], neighbor)
                    heapq.heappush(frontier, entry)
                    seen.append(entry)
        return [node for _, _, node in heapq.nsmallest(k, seen)]

    def _distance(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Euclidean (L2) distance."""
        return np.linalg.norm(vec1 - vec2)

    def search(self, query: np.ndarray, k: int) -> List[Tuple[int, float]]:
        """Return up to k (vector_id, distance) pairs, nearest first."""
        results = []
        for candidate in self._search_knn(query, k):
            dist = self._distance(query, candidate["vector"])
            results.append((candidate["id"], dist))
        return results

## IVF索引
python
from sklearn.cluster import KMeans
class IVFIndex:
    """Inverted-file (IVF) index: k-means cells plus probe-limited search."""

    def __init__(self, dimension: int, n_clusters: int = 100):
        self.dimension = dimension
        self.n_clusters = n_clusters
        # clusters[i] holds (vector_id, vector) pairs assigned to centroid i.
        self.clusters = [[] for _ in range(n_clusters)]
        self.centroids = None  # set by train() or the first add()

    def train(self, vectors: np.ndarray):
        """Cluster the training vectors and fill the inverted lists.

        Trained ids are the row positions in `vectors`. The cluster count
        is capped at the sample count so KMeans is never asked for more
        clusters than points (which raises).
        """
        n_clusters = min(self.n_clusters, len(vectors))
        kmeans = KMeans(n_clusters=n_clusters)
        kmeans.fit(vectors)
        self.centroids = kmeans.cluster_centers_
        # labels_ avoids re-predicting every row one at a time.
        for i, cluster_id in enumerate(kmeans.labels_):
            self.clusters[cluster_id].append((i, vectors[i]))

    def add(self, vector_id: int, vector: np.ndarray):
        """Add one vector under the caller-chosen id.

        Fix over the original: on an untrained index it ran KMeans on a
        single sample (raises for n_clusters > 1) and then appended the
        vector a second time. Now the first vector simply becomes the
        initial centroid.
        """
        if self.centroids is None:
            self.centroids = np.asarray([vector], dtype=float)
            self.clusters[0].append((vector_id, vector))
            return
        cluster_id = self._assign_cluster(vector)
        self.clusters[cluster_id].append((vector_id, vector))

    def _assign_cluster(self, vector: np.ndarray) -> int:
        """Index of the nearest centroid."""
        distances = np.linalg.norm(self.centroids - vector, axis=1)
        return int(np.argmin(distances))

    def search(self, query: np.ndarray,
               k: int, nprobe: int = 5) -> List[Tuple[int, float]]:
        """Scan the `nprobe` nearest cells; return k (id, distance) pairs."""
        centroid_distances = np.linalg.norm(self.centroids - query, axis=1)
        probe_order = np.argsort(centroid_distances)[:nprobe]
        candidates = []
        for cluster_id in probe_order:
            candidates.extend(self.clusters[cluster_id])
        if not candidates:
            return []
        vectors = np.array([vec for _, vec in candidates])
        ids = [vid for vid, _ in candidates]
        distances = np.linalg.norm(vectors - query, axis=1)
        top_indices = np.argsort(distances)[:k]
        return [
            (ids[i], distances[i])
            for i in top_indices
        ]

## ANN搜索算法
暴力搜索
python
class BruteForceSearch:
    """Exact k-NN via a full linear scan — the accuracy baseline."""

    def __init__(self, vectors: np.ndarray):
        # (n, d) matrix; one row per stored vector, row index = id.
        self.vectors = vectors

    def search(self, query: np.ndarray, k: int) -> List[Tuple[int, float]]:
        """Return the k nearest (id, L2 distance) pairs, nearest first."""
        dists = np.linalg.norm(self.vectors - query, axis=1)
        nearest = np.argsort(dists)[:k]
        results = []
        for idx in nearest:
            results.append((int(idx), float(dists[idx])))
        return results

## 近似搜索
python
class ApproximateSearch:
    """Adapter exposing an ANN index behind the same search() API
    as BruteForceSearch, so the two can be benchmarked side by side."""

    def __init__(self, index: HNSWIndex):
        self.index = index

    def search(self, query: np.ndarray, k: int) -> List[Tuple[int, float]]:
        """Delegate to the underlying index's approximate k-NN search."""
        return self.index.search(query, k)

## 搜索性能对比
python
def compare_search_performance(n_vectors: int = 10000,
                               n_queries: int = 100,
                               dimension: int = 1536):
    """Benchmark brute-force vs HNSW search on random vectors.

    Fix over the original: `time` was used without being imported anywhere
    in the file; perf_counter is also the correct clock for measuring
    intervals. Sizes are now parameters (defaults keep the old behavior).
    """
    import time  # local import: this demo is the only user of `time`

    vectors = np.random.randn(n_vectors, dimension)
    queries = np.random.randn(n_queries, dimension)

    brute_force = BruteForceSearch(vectors)
    hnsw_index = HNSWIndex(dimension)
    for i, vector in enumerate(vectors):
        hnsw_index.add(i, vector)

    print("Brute Force Search:")
    start_time = time.perf_counter()
    for query in queries:
        brute_force.search(query, 10)
    brute_time = time.perf_counter() - start_time
    print(f" Time: {brute_time:.3f}s")

    print("\nHNSW Search:")
    start_time = time.perf_counter()
    for query in queries:
        hnsw_index.search(query, 10)
    hnsw_time = time.perf_counter() - start_time
    print(f" Time: {hnsw_time:.3f}s")

    print(f"\nSpeedup: {brute_time / hnsw_time:.2f}x")

## 实践练习
练习1:实现简单的向量数据库
python
class SimpleVectorDatabase:
    """Minimal in-memory vector store with linear cosine-similarity search."""

    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.vectors = []   # stored embeddings; insertion order = id
        self.metadata = []  # metadata[i] belongs to vectors[i]

    def add(self, vector: np.ndarray, metadata: dict):
        """Store one vector with its metadata.

        Note: the original annotated `metadata: Dict`, but `Dict` is never
        imported anywhere in this file; the builtin `dict` is used instead.

        Raises:
            ValueError: if the vector length differs from self.dimension.
        """
        if len(vector) != self.dimension:
            raise ValueError(
                f"Vector dimension mismatch: expected {self.dimension}, "
                f"got {len(vector)}"
            )
        self.vectors.append(vector)
        self.metadata.append(metadata)

    def search(self, query: np.ndarray, top_k: int = 5):
        """Return up to top_k results sorted by cosine similarity (desc).

        Each result is a dict with "id", "similarity" and "metadata".
        """
        if not self.vectors:
            return []
        scored = [(i, cosine_similarity(query, vector))
                  for i, vector in enumerate(self.vectors)]
        scored.sort(key=lambda item: item[1], reverse=True)
        return [
            {
                "id": idx,
                "similarity": sim,
                "metadata": self.metadata[idx]
            }
            for idx, sim in scored[:top_k]
        ]

## 练习2:实现带索引的向量数据库
python
class IndexedVectorDatabase:
    """Vector store that delegates k-NN search to an HNSW index."""

    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.vectors = []
        self.metadata = []
        self.index = HNSWIndex(dimension)

    def add(self, vector: np.ndarray, metadata: dict):
        """Store one vector and register it in the index.

        Note: the original annotated `metadata: Dict`, but `Dict` is never
        imported anywhere in this file; the builtin `dict` is used instead.

        Raises:
            ValueError: if the vector length differs from self.dimension.
        """
        if len(vector) != self.dimension:
            raise ValueError(
                f"Vector dimension mismatch: expected {self.dimension}, "
                f"got {len(vector)}"
            )
        vector_id = len(self.vectors)  # ids are dense insertion indices
        self.vectors.append(vector)
        self.metadata.append(metadata)
        self.index.add(vector_id, vector)

    def search(self, query: np.ndarray, top_k: int = 5):
        """Approximate search; results carry distance (smaller = closer)."""
        return [
            {
                "id": vector_id,
                "distance": distance,
                "metadata": self.metadata[vector_id]
            }
            for vector_id, distance in self.index.search(query, top_k)
        ]

## 总结
本节我们学习了向量数据库基础:
- 向量数据库的基本概念
- Embedding原理和特性
- 相似度计算方法(余弦、欧氏、点积)
- 向量索引技术(HNSW、IVF)
- ANN搜索算法(暴力搜索、近似搜索)
向量数据库是RAG系统的核心,理解这些基础知识对于构建高效的RAG系统至关重要。
