Skip to content

第48天:文档处理与切片

学习目标

  • 掌握文档加载技术
  • 学习文本预处理方法
  • 理解文档切片策略
  • 掌握元数据提取
  • 学习文档索引方法

文档加载

基础文档加载器

python
from typing import List, Dict
from pathlib import Path

class DocumentLoader:
    """Load documents of several file formats into a uniform dict.

    Dispatches on the file extension to a format-specific loader and
    returns ``{"content": str, "metadata": dict}``.
    """

    def __init__(self):
        # Map of supported extensions to their loader methods.
        self.loaders = {
            ".txt": self._load_text,
            ".md": self._load_markdown,
            ".pdf": self._load_pdf,
            ".docx": self._load_docx,
            ".html": self._load_html
        }

    def load(self, file_path: str) -> Dict:
        """Load a single file and return content plus filesystem metadata.

        Raises:
            FileNotFoundError: if *file_path* does not exist.
            ValueError: if the extension has no registered loader.
        """
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        extension = path.suffix.lower()

        if extension not in self.loaders:
            raise ValueError(f"Unsupported file type: {extension}")

        content = self.loaders[extension](file_path)

        return {
            "content": content,
            "metadata": self._extract_metadata(path)
        }

    def _load_text(self, file_path: str) -> str:
        """Read a UTF-8 text file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def _load_markdown(self, file_path: str) -> str:
        """Markdown is loaded verbatim as plain text."""
        return self._load_text(file_path)

    def _load_pdf(self, file_path: str) -> str:
        """Extract the text of every PDF page (requires PyPDF2)."""
        try:
            import PyPDF2
        except ImportError:
            raise ImportError("PyPDF2 not installed. Install with: pip install PyPDF2")
        reader = PyPDF2.PdfReader(file_path)
        # extract_text() may return None for image-only pages; guard with "or ''"
        # instead of letting the concatenation raise TypeError.
        return "".join((page.extract_text() or "") + "\n" for page in reader.pages)

    def _load_docx(self, file_path: str) -> str:
        """Extract paragraph text from a .docx (requires python-docx)."""
        try:
            from docx import Document
        except ImportError:
            raise ImportError("python-docx not installed. Install with: pip install python-docx")
        doc = Document(file_path)
        return "\n".join(para.text for para in doc.paragraphs)

    def _load_html(self, file_path: str) -> str:
        """Strip markup from an HTML file (requires beautifulsoup4)."""
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            raise ImportError("beautifulsoup4 not installed. Install with: pip install beautifulsoup4")
        with open(file_path, 'r', encoding='utf-8') as f:
            return BeautifulSoup(f.read(), 'html.parser').get_text()

    def _extract_metadata(self, path: Path) -> Dict:
        """Collect filesystem metadata; stat() is called once and reused
        (the original issued three separate stat calls)."""
        stat = path.stat()
        return {
            "filename": path.name,
            "filepath": str(path),
            "extension": path.suffix,
            "size": stat.st_size,
            "created": stat.st_ctime,
            "modified": stat.st_mtime
        }

批量文档加载

python
class BatchDocumentLoader:
    """Load many documents at once, skipping any file that fails to load."""

    def __init__(self):
        self.loader = DocumentLoader()

    def load_directory(self, directory: str,
                     extensions: List[str] = None) -> List[Dict]:
        """Recursively load every file under *directory* whose extension matches.

        Raises FileNotFoundError when the directory does not exist.
        """
        root = Path(directory)

        if not root.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")

        wanted = extensions if extensions is not None else [
            ".txt", ".md", ".pdf", ".docx", ".html"
        ]

        candidates = (p for p in root.rglob("*") if p.suffix.lower() in wanted)
        return self._load_all(str(p) for p in candidates)

    def load_files(self, file_paths: List[str]) -> List[Dict]:
        """Load an explicit list of file paths."""
        return self._load_all(file_paths)

    def _load_all(self, paths) -> List[Dict]:
        """Best-effort loop: report failures and continue with the rest."""
        documents = []
        for path in paths:
            try:
                documents.append(self.loader.load(path))
            except Exception as e:
                print(f"Error loading {path}: {e}")
        return documents

文本预处理

基础预处理

python
import re
from typing import List

class TextPreprocessor:
    """Basic text clean-up pipeline.

    Cleaners run in order. Quote and dash normalization must run BEFORE
    the special-character filter, otherwise curly quotes and en/em dashes
    are stripped outright instead of being converted (the original ran
    the filter first, making the normalizers dead code). The original
    quote patterns were also garbled: one was a syntax error from
    flattened curly quotes, the other matched only ASCII apostrophes.
    """

    def __init__(self):
        self.cleaners = [
            self._normalize_quotes,
            self._normalize_dashes,
            self._remove_special_chars,
            self._remove_extra_whitespace
        ]

    def preprocess(self, text: str) -> str:
        """Apply every cleaner in sequence and return the result."""
        for cleaner in self.cleaners:
            text = cleaner(text)
        return text

    def _remove_extra_whitespace(self, text: str) -> str:
        """Collapse whitespace runs to single spaces and trim the ends."""
        return re.sub(r'\s+', ' ', text).strip()

    def _remove_special_chars(self, text: str) -> str:
        """Keep word chars, whitespace and common punctuation; drop the rest.

        The double quote is included in the allowed set so normalized
        quotes survive this pass.
        """
        return re.sub(r'[^\w\s\.,!?;:()\-\'"]', '', text)

    def _normalize_quotes(self, text: str) -> str:
        """Map curly double/single quotes to their ASCII equivalents."""
        text = re.sub(r'[\u201c\u201d\u201e]', '"', text)
        text = re.sub(r'[\u2018\u2019]', "'", text)
        return text

    def _normalize_dashes(self, text: str) -> str:
        """Map en/em dashes to a plain hyphen."""
        return re.sub(r'[\u2013\u2014-]', '-', text)

高级预处理

python
class AdvancedTextPreprocessor(TextPreprocessor):
    """Preprocessor that additionally lowercases, drops stop words and lemmatizes."""

    def __init__(self):
        super().__init__()
        # Small built-in English stop-word list; see _load_stop_words.
        self.stop_words = self._load_stop_words()

    def preprocess(self, text: str) -> str:
        """Run the base clean-up, then the extra normalization steps."""
        text = super().preprocess(text)
        text = self._lowercase(text)
        text = self._remove_stop_words(text)
        text = self._lemmatize(text)
        return text

    def _lowercase(self, text: str) -> str:
        return text.lower()

    def _remove_stop_words(self, text: str) -> str:
        """Drop whitespace-delimited tokens that appear in the stop-word set."""
        words = text.split()
        filtered_words = [
            word for word in words
            if word not in self.stop_words
        ]
        return " ".join(filtered_words)

    def _lemmatize(self, text: str) -> str:
        """Lemmatize each token with NLTK's WordNet lemmatizer.

        Falls back to the unmodified text when NLTK is not installed
        (ImportError) or the WordNet corpus data is missing (LookupError —
        NLTK raises this lazily on first lemmatize call; the original only
        caught ImportError and crashed in that case).
        """
        try:
            from nltk.stem import WordNetLemmatizer

            lemmatizer = WordNetLemmatizer()
            return " ".join(lemmatizer.lemmatize(word) for word in text.split())
        except (ImportError, LookupError):
            # Missing package or missing corpus data: degrade gracefully.
            return text

    def _load_stop_words(self) -> set:
        """Return a minimal English stop-word set."""
        return {
            'a', 'an', 'the', 'and', 'or', 'but',
            'in', 'on', 'at', 'to', 'for', 'of',
            'with', 'by', 'is', 'are', 'was', 'were'
        }

文档切片

固定大小切片

python
class FixedSizeSplitter:
    """Split text into fixed-size character chunks with overlap."""

    def __init__(self, chunk_size: int = 1000,
                 overlap: int = 200):
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            # overlap >= chunk_size made the original's window stop
            # advancing, looping forever; reject the config up front.
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap

    def split(self, text: str) -> List[Dict]:
        """Return chunk dicts with start/end offsets and a running chunk_id.

        Stops after the chunk that reaches the end of *text*; the original
        could emit an extra trailing chunk fully contained in the previous
        one. Offsets are clamped to len(text).
        """
        chunks = []
        start = 0
        length = len(text)

        while start < length:
            # Clamp so the recorded "end" is a real offset into text.
            end = min(start + self.chunk_size, length)
            chunks.append({
                "content": text[start:end],
                "metadata": {
                    "start": start,
                    "end": end,
                    "chunk_id": len(chunks)
                }
            })
            if end >= length:
                break  # last chunk emitted; don't step back into covered text
            start = end - self.overlap

        return chunks

段落切片

python
class ParagraphSplitter:
    """Group blank-line-separated paragraphs into chunks of a fixed count."""

    def __init__(self, max_paragraphs: int = 3):
        self.max_paragraphs = max_paragraphs

    def split(self, text: str) -> List[Dict]:
        """Split on blank lines, then bundle paragraphs max_paragraphs at a time."""
        paragraphs = [part.strip() for part in text.split('\n\n') if part.strip()]
        total = len(paragraphs)
        step = self.max_paragraphs

        chunks = []
        for chunk_id, offset in enumerate(range(0, total, step)):
            group = paragraphs[offset:offset + step]
            chunks.append({
                "content": '\n\n'.join(group),
                "metadata": {
                    "start": offset,
                    "end": min(offset + step, total),
                    "chunk_id": chunk_id,
                    "type": "paragraph"
                }
            })
        return chunks

语义切片

python
class SemanticSplitter:
    """Accumulate sentences into chunks of at most max_chunk_size characters.

    Note: the size accounting sums raw sentence lengths and does not count
    the joining spaces, so a chunk's final text can slightly exceed the
    limit; a single oversized sentence always becomes its own chunk.
    """

    def __init__(self, max_chunk_size: int = 1000):
        self.max_chunk_size = max_chunk_size

    def split(self, text: str) -> List[Dict]:
        """Split on sentence boundaries and pack sentences greedily.

        The duplicated flush logic of the original is factored into
        _flush; behavior is unchanged.
        """
        chunks = []
        current_chunk = []
        current_size = 0

        for sentence in self._split_sentences(text):
            # Flush before adding if this sentence would overflow the chunk.
            if current_chunk and current_size + len(sentence) > self.max_chunk_size:
                self._flush(current_chunk, chunks)
                current_chunk = []
                current_size = 0
            current_chunk.append(sentence)
            current_size += len(sentence)

        if current_chunk:
            self._flush(current_chunk, chunks)

        return chunks

    def _flush(self, sentences: List[str], chunks: List[Dict]) -> None:
        """Append the accumulated sentences to *chunks* as one chunk."""
        chunks.append({
            "content": ' '.join(sentences),
            "metadata": {
                "chunk_id": len(chunks),
                "type": "semantic"
            }
        })

    def _split_sentences(self, text: str) -> List[str]:
        """Naive sentence split on ., ! and ? (terminators are discarded)."""
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

递归切片

python
class RecursiveSplitter:
    """Try separators in order and split on the first one present in the text."""

    def __init__(self, separators: List[str] = None):
        # '' acts as a final catch-all: when nothing else matched, the
        # whole text is returned as a single chunk.
        self.separators = separators or ['\n\n', '\n', '. ', ' ', '']

    def split(self, text: str) -> List[Dict]:
        """Split *text* using the first applicable separator."""
        return self._recursive_split(text, self.separators)

    def _recursive_split(self, text: str,
                        separators: List[str]) -> List[Dict]:
        # Base cases: no separators left, or the '' sentinel. The original
        # fell through to text.split('') for single-token inputs, which
        # raises ValueError ("empty separator"); '' must mean "stop".
        if not separators or separators[0] == '':
            return [{
                "content": text,
                "metadata": {"chunk_id": 0}
            }]

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator not in text:
            return self._recursive_split(text, remaining_separators)

        chunks = []
        for piece in text.split(separator):
            stripped = piece.strip()
            if stripped:
                chunks.append({
                    "content": stripped,
                    "metadata": {
                        "chunk_id": len(chunks),
                        "separator": separator
                    }
                })
        return chunks

元数据提取

基础元数据

python
class MetadataExtractor:
    """Derive basic, content and structure metadata from a loaded document.

    Expects a document dict shaped like DocumentLoader's output:
    ``{"content": str, "metadata": dict}``.
    """

    def extract(self, document: Dict) -> Dict:
        """Merge all metadata categories into one flat dict."""
        metadata = {}
        metadata.update(self._extract_basic_metadata(document))
        metadata.update(self._extract_content_metadata(document))
        metadata.update(self._extract_structure_metadata(document))
        return metadata

    def _extract_basic_metadata(self, document: Dict) -> Dict:
        """Copy file-level fields, defaulting to empty values when absent."""
        file_meta = document["metadata"]
        return {
            "filename": file_meta.get("filename", ""),
            "extension": file_meta.get("extension", ""),
            "size": file_meta.get("size", 0),
            "created": file_meta.get("created", ""),
            "modified": file_meta.get("modified", "")
        }

    def _extract_content_metadata(self, document: Dict) -> Dict:
        """Simple length / word / sentence / paragraph counts."""
        content = document["content"]
        return {
            "length": len(content),
            "word_count": len(content.split()),
            "sentence_count": len([s for s in content.split('.') if s.strip()]),
            "paragraph_count": len([p for p in content.split('\n\n') if p.strip()])
        }

    def _extract_structure_metadata(self, document: Dict) -> Dict:
        """Cheap markdown-structure heuristics."""
        content = document["content"]
        return {
            "has_headings": '#' in content,
            "has_lists": any(marker in content for marker in ['* ', '- ', '1. ']),
            "has_code": '```' in content,
            # Fixed: the original key was written "has_links' with
            # mismatched quotes — a syntax error.
            "has_links": '[' in content and '](' in content
        }

高级元数据

python
class AdvancedMetadataExtractor(MetadataExtractor):
    """Metadata extractor that adds keywords, entities and coarse topic tags.

    NOTE(review): KeywordExtractor and EntityExtractor are not defined in
    this file — presumably they live elsewhere in the project; verify
    before instantiating this class.
    """

    def __init__(self):
        super().__init__()
        self.keyword_extractor = KeywordExtractor()
        self.entity_extractor = EntityExtractor()

    def extract(self, document: Dict) -> Dict:
        """Extend the base metadata with keyword, entity and topic fields."""
        metadata = super().extract(document)
        for extra in (
            self._extract_keywords(document),
            self._extract_entities(document),
            self._extract_topics(document),
        ):
            metadata.update(extra)
        return metadata

    def _extract_keywords(self, document: Dict) -> Dict:
        """Top 10 keywords from the pluggable keyword extractor."""
        top_keywords = self.keyword_extractor.extract(document["content"])[:10]
        return {"keywords": top_keywords}

    def _extract_entities(self, document: Dict) -> Dict:
        """Named entities from the pluggable entity extractor."""
        return {"entities": self.entity_extractor.extract(document["content"])}

    def _extract_topics(self, document: Dict) -> Dict:
        """Keyword-spotting topic tags (machine learning / deep learning / nlp)."""
        lowered = document["content"].lower()
        topics = []
        if "machine learning" in lowered:
            topics.append("machine learning")
        if "deep learning" in lowered:
            topics.append("deep learning")
        if "nlp" in lowered or "natural language" in lowered:
            topics.append("nlp")
        return {"topics": topics}

文档索引

索引构建

python
class DocumentIndexer:
    """Chunk documents, embed each chunk and store it in a vector DB.

    The embedding_generator must expose ``generate(text) -> vector`` and
    the vector_db must expose ``add(embedding, metadata) -> vector_id``.
    """

    def __init__(self, embedding_generator, vector_db):
        self.embedding_generator = embedding_generator
        self.vector_db = vector_db
        # document_id -> original document dict for everything indexed so far.
        self.indexed_documents = {}

    def index_documents(self, documents: List[Dict]) -> Dict:
        """Index *documents*, extending (not replacing) the registry.

        Fixed: the original rebuilt indexed_documents from scratch on
        every call, so a second call (e.g. via update_document) silently
        dropped the records of all previously indexed documents.
        """
        results = []
        # Continue numbering after the highest id already assigned.
        next_id = max(self.indexed_documents, default=-1) + 1

        for offset, document in enumerate(documents):
            document_id = next_id + offset
            for chunk in self._split_document(document):
                embedding = self.embedding_generator.generate(chunk["content"])

                chunk_metadata = {
                    **document["metadata"],
                    **chunk["metadata"],
                    "content": chunk["content"]  # keep raw text retrievable
                }

                vector_id = self.vector_db.add(embedding, chunk_metadata)

                results.append({
                    "document_id": document_id,
                    "chunk_id": chunk["metadata"]["chunk_id"],
                    "vector_id": vector_id
                })
            self.indexed_documents[document_id] = document

        return {
            "status": "success",
            "indexed_chunks": len(results),
            "indexed_documents": len(documents)
        }

    def _split_document(self, document: Dict) -> List[Dict]:
        """Split a document's content into chunks (recursive strategy)."""
        splitter = RecursiveSplitter()
        return splitter.split(document["content"])

    def update_document(self, document_id: int,
                       new_document: Dict) -> Dict:
        """Replace an indexed document: delete the old record, then re-index."""
        if self.indexed_documents.get(document_id) is not None:
            self._delete_document(document_id)
        return self.index_documents([new_document])

    def _delete_document(self, document_id: int):
        """Drop a document's registry entry.

        TODO: deleting the document's vectors from vector_db is still not
        implemented (the original was a bare ``pass`` stub); stale vectors
        remain searchable after an update.
        """
        self.indexed_documents.pop(document_id, None)

