Appearance
OpenClaw 自动评估系统
自动评估系统是OpenClaw教育解决方案的关键组件,通过AI技术实现自动化作业批改、智能考试评分和学习效果分析,大幅提升评估效率和准确性。
系统概述
自动评估系统基于自然语言处理、计算机视觉和机器学习技术,能够:
- 自动批改客观题和主观题
- 智能评分并提供详细反馈
- 分析学习效果和能力发展
- 生成个性化学习建议
核心功能
1. 自动作业批改
功能描述
支持多种题型的自动批改,包括选择题、填空题、简答题、编程题等,提供准确的评分和详细的反馈。
技术实现
python
from typing import Dict, List, Optional
from datetime import datetime
import numpy as np
class AutomaticGradingSystem:
    """Orchestrates end-to-end automatic grading of one submission.

    Pipeline: preprocess -> classify question types -> dispatch each
    question to its type-specific grader -> plagiarism check -> feedback.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.submission_processor = SubmissionProcessor()
        self.question_classifier = QuestionClassifier()
        # One grading engine per supported question type; classifier
        # output strings are used as lookup keys.
        self.grading_engines = {
            'multiple_choice': MultipleChoiceGrader(),
            'fill_in_blank': FillInBlankGrader(),
            'short_answer': ShortAnswerGrader(),
            'essay': EssayGrader(),
            'programming': ProgrammingGrader()
        }
        self.feedback_generator = FeedbackGenerator()
        self.plagiarism_detector = PlagiarismDetector()

    async def grade_submission(self, submission: Dict) -> Dict:
        """Grade a submission and return the full result report dict."""
        processed = await self.submission_processor.process(submission)
        questions = processed['questions']
        types = await self.question_classifier.classify(questions)

        results = []
        for idx, (question, qtype) in enumerate(zip(questions, types)):
            engine = self.grading_engines.get(qtype)
            # NOTE(review): questions with no matching engine are silently
            # dropped from the score — confirm this is intended.
            if engine is None:
                continue
            results.append(
                await engine.grade(question, processed['answers'][idx])
            )

        total_score = sum(item['score'] for item in results)
        max_score = sum(item['max_score'] for item in results)
        plagiarism_report = await self.plagiarism_detector.check(processed)
        feedback = await self.feedback_generator.generate(
            results, plagiarism_report
        )

        return {
            'submission_id': submission['id'],
            'student_id': submission['student_id'],
            'assignment_id': submission['assignment_id'],
            'total_score': total_score,
            'max_score': max_score,
            'percentage': (total_score / max_score) * 100 if max_score > 0 else 0,
            'results': results,
            'feedback': feedback,
            'plagiarism_report': plagiarism_report,
            'graded_at': datetime.now().isoformat()
        }
class SubmissionProcessor:
    """Normalizes raw submissions into a common dict layout."""

    async def process(self, submission: Dict) -> Dict:
        """Return a normalized copy; image/code formats get extra handling."""
        normalized = {
            'id': submission['id'],
            'student_id': submission['student_id'],
            'assignment_id': submission['assignment_id'],
            'questions': submission['questions'],
            'answers': submission['answers'],
            'format': submission.get('format', 'text'),
            'metadata': submission.get('metadata', {})
        }
        fmt = normalized['format']
        if fmt == 'image':
            return await self.process_image_submission(normalized)
        if fmt == 'code':
            return await self.process_code_submission(normalized)
        return normalized

    async def process_image_submission(self, submission: Dict) -> Dict:
        # Placeholder: OCR/vision preprocessing not implemented yet.
        return submission

    async def process_code_submission(self, submission: Dict) -> Dict:
        # Placeholder: code-specific preprocessing not implemented yet.
        return submission
class QuestionClassifier:
    """Keyword-based classifier mapping question text to a question type."""

    def __init__(self):
        # Insertion order matters: ties in pattern hits resolve to the
        # earliest type listed here.
        self.patterns = {
            'multiple_choice': ['A.', 'B.', 'C.', 'D.', '选择'],
            'fill_in_blank': ['_____', '填空', '空白'],
            'short_answer': ['简答', '简要回答', '简述'],
            'essay': ['论述', '作文', '论文'],
            'programming': ['编写', '实现', '代码', '程序']
        }

    async def classify(self, questions: List[str]) -> List[str]:
        """Classify each question text; output order matches the input."""
        return [self.classify_single(text) for text in questions]

    def classify_single(self, question: str) -> str:
        """Return the type whose patterns occur most often in the text."""
        hits = {
            qtype: sum(1 for token in tokens if token in question)
            for qtype, tokens in self.patterns.items()
        }
        # Default to short_answer when nothing matches at all.
        if not hits or max(hits.values()) == 0:
            return 'short_answer'
        return max(hits, key=hits.get)
class MultipleChoiceGrader:
    """Grades single-answer multiple-choice questions by exact match."""

    def __init__(self):
        self.normalizer = AnswerNormalizer()

    async def grade(self, question: Dict, answer: str) -> Dict:
        """Compare normalized answer against the key; all-or-nothing credit."""
        expected = question['correct_answer']
        is_correct = (
            self.normalizer.normalize(answer)
            == self.normalizer.normalize(expected)
        )
        earned = question['points'] if is_correct else 0
        return {
            'question_id': question['id'],
            'question_type': 'multiple_choice',
            'student_answer': answer,
            'correct_answer': expected,
            'score': earned,
            'max_score': question['points'],
            'is_correct': is_correct,
            'feedback': self.generate_feedback(is_correct, expected)
        }

    def generate_feedback(self, is_correct: bool, correct_answer: str) -> str:
        """Return a one-line Chinese feedback message."""
        if is_correct:
            return "回答正确!"
        return f"回答错误,正确答案是:{correct_answer}"
class AnswerNormalizer:
    """Canonicalizes free-text answers for comparison."""

    # Map fullwidth comma/period to ASCII and delete inner spaces.
    _TABLE = str.maketrans({',': ',', '。': '.', ' ': None})

    def normalize(self, answer: str) -> str:
        """Trim, uppercase, normalize punctuation, and strip spaces."""
        return answer.strip().upper().translate(self._TABLE)
class FillInBlankGrader:
    """Grades fill-in-the-blank answers with exact and fuzzy matching."""

    def __init__(self):
        self.normalizer = AnswerNormalizer()
        self.fuzzy_matcher = FuzzyMatcher()

    async def grade(self, question: Dict, answer: str) -> Dict:
        """Score against every accepted key; partial credit by similarity."""
        accepted = question['correct_answers']
        normalized = self.normalizer.normalize(answer)
        best_match = None
        best_score = 0
        for candidate in accepted:
            candidate_norm = self.normalizer.normalize(candidate)
            if normalized == candidate_norm:
                # Exact hit: full credit, stop searching.
                best_match, best_score = candidate, 1.0
                break
            ratio = self.fuzzy_matcher.calculate_similarity(
                normalized, candidate_norm
            )
            if ratio > best_score:
                best_score, best_match = ratio, candidate
        return {
            'question_id': question['id'],
            'question_type': 'fill_in_blank',
            'student_answer': answer,
            'best_match': best_match,
            'similarity': best_score,
            'score': question['points'] * best_score,
            'max_score': question['points'],
            'feedback': self.generate_feedback(best_score, best_match)
        }

    def generate_feedback(self, similarity: float, best_match: str) -> str:
        """Feedback tiers at similarity 1.0 / >=0.8 / >=0.5 / below."""
        if similarity >= 1.0:
            return "回答正确!"
        if similarity >= 0.8:
            return f"基本正确,最佳匹配是:{best_match}"
        if similarity >= 0.5:
            return f"部分正确,建议答案是:{best_match}"
        return f"回答不正确,正确答案是:{best_match}"
class FuzzyMatcher:
    """Thin wrapper over difflib for a 0..1 string similarity ratio."""

    def calculate_similarity(self, str1: str, str2: str) -> float:
        """Return SequenceMatcher's ratio for the two strings."""
        from difflib import SequenceMatcher
        matcher = SequenceMatcher(a=str1, b=str2)
        return matcher.ratio()
