Skip to content

第90天:智能文档分析-核心功能开发与部署

今日目标

  • 学习智能文档分析系统的核心功能开发
  • 掌握文档处理系统的实现
  • 实现内容分析系统和智能处理引擎
  • 学习系统部署和监控

核心功能开发

文档处理系统

多格式文档处理器

python
class MultiFormatDocumentProcessor:
    """Dispatches a file to the parser matching its format.

    Supported formats: pdf, docx/doc, txt, csv, xlsx/xls. Parsing failures
    are reported as an error dict rather than raised to the caller.
    """

    def __init__(self):
        # Map of canonical file-type key -> parser instance.
        self.parsers = {
            "pdf": PDFParser(),
            "docx": DOCXParser(),
            "txt": TXTParser(),
            "csv": CSVParser(),
            "xlsx": ExcelParser()
        }

    def process(self, file_path, file_type=None):
        """Parse *file_path*; auto-detects the type when not given.

        Returns the parser's document dict on success, or an error dict
        with ``status: "failed"`` on unsupported types or parser errors.
        """
        if not file_type:
            file_type = self._detect_file_type(file_path)

        parser = self.parsers.get(file_type)
        if parser is None:
            return {
                "error": "不支持的文件类型",
                "status": "failed"
            }
        try:
            return parser.parse(file_path)
        except Exception as exc:
            return {
                "error": f"文档处理错误: {str(exc)}",
                "status": "failed"
            }

    def _detect_file_type(self, file_path):
        """Infer the canonical parser key from the file extension."""
        import os

        extension_to_type = {
            "pdf": "pdf",
            "docx": "docx",
            "doc": "docx",
            "txt": "txt",
            "csv": "csv",
            "xlsx": "xlsx",
            "xls": "xlsx",
        }
        extension = os.path.splitext(file_path)[1].lower().lstrip('.')
        return extension_to_type.get(extension, "unknown")

DOCX解析器

python
class DOCXParser:
    """Parses a .docx file into plain paragraph text plus extracted tables."""

    def parse(self, file_path):
        """Parse *file_path* with python-docx.

        Returns a dict with ``type``/``path``/``content``/``metadata`` keys;
        ``content.text`` is newline-joined paragraph text and
        ``content.tables`` is a list of tables, each a list of rows of
        cell-text strings.

        Raises:
            Exception: wrapping any parsing failure, with the original
            exception chained as the cause.
        """
        try:
            from docx import Document  # third-party: python-docx

            doc = Document(file_path)
            document = {
                "type": "docx",
                "path": file_path,
                "content": {
                    "text": "\n".join(para.text for para in doc.paragraphs),
                    "tables": [
                        [[cell.text for cell in row.cells] for row in table.rows]
                        for table in doc.tables
                    ],
                },
                "metadata": {}
            }
            return document
        except Exception as e:
            # Fix: chain the original exception so the root cause is not lost
            # when the wrapper is logged or re-examined upstream.
            raise Exception(f"DOCX解析错误: {str(e)}") from e

Excel解析器

python
class ExcelParser:
    """Parses an Excel workbook into per-sheet record lists via pandas."""

    def parse(self, file_path):
        """Parse *file_path* (.xlsx/.xls) into a document dict.

        ``content.sheets`` maps each sheet name to ``{"data": [...records...],
        "columns": [...names...]}``; ``metadata.sheets`` lists sheet names.

        Raises:
            Exception: wrapping any parsing failure, with the original
            exception chained as the cause.
        """
        try:
            import pandas as pd

            document = {
                "type": "excel",
                "path": file_path,
                "content": {
                    "sheets": {}
                },
                "metadata": {}
            }

            # Fix: open the workbook once and reuse the handle for every
            # sheet (the original re-read the file per sheet with
            # pd.read_excel); the context manager also guarantees the file
            # handle is closed.
            with pd.ExcelFile(file_path) as xls:
                document["metadata"]["sheets"] = xls.sheet_names
                for sheet_name in xls.sheet_names:
                    df = xls.parse(sheet_name)
                    document["content"]["sheets"][sheet_name] = {
                        "data": df.to_dict('records'),
                        "columns": df.columns.tolist()
                    }

            return document
        except Exception as e:
            # Chain the cause so the underlying engine error stays visible.
            raise Exception(f"Excel解析错误: {str(e)}") from e

内容分析系统

文本分析器

