Skip to content

OpenClaw 自动评估系统

自动评估系统是 OpenClaw 教育解决方案的关键组件,通过 AI 技术实现自动化作业批改、智能考试评分和学习效果分析,大幅提升评估效率和准确性。

系统概述

自动评估系统基于自然语言处理、计算机视觉和机器学习技术,能够:

  • 自动批改客观题和主观题
  • 智能评分并提供详细反馈
  • 分析学习效果和能力发展
  • 生成个性化学习建议

核心功能

1. 自动作业批改

功能描述

支持多种题型的自动批改,包括选择题、填空题、简答题、论述题、编程题等,并提供准确的评分和详细的反馈。

技术实现

python
from typing import Dict, List, Optional
from datetime import datetime
import numpy as np

class AutomaticGradingSystem:
    """Coordinate grading of a single submission.

    The submission is preprocessed, each question is classified, the
    matching grader (if any) is applied, and the per-question results are
    combined with a plagiarism report and generated feedback.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.submission_processor = SubmissionProcessor()
        self.question_classifier = QuestionClassifier()
        # Maps a classified question type to the grader that handles it.
        self.grading_engines = dict(
            multiple_choice=MultipleChoiceGrader(),
            fill_in_blank=FillInBlankGrader(),
            short_answer=ShortAnswerGrader(),
            essay=EssayGrader(),
            programming=ProgrammingGrader(),
        )
        self.feedback_generator = FeedbackGenerator()
        self.plagiarism_detector = PlagiarismDetector()

    async def grade_submission(self, submission: Dict) -> Dict:
        """Grade ``submission`` and return the aggregated report.

        Args:
            submission: Raw submission; ``id``, ``student_id`` and
                ``assignment_id`` are read here, the remaining fields by
                the submission processor.

        Returns:
            A dict with the identifiers, total and maximum score,
            percentage, per-question results, feedback, plagiarism report
            and an ISO-formatted grading timestamp.
        """
        prepared = await self.submission_processor.process(submission)

        detected_types = await self.question_classifier.classify(
            prepared['questions']
        )

        graded = []
        for index, item in enumerate(prepared['questions']):
            engine = self.grading_engines.get(detected_types[index])
            if not engine:
                # Questions without a registered grader contribute nothing.
                continue
            outcome = await engine.grade(item, prepared['answers'][index])
            graded.append(outcome)

        earned = sum([entry['score'] for entry in graded])
        possible = sum([entry['max_score'] for entry in graded])

        plagiarism_report = await self.plagiarism_detector.check(prepared)
        feedback = await self.feedback_generator.generate(
            graded, plagiarism_report
        )

        report = {
            'submission_id': submission['id'],
            'student_id': submission['student_id'],
            'assignment_id': submission['assignment_id'],
            'total_score': earned,
            'max_score': possible,
        }
        if possible > 0:
            report['percentage'] = (earned / possible) * 100
        else:
            report['percentage'] = 0
        report['results'] = graded
        report['feedback'] = feedback
        report['plagiarism_report'] = plagiarism_report
        report['graded_at'] = datetime.now().isoformat()
        return report

class SubmissionProcessor:
    """Copy the known fields of a raw submission and apply format hooks."""

    async def process(self, submission: Dict) -> Dict:
        """Return a new dict with the fields the graders read.

        ``format`` defaults to ``'text'`` and ``metadata`` to an empty dict.
        Image and code submissions are routed through their dedicated hook.
        """
        normalized = dict(
            id=submission['id'],
            student_id=submission['student_id'],
            assignment_id=submission['assignment_id'],
            questions=submission['questions'],
            answers=submission['answers'],
            format=submission.get('format', 'text'),
            metadata=submission.get('metadata', {}),
        )

        kind = normalized['format']
        if kind == 'image':
            return await self.process_image_submission(normalized)
        if kind == 'code':
            return await self.process_code_submission(normalized)
        return normalized

    async def process_image_submission(self, submission: Dict) -> Dict:
        """Hook for image submissions; currently returns the input as is."""
        return submission

    async def process_code_submission(self, submission: Dict) -> Dict:
        """Hook for code submissions; currently returns the input as is."""
        return submission

class QuestionClassifier:
    """Keyword-based classifier that assigns a question type to each question.

    Questions may be plain strings or dicts. The graders in this file index
    questions as dicts (``question['id']``, ``question['points']`` ...), and a
    substring test against a dict would only inspect its keys, so dict
    questions are unwrapped before matching.
    """

    def __init__(self):
        # Question type -> marker substrings that hint at that type.
        self.patterns = {
            'multiple_choice': ['A.', 'B.', 'C.', 'D.', '选择'],
            'fill_in_blank': ['_____', '填空', '空白'],
            'short_answer': ['简答', '简要回答', '简述'],
            'essay': ['论述', '作文', '论文'],
            'programming': ['编写', '实现', '代码', '程序']
        }

    async def classify(self, questions: List) -> List[str]:
        """Return the detected type for every question, in input order."""
        return [self.classify_single(question) for question in questions]

    def classify_single(self, question) -> str:
        """Classify one question.

        Args:
            question: Either the question text, or a dict. For a dict, a
                ``type`` value naming one of the known types is returned
                directly; otherwise the ``question`` entry is used as the
                text to match (empty text when absent).

        Returns:
            The type whose markers occur most often in the text; ties go to
            the type listed first in ``self.patterns``. ``'short_answer'``
            when no marker matches.
        """
        if isinstance(question, dict):
            declared = question.get('type')
            if isinstance(declared, str) and declared in self.patterns:
                return declared
            text = str(question.get('question', ''))
        else:
            text = question

        scores = {
            qtype: sum(1 for marker in markers if marker in text)
            for qtype, markers in self.patterns.items()
        }

        if not scores or max(scores.values()) == 0:
            return 'short_answer'

        return max(scores, key=scores.get)

class MultipleChoiceGrader:
    """All-or-nothing grader for multiple-choice questions."""

    def __init__(self):
        self.normalizer = AnswerNormalizer()

    async def grade(self, question: Dict, answer: str) -> Dict:
        """Compare the normalized answer with the normalized key.

        Reads ``correct_answer``, ``points`` and ``id`` from ``question``.
        Full points are awarded on an exact match after normalization,
        zero otherwise.
        """
        expected = question['correct_answer']
        given_norm = self.normalizer.normalize(answer)
        expected_norm = self.normalizer.normalize(expected)

        is_correct = bool(given_norm == expected_norm)
        awarded = question['points'] if is_correct else 0

        outcome = {
            'question_id': question['id'],
            'question_type': 'multiple_choice',
            'student_answer': answer,
            'correct_answer': expected,
            'score': awarded,
            'max_score': question['points'],
            'is_correct': is_correct,
        }
        outcome['feedback'] = self.generate_feedback(is_correct, expected)
        return outcome

    def generate_feedback(self, is_correct: bool, correct_answer: str) -> str:
        """Return the feedback line shown to the student."""
        if not is_correct:
            return f"回答错误,正确答案是:{correct_answer}"
        return "回答正确!"

class AnswerNormalizer:
    """Canonicalize short answers before they are compared."""

    # Full-width comma (U+FF0C) and ideographic full stop (U+3002) mapped to
    # their ASCII counterparts. Written as escapes so the mapping cannot
    # silently degrade into an ASCII-to-ASCII no-op.
    _PUNCTUATION_MAP = str.maketrans({'\uff0c': ',', '\u3002': '.'})

    def normalize(self, answer: str) -> str:
        """Return ``answer`` trimmed, upper-cased and stripped of spaces.

        Full-width commas and ideographic full stops are converted to ASCII
        ``,`` and ``.``; every ASCII space is removed.
        """
        cleaned = answer.strip().upper()
        cleaned = cleaned.translate(self._PUNCTUATION_MAP)
        return cleaned.replace(' ', '')

class FillInBlankGrader:
    """Grade fill-in-the-blank answers with exact and fuzzy matching."""

    def __init__(self):
        self.normalizer = AnswerNormalizer()
        self.fuzzy_matcher = FuzzyMatcher()

    async def grade(self, question: Dict, answer: str) -> Dict:
        """Score ``answer`` against every accepted answer.

        An exact match after normalization stops the search with ratio 1.0;
        otherwise the highest fuzzy similarity is kept. The awarded score is
        ``points`` multiplied by that ratio.
        """
        accepted = question['correct_answers']
        given = self.normalizer.normalize(answer)

        top_ratio, top_answer = 0, None
        for candidate in accepted:
            candidate_norm = self.normalizer.normalize(candidate)
            if given == candidate_norm:
                top_answer, top_ratio = candidate, 1.0
                break
            ratio = self.fuzzy_matcher.calculate_similarity(
                given, candidate_norm
            )
            if ratio > top_ratio:
                top_ratio, top_answer = ratio, candidate

        awarded = question['points'] * top_ratio

        outcome = {
            'question_id': question['id'],
            'question_type': 'fill_in_blank',
            'student_answer': answer,
            'best_match': top_answer,
            'similarity': top_ratio,
            'score': awarded,
            'max_score': question['points'],
        }
        outcome['feedback'] = self.generate_feedback(top_ratio, top_answer)
        return outcome

    def generate_feedback(self, similarity: float, best_match: str) -> str:
        """Map the similarity ratio to one of four feedback messages."""
        if similarity >= 1.0:
            return "回答正确!"
        # (lower bound, message prefix), checked from highest to lowest.
        tiers = (
            (0.8, "基本正确,最佳匹配是:"),
            (0.5, "部分正确,建议答案是:"),
        )
        for floor, prefix in tiers:
            if similarity >= floor:
                return f"{prefix}{best_match}"
        return f"回答不正确,正确答案是:{best_match}"

class FuzzyMatcher:
    """Thin wrapper around ``difflib.SequenceMatcher``."""

    def calculate_similarity(self, str1: str, str2: str) -> float:
        """Return the SequenceMatcher ratio of the two strings."""
        import difflib

        matcher = difflib.SequenceMatcher(isjunk=None, a=str1, b=str2)
        return matcher.ratio()

class ShortAnswerGrader:
    """Grade short answers by blending keyword coverage with an LLM score."""

    def __init__(self):
        self.llm_client = LLMClient()
        self.keyword_matcher = KeywordMatcher()

    async def grade(self, question: Dict, answer: str) -> Dict:
        """Score ``answer`` for ``question``.

        Reads ``keywords``, ``question``, ``reference_answer``, ``points``
        and ``id`` from ``question``. The LLM evaluation must provide
        ``score`` and ``feedback``; ``improvements`` is optional.
        """
        coverage = self.keyword_matcher.match(answer, question['keywords'])

        evaluation = await self.llm_client.evaluate_short_answer(
            question['question'],
            answer,
            question['reference_answer'],
        )

        blended = self.combine_scores(coverage, evaluation['score'])
        awarded = question['points'] * blended

        outcome = {
            'question_id': question['id'],
            'question_type': 'short_answer',
            'student_answer': answer,
            'keyword_score': coverage,
            'llm_score': evaluation['score'],
            'combined_score': blended,
            'score': awarded,
            'max_score': question['points'],
            'feedback': evaluation['feedback'],
        }
        outcome['improvements'] = evaluation.get('improvements', [])
        return outcome

    def combine_scores(self, keyword_score: float, llm_score: float) -> float:
        """Weighted blend: 40% keyword coverage, 60% LLM score."""
        keyword_weight, llm_weight = 0.4, 0.6
        return keyword_score * keyword_weight + llm_score * llm_weight

class KeywordMatcher:
    """Measure how many expected keywords occur in an answer."""

    def match(self, answer: str, keywords: List[str]) -> float:
        """Return the fraction of ``keywords`` found as substrings.

        An empty keyword list yields 1.0.
        """
        if not keywords:
            return 1.0

        hits = [keyword for keyword in keywords if keyword in answer]
        return len(hits) / len(keywords)

class EssayGrader:
    """Grade essays from quality analysis, content analysis and an LLM review."""

    def __init__(self):
        self.llm_client = LLMClient()
        self.quality_analyzer = QualityAnalyzer()
        self.content_analyzer = ContentAnalyzer()

    async def grade(self, question: Dict, essay: str) -> Dict:
        """Run the three evaluations and merge them into one result.

        Reads ``requirements``, ``question``, ``rubric``, ``points`` and
        ``id`` from ``question``.
        """
        quality_report = await self.quality_analyzer.analyze(essay)
        content_report = await self.content_analyzer.analyze(
            essay, question['requirements']
        )
        llm_report = await self.llm_client.evaluate_essay(
            question['question'], essay, question['rubric']
        )

        ratio = self.calculate_final_score(
            quality_report, content_report, llm_report
        )
        awarded = question['points'] * ratio
        summary = self.generate_comprehensive_feedback(
            quality_report, content_report, llm_report
        )

        outcome = {
            'question_id': question['id'],
            'question_type': 'essay',
            'essay': essay,
            'quality_analysis': quality_report,
            'content_analysis': content_report,
            'llm_evaluation': llm_report,
            'final_score': ratio,
            'score': awarded,
            'max_score': question['points'],
            'feedback': summary,
        }
        outcome['suggestions'] = self.generate_suggestions(
            quality_report, content_report
        )
        return outcome

    def calculate_final_score(
        self,
        quality: Dict,
        content: Dict,
        llm: Dict
    ) -> float:
        """Blend the scores: 30% quality, 30% content, 40% LLM."""
        quality_part = quality['overall_score'] * 0.3
        content_part = content['overall_score'] * 0.3
        llm_part = llm['score'] * 0.4
        return quality_part + content_part + llm_part

    def generate_comprehensive_feedback(
        self,
        quality: Dict,
        content: Dict,
        llm: Dict
    ) -> str:
        """Join a quality remark, a content remark and the LLM feedback."""

        def pick(score, excellent, good, weak):
            # Thresholds shared by both remarks: >= 0.8, >= 0.6, otherwise.
            if score >= 0.8:
                return excellent
            if score >= 0.6:
                return good
            return weak

        parts = [
            pick(
                quality['overall_score'],
                "文章质量优秀,语言表达流畅。",
                "文章质量良好,语言表达基本清晰。",
                "文章质量有待提高,建议加强语言训练。",
            ),
            pick(
                content['overall_score'],
                "内容充实,论证有力。",
                "内容基本完整,论证较为充分。",
                "内容需要进一步充实和完善。",
            ),
            llm['feedback'],
        ]
        return ' '.join(parts)

    def generate_suggestions(
        self,
        quality: Dict,
        content: Dict
    ) -> List[str]:
        """List a suggestion for every sub-score below 0.7."""
        # (report, sub-score key, suggestion), in output order.
        checks = (
            (quality, 'grammar_score', "注意语法错误,建议仔细检查。"),
            (quality, 'vocabulary_score', "丰富词汇量,使用更多样化的表达。"),
            (content, 'structure_score', "优化文章结构,使逻辑更加清晰。"),
            (content, 'relevance_score', "确保内容紧扣题目,避免偏离主题。"),
        )
        return [
            message
            for report, key, message in checks
            if report[key] < 0.7
        ]

class QualityAnalyzer:
    """Combine grammar, vocabulary and readability scores for a text."""

    def __init__(self):
        self.grammar_checker = GrammarChecker()
        self.vocabulary_analyzer = VocabularyAnalyzer()
        self.readability_calculator = ReadabilityCalculator()

    async def analyze(self, text: str) -> Dict:
        """Return the three sub-scores and their weighted overall score.

        Weights: grammar 40%, vocabulary 30%, readability 30%.
        """
        grammar = await self.grammar_checker.check(text)
        vocabulary = self.vocabulary_analyzer.analyze(text)
        readability = self.readability_calculator.calculate(text)

        report = {
            'grammar_score': grammar,
            'vocabulary_score': vocabulary,
            'readability_score': readability,
        }
        report['overall_score'] = (
            grammar * 0.4 + vocabulary * 0.3 + readability * 0.3
        )
        return report

class GrammarChecker:
    """Heuristic grammar score based on a few duplicated-particle patterns."""

    # Duplicated particles that each count as one error when present.
    _ERROR_PATTERNS = ('的的', '了了')

    async def check(self, text: str) -> float:
        """Return a score in [0, 1]: one minus errors per sentence.

        Sentences are the non-blank segments between ideographic full
        stops. ``str.split`` always returns at least one element and yields
        an empty trailing segment after a final full stop, so blank
        segments are discarded before counting; text with no sentence at
        all scores 1.0.
        """
        sentences = [
            segment for segment in text.split('。') if segment.strip()
        ]
        if not sentences:
            return 1.0

        error_rate = self.detect_errors(text) / len(sentences)
        return max(0.0, 1 - error_rate)

    def detect_errors(self, text: str) -> int:
        """Count how many of the known error patterns occur in ``text``.

        Each pattern contributes at most one error regardless of how often
        it appears.
        """
        return sum(1 for pattern in self._ERROR_PATTERNS if pattern in text)

class VocabularyAnalyzer:
    """Score lexical diversity of a text using jieba word segmentation."""

    def analyze(self, text: str) -> float:
        """Return ``min(1.0, 2 * distinct_tokens / total_tokens)``.

        Text that produces no tokens scores 0.0.
        """
        import jieba

        tokens = jieba.lcut(text)
        distinct = set(tokens)

        if not tokens:
            return 0.0

        diversity = len(distinct) / len(tokens)
        # Diversity is doubled and then capped at 1.0.
        return min(1.0, diversity * 2)

class ReadabilityCalculator:
    """Score readability from the average sentence length."""

    def calculate(self, text: str) -> float:
        """Return a readability score based on mean sentence length.

        Sentences are the non-blank segments between ideographic full
        stops. Blank segments (for example the empty string that follows a
        trailing full stop) are ignored so they neither drag the average
        down nor make empty text look like one zero-length sentence. Text
        without any sentence scores 0.0.

        Bands by average length in characters: under 10 -> 0.6,
        under 30 -> 1.0, under 50 -> 0.8, otherwise 0.5.
        """
        lengths = [
            len(segment) for segment in text.split('。') if segment.strip()
        ]
        if not lengths:
            return 0.0

        avg_length = sum(lengths) / len(lengths)

        if avg_length < 10:
            return 0.6
        if avg_length < 30:
            return 1.0
        if avg_length < 50:
            return 0.8
        return 0.5

class ContentAnalyzer:
    """Combine relevance, structure and depth scores for a text."""

    def __init__(self):
        self.relevance_checker = RelevanceChecker()
        self.structure_analyzer = StructureAnalyzer()
        self.depth_evaluator = DepthEvaluator()

    async def analyze(self, text: str, requirements: List[str]) -> Dict:
        """Return the three sub-scores and their weighted overall score.

        Weights: relevance 40%, structure 30%, depth 30%.
        """
        relevance = await self.relevance_checker.check(text, requirements)
        structure = self.structure_analyzer.analyze(text)
        depth = self.depth_evaluator.evaluate(text)

        report = {
            'relevance_score': relevance,
            'structure_score': structure,
            'depth_score': depth,
        }
        report['overall_score'] = (
            relevance * 0.4 + structure * 0.3 + depth * 0.3
        )
        return report

class RelevanceChecker:
    """Check how many required phrases appear verbatim in a text."""

    async def check(self, text: str, requirements: List[str]) -> float:
        """Return the fraction of ``requirements`` found as substrings.

        An empty requirement list yields 1.0.
        """
        if not requirements:
            return 1.0

        satisfied = [item for item in requirements if item in text]
        return len(satisfied) / len(requirements)

class StructureAnalyzer:
    """Heuristic check for an introduction, a body and a conclusion."""

    def analyze(self, text: str) -> float:
        """Return the fraction (0, 1/3, 2/3 or 1) of structural parts found.

        The introduction is detected by a marker in the first 100
        characters, the conclusion by a marker in the last 100, and the body
        by the text being longer than 200 characters.
        """
        opening = text[:100]
        closing = text[-100:]

        has_intro = False
        for marker in ['首先', '第一', '引言', '开始']:
            if marker in opening:
                has_intro = True
                break

        has_conclusion = False
        for marker in ['总之', '综上', '结论', '结束']:
            if marker in closing:
                has_conclusion = True
                break

        has_body = len(text) > 200

        found = int(has_intro) + int(has_body) + int(has_conclusion)
        return found / 3

class DepthEvaluator:
    """Estimate argument depth from the presence of reasoning markers."""

    def evaluate(self, text: str) -> float:
        """Return ``min(1.0, distinct_markers_present / 5)``."""
        markers = (
            '分析', '论证', '探讨', '研究', '深入',
            '因为', '所以', '因此', '然而', '但是',
        )

        present = [marker for marker in markers if marker in text]
        ratio = len(present) / 5
        return min(1.0, ratio)

class ProgrammingGrader:
    """Grade code answers from syntax, test, analysis and style reports."""

    def __init__(self):
        self.syntax_checker = SyntaxChecker()
        self.test_runner = TestRunner()
        self.code_analyzer = CodeAnalyzer()
        self.style_checker = StyleChecker()

    async def grade(self, question: Dict, code: str) -> Dict:
        """Run all four checks on ``code`` and merge them into one result.

        Reads ``test_cases``, ``points`` and ``id`` from ``question``.
        """
        syntax_report = await self.syntax_checker.check(code)
        test_report = await self.test_runner.run_tests(
            code, question['test_cases']
        )
        analysis_report = await self.code_analyzer.analyze(code)
        style_report = await self.style_checker.check(code)

        reports = (syntax_report, test_report, analysis_report, style_report)

        ratio = self.calculate_final_score(*reports)
        awarded = question['points'] * ratio
        summary = self.generate_feedback(*reports)

        outcome = {
            'question_id': question['id'],
            'question_type': 'programming',
            'code': code,
            'syntax_check': syntax_report,
            'test_results': test_report,
            'code_analysis': analysis_report,
            'style_check': style_report,
            'final_score': ratio,
            'score': awarded,
            'max_score': question['points'],
            'feedback': summary,
        }
        outcome['suggestions'] = self.generate_suggestions(*reports)
        return outcome

    def calculate_final_score(
        self,
        syntax: Dict,
        tests: Dict,
        analysis: Dict,
        style: Dict
    ) -> float:
        """Blend the reports: syntax 30%, tests 40%, analysis 20%, style 10%."""
        if syntax['is_valid']:
            syntax_score = 1.0
        else:
            syntax_score = 0.0
        pass_rate = tests['pass_rate']
        analysis_score = analysis['overall_score']
        style_score = style['overall_score']

        blended = syntax_score * 0.3 + pass_rate * 0.4
        blended = blended + analysis_score * 0.2
        blended = blended + style_score * 0.1
        return blended

    def generate_feedback(
        self,
        syntax: Dict,
        tests: Dict,
        analysis: Dict,
        style: Dict
    ) -> str:
        """Build a space-joined summary of syntax, pass rate and quality.

        ``style`` is accepted for signature symmetry but not used here.
        """
        if syntax['is_valid']:
            syntax_line = "语法正确。"
        else:
            syntax_line = f"语法错误:{syntax['error']}"

        tests_line = f"测试通过率:{tests['pass_rate']:.1%}"

        if analysis['overall_score'] >= 0.8:
            quality_line = "代码质量优秀。"
        elif analysis['overall_score'] >= 0.6:
            quality_line = "代码质量良好。"
        else:
            quality_line = "代码质量有待提高。"

        return ' '.join([syntax_line, tests_line, quality_line])

    def generate_suggestions(
        self,
        syntax: Dict,
        tests: Dict,
        analysis: Dict,
        style: Dict
    ) -> List[str]:
        """List a suggestion for every report that signals a problem."""
        # (trigger, suggestion) pairs, evaluated lazily in output order.
        triggers = (
            (lambda: not syntax['is_valid'], "修复语法错误。"),
            (lambda: tests['pass_rate'] < 1.0, "检查失败的测试用例。"),
            (lambda: analysis['complexity'] > 10, "考虑简化复杂的代码逻辑。"),
            (lambda: style['overall_score'] < 0.7, "遵循代码规范和最佳实践。"),
        )
        return [message for fires, message in triggers if fires()]

class SyntaxChecker:
    """Check whether a piece of source code compiles as a Python module."""

    async def check(self, code: str) -> Dict:
        """Compile ``code`` without executing it.

        Returns:
            ``{'is_valid': True, 'error': None}`` when compilation succeeds,
            otherwise ``{'is_valid': False, 'error': <message>}``.
        """
        try:
            compile(code, '<string>', 'exec')
        except (SyntaxError, ValueError) as exc:
            # compile() is documented to raise ValueError (not SyntaxError)
            # for source containing null bytes; report it as invalid code
            # instead of letting it abort grading.
            return {'is_valid': False, 'error': str(exc)}
        return {'is_valid': True, 'error': None}

class TestRunner:
    """Execute submitted code against function-level test cases.

    SECURITY NOTE(review): the submitted code is run with ``exec`` inside the
    grader's own process, with no sandbox, timeout or resource limit.
    Student submissions are untrusted input; this should be moved to an
    isolated subprocess or container before production use.
    """

    async def run_tests(self, code: str, test_cases: List[Dict]) -> Dict:
        """Run every test case and summarize the outcome.

        Args:
            code: Source code that defines the functions under test.
            test_cases: Dicts with ``function_name``, ``input`` (positional
                arguments) and ``expected_output``.

        Returns:
            A dict with ``passed``, ``failed``, ``total``, ``pass_rate`` and
            the per-case ``results``. Anything other than a pass counts as
            failed.
        """
        results = [self._run_single(code, case) for case in test_cases]

        passed = sum(1 for entry in results if entry['status'] == 'passed')
        failed = len(results) - passed
        total = passed + failed

        return {
            'passed': passed,
            'failed': failed,
            'total': total,
            'pass_rate': passed / total if total > 0 else 0,
            'results': results
        }

    def _run_single(self, code: str, test_case: Dict) -> Dict:
        """Execute ``code`` in a fresh namespace and evaluate one test case."""
        try:
            # A new namespace per case keeps state from leaking between cases.
            exec_globals = {}
            exec(code, exec_globals)

            function = exec_globals.get(test_case['function_name'])
            if not function:
                return {
                    'test_case': test_case,
                    'status': 'error',
                    'error': 'Function not found'
                }

            result = function(*test_case['input'])

            if result == test_case['expected_output']:
                return {
                    'test_case': test_case,
                    'status': 'passed',
                    'result': result
                }
            return {
                'test_case': test_case,
                'status': 'failed',
                'expected': test_case['expected_output'],
                'actual': result
            }
        except SystemExit as exc:
            # sys.exit()/exit() in student code raises SystemExit, which is
            # not an Exception subclass; without this handler it would
            # terminate the whole grading run.
            return {
                'test_case': test_case,
                'status': 'error',
                'error': f'SystemExit: {exc.code}'
            }
        except Exception as exc:
            return {
                'test_case': test_case,
                'status': 'error',
                'error': str(exc)
            }

class CodeAnalyzer:
    """Text-based metrics for submitted code: complexity, comments, functions."""

    async def analyze(self, code: str) -> Dict:
        """Return the individual metrics and their combined overall score."""
        lines = code.split('\n')

        complexity = self.calculate_complexity(code)
        comments_ratio = self.calculate_comments_ratio(code, lines)
        function_count = self.count_functions(code)

        overall_score = self.calculate_overall_score(
            complexity,
            comments_ratio,
            function_count
        )

        return {
            'complexity': complexity,
            'comments_ratio': comments_ratio,
            'function_count': function_count,
            'overall_score': overall_score
        }

    def calculate_complexity(self, code: str) -> int:
        """Count branching and boolean keywords as whole words.

        Plain substring counting would also count 'or' inside 'for', 'if'
        inside 'notify' and 'and' inside 'random', so word boundaries are
        required. 'elif' is listed explicitly because it no longer matches
        through its 'if' suffix. Keywords inside strings or comments are
        still counted; this remains a textual heuristic.
        """
        import re

        keyword_pattern = r'\b(?:if|elif|for|while|try|except|and|or)\b'
        return len(re.findall(keyword_pattern, code))

    def calculate_comments_ratio(self, code: str, lines: List[str]) -> float:
        """Return the share of ``lines`` that are full-line ``#`` comments."""
        if not lines:
            return 0.0

        comment_lines = sum(
            1 for line in lines
            if line.strip().startswith('#')
        )
        return comment_lines / len(lines)

    def count_functions(self, code: str) -> int:
        """Count ``def`` statements, matching ``def`` as a whole word only."""
        import re

        return len(re.findall(r'\bdef\s+\w', code))

    def calculate_overall_score(
        self,
        complexity: int,
        comments_ratio: float,
        function_count: int
    ) -> float:
        """Blend the metrics: complexity 40%, comments 30%, functions 30%.

        Complexity scores 1.0 at zero and falls to 0 at 20 or more; the
        comment ratio saturates at one third; the function count saturates
        at five.
        """
        complexity_score = max(0, 1 - complexity / 20)
        comments_score = min(1.0, comments_ratio * 3)
        function_score = min(1.0, function_count / 5)

        return (
            complexity_score * 0.4 +
            comments_score * 0.3 +
            function_score * 0.3
        )

