diff --git a/mysql.ddl.sql b/mysql.ddl.sql index a47a4f4..ae3d258 100644 --- a/mysql.ddl.sql +++ b/mysql.ddl.sql @@ -1,67 +1,77 @@ --- 商机管理模块数据库表结构 +-- Table from opportunities.json +CREATE TABLE IF NOT EXISTS `opportunities` ( + `id` VARCHAR(32) NOT NULL COMMENT '主键 - UUID格式', + `customer_id` VARCHAR(32) NOT NULL COMMENT '关联客户管理模块的客户ID', + `customer_name` VARCHAR(255) NOT NULL COMMENT '客户名称,必填字段', + `opportunity_name` VARCHAR(255) NOT NULL COMMENT '商机标题或项目名称', + `estimated_amount` DECIMAL(15,2) NOT NULL COMMENT '预估成交金额,必填字段', + `current_stage` VARCHAR(50) NOT NULL COMMENT '当前所处的销售阶段,必填字段', + `expected_close_date` DATE NOT NULL COMMENT '预计成交日期,必填字段', + `source_type` VARCHAR(20) NOT NULL DEFAULT 'manual' COMMENT 'manual=手动录入, lead=线索转化', + `owner_id` VARCHAR(32) NOT NULL COMMENT '负责该商机的销售人员ID', + `owner_name` VARCHAR(100) NOT NULL COMMENT '负责该商机的销售人员姓名', + `region` VARCHAR(100) COMMENT '商机所属区域,用于分析筛选', + `description` TEXT COMMENT '商机详细描述信息', + `probability` FLOAT(5,2) NOT NULL DEFAULT '0.00' COMMENT '基于历史转化率计算的成交概率百分比', + `predicted_revenue` DECIMAL(15,2) NOT NULL DEFAULT '0.00' COMMENT '基于成交概率计算的预测收入', + `created_at` TIMESTAMP NOT NULL, + `updated_at` TIMESTAMP NOT NULL, + `status` VARCHAR(20) NOT NULL DEFAULT 'active' COMMENT 'active=活跃, won=已成交, lost=已丢失, closed=已关闭', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='商机表'; --- 1. 商机表 (opportunities) -CREATE TABLE IF NOT EXISTS opportunities ( - id VARCHAR(64) PRIMARY KEY, - customer_name VARCHAR(255) NOT NULL COMMENT '客户名称', - customer_id VARCHAR(64) COMMENT '客户ID(关联客户管理模块)', - estimated_amount DECIMAL(15,2) NOT NULL COMMENT '预估金额', - sales_stage VARCHAR(64) NOT NULL COMMENT '销售阶段', - expected_close_date DATE NOT NULL COMMENT '预计成交时间', - actual_close_date DATE COMMENT '实际成交时间', - source VARCHAR(32) DEFAULT 'manual' COMMENT '来源: manual/lead_conversion', - description TEXT COMMENT '描述', - owner_id VARCHAR(64) NOT NULL COMMENT '负责人ID', - org_id VARCHAR(64) NOT NULL COMMENT '组织ID', - status VARCHAR(32) DEFAULT 'active' COMMENT '状态: active/won/lost/deleted', - probability DECIMAL(5,4) DEFAULT 0.1000 COMMENT '成交概率', - next_action_date DATE COMMENT '下次行动日期', - next_action_description VARCHAR(255) COMMENT '下次行动描述', - tags VARCHAR(255) COMMENT '标签', - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - INDEX idx_customer_id (customer_id), - INDEX idx_sales_stage (sales_stage), - INDEX idx_owner_id (owner_id), - INDEX idx_org_id (org_id), - INDEX idx_status (status), - INDEX idx_expected_close_date (expected_close_date) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +CREATE INDEX `idx_opportunities_customer` ON `opportunities` (`customer_id`); +CREATE INDEX `idx_opportunities_owner` ON `opportunities` (`owner_id`); +CREATE INDEX `idx_opportunities_stage` ON `opportunities` (`current_stage`); +CREATE INDEX `idx_opportunities_region` ON `opportunities` (`region`); +CREATE INDEX `idx_opportunities_status` ON `opportunities` (`status`); +CREATE INDEX `idx_opportunities_expected_close` ON `opportunities` (`expected_close_date`); --- 2. 商机阶段变更历史表 (opportunity_stage_history) -CREATE TABLE IF NOT EXISTS opportunity_stage_history ( - id VARCHAR(64) PRIMARY KEY, - opportunity_id VARCHAR(64) NOT NULL, - old_stage VARCHAR(64) NOT NULL COMMENT '原阶段', - new_stage VARCHAR(64) NOT NULL COMMENT '新阶段', - change_reason TEXT NOT NULL COMMENT '变更原因', - changed_by VARCHAR(64) NOT NULL COMMENT '变更人ID', - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - INDEX idx_opportunity_id (opportunity_id), - INDEX idx_changed_by (changed_by), - FOREIGN KEY (opportunity_id) REFERENCES opportunities(id) ON DELETE CASCADE -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +-- Table from opportunity_predictions.json +CREATE TABLE IF NOT EXISTS `opportunity_predictions` ( + `id` VARCHAR(32) NOT NULL COMMENT '主键 - UUID格式', + `opportunity_id` VARCHAR(32) NOT NULL COMMENT '关联的商机ID', + `predicted_amount` DECIMAL(15,2) NOT NULL DEFAULT '0.00' COMMENT '基于历史转化率计算的预测成交金额', + `confidence_level` DECIMAL(5,4) NOT NULL DEFAULT '0.0000' COMMENT '预测的置信度(0-1)', + `prediction_date` DATE NOT NULL COMMENT '预测生成日期', + `actual_amount` DECIMAL(15,2) COMMENT '实际成交金额(成交后更新)', + `deviation_rate` DECIMAL(5,4) COMMENT '预测与实际的偏差率', + `created_at` TIMESTAMP NOT NULL COMMENT '记录创建时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='商机预测表'; --- 3. 销售漏斗配置表 (sales_funnel_config) -CREATE TABLE IF NOT EXISTS sales_funnel_config ( - id VARCHAR(64) PRIMARY KEY, - org_id VARCHAR(64) NOT NULL, - stage_name VARCHAR(64) NOT NULL COMMENT '阶段名称', - stage_order INT NOT NULL COMMENT '阶段顺序', - default_probability DECIMAL(5,4) NOT NULL COMMENT '默认成交概率', - color_code VARCHAR(16) COMMENT '颜色代码', - is_active TINYINT(1) DEFAULT 1 COMMENT '是否激活', - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - UNIQUE KEY uk_org_stage (org_id, stage_name), - INDEX idx_org_id (org_id), - INDEX idx_stage_order (stage_order) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +CREATE UNIQUE INDEX `idx_predictions_opportunity` ON `opportunity_predictions` (`opportunity_id`, `prediction_date`); +CREATE INDEX `idx_predictions_date` ON `opportunity_predictions` (`prediction_date`); --- 插入默认销售漏斗配置 -INSERT IGNORE INTO sales_funnel_config (id, org_id, stage_name, stage_order, default_probability, color_code) VALUES -(REPLACE(UUID(), '-', ''), 'default', '初步接洽', 1, 0.1000, '#FF6B6B'), -(REPLACE(UUID(), '-', ''), 'default', '需求确认', 2, 0.3000, '#4ECDC4'), -(REPLACE(UUID(), '-', ''), 'default', '方案报价', 3, 0.5000, '#45B7D1'), -(REPLACE(UUID(), '-', ''), 'default', '合同谈判', 4, 0.7000, '#96CEB4'), -(REPLACE(UUID(), '-', ''), 'default', '成交', 5, 1.0000, '#FFEAA7'); \ No newline at end of file +-- Table from opportunity_stage_history.json +CREATE TABLE IF NOT EXISTS `opportunity_stage_history` ( + `id` VARCHAR(32) NOT NULL COMMENT '主键 - UUID格式', + `opportunity_id` VARCHAR(32) NOT NULL COMMENT '关联的商机ID', + `from_stage` VARCHAR(50) COMMENT '变更前的销售阶段', + `to_stage` VARCHAR(50) NOT NULL COMMENT '变更后的销售阶段', + `change_reason` TEXT NOT NULL COMMENT '阶段变更的原因说明,必填字段', + `changed_by_id` VARCHAR(32) NOT NULL COMMENT '执行阶段变更的用户ID', + `changed_by_name` VARCHAR(100) NOT NULL COMMENT '执行阶段变更的用户姓名', + `changed_at` TIMESTAMP NOT NULL, + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='商机阶段变更历史表'; + +CREATE INDEX `idx_stage_history_opportunity` ON `opportunity_stage_history` (`opportunity_id`); +CREATE INDEX `idx_stage_history_changed_by` ON `opportunity_stage_history` (`changed_by_id`); +CREATE INDEX `idx_stage_history_changed_at` ON `opportunity_stage_history` (`changed_at`); + +-- Table from sales_stages.json +CREATE TABLE IF NOT EXISTS `sales_stages` ( + `id` VARCHAR(32) NOT NULL COMMENT '主键 - UUID格式', + `stage_name` VARCHAR(100) NOT NULL COMMENT '销售阶段名称,如\'初步接洽\'、\'需求确认\'等', + `stage_order` INT NOT NULL COMMENT '阶段在销售漏斗中的顺序,从小到大', + `conversion_rate` FLOAT(5,2) NOT NULL DEFAULT '0.00' COMMENT '该阶段到下一阶段的历史平均转化率', + `is_won_stage` VARCHAR(5) NOT NULL DEFAULT 'no' COMMENT 'yes=成交阶段, no=非成交阶段', + `is_lost_stage` VARCHAR(5) NOT NULL DEFAULT 'no' COMMENT 'yes=丢失阶段, no=非丢失阶段', + `created_at` TIMESTAMP NOT NULL, + `updated_at` TIMESTAMP NOT NULL, + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='销售阶段配置表'; + +CREATE UNIQUE INDEX `idx_sales_stages_order` ON `sales_stages` (`stage_order`); +CREATE UNIQUE INDEX `idx_sales_stages_name` ON `sales_stages` (`stage_name`); diff --git a/opportunity.json b/opportunity.json new file mode 100644 index 0000000..592174d --- /dev/null +++ b/opportunity.json @@ -0,0 +1,121 @@ +{ + "summary": { + "name": "opportunity", + "description": "商机管理表,存储销售机会的全生命周期信息", + "version": "1.0.0" + }, + "fields": { + "id": { + "type": "int", + "primary_key": true, + "auto_increment": true, + "description": "商机ID" + }, + "org_id": { + "type": "int", + "not_null": true, + "description": "组织ID,用于多租户隔离" + }, + "customer_id": { + "type": "int", + "not_null": true, + "description": "客户ID,关联客户管理模块" + }, + "customer_name": { + "type": "varchar(255)", + "not_null": true, + "description": "客户名称" + }, + "title": { + "type": "varchar(255)", + "not_null": true, + "description": "商机标题" + }, + "estimated_amount": { + "type": "decimal(15,2)", + "not_null": true, + "description": "预估金额" + }, + "current_stage": { + "type": "varchar(50)", + "not_null": true, + "default": "'初步接洽'", + "description": "当前销售阶段" + }, + "expected_close_date": { + "type": "date", + "not_null": true, + "description": "预计成交时间" + }, + "probability": { + "type": "decimal(5,2)", + "not_null": true, + "default": "0.00", + "description": "成交概率(百分比)" + }, + "source": { + "type": "varchar(50)", + "not_null": true, + "default": "'manual'", + "description": "商机来源:manual(手动录入)或 lead(线索转化)" + }, + "owner_id": { + "type": "int", + "not_null": true, + "description": "负责人ID" + }, + "owner_name": { + "type": "varchar(100)", + "not_null": true, + "description": "负责人姓名" + }, + "description": { + "type": "text", + "description": "商机描述" + }, + "created_at": { + "type": "datetime", + "not_null": true, + "default": "CURRENT_TIMESTAMP", + "description": "创建时间" + }, + "updated_at": { + "type": "datetime", + "not_null": true, + "default": "CURRENT_TIMESTAMP", + "on_update": "CURRENT_TIMESTAMP", + "description": "更新时间" + }, + "status": { + "type": "varchar(20)", + "not_null": true, + "default": "'active'", + "description": "状态:active(活跃)、won(成交)、lost(丢单)" + } + }, + "indexes": { + "idx_org_customer": ["org_id", "customer_id"], + "idx_org_owner": ["org_id", "owner_id"], + "idx_org_stage": ["org_id", "current_stage"], + "idx_org_status": ["org_id", "status"], + "idx_expected_close": ["org_id", "expected_close_date"] + }, + "codes": { + "stages": { + "初步接洽": "初步接触客户,了解基本信息", + "需求确认": "深入了解客户需求和痛点", + "方案报价": "提供解决方案和报价", + "合同谈判": "商务条款和合同细节谈判", + "成交": "签订合同,完成交易" + }, + "sources": { + "manual": "手动录入", + "lead": "线索转化" + }, + "statuses": { + "active": "活跃", + "won": "成交", + "lost": "丢单" + } + } +} \ No newline at end of file diff --git a/opportunity_management/core.py b/opportunity_management/core.py index d615ca1..eee20c3 100644 --- a/opportunity_management/core.py +++ b/opportunity_management/core.py @@ -5,7 +5,7 @@ import uuid from ahserver.serverenv import ServerEnv from appPublic.worker import awaitify -from sqlor.dbp import DBP +from sqlor.dbpools import DBPools async def create_opportunity( @@ -18,254 +18,258 @@ async def create_opportunity( region: str = None ) -> Dict: """创建商机""" - dbp = DBP() - opportunity_id = str(uuid.uuid4()).replace('-', '') - - # 验证阶段是否存在 - stage_exists = await dbp.select_one( - "sales_stages", - {"stage_name": current_stage, "is_active": "1"} - ) - if not stage_exists: - raise ValueError(f"销售阶段 '{current_stage}' 不存在或未启用") - - now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - opportunity_data = { - "id": opportunity_id, - "customer_name": customer_name, - "estimated_amount": estimated_amount, - "current_stage": current_stage, - "expected_close_date": expected_close_date, - "source_type": source_type, - "owner_id": owner_id or get_current_user_id(), - "region": region, - "created_at": now, - "updated_at": now, - "status": "active" + db = DBPools() + async with db.sqlorContext('default') as sor: + opportunity_id = str(uuid.uuid4()).replace('-', '') + + # 验证客户是否存在 + customer_records = await sor.R("customers", {"filters": [{"field": "customer_name", "op": "=", "value": customer_name}]}) + if not customer_records or len(customer_records) == 0: + raise ValueError(f"客户 {customer_name} 不存在") + + customer = customer_records[0] + customer_id = customer.get("id", "") + + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + opportunity_data = { + "id": opportunity_id, + "customer_id": customer_id, + "customer_name": customer_name, + "estimated_amount": estimated_amount, + "current_stage": current_stage, + "expected_close_date": expected_close_date, + "source_type": source_type, + "owner_id": owner_id or get_current_user_id(), + "region": region, + "probability": get_stage_probability(current_stage), + "created_at": now, + "updated_at": now, + "status": "active" + } + + await sor.C("opportunities", opportunity_data) + return opportunity_data + + +def get_stage_probability(stage: str) -> float: + """根据销售阶段获取成交概率""" + stage_probabilities = { + "initial_contact": 0.1, + "needs_analysis": 0.2, + "proposal": 0.5, + "negotiation": 0.7, + "closed_won": 1.0, + "closed_lost": 0.0 } - - await dbp.insert("opportunities", opportunity_data) - - # 创建初始预测记录 - await create_prediction_record(opportunity_id, estimated_amount, current_stage) - - return opportunity_data + return stage_probabilities.get(stage, 0.0) async def update_opportunity_stage( opportunity_id: str, new_stage: str, - change_reason: str, - changed_by: str = None + notes: str = None ) -> Dict: """更新商机阶段""" - dbp = DBP() - changed_by = changed_by or get_current_user_id() - - # 获取当前商机信息 - opportunity = await dbp.select_one("opportunities", {"id": opportunity_id}) - if not opportunity: - raise ValueError("商机不存在") - - if opportunity["current_stage"] == new_stage: - raise ValueError("商机已在目标阶段") - - # 验证新阶段是否存在 - new_stage_info = await dbp.select_one( - "sales_stages", - {"stage_name": new_stage, "is_active": "1"} - ) - if not new_stage_info: - raise ValueError(f"销售阶段 '{new_stage}' 不存在或未启用") - - # 记录阶段变更历史 - history_id = str(uuid.uuid4()).replace('-', '') - history_data = { - "id": history_id, - "opportunity_id": opportunity_id, - "from_stage": opportunity["current_stage"], - "to_stage": new_stage, - "change_reason": change_reason, - "changed_by": changed_by, - "changed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") - } - await dbp.insert("opportunity_stage_history", history_data) - - # 更新商机阶段 - await dbp.update( - "opportunities", - {"current_stage": new_stage, "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, - {"id": opportunity_id} - ) - - # 更新预测记录 - await update_prediction_record(opportunity_id, opportunity["estimated_amount"], new_stage) - - return { - "opportunity_id": opportunity_id, - "from_stage": opportunity["current_stage"], - "to_stage": new_stage, - "history_id": history_id - } - - -async def create_prediction_record( - opportunity_id: str, - estimated_amount: float, - current_stage: str -) -> Dict: - """创建预测记录""" - dbp = DBP() - - # 获取当前阶段的转化率 - stage_info = await dbp.select_one("sales_stages", {"stage_name": current_stage}) - conversion_rate = float(stage_info.get("conversion_rate", "0.0000")) if stage_info else 0.0 - - predicted_amount = estimated_amount * conversion_rate if conversion_rate > 0 else estimated_amount - confidence_level = min(conversion_rate, 1.0) if conversion_rate > 0 else 0.5 - - prediction_id = str(uuid.uuid4()).replace('-', '') - prediction_data = { - "id": prediction_id, - "opportunity_id": opportunity_id, - "predicted_amount": predicted_amount, - "confidence_level": confidence_level, - "prediction_date": date.today().strftime("%Y-%m-%d"), - "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") - } - - await dbp.insert("opportunity_predictions", prediction_data) - return prediction_data - - -async def update_prediction_record( - opportunity_id: str, - estimated_amount: float, - current_stage: str -) -> Dict: - """更新预测记录""" - dbp = DBP() - - # 获取当前阶段的转化率 - stage_info = await dbp.select_one("sales_stages", {"stage_name": current_stage}) - conversion_rate = float(stage_info.get("conversion_rate", "0.0000")) if stage_info else 0.0 - - predicted_amount = estimated_amount * conversion_rate if conversion_rate > 0 else estimated_amount - confidence_level = min(conversion_rate, 1.0) if conversion_rate > 0 else 0.5 - - # 检查今天是否已有预测记录 - today = date.today().strftime("%Y-%m-%d") - existing_prediction = await dbp.select_one( - "opportunity_predictions", - {"opportunity_id": opportunity_id, "prediction_date": today} - ) - - prediction_data = { - "predicted_amount": predicted_amount, - "confidence_level": confidence_level, - "prediction_date": today, - "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") - } - - if existing_prediction: - # 更新现有记录 - await dbp.update( - "opportunity_predictions", - prediction_data, - {"id": existing_prediction["id"]} + db = DBPools() + async with db.sqlorContext('default') as sor: + opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) + if not opportunity_records or len(opportunity_records) == 0: + raise ValueError("商机不存在") + + opportunity = opportunity_records[0] + if opportunity["status"] != "active": + raise ValueError("只能更新活跃状态的商机") + + # 记录阶段变更历史 + history_id = str(uuid.uuid4()).replace('-', '') + history_data = { + "id": history_id, + "opportunity_id": opportunity_id, + "from_stage": opportunity["current_stage"], + "to_stage": new_stage, + "notes": notes, + "changed_by": get_current_user_id(), + "changed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + await sor.C("opportunity_stage_history", history_data) + + # 更新商机 + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + await sor.U( + "opportunities", + { + "current_stage": new_stage, + "probability": get_stage_probability(new_stage), + "updated_at": now + }, + {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} ) - prediction_data["id"] = existing_prediction["id"] - else: - # 创建新记录 - prediction_id = str(uuid.uuid4()).replace('-', '') - prediction_data["id"] = prediction_id - prediction_data["opportunity_id"] = opportunity_id - await dbp.insert("opportunity_predictions", prediction_data) - - return prediction_data + + # 如果是关闭阶段,更新状态 + if new_stage in ["closed_won", "closed_lost"]: + status = "won" if new_stage == "closed_won" else "lost" + await sor.U( + "opportunities", + {"status": status}, + {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} + ) + + return {"opportunity_id": opportunity_id, "new_stage": new_stage} -async def get_funnel_analysis( - region: str = None, - owner_id: str = None -) -> List[Dict]: - """获取销售漏斗分析数据""" - dbp = DBP() - - # 构建查询条件 - where_clause = {} - if region: - where_clause["region"] = region - if owner_id: - where_clause["owner_id"] = owner_id - - # 获取所有活跃商机按阶段分组 - funnel_data = await dbp.query(""" - SELECT - o.current_stage, - s.stage_order, - COUNT(*) as opportunity_count, - SUM(o.estimated_amount) as total_amount, - AVG(p.predicted_amount) as avg_predicted_amount - FROM opportunities o - LEFT JOIN sales_stages s ON o.current_stage = s.stage_name - LEFT JOIN opportunity_predictions p ON o.id = p.opportunity_id - AND p.prediction_date = CURDATE() - WHERE o.status = 'active' - """ + (" AND o.region = %(region)s" if region else "") + - (" AND o.owner_id = %(owner_id)s" if owner_id else "") + """ - GROUP BY o.current_stage, s.stage_order - ORDER BY s.stage_order ASC - """, where_clause) - - return funnel_data +async def assign_opportunity( + opportunity_id: str, + new_owner_id: str +) -> Dict: + """分配商机给销售人员""" + db = DBPools() + async with db.sqlorContext('default') as sor: + opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) + if not opportunity_records or len(opportunity_records) == 0: + raise ValueError("商机不存在") + + opportunity = opportunity_records[0] + old_owner_id = opportunity["owner_id"] + + # 记录分配历史 + assignment_id = str(uuid.uuid4()).replace('-', '') + assignment_data = { + "id": assignment_id, + "opportunity_id": opportunity_id, + "from_owner_id": old_owner_id, + "to_owner_id": new_owner_id, + "assigned_by": get_current_user_id(), + "assigned_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + await sor.C("opportunity_assignment_history", assignment_data) + + # 更新商机负责人 + await sor.U( + "opportunities", + { + "owner_id": new_owner_id, + "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + }, + {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} + ) + + return {"opportunity_id": opportunity_id, "new_owner_id": new_owner_id} -async def get_sales_prediction( - region: str = None, +async def get_opportunity_funnel( + start_date: str = None, + end_date: str = None, + owner_id: str = None, + region: str = None +) -> Dict: + """获取销售漏斗数据""" + db = DBPools() + async with db.sqlorContext('default') as sor: + filters = [] + if start_date and end_date: + filters.append({"field": "created_at", "op": ">=", "value": start_date}) + filters.append({"field": "created_at", "op": "<=", "value": end_date}) + if owner_id: + filters.append({"field": "owner_id", "op": "=", "value": owner_id}) + if region: + filters.append({"field": "region", "op": "=", "value": region}) + + opportunities = await sor.R("opportunities", {"filters": filters}) + + # 按阶段分组统计 + funnel_data = {} + total_value = 0 + + for opp in opportunities: + stage = opp["current_stage"] + amount = float(opp["estimated_amount"]) if opp["estimated_amount"] else 0 + probability = float(opp["probability"]) if opp["probability"] else 0 + + if stage not in funnel_data: + funnel_data[stage] = { + "count": 0, + "total_amount": 0, + "weighted_amount": 0 + } + + funnel_data[stage]["count"] += 1 + funnel_data[stage]["total_amount"] += amount + funnel_data[stage]["weighted_amount"] += amount * probability + total_value += amount * probability + + return { + "funnel_data": funnel_data, + "total_weighted_value": total_value, + "total_opportunities": len(opportunities) + } + + +async def get_sales_performance( + start_date: str, + end_date: str, owner_id: str = None ) -> Dict: - """获取销售预测汇总""" - dbp = DBP() - - where_clause = {"prediction_date": date.today().strftime("%Y-%m-%d")} - if region: - where_clause["region"] = region - if owner_id: - where_clause["owner_id"] = owner_id - - # 获取今日预测汇总 - prediction_summary = await dbp.query(""" - SELECT - SUM(p.predicted_amount) as total_predicted, - SUM(p.confidence_level) / COUNT(*) as avg_confidence, - COUNT(*) as opportunity_count - FROM opportunity_predictions p - JOIN opportunities o ON p.opportunity_id = o.id - WHERE p.prediction_date = %(prediction_date)s - """ + (" AND o.region = %(region)s" if region else "") + - (" AND o.owner_id = %(owner_id)s" if owner_id else ""), where_clause) - - if prediction_summary: - return prediction_summary[0] - return {"total_predicted": 0, "avg_confidence": 0, "opportunity_count": 0} + """获取销售业绩数据""" + db = DBPools() + async with db.sqlorContext('default') as sor: + # 查询已关闭的商机(赢单) + won_filters = [ + {"field": "status", "op": "=", "value": "won"}, + {"field": "updated_at", "op": ">=", "value": start_date}, + {"field": "updated_at", "op": "<=", "value": end_date} + ] + if owner_id: + won_filters.append({"field": "owner_id", "op": "=", "value": owner_id}) + + won_opportunities = await sor.R("opportunities", {"filters": won_filters}) + + total_revenue = sum(float(opp["estimated_amount"]) for opp in won_opportunities if opp["estimated_amount"]) + total_count = len(won_opportunities) + + # 查询活跃商机 + active_filters = [ + {"field": "status", "op": "=", "value": "active"}, + {"field": "created_at", "op": ">=", "value": start_date}, + {"field": "created_at", "op": "<=", "value": end_date} + ] + if owner_id: + active_filters.append({"field": "owner_id", "op": "=", "value": owner_id}) + + active_opportunities = await sor.R("opportunities", {"filters": active_filters}) + active_count = len(active_opportunities) + weighted_pipeline = sum( + float(opp["estimated_amount"]) * float(opp["probability"]) + for opp in active_opportunities + if opp["estimated_amount"] and opp["probability"] + ) + + return { + "period": f"{start_date} to {end_date}", + "total_revenue": total_revenue, + "total_closed_deals": total_count, + "active_opportunities": active_count, + "weighted_pipeline_value": weighted_pipeline, + "average_deal_size": total_revenue / total_count if total_count > 0 else 0 + } def get_current_user_id() -> str: """获取当前用户ID(模拟实现)""" - # 在实际实现中,这里应该从会话或认证信息中获取 return "current_user_id" -# 同步版本函数(用于前端调用) +# 同步版本函数 def sync_create_opportunity(*args, **kwargs): return create_opportunity(*args, **kwargs) def sync_update_opportunity_stage(*args, **kwargs): return update_opportunity_stage(*args, **kwargs) -def sync_get_funnel_analysis(*args, **kwargs): - return get_funnel_analysis(*args, **kwargs) +def sync_assign_opportunity(*args, **kwargs): + return assign_opportunity(*args, **kwargs) -def sync_get_sales_prediction(*args, **kwargs): - return get_sales_prediction(*args, **kwargs) \ No newline at end of file +def sync_get_opportunity_funnel(*args, **kwargs): + return get_opportunity_funnel(*args, **kwargs) + +def sync_get_sales_performance(*args, **kwargs): + return get_sales_performance(*args, **kwargs) \ No newline at end of file diff --git a/opportunity_management/init.py b/opportunity_management/init.py index 4a3969e..ed85683 100644 --- a/opportunity_management/init.py +++ b/opportunity_management/init.py @@ -2,15 +2,11 @@ from ahserver.serverenv import ServerEnv from appPublic.worker import awaitify from .core import ( sync_create_opportunity, - sync_update_opportunity_stage, - sync_get_funnel_analysis, - sync_get_sales_prediction + sync_update_opportunity_stage ) def load_opportunity_management(): env = ServerEnv() env.create_opportunity = awaitify(sync_create_opportunity) - env.update_opportunity_stage = awaitify(sync_update_opportunity_stage) - env.get_funnel_analysis = awaitify(sync_get_funnel_analysis) - env.get_sales_prediction = awaitify(sync_get_sales_prediction) \ No newline at end of file + env.update_opportunity_stage = awaitify(sync_update_opportunity_stage) \ No newline at end of file diff --git a/opportunity_management/opportunity_core.py b/opportunity_management/opportunity_core.py index ce4023f..54a5c5c 100644 --- a/opportunity_management/opportunity_core.py +++ b/opportunity_management/opportunity_core.py @@ -1,399 +1,307 @@ """ -商机管理模块 - 核心业务逻辑 +商机管理核心模块 实现商机全生命周期管理和商机分析功能 """ - -import os -import json -import uuid from datetime import datetime, date -from typing import List, Dict, Optional, Tuple -from appPublic.jsonconfig import getConfig -from appPublic.worker import Worker -from sqlor.dbp import getDBP +from decimal import Decimal +from typing import List, Dict, Optional +import uuid + +from ahserver.serverenv import ServerEnv +from appPublic.worker import awaitify +from appPublic.Config import getConfig +from sqlor.dbpools import DBPools -class OpportunityManager: - def __init__(self): - self.config = getConfig() - self.worker = Worker() - - async def get_db_connection(self, org_id: str): - """获取数据库连接""" - dbp = await getDBP(org_id) - return dbp - - async def create_opportunity(self, opportunity_data: Dict, user_id: str, org_id: str) -> str: - """创建商机""" - # 验证必填字段 - required_fields = ['customer_name', 'estimated_amount', 'sales_stage', 'expected_close_date'] - for field in required_fields: - if not opportunity_data.get(field): - raise ValueError(f"缺少必填字段: {field}") - +async def create_opportunity( + customer_name: str, + estimated_amount: float, + current_stage: str, + expected_close_date: str, + source_type: str = "manual", + owner_id: str = None, + region: str = None +) -> Dict: + """创建商机""" + db = DBPools() + async with db.sqlorContext('default') as sor: opportunity_id = str(uuid.uuid4()).replace('-', '') - dbp = await self.get_db_connection(org_id) - # 插入商机数据 - sql = """ - INSERT INTO opportunities ( - id, customer_name, customer_id, estimated_amount, sales_stage, - expected_close_date, source, description, owner_id, org_id, - status, created_at, updated_at, probability, next_action_date, - next_action_description, tags - ) VALUES ( - %(id)s, %(customer_name)s, %(customer_id)s, %(estimated_amount)s, %(sales_stage)s, - %(expected_close_date)s, %(source)s, %(description)s, %(owner_id)s, %(org_id)s, - %(status)s, NOW(), NOW(), %(probability)s, %(next_action_date)s, - %(next_action_description)s, %(tags)s - ) - """ + # 验证客户是否存在 + customer_records = await sor.R("customers", {"filters": [{"field": "customer_name", "op": "=", "value": customer_name}]}) + if not customer_records or len(customer_records) == 0: + raise ValueError(f"客户 {customer_name} 不存在") - params = { - 'id': opportunity_id, - 'customer_name': opportunity_data['customer_name'], - 'customer_id': opportunity_data.get('customer_id'), - 'estimated_amount': opportunity_data['estimated_amount'], - 'sales_stage': opportunity_data['sales_stage'], - 'expected_close_date': opportunity_data['expected_close_date'], - 'source': opportunity_data.get('source', 'manual'), # manual 或 lead_conversion - 'description': opportunity_data.get('description'), - 'owner_id': user_id, - 'org_id': org_id, - 'status': opportunity_data.get('status', 'active'), - 'probability': opportunity_data.get('probability', self._calculate_probability(opportunity_data['sales_stage'])), - 'next_action_date': opportunity_data.get('next_action_date'), - 'next_action_description': opportunity_data.get('next_action_description'), - 'tags': opportunity_data.get('tags') + customer = customer_records[0] + customer_id = customer.get("id", "") + + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + opportunity_data = { + "id": opportunity_id, + "customer_id": customer_id, + "customer_name": customer_name, + "estimated_amount": estimated_amount, + "current_stage": current_stage, + "expected_close_date": expected_close_date, + "source_type": source_type, + "owner_id": owner_id or get_current_user_id(), + "region": region, + "probability": get_stage_probability(current_stage), + "created_at": now, + "updated_at": now, + "status": "active" } - await dbp.doTransaction([{'sql': sql, 'params': params}]) - return opportunity_id - - def _calculate_probability(self, sales_stage: str) -> float: - """根据销售阶段计算成交概率""" - stage_probabilities = { - '初步接洽': 0.1, - '需求确认': 0.3, - '方案报价': 0.5, - '合同谈判': 0.7, - '成交': 1.0 - } - return stage_probabilities.get(sales_stage, 0.1) - - async def update_opportunity_stage(self, opportunity_id: str, new_stage: str, - change_reason: str, user_id: str, org_id: str) -> bool: - """更新商机阶段""" - dbp = await self.get_db_connection(org_id) - - # 获取当前商机信息 - current_opportunity = await dbp.select_one("opportunities", {"id": opportunity_id, "org_id": org_id}) - if not current_opportunity: + await sor.C("opportunities", opportunity_data) + return opportunity_data + + +def get_stage_probability(stage: str) -> float: + """根据销售阶段获取成交概率""" + stage_probabilities = { + "initial_contact": 0.1, + "needs_analysis": 0.2, + "proposal": 0.5, + "negotiation": 0.7, + "closed_won": 1.0, + "closed_lost": 0.0 + } + return stage_probabilities.get(stage, 0.0) + + +async def update_opportunity_stage( + opportunity_id: str, + new_stage: str, + notes: str = None +) -> Dict: + """更新商机阶段""" + db = DBPools() + async with db.sqlorContext('default') as sor: + opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) + if not opportunity_records or len(opportunity_records) == 0: raise ValueError("商机不存在") - - old_stage = current_opportunity['sales_stage'] - # 更新商机阶段和概率 - new_probability = self._calculate_probability(new_stage) - update_data = { - 'sales_stage': new_stage, - 'probability': new_probability, - 'updated_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S") - } - - # 如果阶段变为成交,更新状态 - if new_stage == '成交': - update_data['status'] = 'won' - update_data['actual_close_date'] = datetime.now().strftime("%Y-%m-%d") - - # 如果阶段回退,可能需要更新状态 - elif old_stage == '成交' and new_stage != '成交': - update_data['status'] = 'active' - update_data['actual_close_date'] = None - - result = await dbp.update("opportunities", update_data, {"id": opportunity_id, "org_id": org_id}) + opportunity = opportunity_records[0] + if opportunity["status"] != "active": + raise ValueError("只能更新活跃状态的商机") # 记录阶段变更历史 - if old_stage != new_stage: - await self._record_stage_change(opportunity_id, old_stage, new_stage, change_reason, user_id, org_id) - - return result.rowcount > 0 - - async def _record_stage_change(self, opportunity_id: str, old_stage: str, - new_stage: str, change_reason: str, user_id: str, org_id: str): - """记录阶段变更历史""" history_id = str(uuid.uuid4()).replace('-', '') - dbp = await self.get_db_connection(org_id) - history_data = { - 'id': history_id, - 'opportunity_id': opportunity_id, - 'old_stage': old_stage, - 'new_stage': new_stage, - 'change_reason': change_reason, - 'changed_by': user_id, - 'created_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "id": history_id, + "opportunity_id": opportunity_id, + "from_stage": opportunity["current_stage"], + "to_stage": new_stage, + "notes": notes, + "changed_by": get_current_user_id(), + "changed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } + await sor.C("opportunity_stage_history", history_data) - await dbp.insert("opportunity_stage_history", history_data) - - async def get_funnel_analysis(self, org_id: str, filters: Optional[Dict] = None) -> Dict: - """获取销售漏斗分析数据""" - dbp = await self.get_db_connection(org_id) - - # 构建查询条件 - where_clauses = ["o.org_id = %(org_id)s", "o.status = 'active'"] - params = {'org_id': org_id} - - if filters: - if filters.get('region'): - where_clauses.append("c.region = %(region)s") - params['region'] = filters['region'] - if filters.get('owner_id'): - where_clauses.append("o.owner_id = %(owner_id)s") - params['owner_id'] = filters['owner_id'] - if filters.get('date_range'): - start_date, end_date = filters['date_range'] - where_clauses.append("o.created_at BETWEEN %(start_date)s AND %(end_date)s") - params['start_date'] = start_date - params['end_date'] = end_date - - where_sql = " AND ".join(where_clauses) - - # 获取各阶段商机数量和金额 - funnel_sql = f""" - SELECT - o.sales_stage, - COUNT(*) as opportunity_count, - SUM(o.estimated_amount) as total_amount, - AVG(o.probability) as avg_probability - FROM opportunities o - LEFT JOIN customers c ON o.customer_id = c.id - WHERE {where_sql} - GROUP BY o.sales_stage - ORDER BY - CASE o.sales_stage - WHEN '初步接洽' THEN 1 - WHEN '需求确认' THEN 2 - WHEN '方案报价' THEN 3 - WHEN '合同谈判' THEN 4 - WHEN '成交' THEN 5 - ELSE 99 - END - """ - - funnel_data = await dbp.doQuery(funnel_sql, params) - - # 计算预测成交金额 - predicted_revenue = 0 - for item in funnel_data: - predicted_revenue += float(item['total_amount']) * float(item['avg_probability']) - - return { - 'funnel_data': funnel_data, - 'predicted_revenue': predicted_revenue, - 'total_opportunities': sum(item['opportunity_count'] for item in funnel_data), - 'total_estimated_amount': sum(float(item['total_amount']) for item in funnel_data) - } - - async def get_conversion_rate_analysis(self, org_id: str, period: str = 'last_6_months') -> Dict: - """获取转化率分析数据(用于预测功能)""" - dbp = await self.get_db_connection(org_id) - - # 确定时间范围 - if period == 'last_6_months': - start_date = (date.today().replace(day=1) - timedelta(days=180)).strftime("%Y-%m-01") - elif period == 'last_year': - start_date = (date.today().replace(day=1) - timedelta(days=365)).strftime("%Y-%m-01") - else: - start_date = (date.today() - timedelta(days=90)).strftime("%Y-%m-%d") - - # 获取历史转化数据 - conversion_sql = """ - SELECT - DATE_FORMAT(created_at, '%%Y-%%m') as month, - COUNT(*) as total_opportunities, - SUM(CASE WHEN status = 'won' THEN 1 ELSE 0 END) as won_opportunities, - AVG(CASE WHEN status = 'won' THEN estimated_amount ELSE 0 END) as avg_won_amount - FROM opportunities - WHERE org_id = %(org_id)s - AND created_at >= %(start_date)s - GROUP BY DATE_FORMAT(created_at, '%%Y-%%m') - ORDER BY month - """ - - conversion_data = await dbp.doQuery(conversion_sql, {'org_id': org_id, 'start_date': start_date}) - - # 计算整体转化率 - total_opps = sum(item['total_opportunities'] for item in conversion_data) - total_won = sum(item['won_opportunities'] for item in conversion_data) - overall_conversion_rate = total_won / total_opps if total_opps > 0 else 0 - - return { - 'conversion_data': conversion_data, - 'overall_conversion_rate': overall_conversion_rate, - 'period': period, - 'start_date': start_date - } - - async def predict_revenue(self, org_id: str, filters: Optional[Dict] = None) -> Dict: - """预测收入(基于历史转化率)""" - # 获取当前活跃商机 - dbp = await self.get_db_connection(org_id) - - where_clauses = ["org_id = %(org_id)s", "status = 'active'"] - params = {'org_id': org_id} - - if filters: - if filters.get('owner_id'): - where_clauses.append("owner_id = %(owner_id)s") - params['owner_id'] = filters['owner_id'] - if filters.get('region'): - # 需要关联客户表 - pass - - where_sql = " AND ".join(where_clauses) - - opportunities_sql = f""" - SELECT id, estimated_amount, probability, sales_stage - FROM opportunities - WHERE {where_sql} - """ - - current_opportunities = await dbp.doQuery(opportunities_sql, params) - - # 获取历史转化率 - conversion_analysis = await self.get_conversion_rate_analysis(org_id) - historical_conversion_rate = conversion_analysis['overall_conversion_rate'] - - # 计算预测收入 - stage_based_prediction = sum( - float(opp['estimated_amount']) * float(opp['probability']) - for opp in current_opportunities + # 更新商机 + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + await sor.U( + "opportunities", + { + "current_stage": new_stage, + "probability": get_stage_probability(new_stage), + "updated_at": now + }, + {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} ) - historical_based_prediction = sum( - float(opp['estimated_amount']) * historical_conversion_rate - for opp in current_opportunities + # 如果是关闭阶段,更新状态 + if new_stage in ["closed_won", "closed_lost"]: + status = "won" if new_stage == "closed_won" else "lost" + await sor.U( + "opportunities", + {"status": status}, + {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} + ) + + return {"opportunity_id": opportunity_id, "new_stage": new_stage} + + +async def assign_opportunity( + opportunity_id: str, + new_owner_id: str +) -> Dict: + """分配商机给销售人员""" + db = DBPools() + async with db.sqlorContext('default') as sor: + opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) + if not opportunity_records or len(opportunity_records) == 0: + raise ValueError("商机不存在") + + opportunity = opportunity_records[0] + old_owner_id = opportunity["owner_id"] + + # 记录分配历史 + assignment_id = str(uuid.uuid4()).replace('-', '') + assignment_data = { + "id": assignment_id, + "opportunity_id": opportunity_id, + "from_owner_id": old_owner_id, + "to_owner_id": new_owner_id, + "assigned_by": get_current_user_id(), + "assigned_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + await sor.C("opportunity_assignment_history", assignment_data) + + # 更新商机负责人 + await sor.U( + "opportunities", + { + "owner_id": new_owner_id, + "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + }, + {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} ) - # 使用加权平均(70%阶段概率 + 30%历史转化率) - final_prediction = stage_based_prediction * 0.7 + historical_based_prediction * 0.3 + return {"opportunity_id": opportunity_id, "new_owner_id": new_owner_id} + + +async def get_opportunity_funnel( + start_date: str = None, + end_date: str = None, + owner_id: str = None, + region: str = None +) -> Dict: + """获取销售漏斗数据""" + db = DBPools() + async with db.sqlorContext('default') as sor: + filters = [] + if start_date and end_date: + filters.append({"field": "created_at", "op": ">=", "value": start_date}) + filters.append({"field": "created_at", "op": "<=", "value": end_date}) + if owner_id: + filters.append({"field": "owner_id", "op": "=", "value": owner_id}) + if region: + filters.append({"field": "region", "op": "=", "value": region}) + + opportunities = await sor.R("opportunities", {"filters": filters}) + + # 按阶段分组统计 + funnel_data = {} + total_value = 0 + + for opp in opportunities: + stage = opp["current_stage"] + amount = float(opp["estimated_amount"]) if opp["estimated_amount"] else 0 + probability = float(opp["probability"]) if opp["probability"] else 0 + + if stage not in funnel_data: + funnel_data[stage] = { + "count": 0, + "total_amount": 0, + "weighted_amount": 0 + } + + funnel_data[stage]["count"] += 1 + funnel_data[stage]["total_amount"] += amount + funnel_data[stage]["weighted_amount"] += amount * probability + total_value += amount * probability return { - 'stage_based_prediction': stage_based_prediction, - 'historical_based_prediction': historical_based_prediction, - 'final_prediction': final_prediction, - 'opportunity_count': len(current_opportunities), - 'total_estimated_amount': sum(float(opp['estimated_amount']) for opp in current_opportunities), - 'confidence_level': 'high' if len(current_opportunities) > 10 else 'medium' + "funnel_data": funnel_data, + "total_weighted_value": total_value, + "total_opportunities": len(opportunities) } + + +async def get_sales_performance( + start_date: str, + end_date: str, + owner_id: str = None +) -> Dict: + """获取销售业绩数据""" + db = DBPools() + async with db.sqlorContext('default') as sor: + # 查询已关闭的商机(赢单) + won_filters = [ + {"field": "status", "op": "=", "value": "won"}, + {"field": "updated_at", "op": ">=", "value": start_date}, + {"field": "updated_at", "op": "<=", "value": end_date} + ] + if owner_id: + won_filters.append({"field": "owner_id", "op": "=", "value": owner_id}) - # 基础CRUD操作 - async def update_opportunity(self, opportunity_id: str, opportunity_data: Dict, org_id: str) -> bool: - """更新商机""" - dbp = await self.get_db_connection(org_id) - opportunity_data['updated_at'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - result = await dbp.update("opportunities", opportunity_data, {"id": opportunity_id, "org_id": org_id}) - return result.rowcount > 0 + won_opportunities = await sor.R("opportunities", {"filters": won_filters}) - async def delete_opportunity(self, opportunity_id: str, org_id: str) -> bool: - """删除商机(软删除)""" - dbp = await self.get_db_connection(org_id) - update_data = { - 'status': 'deleted', - 'updated_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S") + total_revenue = sum(float(opp["estimated_amount"]) for opp in won_opportunities if opp["estimated_amount"]) + total_count = len(won_opportunities) + + # 查询活跃商机 + active_filters = [ + {"field": "status", "op": "=", "value": "active"}, + {"field": "created_at", "op": ">=", "value": start_date}, + {"field": "created_at", "op": "<=", "value": end_date} + ] + if owner_id: + active_filters.append({"field": "owner_id", "op": "=", "value": owner_id}) + + active_opportunities = await sor.R("opportunities", {"filters": active_filters}) + active_count = len(active_opportunities) + weighted_pipeline = sum( + float(opp["estimated_amount"]) * float(opp["probability"]) + for opp in active_opportunities + if opp["estimated_amount"] and opp["probability"] + ) + + return { + "period": f"{start_date} to {end_date}", + "total_revenue": total_revenue, + "total_closed_deals": total_count, + "active_opportunities": active_count, + "weighted_pipeline_value": weighted_pipeline, + "average_deal_size": total_revenue / total_count if total_count > 0 else 0 } - result = await dbp.update("opportunities", update_data, {"id": opportunity_id, "org_id": org_id}) - return result.rowcount > 0 + + +def get_current_user_id() -> str: + """获取当前用户ID(模拟实现)""" + return "current_user_id" + + +# 兼容旧接口 +async def get_funnel_analysis(**kwargs): + return await get_opportunity_funnel(**kwargs) + +async def predict_revenue(start_date: str, end_date: str): + performance = await get_sales_performance(start_date, end_date) + return {"predicted_revenue": performance["weighted_pipeline_value"]} + +async def update_opportunity(opportunity_id: str, **updates): + db = DBPools() + async with db.sqlorContext('default') as sor: + updates["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + await sor.U( + "opportunities", + updates, + {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} + ) + return {"opportunity_id": opportunity_id, "updated": True} + +async def delete_opportunity(opportunity_id: str): + db = DBPools() + async with db.sqlorContext('default') as sor: + await sor.D( + "opportunities", + {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} + ) + return {"opportunity_id": opportunity_id, "deleted": True} + +async def get_opportunity_by_id(opportunity_id: str): + db = DBPools() + async with db.sqlorContext('default') as sor: + records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) + return records[0] if records else None + +async def list_opportunities(**filters): + db = DBPools() + async with db.sqlorContext('default') as sor: + filter_list = [] + for field, value in filters.items(): + filter_list.append({"field": field, "op": "=", "value": value}) - async def get_opportunity_by_id(self, opportunity_id: str, org_id: str) -> Optional[Dict]: - """根据ID获取商机""" - dbp = await self.get_db_connection(org_id) - sql = "SELECT * FROM opportunities WHERE id = %(id)s AND org_id = %(org_id)s AND status != 'deleted'" - result = await dbp.doQuery(sql, {'id': opportunity_id, 'org_id': org_id}) - return result[0] if result else None - - async def list_opportunities(self, org_id: str, filters: Optional[Dict] = None, - page: int = 1, page_size: int = 20) -> Tuple[List[Dict], int]: - """列出商机""" - dbp = await self.get_db_connection(org_id) - - # 构建查询条件 - where_clauses = ["o.org_id = %(org_id)s", "o.status != 'deleted'"] - params = {'org_id': org_id} - - if filters: - if filters.get('customer_name'): - where_clauses.append("o.customer_name LIKE %(customer_name)s") - params['customer_name'] = f"%{filters['customer_name']}%" - if filters.get('sales_stage'): - where_clauses.append("o.sales_stage = %(sales_stage)s") - params['sales_stage'] = filters['sales_stage'] - if filters.get('owner_id'): - where_clauses.append("o.owner_id = %(owner_id)s") - params['owner_id'] = filters['owner_id'] - if filters.get('status'): - where_clauses.append("o.status = %(status)s") - params['status'] = filters['status'] - if filters.get('expected_close_date_from'): - where_clauses.append("o.expected_close_date >= %(expected_close_date_from)s") - params['expected_close_date_from'] = filters['expected_close_date_from'] - if filters.get('expected_close_date_to'): - where_clauses.append("o.expected_close_date <= %(expected_close_date_to)s") - params['expected_close_date_to'] = filters['expected_close_date_to'] - - where_sql = " AND ".join(where_clauses) - - # 获取总数 - count_sql = f"SELECT COUNT(*) as total FROM opportunities o WHERE {where_sql}" - count_result = await dbp.doQuery(count_sql, params) - total = count_result[0]['total'] if count_result else 0 - - # 获取分页数据 - offset = (page - 1) * page_size - data_sql = f""" - SELECT o.*, u.username as owner_name - FROM opportunities o - LEFT JOIN users u ON o.owner_id = u.id - WHERE {where_sql} - ORDER BY o.created_at DESC - LIMIT %(limit)s OFFSET %(offset)s - """ - params['limit'] = page_size - params['offset'] = offset - - data_result = await dbp.doQuery(data_sql, params) - return data_result, total - - -# 全局实例 -opportunity_manager = OpportunityManager() - -# 导出函数 -async def create_opportunity(opportunity_data: Dict, user_id: str, org_id: str) -> str: - return await opportunity_manager.create_opportunity(opportunity_data, user_id, org_id) - -async def update_opportunity_stage(opportunity_id: str, new_stage: str, - change_reason: str, user_id: str, org_id: str) -> bool: - return await opportunity_manager.update_opportunity_stage(opportunity_id, new_stage, change_reason, user_id, org_id) - -async def get_funnel_analysis(org_id: str, filters: Optional[Dict] = None) -> Dict: - return await opportunity_manager.get_funnel_analysis(org_id, filters) - -async def predict_revenue(org_id: str, filters: Optional[Dict] = None) -> Dict: - return await opportunity_manager.predict_revenue(org_id, filters) - -# 基础CRUD接口 -async def update_opportunity(opportunity_id: str, opportunity_data: Dict, org_id: str) -> bool: - return await opportunity_manager.update_opportunity(opportunity_id, opportunity_data, org_id) - -async def delete_opportunity(opportunity_id: str, org_id: str) -> bool: - return await opportunity_manager.delete_opportunity(opportunity_id, org_id) - -async def get_opportunity_by_id(opportunity_id: str, org_id: str) -> Optional[Dict]: - return await opportunity_manager.get_opportunity_by_id(opportunity_id, org_id) - -async def list_opportunities(org_id: str, filters: Optional[Dict] = None, - page: int = 1, page_size: int = 20) -> Tuple[List[Dict], int]: - return await opportunity_manager.list_opportunities(org_id, filters, page, page_size) \ No newline at end of file + return await sor.R("opportunities", {"filters": filter_list}) \ No newline at end of file diff --git a/opportunity_stage_history.json b/opportunity_stage_history.json new file mode 100644 index 0000000..79f38b4 --- /dev/null +++ b/opportunity_stage_history.json @@ -0,0 +1,60 @@ +{ + "summary": { + "name": "opportunity_stage_history", + "description": "商机阶段变更历史记录表", + "version": "1.0.0" + }, + "fields": { + "id": { + "type": "int", + "primary_key": true, + "auto_increment": true, + "description": "历史记录ID" + }, + "opportunity_id": { + "type": "int", + "not_null": true, + "description": "商机ID" + }, + "org_id": { + "type": "int", + "not_null": true, + "description": "组织ID" + }, + "from_stage": { + "type": "varchar(50)", + "description": "变更前阶段" + }, + "to_stage": { + "type": "varchar(50)", + "not_null": true, + "description": "变更后阶段" + }, + "change_reason": { + "type": "text", + "not_null": true, + "description": "阶段变更原因" + }, + "changed_by": { + "type": "int", + "not_null": true, + "description": "变更人ID" + }, + "changed_by_name": { + "type": "varchar(100)", + "not_null": true, + "description": "变更人姓名" + }, + "created_at": { + "type": "datetime", + "not_null": true, + "default": "CURRENT_TIMESTAMP", + "description": "变更时间" + } + }, + "indexes": { + "idx_opportunity": ["opportunity_id"], + "idx_org": ["org_id"] + }, + "codes": {} +} \ No newline at end of file diff --git a/wwwroot/api/check_tables.dspy b/wwwroot/api/check_tables.dspy new file mode 100644 index 0000000..056a0f9 --- /dev/null +++ b/wwwroot/api/check_tables.dspy @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import json +result = {} +try: + dbname = get_module_dbname('opportunity_management') + async with DBPools().sqlorContext(dbname) as sor: + ns = {'page': 1, 'rows': 50, 'sort': 'COLUMN_NAME'} + for tbl in ['opportunities', 'sales_stages']: + sql = f"SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='crm_db' AND TABLE_NAME='{tbl}'" + rows = await sor.sqlExe(sql, ns) + if isinstance(rows, dict): + rows = rows.get('rows', []) + result[tbl] = [dict(r).get('column_name', '') for r in rows] + result['success'] = True +except Exception as e: + result['error'] = str(e) +return json.dumps(result, ensure_ascii=False, default=str) diff --git a/wwwroot/api/opportunities_create.dspy b/wwwroot/api/opportunities_create.dspy new file mode 100644 index 0000000..005e58d --- /dev/null +++ b/wwwroot/api/opportunities_create.dspy @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Opportunity create API""" +import json, uuid, time + +result = {'widgettype': 'Message', 'options': {'title': 'Error', 'message': 'Invalid request', 'type': 'error'}} + +try: + opportunity_name = params_kw.get('opportunity_name', '').strip() + customer_id = params_kw.get('customer_id', '').strip() + customer_name = params_kw.get('customer_name', '').strip() + estimated_amount = params_kw.get('estimated_amount', '0').strip() + current_stage = params_kw.get('current_stage', '').strip() + expected_close_date = params_kw.get('expected_close_date', '').strip() + owner_id = params_kw.get('owner_id', '').strip() + owner_name = params_kw.get('owner_name', '').strip() + + if not opportunity_name or not customer_id or not current_stage or not owner_id: + result['options'] = {'title': 'Error', 'message': '请填写必填字段', 'type': 'error'} + else: + dbname = get_module_dbname('opportunity_management') + new_id = str(uuid.uuid4()).replace('-', '') + now = time.strftime('%Y-%m-%d %H:%M:%S') + + async with DBPools().sqlorContext(dbname) as sor: + await sor.sqlExe("""INSERT INTO opportunities (id, customer_id, customer_name, opportunity_name, estimated_amount, current_stage, expected_close_date, source_type, owner_id, owner_name, region, description, probability, predicted_revenue, status, created_at, updated_at) + VALUES (${id}$, ${customer_id}$, ${customer_name}$, ${opportunity_name}$, ${estimated_amount}$, ${current_stage}$, ${expected_close_date}$, ${source_type}$, ${owner_id}$, ${owner_name}$, ${region}$, ${description}$, ${probability}$, ${predicted_revenue}$, ${status}$, ${created_at}$, ${updated_at}$)""", { + 'id': new_id, 'customer_id': customer_id, 'customer_name': customer_name, + 'opportunity_name': opportunity_name, 'estimated_amount': float(estimated_amount) if estimated_amount else 0, + 'current_stage': current_stage, 'expected_close_date': expected_close_date, + 'source_type': params_kw.get('source_type', 'manual').strip(), + 'owner_id': owner_id, 'owner_name': owner_name, + 'region': params_kw.get('region', '').strip(), + 'description': params_kw.get('description', '').strip(), + 'probability': float(params_kw.get('probability', '0').strip()) if params_kw.get('probability') else 0, + 'predicted_revenue': float(params_kw.get('predicted_revenue', '0').strip()) if params_kw.get('predicted_revenue') else 0, + 'status': params_kw.get('status', 'active').strip(), + 'created_at': now, 'updated_at': now + }) + result = {'widgettype': 'Message', 'options': {'title': 'Success', 'message': '商机创建成功', 'type': 'success'}} + +except Exception as e: + result['options'] = {'title': 'Error', 'message': f'创建失败: {str(e)}', 'type': 'error'} + +return json.dumps(result, ensure_ascii=False) diff --git a/wwwroot/api/opportunities_delete.dspy b/wwwroot/api/opportunities_delete.dspy new file mode 100644 index 0000000..4ecc069 --- /dev/null +++ b/wwwroot/api/opportunities_delete.dspy @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Opportunity delete API""" +import json + +result = {'widgettype': 'Message', 'options': {'title': 'Error', 'message': 'Invalid request', 'type': 'error'}} + +try: + record_id = params_kw.get('id', '').strip() + if not record_id: + result['options'] = {'title': 'Error', 'message': '记录ID不能为空', 'type': 'error'} + else: + dbname = get_module_dbname('opportunity_management') + async with DBPools().sqlorContext(dbname) as sor: + existing = await sor.sqlExe("SELECT id FROM opportunities WHERE id = ${id}$", {'id': record_id}) + if not existing: + result['options'] = {'title': 'Error', 'message': '商机不存在', 'type': 'error'} + else: + await sor.sqlExe("DELETE FROM opportunities WHERE id = ${id}$", {'id': record_id}) + result = {'widgettype': 'Message', 'options': {'title': 'Success', 'message': '删除成功', 'type': 'success'}} + +except Exception as e: + result['options'] = {'title': 'Error', 'message': f'删除失败: {str(e)}', 'type': 'error'} + +return json.dumps(result, ensure_ascii=False) diff --git a/wwwroot/api/opportunities_list.dspy b/wwwroot/api/opportunities_list.dspy new file mode 100644 index 0000000..82e57e5 --- /dev/null +++ b/wwwroot/api/opportunities_list.dspy @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Opportunity list API""" +import json + +result = {'success': False, 'rows': [], 'total': 0} + +try: + dbname = get_module_dbname('opportunity_management') + ns = { + 'page': int(params_kw.get('page', 1)), + 'rows': int(params_kw.get('rows', 20)), + 'sort': 'created_at desc' + } + sql = "SELECT id, customer_id, customer_name, opportunity_name, estimated_amount, current_stage, expected_close_date, owner_id, owner_name, region, probability, status, created_at FROM opportunities" + + async with DBPools().sqlorContext(dbname) as sor: + data = await sor.sqlExe(sql, ns) + if isinstance(data, dict): + result['total'] = data.get('total', 0) + result['rows'] = [dict(r) for r in data.get('rows', [])] + else: + result['rows'] = [dict(r) for r in (data or [])] + result['total'] = len(result['rows']) + result['success'] = True + +except Exception as e: + result['error'] = str(e) + +return json.dumps(result, ensure_ascii=False, default=str) diff --git a/wwwroot/api/opportunities_update.dspy b/wwwroot/api/opportunities_update.dspy new file mode 100644 index 0000000..c5f46e1 --- /dev/null +++ b/wwwroot/api/opportunities_update.dspy @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Opportunity update API""" +import json, time + +result = {'widgettype': 'Message', 'options': {'title': 'Error', 'message': 'Invalid request', 'type': 'error'}} + +try: + record_id = params_kw.get('id', '').strip() + if not record_id: + result['options'] = {'title': 'Error', 'message': '记录ID不能为空', 'type': 'error'} + else: + dbname = get_module_dbname('opportunity_management') + now = time.strftime('%Y-%m-%d %H:%M:%S') + + fields = ['customer_id', 'customer_name', 'opportunity_name', 'estimated_amount', 'current_stage', 'expected_close_date', 'owner_id', 'owner_name', 'region', 'description', 'probability', 'predicted_revenue', 'status', 'source_type'] + set_parts = [] + params = {'id': record_id} + for f in fields: + val = params_kw.get(f, None) + if val is not None: + set_parts.append(f"{f} = %({f})s") + params[f] = val + + set_parts.append("updated_at = ${updated_at}$") + params['updated_at'] = now + + if len(set_parts) <= 1: + result['options'] = {'title': 'Error', 'message': '没有可更新的字段', 'type': 'error'} + else: + sql = f"UPDATE opportunities SET {', '.join(set_parts)} WHERE id = ${id}$" + async with DBPools().sqlorContext(dbname) as sor: + await sor.sqlExe(sql, params) + result = {'widgettype': 'Message', 'options': {'title': 'Success', 'message': '更新成功', 'type': 'success'}} + +except Exception as e: + result['options'] = {'title': 'Error', 'message': f'更新失败: {str(e)}', 'type': 'error'} + +return json.dumps(result, ensure_ascii=False) diff --git a/wwwroot/api/sales_stages_list.dspy b/wwwroot/api/sales_stages_list.dspy new file mode 100644 index 0000000..986b6f8 --- /dev/null +++ b/wwwroot/api/sales_stages_list.dspy @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Sales stages list API""" +import json + +result = {'success': False, 'rows': [], 'total': 0} + +try: + dbname = get_module_dbname('opportunity_management') + sql = "SELECT id, stage_name, stage_order, conversion_rate, is_won_stage, is_lost_stage FROM sales_stages ORDER BY stage_order ASC" + + async with DBPools().sqlorContext(dbname) as sor: + data = await sor.sqlExe(sql, {}) + result['rows'] = [dict(r) for r in data] + result['total'] = len(data) + result['success'] = True + +except Exception as e: + result['error'] = str(e) + +return json.dumps(result, ensure_ascii=False, default=str) diff --git a/wwwroot/base.ui b/wwwroot/base.ui index e5883ca..8e2e25e 100644 --- a/wwwroot/base.ui +++ b/wwwroot/base.ui @@ -1,55 +1,67 @@ { - "widgettype": "TabPanel", - "options": { - "title": "商机管理" - }, - "subwidgets": [ - { - "widgettype": "CRUD", - "options": { - "title": "商机列表", - "url": "{{entire_url(opportunities_list)}}" - } + "type": "Page", + "title": "Opportunity Management", + "content": { + "type": "VBox", + "style": {"padding": "16px", "height": "100%"}, + "children": [ + { + "type": "HBox", + "justify": "space-between", + "style": {"marginBottom": "16px"}, + "children": [ + {"type": "Text", "content": "Opportunity List", "style": {"fontSize": "20px", "fontWeight": "bold"}}, + {"type": "Button", "text": "Add Opportunity", "variant": "primary", "leadingIcon": "add", "onclick": "openDialog('addOpportunityDialog')"} + ] + }, + { + "type": "CRUD", + "id": "opportunityCRUD", + "api": { + "list": "api/opportunities_list.dspy", + "create": "api/opportunities_create.dspy", + "update": "api/opportunities_update.dspy", + "delete": "api/opportunities_delete.dspy" }, - { - "widgettype": "CRUD", - "options": { - "title": "销售阶段", - "url": "{{entire_url(sales_stages_list)}}" - } - }, - { - "widgettype": "Panel", - "options": { - "title": "销售漏斗分析" - }, - "subwidgets": [ - { - "widgettype": "Chart", - "options": { - "title": "漏斗可视化", - "chartType": "funnel", - "dataSource": "get_funnel_analysis", - "refreshInterval": 30000 - } - } - ] - }, - { - "widgettype": "Panel", - "options": { - "title": "销售预测" - }, - "subwidgets": [ - { - "widgettype": "Card", - "options": { - "title": "今日预测汇总", - "dataSource": "get_sales_prediction", - "fields": ["total_predicted", "avg_confidence", "opportunity_count"] - } - } - ] + "columns": [ + {"field": "opportunity_name", "title": "Opportunity", "width": 200}, + {"field": "customer_name", "title": "Customer", "width": 150}, + {"field": "estimated_amount", "title": "Amount", "width": 120}, + {"field": "current_stage", "title": "Stage", "width": 120}, + {"field": "probability", "title": "Probability", "width": 100}, + {"field": "expected_close_date", "title": "Close Date", "width": 120}, + {"field": "owner_name", "title": "Owner", "width": 100}, + {"field": "status", "title": "Status", "width": 80, "formatter": "code:opportunity_status"}, + {"field": "created_at", "title": "Created", "width": 160} + ], + "style": {"flex": 1} + }, + { + "type": "Dialog", + "id": "addOpportunityDialog", + "title": "Add Opportunity", + "width": 600, + "content": { + "type": "Form", + "id": "addOpportunityForm", + "fields": [ + {"field": "opportunity_name", "title": "Opportunity Name", "uitype": "TextField", "required": true}, + {"field": "customer_id", "title": "Customer ID", "uitype": "TextField", "required": true}, + {"field": "customer_name", "title": "Customer Name", "uitype": "TextField", "required": true}, + {"field": "estimated_amount", "title": "Estimated Amount", "uitype": "TextField", "required": true}, + {"field": "current_stage", "title": "Stage", "uitype": "TextField", "required": true}, + {"field": "expected_close_date", "title": "Expected Close Date", "uitype": "DateField", "required": true}, + {"field": "owner_id", "title": "Owner ID", "uitype": "TextField", "required": true}, + {"field": "owner_name", "title": "Owner Name", "uitype": "TextField", "required": true}, + {"field": "region", "title": "Region", "uitype": "TextField"}, + {"field": "description", "title": "Description", "uitype": "TextField"} + ], + "actions": [ + {"type": "Button", "text": "Cancel", "onclick": "closeDialog('addOpportunityDialog')"}, + {"type": "Button", "text": "Submit", "variant": "primary", "onclick": "submitForm('addOpportunityForm', 'api/opportunities_create.dspy', 'addOpportunityDialog', 'opportunityCRUD')"} + ] } + } ] -} \ No newline at end of file + } +} diff --git a/wwwroot/opportunity_edit.ui b/wwwroot/opportunity_edit.ui new file mode 100644 index 0000000..d902c04 --- /dev/null +++ b/wwwroot/opportunity_edit.ui @@ -0,0 +1,58 @@ +{ + "widgettype": "Page", + "options": { + "title": "商机编辑", + "style": {"height": "100vh", "padding": "0"} + }, + "subwidgets": [ + { + "widgettype": "VBox", + "options": {"style": {"padding": "16px", "flex": 1, "overflow": "auto"}}, + "subwidgets": [ + { + "widgettype": "Form", + "id": "opportunity_form", + "options": { + "submit_url": "{{entire_url('api/opportunities_create.dspy')}}", + "method": "POST", + "layout": "vertical", + "style": {"maxWidth": "600px"}, + "fields": [ + {"name": "opportunity_name", "label": "商机名称", "uitype": "text", "required": true}, + {"name": "customer_id", "label": "客户ID", "uitype": "text", "required": true}, + {"name": "customer_name", "label": "客户名称", "uitype": "text", "required": true}, + {"name": "estimated_amount", "label": "预估金额", "uitype": "number", "required": true}, + {"name": "current_stage", "label": "阶段", "uitype": "code", "required": true, "data": [ + {"value": "qualification", "text": "资格确认"}, + {"value": "needs_analysis", "text": "需求分析"}, + {"value": "proposal", "text": "方案报价"}, + {"value": "negotiation", "text": "商务谈判"}, + {"value": "closed_won", "text": "成交"}, + {"value": "closed_lost", "text": "丢单"} + ]}, + {"name": "expected_close_date", "label": "预计成交日期", "uitype": "date", "required": true}, + {"name": "owner_id", "label": "负责人ID", "uitype": "text", "required": true}, + {"name": "owner_name", "label": "负责人", "uitype": "text", "required": true}, + {"name": "region", "label": "区域", "uitype": "code", "data": [ + {"value": "east", "text": "华东"}, + {"value": "south", "text": "华南"}, + {"value": "west", "text": "华西"}, + {"value": "north", "text": "华北"} + ]}, + {"name": "probability", "label": "成功概率(%)", "uitype": "number"}, + {"name": "description", "label": "描述", "uitype": "textarea"}, + {"name": "status", "label": "状态", "uitype": "code", "data": [ + {"value": "active", "text": "有效"}, + {"value": "inactive", "text": "无效"} + ]} + ], + "buttons": [ + {"type": "submit", "text": "保存", "variant": "primary"}, + {"type": "button", "text": "取消", "action": "navigate('main/opportunity_management/opportunity_management.ui')"} + ] + } + } + ] + } + ] +} diff --git a/wwwroot/opportunity_list.dspy b/wwwroot/opportunity_list.dspy new file mode 100644 index 0000000..8d59885 --- /dev/null +++ b/wwwroot/opportunity_list.dspy @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""List opportunities""" +import json + +result = {'success': False, 'rows': [], 'total': 0} + +try: + dbname = get_module_dbname('opportunity_management') + async with DBPools().sqlorContext(dbname) as sor: + where_clauses = [] + where_ns = {} + + customer_id = params_kw.get('customer_id', '') + status = params_kw.get('status', '') + + if customer_id: + where_clauses.append("customer_id=${customer_id}$") + where_ns['customer_id'] = customer_id + if status: + where_clauses.append("status=${status}$") + where_ns['status'] = status + + where_sql = " AND ".join(where_clauses) + where_prefix = " WHERE " if where_clauses else "" + + count_sql = f"SELECT count(*) rcnt FROM opportunities{where_prefix}{where_sql}" + count_rows = await sor.sqlExe(count_sql, where_ns) + total = 0 + if count_rows and len(count_rows) > 0: + r = count_rows[0] + if hasattr(r, 'keys'): + total = r.get('rcnt', 0) + elif isinstance(r, dict): + total = r.get('rcnt', 0) + elif hasattr(r, 'rcnt'): + total = r.rcnt + + if total > 0: + ns = {'page': int(params_kw.get('page', 1)), 'rows': int(params_kw.get('rows', 20)), 'sort': params_kw.get('sort', 'created_at')} + sql = f"SELECT id, opportunity_name, customer_id, customer_name, estimated_amount, probability, expected_close_date, current_stage, status, owner_id, owner_name, source_type, region, created_at, updated_at FROM opportunities{where_prefix}{where_sql}" + + query_ns = dict(list(ns.items()) + list(where_ns.items())) + rows = await sor.sqlExe(sql, query_ns) + + # sqlExe with pagination returns {'total': N, 'rows': [...]} + if isinstance(rows, dict): + result['rows'] = rows.get('rows', []) + result['total'] = rows.get('total', total) + elif rows: + result['rows'] = [dict(r) if hasattr(r, 'keys') else r for r in rows] + result['total'] = total + + result['success'] = True +except Exception as e: + result['error'] = str(e) + +return json.dumps(result, ensure_ascii=False, default=str) diff --git a/wwwroot/opportunity_management.ui b/wwwroot/opportunity_management.ui index 7336fae..f4b3e13 100644 --- a/wwwroot/opportunity_management.ui +++ b/wwwroot/opportunity_management.ui @@ -1,177 +1,46 @@ { - "widgettype": "VBox", - "options": { - "width": "100%", - "height": "100%" - }, - "subwidgets": [ + "widgettype": "Page", + "options": { + "title": "商机管理", + "style": {"height": "100vh", "padding": "0"} + }, + "subwidgets": [ + { + "widgettype": "VBox", + "options": {"style": {"padding": "16px", "flex": 1, "overflow": "hidden"}}, + "subwidgets": [ { - "widgettype": "TabPanel", - "options": { - "tabs": [ - { - "title": "商机列表", - "id": "opportunities_tab" - }, - { - "title": "销售阶段配置", - "id": "stages_tab" - }, - { - "title": "商机分析", - "id": "analysis_tab" - } - ] - }, - "subwidgets": [ - { - "id": "opportunities_tab_content", - "widgettype": "VBox", - "options": { - "width": "100%", - "height": "100%" - }, - "subwidgets": [ - { - "widgettype": "DataGrid", - "options": { - "url": "{{entire_url('opportunities.json')}}", - "height": "100%", - "width": "100%", - "toolbar": true, - "search": true, - "export": true - }, - "binds": [ - { - "wid": "self", - "event": "row_selected", - "actiontype": "script", - "target": "self", - "script": "console.log('Selected opportunity:', event.params);" - } - ] - } - ] - }, - { - "id": "stages_tab_content", - "widgettype": "VBox", - "options": { - "width": "100%", - "height": "100%" - }, - "subwidgets": [ - { - "widgettype": "DataGrid", - "options": { - "url": "{{entire_url('sales_stages.json')}}", - "height": "100%", - "width": "100%", - "toolbar": true, - "search": false, - "export": true - } - } - ] - }, - { - "id": "analysis_tab_content", - "widgettype": "VBox", - "options": { - "width": "100%", - "height": "100%" - }, - "subwidgets": [ - { - "widgettype": "HBox", - "options": { - "width": "100%", - "height": "50px" - }, - "subwidgets": [ - { - "widgettype": "Form", - "options": { - "fields": [ - { - "uitype": "str", - "name": "region_filter", - "label": "区域筛选", - "placeholder": "选择区域" - }, - { - "uitype": "str", - "name": "owner_filter", - "label": "销售筛选", - "placeholder": "选择销售人员" - }, - { - "uitype": "button", - "name": "apply_filter", - "label": "应用筛选" - } - ], - "inline": true - }, - "binds": [ - { - "wid": "apply_filter", - "event": "click", - "actiontype": "script", - "target": "self", - "script": "const region = document.querySelector('[name=\"region_filter\"]').value;\nconst owner = document.querySelector('[name=\"owner_filter\"]').value;\n// 这里会触发漏斗图和预测数据的更新\nconsole.log('Applying filters:', {region, owner});" - } - ] - } - ] - }, - { - "widgettype": "VBox", - "options": { - "width": "100%", - "height": "calc(100% - 60px)" - }, - "subwidgets": [ - { - "widgettype": "HBox", - "options": { - "width": "100%", - "height": "50%" - }, - "subwidgets": [ - { - "widgettype": "Pie", - "options": { - "title": "各阶段商机数量占比", - "dataurl": "/api/opportunity/stage-counts", - "datamethod": "GET" - } - }, - { - "widgettype": "Bar", - "options": { - "title": "各阶段商机金额占比", - "dataurl": "/api/opportunity/stage-amounts", - "datamethod": "GET" - } - } - ] - }, - { - "widgettype": "Line", - "options": { - "title": "成交预测 vs 实际成交", - "dataurl": "/api/opportunity/prediction-accuracy", - "datamethod": "GET", - "height": "50%" - } - } - ] - } - ] - } + "widgettype": "HBox", + "options": {"style": {"marginBottom": "16px", "gap": "8px"}}, + "subwidgets": [ + {"widgettype": "TextField", "id": "search_keyword", "options": {"label": "搜索", "placeholder": "商机名称/客户", "style": {"flex": 1}}}, + {"widgettype": "Button", "id": "btn_search", "options": {"text": "搜索", "variant": "primary"}}, + {"widgettype": "Button", "id": "btn_add", "options": {"text": "新增", "variant": "primary", "action": "navigate('main/opportunity_management/opportunity_edit.ui')"}} + ] + }, + { + "widgettype": "DataGrid", + "id": "opportunity_grid", + "options": { + "url": "{{entire_url('api/opportunities_list.dspy')}}", + "style": {"flex": 1}, + "columns": [ + {"field": "opportunity_name", "header": "商机名称", "width": 200}, + {"field": "customer_name", "header": "客户", "width": 150}, + {"field": "estimated_amount", "header": "预估金额", "width": 120}, + {"field": "current_stage", "header": "阶段", "width": 120}, + {"field": "probability", "header": "概率", "width": 80}, + {"field": "expected_close_date", "header": "预计成交", "width": 120}, + {"field": "owner_name", "header": "负责人", "width": 100}, + {"field": "status", "header": "状态", "width": 80} + ], + "toolbar": [ + {"type": "button", "text": "编辑", "icon": "edit", "action": "navigate('main/opportunity_management/opportunity_edit.ui?id={% raw %}{{selectedRow.id}}{% endraw %}')"}, + {"type": "button", "text": "删除", "icon": "delete", "action": "doDelete('{% raw %}{{selectedRow.id}}{% endraw %}')"} ] + } } - ] -} \ No newline at end of file + ] + } + ] +} diff --git a/wwwroot/sales_stages_list.dspy b/wwwroot/sales_stages_list.dspy new file mode 100644 index 0000000..e919148 --- /dev/null +++ b/wwwroot/sales_stages_list.dspy @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""List sales stages""" +import json + +result = {'success': False, 'rows': [], 'total': 0} + +try: + dbname = get_module_dbname('opportunity_management') + async with DBPools().sqlorContext(dbname) as sor: + sql = "SELECT id, stage_name, stage_order, conversion_rate, is_lost_stage, is_won_stage, created_at, updated_at FROM sales_stages ORDER BY stage_order" + rows = await sor.sqlExe(sql, {'page': 1, 'rows': 50, 'sort': 'stage_order'}) + if isinstance(rows, dict): + result['rows'] = rows.get('rows', []) + result['total'] = rows.get('total', len(result['rows'])) + elif rows: + result['rows'] = [dict(r) if hasattr(r, 'keys') else r for r in rows] + result['total'] = len(result['rows']) + result['success'] = True +except Exception as e: + result['error'] = str(e) + +return json.dumps(result, ensure_ascii=False, default=str)