Appearance
第90天:智能文档分析-核心功能开发与部署
今日目标
- 学习智能文档分析系统的核心功能开发
- 掌握文档处理系统的实现
- 实现内容分析系统和智能处理引擎
- 学习系统部署和监控
核心功能开发
文档处理系统
多格式文档处理器
python
class MultiFormatDocumentProcessor:
    """Dispatch documents to a format-specific parser chosen by file type."""

    # Extension -> parser key. "doc"/"xls" map onto the docx/xlsx parsers,
    # which handle both variants.
    _EXT_MAP = {
        "pdf": "pdf",
        "docx": "docx",
        "doc": "docx",
        "txt": "txt",
        "csv": "csv",
        "xlsx": "xlsx",
        "xls": "xlsx",
    }

    def __init__(self):
        self.parsers = {
            "pdf": PDFParser(),
            "docx": DOCXParser(),
            "txt": TXTParser(),
            "csv": CSVParser(),
            "xlsx": ExcelParser()
        }

    def process(self, file_path, file_type=None):
        """Parse *file_path* with the parser matching *file_type*.

        When *file_type* is omitted it is inferred from the file extension.
        Returns the parser's document dict on success, otherwise an error
        dict with ``status == "failed"``.
        """
        if not file_type:
            file_type = self._detect_file_type(file_path)
        # Guard clause: unsupported types fail fast.
        if file_type not in self.parsers:
            return {
                "error": "不支持的文件类型",
                "status": "failed"
            }
        try:
            return self.parsers[file_type].parse(file_path)
        except Exception as e:
            return {
                "error": f"文档处理错误: {str(e)}",
                "status": "failed"
            }

    def _detect_file_type(self, file_path):
        """Map the file extension to a parser key; "unknown" when unmapped."""
        import os
        ext = os.path.splitext(file_path)[1].lower().lstrip('.')
        return self._EXT_MAP.get(ext, "unknown")
# DOCX解析器
python
class DOCXParser:
    """Parser for Microsoft Word (.docx) documents via python-docx."""

    def parse(self, file_path):
        """Parse a DOCX file into a document dict.

        The result holds the full paragraph text joined by newlines under
        ``content["text"]`` and every table under ``content["tables"]`` as a
        list of rows, each row a list of cell strings.

        Raises:
            Exception: wrapping any parse failure, with the original error
                chained as ``__cause__`` so the traceback is preserved.
        """
        try:
            from docx import Document
            doc = Document(file_path)
            document = {
                "type": "docx",
                "path": file_path,
                "content": {
                    "text": "",
                    "tables": []
                },
                "metadata": {}
            }
            # Full text: one line per paragraph.
            document["content"]["text"] = "\n".join(
                para.text for para in doc.paragraphs
            )
            # Tables: nested lists of cell text.
            document["content"]["tables"] = [
                [[cell.text for cell in row.cells] for row in table.rows]
                for table in doc.tables
            ]
            return document
        except Exception as e:
            # Chain the cause instead of discarding it (original used a bare
            # re-raise that lost the underlying traceback context).
            raise Exception(f"DOCX解析错误: {str(e)}") from e
# Excel解析器
python
class ExcelParser:
    """Parser for Excel workbooks (.xlsx/.xls) via pandas."""

    def parse(self, file_path):
        """Parse every sheet of an Excel workbook into a document dict.

        Each sheet is stored under ``content["sheets"][name]`` as its rows
        (list of record dicts) plus its column names; the full sheet-name
        list goes into ``metadata["sheets"]``.

        Raises:
            Exception: wrapping any parse failure, chained as ``__cause__``.
        """
        try:
            import pandas as pd
            document = {
                "type": "excel",
                "path": file_path,
                "content": {
                    "sheets": {}
                },
                "metadata": {}
            }
            # Context manager guarantees the workbook handle is released
            # (the original leaked an open file on every call).
            with pd.ExcelFile(file_path) as xls:
                document["metadata"]["sheets"] = xls.sheet_names
                for sheet_name in xls.sheet_names:
                    # Parse from the already-open workbook instead of
                    # re-reading the file once per sheet.
                    df = xls.parse(sheet_name)
                    document["content"]["sheets"][sheet_name] = {
                        "data": df.to_dict('records'),
                        "columns": df.columns.tolist()
                    }
            return document
        except Exception as e:
            raise Exception(f"Excel解析错误: {str(e)}") from e