class StyleChecker:
    """Heuristic style score from line length, naming and indentation."""

    async def check(self, code: str) -> Dict:
        """Return the three sub-scores and their weighted overall score.

        Weights: line length 40%, naming 30%, indentation 30%.
        """
        lines = code.split('\n')

        line_length_score = self.check_line_length(lines)
        naming_score = self.check_naming_convention(code)
        indentation_score = self.check_indentation(code)

        overall_score = (
            line_length_score * 0.4 +
            naming_score * 0.3 +
            indentation_score * 0.3
        )

        return {
            'line_length_score': line_length_score,
            'naming_score': naming_score,
            'indentation_score': indentation_score,
            'overall_score': overall_score
        }

    def check_line_length(self, lines: List[str]) -> float:
        """Return the share of lines that are at most 79 characters long."""
        max_length = 79

        if not lines:
            return 1.0

        violations = sum(1 for line in lines if len(line) > max_length)
        return max(0, 1 - violations / len(lines))

    def check_naming_convention(self, code: str) -> float:
        """Return the share of defined names that follow PEP 8 naming.

        Whole identifiers are captured (any case) and then validated.
        Capturing only lowercase fragments would make every captured name
        valid by construction, so violations such as ``def MyFunc`` or
        ``myVar = 1`` could never lower the score.

        Function names must be snake_case; assignment targets may be
        snake_case or UPPER_SNAKE_CASE constants. Attribute targets
        (``obj.attr = ...``) and comparisons (``==``) are not counted.
        Returns 1.0 when no names are found.
        """
        import re

        snake_case = re.compile(r'[a-z_][a-z0-9_]*')
        upper_case = re.compile(r'[A-Z_][A-Z0-9_]*')

        function_names = re.findall(r'\bdef\s+([A-Za-z_]\w*)', code)
        # The lookbehind forces a match to start at the beginning of an
        # identifier that is not an attribute; the lookahead skips '=='.
        variable_names = re.findall(
            r'(?<![\w.])([A-Za-z_]\w*)\s*=(?!=)', code
        )

        total_names = len(function_names) + len(variable_names)
        if total_names == 0:
            return 1.0

        valid_names = sum(
            1 for name in function_names if snake_case.fullmatch(name)
        )
        valid_names += sum(
            1 for name in variable_names
            if snake_case.fullmatch(name) or upper_case.fullmatch(name)
        )

        return valid_names / total_names

    def check_indentation(self, code: str) -> float:
        """Return the share of lines whose indent is a multiple of four.

        Blank lines are never counted as violations but do count toward the
        total. Every leading whitespace character counts as one column.
        """
        lines = code.split('\n')

        if not lines:
            return 1.0

        inconsistent = 0
        for line in lines:
            if not line.strip():
                continue
            leading = len(line) - len(line.lstrip())
            if leading % 4 != 0:
                inconsistent += 1

        return max(0, 1 - inconsistent / len(lines))

