- product_category: org_id scoped tree, product_table_name -> product_type - product: org_id scoped, added extra_json for custom attributes, product_type field - product_type_config: org_id + operator_id dual isolation, unique key on (org_id, operator_id, category_id, config_name) - All 18 API endpoints enforce org_id filtering via ServerEnv - core.py: all methods accept optional org_id, default to current user's org - CRUD definitions: logined_userorgid set to org_id on all lists - init/data.json: removed hardcoded global categories (managed per reseller) - Rebuilt mysql.ddl.sql and all CRUD UI files
88 lines
3.0 KiB
Python
88 lines
3.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
产品使用标准化接口 (按机构隔离)
|
|
参数:
|
|
product_id: 产品ID
|
|
order_id: 订单ID (可选)
|
|
use_data: 使用附加数据 (JSON字符串)
|
|
org_id: 机构ID (可选)
|
|
返回:
|
|
{success, use_record_id, data: {product_info, extra_parsed}}
|
|
"""
|
|
import json, time
|
|
from appPublic.uniqueID import getID
|
|
|
|
result = {'success': False, 'use_record_id': '', 'message': '', 'data': {}}
|
|
|
|
try:
|
|
user_id = await get_user()
|
|
from ahserver.serverenv import ServerEnv
|
|
env = ServerEnv()
|
|
org_id = params_kw.get('org_id', None) or getattr(env, 'orgid', None) or getattr(env, 'org_id', '0')
|
|
now = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
product_id = params_kw.get('product_id', '')
|
|
order_id = params_kw.get('order_id', '')
|
|
use_data = params_kw.get('use_data', '{}')
|
|
|
|
if not product_id:
|
|
result['message'] = '缺少product_id参数'
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
dbname = get_module_dbname('product_management')
|
|
sql = """SELECT * FROM product WHERE id = ${product_id}$ AND status = '1' AND org_id = ${org_id}$"""
|
|
|
|
async with DBPools().sqlorContext(dbname) as sor:
|
|
rows = await sor.sqlExe(sql, {'product_id': product_id, 'org_id': org_id})
|
|
if not rows:
|
|
result['message'] = '产品不存在或无权访问'
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
product = dict(rows[0])
|
|
|
|
# Parse extra_json for the product
|
|
extra_parsed = {}
|
|
extra_str = product.get('extra_json', '')
|
|
if extra_str:
|
|
try:
|
|
extra_parsed = json.loads(extra_str)
|
|
except:
|
|
pass
|
|
|
|
# Verify purchase (if table exists)
|
|
try:
|
|
purchase_sql = """SELECT * FROM purchase_orders
|
|
WHERE product_id = ${product_id}$
|
|
AND buyer_id = ${user_id}$
|
|
AND buyer_org_id = ${org_id}$
|
|
AND status IN ('active', 'pending')"""
|
|
purchases = await sor.sqlExe(purchase_sql, {
|
|
'product_id': product_id,
|
|
'user_id': user_id,
|
|
'org_id': org_id
|
|
})
|
|
if not purchases and not order_id:
|
|
result['message'] = '您尚未购买此产品'
|
|
return json.dumps(result, ensure_ascii=False)
|
|
except:
|
|
pass
|
|
|
|
use_record_id = getID()
|
|
result['success'] = True
|
|
result['use_record_id'] = use_record_id
|
|
result['data'] = {
|
|
'product_info': {
|
|
'id': product['id'],
|
|
'name': product['product_name'],
|
|
'code': product['product_code'],
|
|
'product_type': product.get('product_type', '')
|
|
},
|
|
'extra_parsed': extra_parsed
|
|
}
|
|
result['message'] = '产品使用成功'
|
|
|
|
except Exception as e:
|
|
result['message'] = '使用失败: ' + str(e)
|
|
|
|
return json.dumps(result, ensure_ascii=False, default=str)
|