# 内容分析系统
文本分析器
python
class TextAnalyzer:
    """Text analyzer: word/sentence counts and keywords via NLTK, plus
    named entities via spaCy when a model is available."""

    def __init__(self):
        import nltk
        import spacy
        # Ensure tokenizer and stopword corpora exist (no-op if downloaded).
        nltk.download('punkt')
        nltk.download('stopwords')
        # Prefer the English spaCy model, fall back to the Chinese one, and
        # disable entity extraction entirely when neither is installed.
        self.nlp = None
        for model_name in ('en_core_web_sm', 'zh_core_web_sm'):
            try:
                self.nlp = spacy.load(model_name)
                break
            # Narrowed from a bare except so Ctrl-C/SystemExit still propagate.
            except Exception:
                continue

    def analyze(self, text):
        """Analyze *text*: counts, top-10 keywords, named entities.

        Returns a dict with zeroed/empty fields when *text* is falsy;
        "topics" is reserved and always empty here.
        """
        analysis = {
            "word_count": 0,
            "sentence_count": 0,
            "keywords": [],
            "entities": [],
            "topics": []
        }
        if not text:
            return analysis
        from nltk.corpus import stopwords
        from nltk.tokenize import word_tokenize, sent_tokenize
        from collections import Counter
        words = word_tokenize(text)
        sentences = sent_tokenize(text)
        analysis["word_count"] = len(words)
        analysis["sentence_count"] = len(sentences)
        # Keywords: most frequent alphanumeric non-stopword tokens.
        # NOTE(review): only English stopwords are filtered even when the
        # Chinese spaCy model is in use — confirm this is intended.
        stop_words = set(stopwords.words('english'))
        filtered_words = [
            word.lower() for word in words
            if word.isalnum() and word.lower() not in stop_words
        ]
        word_freq = Counter(filtered_words)
        analysis["keywords"] = [word for word, _ in word_freq.most_common(10)]
        # Named entities via spaCy, when a model loaded.
        if self.nlp:
            doc = self.nlp(text)
            analysis["entities"] = [
                {"text": ent.text, "label": ent.label_} for ent in doc.ents
            ]
        return analysis
# 表格分析器
python
class TableAnalyzer:
    """Analyze row/column structure and per-column data types of raw tables."""

    def analyze(self, tables):
        """Analyze a list of tables (each a list of rows of cell strings).

        The first row of a table is treated as its header. Returns a summary
        dict with per-table row/column counts, headers, and — when data rows
        exist — a header -> "numeric"/"text" mapping under "data_types".
        """
        analysis = {
            "table_count": len(tables),
            "tables": []
        }
        for i, table in enumerate(tables):
            table_analysis = {
                "index": i,
                "rows": len(table),
                "columns": len(table[0]) if table else 0,
                "headers": table[0] if table else [],
                # Dict of column -> inferred type (original mistyped this
                # default as a list, so header-only tables returned []).
                "data_types": {}
            }
            if table and len(table) > 1:
                import pandas as pd
                df = pd.DataFrame(table[1:], columns=table[0])
                data_types = {}
                for col in df.columns:
                    try:
                        # Column is numeric iff every cell converts.
                        pd.to_numeric(df[col])
                        data_types[col] = "numeric"
                    except (ValueError, TypeError):
                        data_types[col] = "text"
                table_analysis["data_types"] = data_types
            analysis["tables"].append(table_analysis)
        return analysis