python
class TextAnalyzer:
    """Computes word/sentence counts, keywords, and named entities for text.

    Requires nltk; spaCy is optional (entity extraction is skipped when no
    model can be loaded).
    """

    def __init__(self):
        import nltk
        import spacy

        # Download required NLTK resources; quiet=True avoids spamming the
        # console on every instantiation (downloads are cached locally).
        nltk.download('punkt', quiet=True)
        nltk.download('stopwords', quiet=True)

        # Load a spaCy model, preferring English and falling back to Chinese.
        # Fix: the original used bare `except:` clauses, which also swallow
        # KeyboardInterrupt/SystemExit.
        try:
            self.nlp = spacy.load('en_core_web_sm')
        except Exception:
            try:
                self.nlp = spacy.load('zh_core_web_sm')
            except Exception:
                # No model available — analyze() will skip entity extraction.
                self.nlp = None

    def analyze(self, text):
        """Analyze *text* and return counts, top-10 keywords, and entities.

        Returns a dict with ``word_count``, ``sentence_count``, ``keywords``,
        ``entities`` (list of ``{"text", "label"}``), and ``topics`` (always
        empty here; reserved for callers). Empty/falsy input yields the
        zeroed default structure.
        """
        analysis = {
            "word_count": 0,
            "sentence_count": 0,
            "keywords": [],
            "entities": [],
            "topics": []
        }

        if not text:
            return analysis

        from nltk.corpus import stopwords
        from nltk.tokenize import word_tokenize, sent_tokenize
        from collections import Counter

        # Token and sentence counts.
        words = word_tokenize(text)
        sentences = sent_tokenize(text)
        analysis["word_count"] = len(words)
        analysis["sentence_count"] = len(sentences)

        # Top-10 keywords by frequency, excluding English stopwords and
        # non-alphanumeric tokens.
        # NOTE(review): only English stopwords are filtered even though a
        # Chinese spaCy model may be in use — confirm whether Chinese
        # stopword filtering is needed.
        stop_words = set(stopwords.words('english'))
        filtered_words = [word.lower() for word in words if word.isalnum() and word.lower() not in stop_words]
        word_freq = Counter(filtered_words)
        analysis["keywords"] = [word for word, _ in word_freq.most_common(10)]

        # Named entities, only when a spaCy model loaded successfully.
        if self.nlp:
            doc = self.nlp(text)
            analysis["entities"] = [
                {"text": ent.text, "label": ent.label_} for ent in doc.ents
            ]

        return analysis

表格分析器

python
class TableAnalyzer:
    """Analyzes extracted tables: shape, headers, and per-column data types."""

    def analyze(self, tables):
        """Analyze *tables*, a list of tables (each a list of rows of cells).

        The first row of each table is treated as its header. Returns a dict
        with ``table_count`` and a ``tables`` list whose entries report
        ``index``/``rows``/``columns``/``headers`` and ``data_types`` — a
        mapping of column name to ``"numeric"`` or ``"text"``.
        """
        analysis = {
            "table_count": len(tables),
            "tables": []
        }

        for i, table in enumerate(tables):
            table_analysis = {
                "index": i,
                "rows": len(table),
                "columns": len(table[0]) if table else 0,
                "headers": table[0] if table else [],
                # Fix: was initialized as a list but assigned a dict below,
                # giving callers an inconsistent type for header-only tables.
                "data_types": {}
            }

            # Infer column types only when there are data rows under the header.
            # NOTE(review): ragged rows would make DataFrame construction
            # raise — assumed rectangular tables; confirm upstream extractors.
            if table and len(table) > 1:
                import pandas as pd
                df = pd.DataFrame(table[1:], columns=table[0])
                data_types = {}
                for col in df.columns:
                    try:
                        # A column is "numeric" if every value parses as a number.
                        pd.to_numeric(df[col])
                        data_types[col] = "numeric"
                    except (ValueError, TypeError):
                        data_types[col] = "text"
                table_analysis["data_types"] = data_types

            analysis["tables"].append(table_analysis)

        return analysis

智能处理引擎

文档摘要生成器