class ShortAnswerGrader:
    """Grades short answers with keyword coverage plus an LLM judgment."""

    def __init__(self):
        self.llm_client = LLMClient()
        self.keyword_matcher = KeywordMatcher()

    async def grade(self, question: Dict, answer: str) -> Dict:
        """Blend keyword score (40%) and LLM score (60%) into the mark."""
        keyword_score = self.keyword_matcher.match(answer, question['keywords'])
        llm_evaluation = await self.llm_client.evaluate_short_answer(
            question['question'], answer, question['reference_answer']
        )
        combined = self.combine_scores(keyword_score, llm_evaluation['score'])
        return {
            'question_id': question['id'],
            'question_type': 'short_answer',
            'student_answer': answer,
            'keyword_score': keyword_score,
            'llm_score': llm_evaluation['score'],
            'combined_score': combined,
            'score': question['points'] * combined,
            'max_score': question['points'],
            'feedback': llm_evaluation['feedback'],
            'improvements': llm_evaluation.get('improvements', [])
        }

    def combine_scores(self, keyword_score: float, llm_score: float) -> float:
        """Fixed 0.4 / 0.6 weighting of keyword vs. LLM scores."""
        return keyword_score * 0.4 + llm_score * 0.6
class KeywordMatcher:
    """Fraction of expected keywords found verbatim in an answer."""

    def match(self, answer: str, keywords: List[str]) -> float:
        """Return the hit ratio in [0, 1]; no keywords means full credit."""
        if not keywords:
            return 1.0
        hits = sum(keyword in answer for keyword in keywords)
        return hits / len(keywords)
