contract_management/contract_management/enhanced_contract_core.py

500 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import uuid
from datetime import datetime, date, timedelta
from typing import List, Dict, Optional, Tuple
from appPublic.jsonConfig import getConfig
from appPublic.worker import awaitify
from sqlor.dbpools import DBPools
class EnhancedContractManager:
def __init__(self):
    """Set up the manager and keep the application configuration on it."""
    # getConfig is imported from appPublic.jsonConfig at the top of the file;
    # whatever it returns is stored unchanged as ``self.config``.
    loaded_config = getConfig()
    self.config = loaded_config
async def get_db_connection(self, org_id: str):
    """Return the database handle for an organisation.

    Awaits ``getDBP(org_id)`` and hands the result back unchanged.

    NOTE(review): ``getDBP`` is neither imported nor defined in the visible
    part of this file (only ``DBPools`` is imported from ``sqlor.dbpools``)
    — confirm where it is supposed to come from.

    Args:
        org_id: Identifier of the organisation whose database is requested.
    """
    return await getDBP(org_id)
async def create_contract_from_opportunity(self, opportunity_id: str, contract_data: Dict, user_id: str, org_id: str) -> str:
    """Create a contract that is pre-filled from an existing opportunity.

    The opportunity row is looked up by ``opportunity_id``. Its customer id,
    customer name and estimated amount are copied into ``contract_data``
    (which is modified in place), together with the owner and organisation
    ids. The contract is then created through ``create_contract`` and, when
    ``payment_terms`` is set, milestones and orders are generated from it.

    Args:
        opportunity_id: Id of the opportunity to convert.
        contract_data: Contract fields supplied by the caller; mutated here.
        user_id: Stored as ``owner_id`` of the contract.
        org_id: Organisation the contract belongs to.

    Returns:
        The id returned by ``create_contract``.

    Raises:
        ValueError: If no opportunity is found for ``opportunity_id``.
    """
    # The handle is fetched but not used below; the call is kept as it was.
    opp_dbp = await getDBP(org_id)
    # NOTE(review): ``opp_sor`` is not defined in this method or anywhere in
    # the visible part of the file — confirm how it is meant to be obtained.
    source_opportunity = await opp_sor.select_one("opportunities", {"id": opportunity_id})
    if not source_opportunity:
        raise ValueError("商机不存在")

    # Carry customer and amount details over from the opportunity.
    # (``customer_id`` is read with .get(): the opportunity table is only
    # assumed to have that column.)
    contract_data["customer_id"] = source_opportunity.get("customer_id")
    contract_data["opportunity_id"] = opportunity_id
    contract_data["party_b"] = source_opportunity["customer_name"]
    contract_data["amount"] = source_opportunity["estimated_amount"]
    contract_data["owner_id"] = user_id
    contract_data["org_id"] = org_id

    new_contract_id = await self.create_contract(contract_data, user_id, org_id)

    # Milestones first, then orders — both are derived from the payment terms.
    terms = contract_data.get("payment_terms")
    if terms:
        await self.generate_milestones_from_payment_terms(new_contract_id, terms, org_id)
        await self.generate_orders_from_payment_terms(new_contract_id, contract_data, org_id)
    return new_contract_id
