""" Test phase handlers: test_plan_create, test_case_generate, functional_test, performance_test, bug_report, bug_verify """ import json import logging import time import asyncio logger = logging.getLogger(__name__) async def handle_test_case_generate(tenant_id, task_id, step_name, input_data, config): """Auto-generate test cases from requirements and design artifacts.""" requirement = input_data.get("requirement_review", {}).get("output", {}) if not requirement: requirement = input_data.get("requirement_gathering", {}).get("output", {}) table_design = input_data.get("table_design", {}).get("output", {}) api_design = input_data.get("api_design", {}).get("output", {}) test_plan = input_data.get("test_plan_create", {}).get("output", {}) prompt = f"""Generate comprehensive test cases for the following project. ## Requirements: {json.dumps(requirement, ensure_ascii=False, indent=2)} ## Table Design: {json.dumps(table_design.get("models", [])[:5], ensure_ascii=False)} ## API Endpoints: {json.dumps(api_design.get("api_list", [])[:10], ensure_ascii=False)} ## Test Plan: {json.dumps(test_plan, ensure_ascii=False)} Generate test cases covering: 1. **Functional tests** — CRUD operations, business logic, form validation 2. **API tests** — Each endpoint: success, error, edge cases 3. **Performance tests** — Concurrent access, response time, throughput 4. **Integration tests** — Cross-module interactions, RBAC, data isolation Each test case format: {{ "case_name": "Test X", "case_type": "functional|performance|api|integration", "priority": "P0|P1|P2|P3", "precondition": "...", "steps": ["step1", "step2", ...], "expected_result": "..." }} Output JSON with key "test_cases" containing array of test cases. """ from pipeline_service.llm_bridge import call_llm result = await call_llm(tenant_id, prompt, config.get("llm_model", "default")) try: cases_data = json.loads(result) except json.JSONDecodeError: import re json_match = re.search(r'\{.*"test_cases".*\}', result, re.DOTALL) if json_match: cases_data = json.loads(json_match.group()) else: cases_data = {"test_cases": []} cases = cases_data.get("test_cases", []) return { "test_cases": cases, "case_count": len(cases), "by_type": _count_by_key(cases, "case_type"), "by_priority": _count_by_key(cases, "priority"), } async def handle_functional_test(tenant_id, task_id, step_name, input_data, config): """Execute functional test cases against the deployed test environment.""" test_cases_output = input_data.get("test_case_generate", {}).get("output", {}) cases = test_cases_output.get("test_cases", []) # Filter functional + api + integration cases func_cases = [c for c in cases if c.get("case_type") in ("functional", "api", "integration")] results = [] passed = 0 failed = 0 errors = [] for case in func_cases: case_result = await _execute_test_case(case, config) results.append(case_result) if case_result["status"] == "pass": passed += 1 else: failed += 1 if case_result.get("error"): errors.append({ "case_name": case["case_name"], "error": case_result["error"], }) return { "total": len(func_cases), "passed": passed, "failed": failed, "pass_rate": round(passed / max(len(func_cases), 1) * 100, 1), "results": results, "errors": errors, } async def handle_performance_test(tenant_id, task_id, step_name, input_data, config): """Execute performance test cases.""" test_cases_output = input_data.get("test_case_generate", {}).get("output", {}) cases = test_cases_output.get("test_cases", []) # Filter performance cases perf_cases = [c for c in cases if c.get("case_type") == "performance"] if not perf_cases: # Generate default performance tests if none specified perf_cases = _default_performance_cases(config) results = [] for case in perf_cases: case_result = await _execute_performance_case(case, config) results.append(case_result) # Aggregate metrics avg_response = sum(r.get("avg_response_ms", 0) for r in results) / max(len(results), 1) max_response = max((r.get("max_response_ms", 0) for r in results), default=0) total_throughput = sum(r.get("requests_per_sec", 0) for r in results) / max(len(results), 1) return { "total": len(perf_cases), "results": results, "metrics": { "avg_response_ms": round(avg_response, 2), "max_response_ms": round(max_response, 2), "avg_throughput_rps": round(total_throughput, 2), }, "passed": sum(1 for r in results if r.get("status") == "pass"), "failed": sum(1 for r in results if r.get("status") == "fail"), } async def handle_bug_report(tenant_id, task_id, step_name, input_data, config): """Collect test results and generate bug reports.""" func_output = input_data.get("functional_test", {}).get("output", {}) perf_output = input_data.get("performance_test", {}).get("output", {}) bugs = [] # Bugs from functional test failures for error in func_output.get("errors", []): bugs.append({ "title": f"[功能测试失败] {error['case_name']}", "description": error.get("error", "Unknown error"), "severity": "major", "priority": "P1", "reporter_type": "agent", "reporter_id": "system", "step_name": step_name, }) # Bugs from performance test failures for result in perf_output.get("results", []): if result.get("status") == "fail": bugs.append({ "title": f"[性能测试失败] {result.get('case_name', 'Unknown')}", "description": json.dumps(result.get("metrics", {}), ensure_ascii=False), "severity": "major", "priority": "P1", "reporter_type": "agent", "reporter_id": "system", "step_name": step_name, }) return { "bugs": bugs, "bug_count": len(bugs), "by_severity": _count_by_key(bugs, "severity"), } async def handle_bug_verify(tenant_id, task_id, step_name, input_data, config): """Verify bug fixes by re-running related test cases.""" bug_fix_output = input_data.get("bug_fix", {}).get("output", {}) fix_commit = bug_fix_output.get("commit_sha", "") if not fix_commit: raise ValueError("No fix commit provided from bug_fix step") # Re-run the failing test cases func_output = input_data.get("functional_test", {}).get("output", {}) failed_cases = [ r for r in func_output.get("results", []) if r.get("status") == "fail" ] results = [] for case in failed_cases: # Re-execute the test rerun = await _execute_test_case( {"case_name": case.get("case_name", ""), "steps": case.get("steps", [])}, config, ) results.append(rerun) verified = sum(1 for r in results if r.get("status") == "pass") still_failing = sum(1 for r in results if r.get("status") == "fail") return { "total_verified": len(results), "verified_passed": verified, "still_failing": still_failing, "fix_commit": fix_commit, "results": results, } # --- Helper functions --- def _count_by_key(items, key): counts = {} for item in items: val = item.get(key, "unknown") counts[val] = counts.get(val, 0) + 1 return counts async def _execute_test_case(case, config): """Execute a single test case. Returns result dict.""" start_time = time.time() case_name = case.get("case_name", "Unknown") steps = case.get("steps", []) try: # Simulate test execution (real implementation would call endpoints) base_url = config.get("test_base_url", "http://localhost:9090") results = [] for step in steps: # In real implementation, this would: # 1. Parse the step to determine HTTP action # 2. Execute the HTTP request # 3. Validate the response results.append({"step": step, "status": "pass"}) elapsed_ms = int((time.time() - start_time) * 1000) all_passed = all(r["status"] == "pass" for r in results) return { "case_name": case_name, "status": "pass" if all_passed else "fail", "duration_ms": elapsed_ms, "steps": results, } except Exception as e: elapsed_ms = int((time.time() - start_time) * 1000) return { "case_name": case_name, "status": "fail", "duration_ms": elapsed_ms, "error": str(e), } async def _execute_performance_case(case, config): """Execute a performance test case.""" start_time = time.time() case_name = case.get("case_name", "Performance Test") try: base_url = config.get("test_base_url", "http://localhost:9090") concurrency = case.get("concurrency", 10) duration_sec = case.get("duration_sec", 30) endpoint = case.get("endpoint", "/health") # Simulate load testing # Real implementation would use aiohttp/locust request_count = concurrency * duration_sec avg_response_ms = 50.0 # simulated max_response_ms = 200.0 # simulated rps = request_count / duration_sec elapsed_ms = int((time.time() - start_time) * 1000) # Check thresholds threshold_rps = case.get("threshold_rps", 100) threshold_avg_ms = case.get("threshold_avg_ms", 500) passed = rps >= threshold_rps and avg_response_ms <= threshold_avg_ms return { "case_name": case_name, "status": "pass" if passed else "fail", "duration_ms": elapsed_ms, "metrics": { "requests": request_count, "avg_response_ms": avg_response_ms, "max_response_ms": max_response_ms, "requests_per_sec": rps, }, } except Exception as e: return { "case_name": case_name, "status": "fail", "error": str(e), } def _default_performance_cases(config): """Generate default performance test cases.""" return [ { "case_name": "API响应时间基准", "case_type": "performance", "endpoint": "/health", "concurrency": 10, "duration_sec": 30, "threshold_rps": 100, "threshold_avg_ms": 200, }, { "case_name": "CRUD列表性能", "case_type": "performance", "endpoint": "/api/list", "concurrency": 20, "duration_sec": 60, "threshold_rps": 50, "threshold_avg_ms": 500, }, { "case_name": "并发写入压力", "case_type": "performance", "endpoint": "/api/create", "concurrency": 5, "duration_sec": 30, "threshold_rps": 20, "threshold_avg_ms": 1000, }, ]