class EssayGrader:
    """Grades essays by combining quality, content, and LLM rubric scores."""

    def __init__(self):
        self.llm_client = LLMClient()
        self.quality_analyzer = QualityAnalyzer()
        self.content_analyzer = ContentAnalyzer()

    async def grade(self, question: Dict, essay: str) -> Dict:
        """Run the three analyses, blend them, and build the feedback."""
        quality = await self.quality_analyzer.analyze(essay)
        content = await self.content_analyzer.analyze(
            essay, question['requirements']
        )
        llm = await self.llm_client.evaluate_essay(
            question['question'], essay, question['rubric']
        )
        final = self.calculate_final_score(quality, content, llm)
        return {
            'question_id': question['id'],
            'question_type': 'essay',
            'essay': essay,
            'quality_analysis': quality,
            'content_analysis': content,
            'llm_evaluation': llm,
            'final_score': final,
            'score': question['points'] * final,
            'max_score': question['points'],
            'feedback': self.generate_comprehensive_feedback(
                quality, content, llm
            ),
            'suggestions': self.generate_suggestions(quality, content)
        }

    def calculate_final_score(self, quality: Dict, content: Dict, llm: Dict) -> float:
        """Weighted blend: quality 30%, content 30%, LLM 40%."""
        quality_score = quality['overall_score']
        content_score = content['overall_score']
        llm_score = llm['score']
        return quality_score * 0.3 + content_score * 0.3 + llm_score * 0.4

    def generate_comprehensive_feedback(self, quality: Dict, content: Dict, llm: Dict) -> str:
        """Assemble tiered quality/content remarks plus the LLM feedback."""
        parts = []
        q = quality['overall_score']
        if q >= 0.8:
            parts.append("文章质量优秀,语言表达流畅。")
        elif q >= 0.6:
            parts.append("文章质量良好,语言表达基本清晰。")
        else:
            parts.append("文章质量有待提高,建议加强语言训练。")
        c = content['overall_score']
        if c >= 0.8:
            parts.append("内容充实,论证有力。")
        elif c >= 0.6:
            parts.append("内容基本完整,论证较为充分。")
        else:
            parts.append("内容需要进一步充实和完善。")
        parts.append(llm['feedback'])
        return ' '.join(parts)

    def generate_suggestions(self, quality: Dict, content: Dict) -> List[str]:
        """Emit one suggestion per sub-score falling below 0.7."""
        suggestions = []
        if quality['grammar_score'] < 0.7:
            suggestions.append("注意语法错误,建议仔细检查。")
        if quality['vocabulary_score'] < 0.7:
            suggestions.append("丰富词汇量,使用更多样化的表达。")
        if content['structure_score'] < 0.7:
            suggestions.append("优化文章结构,使逻辑更加清晰。")
        if content['relevance_score'] < 0.7:
            suggestions.append("确保内容紧扣题目,避免偏离主题。")
        return suggestions
class QualityAnalyzer:
    """Aggregates grammar, vocabulary, and readability into one score."""

    def __init__(self):
        self.grammar_checker = GrammarChecker()
        self.vocabulary_analyzer = VocabularyAnalyzer()
        self.readability_calculator = ReadabilityCalculator()

    async def analyze(self, text: str) -> Dict:
        """Weights: grammar 40%, vocabulary 30%, readability 30%."""
        grammar_score = await self.grammar_checker.check(text)
        vocabulary_score = self.vocabulary_analyzer.analyze(text)
        readability_score = self.readability_calculator.calculate(text)
        overall = (
            grammar_score * 0.4 +
            vocabulary_score * 0.3 +
            readability_score * 0.3
        )
        return {
            'grammar_score': grammar_score,
            'vocabulary_score': vocabulary_score,
            'readability_score': readability_score,
            'overall_score': overall
        }
class GrammarChecker:
    """Heuristic grammar scorer based on a tiny list of error patterns."""

    async def check(self, text: str) -> float:
        """Return 1 - (errors per sentence), clamped to [0, 1].

        Sentences are split on the Chinese full stop. Fix: the original
        counted empty fragments (e.g. after a trailing '。'), which
        inflated the sentence count and diluted the error rate, and its
        `total_sentences == 0` guard was unreachable because str.split
        never returns an empty list. Empty fragments are now excluded;
        text with no sentences scores 1.0 (nothing to check).
        """
        errors = self.detect_errors(text)
        sentences = [s for s in text.split('。') if s.strip()]
        if not sentences:
            return 1.0
        error_rate = errors / len(sentences)
        return max(0, 1 - error_rate)

    def detect_errors(self, text: str) -> int:
        """Count known doubled-particle mistakes (each at most once)."""
        errors = 0
        if '的的' in text:
            errors += 1
        if '了了' in text:
            errors += 1
        return errors
class VocabularyAnalyzer:
    """Lexical diversity score (type/token ratio) via jieba segmentation."""

    def analyze(self, text: str) -> float:
        """Return min(1, 2 * unique_tokens / total_tokens); 0.0 for empty."""
        import jieba  # third-party Chinese segmenter, imported lazily
        tokens = jieba.lcut(text)
        if not tokens:
            return 0.0
        diversity = len(set(tokens)) / len(tokens)
        return min(1.0, diversity * 2)
class ReadabilityCalculator:
    """Scores readability from average sentence length in characters."""

    def calculate(self, text: str) -> float:
        """Map average sentence length to a band score.

        Bands: <10 chars -> 0.6 (choppy), 10-29 -> 1.0 (ideal),
        30-49 -> 0.8, >=50 -> 0.5 (run-on). Fixes from the original:
        the `if not sentences` guard was unreachable (str.split never
        returns []), so empty text scored 0.6 via a zero average — it
        now scores 0.0 — and empty fragments from a trailing '。' no
        longer drag the average down.
        """
        sentences = [s for s in text.split('。') if s.strip()]
        if not sentences:
            return 0.0
        avg_length = np.mean([len(s) for s in sentences])
        if avg_length < 10:
            return 0.6
        elif avg_length < 30:
            return 1.0
        elif avg_length < 50:
            return 0.8
        else:
            return 0.5
