2026-04-16 13:29:07 +08:00

453 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta
from typing import List, Dict, Optional
import uuid
import re
from ahserver.serverenv import ServerEnv
from appPublic.worker import awaitify
from sqlor.dbp import DBP
async def create_customer(
    customer_name: str,
    customer_type: str,
    phone: str = None,
    email: str = None,
    tax_id: str = None,
    industry: str = None,
    customer_level: str = "potential",
    address: str = None,
    owner_id: str = None,
    region: str = None
) -> Dict:
    """Create a customer record and return the row that was inserted.

    ``validate_customer_data`` runs before anything is written, so a
    duplicate phone or tax number surfaces as the ``ValueError`` raised
    there and no row is inserted. When ``owner_id`` is falsy, the value of
    ``get_current_user_id()`` is stored as the owner. The new row always has
    status ``"active"`` and shares one timestamp across ``last_follow_up``,
    ``created_at`` and ``updated_at``.
    """
    db = DBP()
    new_id = uuid.uuid4().hex

    # Uniqueness checks come first; they raise instead of returning a flag.
    await validate_customer_data(db, phone, tax_id, customer_type)

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    owner = owner_id or get_current_user_id()
    record = dict(
        id=new_id,
        customer_name=customer_name,
        customer_type=customer_type,
        phone=phone,
        email=email,
        tax_id=tax_id,
        industry=industry,
        customer_level=customer_level,
        address=address,
        owner_id=owner,
        region=region,
        last_follow_up=stamp,
        created_at=stamp,
        updated_at=stamp,
        status="active",
    )
    await db.insert("customers", record)
    return record
async def validate_customer_data(dbp, phone: str, tax_id: str, customer_type: str):
    """Reject a customer whose phone or enterprise tax number already exists.

    ``dbp`` only needs an awaitable ``select_one(table, conditions)`` method.
    The phone lookup is skipped when ``phone`` is falsy. The tax-number
    lookup runs only for ``customer_type == "enterprise"`` with a truthy
    ``tax_id``.

    Raises:
        ValueError: If a ``customers`` row with the same phone, or (for
            enterprise customers) the same tax number, is found.
    """
    # Phone numbers must be unique across all customers.
    if phone and await dbp.select_one("customers", {"phone": phone}):
        raise ValueError(f"手机号 {phone} 已存在,不能重复建档")

    # Tax numbers are only checked for enterprise customers.
    is_enterprise = customer_type == "enterprise"
    if is_enterprise and tax_id and await dbp.select_one("customers", {"tax_id": tax_id}):
        raise ValueError(f"企业税号 {tax_id} 已存在,不能重复建档")
async def initiate_handover(
    customer_id: str,
    to_owner_id: str,
    handover_reason: str,
    reviewer_id: Optional[str] = None
) -> Dict:
    """Start the handover workflow that moves a customer to another owner.

    Inserts a ``customer_handover`` row in the ``"preparation"`` stage and
    then builds its checklist through ``generate_handover_items``.

    Args:
        customer_id: ID of the customer being handed over.
        to_owner_id: ID of the user who will receive the customer.
        handover_reason: Reason stored on the handover record.
        reviewer_id: Optional reviewer ID stored on the handover record.

    Returns:
        The dict that was inserted into ``customer_handover``.

    Raises:
        ValueError: If ``to_owner_id`` is empty, the customer does not
            exist, or the customer's status is not ``"active"``.
    """
    # Fail before touching the database: confirm_handover later copies
    # to_owner_id into customers.owner_id, so an empty recipient would leave
    # an active customer without an owner.
    if not to_owner_id:
        raise ValueError("交接接收人不能为空")

    db = DBP()
    customer = await db.select_one("customers", {"id": customer_id})
    if not customer:
        raise ValueError("客户不存在")
    if customer["status"] != "active":
        raise ValueError("只能交接活跃状态的客户")

    # One timestamp for both columns so a new record never shows
    # created_at != updated_at (same approach as create_customer).
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    handover_id = uuid.uuid4().hex
    handover_data = {
        "id": handover_id,
        "customer_id": customer_id,
        "from_owner_id": customer["owner_id"],
        "to_owner_id": to_owner_id,
        "handover_reason": handover_reason,
        "current_stage": "preparation",
        "reviewer_id": reviewer_id,
        "created_at": now,
        "updated_at": now,
    }
    await db.insert("customer_handover", handover_data)

    # The checklist is generated only after the handover row exists.
    await generate_handover_items(db, handover_id, customer_id)
    return handover_data
# Linked record sources for the handover checklist, processed in this order.
# Each entry is (item_type, SQL selecting the customer's rows, formatter that
# turns one result row into the item description).
_HANDOVER_ITEM_SOURCES = (
    (
        # Open opportunities, matched through the customer's name.
        "opportunities",
        "SELECT id, customer_name, estimated_amount, current_stage FROM opportunities WHERE customer_name = (SELECT customer_name FROM customers WHERE id = %(customer_id)s) AND status = 'active'",
        lambda row: f"商机: {row['customer_name']} - 预估金额: {row['estimated_amount']}, 阶段: {row['current_stage']}",
    ),
    (
        # All contracts of the customer (assumes a contracts table exists).
        "contracts",
        "SELECT id, contract_no, amount, status FROM contracts WHERE customer_id = %(customer_id)s",
        lambda row: f"合同: {row['contract_no']} - 金额: {row['amount']}, 状态: {row['status']}",
    ),
    (
        # Service tickets that are not closed (assumes a service_tickets table exists).
        "service_tickets",
        "SELECT id, ticket_no, subject, status FROM service_tickets WHERE customer_id = %(customer_id)s AND status != 'closed'",
        lambda row: f"服务工单: {row['ticket_no']} - 主题: {row['subject']}, 状态: {row['status']}",
    ),
    (
        # Overdue payment records.
        "payment_issues",
        "SELECT id, invoice_no, amount, due_date FROM payment_records WHERE customer_id = %(customer_id)s AND status = 'overdue'",
        lambda row: f"回款问题: 发票 {row['invoice_no']} - 金额: {row['amount']}, 到期日: {row['due_date']}",
    ),
)


def _new_handover_item(handover_id, item_type, description, timestamp, **link):
    """Build one ``customer_handover_items`` row; ``link`` adds the optional ``item_id``."""
    return {
        "id": uuid.uuid4().hex,
        "handover_id": handover_id,
        "item_type": item_type,
        **link,
        "item_description": description,
        "is_completed": "0",
        "created_at": timestamp,
        "updated_at": timestamp,
    }


async def generate_handover_items(dbp, handover_id: str, customer_id: str):
    """Create the checklist rows for a handover.

    The checklist always starts with one ``"basic_info"`` item (which has no
    ``item_id`` key). It is followed by one item per row returned for each
    entry of ``_HANDOVER_ITEM_SOURCES``: open opportunities, contracts,
    non-closed service tickets and overdue payment records. All queries run
    before the first insert, and every row of the batch carries the same
    timestamp.

    Args:
        dbp: Object providing awaitable ``query(sql, params)`` and
            ``insert(table, row)`` methods.
        handover_id: ID of the handover the items belong to.
        customer_id: ID of the customer whose records are listed.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    items = [
        _new_handover_item(handover_id, "basic_info", "客户基本信息和联系记录", now)
    ]

    for item_type, sql, describe in _HANDOVER_ITEM_SOURCES:
        rows = await dbp.query(sql, {"customer_id": customer_id})
        # ``rows or ()`` tolerates a falsy result (None or an empty list).
        items.extend(
            _new_handover_item(handover_id, item_type, describe(row), now, item_id=row["id"])
            for row in rows or ()
        )

    # Rows are inserted one at a time, in checklist order.
    for item in items:
        await dbp.insert("customer_handover_items", item)
async def complete_handover_preparation(handover_id: str) -> Dict:
    """Move a handover from the ``"preparation"`` stage to ``"review"``.

    Args:
        handover_id: ID of the ``customer_handover`` record.

    Returns:
        ``{"handover_id": handover_id, "stage": "review"}``.

    Raises:
        ValueError: If the record does not exist or its ``current_stage`` is
            not ``"preparation"``.
    """
    db = DBP()
    handover = await db.select_one("customer_handover", {"id": handover_id})
    if not handover:
        raise ValueError("交接记录不存在")
    if handover["current_stage"] != "preparation":
        raise ValueError("当前不在准备阶段")

    # One timestamp for the whole transition so prepared_at and updated_at
    # cannot disagree.
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    await db.update(
        "customer_handover",
        {
            "current_stage": "review",
            "prepared_at": now,
            "updated_at": now,
        },
        {"id": handover_id},
    )
    return {"handover_id": handover_id, "stage": "review"}
async def approve_handover(handover_id: str, approver_id: Optional[str] = None) -> Dict:
    """Approve a handover checklist, moving it from ``"review"`` to ``"confirmation"``.

    Args:
        handover_id: ID of the ``customer_handover`` record.
        approver_id: ID of the approving user; falls back to
            ``get_current_user_id()`` when falsy.

    Returns:
        ``{"handover_id": handover_id, "stage": "confirmation"}``.

    Raises:
        ValueError: If the record does not exist or its ``current_stage`` is
            not ``"review"``.
    """
    db = DBP()
    # NOTE(review): the resolved approver is neither written to the record
    # nor compared with the stored reviewer_id — confirm whether it should be.
    approver_id = approver_id or get_current_user_id()

    handover = await db.select_one("customer_handover", {"id": handover_id})
    if not handover:
        raise ValueError("交接记录不存在")
    if handover["current_stage"] != "review":
        raise ValueError("当前不在审核阶段")

    # One timestamp for the whole transition so reviewed_at and updated_at
    # cannot disagree.
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    await db.update(
        "customer_handover",
        {
            "current_stage": "confirmation",
            "reviewed_at": now,
            "updated_at": now,
        },
        {"id": handover_id},
    )
    return {"handover_id": handover_id, "stage": "confirmation"}
async def confirm_handover(handover_id: str, confirm_by: Optional[str] = None) -> Dict:
    """Confirm receipt of a customer and finish the handover.

    Sets the customer's ``owner_id`` to the handover's ``to_owner_id``,
    marks the handover ``"completed"``, then calls
    ``send_customer_notification`` for the customer and new owner.

    Args:
        handover_id: ID of the ``customer_handover`` record.
        confirm_by: ID of the confirming user; falls back to
            ``get_current_user_id()`` when falsy.

    Returns:
        ``{"handover_id": ..., "stage": "completed", "customer_id": ...}``.

    Raises:
        ValueError: If the record does not exist or its ``current_stage`` is
            not ``"confirmation"``.
    """
    db = DBP()
    # NOTE(review): the resolved confirmer is not stored or checked against
    # to_owner_id — confirm whether it should be.
    confirm_by = confirm_by or get_current_user_id()

    handover = await db.select_one("customer_handover", {"id": handover_id})
    if not handover:
        raise ValueError("交接记录不存在")
    if handover["current_stage"] != "confirmation":
        raise ValueError("当前不在确认阶段")

    customer_id = handover["customer_id"]
    new_owner_id = handover["to_owner_id"]
    # One timestamp for the whole completion so the customer row and the
    # confirmed_at / completed_at / updated_at columns all agree.
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # NOTE(review): the two updates below are separate calls; whether they
    # share a transaction is not visible from this function.
    await db.update(
        "customers",
        {"owner_id": new_owner_id, "updated_at": now},
        {"id": customer_id},
    )
    await db.update(
        "customer_handover",
        {
            "current_stage": "completed",
            "confirmed_at": now,
            "completed_at": now,
            "updated_at": now,
        },
        {"id": handover_id},
    )

    # The notification is sent only after both records are updated.
    await send_customer_notification(customer_id, new_owner_id)
    return {"handover_id": handover_id, "stage": "completed", "customer_id": customer_id}
async def send_customer_notification(customer_id: str, new_owner_id: str):
    """Notify a customer that their account manager has changed (simulated).

    The message is only printed; a real SMS or e-mail integration is still
    to be added. Missing data no longer raises: if the customer row is not
    found, the notification is skipped; if the new owner's row, name or
    phone is missing, placeholder text is used instead.

    Args:
        customer_id: ID of the customer to notify.
        new_owner_id: ID of the user who now owns the customer; looked up in
            the ``users`` table (assumed to exist).
    """
    db = DBP()
    customer = await db.select_one("customers", {"id": customer_id})
    if not customer:
        # Without a customer row there is nobody to address; skip rather
        # than fail a caller that has already finished its own updates.
        print(f"客户通知未发送: 客户 {customer_id} 不存在")
        return

    # A missing user row falls back to the placeholders below.
    new_owner = await db.select_one("users", {"id": new_owner_id}) or {}
    # ``or`` also covers a present-but-empty value, so the text "None" never
    # reaches the customer-facing message.
    owner_name = new_owner.get('name') or '新经理'
    owner_phone = new_owner.get('phone') or '待更新'

    notification_content = f"尊敬的{customer['customer_name']},您的客户经理已变更为{owner_name},联系方式:{owner_phone}"
    # Stand-in for the real SMS / e-mail API call.
    print(f"客户通知已发送: {notification_content}")
async def recycle_to_pool(customer_id: str, inactive_days: Optional[int] = None, reason: str = "inactive_days") -> Dict:
    """Move a customer into the public pool.

    Inserts a ``customer_pool`` row with status ``"available"``, then sets
    the customer's status to ``"in_pool"`` and clears its owner.

    Args:
        customer_id: ID of the customer to recycle.
        inactive_days: Optional inactivity count stored on the pool record.
        reason: Recycle reason stored on the pool record.

    Returns:
        The dict that was inserted into ``customer_pool``.

    Raises:
        ValueError: If the customer does not exist or is already in the pool.
    """
    db = DBP()
    customer = await db.select_one("customers", {"id": customer_id})
    if not customer:
        raise ValueError("客户不存在")
    if customer["status"] == "in_pool":
        raise ValueError("客户已在公海池中")

    # One timestamp for the whole operation so the pool record and the
    # customer row agree.
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    pool_data = {
        "id": uuid.uuid4().hex,
        "customer_id": customer_id,
        # Captured before the owner is cleared below.
        "original_owner_id": customer["owner_id"],
        "recycle_reason": reason,
        "inactive_days": inactive_days,
        "recycled_at": now,
        "pool_status": "available",
        "created_at": now,
    }
    await db.insert("customer_pool", pool_data)

    await db.update(
        "customers",
        {
            "status": "in_pool",
            "owner_id": "",  # a pooled customer has no owner
            "updated_at": now,
        },
        {"id": customer_id},
    )
    return pool_data
async def claim_from_pool(pool_id: str, new_owner_id: Optional[str] = None):
    """Claim a customer from the public pool for a new owner.

    Marks the pool record ``"claimed"``, then reactivates the customer with
    the new owner and resets ``last_follow_up``.

    Args:
        pool_id: ID of the ``customer_pool`` record.
        new_owner_id: ID of the claiming user; falls back to
            ``get_current_user_id()`` when falsy.

    Returns:
        ``{"customer_id": ..., "new_owner_id": ...}``.

    Raises:
        ValueError: If the pool record does not exist or its ``pool_status``
            is not ``"available"``.
    """
    db = DBP()
    new_owner_id = new_owner_id or get_current_user_id()

    pool_record = await db.select_one("customer_pool", {"id": pool_id})
    if not pool_record:
        raise ValueError("公海记录不存在")
    # NOTE(review): this check and the update below are separate statements;
    # two concurrent claims could both pass it — confirm whether DBP offers a
    # conditional update.
    if pool_record["pool_status"] != "available":
        raise ValueError("该客户已被认领或分配")

    customer_id = pool_record["customer_id"]
    # One timestamp for the whole claim so the pool record and the customer
    # row agree.
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    await db.update(
        "customer_pool",
        {
            "assigned_to": new_owner_id,
            "assigned_at": now,
            "pool_status": "claimed",
            "updated_at": now,
        },
        {"id": pool_id},
    )
    await db.update(
        "customers",
        {
            "status": "active",
            "owner_id": new_owner_id,
            "last_follow_up": now,
            "updated_at": now,
        },
        {"id": customer_id},
    )
    return {"customer_id": customer_id, "new_owner_id": new_owner_id}
async def get_customer_360_view(customer_id: str) -> Dict:
    """Collect the customer row and its related records into one dict.

    The result has the keys ``customer``, ``opportunities``, ``contracts``,
    ``service_tickets`` and ``payments``, filled in that order. Opportunities
    are matched by the customer's name; the other three are matched by
    ``customer_id``.

    Raises:
        ValueError: If no customer with ``customer_id`` exists.
    """
    db = DBP()
    profile = await db.select_one("customers", {"id": customer_id})
    if not profile:
        raise ValueError("客户不存在")

    view = {"customer": profile}

    # Opportunities are keyed by customer name, newest first.
    view["opportunities"] = await db.query(
        "SELECT id, estimated_amount, current_stage, expected_close_date, status FROM opportunities WHERE customer_name = %(customer_name)s ORDER BY created_at DESC",
        {"customer_name": profile["customer_name"]}
    )
    # Contract history (assumes a contracts table exists).
    view["contracts"] = await db.query(
        "SELECT id, contract_no, amount, start_date, end_date, status FROM contracts WHERE customer_id = %(customer_id)s ORDER BY created_at DESC",
        {"customer_id": customer_id}
    )
    # Service tickets (assumes a service_tickets table exists).
    view["service_tickets"] = await db.query(
        "SELECT id, ticket_no, subject, priority, status, created_at FROM service_tickets WHERE customer_id = %(customer_id)s ORDER BY created_at DESC",
        {"customer_id": customer_id}
    )
    # Payment records, latest due date first.
    view["payments"] = await db.query(
        "SELECT id, invoice_no, amount, paid_amount, due_date, status FROM payment_records WHERE customer_id = %(customer_id)s ORDER BY due_date DESC",
        {"customer_id": customer_id}
    )
    return view
def get_current_user_id() -> str:
    """Return the current user's ID (placeholder implementation).

    Always returns the fixed string ``"current_user_id"``.
    """
    placeholder_id = "current_user_id"
    return placeholder_id
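# "Sync" wrappers: plain functions that call the async counterparts and return the resulting coroutine un-awaited.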
def sync_create_customer(*args, **kwargs):
    """Forward all arguments to ``create_customer`` and return its result.

    ``create_customer`` is declared ``async def``, so the value returned here
    is its coroutine object; this wrapper does not await or schedule it.
    """
    pending = create_customer(*args, **kwargs)
    return pending
def sync_initiate_handover(*args, **kwargs):
    """Forward all arguments to ``initiate_handover`` and return its result.

    ``initiate_handover`` is declared ``async def``, so the value returned
    here is its coroutine object; this wrapper does not await or schedule it.
    """
    pending = initiate_handover(*args, **kwargs)
    return pending
def sync_complete_handover_preparation(*args, **kwargs):
    """Forward all arguments to ``complete_handover_preparation`` and return its result.

    ``complete_handover_preparation`` is declared ``async def``, so the value
    returned here is its coroutine object; this wrapper does not await or
    schedule it.
    """
    pending = complete_handover_preparation(*args, **kwargs)
    return pending
def sync_approve_handover(*args, **kwargs):
    """Forward all arguments to ``approve_handover`` and return its result.

    ``approve_handover`` is declared ``async def``, so the value returned
    here is its coroutine object; this wrapper does not await or schedule it.
    """
    pending = approve_handover(*args, **kwargs)
    return pending
def sync_confirm_handover(*args, **kwargs):
    """Forward all arguments to ``confirm_handover`` and return its result.

    ``confirm_handover`` is declared ``async def``, so the value returned
    here is its coroutine object; this wrapper does not await or schedule it.
    """
    pending = confirm_handover(*args, **kwargs)
    return pending
def sync_recycle_to_pool(*args, **kwargs):
    """Forward all arguments to ``recycle_to_pool`` and return its result.

    ``recycle_to_pool`` is declared ``async def``, so the value returned here
    is its coroutine object; this wrapper does not await or schedule it.
    """
    pending = recycle_to_pool(*args, **kwargs)
    return pending
def sync_claim_from_pool(*args, **kwargs):
    """Forward all arguments to ``claim_from_pool`` and return its result.

    ``claim_from_pool`` is declared ``async def``, so the value returned here
    is its coroutine object; this wrapper does not await or schedule it.
    """
    pending = claim_from_pool(*args, **kwargs)
    return pending
def sync_get_customer_360_view(*args, **kwargs):
    """Forward all arguments to ``get_customer_360_view`` and return its result.

    ``get_customer_360_view`` is declared ``async def``, so the value
    returned here is its coroutine object; this wrapper does not await or
    schedule it.
    """
    pending = get_customer_360_view(*args, **kwargs)
    return pending