async def create_contract(self, contract_data: Dict, user_id: str, org_id: str) -> str:
    """Insert a contract row and create its initial version (``v1.0``).

    Required keys of ``contract_data`` (a missing one raises ``KeyError``):
    ``contract_number``, ``title``, ``party_a``, ``party_b``,
    ``contract_type``, ``start_date``, ``end_date``. All other columns are
    optional; ``status`` defaults to ``'draft'`` and ``tax_rate`` to
    ``'0.1300'``. ``owner_id`` and ``org_id`` always come from the arguments,
    not from ``contract_data``.

    Args:
        contract_data: Field values for the new contract.
        user_id: Owner of the contract and author of the initial version.
        org_id: Organisation the contract belongs to.

    Returns:
        The generated 32-character hexadecimal contract id.
    """
    new_id = uuid.uuid4().hex
    dbp = await self.get_db_connection(org_id)

    insert_sql = """
INSERT INTO contract (
id, contract_number, title, party_a, party_b, contract_type,
status, amount, start_date, end_date, sign_date, description,
owner_id, org_id, created_at, updated_at,
payment_terms, credit_period, penalty_clause, opportunity_id, customer_id, tax_rate
) VALUES (
%(id)s, %(contract_number)s, %(title)s, %(party_a)s, %(party_b)s, %(contract_type)s,
%(status)s, %(amount)s, %(start_date)s, %(end_date)s, %(sign_date)s, %(description)s,
%(owner_id)s, %(org_id)s, NOW(), NOW(),
%(payment_terms)s, %(credit_period)s, %(penalty_clause)s, %(opportunity_id)s, %(customer_id)s, %(tax_rate)s
)
"""

    # Mandatory fields are indexed directly so that a missing one fails fast.
    mandatory_keys = (
        'contract_number', 'title', 'party_a', 'party_b',
        'contract_type', 'start_date', 'end_date',
    )
    # Optional fields fall back to None (NULL) when absent.
    nullable_keys = (
        'amount', 'sign_date', 'description', 'payment_terms',
        'credit_period', 'penalty_clause', 'opportunity_id', 'customer_id',
    )
    bind_values = {'id': new_id, 'owner_id': user_id, 'org_id': org_id}
    bind_values.update({key: contract_data[key] for key in mandatory_keys})
    bind_values.update({key: contract_data.get(key) for key in nullable_keys})
    bind_values['status'] = contract_data.get('status', 'draft')
    bind_values['tax_rate'] = contract_data.get('tax_rate', '0.1300')

    # NOTE(review): ``sor`` is not defined in this method or anywhere in the
    # visible part of the file, and ``dbp`` is never used — confirm how the
    # query object is meant to be obtained from the connection.
    await sor.doTransaction([{'sql': insert_sql, 'params': bind_values}])

    # Every contract starts with a first version holding its content.
    initial_content = contract_data.get('content', '')
    await self.create_contract_version(new_id, initial_content, user_id, org_id, 'v1.0', '初始版本')
    return new_id
async def create_contract_version(self, contract_id: str, content: str, user_id: str, org_id: str,
                                  version_number: str, modified_reason: str = '') -> str:
    """Store a new version of a contract and flag it as the current one.

    Rows of ``contract_versions`` for this contract that are currently
    flagged ``is_current = "1"`` are reset to ``"0"`` first; the new row is
    then inserted with ``is_current = "1"``.

    Args:
        contract_id: Contract the version belongs to.
        content: Full text of this version.
        user_id: Recorded as ``modified_by``.
        org_id: Organisation used to obtain the database connection.
        version_number: Label of the version, e.g. ``'v1.0'``.
        modified_reason: Optional free-text reason for the change.

    Returns:
        The generated 32-character hexadecimal version id.
    """
    new_version_id = uuid.uuid4().hex
    dbp = await self.get_db_connection(org_id)

    # NOTE(review): ``sor`` is not defined in this method or anywhere in the
    # visible part of the file, and ``dbp`` is never used — confirm how the
    # query object is meant to be obtained from the connection.
    # Demote whichever version is current right now.
    currently_active = {"contract_id": contract_id, "is_current": "1"}
    await sor.update("contract_versions", {"is_current": "0"}, currently_active)

    created_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    await sor.insert(
        "contract_versions",
        {
            "id": new_version_id,
            "contract_id": contract_id,
            "version_number": version_number,
            "content": content,
            # Left empty; a real diff against the previous version is not computed.
            "diff_content": "",
            "modified_by": user_id,
            "modified_reason": modified_reason,
            "created_at": created_stamp,
            "is_current": "1",
        },
    )
    return new_version_id