class FeedbackGenerator:
    """Turn per-question grading results into student-facing feedback."""

    async def generate(
        self,
        results: List[Dict],
        plagiarism_check: Dict
    ) -> Dict:
        """Assemble overall, per-question and improvement feedback.

        Args:
            results: Per-question result dicts produced by the graders.
            plagiarism_check: Report read for ``is_plagiarized`` and
                ``similarity``.

        Returns:
            A dict with ``overall``, ``question_feedback``,
            ``improvement_suggestions`` and ``plagiarism_warning``.
        """
        overall_feedback = self.generate_overall_feedback(results)

        question_feedback = [
            self.generate_question_feedback(result)
            for result in results
        ]

        improvement_suggestions = self.generate_improvement_suggestions(
            results
        )

        return {
            'overall': overall_feedback,
            'question_feedback': question_feedback,
            'improvement_suggestions': improvement_suggestions,
            'plagiarism_warning': self.generate_plagiarism_warning(
                plagiarism_check
            )
        }

    def generate_overall_feedback(self, results: List[Dict]) -> str:
        """Return a summary line chosen by the overall percentage band."""
        total_score = sum(r['score'] for r in results)
        max_score = sum(r['max_score'] for r in results)
        percentage = (total_score / max_score) * 100 if max_score > 0 else 0

        if percentage >= 90:
            return f"优秀!你的得分是{percentage:.1f}%。继续保持!"
        elif percentage >= 80:
            return f"很好!你的得分是{percentage:.1f}%。还有提升空间。"
        elif percentage >= 70:
            return f"良好!你的得分是{percentage:.1f}%。需要更加努力。"
        elif percentage >= 60:
            return f"及格!你的得分是{percentage:.1f}%。建议复习相关内容。"
        else:
            return f"需要加强!你的得分是{percentage:.1f}%。请认真复习。"

    def generate_question_feedback(self, result: Dict) -> str:
        """Return one feedback line for a single question result.

        Only the multiple-choice grader reports ``is_correct``; the other
        graders omit it, so the flag is read with a default instead of
        being indexed directly. Results without the flag fall through to
        their grader-provided ``feedback`` text.
        """
        if result.get('is_correct', False):
            return f"问题{result['question_id']}:回答正确。"
        # The separator after the question id mirrors the branch above.
        return f"问题{result['question_id']}:{result['feedback']}"

    def generate_improvement_suggestions(self, results: List[Dict]) -> List[str]:
        """Suggest study actions for question types answered incorrectly.

        A result only counts as weak when it carries ``is_correct`` and the
        flag is falsy.
        """
        weak_areas = [
            r['question_type'] for r in results
            if not r.get('is_correct', True)
        ]

        # (question type, suggestion), in output order.
        advice = (
            ('multiple_choice', "加强对基础知识的理解和记忆。"),
            ('short_answer', "练习简答题的答题技巧,提高表达能力。"),
            ('essay', "多读多写,提高写作能力和逻辑思维。"),
            ('programming', "多做编程练习,熟悉常见算法和数据结构。"),
        )
        return [
            message for question_type, message in advice
            if question_type in weak_areas
        ]

    def generate_plagiarism_warning(self, plagiarism_check: Dict) -> Optional[str]:
        """Return a warning when plagiarism was flagged, otherwise ``None``."""
        if plagiarism_check.get('is_plagiarized', False):
            # Read the similarity from the report that was passed in; the
            # previous code referenced an undefined name here.
            return f"警告:检测到可能的抄袭行为,相似度为{plagiarism_check['similarity']:.1%}。"

        return None