# 智能处理引擎
文档摘要生成器
python
class Summarizer:
    """Document summarizer: transformers BART pipeline when available,
    otherwise a simple first-three-sentences extractive fallback."""

    def __init__(self):
        try:
            from transformers import pipeline
            self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
        # Narrowed from a bare except; library/model missing -> fallback mode.
        except Exception:
            self.summarizer = None

    def summarize(self, text_analysis):
        """Return a summary of ``text_analysis["text"]`` ("" when empty or on error)."""
        text = text_analysis.get("text", "")
        if not text:
            return ""
        try:
            if self.summarizer:
                # Scale summary bounds with input length.
                max_length = min(150, len(text) // 3)
                min_length = max(30, len(text) // 10)
                summary = self.summarizer(
                    text,
                    max_length=max_length,
                    min_length=min_length,
                    do_sample=False
                )[0]["summary_text"]
                return summary
            # Fallback: first three sentences, split on ". ".
            sentences = text.split('. ')
            if len(sentences) > 3:
                return '. '.join(sentences[:3]) + '.'
            return text
        except Exception as e:
            print(f"摘要生成错误: {str(e)}")
            return ""
# 文档问答系统
python
class QASystem:
    """Retrieval-augmented QA over a document's text (LangChain + OpenAI)."""

    def __init__(self):
        try:
            from langchain.llms import OpenAI
            from langchain.embeddings import OpenAIEmbeddings
            self.llm = OpenAI(temperature=0.7)
            self.embeddings = OpenAIEmbeddings()
        # Narrowed from a bare except; answer_question degrades gracefully.
        except Exception:
            self.llm = None

    def answer_question(self, document, question):
        """Answer *question* from ``document["content"]["text"]``.

        Returns a status string when the system is uninitialized, the
        document is empty, or answering fails.
        """
        try:
            if not self.llm:
                return "问答系统未初始化"
            text = document.get("content", {}).get("text", "")
            if not text:
                return "文档内容为空"
            # BUG FIX: in the original these names were imported only inside
            # __init__'s local scope, so this method always raised NameError
            # (silently swallowed below). Import them where they are used.
            from langchain.chains import RetrievalQA
            from langchain.vectorstores import FAISS
            from langchain.text_splitter import CharacterTextSplitter
            # Chunk with overlap so answers spanning boundaries survive.
            text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
            texts = text_splitter.split_text(text)
            # Index chunks for similarity retrieval.
            docsearch = FAISS.from_texts(texts, self.embeddings)
            qa = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=docsearch.as_retriever()
            )
            return qa.run(question)
        except Exception as e:
            print(f"问答系统错误: {str(e)}")
            return "无法回答问题"
# 系统部署
FastAPI应用
python
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import os
import tempfile
from document_processor import MultiFormatDocumentProcessor
from content_analyzer import ContentAnalyzer
from intelligent_engine import IntelligentEngine
from storage_system import StorageSystem

app = FastAPI(
    title="智能文档分析系统",
    description="一个功能强大的智能文档分析系统",
    version="1.0.0"
)

# CORS is wide open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Pipeline components are shared singletons created once at import time.
processor = MultiFormatDocumentProcessor()
analyzer = ContentAnalyzer()
intelligent_engine = IntelligentEngine()
storage = StorageSystem()


@app.post("/api/documents/process")
async def process_document(file: UploadFile = File(...)):
    """Run an uploaded file through parse -> analyze -> enrich -> store."""
    try:
        # Persist the upload to a temp file, keeping the original extension
        # so the processor can detect the format from the suffix.
        suffix = os.path.splitext(file.filename)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(await file.read())
            tmp_path = tmp.name
        try:
            parsed = processor.process(tmp_path)
            analyzed = analyzer.analyze(parsed)
            enriched = intelligent_engine.process(analyzed)
            stored = storage.store(enriched)
            return {
                "document_id": stored["document_id"],
                "analysis_result": enriched,
                "status": "completed"
            }
        finally:
            # Remove the temp file whether or not the pipeline succeeded.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/documents/{document_id}")
async def get_document(document_id: str):
    """Fetch a stored analysis result by id; 404 when it does not exist."""
    try:
        document = storage.get(document_id)
        if not document:
            raise HTTPException(status_code=404, detail="文档不存在")
        return document
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/documents/{document_id}/qa")
async def ask_question(document_id: str, question: str):
    """Answer a question against a stored document; 404 when it is missing."""
    try:
        document = storage.get(document_id)
        if not document:
            raise HTTPException(status_code=404, detail="文档不存在")
        answer = intelligent_engine.engines["qa_system"].answer_question(document, question)
        return {
            "question": question,
            "answer": answer
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
# 存储系统
python
class StorageSystem:
    """Persist analysis results in MongoDB with a Redis read-through cache."""

    def __init__(self):
        # Pre-initialize every handle to None so methods can degrade
        # gracefully: the original's failure path never set self.collection,
        # which later raised AttributeError inside store()/get().
        self.client = None
        self.db = None
        self.collection = None
        self.redis_client = None
        try:
            from pymongo import MongoClient
            import redis
            # MongoDB connection
            self.client = MongoClient("mongodb://localhost:27017/")
            self.db = self.client["document_analysis"]
            self.collection = self.db["documents"]
            # Redis connection
            self.redis_client = redis.Redis(host="localhost", port=6379, db=0)
        except Exception as e:
            print(f"存储系统初始化错误: {str(e)}")
            self.client = None
            self.collection = None
            self.redis_client = None

    def store(self, document):
        """Store *document* under a fresh UUID.

        Returns ``{"document_id", "status": "stored"}`` on success, or an
        error dict with ``status == "failed"``.
        """
        try:
            # NOTE: pymongo Collection forbids truth testing — `if collection:`
            # raises NotImplementedError — so compare against None explicitly.
            if self.collection is not None:
                import uuid
                document_id = str(uuid.uuid4())
                document["_id"] = document_id
                document["document_id"] = document_id
                self.collection.insert_one(document)
                # Best-effort write-through cache.
                if self.redis_client is not None:
                    import json
                    self.redis_client.set(f"document:{document_id}", json.dumps(document))
                return {
                    "document_id": document_id,
                    "status": "stored"
                }
            return {
                "error": "存储系统未初始化",
                "status": "failed"
            }
        except Exception as e:
            return {
                "error": str(e),
                "status": "failed"
            }

    def get(self, document_id):
        """Fetch a document by id: Redis cache first, then MongoDB.

        Returns None when missing or on any backend error.
        """
        try:
            if self.redis_client is not None:
                import json
                cached_document = self.redis_client.get(f"document:{document_id}")
                if cached_document:
                    return json.loads(cached_document)
            if self.collection is not None:
                document = self.collection.find_one({"_id": document_id})
                if document:
                    # _id may be an ObjectId for legacy rows; stringify for JSON.
                    if "_id" in document:
                        document["_id"] = str(document["_id"])
                    # Refresh the cache on a DB hit.
                    if self.redis_client is not None:
                        import json
                        self.redis_client.set(f"document:{document_id}", json.dumps(document))
                    return document
            return None
        except Exception as e:
            print(f"获取文档错误: {str(e)}")
            return None
# 系统监控与日志
监控系统
python
class MonitoringSystem:
    """In-memory metrics collector: timing samples plus success/error counters."""

    def __init__(self):
        # List-valued entries accumulate samples; int-valued entries are
        # running counters.
        self.metrics = {
            "document_processing_time": [],
            "content_analysis_time": [],
            "intelligent_processing_time": [],
            "total_processing_time": [],
            "error_count": 0,
            "success_count": 0
        }

    def record_metric(self, metric_name, value):
        """Append *value* to a sample metric or add it to a counter.

        Unknown metric names are silently ignored.
        """
        if metric_name not in self.metrics:
            return
        current = self.metrics[metric_name]
        if isinstance(current, list):
            current.append(value)
        else:
            self.metrics[metric_name] = current + value

    def get_metrics(self):
        """Return the live metrics dict (not a copy)."""
        return self.metrics

    def generate_report(self):
        """Summarize sample metrics as average/min/max/count; counters and
        empty sample lists are passed through unchanged."""
        report = {}
        for name, values in self.metrics.items():
            if not (isinstance(values, list) and values):
                report[name] = values
                continue
            report[name] = {
                "average": sum(values) / len(values),
                "min": min(values),
                "max": max(values),
                "count": len(values)
            }
        return report
python
import logging
import os
from logging.handlers import RotatingFileHandler


class LoggingSystem:
    """Thin wrapper around stdlib logging: rotating file handler + console."""

    def __init__(self, log_dir="logs"):
        # Ensure the destination directory exists before attaching handlers.
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, "document_analysis.log")
        # Root config: rotate at ~10 MB keeping 5 backups, and echo to stderr.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                RotatingFileHandler(log_file, maxBytes=10485760, backupCount=5),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger("DocumentAnalysisSystem")

    def info(self, message):
        """Log *message* at INFO level."""
        self.logger.info(message)

    def error(self, message, exc_info=False):
        """Log *message* at ERROR level, optionally with the active traceback."""
        self.logger.error(message, exc_info=exc_info)

    def warning(self, message):
        """Log *message* at WARNING level."""
        self.logger.warning(message)

    def debug(self, message):
        """Log *message* at DEBUG level."""
        self.logger.debug(message)
# 今日总结
今天我们完成了智能文档分析系统的核心功能开发与部署:
核心功能开发:
- 文档处理系统:支持PDF、DOCX、Excel等多种格式的文档解析
- 内容分析系统:实现了文本分析和表格分析功能
- 智能处理引擎:实现了文档摘要生成和文档问答系统
系统部署:
- 使用FastAPI构建了RESTful API服务
- 实现了存储系统,支持MongoDB和Redis
- 配置了CORS中间件,支持跨域请求
系统监控与日志:
- 实现了监控系统,记录系统性能指标
- 实现了日志系统,记录系统运行状态
智能文档分析系统可以帮助用户快速处理和分析大量文档,提取关键信息,生成摘要,回答问题,提高工作效率。
作业
- 部署智能文档分析系统到Docker容器
- 测试系统对不同格式文档的处理能力
- 优化系统性能,提高文档处理速度
- 开发一个前端界面,与智能文档分析系统集成
课程总结
至此,我们已经完成了整个AI课程的学习,包括:
第一阶段:基础入门(第1-15天)
- LLM大模型基础
- 国内外主流平台对比
- 提示词工程
第二阶段:核心技术(第16-72天)
- MCP多智能体协作平台
- 技能系统开发
- Agent智能体
- RAG技术
- 大模型微调
- AI应用部署
- AI安全与伦理
- 国内外平台深度对比
第三阶段:综合实战项目(第73-90天)
- 智能客服系统
- 个人助理Agent
- 企业知识库
- 代码助手系统
- 智能文档分析系统
通过这90天的学习,我们掌握了AI领域的核心技术和应用,能够开发各种AI应用系统,为企业和个人提供智能服务。
希望大家能够将所学知识应用到实际项目中,不断探索AI技术的新应用场景,为AI技术的发展做出贡献!
