286 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, date
from decimal import Decimal
from typing import List, Dict, Optional
import uuid
from ahserver.serverenv import ServerEnv
from appPublic.worker import awaitify
from sqlor.dbpools import DBPools
from appPublic.jsonConfig import getConfig
async def create_opportunity(
    customer_name: str,
    estimated_amount: float,
    current_stage: str,
    expected_close_date: str,
    source_type: str = "manual",
    owner_id: str = None,
    region: str = None
) -> Dict:
    """Create an opportunity for an existing customer.

    The customer is looked up by name in the ``customers`` table. When it
    exists, a new row is inserted into ``opportunities`` with status
    ``"active"`` and a probability taken from ``get_stage_probability``.

    Args:
        customer_name: Name used to look up the customer record.
        estimated_amount: Estimated deal amount, stored as given.
        current_stage: Sales stage key; also determines ``probability``.
        expected_close_date: Expected close date, stored as given.
        source_type: Origin of the opportunity (defaults to ``"manual"``).
        owner_id: Owner of the opportunity. When falsy, the result of
            ``get_current_user_id()`` is stored instead.
        region: Region value, stored as given (may be ``None``).

    Returns:
        The dict that was inserted into ``opportunities``.

    Raises:
        ValueError: If no customer named ``customer_name`` is found.
    """
    settings = getConfig()
    pools = DBPools(settings.databases)
    async with pools.sqlorContext('opportunity_management') as sor:
        # 32-character hex id (a UUID4 without the dashes).
        new_id = uuid.uuid4().hex

        # The customer must already exist before an opportunity is created.
        name_filter = {"field": "customer_name", "op": "=", "value": customer_name}
        matches = await sor.R("customers", {"filters": [name_filter]})
        if not matches:
            raise ValueError(f"客户 {customer_name} 不存在")
        matched_customer = matches[0]

        # One timestamp shared by created_at and updated_at.
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        effective_owner = owner_id if owner_id else get_current_user_id()

        record = {
            "id": new_id,
            "customer_id": matched_customer.get("id", ""),
            "customer_name": customer_name,
            "estimated_amount": estimated_amount,
            "current_stage": current_stage,
            "expected_close_date": expected_close_date,
            "source_type": source_type,
            "owner_id": effective_owner,
            "region": region,
            "probability": get_stage_probability(current_stage),
            "created_at": stamp,
            "updated_at": stamp,
            "status": "active"
        }
        await sor.C("opportunities", record)
        return record
def get_stage_probability(stage: str) -> float:
    """Return the win probability associated with a sales stage.

    Args:
        stage: Sales stage key, e.g. ``"proposal"`` or ``"closed_won"``.

    Returns:
        The probability configured for ``stage``, or ``0.0`` when the stage
        is not one of the known keys.
    """
    win_rate_by_stage = dict(
        initial_contact=0.1,
        needs_analysis=0.2,
        proposal=0.5,
        negotiation=0.7,
        closed_won=1.0,
        closed_lost=0.0,
    )
    try:
        return win_rate_by_stage[stage]
    except KeyError:
        # Unknown stages are treated as having no chance of closing.
        return 0.0