class ContentAnalyzer:
    """Aggregates relevance, structure, and depth into one content score."""

    def __init__(self):
        self.relevance_checker = RelevanceChecker()
        self.structure_analyzer = StructureAnalyzer()
        self.depth_evaluator = DepthEvaluator()

    async def analyze(self, text: str, requirements: List[str]) -> Dict:
        """Weights: relevance 40%, structure 30%, depth 30%."""
        relevance_score = await self.relevance_checker.check(text, requirements)
        structure_score = self.structure_analyzer.analyze(text)
        depth_score = self.depth_evaluator.evaluate(text)
        overall = (
            relevance_score * 0.4 +
            structure_score * 0.3 +
            depth_score * 0.3
        )
        return {
            'relevance_score': relevance_score,
            'structure_score': structure_score,
            'depth_score': depth_score,
            'overall_score': overall
        }
class RelevanceChecker:
    """Fraction of requirement phrases that appear verbatim in the text."""

    async def check(self, text: str, requirements: List[str]) -> float:
        """No requirements means fully relevant by definition."""
        if not requirements:
            return 1.0
        covered = sum(req in text for req in requirements)
        return covered / len(requirements)
class StructureAnalyzer:
    """Checks for intro/body/conclusion markers; each contributes 1/3."""

    # Markers searched near the start / end of the text.
    INTRO_MARKERS = ['首先', '第一', '引言', '开始']
    CONCLUSION_MARKERS = ['总之', '综上', '结论', '结束']

    def analyze(self, text: str) -> float:
        """Intro marker in first 100 chars, body >200 chars, closer in last 100."""
        head = text[:100]
        tail = text[-100:]
        has_intro = any(marker in head for marker in self.INTRO_MARKERS)
        has_body = len(text) > 200
        has_conclusion = any(marker in tail for marker in self.CONCLUSION_MARKERS)
        return sum([has_intro, has_body, has_conclusion]) / 3
class DepthEvaluator:
    """Counts argumentation cue words; five or more hits saturate at 1.0."""

    INDICATORS = [
        '分析', '论证', '探讨', '研究', '深入',
        '因为', '所以', '因此', '然而', '但是'
    ]

    def evaluate(self, text: str) -> float:
        """Each distinct indicator present adds 0.2, capped at 1.0."""
        hits = sum(indicator in text for indicator in self.INDICATORS)
        return min(1.0, hits / 5)
class ProgrammingGrader:
    """Grades code answers via syntax, tests, static analysis, and style."""

    def __init__(self):
        self.syntax_checker = SyntaxChecker()
        self.test_runner = TestRunner()
        self.code_analyzer = CodeAnalyzer()
        self.style_checker = StyleChecker()

    async def grade(self, question: Dict, code: str) -> Dict:
        """Run all four checks and blend them into the final score."""
        syntax = await self.syntax_checker.check(code)
        tests = await self.test_runner.run_tests(code, question['test_cases'])
        analysis = await self.code_analyzer.analyze(code)
        style = await self.style_checker.check(code)
        final = self.calculate_final_score(syntax, tests, analysis, style)
        return {
            'question_id': question['id'],
            'question_type': 'programming',
            'code': code,
            'syntax_check': syntax,
            'test_results': tests,
            'code_analysis': analysis,
            'style_check': style,
            'final_score': final,
            'score': question['points'] * final,
            'max_score': question['points'],
            'feedback': self.generate_feedback(syntax, tests, analysis, style),
            'suggestions': self.generate_suggestions(
                syntax, tests, analysis, style
            )
        }

    def calculate_final_score(self, syntax: Dict, tests: Dict, analysis: Dict, style: Dict) -> float:
        """Weights: syntax 30%, tests 40%, analysis 20%, style 10%."""
        syntax_score = 1.0 if syntax['is_valid'] else 0.0
        test_score = tests['pass_rate']
        analysis_score = analysis['overall_score']
        style_score = style['overall_score']
        return (
            syntax_score * 0.3 +
            test_score * 0.4 +
            analysis_score * 0.2 +
            style_score * 0.1
        )

    def generate_feedback(self, syntax: Dict, tests: Dict, analysis: Dict, style: Dict) -> str:
        """Short Chinese summary: syntax status, pass rate, quality tier."""
        parts = []
        if not syntax['is_valid']:
            parts.append(f"语法错误:{syntax['error']}")
        else:
            parts.append("语法正确。")
        parts.append(f"测试通过率:{tests['pass_rate']:.1%}")
        quality = analysis['overall_score']
        if quality >= 0.8:
            parts.append("代码质量优秀。")
        elif quality >= 0.6:
            parts.append("代码质量良好。")
        else:
            parts.append("代码质量有待提高。")
        return ' '.join(parts)

    def generate_suggestions(self, syntax: Dict, tests: Dict, analysis: Dict, style: Dict) -> List[str]:
        """One actionable hint per failing dimension."""
        suggestions = []
        if not syntax['is_valid']:
            suggestions.append("修复语法错误。")
        if tests['pass_rate'] < 1.0:
            suggestions.append("检查失败的测试用例。")
        if analysis['complexity'] > 10:
            suggestions.append("考虑简化复杂的代码逻辑。")
        if style['overall_score'] < 0.7:
            suggestions.append("遵循代码规范和最佳实践。")
        return suggestions
