545 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta
from typing import List, Dict, Optional
import uuid
from appPublic.uniqueID import getID
import re
from ahserver.serverenv import ServerEnv
from appPublic.worker import awaitify
from sqlor.dbpools import DBPools
from appPublic.jsonConfig import getConfig
async def create_customer(
customer_name: str,
customer_type: str,
phone: str = None,
email: str = None,
tax_id: str = None,
industry: str = None,
customer_level: str = "potential",
address: str = None,
owner_id: str = None,
region: str = None
) -> Dict:
"""创建客户档案"""
env = ServerEnv()
dbname = env.get_module_dbname('customer_management')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
customer_id = getID()
# 数据校验
await validate_customer_data(sor, phone, tax_id, customer_type)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
customer_data = {
"id": customer_id,
"customer_name": customer_name,
"customer_type": customer_type,
"phone": phone,
"email": email,
"tax_id": tax_id,
"industry": industry,
"customer_level": customer_level,
"address": address,
"owner_id": owner_id or get_current_user_id(),
"region": region,
"last_follow_up": now,
"created_at": now,
"updated_at": now,
"status": "active"
}
await sor.C("customers", customer_data)
return customer_data
async def validate_customer_data(sor, phone: str, tax_id: str, customer_type: str):
"""验证客户数据唯一性"""
# 手机号唯一性校验
if phone:
existing_phone = await sor.R("customers", {"filters": [{"field": "phone", "op": "=", "value": phone}]})
if existing_phone and len(existing_phone) > 0:
raise ValueError(f"手机号 {phone} 已存在,不能重复建档")
# 企业税号唯一性校验(仅对企业客户)
if customer_type == "enterprise" and tax_id:
existing_tax = await sor.R("customers", {"filters": [{"field": "tax_id", "op": "=", "value": tax_id}]})
if existing_tax and len(existing_tax) > 0:
raise ValueError(f"企业税号 {tax_id} 已存在,不能重复建档")
async def initiate_handover(
customer_id: str,
to_owner_id: str,
handover_reason: str,
reviewer_id: str = None
) -> Dict:
"""发起客户交接流程"""
env = ServerEnv()
dbname = env.get_module_dbname('customer_management')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
# 获取客户信息
customer_records = await sor.R("customers", {"filters": [{"field": "id", "op": "=", "value": customer_id}]})
if not customer_records or len(customer_records) == 0:
raise ValueError("客户不存在")
customer = customer_records[0]
if customer["status"] != "active":
raise ValueError("只能交接活跃状态的客户")
# 创建交接记录
handover_id = getID()
handover_data = {
"id": handover_id,
"customer_id": customer_id,
"from_owner_id": customer["owner_id"],
"to_owner_id": to_owner_id,
"handover_reason": handover_reason,
"current_stage": "preparation",
"reviewer_id": reviewer_id,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
await sor.C("customer_handover", handover_data)
# 自动生成交接清单
await generate_handover_items(sor, handover_id, customer_id)
return handover_data
async def generate_handover_items(sor, handover_id: str, customer_id: str):
"""自动生成交接清单"""
items = []
# 基本信息
items.append({
"id": getID(),
"handover_id": handover_id,
"item_type": "basic_info",
"item_description": "客户基本信息和联系记录",
"is_completed": "0",
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
# 未结商机
opportunities = await sor.R("opportunities", {
"filters": [
{"field": "customer_name", "op": "=", "value": f"(SELECT customer_name FROM customers WHERE id = '{customer_id}')"},
{"field": "status", "op": "=", "value": "active"}
]
})
if opportunities:
for opp in opportunities:
items.append({
"id": getID(),
"handover_id": handover_id,
"item_type": "opportunities",
"item_id": opp["id"],
"item_description": f"商机: {opp['customer_name']} - 预估金额: {opp['estimated_amount']}, 阶段: {opp['current_stage']}",
"is_completed": "0",
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
# 历史合同(假设合同管理模块存在)
contracts = await sor.R("contracts", {"filters": [{"field": "customer_id", "op": "=", "value": customer_id}]})
if contracts:
for contract in contracts:
items.append({
"id": getID(),
"handover_id": handover_id,
"item_type": "contracts",
"item_id": contract["id"],
"item_description": f"合同: {contract['contract_no']} - 金额: {contract['amount']}, 状态: {contract['status']}",
"is_completed": "0",
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
# 服务工单(假设服务模块存在)
service_tickets = await sor.R("service_tickets", {
"filters": [
{"field": "customer_id", "op": "=", "value": customer_id},
{"field": "status", "op": "!=", "value": "closed"}
]
})
if service_tickets:
for ticket in service_tickets:
items.append({
"id": getID(),
"handover_id": handover_id,
"item_type": "service_tickets",
"item_id": ticket["id"],
"item_description": f"服务工单: {ticket['ticket_no']} - 主题: {ticket['subject']}, 状态: {ticket['status']}",
"is_completed": "0",
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
# 未解决回款问题
payment_issues = await sor.R("payment_records", {
"filters": [
{"field": "customer_id", "op": "=", "value": customer_id},
{"field": "status", "op": "=", "value": "overdue"}
]
})
if payment_issues:
for issue in payment_issues:
items.append({
"id": getID(),
"handover_id": handover_id,
"item_type": "payment_issues",
"item_id": issue["id"],
"item_description": f"回款问题: 发票 {issue['invoice_no']} - 金额: {issue['amount']}, 到期日: {issue['due_date']}",
"is_completed": "0",
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
# 批量插入交接项目
for item in items:
await sor.C("customer_handover_items", item)
async def complete_handover_preparation(handover_id: str) -> Dict:
"""完成交接准备阶段"""
env = ServerEnv()
dbname = env.get_module_dbname('customer_management')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
handover_records = await sor.R("customer_handover", {"filters": [{"field": "id", "op": "=", "value": handover_id}]})
if not handover_records or len(handover_records) == 0:
raise ValueError("交接记录不存在")
handover = handover_records[0]
if handover["current_stage"] != "preparation":
raise ValueError("当前不在准备阶段")
# 更新为审核阶段
await sor.U(
"customer_handover",
{
"current_stage": "review",
"prepared_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": handover_id}]}
)
return {"handover_id": handover_id, "stage": "review"}
async def approve_handover(handover_id: str, approver_id: str = None) -> Dict:
"""审核交接清单"""
env = ServerEnv()
dbname = env.get_module_dbname('customer_management')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
approver_id = approver_id or get_current_user_id()
handover_records = await sor.R("customer_handover", {"filters": [{"field": "id", "op": "=", "value": handover_id}]})
if not handover_records or len(handover_records) == 0:
raise ValueError("交接记录不存在")
handover = handover_records[0]
if handover["current_stage"] != "review":
raise ValueError("当前不在审核阶段")
# 更新为确认阶段
await sor.U(
"customer_handover",
{
"current_stage": "confirmation",
"reviewed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": handover_id}]}
)
return {"handover_id": handover_id, "stage": "confirmation"}
async def confirm_handover(handover_id: str, confirm_by: str = None) -> Dict:
"""确认接收客户"""
env = ServerEnv()
dbname = env.get_module_dbname('customer_management')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
confirm_by = confirm_by or get_current_user_id()
handover_records = await sor.R("customer_handover", {"filters": [{"field": "id", "op": "=", "value": handover_id}]})
if not handover_records or len(handover_records) == 0:
raise ValueError("交接记录不存在")
handover = handover_records[0]
if handover["current_stage"] != "confirmation":
raise ValueError("当前不在确认阶段")
# 更新客户负责人
await sor.U(
"customers",
{
"owner_id": handover["to_owner_id"],
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": handover["customer_id"]}]}
)
# 完成交接流程
await sor.U(
"customer_handover",
{
"current_stage": "completed",
"confirmed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"completed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": handover_id}]}
)
# 发送客户通知(模拟)
await send_customer_notification(sor, handover["customer_id"], handover["to_owner_id"])
return {"handover_id": handover_id, "stage": "completed", "customer_id": handover["customer_id"]}
async def send_customer_notification(sor, customer_id: str, new_owner_id: str):
"""发送客户对接人变更通知"""
# 这里应该集成短信/邮件服务
# 模拟实现
customer_records = await sor.R("customers", {"filters": [{"field": "id", "op": "=", "value": customer_id}]})
customer = customer_records[0] if customer_records else None
new_owner_records = await sor.R("users", {"filters": [{"field": "id", "op": "=", "value": new_owner_id}]})
new_owner = new_owner_records[0] if new_owner_records else {"name": "新经理", "phone": "待更新"}
if customer:
notification_content = f"尊敬的{customer['customer_name']},您的客户经理已变更为{new_owner.get('name', '新经理')},联系方式:{new_owner.get('phone', '待更新')}"
# 记录通知日志(实际应调用短信/邮件API
print(f"客户通知已发送: {notification_content}")
async def recycle_to_pool(customer_id: str, inactive_days: int = None, reason: str = "inactive_days"):
"""回收客户到公海池"""
env = ServerEnv()
dbname = env.get_module_dbname('customer_management')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
customer_records = await sor.R("customers", {"filters": [{"field": "id", "op": "=", "value": customer_id}]})
if not customer_records or len(customer_records) == 0:
raise ValueError("客户不存在")
customer = customer_records[0]
if customer["status"] == "in_pool":
raise ValueError("客户已在公海池中")
# 创建公海记录
pool_id = getID()
pool_data = {
"id": pool_id,
"customer_id": customer_id,
"original_owner_id": customer["owner_id"],
"recycle_reason": reason,
"inactive_days": inactive_days,
"recycled_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"pool_status": "available",
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
await sor.C("customer_pool", pool_data)
# 更新客户状态
await sor.U(
"customers",
{
"status": "in_pool",
"owner_id": "", # 清空负责人
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": customer_id}]}
)
return pool_data
async def claim_from_pool(pool_id: str, new_owner_id: str = None):
"""从公海池认领客户"""
env = ServerEnv()
dbname = env.get_module_dbname('customer_management')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
new_owner_id = new_owner_id or get_current_user_id()
pool_records = await sor.R("customer_pool", {"filters": [{"field": "id", "op": "=", "value": pool_id}]})
if not pool_records or len(pool_records) == 0:
raise ValueError("公海记录不存在")
pool_record = pool_records[0]
if pool_record["pool_status"] != "available":
raise ValueError("该客户已被认领或分配")
# 更新公海记录
await sor.U(
"customer_pool",
{
"assigned_to": new_owner_id,
"assigned_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"pool_status": "claimed",
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": pool_id}]}
)
# 更新客户状态和负责人
await sor.U(
"customers",
{
"status": "active",
"owner_id": new_owner_id,
"last_follow_up": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{"filters": [{"field": "id", "op": "=", "value": pool_record["customer_id"]}]}
)
return {"customer_id": pool_record["customer_id"], "new_owner_id": new_owner_id}
async def get_customer_360_view(customer_id: str) -> Dict:
"""获取客户360度视图"""
env = ServerEnv()
dbname = env.get_module_dbname('customer_management')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
# 客户基本信息
customer_records = await sor.R("customers", {"filters": [{"field": "id", "op": "=", "value": customer_id}]})
if not customer_records or len(customer_records) == 0:
raise ValueError("客户不存在")
customer = customer_records[0]
# 商机记录
opportunities = await sor.R("opportunities", {
"filters": [{"field": "customer_name", "op": "=", "value": customer["customer_name"]}],
"sortby": [{"field": "created_at", "direction": "desc"}]
})
# 合同历史(假设合同管理模块存在)
contracts = await sor.R("contracts", {
"filters": [{"field": "customer_id", "op": "=", "value": customer_id}],
"sortby": [{"field": "created_at", "direction": "desc"}]
})
# 服务工单(假设服务模块存在)
service_tickets = await sor.R("service_tickets", {
"filters": [{"field": "customer_id", "op": "=", "value": customer_id}],
"sortby": [{"field": "created_at", "direction": "desc"}]
})
# 回款情况
payments = await sor.R("payment_records", {
"filters": [{"field": "customer_id", "op": "=", "value": customer_id}],
"sortby": [{"field": "due_date", "direction": "desc"}]
})
return {
"customer": customer,
"opportunities": opportunities,
"contracts": contracts,
"service_tickets": service_tickets,
"payments": payments
}
def get_current_user_id() -> str:
"""获取当前用户ID模拟实现"""
return "current_user_id"
# 同步版本函数
def sync_create_customer(*args, **kwargs):
return create_customer(*args, **kwargs)
def sync_initiate_handover(*args, **kwargs):
return initiate_handover(*args, **kwargs)
def sync_complete_handover_preparation(*args, **kwargs):
return complete_handover_preparation(*args, **kwargs)
def sync_approve_handover(*args, **kwargs):
return approve_handover(*args, **kwargs)
def sync_confirm_handover(*args, **kwargs):
return confirm_handover(*args, **kwargs)
def sync_recycle_to_pool(*args, **kwargs):
return recycle_to_pool(*args, **kwargs)
def sync_claim_from_pool(*args, **kwargs):
return claim_from_pool(*args, **kwargs)
def sync_get_customer_360_view(*args, **kwargs):
return get_customer_360_view(*args, **kwargs)