# 161 lines
# 7.0 KiB
# Python

import os
import json
import uuid
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from appPublic.jsonConfig import getConfig
from sqlor.dbpools import DBPools
class ContractManager:
    """CRUD helper for rows of the ``contract`` table.

    Every statement issued here filters on ``org_id``, so a call only ever
    touches rows belonging to the organisation passed in. Deletion is soft:
    rows are kept and marked ``status = 'deleted'``, and the read methods
    exclude such rows.

    SQL uses named ``%(name)s`` placeholders; values are always passed
    separately from the statement text.
    """

    # Keys that must be present in ``contract_data`` when creating a contract.
    _REQUIRED_FIELDS = (
        'contract_number', 'title', 'party_a', 'party_b', 'contract_type',
        'start_date', 'end_date',
    )
    # Keys that may be omitted on creation; they are stored as NULL/None.
    _NULLABLE_FIELDS = ('amount', 'sign_date', 'description')
    # Columns update_contract() may change. Column names are interpolated
    # into the SQL text, so this whitelist is what keeps arbitrary dict keys
    # out of the statement.
    _UPDATABLE_FIELDS = frozenset((
        'contract_number', 'title', 'party_a', 'party_b', 'contract_type',
        'status', 'amount', 'start_date', 'end_date', 'sign_date',
        'description',
    ))
    # list_contracts() filters, in the order their clauses are emitted.
    _LIKE_FILTERS = ('contract_number', 'title', 'party_b')
    _EXACT_FILTERS = ('status', 'contract_type')
    _RANGE_FILTERS = (
        ('start_date_from', 'start_date', '>='),
        ('end_date_to', 'end_date', '<='),
    )

    def __init__(self):
        self.config = getConfig()

    async def get_db_connection(self, org_id: str):
        """Return the database handle for ``org_id``.

        NOTE(review): ``getDBP`` is neither defined nor imported in the
        visible part of this module (only ``DBPools`` is imported). The call
        is kept as written; confirm where ``getDBP`` is supposed to come
        from.
        """
        return await getDBP(org_id)

    async def create_contract(self, contract_data: Dict, user_id: str, org_id: str) -> str:
        """Insert a new contract and return its generated id.

        Args:
            contract_data: Contract fields. ``contract_number``, ``title``,
                ``party_a``, ``party_b``, ``contract_type``, ``start_date``
                and ``end_date`` are required; ``status`` defaults to
                ``'draft'``; ``amount``, ``sign_date`` and ``description``
                default to ``None``.
            user_id: Stored as the contract's ``owner_id``.
            org_id: Organisation the contract belongs to.

        Returns:
            The new contract id: a UUID4 as 32 hex characters, no dashes.

        Raises:
            KeyError: If a required field is missing from ``contract_data``.
        """
        contract_id = uuid.uuid4().hex

        # Build (and thereby validate) the parameters before asking for a
        # database handle, so a missing key fails without touching the DB.
        params = {
            'id': contract_id,
            'status': contract_data.get('status', 'draft'),
            'owner_id': user_id,
            'org_id': org_id,
        }
        for field in self._REQUIRED_FIELDS:
            params[field] = contract_data[field]
        for field in self._NULLABLE_FIELDS:
            params[field] = contract_data.get(field)

        sql = """
            INSERT INTO contract (
                id, contract_number, title, party_a, party_b, contract_type,
                status, amount, start_date, end_date, sign_date, description,
                owner_id, org_id, created_at, updated_at
            ) VALUES (
                %(id)s, %(contract_number)s, %(title)s, %(party_a)s, %(party_b)s, %(contract_type)s,
                %(status)s, %(amount)s, %(start_date)s, %(end_date)s, %(sign_date)s, %(description)s,
                %(owner_id)s, %(org_id)s, NOW(), NOW()
            )
        """

        # The original code called an unbound name ``sor`` here (NameError);
        # the handle returned by get_db_connection() is what gets used now.
        # NOTE(review): confirm that handle exposes doTransaction()/doQuery().
        sor = await self.get_db_connection(org_id)
        await sor.doTransaction([{'sql': sql, 'params': params}])
        return contract_id

    async def update_contract(self, contract_id: str, contract_data: Dict, org_id: str) -> bool:
        """Update the whitelisted fields of one contract.

        Keys of ``contract_data`` that are not updatable columns are ignored.
        ``updated_at`` is always refreshed, even when no other field is set.

        Returns:
            True when the statement reports at least one affected row.
        """
        params = {'id': contract_id, 'org_id': org_id}
        assignments = []
        for field, value in contract_data.items():
            if field in self._UPDATABLE_FIELDS:
                assignments.append(f"{field} = %({field})s")
                params[field] = value
        assignments.append("updated_at = NOW()")

        sql = f"UPDATE contract SET {', '.join(assignments)} WHERE id = %(id)s AND org_id = %(org_id)s"

        # Was an unbound ``sor`` in the original; see create_contract().
        sor = await self.get_db_connection(org_id)
        result = await sor.doTransaction([{'sql': sql, 'params': params}])
        return result.rowcount > 0

    async def delete_contract(self, contract_id: str, org_id: str) -> bool:
        """Soft-delete a contract by setting its status to ``'deleted'``.

        Returns:
            True when the statement reports at least one affected row.
        """
        sql = "UPDATE contract SET status = 'deleted', updated_at = NOW() WHERE id = %(id)s AND org_id = %(org_id)s"
        params = {'id': contract_id, 'org_id': org_id}

        # Was an unbound ``sor`` in the original; see create_contract().
        sor = await self.get_db_connection(org_id)
        result = await sor.doTransaction([{'sql': sql, 'params': params}])
        return result.rowcount > 0

    async def get_contract_by_id(self, contract_id: str, org_id: str) -> Optional[Dict]:
        """Fetch one non-deleted contract by id.

        Returns:
            The first matching row, or ``None`` when the query yields nothing.
        """
        sql = "SELECT * FROM contract WHERE id = %(id)s AND org_id = %(org_id)s AND status != 'deleted'"

        # Was an unbound ``sor`` in the original; see create_contract().
        sor = await self.get_db_connection(org_id)
        rows = await sor.doQuery(sql, {'id': contract_id, 'org_id': org_id})
        return rows[0] if rows else None

    @classmethod
    def _build_list_filter(cls, org_id: str, filters: Optional[Dict]) -> Tuple[str, Dict]:
        """Translate ``filters`` into a WHERE clause body and its parameters."""
        clauses = ["org_id = %(org_id)s", "status != 'deleted'"]
        params = {'org_id': org_id}
        if not filters:
            return " AND ".join(clauses), params

        # Falsy filter values (None, '', 0) mean "no filter on this field".
        for key in cls._LIKE_FILTERS:
            if filters.get(key):
                clauses.append(f"{key} LIKE %({key})s")
                params[key] = f"%{filters[key]}%"
        for key in cls._EXACT_FILTERS:
            if filters.get(key):
                clauses.append(f"{key} = %({key})s")
                params[key] = filters[key]
        for key, column, operator in cls._RANGE_FILTERS:
            if filters.get(key):
                clauses.append(f"{column} {operator} %({key})s")
                params[key] = filters[key]
        return " AND ".join(clauses), params

    async def list_contracts(self, org_id: str, filters: Optional[Dict] = None,
                             page: int = 1, page_size: int = 20) -> Tuple[List[Dict], int]:
        """List non-deleted contracts of an organisation, newest first.

        Args:
            org_id: Organisation whose contracts are listed.
            filters: Optional mapping. ``contract_number``, ``title`` and
                ``party_b`` are substring matches (``LIKE %value%``);
                ``status`` and ``contract_type`` are exact matches;
                ``start_date_from`` is a lower bound on ``start_date`` and
                ``end_date_to`` an upper bound on ``end_date``.
            page: 1-based page number. Values below 1 are treated as 1 so
                the computed OFFSET can never be negative.
            page_size: Rows per page. Negative values are treated as 0.

        Returns:
            ``(rows, total)``: the rows of the requested page and the number
            of rows matching the filters across all pages.
        """
        where_sql, params = self._build_list_filter(org_id, filters)

        limit = max(page_size, 0)
        offset = (max(page, 1) - 1) * limit

        # Was an unbound ``sor`` in the original; see create_contract().
        sor = await self.get_db_connection(org_id)

        # Total number of matching rows.
        count_sql = f"SELECT COUNT(*) as total FROM contract WHERE {where_sql}"
        count_result = await sor.doQuery(count_sql, params)
        total = count_result[0]['total'] if count_result else 0

        # The requested page.
        data_sql = f"""
            SELECT * FROM contract
            WHERE {where_sql}
            ORDER BY created_at DESC
            LIMIT %(limit)s OFFSET %(offset)s
        """
        page_params = {**params, 'limit': limit, 'offset': offset}
        data_result = await sor.doQuery(data_sql, page_params)
        return data_result, total