python
class Summarizer:
    """Generates a short summary of a document's text.

    Uses a transformers summarization pipeline when available; otherwise
    falls back to returning the first three sentences.
    """

    def __init__(self):
        # Fix: the original used a bare `except:`, which also swallows
        # KeyboardInterrupt/SystemExit during model load.
        try:
            from transformers import pipeline
            self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
        except Exception:
            self.summarizer = None

    def summarize(self, text_analysis):
        """Return a summary of ``text_analysis["text"]``.

        Returns an empty string for empty input or on any summarization
        failure (the error is printed, not raised).
        """
        text = text_analysis.get("text", "")
        if not text:
            return ""

        try:
            if self.summarizer:
                # NOTE(review): lengths are derived from character counts but
                # BART's limits are in tokens — this is an approximation;
                # confirm if summary length precision matters.
                max_length = min(150, len(text) // 3)
                min_length = max(30, len(text) // 10)

                summary = self.summarizer(
                    text,
                    max_length=max_length,
                    min_length=min_length,
                    do_sample=False
                )[0]["summary_text"]
                return summary

            # Fallback: take the first three sentences (naive '. ' split).
            sentences = text.split('. ')
            if len(sentences) > 3:
                return '. '.join(sentences[:3]) + '.'
            return text
        except Exception as e:
            print(f"摘要生成错误: {str(e)}")
            return ""

文档问答系统

python
class QASystem:
    """Retrieval-augmented question answering over a parsed document.

    Builds a FAISS index over the document text and answers questions with a
    LangChain RetrievalQA chain. Degrades gracefully when LangChain/OpenAI
    are unavailable.
    """

    def __init__(self):
        # Fix: the original used a bare `except:`; narrowed to Exception.
        try:
            from langchain.llms import OpenAI
            from langchain.embeddings import OpenAIEmbeddings

            self.llm = OpenAI(temperature=0.7)
            self.embeddings = OpenAIEmbeddings()
        except Exception:
            self.llm = None
            # Fix: self.embeddings was left unset on failure, which would
            # raise AttributeError later instead of the intended soft error.
            self.embeddings = None

    def answer_question(self, document, question):
        """Answer *question* about *document* (a parsed-document dict).

        Returns the answer string, or a Chinese status message when the
        system is uninitialized, the document has no text, or answering
        fails (errors are printed, not raised).
        """
        try:
            if not self.llm:
                return "问答系统未初始化"

            text = document.get("content", {}).get("text", "")
            if not text:
                return "文档内容为空"

            # Fix: these names were previously imported only inside
            # __init__'s local scope, so using them here raised NameError at
            # runtime. Import where they are used.
            from langchain.chains import RetrievalQA
            from langchain.vectorstores import FAISS
            from langchain.text_splitter import CharacterTextSplitter

            # Chunk the text for retrieval.
            text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
            texts = text_splitter.split_text(text)

            # Build an in-memory vector index over the chunks.
            docsearch = FAISS.from_texts(texts, self.embeddings)

            # "stuff" chain: concatenates retrieved chunks into one prompt.
            qa = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=docsearch.as_retriever()
            )

            return qa.run(question)
        except Exception as e:
            print(f"问答系统错误: {str(e)}")
            return "无法回答问题"

系统部署

FastAPI应用

python
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import os
import tempfile
from document_processor import MultiFormatDocumentProcessor
from content_analyzer import ContentAnalyzer
from intelligent_engine import IntelligentEngine
from storage_system import StorageSystem

# FastAPI application for the intelligent document analysis service.
app = FastAPI(
    title="智能文档分析系统",
    description="一个功能强大的智能文档分析系统",
    version="1.0.0"
)

# Configure CORS so browser front-ends on other origins can call the API.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# wide open — restrict origins before production deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Instantiate the pipeline components once at module import; the endpoint
# handlers below share these singletons.
processor = MultiFormatDocumentProcessor()
analyzer = ContentAnalyzer()
intelligent_engine = IntelligentEngine()
storage = StorageSystem()

@app.post("/api/documents/process")
async def process_document(file: UploadFile = File(...)):
    """Upload a document, run the full analysis pipeline, and store the result.

    The upload is spooled to a temporary file (preserving its extension so
    format detection works), then parsed, analyzed, enriched, and persisted.
    Any failure surfaces as an HTTP 500 with the error message as detail.
    """
    try:
        # Persist the upload to disk; keep the original extension for
        # format detection downstream.
        suffix = os.path.splitext(file.filename)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(await file.read())
            tmp_path = tmp.name

        try:
            # Pipeline: parse -> analyze -> enrich -> persist.
            parsed = processor.process(tmp_path)
            analyzed = analyzer.analyze(parsed)
            enriched = intelligent_engine.process(analyzed)
            stored = storage.store(enriched)

            return {
                "document_id": stored["document_id"],
                "analysis_result": enriched,
                "status": "completed"
            }
        finally:
            # Always remove the temporary copy, even on failure.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

@app.get("/api/documents/{document_id}")
async def get_document(document_id: str):
    """Return the stored analysis result for *document_id*.

    Responds 404 when the document is unknown and 500 on storage errors.
    """
    try:
        result = storage.get(document_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    if not result:
        raise HTTPException(status_code=404, detail="文档不存在")
    return result

@app.post("/api/documents/{document_id}/qa")
async def ask_question(document_id: str, question: str):
    """Answer *question* against the stored document *document_id*.

    Responds 404 when the document is unknown and 500 on any other failure.
    """
    try:
        stored_doc = storage.get(document_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

    if not stored_doc:
        raise HTTPException(status_code=404, detail="文档不存在")

    try:
        qa_engine = intelligent_engine.engines["qa_system"]
        answer = qa_engine.answer_question(stored_doc, question)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

    return {
        "question": question,
        "answer": answer
    }

if __name__ == "__main__":
    # Development entry point: serve the API on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)

存储系统

python
class StorageSystem:
    """MongoDB-backed document store with a Redis read-through cache.

    Degrades gracefully: if either backend fails to initialize, store/get
    report soft failures instead of raising.
    """

    def __init__(self):
        try:
            from pymongo import MongoClient
            import redis

            # Connect to MongoDB.
            self.client = MongoClient("mongodb://localhost:27017/")
            self.db = self.client["document_analysis"]
            self.collection = self.db["documents"]

            # Connect to Redis (cache layer).
            self.redis_client = redis.Redis(host="localhost", port=6379, db=0)
        except Exception as e:
            print(f"存储系统初始化错误: {str(e)}")
            self.client = None
            # Fix: self.collection was left unset in this branch, so store()
            # and get() raised AttributeError instead of reporting the soft
            # "not initialized" failure.
            self.collection = None
            self.redis_client = None

    def store(self, document):
        """Persist *document*, assigning it a fresh UUID document_id.

        Writes to MongoDB and mirrors to the Redis cache. Returns a dict
        with ``document_id``/``status`` on success or ``error``/``status``
        on failure.
        """
        try:
            # Fix: pymongo Collection objects raise NotImplementedError on
            # truth testing, so the original `if self.collection:` always
            # errored out when MongoDB *was* connected. Compare with None.
            if self.collection is not None:
                import json
                import uuid

                document_id = str(uuid.uuid4())
                document["_id"] = document_id
                document["document_id"] = document_id

                # Write-through: MongoDB first, then cache.
                self.collection.insert_one(document)

                if self.redis_client is not None:
                    # NOTE(review): json.dumps will fail on non-serializable
                    # values; assumed documents are JSON-safe — confirm.
                    self.redis_client.set(f"document:{document_id}", json.dumps(document))

                return {
                    "document_id": document_id,
                    "status": "stored"
                }
            return {
                "error": "存储系统未初始化",
                "status": "failed"
            }
        except Exception as e:
            return {
                "error": str(e),
                "status": "failed"
            }

    def get(self, document_id):
        """Fetch a stored document by id, cache-first; None when not found."""
        try:
            import json

            # Try the Redis cache first.
            if self.redis_client is not None:
                cached_document = self.redis_client.get(f"document:{document_id}")
                if cached_document:
                    return json.loads(cached_document)

            # Fall back to MongoDB (see truth-testing fix in store()).
            if self.collection is not None:
                document = self.collection.find_one({"_id": document_id})
                if document:
                    # Normalize _id to a string for JSON round-tripping.
                    if "_id" in document:
                        document["_id"] = str(document["_id"])

                    # Repopulate the cache on a miss.
                    if self.redis_client is not None:
                        self.redis_client.set(f"document:{document_id}", json.dumps(document))

                    return document

            return None
        except Exception as e:
            print(f"获取文档错误: {str(e)}")
            return None

系统监控与日志

监控系统

python
class MonitoringSystem:
    """In-memory metrics collector for the analysis pipeline.

    Timing metrics accumulate samples in lists; error/success counters
    accumulate integers. Unknown metric names are ignored.
    """

    def __init__(self):
        timing_names = (
            "document_processing_time",
            "content_analysis_time",
            "intelligent_processing_time",
            "total_processing_time",
        )
        self.metrics = {name: [] for name in timing_names}
        self.metrics["error_count"] = 0
        self.metrics["success_count"] = 0

    def record_metric(self, metric_name, value):
        """Record *value* under *metric_name*: append for sample lists,
        add for counters. Silently ignores unknown names."""
        if metric_name not in self.metrics:
            return
        slot = self.metrics[metric_name]
        if isinstance(slot, list):
            slot.append(value)
        else:
            self.metrics[metric_name] = slot + value

    def get_metrics(self):
        """Return the raw metrics mapping."""
        return self.metrics

    def generate_report(self):
        """Summarize metrics: avg/min/max/count for non-empty sample lists,
        raw values for counters and empty lists."""
        report = {}
        for name, recorded in self.metrics.items():
            if isinstance(recorded, list) and recorded:
                sample_count = len(recorded)
                report[name] = {
                    "average": sum(recorded) / sample_count,
                    "min": min(recorded),
                    "max": max(recorded),
                    "count": sample_count,
                }
            else:
                report[name] = recorded
        return report

日志系统

python
import logging
import os
from logging.handlers import RotatingFileHandler

class LoggingSystem:
    """Application logger writing to a rotating file and the console.

    Configures the "DocumentAnalysisSystem" logger directly instead of the
    root logger, so other libraries' logging is not hijacked.
    """

    def __init__(self, log_dir="logs"):
        """Create *log_dir* if needed and attach file + console handlers.

        Fix: the original called logging.basicConfig(), which (a) mutates
        the process-wide root logger and (b) silently no-ops if the root is
        already configured, so the file handler might never be installed.
        Handlers are now attached to the named logger, guarded against
        duplication on repeated instantiation.
        """
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, "document_analysis.log")

        self.logger = logging.getLogger("DocumentAnalysisSystem")
        self.logger.setLevel(logging.INFO)
        # Don't also bubble records up to the root logger's handlers.
        self.logger.propagate = False

        # Attach handlers only once, even if LoggingSystem is built twice.
        if not self.logger.handlers:
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler = RotatingFileHandler(
                log_file, maxBytes=10485760, backupCount=5
            )
            file_handler.setFormatter(formatter)
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)

    def info(self, message):
        """Log an INFO-level message."""
        self.logger.info(message)

    def error(self, message, exc_info=False):
        """Log an ERROR-level message; pass exc_info=True to attach traceback."""
        self.logger.error(message, exc_info=exc_info)

    def warning(self, message):
        """Log a WARNING-level message."""
        self.logger.warning(message)

    def debug(self, message):
        """Log a DEBUG-level message (suppressed at the default INFO level)."""
        self.logger.debug(message)

今日总结

今天我们完成了智能文档分析系统的核心功能开发与部署:

  1. 核心功能开发

    • 文档处理系统:支持PDF、DOCX、Excel等多种格式的文档解析
    • 内容分析系统:实现了文本分析和表格分析功能
    • 智能处理引擎:实现了文档摘要生成和文档问答系统
  2. 系统部署

    • 使用FastAPI构建了RESTful API服务
    • 实现了存储系统,支持MongoDB和Redis
    • 配置了CORS中间件,支持跨域请求
  3. 系统监控与日志

    • 实现了监控系统,记录系统性能指标
    • 实现了日志系统,记录系统运行状态

智能文档分析系统可以帮助用户快速处理和分析大量文档,提取关键信息,生成摘要,回答问题,提高工作效率。

作业

  1. 部署智能文档分析系统到Docker容器
  2. 测试系统对不同格式文档的处理能力
  3. 优化系统性能,提高文档处理速度
  4. 开发一个前端界面,与智能文档分析系统集成

课程总结

至此,我们已经完成了整个AI课程的学习,包括:

  1. 第一阶段:基础入门(第1-15天)

    • LLM大模型基础
    • 国内外主流平台对比
    • 提示词工程
  2. 第二阶段:核心技术(第16-72天)

    • MCP多智能体协作平台
    • 技能系统开发
    • Agent智能体
    • RAG技术
    • 大模型微调
    • AI应用部署
    • AI安全与伦理
    • 国内外平台深度对比
  3. 第三阶段:综合实战项目(第73-90天)

    • 智能客服系统
    • 个人助理Agent
    • 企业知识库
    • 代码助手系统
    • 智能文档分析系统

通过这90天的学习,我们掌握了AI领域的核心技术和应用,能够开发各种AI应用系统,为企业和个人提供智能服务。

希望大家能够将所学知识应用到实际项目中,不断探索AI技术的新应用场景,为AI技术的发展做出贡献!