# Day 48: Document Processing and Chunking

## Learning Objectives

- Master document loading techniques
- Learn text preprocessing methods
- Understand document chunking strategies
- Master metadata extraction
- Learn document indexing methods

## Document Loading

### Basic Document Loader
```python
from typing import List, Dict
from pathlib import Path

class DocumentLoader:
    def __init__(self):
        # Map file extensions to their loader functions
        self.loaders = {
            ".txt": self._load_text,
            ".md": self._load_markdown,
            ".pdf": self._load_pdf,
            ".docx": self._load_docx,
            ".html": self._load_html
        }

    def load(self, file_path: str) -> Dict:
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        extension = path.suffix.lower()
        if extension not in self.loaders:
            raise ValueError(f"Unsupported file type: {extension}")
        loader = self.loaders[extension]
        content = loader(file_path)
        return {
            "content": content,
            "metadata": self._extract_metadata(path)
        }

    def _load_text(self, file_path: str) -> str:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def _load_markdown(self, file_path: str) -> str:
        return self._load_text(file_path)

    def _load_pdf(self, file_path: str) -> str:
        try:
            import PyPDF2
            reader = PyPDF2.PdfReader(file_path)
            text = ""
            for page in reader.pages:
                text += page.extract_text() + "\n"
            return text
        except ImportError:
            raise ImportError("PyPDF2 not installed. Install with: pip install PyPDF2")

    def _load_docx(self, file_path: str) -> str:
        try:
            from docx import Document
            doc = Document(file_path)
            text = "\n".join([para.text for para in doc.paragraphs])
            return text
        except ImportError:
            raise ImportError("python-docx not installed. Install with: pip install python-docx")

    def _load_html(self, file_path: str) -> str:
        try:
            from bs4 import BeautifulSoup
            with open(file_path, 'r', encoding='utf-8') as f:
                soup = BeautifulSoup(f.read(), 'html.parser')
            return soup.get_text()
        except ImportError:
            raise ImportError("beautifulsoup4 not installed. Install with: pip install beautifulsoup4")

    def _extract_metadata(self, path: Path) -> Dict:
        return {
            "filename": path.name,
            "filepath": str(path),
            "extension": path.suffix,
            "size": path.stat().st_size,
            "created": path.stat().st_ctime,
            "modified": path.stat().st_mtime
        }
```
### Batch Document Loading
```python
class BatchDocumentLoader:
    def __init__(self):
        self.loader = DocumentLoader()

    def load_directory(self, directory: str,
                       extensions: List[str] = None) -> List[Dict]:
        path = Path(directory)
        if not path.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")
        if extensions is None:
            extensions = [".txt", ".md", ".pdf", ".docx", ".html"]
        documents = []
        # Walk the directory tree and load every file with a supported extension
        for file_path in path.rglob("*"):
            if file_path.suffix.lower() in extensions:
                try:
                    doc = self.loader.load(str(file_path))
                    documents.append(doc)
                except Exception as e:
                    print(f"Error loading {file_path}: {e}")
        return documents

    def load_files(self, file_paths: List[str]) -> List[Dict]:
        documents = []
        for file_path in file_paths:
            try:
                doc = self.loader.load(file_path)
                documents.append(doc)
            except Exception as e:
                print(f"Error loading {file_path}: {e}")
        return documents
```
## Text Preprocessing

### Basic Preprocessing
```python
import re
from typing import List

class TextPreprocessor:
    def __init__(self):
        # Normalize quotes and dashes before stripping special characters,
        # otherwise the curly quotes and long dashes are removed before
        # the normalizers ever see them.
        self.cleaners = [
            self._remove_extra_whitespace,
            self._normalize_quotes,
            self._normalize_dashes,
            self._remove_special_chars
        ]

    def preprocess(self, text: str) -> str:
        for cleaner in self.cleaners:
            text = cleaner(text)
        return text

    def _remove_extra_whitespace(self, text: str) -> str:
        text = re.sub(r'\s+', ' ', text)
        text = text.strip()
        return text

    def _remove_special_chars(self, text: str) -> str:
        text = re.sub(r'[^\w\s\.,!?;:()\-\'"]', '', text)
        return text

    def _normalize_quotes(self, text: str) -> str:
        # Replace curly quotes with straight quotes
        text = re.sub(r'[“”]', '"', text)
        text = re.sub(r"[‘’]", "'", text)
        return text

    def _normalize_dashes(self, text: str) -> str:
        # Collapse en/em dashes to a plain hyphen
        text = re.sub(r'[-–—]', '-', text)
        return text
```
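A quick before-and-after check of the basic preprocessor on a deliberately messy string:

```python
preprocessor = TextPreprocessor()
raw = "  Retrieval–Augmented   Generation uses “chunks”   of text!  "
print(preprocessor.preprocess(raw))
# Retrieval-Augmented Generation uses "chunks" of text!
```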
### Advanced Preprocessing
```python
class AdvancedTextPreprocessor(TextPreprocessor):
    def __init__(self):
        super().__init__()
        self.stop_words = self._load_stop_words()

    def preprocess(self, text: str) -> str:
        text = super().preprocess(text)
        text = self._lowercase(text)
        text = self._remove_stop_words(text)
        text = self._lemmatize(text)
        return text

    def _lowercase(self, text: str) -> str:
        return text.lower()

    def _remove_stop_words(self, text: str) -> str:
        words = text.split()
        filtered_words = [
            word for word in words
            if word not in self.stop_words
        ]
        return " ".join(filtered_words)

    def _lemmatize(self, text: str) -> str:
        try:
            # Requires NLTK plus the WordNet corpus (nltk.download('wordnet'))
            from nltk.stem import WordNetLemmatizer
            lemmatizer = WordNetLemmatizer()
            words = text.split()
            lemmatized_words = [
                lemmatizer.lemmatize(word)
                for word in words
            ]
            return " ".join(lemmatized_words)
        except (ImportError, LookupError):
            # NLTK not installed, or WordNet data missing: return text unchanged
            return text

    def _load_stop_words(self) -> set:
        return {
            'a', 'an', 'the', 'and', 'or', 'but',
            'in', 'on', 'at', 'to', 'for', 'of',
            'with', 'by', 'is', 'are', 'was', 'were'
        }
```
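A short sketch of the advanced pipeline. Lemmatization only takes effect when NLTK is installed and the WordNet corpus has been downloaded; otherwise the text passes through that step unchanged:

```python
# One-time setup if you want lemmatization:
#   pip install nltk
#   python -c "import nltk; nltk.download('wordnet')"
adv = AdvancedTextPreprocessor()
print(adv.preprocess("The models were trained on documents"))
# "model trained document" with WordNet available; otherwise "models trained documents"
```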
## Document Chunking

### Fixed-Size Chunking
```python
class FixedSizeSplitter:
    def __init__(self, chunk_size: int = 1000,
                 overlap: int = 200):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def split(self, text: str) -> List[Dict]:
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end]
            chunks.append({
                "content": chunk_text,
                "metadata": {
                    "start": start,
                    "end": end,
                    "chunk_id": len(chunks)
                }
            })
            # Step back by `overlap` characters so adjacent chunks share context
            start = end - self.overlap
        return chunks
```
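A small check of the overlap behaviour, using toy sizes so the chunk boundaries are easy to see:

```python
splitter = FixedSizeSplitter(chunk_size=10, overlap=3)
for chunk in splitter.split("abcdefghijklmnopqrstuvwxyz"):
    print(chunk["metadata"]["start"], chunk["content"])
# 0 abcdefghij
# 7 hijklmnopq
# 14 opqrstuvwx
# 21 vwxyz
```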
### Paragraph Chunking
```python
class ParagraphSplitter:
    def __init__(self, max_paragraphs: int = 3):
        self.max_paragraphs = max_paragraphs

    def split(self, text: str) -> List[Dict]:
        # Paragraphs are separated by blank lines
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        chunks = []
        for i in range(0, len(paragraphs), self.max_paragraphs):
            chunk_paragraphs = paragraphs[i:i + self.max_paragraphs]
            chunk_text = '\n\n'.join(chunk_paragraphs)
            chunks.append({
                "content": chunk_text,
                "metadata": {
                    "start": i,
                    "end": min(i + self.max_paragraphs, len(paragraphs)),
                    "chunk_id": len(chunks),
                    "type": "paragraph"
                }
            })
        return chunks
```
### Semantic Chunking
```python
class SemanticSplitter:
    def __init__(self, max_chunk_size: int = 1000):
        self.max_chunk_size = max_chunk_size

    def split(self, text: str) -> List[Dict]:
        sentences = self._split_sentences(text)
        chunks = []
        current_chunk = []
        current_size = 0
        for sentence in sentences:
            sentence_size = len(sentence)
            # Flush the current chunk before it exceeds the size limit
            if current_size + sentence_size > self.max_chunk_size and current_chunk:
                chunk_text = ' '.join(current_chunk)
                chunks.append({
                    "content": chunk_text,
                    "metadata": {
                        "chunk_id": len(chunks),
                        "type": "semantic"
                    }
                })
                current_chunk = []
                current_size = 0
            current_chunk.append(sentence)
            current_size += sentence_size
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunks.append({
                "content": chunk_text,
                "metadata": {
                    "chunk_id": len(chunks),
                    "type": "semantic"
                }
            })
        return chunks

    def _split_sentences(self, text: str) -> List[str]:
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]
```
### Recursive Chunking
```python
class RecursiveSplitter:
    def __init__(self, separators: List[str] = None):
        self.separators = separators or ['\n\n', '\n', '. ', ' ', '']

    def split(self, text: str) -> List[Dict]:
        return self._recursive_split(text, self.separators)

    def _recursive_split(self, text: str,
                         separators: List[str]) -> List[Dict]:
        if not separators:
            return [{
                "content": text,
                "metadata": {"chunk_id": 0}
            }]
        separator = separators[0]
        remaining_separators = separators[1:]
        # The empty-string separator is the final fallback: keep the text as a
        # single chunk (str.split('') would raise a ValueError).
        if separator == '':
            return [{
                "content": text,
                "metadata": {"chunk_id": 0}
            }]
        if separator not in text:
            return self._recursive_split(text, remaining_separators)
        splits = text.split(separator)
        chunks = []
        for i, split in enumerate(splits):
            if split.strip():
                chunks.append({
                    "content": split.strip(),
                    "metadata": {
                        "chunk_id": len(chunks),
                        "separator": separator
                    }
                })
        return chunks
```
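A quick sanity check: the splitter uses the first separator actually present in the text (`'\n\n'` below) and falls through to the next one otherwise:

```python
splitter = RecursiveSplitter()
text = "First paragraph.\n\nSecond paragraph.\n\nThird paragraph."
for chunk in splitter.split(text):
    print(chunk["metadata"]["chunk_id"], repr(chunk["content"]))
# 0 'First paragraph.'
# 1 'Second paragraph.'
# 2 'Third paragraph.'
```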
## Metadata Extraction

### Basic Metadata
```python
class MetadataExtractor:
    def __init__(self):
        pass

    def extract(self, document: Dict) -> Dict:
        metadata = {}
        metadata.update(self._extract_basic_metadata(document))
        metadata.update(self._extract_content_metadata(document))
        metadata.update(self._extract_structure_metadata(document))
        return metadata

    def _extract_basic_metadata(self, document: Dict) -> Dict:
        return {
            "filename": document["metadata"].get("filename", ""),
            "extension": document["metadata"].get("extension", ""),
            "size": document["metadata"].get("size", 0),
            "created": document["metadata"].get("created", ""),
            "modified": document["metadata"].get("modified", "")
        }

    def _extract_content_metadata(self, document: Dict) -> Dict:
        content = document["content"]
        return {
            "length": len(content),
            "word_count": len(content.split()),
            "sentence_count": len([s for s in content.split('.') if s.strip()]),
            "paragraph_count": len([p for p in content.split('\n\n') if p.strip()])
        }

    def _extract_structure_metadata(self, document: Dict) -> Dict:
        content = document["content"]
        return {
            "has_headings": '#' in content,
            "has_lists": any(marker in content for marker in ['* ', '- ', '1. ']),
            "has_code": '```' in content,
            "has_links": '[' in content and '](' in content
        }
```
### Advanced Metadata
```python
class AdvancedMetadataExtractor(MetadataExtractor):
    def __init__(self):
        super().__init__()
        # KeywordExtractor and EntityExtractor are assumed to be implemented
        # elsewhere (e.g. TF-IDF keywords plus an NER model); see the sketch below.
        self.keyword_extractor = KeywordExtractor()
        self.entity_extractor = EntityExtractor()

    def extract(self, document: Dict) -> Dict:
        metadata = super().extract(document)
        metadata.update(self._extract_keywords(document))
        metadata.update(self._extract_entities(document))
        metadata.update(self._extract_topics(document))
        return metadata

    def _extract_keywords(self, document: Dict) -> Dict:
        keywords = self.keyword_extractor.extract(document["content"])
        return {
            "keywords": keywords[:10]
        }

    def _extract_entities(self, document: Dict) -> Dict:
        entities = self.entity_extractor.extract(document["content"])
        return {
            "entities": entities
        }

    def _extract_topics(self, document: Dict) -> Dict:
        content = document["content"]
        topics = []
        if "machine learning" in content.lower():
            topics.append("machine learning")
        if "deep learning" in content.lower():
            topics.append("deep learning")
        if "nlp" in content.lower() or "natural language" in content.lower():
            topics.append("nlp")
        return {
            "topics": topics
        }
```
## Document Indexing

### Index Construction
```python
class DocumentIndexer:
    def __init__(self, embedding_generator, vector_db):
        self.embedding_generator = embedding_generator
        self.vector_db = vector_db
        self.indexed_documents = {}

    def index_documents(self, documents: List[Dict]) -> Dict:
        results = []
        for i, document in enumerate(documents):
            chunks = self._split_document(document)
            for chunk in chunks:
                embedding = self.embedding_generator.generate(chunk["content"])
                # Store document- and chunk-level metadata together with the text
                chunk_metadata = {
                    **document["metadata"],
                    **chunk["metadata"],
                    "content": chunk["content"]
                }
                vector_id = self.vector_db.add(embedding, chunk_metadata)
                results.append({
                    "document_id": i,
                    "chunk_id": chunk["metadata"]["chunk_id"],
                    "vector_id": vector_id
                })
        self.indexed_documents = {
            i: doc for i, doc in enumerate(documents)
        }
        return {
            "status": "success",
            "indexed_chunks": len(results),
            "indexed_documents": len(documents)
        }

    def _split_document(self, document: Dict) -> List[Dict]:
        splitter = RecursiveSplitter()
        return splitter.split(document["content"])

    def update_document(self, document_id: int,
                        new_document: Dict) -> Dict:
        old_document = self.indexed_documents.get(document_id)
        if old_document:
            self._delete_document(document_id)
        return self.index_documents([new_document])

    def _delete_document(self, document_id: int):
        # Left as an exercise: remove the document's vectors from the vector store
        pass
```
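`DocumentIndexer` only assumes duck-typed dependencies: an embedder with `generate(text) -> vector` and a store with `add(embedding, metadata) -> id` and `search(embedding, top_k)`. A toy in-memory pair (hypothetical, for wiring things together; use a real embedding model and vector database in practice) could look like:

```python
import math
from typing import List, Dict

class HashingEmbedder:
    """Toy embedder: hashed bag-of-words vector, L2-normalized. Replace with a real model."""
    def __init__(self, dim: int = 64):
        self.dim = dim

    def generate(self, text: str) -> List[float]:
        vec = [0.0] * self.dim
        for token in text.lower().split():
            vec[hash(token) % self.dim] += 1.0
        norm = math.sqrt(sum(v * v for v in vec)) or 1.0
        return [v / norm for v in vec]

class InMemoryVectorDB:
    """Toy vector store: brute-force cosine similarity over normalized vectors."""
    def __init__(self):
        self.records = []  # (id, embedding, metadata)

    def add(self, embedding, metadata) -> int:
        vector_id = len(self.records)
        self.records.append((vector_id, embedding, metadata))
        return vector_id

    def search(self, embedding, top_k: int = 5) -> List[Dict]:
        scored = [
            {"id": rid,
             "similarity": sum(a * b for a, b in zip(embedding, emb)),
             "metadata": meta}
            for rid, emb, meta in self.records
        ]
        return sorted(scored, key=lambda r: r["similarity"], reverse=True)[:top_k]
```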
### Index Querying
```python
class DocumentRetriever:
    def __init__(self, vector_db, embedding_generator):
        self.vector_db = vector_db
        self.embedding_generator = embedding_generator

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        query_embedding = self.embedding_generator.generate(query)
        results = self.vector_db.search(query_embedding, top_k)
        return [
            {
                "content": result["metadata"]["content"],
                "similarity": result["similarity"],
                "metadata": result["metadata"]
            }
            for result in results
        ]

    def retrieve_with_filter(self, query: str,
                             filters: Dict, top_k: int = 5) -> List[Dict]:
        query_embedding = self.embedding_generator.generate(query)
        # Over-fetch, then filter down to top_k
        results = self.vector_db.search(query_embedding, top_k * 2)
        filtered_results = [
            result for result in results
            if self._matches_filters(result["metadata"], filters)
        ]
        return filtered_results[:top_k]

    def _matches_filters(self, metadata: Dict,
                         filters: Dict) -> bool:
        for key, value in filters.items():
            if key not in metadata:
                return False
            if isinstance(value, list):
                if metadata[key] not in value:
                    return False
            elif metadata[key] != value:
                return False
        return True
```
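An end-to-end sketch with the toy components from the previous section (the documents and query are made up for illustration):

```python
embedder = HashingEmbedder()
vector_db = InMemoryVectorDB()

docs = [
    {"content": "Chunking splits long documents into pieces.\n\nOverlap keeps context across chunk boundaries.",
     "metadata": {"filename": "chunking.md", "extension": ".md"}},
    {"content": "Embeddings map text to vectors.\n\nSimilar texts end up close together in vector space.",
     "metadata": {"filename": "embeddings.md", "extension": ".md"}},
]

DocumentIndexer(embedder, vector_db).index_documents(docs)

retriever = DocumentRetriever(vector_db, embedder)
for hit in retriever.retrieve("how do I split documents?", top_k=2):
    print(round(hit["similarity"], 3), hit["metadata"]["filename"])
```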
## Hands-On Exercises

### Exercise 1: Implement a Complete Document Processing Pipeline
```python
class DocumentProcessor:
    def __init__(self):
        self.loader = DocumentLoader()
        self.preprocessor = TextPreprocessor()
        self.splitter = RecursiveSplitter()
        self.metadata_extractor = MetadataExtractor()

    def process_file(self, file_path: str) -> List[Dict]:
        # Load, clean, split, and enrich each chunk with metadata
        document = self.loader.load(file_path)
        document["content"] = self.preprocessor.preprocess(
            document["content"]
        )
        chunks = self.splitter.split(document["content"])
        processed_chunks = []
        for chunk in chunks:
            chunk["metadata"] = {
                **document["metadata"],
                **chunk["metadata"]
            }
            chunk["metadata"].update(
                self.metadata_extractor.extract(chunk)
            )
            processed_chunks.append(chunk)
        return processed_chunks
```
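The pipeline can be smoke-tested on any local file (the path below is hypothetical):

```python
processor = DocumentProcessor()
chunks = processor.process_file("notes.md")  # hypothetical path
for chunk in chunks[:3]:
    print(chunk["metadata"]["chunk_id"], chunk["metadata"]["word_count"])
```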
### Exercise 2: Implement Batch Document Processing
```python
class BatchDocumentProcessor:
    def __init__(self):
        self.batch_loader = BatchDocumentLoader()
        self.processor = DocumentProcessor()

    def process_directory(self, directory: str) -> List[Dict]:
        documents = self.batch_loader.load_directory(directory)
        all_chunks = []
        for document in documents:
            chunks = self.processor.process_file(
                document["metadata"]["filepath"]
            )
            all_chunks.extend(chunks)
        return all_chunks
```
## Summary
In this lesson we covered document processing and chunking:

- Document loading techniques (basic loader, batch loading)
- Text preprocessing methods (basic and advanced)
- Document chunking strategies (fixed-size, paragraph, semantic, recursive)
- Metadata extraction (basic and advanced)
- Document indexing (construction and querying)

Document processing is the foundation of any RAG system, and mastering these techniques is essential for building an efficient RAG pipeline.