async def generate_milestones_from_payment_terms(self, contract_id: str, payment_terms: str, org_id: str):
    """Create default milestones from keywords found in the payment terms.

    This is a simple keyword scan, not a real parser:

    * ``预付款`` -> "预付款到账" (payment, +7 days)
    * ``进度款`` -> "产品交付" (delivery, +30 days) and "进度款支付" (payment, +35 days)
    * ``尾款``  -> "验收完成" (acceptance, +60 days) and "尾款支付" (payment, +65 days)

    All planned dates are offsets from the same "today" snapshot, and every
    row is inserted into ``contract_milestones`` with status ``pending``.
    Nothing is written (and no connection is requested) when
    ``payment_terms`` is empty/``None`` or contains none of the keywords.

    Args:
        contract_id: Contract the milestones belong to.
        payment_terms: Free-text payment terms of the contract.
        org_id: Organisation used to obtain the database connection.
    """
    if not payment_terms:
        return

    # keyword -> ((milestone name, milestone type, days from today), ...)
    schedule = (
        ("预付款", (("预付款到账", "payment", 7),)),
        ("进度款", (("产品交付", "delivery", 30), ("进度款支付", "payment", 35))),
        ("尾款", (("验收完成", "acceptance", 60), ("尾款支付", "payment", 65))),
    )

    # One snapshot so that all offsets are relative to the same day even if
    # the call happens to run across midnight.
    today = date.today()
    planned = []
    for keyword, entries in schedule:
        if keyword not in payment_terms:
            continue
        for name, kind, offset_days in entries:
            planned.append((name, kind, today + timedelta(days=offset_days)))
    if not planned:
        return

    # The connection lookup does not depend on the milestone, so do it once.
    # NOTE(review): ``sor`` is not defined in this method or anywhere in the
    # visible part of the file, and ``dbp`` is never used — confirm how the
    # query object is meant to be obtained from the connection.
    dbp = await self.get_db_connection(org_id)
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    for name, kind, planned_date in planned:
        await sor.insert(
            "contract_milestones",
            {
                "id": uuid.uuid4().hex,
                "contract_id": contract_id,
                "milestone_name": name,
                "milestone_type": kind,
                "planned_date": planned_date.strftime("%Y-%m-%d"),
                "status": "pending",
                "created_at": stamp,
                "updated_at": stamp,
            },
        )
async def generate_orders_from_payment_terms(self, contract_id: str, contract_data: Dict, org_id: str):
    """Create staged orders from the percentages named in the payment terms.

    An order is generated for each stage whose percentage marker AND keyword
    both appear somewhere in ``contract_data['payment_terms']``:

    * ``30%`` + ``预付款`` -> ``advance`` order for 30% of the amount
    * ``50%`` + ``进度款`` -> ``progress`` order for 50% of the amount
    * ``20%`` + ``尾款``  -> ``final`` order for 20% of the amount

    Orders are numbered ``<contract_number>-ORD-NN`` in that order and
    inserted into ``orders`` with status ``active``. A missing or ``None``
    ``amount`` is treated as 0 and a missing or ``None`` ``payment_terms`` as
    empty text; when no stage matches nothing is written.

    Args:
        contract_id: Contract the orders belong to.
        contract_data: Contract fields; ``contract_number`` is required as
            soon as at least one order is generated.
        org_id: Organisation stored on the orders and used for the connection.
    """
    # ``.get(key, default)`` only covers a missing key; an explicit None
    # (e.g. a NULL column copied into the dict) must be handled as well.
    terms = contract_data.get('payment_terms') or ''
    amount = float(contract_data.get('amount') or 0)

    # (percentage marker, keyword, order type, share of amount, batch label)
    stage_rules = (
        ("30%", "预付款", "advance", 0.3, "预付款阶段"),
        ("50%", "进度款", "progress", 0.5, "进度款阶段"),
        ("20%", "尾款", "final", 0.2, "尾款阶段"),
    )
    orders = [
        {"order_type": order_type, "amount": amount * share, "delivery_batch": batch}
        for marker, keyword, order_type, share, batch in stage_rules
        if marker in terms and keyword in terms
    ]
    if not orders:
        return

    # The connection lookup does not depend on the order, so do it once.
    # NOTE(review): ``sor`` is not defined in this method or anywhere in the
    # visible part of the file, and ``dbp`` is never used — confirm how the
    # query object is meant to be obtained from the connection.
    dbp = await self.get_db_connection(org_id)
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    for sequence, order in enumerate(orders, start=1):
        await sor.insert(
            "orders",
            {
                "id": uuid.uuid4().hex,
                "order_number": f"{contract_data['contract_number']}-ORD-{sequence:02d}",
                "contract_id": contract_id,
                "customer_id": contract_data.get('customer_id', ''),
                "order_type": order["order_type"],
                "delivery_batch": order["delivery_batch"],
                "amount": order["amount"],
                "tax_rate": contract_data.get('tax_rate', '0.1300'),
                "credit_period": contract_data.get('credit_period'),
                "status": "active",
                "owner_id": contract_data.get('owner_id', ''),
                "org_id": org_id,
                "created_at": stamp,
                "updated_at": stamp,
            },
        )
