""" Development phase handlers: code_generate, code_compliance_check, code_auto_fix """ import json import logging import os import py_compile import tempfile import re logger = logging.getLogger(__name__) async def handle_code_generate(tenant_id, task_id, step_name, input_data, config): """Generate complete module skeleton from design artifacts.""" table_design = input_data.get("table_design", {}).get("output", {}) crud_design = input_data.get("crud_design", {}).get("output", {}) api_design = input_data.get("api_design", {}).get("output", {}) models = table_design.get("models", []) cruds = crud_design.get("cruds", []) api_specs = api_design.get("dspy_specs", []) module_name = config.get("module_name", "new_module") prompt = f"""Generate a complete Sage module based on the following design artifacts. ## Table Definitions (models/*.json): {json.dumps(models, ensure_ascii=False, indent=2)} ## CRUD Definitions (json/*.json): {json.dumps(cruds, ensure_ascii=False, indent=2)} ## API Specifications: {json.dumps(api_specs, ensure_ascii=False, indent=2)} Module name: {module_name} Generate the following files: 1. **{module_name}/init.py** — Module init with load_{module_name}() function - Register all CRUD functions with ServerEnv - Include both singular and plural function names 2. **{module_name}/__init__.py** — Import all public functions from init.py 3. **wwwroot/api/*.dspy** — All API endpoint files - NO imports (pre-loaded: json, datetime, debug, DBPools, get_user, params_kw, getID, etc.) - Use return instead of print - Use getID() instead of uuid() - Use `async with get_sor_context(request._run_ns, dbname) as sor:` pattern - sor.U/I/C/D with correct parameter counts (sor.U = 2 params: tablename + data dict with id inside) 4. **wwwroot/index.ui** — Module entry page (bricks JSON format) 5. **pyproject.toml** — Package config 6. **build.sh** — Build script 7. **scripts/load_path.py** — RBAC path registration (NO wildcards, explicit paths only) Output a JSON object with key "files" containing: {{"path": "relative/path", "content": "file content"}} Follow module-development-spec strictly. """ from pipeline_service.llm_bridge import call_llm result = await call_llm(tenant_id, prompt, config.get("llm_model", "default")) try: code_data = json.loads(result) except json.JSONDecodeError: json_match = re.search(r'\{.*"files".*\}', result, re.DOTALL) if json_match: code_data = json.loads(json_match.group()) else: raise ValueError(f"LLM returned non-JSON: {result[:200]}") files = code_data.get("files", []) return { "files": files, "file_count": len(files), "module_name": module_name, "generated_by": "llm", "needs_review": True, } async def handle_code_compliance_check(tenant_id, task_id, step_name, input_data, config): """Check generated code against all module development specs.""" code_output = input_data.get("code_generate", {}).get("output", {}) files = code_output.get("files", []) if not files: raise ValueError("No code files found from code_generate step") report = { "total_files": len(files), "violations": [], "auto_fixable": [], "summary": {"pass": 0, "fail": 0, "warning": 0}, } for file_entry in files: path = file_entry.get("path", "") content = file_entry.get("content", "") file_violations = [] if path.endswith(".py"): file_violations = _check_python_file(path, content) elif path.endswith(".dspy"): file_violations = _check_dspy_file(path, content) elif path.endswith(".json") and "/models/" in path: file_violations = _check_model_json(path, content) elif path.endswith(".json") and "/json/" in path: file_violations = _check_crud_json(path, content) elif path == "build.sh" or path.endswith("build.sh"): file_violations = _check_build_sh(path, content) elif path == "pyproject.toml": file_violations = _check_pyproject(path, content) elif "load_path.py" in path: file_violations = _check_load_path(path, content) for v in file_violations: v["file"] = path if v.get("auto_fixable"): report["auto_fixable"].append(v) report["violations"].append(v) if not file_violations: report["summary"]["pass"] += 1 else: report["summary"]["fail"] += 1 report["summary"]["total_violations"] = len(report["violations"]) report["summary"]["auto_fixable_count"] = len(report["auto_fixable"]) return report async def handle_code_auto_fix(tenant_id, task_id, step_name, input_data, config): """Auto-fix fixable compliance violations.""" check_output = input_data.get("code_compliance_check", {}).get("output", {}) code_output = input_data.get("code_generate", {}).get("output", {}) auto_fixable = check_output.get("auto_fixable", []) if not auto_fixable: return {"fixed_count": 0, "files": code_output.get("files", [])} files = code_output.get("files", []) fixed_count = 0 fix_log = [] # Group violations by file violations_by_file = {} for v in auto_fixable: fpath = v.get("file", "") violations_by_file.setdefault(fpath, []).append(v) for file_entry in files: path = file_entry.get("path", "") if path not in violations_by_file: continue content = file_entry.get("content", "") for v in violations_by_file[path]: rule = v.get("rule", "") new_content = _apply_fix(content, rule, v) if new_content != content: file_entry["content"] = new_content content = new_content fixed_count += 1 fix_log.append({"file": path, "rule": rule, "fix": v.get("message", "")}) return { "fixed_count": fixed_count, "fix_log": fix_log, "files": files, "remaining_violations": check_output.get("summary", {}).get("total_violations", 0) - fixed_count, } # --- Compliance checkers --- def _check_python_file(path, content): """Check Python file compliance.""" violations = [] # Syntax check try: with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: f.write(content) f.flush() py_compile.compile(f.name, doraise=True) os.unlink(f.name) except py_compile.PyCompileError as e: violations.append({ "rule": "py_syntax", "severity": "error", "message": f"Syntax error: {str(e)[:200]}", "auto_fixable": False, }) # Check for print() instead of return/logging if re.search(r'^\s*print\s*\(', content, re.MULTILINE): violations.append({ "rule": "no_print", "severity": "error", "message": "Use logger/logging instead of print()", "auto_fixable": True, }) return violations def _check_dspy_file(path, content): """Check .dspy file compliance.""" violations = [] # No imports import_lines = re.findall(r'^(import |from .+ import )', content, re.MULTILINE) if import_lines: # Allow only sqlor.filter import for line in import_lines: if "from sqlor.filter import" not in line: violations.append({ "rule": "dspy_no_import", "severity": "error", "message": f"Import not allowed in .dspy: {line.strip()}", "auto_fixable": True, }) # No print() if re.search(r'^\s*print\s*\(', content, re.MULTILINE): violations.append({ "rule": "dspy_no_print", "severity": "error", "message": "Use return instead of print() in .dspy", "auto_fixable": True, }) # No uuid() if "uuid" in content.lower() and "getID()" not in content: violations.append({ "rule": "dspy_no_uuid", "severity": "error", "message": "Use getID() instead of uuid", "auto_fixable": True, }) # No shebang if content.startswith("#!"): violations.append({ "rule": "dspy_no_shebang", "severity": "warning", "message": "Remove shebang line from .dspy", "auto_fixable": True, }) # Check ServerEnv usage if "ServerEnv()" in content: violations.append({ "rule": "dspy_no_serverenv", "severity": "error", "message": "Do not use ServerEnv() in .dspy - functions are pre-loaded", "auto_fixable": True, }) # Check get_user() without await if re.search(r'(? 0: s = data["summary"][0] pk = s.get("primary") if pk and not isinstance(pk, list): violations.append({ "rule": "model_primary_array", "severity": "error", "message": f"'primary' must be array, got {type(pk).__name__}: {pk}", "auto_fixable": True, }) if "fields" in data and isinstance(data["fields"], list): for f in data["fields"]: ftype = f.get("type", "") fname = f.get("name", "?") if ftype in ("float", "double", "ddouble"): if not f.get("dec"): violations.append({ "rule": "model_dec_required", "severity": "error", "message": f"Field '{fname}' type={ftype} missing 'dec'", "auto_fixable": False, }) dec_val = f.get("dec") if isinstance(dec_val, str): violations.append({ "rule": "model_dec_integer", "severity": "error", "message": f"Field '{fname}' dec must be integer, got string", "auto_fixable": True, }) if "indexes" in data: for idx in data.get("indexes", []): idxfields = idx.get("idxfields") if idxfields and not isinstance(idxfields, list): violations.append({ "rule": "model_idxfields_array", "severity": "error", "message": f"Index '{idx.get('name')}' idxfields must be array", "auto_fixable": True, }) if "codes" in data: for code in data.get("codes", []): if code.get("table") == "appcodes_kv": cond = code.get("cond", "") if cond and "id=" in cond and "parentid=" not in cond: violations.append({ "rule": "model_codes_parentid", "severity": "error", "message": f"Code field '{code.get('field')}' uses id= instead of parentid=", "auto_fixable": True, }) return violations def _check_crud_json(path, content): """Check CRUD JSON against crud-definition-spec.""" violations = [] try: data = json.loads(content) except json.JSONDecodeError as e: violations.append({ "rule": "json_syntax", "severity": "error", "message": f"Invalid JSON: {str(e)[:100]}", "auto_fixable": False, }) return violations if "tablename" in data: violations.append({ "rule": "crud_tblname_not_tablename", "severity": "error", "message": "Use 'tblname' not 'tablename'", "auto_fixable": True, }) if "tblname" not in data: violations.append({ "rule": "crud_tblname_required", "severity": "error", "message": "Missing 'tblname' root key", "auto_fixable": False, }) params = data.get("params", {}) if "editable" not in params: violations.append({ "rule": "crud_editable_required", "severity": "error", "message": "Missing 'editable' section in params", "auto_fixable": False, }) return violations def _check_build_sh(path, content): """Check build.sh compliance.""" violations = [] if "#!/usr/bin/env bash" not in content and "#!/bin/bash" not in content: violations.append({ "rule": "build_sh_shebang", "severity": "warning", "message": "Missing bash shebang", "auto_fixable": True, }) if "set -e" not in content: violations.append({ "rule": "build_sh_set_e", "severity": "warning", "message": "Missing 'set -e' for fail-fast", "auto_fixable": True, }) return violations def _check_pyproject(path, content): """Check pyproject.toml compliance.""" violations = [] if "ahserver" in content: violations.append({ "rule": "pyproject_no_ahserver", "severity": "error", "message": "Do not declare ahserver as dependency (installed by build.sh)", "auto_fixable": True, }) return violations def _check_load_path(path, content): """Check load_path.py — no wildcards.""" violations = [] if "%" in content or "*" in content: # Exclude comments for line in content.split("\n"): stripped = line.strip() if stripped.startswith("#"): continue if "%" in stripped or "*" in stripped: violations.append({ "rule": "load_path_no_wildcard", "severity": "error", "message": f"Wildcard found in load_path.py: {stripped[:80]}", "auto_fixable": False, }) return violations # --- Auto-fix functions --- def _apply_fix(content, rule, violation): """Apply auto-fix for a known rule violation.""" if rule == "no_print": # Replace print() with logger.info() content = re.sub(r'^(\s*)print\s*\(', r'\1logger.info(', content, flags=re.MULTILINE) elif rule == "dspy_no_print": content = re.sub(r'^(\s*)print\s*\(', r'\1# return ', content, flags=re.MULTILINE) elif rule == "dspy_no_shebang": if content.startswith("#!"): content = content.split("\n", 1)[1] if "\n" in content else "" elif rule == "dspy_no_serverenv": content = re.sub(r'env\s*=\s*ServerEnv\(\)\s*\n?', '', content) elif rule == "model_primary_array": try: data = json.loads(content) pk = data.get("summary", [{}])[0].get("primary") if isinstance(pk, str): data["summary"][0]["primary"] = [pk] content = json.dumps(data, ensure_ascii=False, indent=4) except Exception: pass elif rule == "model_idxfields_array": try: data = json.loads(content) for idx in data.get("indexes", []): if isinstance(idx.get("idxfields"), str): idx["idxfields"] = [idx["idxfields"]] content = json.dumps(data, ensure_ascii=False, indent=4) except Exception: pass elif rule == "model_codes_parentid": try: data = json.loads(content) for code in data.get("codes", []): if code.get("table") == "appcodes_kv": cond = code.get("cond", "") if "id=" in cond and "parentid=" not in cond: code["cond"] = cond.replace("id=", "parentid=") content = json.dumps(data, ensure_ascii=False, indent=4) except Exception: pass elif rule == "crud_tblname_not_tablename": try: data = json.loads(content) if "tablename" in data and "tblname" not in data: data["tblname"] = data.pop("tablename") content = json.dumps(data, ensure_ascii=False, indent=4) except Exception: pass elif rule == "build_sh_shebang": content = "#!/usr/bin/env bash\nset -e\n\n" + content elif rule == "build_sh_set_e": content = content.replace("#!/usr/bin/env bash\n", "#!/usr/bin/env bash\nset -e\n", 1) elif rule == "pyproject_no_ahserver": content = re.sub(r'["\']ahserver["\'],?\s*', '', content) return content