class PlagiarismDetector:
    """Flag submissions whose answers closely match stored answers."""

    def __init__(self):
        self.similarity_calculator = SimilarityCalculator()
        self.database = PlagiarismDatabase()

    async def check(self, submission: Dict) -> Dict:
        """Return the highest similarity found across all answers.

        The submission is flagged when that similarity exceeds 0.8.
        """
        max_similarity = 0.0
        matched_source = None

        for answer in submission['answers']:
            similarity, source = await self.check_answer(answer)

            if similarity > max_similarity:
                max_similarity = similarity
                matched_source = source

        return {
            'is_plagiarized': max_similarity > 0.8,
            'similarity': max_similarity,
            'matched_source': matched_source
        }

    async def check_answer(self, answer: str) -> tuple:
        """Return ``(similarity, source)`` of the closest stored answer.

        Returns ``(0.0, None)`` when the database yields no candidates.
        Each candidate is scored exactly once; on ties the first candidate
        wins.
        """
        similar_answers = await self.database.search_similar(answer)

        if not similar_answers:
            return 0.0, None

        # Score once and keep the candidate next to its score, instead of
        # running the similarity calculation twice per candidate.
        scored = [
            (self.similarity_calculator.calculate(answer, candidate['text']),
             candidate)
            for candidate in similar_answers
        ]
        best_similarity, best_match = max(scored, key=lambda pair: pair[0])

        return best_similarity, best_match['source']

