product_management/wwwroot/api/product_use.dspy
yumoqing 43787a63a4 fix: spec compliance pass - ServerEnv removal in .dspy, core.py DBPools singleton, scripts/load_path.py
- All .dspy files: replace ServerEnv() org_id access with await get_userorgid()
  (get_userorgid is registered as global in ahserver processorResource)
- core.py: simplify _get_db to use DBPools() singleton directly (DBPools is @SingletonDecorator)
  remove unnecessary db.databases = config.databases assignment
- core.py: add MODULE_NAME constant, use env.get_module_dbname(MODULE_NAME) pattern
- Create scripts/load_path.py with all RBAC paths per module-development-spec
  Covers: entry pages, feature .ui files, CRUD directories, all API .dspy endpoints
- .gitignore: add __pycache__/ exclusion
- All models/*.json and json/*.json pass spec validation checks
2026-05-25 17:03:08 +08:00

86 lines
2.9 KiB
Python

#!/usr/bin/env python3
"""
产品使用标准化接口 (按机构隔离)
参数:
product_id: 产品ID
order_id: 订单ID (可选)
use_data: 使用附加数据 (JSON字符串)
org_id: 机构ID (可选)
返回:
{success, use_record_id, data: {product_info, extra_parsed}}
"""
import json, time
from appPublic.uniqueID import getID
result = {'success': False, 'use_record_id': '', 'message': '', 'data': {}}
try:
user_id = await get_user()
org_id = params_kw.get('org_id', None) or (await get_userorgid()) or '0'
now = time.strftime('%Y-%m-%d %H:%M:%S')
product_id = params_kw.get('product_id', '')
order_id = params_kw.get('order_id', '')
use_data = params_kw.get('use_data', '{}')
if not product_id:
result['message'] = '缺少product_id参数'
return json.dumps(result, ensure_ascii=False)
dbname = get_module_dbname('product_management')
sql = """SELECT * FROM product WHERE id = ${product_id}$ AND status = '1' AND org_id = ${org_id}$"""
async with DBPools().sqlorContext(dbname) as sor:
rows = await sor.sqlExe(sql, {'product_id': product_id, 'org_id': org_id})
if not rows:
result['message'] = '产品不存在或无权访问'
return json.dumps(result, ensure_ascii=False)
product = dict(rows[0])
# Parse extra_json for the product
extra_parsed = {}
extra_str = product.get('extra_json', '')
if extra_str:
try:
extra_parsed = json.loads(extra_str)
except:
pass
# Verify purchase (if table exists)
try:
purchase_sql = """SELECT * FROM purchase_orders
WHERE product_id = ${product_id}$
AND buyer_id = ${user_id}$
AND buyer_org_id = ${org_id}$
AND status IN ('active', 'pending')"""
purchases = await sor.sqlExe(purchase_sql, {
'product_id': product_id,
'user_id': user_id,
'org_id': org_id
})
if not purchases and not order_id:
result['message'] = '您尚未购买此产品'
return json.dumps(result, ensure_ascii=False)
except:
pass
use_record_id = getID()
result['success'] = True
result['use_record_id'] = use_record_id
result['data'] = {
'product_info': {
'id': product['id'],
'name': product['product_name'],
'code': product['product_code'],
'product_type': product.get('product_type', '')
},
'extra_parsed': extra_parsed
}
result['message'] = '产品使用成功'
except Exception as e:
result['message'] = '使用失败: ' + str(e)
return json.dumps(result, ensure_ascii=False, default=str)