async def create_manual_order(self, contract_id: str, order_data: Dict, user_id: str, org_id: str) -> str:
    """Create an order by hand, capped by what is left on the contract.

    The amounts of all non-cancelled orders of the contract are summed and
    subtracted from the contract amount; the new order is rejected when its
    amount exceeds that remainder. The comparison is done with ``Decimal``
    because binary floats reject valid orders (``0.3 - 0.1`` is slightly less
    than ``0.2``). A contract amount of ``None`` is treated as 0.

    Args:
        contract_id: Contract the order is attached to.
        order_data: Order fields; ``amount`` and ``order_type`` are required.
            ``tax_rate`` and ``credit_period`` fall back to the contract's
            values when omitted.
        user_id: Stored as ``owner_id`` of the order.
        org_id: Organisation stored on the order and used for the connection.

    Returns:
        The generated 32-character hexadecimal order id.

    Raises:
        ValueError: If the contract does not exist, an amount cannot be
            interpreted as a finite number, or the order amount exceeds the
            remaining amount of the contract.
    """
    from decimal import Decimal, InvalidOperation

    def _as_money(value) -> Decimal:
        """Convert a DB- or caller-supplied amount into a finite Decimal."""
        try:
            converted = Decimal(str(value))
        except InvalidOperation as exc:
            # Keep ValueError as the failure type, as float() raised before.
            raise ValueError(f"金额格式无效: {value!r}") from exc
        if not converted.is_finite():
            raise ValueError(f"金额格式无效: {value!r}")
        return converted

    contract = await self.get_contract_by_id(contract_id, org_id)
    if not contract:
        raise ValueError("合同不存在")

    # NOTE(review): ``sor`` is not defined in this method or anywhere in the
    # visible part of the file, and ``dbp`` is never used — confirm how the
    # query object is meant to be obtained from the connection.
    dbp = await self.get_db_connection(org_id)
    paid_orders = await sor.query(
        "SELECT SUM(amount) as total_paid FROM orders WHERE contract_id = %(contract_id)s AND status != 'cancelled'",
        {"contract_id": contract_id}
    )
    # SUM() yields NULL when the contract has no orders yet.
    summed = paid_orders[0]["total_paid"] if paid_orders else None
    total_paid = _as_money(summed) if summed else Decimal(0)

    remaining_amount = _as_money(contract["amount"] or 0) - total_paid
    if _as_money(order_data["amount"]) > remaining_amount:
        raise ValueError(f"订单金额不能超过合同剩余未收款金额 {remaining_amount}")

    order_id = uuid.uuid4().hex
    # NOTE(review): the suffix has one-second resolution, so two manual
    # orders created for the same contract within one second get the same
    # order_number — confirm whether that column must be unique.
    order_number = f"{contract['contract_number']}-MANUAL-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    final_order_data = {
        "id": order_id,
        "order_number": order_number,
        "contract_id": contract_id,
        "customer_id": contract["customer_id"],
        "order_type": order_data["order_type"],
        "delivery_batch": order_data.get("delivery_batch"),
        "acceptance_deadline": order_data.get("acceptance_deadline"),
        "amount": order_data["amount"],
        "tax_rate": order_data.get("tax_rate", contract.get("tax_rate", "0.1300")),
        "credit_period": order_data.get("credit_period", contract.get("credit_period")),
        "status": "active",
        "description": order_data.get("description"),
        "owner_id": user_id,
        "org_id": org_id,
        "created_at": stamp,
        "updated_at": stamp,
    }
    await sor.insert("orders", final_order_data)
    return order_id