class SimilarityCalculator:
    """Thin wrapper around ``difflib.SequenceMatcher``."""

    def calculate(self, text1: str, text2: str) -> float:
        """Return the SequenceMatcher ratio of the two texts."""
        import difflib

        matcher = difflib.SequenceMatcher(isjunk=None, a=text1, b=text2)
        return matcher.ratio()

class PlagiarismDatabase:
    """Placeholder store of previously seen answers."""

    async def search_similar(self, text: str) -> List[Dict]:
        """Return candidate matches for ``text``; currently always empty."""
        no_matches: List[Dict] = []
        return no_matches

2. 智能考试评分

功能描述

支持在线考试的自动评分,包括客观题自动评分和主观题 AI 辅助评分,并提供即时成绩反馈。

技术实现

python
from typing import Dict, List, Optional
from datetime import datetime
import numpy as np

class IntelligentExamScoring:
    """Pipeline that scores an exam submission and builds its report."""

    def __init__(self, config: Dict):
        self.config = config
        self.exam_processor = ExamProcessor()
        self.auto_scorer = AutoScorer()
        self.ai_scorer = AIScorer()
        self.score_normalizer = ScoreNormalizer()
        self.report_generator = ReportGenerator()

    async def score_exam(self, exam_submission: Dict) -> Dict:
        """Score ``exam_submission`` and return scores plus report.

        Steps run in order: preprocessing, automatic scoring, AI scoring,
        normalization of both score sets, report generation. The report is
        generated from the original submission, not the processed one.
        """
        prepared = await self.exam_processor.process(exam_submission)

        objective_part = await self.auto_scorer.score(prepared)
        subjective_part = await self.ai_scorer.score(prepared)

        merged_scores = await self.score_normalizer.normalize(
            objective_part, subjective_part
        )
        exam_report = await self.report_generator.generate(
            exam_submission, merged_scores
        )

        outcome = {
            'submission_id': exam_submission['id'],
            'student_id': exam_submission['student_id'],
            'exam_id': exam_submission['exam_id'],
            'scores': merged_scores,
            'report': exam_report,
        }
        outcome['scored_at'] = datetime.now().isoformat()
        return outcome