async def update_opportunity_stage(
    opportunity_id: str,
    new_stage: str,
    notes: Optional[str] = None
) -> Dict:
    """Move an active opportunity to a new sales stage.

    A row describing the transition is written to
    ``opportunity_stage_history`` and the opportunity's stage, probability
    and ``updated_at`` are updated. Moving to ``"closed_won"`` or
    ``"closed_lost"`` also sets the status to ``"won"`` or ``"lost"``.

    Args:
        opportunity_id: Id of the opportunity to update.
        new_stage: Stage key to move the opportunity to.
        notes: Optional free-text note stored with the history row.

    Returns:
        ``{"opportunity_id": ..., "new_stage": ...}``.

    Raises:
        ValueError: If the opportunity does not exist or its status is not
            ``"active"``.
    """
    config = getConfig()
    db = DBPools(config.databases)
    async with db.sqlorContext('opportunity_management') as sor:
        opportunity_records = await sor.R(
            "opportunities",
            {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
        )
        if not opportunity_records:
            raise ValueError("商机不存在")
        opportunity = opportunity_records[0]
        if opportunity["status"] != "active":
            raise ValueError("只能更新活跃状态的商机")

        # Read the clock once so the history row and the opportunity row
        # carry exactly the same timestamp.
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Record the stage transition.
        history_data = {
            "id": uuid.uuid4().hex,
            "opportunity_id": opportunity_id,
            "from_stage": opportunity["current_stage"],
            "to_stage": new_stage,
            "notes": notes,
            "changed_by": get_current_user_id(),
            "changed_at": now
        }
        await sor.C("opportunity_stage_history", history_data)

        # Collect every column change and write them in a single update,
        # so stage and status never need a second write to the same row.
        changes = {
            "current_stage": new_stage,
            "probability": get_stage_probability(new_stage),
            "updated_at": now
        }
        closing_status = {"closed_won": "won", "closed_lost": "lost"}.get(new_stage)
        if closing_status is not None:
            changes["status"] = closing_status

        await sor.U(
            "opportunities",
            changes,
            {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
        )
        return {"opportunity_id": opportunity_id, "new_stage": new_stage}
async def assign_opportunity(
    opportunity_id: str,
    new_owner_id: str
) -> Dict:
    """Assign an opportunity to a (new) owner.

    A row describing the hand-over is written to
    ``opportunity_assignment_history`` and the opportunity's ``owner_id``
    and ``updated_at`` are updated.

    Args:
        opportunity_id: Id of the opportunity to reassign.
        new_owner_id: Id of the user who becomes the owner.

    Returns:
        ``{"opportunity_id": ..., "new_owner_id": ...}``.

    Raises:
        ValueError: If the opportunity does not exist.
    """
    config = getConfig()
    db = DBPools(config.databases)
    async with db.sqlorContext('opportunity_management') as sor:
        opportunity_records = await sor.R(
            "opportunities",
            {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
        )
        if not opportunity_records:
            raise ValueError("商机不存在")
        opportunity = opportunity_records[0]

        # Read the clock once so assigned_at in the history row and
        # updated_at on the opportunity cannot disagree.
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Record the assignment.
        assignment_data = {
            "id": uuid.uuid4().hex,
            "opportunity_id": opportunity_id,
            "from_owner_id": opportunity["owner_id"],
            "to_owner_id": new_owner_id,
            "assigned_by": get_current_user_id(),
            "assigned_at": now
        }
        await sor.C("opportunity_assignment_history", assignment_data)

        # Switch the owner on the opportunity itself.
        await sor.U(
            "opportunities",
            {
                "owner_id": new_owner_id,
                "updated_at": now
            },
            {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
        )
        return {"opportunity_id": opportunity_id, "new_owner_id": new_owner_id}
async def get_opportunity_funnel(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    owner_id: Optional[str] = None,
    region: Optional[str] = None
) -> Dict:
    """Aggregate opportunities into per-stage sales-funnel figures.

    Every argument is an optional filter; a falsy value means "do not
    filter on this". The date bounds are applied independently, so passing
    only ``start_date`` or only ``end_date`` gives an open-ended range.

    Args:
        start_date: Inclusive lower bound compared against ``created_at``.
        end_date: Inclusive upper bound compared against ``created_at``.
            NOTE(review): ``created_at`` is written elsewhere in this
            module as ``"%Y-%m-%d %H:%M:%S"``; if the comparison is
            textual, a date-only ``end_date`` would exclude rows created
            later that day. Confirm with callers.
        owner_id: Restrict to opportunities owned by this user.
        region: Restrict to opportunities in this region.

    Returns:
        A dict with:
          * ``funnel_data``: maps each stage to ``count``,
            ``total_amount`` and ``weighted_amount`` (amount times
            probability).
          * ``total_weighted_value``: sum of all weighted amounts.
          * ``total_opportunities``: number of matching opportunities.
    """
    config = getConfig()
    db = DBPools(config.databases)
    async with db.sqlorContext('opportunity_management') as sor:
        # (field, operator, value) candidates; only truthy values are
        # turned into filters, each one independently of the others.
        candidates = (
            ("created_at", ">=", start_date),
            ("created_at", "<=", end_date),
            ("owner_id", "=", owner_id),
            ("region", "=", region),
        )
        filters = [
            {"field": field, "op": op, "value": value}
            for field, op, value in candidates
            if value
        ]

        # Treat a missing result the same as "no rows".
        opportunities = (await sor.R("opportunities", {"filters": filters})) or []

        # Group by stage and accumulate the totals.
        funnel_data = {}
        total_value = 0
        for opp in opportunities:
            amount = float(opp["estimated_amount"]) if opp["estimated_amount"] else 0
            probability = float(opp["probability"]) if opp["probability"] else 0
            weighted = amount * probability

            bucket = funnel_data.setdefault(
                opp["current_stage"],
                {"count": 0, "total_amount": 0, "weighted_amount": 0}
            )
            bucket["count"] += 1
            bucket["total_amount"] += amount
            bucket["weighted_amount"] += weighted
            total_value += weighted

        return {
            "funnel_data": funnel_data,
            "total_weighted_value": total_value,
            "total_opportunities": len(opportunities)
        }
async def get_sales_performance(
    start_date: str,
    end_date: str,
    owner_id: str = None
) -> Dict:
    """Summarise won revenue and the open pipeline for a period.

    Won opportunities are selected by ``updated_at`` within the period and
    active opportunities by ``created_at`` within the period. Both queries
    are narrowed to ``owner_id`` when one is given.

    Args:
        start_date: Inclusive lower bound of the period.
        end_date: Inclusive upper bound of the period.
        owner_id: Optional owner to restrict both queries to.

    Returns:
        A dict with ``period``, ``total_revenue``, ``total_closed_deals``,
        ``active_opportunities``, ``weighted_pipeline_value`` and
        ``average_deal_size`` (``0`` when there are no won deals).
    """
    def _criteria(status, date_field):
        # Status + date-window filters, plus the owner filter if requested.
        conditions = [
            {"field": "status", "op": "=", "value": status},
            {"field": date_field, "op": ">=", "value": start_date},
            {"field": date_field, "op": "<=", "value": end_date}
        ]
        if owner_id:
            conditions.append({"field": "owner_id", "op": "=", "value": owner_id})
        return {"filters": conditions}

    settings = getConfig()
    pools = DBPools(settings.databases)
    async with pools.sqlorContext('opportunity_management') as sor:
        # Won deals: revenue counts only rows with an amount, while the
        # deal count includes every won row.
        won_deals = await sor.R("opportunities", _criteria("won", "updated_at"))
        total_revenue = 0
        for deal in won_deals:
            if deal["estimated_amount"]:
                total_revenue += float(deal["estimated_amount"])
        total_count = len(won_deals)

        # Active deals: pipeline value is amount weighted by probability.
        open_deals = await sor.R("opportunities", _criteria("active", "created_at"))
        active_count = len(open_deals)
        weighted_pipeline = 0
        for deal in open_deals:
            if deal["estimated_amount"] and deal["probability"]:
                weighted_pipeline += float(deal["estimated_amount"]) * float(deal["probability"])

        if total_count > 0:
            average_deal_size = total_revenue / total_count
        else:
            average_deal_size = 0

        return {
            "period": f"{start_date} to {end_date}",
            "total_revenue": total_revenue,
            "total_closed_deals": total_count,
            "active_opportunities": active_count,
            "weighted_pipeline_value": weighted_pipeline,
            "average_deal_size": average_deal_size
        }
def get_current_user_id() -> str:
    """Return the id of the current user (mock implementation).

    This is a stand-in that always returns the same fixed string.
    """
    placeholder_user_id = "current_user_id"
    return placeholder_user_id
# "Sync"-named entry points: each returns the coroutine of its async counterpart without awaiting it.
def sync_create_opportunity(*args, **kwargs):
    """Forward all arguments to ``create_opportunity``.

    ``create_opportunity`` is a coroutine function, so the value returned
    here is its un-awaited coroutine object; the caller has to await it
    (or otherwise run it) to actually create the opportunity.
    """
    pending = create_opportunity(*args, **kwargs)
    return pending
def sync_update_opportunity_stage(*args, **kwargs):
    """Forward all arguments to ``update_opportunity_stage``.

    ``update_opportunity_stage`` is a coroutine function, so the value
    returned here is its un-awaited coroutine object; the caller has to
    await it (or otherwise run it) for the update to happen.
    """
    pending = update_opportunity_stage(*args, **kwargs)
    return pending
def sync_assign_opportunity(*args, **kwargs):
    """Forward all arguments to ``assign_opportunity``.

    ``assign_opportunity`` is a coroutine function, so the value returned
    here is its un-awaited coroutine object; the caller has to await it
    (or otherwise run it) for the assignment to happen.
    """
    pending = assign_opportunity(*args, **kwargs)
    return pending
def sync_get_opportunity_funnel(*args, **kwargs):
    """Forward all arguments to ``get_opportunity_funnel``.

    ``get_opportunity_funnel`` is a coroutine function, so the value
    returned here is its un-awaited coroutine object; the caller has to
    await it (or otherwise run it) to obtain the funnel data.
    """
    pending = get_opportunity_funnel(*args, **kwargs)
    return pending
def sync_get_sales_performance(*args, **kwargs):
    """Forward all arguments to ``get_sales_performance``.

    ``get_sales_performance`` is a coroutine function, so the value
    returned here is its un-awaited coroutine object; the caller has to
    await it (or otherwise run it) to obtain the performance summary.
    """
    pending = get_sales_performance(*args, **kwargs)
    return pending