sync: local modifications to opportunity_management

- Updated core.py, init.py, opportunity_core.py, mysql.ddl.sql
- Updated UI: base.ui, opportunity_management.ui
- Added: opportunity.json, opportunity_stage_history.json
- Added API files: opportunities CRUD, sales_stages_list, check_tables
- Added UI/DSPY: opportunity_edit, opportunity_list, sales_stages_list
This commit is contained in:
yumoqing 2026-04-28 18:54:47 +08:00
parent b837692cc4
commit 2547fad996
17 changed files with 1182 additions and 885 deletions

View File

@ -1,67 +1,77 @@
-- 商机管理模块数据库表结构 -- Table from opportunities.json
CREATE TABLE IF NOT EXISTS `opportunities` (
`id` VARCHAR(32) NOT NULL COMMENT '主键 - UUID格式',
`customer_id` VARCHAR(32) NOT NULL COMMENT '关联客户管理模块的客户ID',
`customer_name` VARCHAR(255) NOT NULL COMMENT '客户名称,必填字段',
`opportunity_name` VARCHAR(255) NOT NULL COMMENT '商机标题或项目名称',
`estimated_amount` DECIMAL(15,2) NOT NULL COMMENT '预估成交金额,必填字段',
`current_stage` VARCHAR(50) NOT NULL COMMENT '当前所处的销售阶段,必填字段',
`expected_close_date` DATE NOT NULL COMMENT '预计成交日期,必填字段',
`source_type` VARCHAR(20) NOT NULL DEFAULT 'manual' COMMENT 'manual=手动录入, lead=线索转化',
`owner_id` VARCHAR(32) NOT NULL COMMENT '负责该商机的销售人员ID',
`owner_name` VARCHAR(100) NOT NULL COMMENT '负责该商机的销售人员姓名',
`region` VARCHAR(100) COMMENT '商机所属区域,用于分析筛选',
`description` TEXT COMMENT '商机详细描述信息',
`probability` FLOAT(5,2) NOT NULL DEFAULT '0.00' COMMENT '基于历史转化率计算的成交概率百分比',
`predicted_revenue` DECIMAL(15,2) NOT NULL DEFAULT '0.00' COMMENT '基于成交概率计算的预测收入',
`created_at` TIMESTAMP NOT NULL,
`updated_at` TIMESTAMP NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'active' COMMENT 'active=活跃, won=已成交, lost=已丢失, closed=已关闭',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='商机表';
-- 1. 商机表 (opportunities) CREATE INDEX `idx_opportunities_customer` ON `opportunities` (`customer_id`);
CREATE TABLE IF NOT EXISTS opportunities ( CREATE INDEX `idx_opportunities_owner` ON `opportunities` (`owner_id`);
id VARCHAR(64) PRIMARY KEY, CREATE INDEX `idx_opportunities_stage` ON `opportunities` (`current_stage`);
customer_name VARCHAR(255) NOT NULL COMMENT '客户名称', CREATE INDEX `idx_opportunities_region` ON `opportunities` (`region`);
customer_id VARCHAR(64) COMMENT '客户ID关联客户管理模块', CREATE INDEX `idx_opportunities_status` ON `opportunities` (`status`);
estimated_amount DECIMAL(15,2) NOT NULL COMMENT '预估金额', CREATE INDEX `idx_opportunities_expected_close` ON `opportunities` (`expected_close_date`);
sales_stage VARCHAR(64) NOT NULL COMMENT '销售阶段',
expected_close_date DATE NOT NULL COMMENT '预计成交时间',
actual_close_date DATE COMMENT '实际成交时间',
source VARCHAR(32) DEFAULT 'manual' COMMENT '来源: manual/lead_conversion',
description TEXT COMMENT '描述',
owner_id VARCHAR(64) NOT NULL COMMENT '负责人ID',
org_id VARCHAR(64) NOT NULL COMMENT '组织ID',
status VARCHAR(32) DEFAULT 'active' COMMENT '状态: active/won/lost/deleted',
probability DECIMAL(5,4) DEFAULT 0.1000 COMMENT '成交概率',
next_action_date DATE COMMENT '下次行动日期',
next_action_description VARCHAR(255) COMMENT '下次行动描述',
tags VARCHAR(255) COMMENT '标签',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_customer_id (customer_id),
INDEX idx_sales_stage (sales_stage),
INDEX idx_owner_id (owner_id),
INDEX idx_org_id (org_id),
INDEX idx_status (status),
INDEX idx_expected_close_date (expected_close_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 2. 商机阶段变更历史表 (opportunity_stage_history) -- Table from opportunity_predictions.json
CREATE TABLE IF NOT EXISTS opportunity_stage_history ( CREATE TABLE IF NOT EXISTS `opportunity_predictions` (
id VARCHAR(64) PRIMARY KEY, `id` VARCHAR(32) NOT NULL COMMENT '主键 - UUID格式',
opportunity_id VARCHAR(64) NOT NULL, `opportunity_id` VARCHAR(32) NOT NULL COMMENT '关联的商机ID',
old_stage VARCHAR(64) NOT NULL COMMENT '原阶段', `predicted_amount` DECIMAL(15,2) NOT NULL DEFAULT '0.00' COMMENT '基于历史转化率计算的预测成交金额',
new_stage VARCHAR(64) NOT NULL COMMENT '新阶段', `confidence_level` DECIMAL(5,4) NOT NULL DEFAULT '0.0000' COMMENT '预测的置信度0-1',
change_reason TEXT NOT NULL COMMENT '变更原因', `prediction_date` DATE NOT NULL COMMENT '预测生成日期',
changed_by VARCHAR(64) NOT NULL COMMENT '变更人ID', `actual_amount` DECIMAL(15,2) COMMENT '实际成交金额(成交后更新)',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, `deviation_rate` DECIMAL(5,4) COMMENT '预测与实际的偏差率',
INDEX idx_opportunity_id (opportunity_id), `created_at` TIMESTAMP NOT NULL COMMENT '记录创建时间',
INDEX idx_changed_by (changed_by), PRIMARY KEY (`id`)
FOREIGN KEY (opportunity_id) REFERENCES opportunities(id) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='商机预测表';
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 3. 销售漏斗配置表 (sales_funnel_config) CREATE UNIQUE INDEX `idx_predictions_opportunity` ON `opportunity_predictions` (`opportunity_id`, `prediction_date`);
CREATE TABLE IF NOT EXISTS sales_funnel_config ( CREATE INDEX `idx_predictions_date` ON `opportunity_predictions` (`prediction_date`);
id VARCHAR(64) PRIMARY KEY,
org_id VARCHAR(64) NOT NULL,
stage_name VARCHAR(64) NOT NULL COMMENT '阶段名称',
stage_order INT NOT NULL COMMENT '阶段顺序',
default_probability DECIMAL(5,4) NOT NULL COMMENT '默认成交概率',
color_code VARCHAR(16) COMMENT '颜色代码',
is_active TINYINT(1) DEFAULT 1 COMMENT '是否激活',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_org_stage (org_id, stage_name),
INDEX idx_org_id (org_id),
INDEX idx_stage_order (stage_order)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 插入默认销售漏斗配置 -- Table from opportunity_stage_history.json
INSERT IGNORE INTO sales_funnel_config (id, org_id, stage_name, stage_order, default_probability, color_code) VALUES CREATE TABLE IF NOT EXISTS `opportunity_stage_history` (
(REPLACE(UUID(), '-', ''), 'default', '初步接洽', 1, 0.1000, '#FF6B6B'), `id` VARCHAR(32) NOT NULL COMMENT '主键 - UUID格式',
(REPLACE(UUID(), '-', ''), 'default', '需求确认', 2, 0.3000, '#4ECDC4'), `opportunity_id` VARCHAR(32) NOT NULL COMMENT '关联的商机ID',
(REPLACE(UUID(), '-', ''), 'default', '方案报价', 3, 0.5000, '#45B7D1'), `from_stage` VARCHAR(50) COMMENT '变更前的销售阶段',
(REPLACE(UUID(), '-', ''), 'default', '合同谈判', 4, 0.7000, '#96CEB4'), `to_stage` VARCHAR(50) NOT NULL COMMENT '变更后的销售阶段',
(REPLACE(UUID(), '-', ''), 'default', '成交', 5, 1.0000, '#FFEAA7'); `change_reason` TEXT NOT NULL COMMENT '阶段变更的原因说明,必填字段',
`changed_by_id` VARCHAR(32) NOT NULL COMMENT '执行阶段变更的用户ID',
`changed_by_name` VARCHAR(100) NOT NULL COMMENT '执行阶段变更的用户姓名',
`changed_at` TIMESTAMP NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='商机阶段变更历史表';
CREATE INDEX `idx_stage_history_opportunity` ON `opportunity_stage_history` (`opportunity_id`);
CREATE INDEX `idx_stage_history_changed_by` ON `opportunity_stage_history` (`changed_by_id`);
CREATE INDEX `idx_stage_history_changed_at` ON `opportunity_stage_history` (`changed_at`);
-- Table from sales_stages.json
CREATE TABLE IF NOT EXISTS `sales_stages` (
`id` VARCHAR(32) NOT NULL COMMENT '主键 - UUID格式',
`stage_name` VARCHAR(100) NOT NULL COMMENT '销售阶段名称,如\'\'、\'\'',
`stage_order` INT NOT NULL COMMENT '阶段在销售漏斗中的顺序,从小到大',
`conversion_rate` FLOAT(5,2) NOT NULL DEFAULT '0.00' COMMENT '该阶段到下一阶段的历史平均转化率',
`is_won_stage` VARCHAR(5) NOT NULL DEFAULT 'no' COMMENT 'yes=成交阶段, no=非成交阶段',
`is_lost_stage` VARCHAR(5) NOT NULL DEFAULT 'no' COMMENT 'yes=丢失阶段, no=非丢失阶段',
`created_at` TIMESTAMP NOT NULL,
`updated_at` TIMESTAMP NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='销售阶段配置表';
CREATE UNIQUE INDEX `idx_sales_stages_order` ON `sales_stages` (`stage_order`);
CREATE UNIQUE INDEX `idx_sales_stages_name` ON `sales_stages` (`stage_name`);

121
opportunity.json Normal file
View File

@ -0,0 +1,121 @@
{
"summary": {
"name": "opportunity",
"description": "商机管理表,存储销售机会的全生命周期信息",
"version": "1.0.0"
},
"fields": {
"id": {
"type": "int",
"primary_key": true,
"auto_increment": true,
"description": "商机ID"
},
"org_id": {
"type": "int",
"not_null": true,
"description": "组织ID用于多租户隔离"
},
"customer_id": {
"type": "int",
"not_null": true,
"description": "客户ID关联客户管理模块"
},
"customer_name": {
"type": "varchar(255)",
"not_null": true,
"description": "客户名称"
},
"title": {
"type": "varchar(255)",
"not_null": true,
"description": "商机标题"
},
"estimated_amount": {
"type": "decimal(15,2)",
"not_null": true,
"description": "预估金额"
},
"current_stage": {
"type": "varchar(50)",
"not_null": true,
"default": "'初步接洽'",
"description": "当前销售阶段"
},
"expected_close_date": {
"type": "date",
"not_null": true,
"description": "预计成交时间"
},
"probability": {
"type": "decimal(5,2)",
"not_null": true,
"default": "0.00",
"description": "成交概率(百分比)"
},
"source": {
"type": "varchar(50)",
"not_null": true,
"default": "'manual'",
"description": "商机来源manual手动录入或 lead线索转化"
},
"owner_id": {
"type": "int",
"not_null": true,
"description": "负责人ID"
},
"owner_name": {
"type": "varchar(100)",
"not_null": true,
"description": "负责人姓名"
},
"description": {
"type": "text",
"description": "商机描述"
},
"created_at": {
"type": "datetime",
"not_null": true,
"default": "CURRENT_TIMESTAMP",
"description": "创建时间"
},
"updated_at": {
"type": "datetime",
"not_null": true,
"default": "CURRENT_TIMESTAMP",
"on_update": "CURRENT_TIMESTAMP",
"description": "更新时间"
},
"status": {
"type": "varchar(20)",
"not_null": true,
"default": "'active'",
"description": "状态active活跃、won成交、lost丢单"
}
},
"indexes": {
"idx_org_customer": ["org_id", "customer_id"],
"idx_org_owner": ["org_id", "owner_id"],
"idx_org_stage": ["org_id", "current_stage"],
"idx_org_status": ["org_id", "status"],
"idx_expected_close": ["org_id", "expected_close_date"]
},
"codes": {
"stages": {
"初步接洽": "初步接触客户,了解基本信息",
"需求确认": "深入了解客户需求和痛点",
"方案报价": "提供解决方案和报价",
"合同谈判": "商务条款和合同细节谈判",
"成交": "签订合同,完成交易"
},
"sources": {
"manual": "手动录入",
"lead": "线索转化"
},
"statuses": {
"active": "活跃",
"won": "成交",
"lost": "丢单"
}
}
}

View File

@ -5,7 +5,7 @@ import uuid
from ahserver.serverenv import ServerEnv from ahserver.serverenv import ServerEnv
from appPublic.worker import awaitify from appPublic.worker import awaitify
from sqlor.dbp import DBP from sqlor.dbpools import DBPools
async def create_opportunity( async def create_opportunity(
@ -18,20 +18,22 @@ async def create_opportunity(
region: str = None region: str = None
) -> Dict: ) -> Dict:
"""创建商机""" """创建商机"""
dbp = DBP() db = DBPools()
async with db.sqlorContext('default') as sor:
opportunity_id = str(uuid.uuid4()).replace('-', '') opportunity_id = str(uuid.uuid4()).replace('-', '')
# 验证阶段是否存在 # 验证客户是否存在
stage_exists = await dbp.select_one( customer_records = await sor.R("customers", {"filters": [{"field": "customer_name", "op": "=", "value": customer_name}]})
"sales_stages", if not customer_records or len(customer_records) == 0:
{"stage_name": current_stage, "is_active": "1"} raise ValueError(f"客户 {customer_name} 不存在")
)
if not stage_exists: customer = customer_records[0]
raise ValueError(f"销售阶段 '{current_stage}' 不存在或未启用") customer_id = customer.get("id", "")
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
opportunity_data = { opportunity_data = {
"id": opportunity_id, "id": opportunity_id,
"customer_id": customer_id,
"customer_name": customer_name, "customer_name": customer_name,
"estimated_amount": estimated_amount, "estimated_amount": estimated_amount,
"current_stage": current_stage, "current_stage": current_stage,
@ -39,44 +41,44 @@ async def create_opportunity(
"source_type": source_type, "source_type": source_type,
"owner_id": owner_id or get_current_user_id(), "owner_id": owner_id or get_current_user_id(),
"region": region, "region": region,
"probability": get_stage_probability(current_stage),
"created_at": now, "created_at": now,
"updated_at": now, "updated_at": now,
"status": "active" "status": "active"
} }
await dbp.insert("opportunities", opportunity_data) await sor.C("opportunities", opportunity_data)
# 创建初始预测记录
await create_prediction_record(opportunity_id, estimated_amount, current_stage)
return opportunity_data return opportunity_data
def get_stage_probability(stage: str) -> float:
"""根据销售阶段获取成交概率"""
stage_probabilities = {
"initial_contact": 0.1,
"needs_analysis": 0.2,
"proposal": 0.5,
"negotiation": 0.7,
"closed_won": 1.0,
"closed_lost": 0.0
}
return stage_probabilities.get(stage, 0.0)
async def update_opportunity_stage( async def update_opportunity_stage(
opportunity_id: str, opportunity_id: str,
new_stage: str, new_stage: str,
change_reason: str, notes: str = None
changed_by: str = None
) -> Dict: ) -> Dict:
"""更新商机阶段""" """更新商机阶段"""
dbp = DBP() db = DBPools()
changed_by = changed_by or get_current_user_id() async with db.sqlorContext('default') as sor:
opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
# 获取当前商机信息 if not opportunity_records or len(opportunity_records) == 0:
opportunity = await dbp.select_one("opportunities", {"id": opportunity_id})
if not opportunity:
raise ValueError("商机不存在") raise ValueError("商机不存在")
if opportunity["current_stage"] == new_stage: opportunity = opportunity_records[0]
raise ValueError("商机已在目标阶段") if opportunity["status"] != "active":
raise ValueError("只能更新活跃状态的商机")
# 验证新阶段是否存在
new_stage_info = await dbp.select_one(
"sales_stages",
{"stage_name": new_stage, "is_active": "1"}
)
if not new_stage_info:
raise ValueError(f"销售阶段 '{new_stage}' 不存在或未启用")
# 记录阶段变更历史 # 记录阶段变更历史
history_id = str(uuid.uuid4()).replace('-', '') history_id = str(uuid.uuid4()).replace('-', '')
@ -85,187 +87,189 @@ async def update_opportunity_stage(
"opportunity_id": opportunity_id, "opportunity_id": opportunity_id,
"from_stage": opportunity["current_stage"], "from_stage": opportunity["current_stage"],
"to_stage": new_stage, "to_stage": new_stage,
"change_reason": change_reason, "notes": notes,
"changed_by": changed_by, "changed_by": get_current_user_id(),
"changed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") "changed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
} }
await dbp.insert("opportunity_stage_history", history_data) await sor.C("opportunity_stage_history", history_data)
# 更新商机阶段 # 更新商机
await dbp.update( now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
await sor.U(
"opportunities", "opportunities",
{"current_stage": new_stage, "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, {
{"id": opportunity_id} "current_stage": new_stage,
"probability": get_stage_probability(new_stage),
"updated_at": now
},
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
) )
# 更新预测记录 # 如果是关闭阶段,更新状态
await update_prediction_record(opportunity_id, opportunity["estimated_amount"], new_stage) if new_stage in ["closed_won", "closed_lost"]:
status = "won" if new_stage == "closed_won" else "lost"
await sor.U(
"opportunities",
{"status": status},
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
)
return {"opportunity_id": opportunity_id, "new_stage": new_stage}
async def assign_opportunity(
opportunity_id: str,
new_owner_id: str
) -> Dict:
"""分配商机给销售人员"""
db = DBPools()
async with db.sqlorContext('default') as sor:
opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
if not opportunity_records or len(opportunity_records) == 0:
raise ValueError("商机不存在")
opportunity = opportunity_records[0]
old_owner_id = opportunity["owner_id"]
# 记录分配历史
assignment_id = str(uuid.uuid4()).replace('-', '')
assignment_data = {
"id": assignment_id,
"opportunity_id": opportunity_id,
"from_owner_id": old_owner_id,
"to_owner_id": new_owner_id,
"assigned_by": get_current_user_id(),
"assigned_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
await sor.C("opportunity_assignment_history", assignment_data)
# 更新商机负责人
await sor.U(
"opportunities",
{
"owner_id": new_owner_id,
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
)
return {"opportunity_id": opportunity_id, "new_owner_id": new_owner_id}
async def get_opportunity_funnel(
start_date: str = None,
end_date: str = None,
owner_id: str = None,
region: str = None
) -> Dict:
"""获取销售漏斗数据"""
db = DBPools()
async with db.sqlorContext('default') as sor:
filters = []
if start_date and end_date:
filters.append({"field": "created_at", "op": ">=", "value": start_date})
filters.append({"field": "created_at", "op": "<=", "value": end_date})
if owner_id:
filters.append({"field": "owner_id", "op": "=", "value": owner_id})
if region:
filters.append({"field": "region", "op": "=", "value": region})
opportunities = await sor.R("opportunities", {"filters": filters})
# 按阶段分组统计
funnel_data = {}
total_value = 0
for opp in opportunities:
stage = opp["current_stage"]
amount = float(opp["estimated_amount"]) if opp["estimated_amount"] else 0
probability = float(opp["probability"]) if opp["probability"] else 0
if stage not in funnel_data:
funnel_data[stage] = {
"count": 0,
"total_amount": 0,
"weighted_amount": 0
}
funnel_data[stage]["count"] += 1
funnel_data[stage]["total_amount"] += amount
funnel_data[stage]["weighted_amount"] += amount * probability
total_value += amount * probability
return { return {
"opportunity_id": opportunity_id, "funnel_data": funnel_data,
"from_stage": opportunity["current_stage"], "total_weighted_value": total_value,
"to_stage": new_stage, "total_opportunities": len(opportunities)
"history_id": history_id
} }
async def create_prediction_record( async def get_sales_performance(
opportunity_id: str, start_date: str,
estimated_amount: float, end_date: str,
current_stage: str
) -> Dict:
"""创建预测记录"""
dbp = DBP()
# 获取当前阶段的转化率
stage_info = await dbp.select_one("sales_stages", {"stage_name": current_stage})
conversion_rate = float(stage_info.get("conversion_rate", "0.0000")) if stage_info else 0.0
predicted_amount = estimated_amount * conversion_rate if conversion_rate > 0 else estimated_amount
confidence_level = min(conversion_rate, 1.0) if conversion_rate > 0 else 0.5
prediction_id = str(uuid.uuid4()).replace('-', '')
prediction_data = {
"id": prediction_id,
"opportunity_id": opportunity_id,
"predicted_amount": predicted_amount,
"confidence_level": confidence_level,
"prediction_date": date.today().strftime("%Y-%m-%d"),
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
await dbp.insert("opportunity_predictions", prediction_data)
return prediction_data
async def update_prediction_record(
opportunity_id: str,
estimated_amount: float,
current_stage: str
) -> Dict:
"""更新预测记录"""
dbp = DBP()
# 获取当前阶段的转化率
stage_info = await dbp.select_one("sales_stages", {"stage_name": current_stage})
conversion_rate = float(stage_info.get("conversion_rate", "0.0000")) if stage_info else 0.0
predicted_amount = estimated_amount * conversion_rate if conversion_rate > 0 else estimated_amount
confidence_level = min(conversion_rate, 1.0) if conversion_rate > 0 else 0.5
# 检查今天是否已有预测记录
today = date.today().strftime("%Y-%m-%d")
existing_prediction = await dbp.select_one(
"opportunity_predictions",
{"opportunity_id": opportunity_id, "prediction_date": today}
)
prediction_data = {
"predicted_amount": predicted_amount,
"confidence_level": confidence_level,
"prediction_date": today,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
if existing_prediction:
# 更新现有记录
await dbp.update(
"opportunity_predictions",
prediction_data,
{"id": existing_prediction["id"]}
)
prediction_data["id"] = existing_prediction["id"]
else:
# 创建新记录
prediction_id = str(uuid.uuid4()).replace('-', '')
prediction_data["id"] = prediction_id
prediction_data["opportunity_id"] = opportunity_id
await dbp.insert("opportunity_predictions", prediction_data)
return prediction_data
async def get_funnel_analysis(
region: str = None,
owner_id: str = None
) -> List[Dict]:
"""获取销售漏斗分析数据"""
dbp = DBP()
# 构建查询条件
where_clause = {}
if region:
where_clause["region"] = region
if owner_id:
where_clause["owner_id"] = owner_id
# 获取所有活跃商机按阶段分组
funnel_data = await dbp.query("""
SELECT
o.current_stage,
s.stage_order,
COUNT(*) as opportunity_count,
SUM(o.estimated_amount) as total_amount,
AVG(p.predicted_amount) as avg_predicted_amount
FROM opportunities o
LEFT JOIN sales_stages s ON o.current_stage = s.stage_name
LEFT JOIN opportunity_predictions p ON o.id = p.opportunity_id
AND p.prediction_date = CURDATE()
WHERE o.status = 'active'
""" + (" AND o.region = %(region)s" if region else "") +
(" AND o.owner_id = %(owner_id)s" if owner_id else "") + """
GROUP BY o.current_stage, s.stage_order
ORDER BY s.stage_order ASC
""", where_clause)
return funnel_data
async def get_sales_prediction(
region: str = None,
owner_id: str = None owner_id: str = None
) -> Dict: ) -> Dict:
"""获取销售预测汇总""" """获取销售业绩数据"""
dbp = DBP() db = DBPools()
async with db.sqlorContext('default') as sor:
where_clause = {"prediction_date": date.today().strftime("%Y-%m-%d")} # 查询已关闭的商机(赢单)
if region: won_filters = [
where_clause["region"] = region {"field": "status", "op": "=", "value": "won"},
{"field": "updated_at", "op": ">=", "value": start_date},
{"field": "updated_at", "op": "<=", "value": end_date}
]
if owner_id: if owner_id:
where_clause["owner_id"] = owner_id won_filters.append({"field": "owner_id", "op": "=", "value": owner_id})
# 获取今日预测汇总 won_opportunities = await sor.R("opportunities", {"filters": won_filters})
prediction_summary = await dbp.query("""
SELECT
SUM(p.predicted_amount) as total_predicted,
SUM(p.confidence_level) / COUNT(*) as avg_confidence,
COUNT(*) as opportunity_count
FROM opportunity_predictions p
JOIN opportunities o ON p.opportunity_id = o.id
WHERE p.prediction_date = %(prediction_date)s
""" + (" AND o.region = %(region)s" if region else "") +
(" AND o.owner_id = %(owner_id)s" if owner_id else ""), where_clause)
if prediction_summary: total_revenue = sum(float(opp["estimated_amount"]) for opp in won_opportunities if opp["estimated_amount"])
return prediction_summary[0] total_count = len(won_opportunities)
return {"total_predicted": 0, "avg_confidence": 0, "opportunity_count": 0}
# 查询活跃商机
active_filters = [
{"field": "status", "op": "=", "value": "active"},
{"field": "created_at", "op": ">=", "value": start_date},
{"field": "created_at", "op": "<=", "value": end_date}
]
if owner_id:
active_filters.append({"field": "owner_id", "op": "=", "value": owner_id})
active_opportunities = await sor.R("opportunities", {"filters": active_filters})
active_count = len(active_opportunities)
weighted_pipeline = sum(
float(opp["estimated_amount"]) * float(opp["probability"])
for opp in active_opportunities
if opp["estimated_amount"] and opp["probability"]
)
return {
"period": f"{start_date} to {end_date}",
"total_revenue": total_revenue,
"total_closed_deals": total_count,
"active_opportunities": active_count,
"weighted_pipeline_value": weighted_pipeline,
"average_deal_size": total_revenue / total_count if total_count > 0 else 0
}
def get_current_user_id() -> str: def get_current_user_id() -> str:
"""获取当前用户ID模拟实现""" """获取当前用户ID模拟实现"""
# 在实际实现中,这里应该从会话或认证信息中获取
return "current_user_id" return "current_user_id"
# 同步版本函数(用于前端调用) # 同步版本函数
def sync_create_opportunity(*args, **kwargs): def sync_create_opportunity(*args, **kwargs):
return create_opportunity(*args, **kwargs) return create_opportunity(*args, **kwargs)
def sync_update_opportunity_stage(*args, **kwargs): def sync_update_opportunity_stage(*args, **kwargs):
return update_opportunity_stage(*args, **kwargs) return update_opportunity_stage(*args, **kwargs)
def sync_get_funnel_analysis(*args, **kwargs): def sync_assign_opportunity(*args, **kwargs):
return get_funnel_analysis(*args, **kwargs) return assign_opportunity(*args, **kwargs)
def sync_get_sales_prediction(*args, **kwargs): def sync_get_opportunity_funnel(*args, **kwargs):
return get_sales_prediction(*args, **kwargs) return get_opportunity_funnel(*args, **kwargs)
def sync_get_sales_performance(*args, **kwargs):
return get_sales_performance(*args, **kwargs)

View File

@ -2,9 +2,7 @@ from ahserver.serverenv import ServerEnv
from appPublic.worker import awaitify from appPublic.worker import awaitify
from .core import ( from .core import (
sync_create_opportunity, sync_create_opportunity,
sync_update_opportunity_stage, sync_update_opportunity_stage
sync_get_funnel_analysis,
sync_get_sales_prediction
) )
@ -12,5 +10,3 @@ def load_opportunity_management():
env = ServerEnv() env = ServerEnv()
env.create_opportunity = awaitify(sync_create_opportunity) env.create_opportunity = awaitify(sync_create_opportunity)
env.update_opportunity_stage = awaitify(sync_update_opportunity_stage) env.update_opportunity_stage = awaitify(sync_update_opportunity_stage)
env.get_funnel_analysis = awaitify(sync_get_funnel_analysis)
env.get_sales_prediction = awaitify(sync_get_sales_prediction)

View File

@ -1,399 +1,307 @@
""" """
商机管理模块 - 核心业务逻辑 商机管理核心模块
实现商机全生命周期管理和商机分析功能 实现商机全生命周期管理和商机分析功能
""" """
import os
import json
import uuid
from datetime import datetime, date from datetime import datetime, date
from typing import List, Dict, Optional, Tuple from decimal import Decimal
from appPublic.jsonconfig import getConfig from typing import List, Dict, Optional
from appPublic.worker import Worker import uuid
from sqlor.dbp import getDBP
from ahserver.serverenv import ServerEnv
from appPublic.worker import awaitify
from appPublic.Config import getConfig
from sqlor.dbpools import DBPools
class OpportunityManager: async def create_opportunity(
def __init__(self): customer_name: str,
self.config = getConfig() estimated_amount: float,
self.worker = Worker() current_stage: str,
expected_close_date: str,
async def get_db_connection(self, org_id: str): source_type: str = "manual",
"""获取数据库连接""" owner_id: str = None,
dbp = await getDBP(org_id) region: str = None
return dbp ) -> Dict:
async def create_opportunity(self, opportunity_data: Dict, user_id: str, org_id: str) -> str:
"""创建商机""" """创建商机"""
# 验证必填字段 db = DBPools()
required_fields = ['customer_name', 'estimated_amount', 'sales_stage', 'expected_close_date'] async with db.sqlorContext('default') as sor:
for field in required_fields:
if not opportunity_data.get(field):
raise ValueError(f"缺少必填字段: {field}")
opportunity_id = str(uuid.uuid4()).replace('-', '') opportunity_id = str(uuid.uuid4()).replace('-', '')
dbp = await self.get_db_connection(org_id)
# 插入商机数据 # 验证客户是否存在
sql = """ customer_records = await sor.R("customers", {"filters": [{"field": "customer_name", "op": "=", "value": customer_name}]})
INSERT INTO opportunities ( if not customer_records or len(customer_records) == 0:
id, customer_name, customer_id, estimated_amount, sales_stage, raise ValueError(f"客户 {customer_name} 不存在")
expected_close_date, source, description, owner_id, org_id,
status, created_at, updated_at, probability, next_action_date,
next_action_description, tags
) VALUES (
%(id)s, %(customer_name)s, %(customer_id)s, %(estimated_amount)s, %(sales_stage)s,
%(expected_close_date)s, %(source)s, %(description)s, %(owner_id)s, %(org_id)s,
%(status)s, NOW(), NOW(), %(probability)s, %(next_action_date)s,
%(next_action_description)s, %(tags)s
)
"""
params = { customer = customer_records[0]
'id': opportunity_id, customer_id = customer.get("id", "")
'customer_name': opportunity_data['customer_name'],
'customer_id': opportunity_data.get('customer_id'), now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
'estimated_amount': opportunity_data['estimated_amount'], opportunity_data = {
'sales_stage': opportunity_data['sales_stage'], "id": opportunity_id,
'expected_close_date': opportunity_data['expected_close_date'], "customer_id": customer_id,
'source': opportunity_data.get('source', 'manual'), # manual 或 lead_conversion "customer_name": customer_name,
'description': opportunity_data.get('description'), "estimated_amount": estimated_amount,
'owner_id': user_id, "current_stage": current_stage,
'org_id': org_id, "expected_close_date": expected_close_date,
'status': opportunity_data.get('status', 'active'), "source_type": source_type,
'probability': opportunity_data.get('probability', self._calculate_probability(opportunity_data['sales_stage'])), "owner_id": owner_id or get_current_user_id(),
'next_action_date': opportunity_data.get('next_action_date'), "region": region,
'next_action_description': opportunity_data.get('next_action_description'), "probability": get_stage_probability(current_stage),
'tags': opportunity_data.get('tags') "created_at": now,
"updated_at": now,
"status": "active"
} }
await dbp.doTransaction([{'sql': sql, 'params': params}]) await sor.C("opportunities", opportunity_data)
return opportunity_id return opportunity_data
def _calculate_probability(self, sales_stage: str) -> float:
"""根据销售阶段计算成交概率""" def get_stage_probability(stage: str) -> float:
"""根据销售阶段获取成交概率"""
stage_probabilities = { stage_probabilities = {
'初步接洽': 0.1, "initial_contact": 0.1,
'需求确认': 0.3, "needs_analysis": 0.2,
'方案报价': 0.5, "proposal": 0.5,
'合同谈判': 0.7, "negotiation": 0.7,
'成交': 1.0 "closed_won": 1.0,
"closed_lost": 0.0
} }
return stage_probabilities.get(sales_stage, 0.1) return stage_probabilities.get(stage, 0.0)
async def update_opportunity_stage(self, opportunity_id: str, new_stage: str,
change_reason: str, user_id: str, org_id: str) -> bool: async def update_opportunity_stage(
opportunity_id: str,
new_stage: str,
notes: str = None
) -> Dict:
"""更新商机阶段""" """更新商机阶段"""
dbp = await self.get_db_connection(org_id) db = DBPools()
async with db.sqlorContext('default') as sor:
# 获取当前商机信息 opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
current_opportunity = await dbp.select_one("opportunities", {"id": opportunity_id, "org_id": org_id}) if not opportunity_records or len(opportunity_records) == 0:
if not current_opportunity:
raise ValueError("商机不存在") raise ValueError("商机不存在")
old_stage = current_opportunity['sales_stage'] opportunity = opportunity_records[0]
if opportunity["status"] != "active":
# 更新商机阶段和概率 raise ValueError("只能更新活跃状态的商机")
new_probability = self._calculate_probability(new_stage)
update_data = {
'sales_stage': new_stage,
'probability': new_probability,
'updated_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# 如果阶段变为成交,更新状态
if new_stage == '成交':
update_data['status'] = 'won'
update_data['actual_close_date'] = datetime.now().strftime("%Y-%m-%d")
# 如果阶段回退,可能需要更新状态
elif old_stage == '成交' and new_stage != '成交':
update_data['status'] = 'active'
update_data['actual_close_date'] = None
result = await dbp.update("opportunities", update_data, {"id": opportunity_id, "org_id": org_id})
# 记录阶段变更历史 # 记录阶段变更历史
if old_stage != new_stage:
await self._record_stage_change(opportunity_id, old_stage, new_stage, change_reason, user_id, org_id)
return result.rowcount > 0
async def _record_stage_change(self, opportunity_id: str, old_stage: str,
new_stage: str, change_reason: str, user_id: str, org_id: str):
"""记录阶段变更历史"""
history_id = str(uuid.uuid4()).replace('-', '') history_id = str(uuid.uuid4()).replace('-', '')
dbp = await self.get_db_connection(org_id)
history_data = { history_data = {
'id': history_id, "id": history_id,
'opportunity_id': opportunity_id, "opportunity_id": opportunity_id,
'old_stage': old_stage, "from_stage": opportunity["current_stage"],
'new_stage': new_stage, "to_stage": new_stage,
'change_reason': change_reason, "notes": notes,
'changed_by': user_id, "changed_by": get_current_user_id(),
'created_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S") "changed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
} }
await sor.C("opportunity_stage_history", history_data)
await dbp.insert("opportunity_stage_history", history_data) # 更新商机
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
async def get_funnel_analysis(self, org_id: str, filters: Optional[Dict] = None) -> Dict: await sor.U(
"""获取销售漏斗分析数据""" "opportunities",
dbp = await self.get_db_connection(org_id) {
"current_stage": new_stage,
# 构建查询条件 "probability": get_stage_probability(new_stage),
where_clauses = ["o.org_id = %(org_id)s", "o.status = 'active'"] "updated_at": now
params = {'org_id': org_id} },
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
if filters:
if filters.get('region'):
where_clauses.append("c.region = %(region)s")
params['region'] = filters['region']
if filters.get('owner_id'):
where_clauses.append("o.owner_id = %(owner_id)s")
params['owner_id'] = filters['owner_id']
if filters.get('date_range'):
start_date, end_date = filters['date_range']
where_clauses.append("o.created_at BETWEEN %(start_date)s AND %(end_date)s")
params['start_date'] = start_date
params['end_date'] = end_date
where_sql = " AND ".join(where_clauses)
# 获取各阶段商机数量和金额
funnel_sql = f"""
SELECT
o.sales_stage,
COUNT(*) as opportunity_count,
SUM(o.estimated_amount) as total_amount,
AVG(o.probability) as avg_probability
FROM opportunities o
LEFT JOIN customers c ON o.customer_id = c.id
WHERE {where_sql}
GROUP BY o.sales_stage
ORDER BY
CASE o.sales_stage
WHEN '初步接洽' THEN 1
WHEN '需求确认' THEN 2
WHEN '方案报价' THEN 3
WHEN '合同谈判' THEN 4
WHEN '成交' THEN 5
ELSE 99
END
"""
funnel_data = await dbp.doQuery(funnel_sql, params)
# 计算预测成交金额
predicted_revenue = 0
for item in funnel_data:
predicted_revenue += float(item['total_amount']) * float(item['avg_probability'])
return {
'funnel_data': funnel_data,
'predicted_revenue': predicted_revenue,
'total_opportunities': sum(item['opportunity_count'] for item in funnel_data),
'total_estimated_amount': sum(float(item['total_amount']) for item in funnel_data)
}
async def get_conversion_rate_analysis(self, org_id: str, period: str = 'last_6_months') -> Dict:
"""获取转化率分析数据(用于预测功能)"""
dbp = await self.get_db_connection(org_id)
# 确定时间范围
if period == 'last_6_months':
start_date = (date.today().replace(day=1) - timedelta(days=180)).strftime("%Y-%m-01")
elif period == 'last_year':
start_date = (date.today().replace(day=1) - timedelta(days=365)).strftime("%Y-%m-01")
else:
start_date = (date.today() - timedelta(days=90)).strftime("%Y-%m-%d")
# 获取历史转化数据
conversion_sql = """
SELECT
DATE_FORMAT(created_at, '%%Y-%%m') as month,
COUNT(*) as total_opportunities,
SUM(CASE WHEN status = 'won' THEN 1 ELSE 0 END) as won_opportunities,
AVG(CASE WHEN status = 'won' THEN estimated_amount ELSE 0 END) as avg_won_amount
FROM opportunities
WHERE org_id = %(org_id)s
AND created_at >= %(start_date)s
GROUP BY DATE_FORMAT(created_at, '%%Y-%%m')
ORDER BY month
"""
conversion_data = await dbp.doQuery(conversion_sql, {'org_id': org_id, 'start_date': start_date})
# 计算整体转化率
total_opps = sum(item['total_opportunities'] for item in conversion_data)
total_won = sum(item['won_opportunities'] for item in conversion_data)
overall_conversion_rate = total_won / total_opps if total_opps > 0 else 0
return {
'conversion_data': conversion_data,
'overall_conversion_rate': overall_conversion_rate,
'period': period,
'start_date': start_date
}
async def predict_revenue(self, org_id: str, filters: Optional[Dict] = None) -> Dict:
"""预测收入(基于历史转化率)"""
# 获取当前活跃商机
dbp = await self.get_db_connection(org_id)
where_clauses = ["org_id = %(org_id)s", "status = 'active'"]
params = {'org_id': org_id}
if filters:
if filters.get('owner_id'):
where_clauses.append("owner_id = %(owner_id)s")
params['owner_id'] = filters['owner_id']
if filters.get('region'):
# 需要关联客户表
pass
where_sql = " AND ".join(where_clauses)
opportunities_sql = f"""
SELECT id, estimated_amount, probability, sales_stage
FROM opportunities
WHERE {where_sql}
"""
current_opportunities = await dbp.doQuery(opportunities_sql, params)
# 获取历史转化率
conversion_analysis = await self.get_conversion_rate_analysis(org_id)
historical_conversion_rate = conversion_analysis['overall_conversion_rate']
# 计算预测收入
stage_based_prediction = sum(
float(opp['estimated_amount']) * float(opp['probability'])
for opp in current_opportunities
) )
historical_based_prediction = sum( # 如果是关闭阶段,更新状态
float(opp['estimated_amount']) * historical_conversion_rate if new_stage in ["closed_won", "closed_lost"]:
for opp in current_opportunities status = "won" if new_stage == "closed_won" else "lost"
await sor.U(
"opportunities",
{"status": status},
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
) )
# 使用加权平均70%阶段概率 + 30%历史转化率) return {"opportunity_id": opportunity_id, "new_stage": new_stage}
final_prediction = stage_based_prediction * 0.7 + historical_based_prediction * 0.3
async def assign_opportunity(
opportunity_id: str,
new_owner_id: str
) -> Dict:
"""分配商机给销售人员"""
db = DBPools()
async with db.sqlorContext('default') as sor:
opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
if not opportunity_records or len(opportunity_records) == 0:
raise ValueError("商机不存在")
opportunity = opportunity_records[0]
old_owner_id = opportunity["owner_id"]
# 记录分配历史
assignment_id = str(uuid.uuid4()).replace('-', '')
assignment_data = {
"id": assignment_id,
"opportunity_id": opportunity_id,
"from_owner_id": old_owner_id,
"to_owner_id": new_owner_id,
"assigned_by": get_current_user_id(),
"assigned_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
await sor.C("opportunity_assignment_history", assignment_data)
# 更新商机负责人
await sor.U(
"opportunities",
{
"owner_id": new_owner_id,
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
)
return {"opportunity_id": opportunity_id, "new_owner_id": new_owner_id}
async def get_opportunity_funnel(
start_date: str = None,
end_date: str = None,
owner_id: str = None,
region: str = None
) -> Dict:
"""获取销售漏斗数据"""
db = DBPools()
async with db.sqlorContext('default') as sor:
filters = []
if start_date and end_date:
filters.append({"field": "created_at", "op": ">=", "value": start_date})
filters.append({"field": "created_at", "op": "<=", "value": end_date})
if owner_id:
filters.append({"field": "owner_id", "op": "=", "value": owner_id})
if region:
filters.append({"field": "region", "op": "=", "value": region})
opportunities = await sor.R("opportunities", {"filters": filters})
# 按阶段分组统计
funnel_data = {}
total_value = 0
for opp in opportunities:
stage = opp["current_stage"]
amount = float(opp["estimated_amount"]) if opp["estimated_amount"] else 0
probability = float(opp["probability"]) if opp["probability"] else 0
if stage not in funnel_data:
funnel_data[stage] = {
"count": 0,
"total_amount": 0,
"weighted_amount": 0
}
funnel_data[stage]["count"] += 1
funnel_data[stage]["total_amount"] += amount
funnel_data[stage]["weighted_amount"] += amount * probability
total_value += amount * probability
return { return {
'stage_based_prediction': stage_based_prediction, "funnel_data": funnel_data,
'historical_based_prediction': historical_based_prediction, "total_weighted_value": total_value,
'final_prediction': final_prediction, "total_opportunities": len(opportunities)
'opportunity_count': len(current_opportunities),
'total_estimated_amount': sum(float(opp['estimated_amount']) for opp in current_opportunities),
'confidence_level': 'high' if len(current_opportunities) > 10 else 'medium'
} }
# 基础CRUD操作
async def update_opportunity(self, opportunity_id: str, opportunity_data: Dict, org_id: str) -> bool:
"""更新商机"""
dbp = await self.get_db_connection(org_id)
opportunity_data['updated_at'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
result = await dbp.update("opportunities", opportunity_data, {"id": opportunity_id, "org_id": org_id})
return result.rowcount > 0
async def delete_opportunity(self, opportunity_id: str, org_id: str) -> bool: async def get_sales_performance(
"""删除商机(软删除)""" start_date: str,
dbp = await self.get_db_connection(org_id) end_date: str,
update_data = { owner_id: str = None
'status': 'deleted', ) -> Dict:
'updated_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S") """获取销售业绩数据"""
db = DBPools()
async with db.sqlorContext('default') as sor:
# 查询已关闭的商机(赢单)
won_filters = [
{"field": "status", "op": "=", "value": "won"},
{"field": "updated_at", "op": ">=", "value": start_date},
{"field": "updated_at", "op": "<=", "value": end_date}
]
if owner_id:
won_filters.append({"field": "owner_id", "op": "=", "value": owner_id})
won_opportunities = await sor.R("opportunities", {"filters": won_filters})
total_revenue = sum(float(opp["estimated_amount"]) for opp in won_opportunities if opp["estimated_amount"])
total_count = len(won_opportunities)
# 查询活跃商机
active_filters = [
{"field": "status", "op": "=", "value": "active"},
{"field": "created_at", "op": ">=", "value": start_date},
{"field": "created_at", "op": "<=", "value": end_date}
]
if owner_id:
active_filters.append({"field": "owner_id", "op": "=", "value": owner_id})
active_opportunities = await sor.R("opportunities", {"filters": active_filters})
active_count = len(active_opportunities)
weighted_pipeline = sum(
float(opp["estimated_amount"]) * float(opp["probability"])
for opp in active_opportunities
if opp["estimated_amount"] and opp["probability"]
)
return {
"period": f"{start_date} to {end_date}",
"total_revenue": total_revenue,
"total_closed_deals": total_count,
"active_opportunities": active_count,
"weighted_pipeline_value": weighted_pipeline,
"average_deal_size": total_revenue / total_count if total_count > 0 else 0
} }
result = await dbp.update("opportunities", update_data, {"id": opportunity_id, "org_id": org_id})
return result.rowcount > 0
async def get_opportunity_by_id(self, opportunity_id: str, org_id: str) -> Optional[Dict]:
"""根据ID获取商机"""
dbp = await self.get_db_connection(org_id)
sql = "SELECT * FROM opportunities WHERE id = %(id)s AND org_id = %(org_id)s AND status != 'deleted'"
result = await dbp.doQuery(sql, {'id': opportunity_id, 'org_id': org_id})
return result[0] if result else None
async def list_opportunities(self, org_id: str, filters: Optional[Dict] = None,
page: int = 1, page_size: int = 20) -> Tuple[List[Dict], int]:
"""列出商机"""
dbp = await self.get_db_connection(org_id)
# 构建查询条件
where_clauses = ["o.org_id = %(org_id)s", "o.status != 'deleted'"]
params = {'org_id': org_id}
if filters:
if filters.get('customer_name'):
where_clauses.append("o.customer_name LIKE %(customer_name)s")
params['customer_name'] = f"%{filters['customer_name']}%"
if filters.get('sales_stage'):
where_clauses.append("o.sales_stage = %(sales_stage)s")
params['sales_stage'] = filters['sales_stage']
if filters.get('owner_id'):
where_clauses.append("o.owner_id = %(owner_id)s")
params['owner_id'] = filters['owner_id']
if filters.get('status'):
where_clauses.append("o.status = %(status)s")
params['status'] = filters['status']
if filters.get('expected_close_date_from'):
where_clauses.append("o.expected_close_date >= %(expected_close_date_from)s")
params['expected_close_date_from'] = filters['expected_close_date_from']
if filters.get('expected_close_date_to'):
where_clauses.append("o.expected_close_date <= %(expected_close_date_to)s")
params['expected_close_date_to'] = filters['expected_close_date_to']
where_sql = " AND ".join(where_clauses)
# 获取总数
count_sql = f"SELECT COUNT(*) as total FROM opportunities o WHERE {where_sql}"
count_result = await dbp.doQuery(count_sql, params)
total = count_result[0]['total'] if count_result else 0
# 获取分页数据
offset = (page - 1) * page_size
data_sql = f"""
SELECT o.*, u.username as owner_name
FROM opportunities o
LEFT JOIN users u ON o.owner_id = u.id
WHERE {where_sql}
ORDER BY o.created_at DESC
LIMIT %(limit)s OFFSET %(offset)s
"""
params['limit'] = page_size
params['offset'] = offset
data_result = await dbp.doQuery(data_sql, params)
return data_result, total
# 全局实例 def get_current_user_id() -> str:
opportunity_manager = OpportunityManager() """获取当前用户ID模拟实现"""
return "current_user_id"
# 导出函数
async def create_opportunity(opportunity_data: Dict, user_id: str, org_id: str) -> str:
return await opportunity_manager.create_opportunity(opportunity_data, user_id, org_id)
async def update_opportunity_stage(opportunity_id: str, new_stage: str, # 兼容旧接口
change_reason: str, user_id: str, org_id: str) -> bool: async def get_funnel_analysis(**kwargs):
return await opportunity_manager.update_opportunity_stage(opportunity_id, new_stage, change_reason, user_id, org_id) return await get_opportunity_funnel(**kwargs)
async def get_funnel_analysis(org_id: str, filters: Optional[Dict] = None) -> Dict: async def predict_revenue(start_date: str, end_date: str):
return await opportunity_manager.get_funnel_analysis(org_id, filters) performance = await get_sales_performance(start_date, end_date)
return {"predicted_revenue": performance["weighted_pipeline_value"]}
async def predict_revenue(org_id: str, filters: Optional[Dict] = None) -> Dict: async def update_opportunity(opportunity_id: str, **updates):
return await opportunity_manager.predict_revenue(org_id, filters) db = DBPools()
async with db.sqlorContext('default') as sor:
updates["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
await sor.U(
"opportunities",
updates,
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
)
return {"opportunity_id": opportunity_id, "updated": True}
# 基础CRUD接口 async def delete_opportunity(opportunity_id: str):
async def update_opportunity(opportunity_id: str, opportunity_data: Dict, org_id: str) -> bool: db = DBPools()
return await opportunity_manager.update_opportunity(opportunity_id, opportunity_data, org_id) async with db.sqlorContext('default') as sor:
await sor.D(
"opportunities",
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
)
return {"opportunity_id": opportunity_id, "deleted": True}
async def delete_opportunity(opportunity_id: str, org_id: str) -> bool: async def get_opportunity_by_id(opportunity_id: str):
return await opportunity_manager.delete_opportunity(opportunity_id, org_id) db = DBPools()
async with db.sqlorContext('default') as sor:
records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
return records[0] if records else None
async def get_opportunity_by_id(opportunity_id: str, org_id: str) -> Optional[Dict]: async def list_opportunities(**filters):
return await opportunity_manager.get_opportunity_by_id(opportunity_id, org_id) db = DBPools()
async with db.sqlorContext('default') as sor:
filter_list = []
for field, value in filters.items():
filter_list.append({"field": field, "op": "=", "value": value})
async def list_opportunities(org_id: str, filters: Optional[Dict] = None, return await sor.R("opportunities", {"filters": filter_list})
page: int = 1, page_size: int = 20) -> Tuple[List[Dict], int]:
return await opportunity_manager.list_opportunities(org_id, filters, page, page_size)

View File

@ -0,0 +1,60 @@
{
"summary": {
"name": "opportunity_stage_history",
"description": "商机阶段变更历史记录表",
"version": "1.0.0"
},
"fields": {
"id": {
"type": "int",
"primary_key": true,
"auto_increment": true,
"description": "历史记录ID"
},
"opportunity_id": {
"type": "int",
"not_null": true,
"description": "商机ID"
},
"org_id": {
"type": "int",
"not_null": true,
"description": "组织ID"
},
"from_stage": {
"type": "varchar(50)",
"description": "变更前阶段"
},
"to_stage": {
"type": "varchar(50)",
"not_null": true,
"description": "变更后阶段"
},
"change_reason": {
"type": "text",
"not_null": true,
"description": "阶段变更原因"
},
"changed_by": {
"type": "int",
"not_null": true,
"description": "变更人ID"
},
"changed_by_name": {
"type": "varchar(100)",
"not_null": true,
"description": "变更人姓名"
},
"created_at": {
"type": "datetime",
"not_null": true,
"default": "CURRENT_TIMESTAMP",
"description": "变更时间"
}
},
"indexes": {
"idx_opportunity": ["opportunity_id"],
"idx_org": ["org_id"]
},
"codes": {}
}

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
result = {}
try:
dbname = get_module_dbname('opportunity_management')
async with DBPools().sqlorContext(dbname) as sor:
ns = {'page': 1, 'rows': 50, 'sort': 'COLUMN_NAME'}
for tbl in ['opportunities', 'sales_stages']:
sql = f"SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='crm_db' AND TABLE_NAME='{tbl}'"
rows = await sor.sqlExe(sql, ns)
if isinstance(rows, dict):
rows = rows.get('rows', [])
result[tbl] = [dict(r).get('column_name', '') for r in rows]
result['success'] = True
except Exception as e:
result['error'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)

View File

@ -0,0 +1,45 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Opportunity create API"""
import json, uuid, time
result = {'widgettype': 'Message', 'options': {'title': 'Error', 'message': 'Invalid request', 'type': 'error'}}
try:
opportunity_name = params_kw.get('opportunity_name', '').strip()
customer_id = params_kw.get('customer_id', '').strip()
customer_name = params_kw.get('customer_name', '').strip()
estimated_amount = params_kw.get('estimated_amount', '0').strip()
current_stage = params_kw.get('current_stage', '').strip()
expected_close_date = params_kw.get('expected_close_date', '').strip()
owner_id = params_kw.get('owner_id', '').strip()
owner_name = params_kw.get('owner_name', '').strip()
if not opportunity_name or not customer_id or not current_stage or not owner_id:
result['options'] = {'title': 'Error', 'message': '请填写必填字段', 'type': 'error'}
else:
dbname = get_module_dbname('opportunity_management')
new_id = str(uuid.uuid4()).replace('-', '')
now = time.strftime('%Y-%m-%d %H:%M:%S')
async with DBPools().sqlorContext(dbname) as sor:
await sor.sqlExe("""INSERT INTO opportunities (id, customer_id, customer_name, opportunity_name, estimated_amount, current_stage, expected_close_date, source_type, owner_id, owner_name, region, description, probability, predicted_revenue, status, created_at, updated_at)
VALUES (${id}$, ${customer_id}$, ${customer_name}$, ${opportunity_name}$, ${estimated_amount}$, ${current_stage}$, ${expected_close_date}$, ${source_type}$, ${owner_id}$, ${owner_name}$, ${region}$, ${description}$, ${probability}$, ${predicted_revenue}$, ${status}$, ${created_at}$, ${updated_at}$)""", {
'id': new_id, 'customer_id': customer_id, 'customer_name': customer_name,
'opportunity_name': opportunity_name, 'estimated_amount': float(estimated_amount) if estimated_amount else 0,
'current_stage': current_stage, 'expected_close_date': expected_close_date,
'source_type': params_kw.get('source_type', 'manual').strip(),
'owner_id': owner_id, 'owner_name': owner_name,
'region': params_kw.get('region', '').strip(),
'description': params_kw.get('description', '').strip(),
'probability': float(params_kw.get('probability', '0').strip()) if params_kw.get('probability') else 0,
'predicted_revenue': float(params_kw.get('predicted_revenue', '0').strip()) if params_kw.get('predicted_revenue') else 0,
'status': params_kw.get('status', 'active').strip(),
'created_at': now, 'updated_at': now
})
result = {'widgettype': 'Message', 'options': {'title': 'Success', 'message': '商机创建成功', 'type': 'success'}}
except Exception as e:
result['options'] = {'title': 'Error', 'message': f'创建失败: {str(e)}', 'type': 'error'}
return json.dumps(result, ensure_ascii=False)

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Opportunity delete API"""
import json
result = {'widgettype': 'Message', 'options': {'title': 'Error', 'message': 'Invalid request', 'type': 'error'}}
try:
record_id = params_kw.get('id', '').strip()
if not record_id:
result['options'] = {'title': 'Error', 'message': '记录ID不能为空', 'type': 'error'}
else:
dbname = get_module_dbname('opportunity_management')
async with DBPools().sqlorContext(dbname) as sor:
existing = await sor.sqlExe("SELECT id FROM opportunities WHERE id = ${id}$", {'id': record_id})
if not existing:
result['options'] = {'title': 'Error', 'message': '商机不存在', 'type': 'error'}
else:
await sor.sqlExe("DELETE FROM opportunities WHERE id = ${id}$", {'id': record_id})
result = {'widgettype': 'Message', 'options': {'title': 'Success', 'message': '删除成功', 'type': 'success'}}
except Exception as e:
result['options'] = {'title': 'Error', 'message': f'删除失败: {str(e)}', 'type': 'error'}
return json.dumps(result, ensure_ascii=False)

View File

@ -0,0 +1,30 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Opportunity list API"""
import json
result = {'success': False, 'rows': [], 'total': 0}
try:
dbname = get_module_dbname('opportunity_management')
ns = {
'page': int(params_kw.get('page', 1)),
'rows': int(params_kw.get('rows', 20)),
'sort': 'created_at desc'
}
sql = "SELECT id, customer_id, customer_name, opportunity_name, estimated_amount, current_stage, expected_close_date, owner_id, owner_name, region, probability, status, created_at FROM opportunities"
async with DBPools().sqlorContext(dbname) as sor:
data = await sor.sqlExe(sql, ns)
if isinstance(data, dict):
result['total'] = data.get('total', 0)
result['rows'] = [dict(r) for r in data.get('rows', [])]
else:
result['rows'] = [dict(r) for r in (data or [])]
result['total'] = len(result['rows'])
result['success'] = True
except Exception as e:
result['error'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)

View File

@ -0,0 +1,39 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Opportunity update API"""
import json, time
result = {'widgettype': 'Message', 'options': {'title': 'Error', 'message': 'Invalid request', 'type': 'error'}}
try:
record_id = params_kw.get('id', '').strip()
if not record_id:
result['options'] = {'title': 'Error', 'message': '记录ID不能为空', 'type': 'error'}
else:
dbname = get_module_dbname('opportunity_management')
now = time.strftime('%Y-%m-%d %H:%M:%S')
fields = ['customer_id', 'customer_name', 'opportunity_name', 'estimated_amount', 'current_stage', 'expected_close_date', 'owner_id', 'owner_name', 'region', 'description', 'probability', 'predicted_revenue', 'status', 'source_type']
set_parts = []
params = {'id': record_id}
for f in fields:
val = params_kw.get(f, None)
if val is not None:
set_parts.append(f"{f} = %({f})s")
params[f] = val
set_parts.append("updated_at = ${updated_at}$")
params['updated_at'] = now
if len(set_parts) <= 1:
result['options'] = {'title': 'Error', 'message': '没有可更新的字段', 'type': 'error'}
else:
sql = f"UPDATE opportunities SET {', '.join(set_parts)} WHERE id = ${id}$"
async with DBPools().sqlorContext(dbname) as sor:
await sor.sqlExe(sql, params)
result = {'widgettype': 'Message', 'options': {'title': 'Success', 'message': '更新成功', 'type': 'success'}}
except Exception as e:
result['options'] = {'title': 'Error', 'message': f'更新失败: {str(e)}', 'type': 'error'}
return json.dumps(result, ensure_ascii=False)

View File

@ -0,0 +1,21 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Sales stages list API"""
import json
result = {'success': False, 'rows': [], 'total': 0}
try:
dbname = get_module_dbname('opportunity_management')
sql = "SELECT id, stage_name, stage_order, conversion_rate, is_won_stage, is_lost_stage FROM sales_stages ORDER BY stage_order ASC"
async with DBPools().sqlorContext(dbname) as sor:
data = await sor.sqlExe(sql, {})
result['rows'] = [dict(r) for r in data]
result['total'] = len(data)
result['success'] = True
except Exception as e:
result['error'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)

View File

@ -1,55 +1,67 @@
{ {
"widgettype": "TabPanel", "type": "Page",
"options": { "title": "Opportunity Management",
"title": "商机管理" "content": {
}, "type": "VBox",
"subwidgets": [ "style": {"padding": "16px", "height": "100%"},
"children": [
{ {
"widgettype": "CRUD", "type": "HBox",
"options": { "justify": "space-between",
"title": "商机列表", "style": {"marginBottom": "16px"},
"url": "{{entire_url(opportunities_list)}}" "children": [
} {"type": "Text", "content": "Opportunity List", "style": {"fontSize": "20px", "fontWeight": "bold"}},
}, {"type": "Button", "text": "Add Opportunity", "variant": "primary", "leadingIcon": "add", "onclick": "openDialog('addOpportunityDialog')"}
{
"widgettype": "CRUD",
"options": {
"title": "销售阶段",
"url": "{{entire_url(sales_stages_list)}}"
}
},
{
"widgettype": "Panel",
"options": {
"title": "销售漏斗分析"
},
"subwidgets": [
{
"widgettype": "Chart",
"options": {
"title": "漏斗可视化",
"chartType": "funnel",
"dataSource": "get_funnel_analysis",
"refreshInterval": 30000
}
}
] ]
}, },
{ {
"widgettype": "Panel", "type": "CRUD",
"options": { "id": "opportunityCRUD",
"title": "销售预测" "api": {
"list": "api/opportunities_list.dspy",
"create": "api/opportunities_create.dspy",
"update": "api/opportunities_update.dspy",
"delete": "api/opportunities_delete.dspy"
},
"columns": [
{"field": "opportunity_name", "title": "Opportunity", "width": 200},
{"field": "customer_name", "title": "Customer", "width": 150},
{"field": "estimated_amount", "title": "Amount", "width": 120},
{"field": "current_stage", "title": "Stage", "width": 120},
{"field": "probability", "title": "Probability", "width": 100},
{"field": "expected_close_date", "title": "Close Date", "width": 120},
{"field": "owner_name", "title": "Owner", "width": 100},
{"field": "status", "title": "Status", "width": 80, "formatter": "code:opportunity_status"},
{"field": "created_at", "title": "Created", "width": 160}
],
"style": {"flex": 1}
}, },
"subwidgets": [
{ {
"widgettype": "Card", "type": "Dialog",
"options": { "id": "addOpportunityDialog",
"title": "今日预测汇总", "title": "Add Opportunity",
"dataSource": "get_sales_prediction", "width": 600,
"fields": ["total_predicted", "avg_confidence", "opportunity_count"] "content": {
"type": "Form",
"id": "addOpportunityForm",
"fields": [
{"field": "opportunity_name", "title": "Opportunity Name", "uitype": "TextField", "required": true},
{"field": "customer_id", "title": "Customer ID", "uitype": "TextField", "required": true},
{"field": "customer_name", "title": "Customer Name", "uitype": "TextField", "required": true},
{"field": "estimated_amount", "title": "Estimated Amount", "uitype": "TextField", "required": true},
{"field": "current_stage", "title": "Stage", "uitype": "TextField", "required": true},
{"field": "expected_close_date", "title": "Expected Close Date", "uitype": "DateField", "required": true},
{"field": "owner_id", "title": "Owner ID", "uitype": "TextField", "required": true},
{"field": "owner_name", "title": "Owner Name", "uitype": "TextField", "required": true},
{"field": "region", "title": "Region", "uitype": "TextField"},
{"field": "description", "title": "Description", "uitype": "TextField"}
],
"actions": [
{"type": "Button", "text": "Cancel", "onclick": "closeDialog('addOpportunityDialog')"},
{"type": "Button", "text": "Submit", "variant": "primary", "onclick": "submitForm('addOpportunityForm', 'api/opportunities_create.dspy', 'addOpportunityDialog', 'opportunityCRUD')"}
]
} }
} }
] ]
} }
]
} }

View File

@ -0,0 +1,58 @@
{
"widgettype": "Page",
"options": {
"title": "商机编辑",
"style": {"height": "100vh", "padding": "0"}
},
"subwidgets": [
{
"widgettype": "VBox",
"options": {"style": {"padding": "16px", "flex": 1, "overflow": "auto"}},
"subwidgets": [
{
"widgettype": "Form",
"id": "opportunity_form",
"options": {
"submit_url": "{{entire_url('api/opportunities_create.dspy')}}",
"method": "POST",
"layout": "vertical",
"style": {"maxWidth": "600px"},
"fields": [
{"name": "opportunity_name", "label": "商机名称", "uitype": "text", "required": true},
{"name": "customer_id", "label": "客户ID", "uitype": "text", "required": true},
{"name": "customer_name", "label": "客户名称", "uitype": "text", "required": true},
{"name": "estimated_amount", "label": "预估金额", "uitype": "number", "required": true},
{"name": "current_stage", "label": "阶段", "uitype": "code", "required": true, "data": [
{"value": "qualification", "text": "资格确认"},
{"value": "needs_analysis", "text": "需求分析"},
{"value": "proposal", "text": "方案报价"},
{"value": "negotiation", "text": "商务谈判"},
{"value": "closed_won", "text": "成交"},
{"value": "closed_lost", "text": "丢单"}
]},
{"name": "expected_close_date", "label": "预计成交日期", "uitype": "date", "required": true},
{"name": "owner_id", "label": "负责人ID", "uitype": "text", "required": true},
{"name": "owner_name", "label": "负责人", "uitype": "text", "required": true},
{"name": "region", "label": "区域", "uitype": "code", "data": [
{"value": "east", "text": "华东"},
{"value": "south", "text": "华南"},
{"value": "west", "text": "华西"},
{"value": "north", "text": "华北"}
]},
{"name": "probability", "label": "成功概率(%)", "uitype": "number"},
{"name": "description", "label": "描述", "uitype": "textarea"},
{"name": "status", "label": "状态", "uitype": "code", "data": [
{"value": "active", "text": "有效"},
{"value": "inactive", "text": "无效"}
]}
],
"buttons": [
{"type": "submit", "text": "保存", "variant": "primary"},
{"type": "button", "text": "取消", "action": "navigate('main/opportunity_management/opportunity_management.ui')"}
]
}
}
]
}
]
}

View File

@ -0,0 +1,58 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""List opportunities"""
import json
result = {'success': False, 'rows': [], 'total': 0}
try:
dbname = get_module_dbname('opportunity_management')
async with DBPools().sqlorContext(dbname) as sor:
where_clauses = []
where_ns = {}
customer_id = params_kw.get('customer_id', '')
status = params_kw.get('status', '')
if customer_id:
where_clauses.append("customer_id=${customer_id}$")
where_ns['customer_id'] = customer_id
if status:
where_clauses.append("status=${status}$")
where_ns['status'] = status
where_sql = " AND ".join(where_clauses)
where_prefix = " WHERE " if where_clauses else ""
count_sql = f"SELECT count(*) rcnt FROM opportunities{where_prefix}{where_sql}"
count_rows = await sor.sqlExe(count_sql, where_ns)
total = 0
if count_rows and len(count_rows) > 0:
r = count_rows[0]
if hasattr(r, 'keys'):
total = r.get('rcnt', 0)
elif isinstance(r, dict):
total = r.get('rcnt', 0)
elif hasattr(r, 'rcnt'):
total = r.rcnt
if total > 0:
ns = {'page': int(params_kw.get('page', 1)), 'rows': int(params_kw.get('rows', 20)), 'sort': params_kw.get('sort', 'created_at')}
sql = f"SELECT id, opportunity_name, customer_id, customer_name, estimated_amount, probability, expected_close_date, current_stage, status, owner_id, owner_name, source_type, region, created_at, updated_at FROM opportunities{where_prefix}{where_sql}"
query_ns = dict(list(ns.items()) + list(where_ns.items()))
rows = await sor.sqlExe(sql, query_ns)
# sqlExe with pagination returns {'total': N, 'rows': [...]}
if isinstance(rows, dict):
result['rows'] = rows.get('rows', [])
result['total'] = rows.get('total', total)
elif rows:
result['rows'] = [dict(r) if hasattr(r, 'keys') else r for r in rows]
result['total'] = total
result['success'] = True
except Exception as e:
result['error'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)

View File

@ -1,175 +1,44 @@
{ {
"widgettype": "VBox", "widgettype": "Page",
"options": { "options": {
"width": "100%", "title": "商机管理",
"height": "100%" "style": {"height": "100vh", "padding": "0"}
}, },
"subwidgets": [ "subwidgets": [
{ {
"widgettype": "TabPanel",
"options": {
"tabs": [
{
"title": "商机列表",
"id": "opportunities_tab"
},
{
"title": "销售阶段配置",
"id": "stages_tab"
},
{
"title": "商机分析",
"id": "analysis_tab"
}
]
},
"subwidgets": [
{
"id": "opportunities_tab_content",
"widgettype": "VBox", "widgettype": "VBox",
"options": { "options": {"style": {"padding": "16px", "flex": 1, "overflow": "hidden"}},
"width": "100%",
"height": "100%"
},
"subwidgets": [
{
"widgettype": "DataGrid",
"options": {
"url": "{{entire_url('opportunities.json')}}",
"height": "100%",
"width": "100%",
"toolbar": true,
"search": true,
"export": true
},
"binds": [
{
"wid": "self",
"event": "row_selected",
"actiontype": "script",
"target": "self",
"script": "console.log('Selected opportunity:', event.params);"
}
]
}
]
},
{
"id": "stages_tab_content",
"widgettype": "VBox",
"options": {
"width": "100%",
"height": "100%"
},
"subwidgets": [
{
"widgettype": "DataGrid",
"options": {
"url": "{{entire_url('sales_stages.json')}}",
"height": "100%",
"width": "100%",
"toolbar": true,
"search": false,
"export": true
}
}
]
},
{
"id": "analysis_tab_content",
"widgettype": "VBox",
"options": {
"width": "100%",
"height": "100%"
},
"subwidgets": [ "subwidgets": [
{ {
"widgettype": "HBox", "widgettype": "HBox",
"options": { "options": {"style": {"marginBottom": "16px", "gap": "8px"}},
"width": "100%",
"height": "50px"
},
"subwidgets": [ "subwidgets": [
{"widgettype": "TextField", "id": "search_keyword", "options": {"label": "搜索", "placeholder": "商机名称/客户", "style": {"flex": 1}}},
{"widgettype": "Button", "id": "btn_search", "options": {"text": "搜索", "variant": "primary"}},
{"widgettype": "Button", "id": "btn_add", "options": {"text": "新增", "variant": "primary", "action": "navigate('main/opportunity_management/opportunity_edit.ui')"}}
]
},
{ {
"widgettype": "Form", "widgettype": "DataGrid",
"id": "opportunity_grid",
"options": { "options": {
"fields": [ "url": "{{entire_url('api/opportunities_list.dspy')}}",
{ "style": {"flex": 1},
"uitype": "str", "columns": [
"name": "region_filter", {"field": "opportunity_name", "header": "商机名称", "width": 200},
"label": "区域筛选", {"field": "customer_name", "header": "客户", "width": 150},
"placeholder": "选择区域" {"field": "estimated_amount", "header": "预估金额", "width": 120},
}, {"field": "current_stage", "header": "阶段", "width": 120},
{ {"field": "probability", "header": "概率", "width": 80},
"uitype": "str", {"field": "expected_close_date", "header": "预计成交", "width": 120},
"name": "owner_filter", {"field": "owner_name", "header": "负责人", "width": 100},
"label": "销售筛选", {"field": "status", "header": "状态", "width": 80}
"placeholder": "选择销售人员"
},
{
"uitype": "button",
"name": "apply_filter",
"label": "应用筛选"
}
], ],
"inline": true "toolbar": [
}, {"type": "button", "text": "编辑", "icon": "edit", "action": "navigate('main/opportunity_management/opportunity_edit.ui?id={% raw %}{{selectedRow.id}}{% endraw %}')"},
"binds": [ {"type": "button", "text": "删除", "icon": "delete", "action": "doDelete('{% raw %}{{selectedRow.id}}{% endraw %}')"}
{
"wid": "apply_filter",
"event": "click",
"actiontype": "script",
"target": "self",
"script": "const region = document.querySelector('[name=\"region_filter\"]').value;\nconst owner = document.querySelector('[name=\"owner_filter\"]').value;\n// 这里会触发漏斗图和预测数据的更新\nconsole.log('Applying filters:', {region, owner});"
}
] ]
} }
]
},
{
"widgettype": "VBox",
"options": {
"width": "100%",
"height": "calc(100% - 60px)"
},
"subwidgets": [
{
"widgettype": "HBox",
"options": {
"width": "100%",
"height": "50%"
},
"subwidgets": [
{
"widgettype": "Pie",
"options": {
"title": "各阶段商机数量占比",
"dataurl": "/api/opportunity/stage-counts",
"datamethod": "GET"
}
},
{
"widgettype": "Bar",
"options": {
"title": "各阶段商机金额占比",
"dataurl": "/api/opportunity/stage-amounts",
"datamethod": "GET"
}
}
]
},
{
"widgettype": "Line",
"options": {
"title": "成交预测 vs 实际成交",
"dataurl": "/api/opportunity/prediction-accuracy",
"datamethod": "GET",
"height": "50%"
}
}
]
}
]
} }
] ]
} }

View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""List sales stages"""
import json
result = {'success': False, 'rows': [], 'total': 0}
try:
dbname = get_module_dbname('opportunity_management')
async with DBPools().sqlorContext(dbname) as sor:
sql = "SELECT id, stage_name, stage_order, conversion_rate, is_lost_stage, is_won_stage, created_at, updated_at FROM sales_stages ORDER BY stage_order"
rows = await sor.sqlExe(sql, {'page': 1, 'rows': 50, 'sort': 'stage_order'})
if isinstance(rows, dict):
result['rows'] = rows.get('rows', [])
result['total'] = rows.get('total', len(result['rows']))
elif rows:
result['rows'] = [dict(r) if hasattr(r, 'keys') else r for r in rows]
result['total'] = len(result['rows'])
result['success'] = True
except Exception as e:
result['error'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)