class ExamProcessor:
    """Extracts the fields the scorers need from a raw exam submission."""

    # Fields that must be present in every submission, in output order.
    _REQUIRED_FIELDS = ('id', 'student_id', 'exam_id', 'questions', 'answers')

    async def process(self, submission: Dict) -> Dict:
        """Return a new dict with the required fields plus 'metadata'.

        'metadata' defaults to an empty dict when absent. A missing required
        field raises KeyError.
        """
        processed = {name: submission[name] for name in self._REQUIRED_FIELDS}
        processed['metadata'] = submission.get('metadata', {})
        return processed

class AutoScorer:
    """Scores objective questions (multiple choice and true/false)."""

    async def score(self, exam: Dict) -> Dict:
        """Score every objective question in *exam*.

        Questions and answers are paired by position. A question with no
        answer at its position is treated as unanswered and scores 0 instead
        of aborting the whole exam.

        Returns:
            A dict with 'type' set to 'auto', the per-question 'scores', and
            the 'total' / 'max_total' sums over those scores.
        """
        scores = []

        for i, question in enumerate(exam['questions']):
            if question['type'] not in ('multiple_choice', 'true_false'):
                continue

            answers = exam['answers']
            try:
                answer = answers[i]
            except (IndexError, KeyError):
                # The submission has no entry for this question.
                answer = None

            scores.append(self.score_objective(question, answer))

        return {
            'type': 'auto',
            'scores': scores,
            'total': sum(s['score'] for s in scores),
            'max_total': sum(s['max_score'] for s in scores)
        }

    def score_objective(self, question: Dict, answer: Optional[str]) -> Dict:
        """Award full points when *answer* equals the question's correct answer.

        Any other value, including None for an unanswered question, earns 0.
        """
        is_correct = answer == question['correct_answer']

        return {
            'question_id': question['id'],
            'score': question['points'] if is_correct else 0,
            'max_score': question['points'],
            'is_correct': is_correct
        }