class SyntaxChecker:
    """Validates Python source by compiling it without executing it."""

    async def check(self, code: str) -> Dict:
        """Return {'is_valid': bool, 'error': str | None}."""
        try:
            compile(code, '<string>', 'exec')
        except SyntaxError as exc:
            return {'is_valid': False, 'error': str(exc)}
        return {'is_valid': True, 'error': None}
class TestRunner:
    """Executes submitted code and runs the question's test cases on it.

    WARNING: uses exec() on student-supplied code with no sandboxing;
    this must only run inside an isolated execution environment.
    """

    async def run_tests(self, code: str, test_cases: List[Dict]) -> Dict:
        """Run every case; each result has status passed/failed/error."""
        passed = failed = 0
        results = []
        for case in test_cases:
            try:
                namespace = {}
                # Re-executed per case so cases cannot contaminate each other.
                exec(code, namespace)
                target = namespace.get(case['function_name'])
                if not target:
                    failed += 1
                    results.append({
                        'test_case': case,
                        'status': 'error',
                        'error': 'Function not found'
                    })
                    continue
                actual = target(*case['input'])
                if actual == case['expected_output']:
                    passed += 1
                    results.append({
                        'test_case': case,
                        'status': 'passed',
                        'result': actual
                    })
                else:
                    failed += 1
                    results.append({
                        'test_case': case,
                        'status': 'failed',
                        'expected': case['expected_output'],
                        'actual': actual
                    })
            except Exception as exc:
                failed += 1
                results.append({
                    'test_case': case,
                    'status': 'error',
                    'error': str(exc)
                })
        total = passed + failed
        return {
            'passed': passed,
            'failed': failed,
            'total': total,
            'pass_rate': passed / total if total > 0 else 0,
            'results': results
        }
class CodeAnalyzer:
    """Lightweight static metrics: complexity, comment ratio, function count."""

    async def analyze(self, code: str) -> Dict:
        """Compute the three metrics and their weighted overall score."""
        lines = code.split('\n')
        complexity = self.calculate_complexity(code)
        comments_ratio = self.calculate_comments_ratio(code, lines)
        function_count = self.count_functions(code)
        return {
            'complexity': complexity,
            'comments_ratio': comments_ratio,
            'function_count': function_count,
            'overall_score': self.calculate_overall_score(
                complexity, comments_ratio, function_count
            )
        }

    def calculate_complexity(self, code: str) -> int:
        """Count branch/boolean keyword substrings.

        NOTE(review): plain substring counting — 'for' also matches the
        'or' inside it. Crude, but kept as the intended heuristic.
        """
        keywords = ('if', 'for', 'while', 'try', 'except', 'and', 'or')
        return sum(code.count(keyword) for keyword in keywords)

    def calculate_comments_ratio(self, code: str, lines: List[str]) -> float:
        """Fraction of lines whose first non-blank character is '#'."""
        commented = sum(
            1 for line in lines if line.strip().startswith('#')
        )
        if not lines:
            return 0.0
        return commented / len(lines)

    def count_functions(self, code: str) -> int:
        """Count 'def ' occurrences (includes nested and method defs)."""
        return code.count('def ')

    def calculate_overall_score(self, complexity: int, comments_ratio: float, function_count: int) -> float:
        """Blend: low complexity 40%, comments 30%, decomposition 30%."""
        complexity_score = max(0, 1 - complexity / 20)
        comments_score = min(1.0, comments_ratio * 3)
        function_score = min(1.0, function_count / 5)
        return (
            complexity_score * 0.4 +
            comments_score * 0.3 +
            function_score * 0.3
        )
class StyleChecker:
    """PEP 8-ish heuristics: line length, naming, indentation."""

    async def check(self, code: str) -> Dict:
        """Weights: line length 40%, naming 30%, indentation 30%."""
        lines = code.split('\n')
        length_score = self.check_line_length(lines)
        naming_score = self.check_naming_convention(code)
        indentation_score = self.check_indentation(code)
        return {
            'line_length_score': length_score,
            'naming_score': naming_score,
            'indentation_score': indentation_score,
            'overall_score': (
                length_score * 0.4 +
                naming_score * 0.3 +
                indentation_score * 0.3
            )
        }

    def check_line_length(self, lines: List[str]) -> float:
        """Share of lines within the 79-character limit."""
        over = sum(1 for line in lines if len(line) > 79)
        if not lines:
            return 1.0
        return max(0, 1 - over / len(lines))

    def check_naming_convention(self, code: str) -> float:
        """Share of function/variable names that look like snake_case.

        NOTE(review): the regexes only capture lowercase candidates to
        begin with, so the validity test rarely fails — heuristic only.
        """
        import re
        functions = re.findall(r'def\s+([a-z_][a-z0-9_]*)', code)
        variables = re.findall(r'([a-z_][a-z0-9_]*)\s*=', code)
        names = functions + variables
        if not names:
            return 1.0
        valid = sum(1 for name in names if name.islower() or '_' in name)
        return valid / len(names)

    def check_indentation(self, code: str) -> float:
        """Share of non-blank lines indented to a multiple of 4 spaces."""
        lines = code.split('\n')
        bad = 0
        for line in lines:
            if not line.strip():
                continue
            indent = len(line) - len(line.lstrip())
            if indent % 4 != 0:
                bad += 1
        if not lines:
            return 1.0
        return max(0, 1 - bad / len(lines))