索引查询

python
class DocumentRetriever:
    """Query the vector DB for chunks similar to a text query."""

    def __init__(self, vector_db, embedding_generator):
        self.vector_db = vector_db
        self.embedding_generator = embedding_generator

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return the top_k most similar chunks, reformatted for display."""
        embedded = self.embedding_generator.generate(query)
        hits = self.vector_db.search(embedded, top_k)

        formatted = []
        for hit in hits:
            meta = hit["metadata"]
            formatted.append({
                "content": meta["content"],
                "similarity": hit["similarity"],
                "metadata": meta
            })
        return formatted

    def retrieve_with_filter(self, query: str,
                           filters: Dict, top_k: int = 5) -> List[Dict]:
        """Search with 2x headroom, then keep raw hits matching *filters*.

        Note: unlike retrieve(), this returns the raw search results —
        that matches the existing contract and is preserved here.
        """
        embedded = self.embedding_generator.generate(query)
        hits = self.vector_db.search(embedded, top_k * 2)
        matching = [h for h in hits if self._matches_filters(h["metadata"], filters)]
        return matching[:top_k]

    def _matches_filters(self, metadata: Dict,
                        filters: Dict) -> bool:
        """True when every filter key exists and matches (a list means any-of)."""
        for key, expected in filters.items():
            if key not in metadata:
                return False
            actual = metadata[key]
            if isinstance(expected, list):
                if actual not in expected:
                    return False
            elif actual != expected:
                return False
        return True

实践练习

练习1:实现完整的文档处理流程

python
class DocumentProcessor:
    """End-to-end pipeline: load -> preprocess -> split -> enrich metadata."""

    def __init__(self):
        self.loader = DocumentLoader()
        self.preprocessor = TextPreprocessor()
        self.splitter = RecursiveSplitter()
        self.metadata_extractor = MetadataExtractor()

    def process_file(self, file_path: str) -> List[Dict]:
        """Load one file and return its chunks with merged, enriched metadata."""
        document = self.loader.load(file_path)
        document["content"] = self.preprocessor.preprocess(document["content"])

        enriched = []
        for chunk in self.splitter.split(document["content"]):
            merged = {**document["metadata"], **chunk["metadata"]}
            chunk["metadata"] = merged
            # The extractor reads chunk["content"] / chunk["metadata"],
            # so the chunk dict itself is a valid "document" for it.
            merged.update(self.metadata_extractor.extract(chunk))
            enriched.append(chunk)
        return enriched

练习2:实现批量文档处理

python
class BatchDocumentProcessor:
    """Process every supported file under a directory into chunks.

    NOTE(review): each file is loaded twice — once by the batch loader and
    again inside DocumentProcessor.process_file — doubling the I/O; worth
    confirming whether the loader output could be reused.
    """

    def __init__(self):
        self.batch_loader = BatchDocumentLoader()
        self.processor = DocumentProcessor()

    def process_directory(self, directory: str) -> List[Dict]:
        """Return the concatenated chunks of every document in *directory*."""
        documents = self.batch_loader.load_directory(directory)

        all_chunks = []
        for document in documents:
            source_path = document["metadata"]["filepath"]
            all_chunks.extend(self.processor.process_file(source_path))
        return all_chunks

总结

本节我们学习了文档处理与切片:

  1. 文档加载技术(基础加载器、批量加载)
  2. 文本预处理方法(基础、高级)
  3. 文档切片策略(固定大小、段落、语义、递归)
  4. 元数据提取(基础、高级)
  5. 文档索引(构建、查询)

文档处理是RAG系统的基础,掌握这些技术对于构建高效的RAG系统至关重要。

参考资源