class AIScorer:
    """Scores subjective questions (short answer and essay) with an LLM."""

    def __init__(self):
        """Create the LLM client used to evaluate answers."""
        self.llm_client = LLMClient()

    async def score(self, exam: Dict) -> Dict:
        """Score every subjective question in *exam*.

        Questions and answers are paired by position. A question with no
        answer at its position scores 0 without calling the LLM, instead of
        aborting the whole exam.

        Returns:
            A dict with 'type' set to 'ai', the per-question 'scores', and
            the 'total' / 'max_total' sums over those scores.
        """
        scores = []

        for i, question in enumerate(exam['questions']):
            if question['type'] not in ('short_answer', 'essay'):
                continue

            answers = exam['answers']
            try:
                answer = answers[i]
            except (IndexError, KeyError):
                # The submission has no entry for this question.
                scores.append(self._score_unanswered(question))
                continue

            scores.append(await self.score_subjective(question, answer))

        return {
            'type': 'ai',
            'scores': scores,
            'total': sum(s['score'] for s in scores),
            'max_total': sum(s['max_score'] for s in scores)
        }

    def _score_unanswered(self, question: Dict) -> Dict:
        """Build the zero-score result for a question that has no answer."""
        return {
            'question_id': question['id'],
            'score': 0,
            'max_score': question['points'],
            'evaluation': '未作答'
        }

    async def score_subjective(
        self,
        question: Dict,
        answer: str
    ) -> Dict:
        """Ask the LLM to grade *answer* and parse a score from its reply.

        The prompt includes the question text, the optional reference answer
        and rubric, and the student's answer. The raw LLM reply is returned
        as 'evaluation'.
        """
        prompt = f"""
请对以下答案进行评分:

问题:{question['question']}
参考答案:{question.get('reference_answer', '')}
评分标准:{question.get('rubric', '')}

学生答案:{answer}

请给出:
1. 得分(0-{question['points']}分)
2. 评分理由
3. 改进建议
"""

        evaluation = await self.llm_client.generate(prompt)

        score = self.extract_score(evaluation, question['points'])

        return {
            'question_id': question['id'],
            'score': score,
            'max_score': question['points'],
            'evaluation': evaluation
        }

    def extract_score(self, evaluation: str, max_score: int) -> int:
        """Parse the integer score that follows '得分' and a colon.

        The parsed value is capped at *max_score*. When no score is found,
        half of *max_score* (rounded down) is returned as a fallback.
        """
        import re

        # Accept both the full-width and the ASCII colon after the label.
        match = re.search(r'得分[::]\s*(\d+)', evaluation)

        if match:
            score = int(match.group(1))
            return min(score, max_score)

        return max_score // 2