class FeedbackGenerator:
    """Builds the overall, per-question, and improvement feedback payload."""

    async def generate(self, results: List[Dict], plagiarism_check: Dict) -> Dict:
        """Assemble all feedback sections for one graded submission."""
        return {
            'overall': self.generate_overall_feedback(results),
            'question_feedback': [
                self.generate_question_feedback(result) for result in results
            ],
            'improvement_suggestions': self.generate_improvement_suggestions(
                results
            ),
            'plagiarism_warning': self.generate_plagiarism_warning(
                plagiarism_check
            )
        }

    def generate_overall_feedback(self, results: List[Dict]) -> str:
        """Percentage-banded summary line (90/80/70/60 cutoffs)."""
        total_score = sum(r['score'] for r in results)
        max_score = sum(r['max_score'] for r in results)
        percentage = (total_score / max_score) * 100 if max_score > 0 else 0
        if percentage >= 90:
            return f"优秀!你的得分是{percentage:.1f}%。继续保持!"
        elif percentage >= 80:
            return f"很好!你的得分是{percentage:.1f}%。还有提升空间。"
        elif percentage >= 70:
            return f"良好!你的得分是{percentage:.1f}%。需要更加努力。"
        elif percentage >= 60:
            return f"及格!你的得分是{percentage:.1f}%。建议复习相关内容。"
        else:
            return f"需要加强!你的得分是{percentage:.1f}%。请认真复习。"

    def generate_question_feedback(self, result: Dict) -> str:
        """One feedback line per question.

        Fix: uses .get() because fill-in-blank/essay/programming grader
        results carry no 'is_correct' key, and the original indexing
        raised KeyError for them; a missing key now falls through to the
        detailed-feedback branch.
        """
        if result.get('is_correct', False):
            return f"问题{result['question_id']}:回答正确。"
        else:
            return f"问题{result['question_id']}:{result['feedback']}"

    def generate_improvement_suggestions(self, results: List[Dict]) -> List[str]:
        """Suggest study actions per question type answered incorrectly."""
        weak_areas = [
            r['question_type'] for r in results
            if not r.get('is_correct', True)
        ]
        suggestions = []
        if 'multiple_choice' in weak_areas:
            suggestions.append("加强对基础知识的理解和记忆。")
        if 'short_answer' in weak_areas:
            suggestions.append("练习简答题的答题技巧,提高表达能力。")
        if 'essay' in weak_areas:
            suggestions.append("多读多写,提高写作能力和逻辑思维。")
        if 'programming' in weak_areas:
            suggestions.append("多做编程练习,熟悉常见算法和数据结构。")
        return suggestions

    def generate_plagiarism_warning(self, plagiarism_check: Dict) -> Optional[str]:
        """Return a warning string, or None when nothing was flagged.

        Fix: the original referenced an undefined name 'plagiarism'
        instead of the 'plagiarism_check' parameter, raising NameError
        whenever a submission was actually flagged.
        """
        if plagiarism_check.get('is_plagiarized', False):
            similarity = plagiarism_check['similarity']
            return f"警告:检测到可能的抄袭行为,相似度为{similarity:.1%}。"
        return None
class PlagiarismDetector:
    """Flags submissions whose answers closely match known sources."""

    # Answers strictly above this similarity are treated as plagiarized.
    PLAGIARISM_THRESHOLD = 0.8

    def __init__(self):
        self.similarity_calculator = SimilarityCalculator()
        self.database = PlagiarismDatabase()

    async def check(self, submission: Dict) -> Dict:
        """Return the worst (highest) similarity found across all answers."""
        max_similarity = 0
        matched_source = None
        for answer in submission['answers']:
            similarity, source = await self.check_answer(answer)
            if similarity > max_similarity:
                max_similarity = similarity
                matched_source = source
        return {
            'is_plagiarized': max_similarity > self.PLAGIARISM_THRESHOLD,
            'similarity': max_similarity,
            'matched_source': matched_source
        }

    async def check_answer(self, answer: str) -> tuple:
        """Return (best_similarity, source) for a single answer.

        Fix (performance, behavior-preserving): the original computed
        every similarity twice — once for the max value and once again
        for the argmax; each candidate is now scored exactly once. Ties
        still resolve to the first candidate in database order.
        """
        similar_answers = await self.database.search_similar(answer)
        if not similar_answers:
            return 0.0, None
        scored = [
            (self.similarity_calculator.calculate(answer, candidate['text']),
             candidate)
            for candidate in similar_answers
        ]
        best_similarity, best_match = max(scored, key=lambda pair: pair[0])
        return best_similarity, best_match['source']
