""" SDLC project and iteration management — CRUD logic for sd_projects and sd_iterations. """ import json import logging logger = logging.getLogger(__name__) async def create_sd_project(data): """Create a new SDLC project.""" from pipeline_service.storage import get_storage storage = await get_storage() project_id = data.get("id") if not project_id: from appPublic.uniqueID import getID project_id = getID() data["id"] = project_id await storage.insert("sd_projects", data) return {"id": project_id, "status": "ok"} async def update_sd_project(project_id, data): """Update an existing SDLC project.""" from pipeline_service.storage import get_storage storage = await get_storage() data["id"] = project_id await storage.update("sd_projects", data) return {"id": project_id, "status": "ok"} async def delete_sd_project(project_id): """Delete an SDLC project and its iterations.""" from pipeline_service.storage import get_storage storage = await get_storage() # Cascade delete iterations await storage.delete_where("sd_iterations", {"project_id": project_id}) await storage.delete("sd_projects", project_id) return {"status": "ok"} async def get_sd_project(project_id): """Get a project by ID.""" from pipeline_service.storage import get_storage storage = await get_storage() return await storage.get("sd_projects", project_id) async def list_sd_projects(org_id=None, status=None): """List projects with optional filters.""" from pipeline_service.storage import get_storage storage = await get_storage() filters = {} if org_id: filters["org_id"] = org_id if status: filters["status"] = status return await storage.list("sd_projects", filters=filters, sort_by="created_at desc") async def create_sd_iteration(data): """Create a new iteration for a project.""" from pipeline_service.storage import get_storage storage = await get_storage() iteration_id = data.get("id") if not iteration_id: from appPublic.uniqueID import getID iteration_id = getID() data["id"] = iteration_id await storage.insert("sd_iterations", data) # Auto-submit pipeline task if pipeline_id is configured on the project project = await get_sd_project(data.get("project_id")) if project and project.get("pipeline_id"): from pipeline_service.executor import submit_task task_id = await submit_task( tenant_id=data.get("org_id", "0"), pipeline_id=project["pipeline_id"], params={"iteration_id": iteration_id, "iteration_type": data.get("iteration_type", "new_feature")}, ) data["task_id"] = task_id await storage.update("sd_iterations", {"id": iteration_id, "task_id": task_id}) return {"id": iteration_id, "status": "ok"} async def update_sd_iteration(iteration_id, data): """Update an iteration.""" from pipeline_service.storage import get_storage storage = await get_storage() data["id"] = iteration_id await storage.update("sd_iterations", data) return {"id": iteration_id, "status": "ok"} async def delete_sd_iteration(iteration_id): """Delete an iteration.""" from pipeline_service.storage import get_storage storage = await get_storage() await storage.delete("sd_iterations", iteration_id) return {"status": "ok"} async def list_sd_iterations(project_id=None, status=None): """List iterations with optional project filter.""" from pipeline_service.storage import get_storage storage = await get_storage() filters = {} if project_id: filters["project_id"] = project_id if status: filters["status"] = status return await storage.list("sd_iterations", filters=filters, sort_by="created_at desc") # --- Bug management --- async def create_sd_bug(data): """Create a bug report (agent or human).""" from pipeline_service.storage import get_storage storage = await get_storage() bug_id = data.get("id") if not bug_id: from appPublic.uniqueID import getID bug_id = getID() data["id"] = bug_id if not data.get("status"): data["status"] = "open" if not data.get("reporter_type"): data["reporter_type"] = "human" await storage.insert("sd_bugs", data) return {"id": bug_id, "status": "ok"} async def update_sd_bug(bug_id, data): """Update a bug.""" from pipeline_service.storage import get_storage storage = await get_storage() data["id"] = bug_id await storage.update("sd_bugs", data) return {"id": bug_id, "status": "ok"} async def close_sd_bug(bug_id, verified_by=None): """Close a verified bug.""" from pipeline_service.storage import get_storage from datetime import datetime storage = await get_storage() await storage.update("sd_bugs", { "id": bug_id, "status": "closed", "verified_by": verified_by, "closed_at": datetime.now().isoformat(), }) return {"id": bug_id, "status": "ok"} async def list_sd_bugs(iteration_id=None, status=None, severity=None): """List bugs with filters.""" from pipeline_service.storage import get_storage storage = await get_storage() filters = {} if iteration_id: filters["iteration_id"] = iteration_id if status: filters["status"] = status if severity: filters["severity"] = severity return await storage.list("sd_bugs", filters=filters, sort_by="created_at desc") async def check_iteration_bugs_closed(iteration_id): """Check if all bugs for an iteration are closed. Returns True if ready for deploy.""" from pipeline_service.storage import get_storage storage = await get_storage() open_bugs = await storage.list("sd_bugs", filters={ "iteration_id": iteration_id, }) open_count = sum(1 for b in open_bugs if b.get("status") not in ("closed", "rejected")) return {"all_closed": open_count == 0, "open_count": open_count, "total": len(open_bugs)} # --- Deploy env management --- async def save_sd_deploy_env(data): """Create or update a deploy environment config.""" from pipeline_service.storage import get_storage storage = await get_storage() env_id = data.get("id") if not env_id: from appPublic.uniqueID import getID env_id = getID() data["id"] = env_id # Encrypt db_password if present if data.get("db_password"): from appPublic.password import password_encode data["db_password"] = password_encode(data["db_password"]) existing = await storage.get("sd_deploy_envs", env_id) if existing: await storage.update("sd_deploy_envs", data) else: await storage.insert("sd_deploy_envs", data) return {"id": env_id, "status": "ok"} async def list_sd_deploy_envs(project_id=None): """List deploy environments.""" from pipeline_service.storage import get_storage storage = await get_storage() filters = {} if project_id: filters["project_id"] = project_id return await storage.list("sd_deploy_envs", filters=filters)