class ScoreNormalizer:
    """Merges auto and AI score sets into one total, percentage and grade."""

    # Minimum percentage for each letter grade, highest first.
    _GRADE_FLOORS = ((90, 'A'), (80, 'B'), (70, 'C'), (60, 'D'))

    async def normalize(
        self,
        auto_scored: Dict,
        ai_scored: Dict
    ) -> Dict:
        """Combine both score sets and derive the percentage and grade.

        Each input must provide 'total' and 'max_total'. The percentage is 0
        when the combined maximum is not positive.
        """
        earned = auto_scored['total'] + ai_scored['total']
        possible = auto_scored['max_total'] + ai_scored['max_total']

        if possible > 0:
            pct = (earned / possible) * 100
        else:
            pct = 0

        combined = {
            'auto_scored': auto_scored,
            'ai_scored': ai_scored,
            'total_score': earned,
            'max_score': possible,
            'percentage': pct,
        }
        combined['grade'] = self.calculate_grade(pct)
        return combined

    def calculate_grade(self, percentage: float) -> str:
        """Map *percentage* to a letter: A>=90, B>=80, C>=70, D>=60, else F."""
        for floor, letter in self._GRADE_FLOORS:
            if percentage >= floor:
                return letter
        return 'F'

class ReportGenerator:
    """Builds the summary, analysis and recommendations for a scored exam."""

    async def generate(
        self,
        submission: Dict,
        scores: Dict
    ) -> Dict:
        """Assemble the three report sections from *scores*.

        *submission* is accepted but not read by this implementation.
        """
        return {
            'summary': self.generate_summary(scores),
            'analysis': self.generate_analysis(scores),
            'recommendations': self.generate_recommendations(scores)
        }

    def generate_summary(self, scores: Dict) -> str:
        """Format total score, percentage (one decimal) and grade as text."""
        text = f"""
考试总结:
- 总分:{scores['total_score']}/{scores['max_score']}
- 百分比:{scores['percentage']:.1f}%
- 等级:{scores['grade']}
"""
        return text

    def generate_analysis(self, scores: Dict) -> Dict:
        """Return a fixed placeholder analysis; *scores* is not inspected."""
        return dict(strengths=[], weaknesses=[], performance_trend='stable')

    def generate_recommendations(self, scores: Dict) -> List[str]:
        """Suggest reviewing the basics when the percentage is below 60."""
        if scores['percentage'] < 60:
            return ["建议复习基础知识"]
        return []

3. 学习效果分析

功能描述

分析学生的学习数据,评估学习效果,识别学习困难,提供个性化学习建议。

技术实现

python
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import numpy as np

class LearningEffectivenessAnalyzer:
    """Collects a student's learning data and derives an analysis from it."""

    def __init__(self, config: Dict):
        """Keep *config* and create the collaborators used by analyze()."""
        self.config = config
        self.data_collector = DataCollector()
        self.performance_analyzer = PerformanceAnalyzer()
        self.progress_tracker = ProgressTracker()
        self.recommendation_engine = RecommendationEngine()

    async def analyze(
        self,
        student_id: str,
        time_period: Optional[Dict] = None
    ) -> Dict:
        """Run collection, performance analysis, progress tracking and
        recommendation generation for one student.

        Returns:
            A dict echoing *student_id* and *time_period*, plus the
            'performance', 'progress' and 'recommendations' results and an
            ISO-formatted 'analyzed_at' timestamp.
        """
        collected = await self.data_collector.collect(student_id, time_period)

        # Performance and progress are both derived from the same data.
        perf = await self.performance_analyzer.analyze(collected)
        prog = await self.progress_tracker.track(student_id, collected)

        advice = await self.recommendation_engine.generate(
            student_id, perf, prog
        )

        analysis = {
            'student_id': student_id,
            'time_period': time_period,
            'performance': perf,
            'progress': prog,
            'recommendations': advice,
            'analyzed_at': datetime.now().isoformat()
        }
        return analysis

class DataCollector:
    """Placeholder source of a student's learning records."""

    async def collect(
        self,
        student_id: str,
        time_period: Optional[Dict]
    ) -> Dict:
        """Return an empty record set; neither argument is read by this stub."""
        categories = ('assignments', 'exams', 'activities')
        # Each category gets its own fresh list.
        return {category: [] for category in categories}

class PerformanceAnalyzer:
    """Placeholder performance analysis over collected learning data."""

    async def analyze(self, data: Dict) -> Dict:
        """Return a fixed sample result; *data* is not inspected by this stub."""
        result: Dict = dict(average_score=85.0, score_trend='improving')
        result['strengths'] = ['mathematics', 'science']
        result['weaknesses'] = ['writing']
        return result

class ProgressTracker:
    """Placeholder tracker of a student's skill progress."""

    async def track(
        self,
        student_id: str,
        data: Dict
    ) -> Dict:
        """Return fixed sample progress figures; the arguments are not read."""
        snapshot = dict(
            skills_mastered=10,
            skills_in_progress=5,
            overall_progress=0.75,
        )
        return snapshot

class RecommendationEngine:
    """Placeholder generator of study recommendations."""

    async def generate(
        self,
        student_id: str,
        performance: Dict,
        progress: Dict
    ) -> List[str]:
        """Return two fixed suggestions; the arguments are not read by this stub."""
        suggestions: List[str] = []
        suggestions.append("加强写作练习")
        suggestions.append("继续在数学方面保持优势")
        return suggestions

系统架构

javascript
// Static description of the assessment system's architecture.
// Each entry under `layers` lists that layer's `components` and the
// `technologies` associated with it.
const automaticAssessmentSystemArchitecture = {
  layers: {
    // User-facing interfaces.
    presentationLayer: {
      components: ['学生界面', '教师界面', '管理后台'],
      technologies: ['React', 'Vue.js', 'Ant Design']
    },
    // Grading, scoring, analysis and reporting services.
    serviceLayer: {
      components: ['批改服务', '评分服务', '分析服务', '报告服务'],
      technologies: ['Node.js', 'Python', 'FastAPI']
    },
    // AI components: NLP, code analysis, similarity detection, scoring models.
    aiLayer: {
      components: ['NLP引擎', '代码分析', '相似度检测', '评分模型'],
      technologies: ['BERT', 'GPT', 'CodeBERT', 'TensorFlow']
    },
    // Data stores for assignments, answers, scores and plagiarism records.
    dataLayer: {
      components: ['作业数据库', '答案数据库', '评分数据库', '抄袭数据库'],
      technologies: ['PostgreSQL', 'MongoDB', 'Redis', 'Elasticsearch']
    }
  }
};

最后更新时间:2026-03-10

自动评估系统通过AI驱动的自动批改、智能评分和学习效果分析,大幅提升评估效率和准确性,为教师和学生提供及时、准确的反馈。