class SimilarityCalculator:
    """0..1 text similarity via difflib's sequence ratio."""

    def calculate(self, text1: str, text2: str) -> float:
        """Return SequenceMatcher's ratio for the two texts."""
        from difflib import SequenceMatcher
        matcher = SequenceMatcher(a=text1, b=text2)
        return matcher.ratio()
class PlagiarismDatabase:
async def search_similar(self, text: str) -> List[Dict]:
return []2. 智能考试评分
功能描述
支持在线考试的自动评分,包括客观题自动评分和主观题AI辅助评分,提供即时成绩反馈。
技术实现
python
from typing import Dict, List, Optional
from datetime import datetime
import numpy as np
class IntelligentExamScoring:
    """Scores an exam: objective items automatically, subjective via AI."""

    def __init__(self, config: Dict):
        self.config = config
        self.exam_processor = ExamProcessor()
        self.auto_scorer = AutoScorer()
        self.ai_scorer = AIScorer()
        self.score_normalizer = ScoreNormalizer()
        self.report_generator = ReportGenerator()

    async def score_exam(self, exam_submission: Dict) -> Dict:
        """Run both scorers, merge their totals, and attach a report."""
        processed = await self.exam_processor.process(exam_submission)
        auto_scored = await self.auto_scorer.score(processed)
        ai_scored = await self.ai_scorer.score(processed)
        scores = await self.score_normalizer.normalize(auto_scored, ai_scored)
        report = await self.report_generator.generate(exam_submission, scores)
        return {
            'submission_id': exam_submission['id'],
            'student_id': exam_submission['student_id'],
            'exam_id': exam_submission['exam_id'],
            'scores': scores,
            'report': report,
            'scored_at': datetime.now().isoformat()
        }
class ExamProcessor:
    """Projects a raw exam submission onto the fields scoring needs."""

    async def process(self, submission: Dict) -> Dict:
        """Return a dict with the required fields plus optional metadata."""
        keys = ('id', 'student_id', 'exam_id', 'questions', 'answers')
        processed = {key: submission[key] for key in keys}
        processed['metadata'] = submission.get('metadata', {})
        return processed
class AutoScorer:
    """Scores objective items (multiple choice / true-false) by exact match."""

    OBJECTIVE_TYPES = ('multiple_choice', 'true_false')

    async def score(self, exam: Dict) -> Dict:
        """Score every objective question; answers align with questions by index."""
        scores = [
            self.score_objective(question, exam['answers'][idx])
            for idx, question in enumerate(exam['questions'])
            if question['type'] in self.OBJECTIVE_TYPES
        ]
        return {
            'type': 'auto',
            'scores': scores,
            'total': sum(item['score'] for item in scores),
            'max_total': sum(item['max_score'] for item in scores)
        }

    def score_objective(self, question: Dict, answer: str) -> Dict:
        """All-or-nothing credit for an exact answer match."""
        is_correct = answer == question['correct_answer']
        return {
            'question_id': question['id'],
            'score': question['points'] if is_correct else 0,
            'max_score': question['points'],
            'is_correct': is_correct
        }
class AIScorer:
    """Scores subjective items (short answer / essay) via an LLM rubric prompt."""

    SUBJECTIVE_TYPES = ('short_answer', 'essay')

    def __init__(self):
        self.llm_client = LLMClient()

    async def score(self, exam: Dict) -> Dict:
        """Score every subjective question; answers align by index."""
        scores = []
        for idx, question in enumerate(exam['questions']):
            if question['type'] in self.SUBJECTIVE_TYPES:
                scores.append(
                    await self.score_subjective(question, exam['answers'][idx])
                )
        return {
            'type': 'ai',
            'scores': scores,
            'total': sum(item['score'] for item in scores),
            'max_total': sum(item['max_score'] for item in scores)
        }

    async def score_subjective(self, question: Dict, answer: str) -> Dict:
        """Ask the LLM for a score + rationale, then parse the score out."""
        # Prompt text is runtime behavior; kept verbatim.
        prompt = f"""
请对以下答案进行评分:
问题:{question['question']}
参考答案:{question.get('reference_answer', '')}
评分标准:{question.get('rubric', '')}
学生答案:{answer}
请给出:
1. 得分(0-{question['points']}分)
2. 评分理由
3. 改进建议
"""
        evaluation = await self.llm_client.generate(prompt)
        return {
            'question_id': question['id'],
            'score': self.extract_score(evaluation, question['points']),
            'max_score': question['points'],
            'evaluation': evaluation
        }

    def extract_score(self, evaluation: str, max_score: int) -> int:
        """Parse '得分:N' (fullwidth or ASCII colon) from the LLM reply.

        Parsed scores are clamped to max_score; when nothing can be
        parsed, fall back to half credit (integer division).
        """
        import re
        found = re.search(r'得分[::]\s*(\d+)', evaluation)
        if found:
            return min(int(found.group(1)), max_score)
        return max_score // 2
