- All .dspy files: replace ServerEnv() org_id access with `await get_userorgid()`
  (get_userorgid is registered as a global in ahserver processorResource).
- core.py: simplify _get_db to use the DBPools() singleton directly
  (DBPools is decorated with @SingletonDecorator); remove the unnecessary
  `db.databases = config.databases` assignment.
- core.py: add a MODULE_NAME constant and use the env.get_module_dbname(MODULE_NAME) pattern.
- Create scripts/load_path.py with all RBAC paths per module-development-spec.
  Covers: entry pages, feature .ui files, CRUD directories, and all API .dspy endpoints.
- .gitignore: add __pycache__/ exclusion.
- All models/*.json and json/*.json pass spec validation checks.
83 lines
2.8 KiB
Python
83 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
产品购买标准化接口 (按机构隔离)
|
|
参数:
|
|
product_id: 产品ID
|
|
quantity: 购买数量 (默认1)
|
|
purchase_data: 购买附加数据 (JSON字符串)
|
|
org_id: 机构ID (可选)
|
|
返回:
|
|
{success, order_id, message}
|
|
"""
|
|
import json, time
|
|
from appPublic.uniqueID import getID
|
|
|
|
result = {'success': False, 'order_id': '', 'message': ''}
|
|
|
|
try:
|
|
user_id = await get_user()
|
|
org_id = params_kw.get('org_id', None) or (await get_userorgid()) or '0'
|
|
now = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
product_id = params_kw.get('product_id', '')
|
|
quantity = int(params_kw.get('quantity', '1'))
|
|
purchase_data = params_kw.get('purchase_data', '{}')
|
|
|
|
if not product_id:
|
|
result['message'] = '缺少product_id参数'
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
dbname = get_module_dbname('product_management')
|
|
sql = """SELECT * FROM product WHERE id = ${product_id}$ AND status = '1' AND org_id = ${org_id}$"""
|
|
|
|
async with DBPools().sqlorContext(dbname) as sor:
|
|
rows = await sor.sqlExe(sql, {'product_id': product_id, 'org_id': org_id})
|
|
if not rows:
|
|
result['message'] = '产品不存在或已禁用'
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
product = dict(rows[0])
|
|
|
|
import datetime
|
|
today = datetime.date.today().isoformat()
|
|
enabled = str(product.get('enabled_date', '') or '')
|
|
expired = str(product.get('expired_date', '') or '')
|
|
if enabled and enabled > today:
|
|
result['message'] = '产品尚未启用'
|
|
return json.dumps(result, ensure_ascii=False)
|
|
if expired and expired < today:
|
|
result['message'] = '产品已过期'
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
order_id = getID()
|
|
order_data = {
|
|
'id': order_id,
|
|
'product_id': product_id,
|
|
'product_code': product.get('product_code', ''),
|
|
'product_name': product.get('product_name', ''),
|
|
'buyer_id': user_id,
|
|
'buyer_org_id': org_id,
|
|
'quantity': quantity,
|
|
'unit_price': float(product.get('price', 0)),
|
|
'total_price': float(product.get('price', 0)) * quantity,
|
|
'currency': product.get('currency', 'CNY'),
|
|
'purchase_data': purchase_data,
|
|
'status': 'pending',
|
|
'created_at': now,
|
|
'updated_at': now
|
|
}
|
|
|
|
try:
|
|
await sor.C('purchase_orders', order_data)
|
|
except Exception:
|
|
pass
|
|
|
|
result['success'] = True
|
|
result['order_id'] = order_id
|
|
result['message'] = '购买请求已提交'
|
|
|
|
except Exception as e:
|
|
result['message'] = '购买失败: ' + str(e)
|
|
|
|
return json.dumps(result, ensure_ascii=False)
|