# Global instance: created once at import time and shared by the
# module-level wrapper functions defined below.
contract_manager = ContractManager()
# Exported functions
async def create_contract(contract_data: Dict, user_id: str, org_id: str) -> str:
    """Create a contract via the module-level ``contract_manager``.

    Forwards all arguments unchanged to ``contract_manager.create_contract``
    and returns whatever that call returns.
    """
    new_contract_id = await contract_manager.create_contract(
        contract_data, user_id, org_id
    )
    return new_contract_id
async def update_contract(contract_id: str, contract_data: Dict, org_id: str) -> bool:
    """Update a contract via the module-level ``contract_manager``.

    Forwards all arguments unchanged to ``contract_manager.update_contract``
    and returns whatever that call returns.
    """
    was_updated = await contract_manager.update_contract(
        contract_id, contract_data, org_id
    )
    return was_updated
async def delete_contract(contract_id: str, org_id: str) -> bool:
    """Delete a contract via the module-level ``contract_manager``.

    Forwards both arguments unchanged to ``contract_manager.delete_contract``
    and returns whatever that call returns.
    """
    was_deleted = await contract_manager.delete_contract(contract_id, org_id)
    return was_deleted
async def get_contract_by_id(contract_id: str, org_id: str) -> Optional[Dict]:
    """Look up one contract via the module-level ``contract_manager``.

    Forwards both arguments unchanged to
    ``contract_manager.get_contract_by_id`` and returns whatever that call
    returns.
    """
    contract = await contract_manager.get_contract_by_id(contract_id, org_id)
    return contract
async def list_contracts(
    org_id: str,
    filters: Optional[Dict] = None,
    page: int = 1,
    page_size: int = 20,
) -> Tuple[List[Dict], int]:
    """List contracts via the module-level ``contract_manager``.

    Forwards all arguments unchanged to ``contract_manager.list_contracts``
    and returns whatever that call returns.
    """
    listing = await contract_manager.list_contracts(
        org_id, filters, page, page_size
    )
    return listing