async def update_contract_milestone_status(self, milestone_id: str, status: str, actual_date: str = None, org_id: str = None):
    """Set the status of a milestone and, on completion, refresh progress.

    ``status`` and ``updated_at`` are always written; ``actual_date`` is only
    written when a non-empty value is passed. When the new status is
    ``"completed"`` the fulfilment progress of the owning contract is
    recalculated via ``update_contract_fulfillment_progress``.

    Args:
        milestone_id: Id of the milestone row to update.
        status: New status value.
        actual_date: Optional date on which the milestone was actually reached.
        org_id: Organisation used to obtain the database connection.
    """
    dbp = await self.get_db_connection(org_id)

    changes = {"status": status}
    changes["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if actual_date:
        changes["actual_date"] = actual_date

    # NOTE(review): ``sor`` is not defined in this method or anywhere in the
    # visible part of the file, and ``dbp`` is never used — confirm how the
    # query object is meant to be obtained from the connection.
    await sor.update("contract_milestones", changes, {"id": milestone_id})

    if status != "completed":
        return
    # A finished milestone changes the contract's overall progress.
    await self.update_contract_fulfillment_progress(milestone_id, org_id)
async def update_contract_fulfillment_progress(self, milestone_id: str, org_id: str):
    """Recompute a contract's milestone progress and close its opportunity.

    The contract is found through the given milestone. Progress is the share
    of that contract's milestones whose status is ``completed``. Once it
    reaches 100% and the contract references an opportunity, that
    opportunity's status is set to ``won``. Nothing else is persisted.

    The aggregate row is read defensively: ``SUM`` returns NULL when no rows
    match, and a missing result counts as zero progress.

    Args:
        milestone_id: Any milestone of the contract to evaluate.
        org_id: Organisation used to obtain the database connection.
    """
    # NOTE(review): ``sor`` is not defined in this method or anywhere in the
    # visible part of the file, and ``dbp`` is never used — confirm how the
    # query object is meant to be obtained from the connection.
    dbp = await self.get_db_connection(org_id)
    milestone = await sor.select_one("contract_milestones", {"id": milestone_id})
    if not milestone:
        return

    contract_id = milestone["contract_id"]
    stats = await sor.query(
        "SELECT COUNT(*) as total, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed FROM contract_milestones WHERE contract_id = %(contract_id)s",
        {"contract_id": contract_id}
    )
    total = completed = 0
    if stats:
        # ``or 0`` turns a NULL aggregate into zero instead of int(None).
        total = int(stats[0]["total"] or 0)
        completed = int(stats[0]["completed"] or 0)
    progress = (completed / total * 100) if total > 0 else 0

    # Only a fully completed contract triggers a change, so skip the
    # contract lookup otherwise.
    if progress < 100:
        return

    contract = await sor.select_one("contract", {"id": contract_id})
    if contract and contract.get("opportunity_id"):
        # Stand-in for calling the opportunity module: mark the deal as won.
        await sor.update(
            "opportunities",
            {"status": "won", "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
            {"id": contract["opportunity_id"]}
        )
async def check_overdue_milestones(self, org_id: str) -> List[Dict]:
    """Return the overdue milestones of an organisation, oldest first.

    A milestone is reported when its planned date is before the database's
    current date, its status is neither ``completed`` nor ``cancelled`` and
    its contract is not in status ``deleted``. Each row carries the
    milestone columns plus the contract number and ``party_b`` (aliased as
    ``customer_name``).

    Args:
        org_id: Organisation whose contracts are checked.
    """
    dbp = await self.get_db_connection(org_id)
    overdue_sql = """
SELECT
cm.id,
cm.contract_id,
cm.milestone_name,
cm.milestone_type,
cm.planned_date,
cm.status,
c.contract_number,
c.party_b as customer_name
FROM contract_milestones cm
JOIN contract c ON cm.contract_id = c.id
WHERE c.org_id = %(org_id)s
AND cm.planned_date < CURDATE()
AND cm.status NOT IN ('completed', 'cancelled')
AND c.status != 'deleted'
ORDER BY cm.planned_date ASC
"""
    # NOTE(review): ``sor`` is not defined in this method or anywhere in the
    # visible part of the file, and ``dbp`` is never used — confirm how the
    # query object is meant to be obtained from the connection.
    return await sor.doQuery(overdue_sql, {"org_id": org_id})
async def send_milestone_reminders(self, org_id: str) -> int:
    """Report overdue milestones of an organisation and return their count.

    No notification channel is wired in: the reminders are only printed to
    standard output, one summary line followed by one line per milestone.

    Args:
        org_id: Organisation whose milestones are checked.

    Returns:
        Number of overdue milestones found (0 when there are none).
    """
    overdue = await self.check_overdue_milestones(org_id)
    total = len(overdue)
    if total <= 0:
        return total

    print(f"发现 {total} 个逾期里程碑需要提醒")
    for item in overdue:
        print(f"提醒: 合同 {item['contract_number']} 的里程碑 '{item['milestone_name']}' 已逾期")
    return total
async def ai_contract_analysis(self, contract_content: str, org_id: str) -> Dict:
    """Analyse contract text with keyword and pattern checks.

    Despite the name, everything here is keyword/regex based and delegated
    to the ``_extract_*`` / ``_is_*`` helpers of this class. ``org_id`` is
    accepted but not used by this method.

    Args:
        contract_content: Full text of the contract.
        org_id: Organisation id (currently unused).

    Returns:
        A dict with four keys:

        * ``compliance_issues`` – medium-severity findings: no payment node
          detected, or start/end date missing.
        * ``key_dates`` – the values of the extracted key dates (or the raw
          helper result if it is not a dict).
        * ``risk_warnings`` – high-severity findings: over-long credit period
          or abnormal penalty clause, as judged by the ``_is_*`` helpers.
        * ``extracted_terms`` – ``payment_nodes``, ``credit_period``,
          ``penalty_clause`` and ``key_dates`` as extracted.
    """
    # --- Term extraction -------------------------------------------------
    # (keyword searched for in the text, node type reported for it)
    payment_keywords = (
        ("预付款", "advance"),
        ("进度款", "progress"),
        ("尾款", "final"),
        ("验收款", "acceptance"),
    )
    payment_nodes = [
        {
            "type": node_type,
            "description": keyword,
            "percentage": self._extract_percentage(contract_content, keyword),
        }
        for keyword, node_type in payment_keywords
        if keyword in contract_content
    ]

    credit_period = None
    if "账期" in contract_content or "付款期限" in contract_content:
        credit_period = self._extract_credit_period(contract_content)

    penalty_clause = None
    if "违约金" in contract_content:
        penalty_clause = self._extract_penalty_clause(contract_content)

    key_dates = self._extract_key_dates(contract_content)

    extracted_terms = {
        "payment_nodes": payment_nodes,
        "credit_period": credit_period,
        "penalty_clause": penalty_clause,
        "key_dates": key_dates,
    }

    # --- Risk warnings ---------------------------------------------------
    risk_warnings = []
    if credit_period and self._is_long_credit_period(credit_period):
        risk_warnings.append({
            "type": "credit_period",
            "severity": "high",
            # The parenthesis is closed and separated from the advice, in the
            # same shape as the penalty-clause message below.
            "message": f"检测到超长账期({credit_period} > 90天),建议修改",
            "highlight": True,
            "recommendation": "建议将账期调整为90天以内以降低资金风险"
        })
    if penalty_clause and self._is_abnormal_penalty(penalty_clause):
        risk_warnings.append({
            "type": "penalty_clause",
            "severity": "high",
            "message": f"检测到异常高额违约金条款({penalty_clause}),建议审核",
            "highlight": True,
            "recommendation": "违约金比例通常不应超过合同总金额的30%"
        })

    # --- Compliance checks -----------------------------------------------
    compliance_issues = []
    if not payment_nodes:
        compliance_issues.append({
            "type": "missing_payment_terms",
            "severity": "medium",
            "message": "未检测到明确的付款节点条款,建议补充",
            "highlight": False
        })
    # NOTE(review): this assumes ``_extract_key_dates`` returns a dict, while
    # the result assembly below also tolerates other types — confirm against
    # the helper, which is outside the visible part of the file.
    if not key_dates.get("start_date") or not key_dates.get("end_date"):
        compliance_issues.append({
            "type": "missing_dates",
            "severity": "medium",
            "message": "缺少合同开始或结束日期,建议补充",
            "highlight": False
        })

    return {
        "compliance_issues": compliance_issues,
        "key_dates": list(key_dates.values()) if isinstance(key_dates, dict) else key_dates,
        "risk_warnings": risk_warnings,
        "extracted_terms": extracted_terms,
    }
def _extract_percentage(self, content: str, term: str) -> float:
"""提取百分比"""
import re
# 查找形如 "30%预付款" 或 "预付款30%" 的模式
patterns = [
rf'(\d+)%\s*{term}',
rf'{term}\s*(\d+)%',
rf'(\d+)\s*{term}', # 全角百分号
rf'{term}\s*(\d+)'
]
for pattern in patterns:
match = re.search(pattern, content)
if match:
return float(match.group(1)) / 100.0
# 默认值
defaults = {"预付款": 0.3, "进度款": 0.5, "尾款": 0.2, "验收款": 0.2}
return defaults.get(term, 0.0)
def _extract_credit_period(self, content: str) -> str:
"""提取账期信息"""
import re
# 查找账期相关数字
patterns = [
r'账期\s*(\d+)\s*天',
r'(\d+)\s*天\s*账期',
r'付款期限\s*(\d+)\s*天',
r'(\d+)\s*天\s*内付款'
]
for pattern in patterns:
match = re.search(pattern, content)
if match:
return f"{match.group(1)}"
return "未明确"
def _extract_penalty_clause(self, content: str) -> str:
"""提取违约金条款"""
import re
# 查找违约金相关数字
patterns = [
r'违约金\s*(\d+)%',
r'(\d+)%\s*违约金',
r'违约金\s*(\d+)',
r'(\d+)\s*违约金',
r'违约金为合同总额的\s*(\d+)%'
]
for pattern in patterns:
match = re.search(pattern, content)