class ScoreNormalizer:
    """Merges auto and AI scoring results into one graded summary."""

    async def normalize(self, auto_scored: Dict, ai_scored: Dict) -> Dict:
        """Combine totals, compute the percentage, and attach a letter grade."""
        total_score = auto_scored['total'] + ai_scored['total']
        max_score = auto_scored['max_total'] + ai_scored['max_total']
        percentage = (total_score / max_score) * 100 if max_score > 0 else 0
        return {
            'auto_scored': auto_scored,
            'ai_scored': ai_scored,
            'total_score': total_score,
            'max_score': max_score,
            'percentage': percentage,
            'grade': self.calculate_grade(percentage)
        }

    def calculate_grade(self, percentage: float) -> str:
        """US-style letter grade with 90/80/70/60 cutoffs; F below."""
        cutoffs = ((90, 'A'), (80, 'B'), (70, 'C'), (60, 'D'))
        for floor, letter in cutoffs:
            if percentage >= floor:
                return letter
        return 'F'
class ReportGenerator:
async def generate(
self,
submission: Dict,
scores: Dict
) -> Dict:
summary = self.generate_summary(scores)
analysis = self.generate_analysis(scores)
recommendations = self.generate_recommendations(scores)
return {
'summary': summary,
'analysis': analysis,
'recommendations': recommendations
}
def generate_summary(self, scores: Dict) -> str:
return f"""
考试总结:
- 总分:{scores['total_score']}/{scores['max_score']}
- 百分比:{scores['percentage']:.1f}%
- 等级:{scores['grade']}
"""
def generate_analysis(self, scores: Dict) -> Dict:
return {
'strengths': [],
'weaknesses': [],
'performance_trend': 'stable'
}
def generate_recommendations(self, scores: Dict) -> List[str]:
recommendations = []
if scores['percentage'] < 60:
recommendations.append("建议复习基础知识")
return recommendations3. 学习效果分析
功能描述
分析学生的学习数据,评估学习效果,识别学习困难,提供个性化学习建议。
技术实现
python
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import numpy as np
class LearningEffectivenessAnalyzer:
    """Analyzes a student's learning data and produces recommendations."""

    def __init__(self, config: Dict):
        self.config = config
        self.data_collector = DataCollector()
        self.performance_analyzer = PerformanceAnalyzer()
        self.progress_tracker = ProgressTracker()
        self.recommendation_engine = RecommendationEngine()

    async def analyze(self, student_id: str, time_period: Optional[Dict] = None) -> Dict:
        """Collect data, analyze performance and progress, then recommend."""
        learning_data = await self.data_collector.collect(
            student_id, time_period
        )
        performance = await self.performance_analyzer.analyze(learning_data)
        progress = await self.progress_tracker.track(student_id, learning_data)
        recommendations = await self.recommendation_engine.generate(
            student_id, performance, progress
        )
        return {
            'student_id': student_id,
            'time_period': time_period,
            'performance': performance,
            'progress': progress,
            'recommendations': recommendations,
            'analyzed_at': datetime.now().isoformat()
        }
class DataCollector:
    """Fetches a student's learning records; stub — no data source wired up."""

    async def collect(self, student_id: str, time_period: Optional[Dict]) -> Dict:
        # Placeholder: query the assignments/exams/activities stores.
        return {
            'assignments': [],
            'exams': [],
            'activities': []
        }
class PerformanceAnalyzer:
    """Summarizes performance metrics; stub returning fixed sample values."""

    async def analyze(self, data: Dict) -> Dict:
        # Placeholder: compute from the real learning data.
        return {
            'average_score': 85.0,
            'score_trend': 'improving',
            'strengths': ['mathematics', 'science'],
            'weaknesses': ['writing']
        }
class ProgressTracker:
    """Tracks skill mastery progress; stub returning fixed sample values."""

    async def track(self, student_id: str, data: Dict) -> Dict:
        # Placeholder: derive from real mastery records.
        return {
            'skills_mastered': 10,
            'skills_in_progress': 5,
            'overall_progress': 0.75
        }
class RecommendationEngine:
    """Produces study recommendations; stub returning fixed suggestions."""

    async def generate(self, student_id: str, performance: Dict, progress: Dict) -> List[str]:
        # Placeholder: personalize using performance/progress inputs.
        return [
            "加强写作练习",
            "继续在数学方面保持优势"
        ]
系统架构
javascript
const automaticAssessmentSystemArchitecture = {
layers: {
presentationLayer: {
components: ['学生界面', '教师界面', '管理后台'],
technologies: ['React', 'Vue.js', 'Ant Design']
},
serviceLayer: {
components: ['批改服务', '评分服务', '分析服务', '报告服务'],
technologies: ['Node.js', 'Python', 'FastAPI']
},
aiLayer: {
components: ['NLP引擎', '代码分析', '相似度检测', '评分模型'],
technologies: ['BERT', 'GPT', 'CodeBERT', 'TensorFlow']
},
dataLayer: {
components: ['作业数据库', '答案数据库', '评分数据库', '抄袭数据库'],
technologies: ['PostgreSQL', 'MongoDB', 'Redis', 'Elasticsearch']
}
}
};
最后更新时间:2026-03-10
自动评估系统通过AI驱动的自动批改、智能评分和学习效果分析,大幅提升评估效率和准确性,为教师和学生提供及时、准确的反馈。
