yumoqing 9eccd08ffd feat: pipeline-app独立产线后端服务
3个业务模块:
- pipeline_core: 产线定义(pipelines/steps/versions)
- pipeline_ops: 运营(定价/供应量/使用记录)
- pipeline_dist: 分销(分销商/独立定价/API密钥)

- ahserver独立部署(端口9090)
- 独立数据库pipeline
- 80个文件, 符合module/db-table/crud三规范
2026-06-11 14:49:02 +08:00

230 lines
8.5 KiB
Python

"""pipeline_dist 模块 - 分销商管理模块"""
import json
from appPublic.uniqueID import getID
from sqlor.dbpools import DBPools
from ahserver.serverenv import ServerEnv
from appPublic.log import debug
MODULE_NAME = 'pipeline_dist'
DBNAME = 'pipeline'
def _get_sor():
"""Get database pool and dbname for pipeline_dist module."""
return DBPools(), DBNAME
async def distributors_create(params_kw):
"""创建分销商记录"""
result = {'success': False, 'message': ''}
try:
db, dbname = _get_sor()
async with db.sqlorContext(dbname) as sor:
data = params_kw.copy()
data.pop('page', None)
data.pop('rows', None)
data.pop('data_filter', None)
data['id'] = getID()
await sor.C('distributors', data)
result['success'] = True
result['message'] = '创建成功'
result['id'] = data['id']
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)
async def distributors_update(params_kw):
"""更新分销商记录"""
result = {'success': False, 'message': ''}
try:
db, dbname = _get_sor()
async with db.sqlorContext(dbname) as sor:
data = params_kw.copy()
data.pop('page', None)
data.pop('rows', None)
data.pop('data_filter', None)
if not data.get('id'):
result['message'] = '缺少id'
else:
await sor.U('distributors', data)
result['success'] = True
result['message'] = '更新成功'
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)
async def distributors_delete(params_kw):
"""删除分销商记录"""
result = {'success': False, 'message': ''}
try:
db, dbname = _get_sor()
async with db.sqlorContext(dbname) as sor:
record_id = params_kw.get('id')
if not record_id:
result['message'] = '缺少id'
else:
await sor.D('distributors', {'id': record_id})
result['success'] = True
result['message'] = '删除成功'
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)
async def distributor_pipeline_create(params_kw):
"""创建分销商-产线关联记录"""
result = {'success': False, 'message': ''}
try:
db, dbname = _get_sor()
async with db.sqlorContext(dbname) as sor:
data = params_kw.copy()
data.pop('page', None)
data.pop('rows', None)
data.pop('data_filter', None)
data['id'] = getID()
await sor.C('distributor_pipeline', data)
result['success'] = True
result['message'] = '创建成功'
result['id'] = data['id']
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)
async def distributor_pipeline_update(params_kw):
"""更新分销商-产线关联记录"""
result = {'success': False, 'message': ''}
try:
db, dbname = _get_sor()
async with db.sqlorContext(dbname) as sor:
data = params_kw.copy()
data.pop('page', None)
data.pop('rows', None)
data.pop('data_filter', None)
if not data.get('id'):
result['message'] = '缺少id'
else:
await sor.U('distributor_pipeline', data)
result['success'] = True
result['message'] = '更新成功'
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)
async def distributor_pipeline_delete(params_kw):
"""删除分销商-产线关联记录"""
result = {'success': False, 'message': ''}
try:
db, dbname = _get_sor()
async with db.sqlorContext(dbname) as sor:
record_id = params_kw.get('id')
if not record_id:
result['message'] = '缺少id'
else:
await sor.D('distributor_pipeline', {'id': record_id})
result['success'] = True
result['message'] = '删除成功'
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)
async def generate_api_key(params_kw):
"""Generate a new API key for a distributor"""
result = {'success': False, 'message': ''}
try:
distributor_id = params_kw.get('id') or params_kw.get('distributor_id')
if not distributor_id:
result['message'] = '缺少distributor_id'
return json.dumps(result, ensure_ascii=False, default=str)
new_key = getID() + getID()
db, dbname = _get_sor()
async with db.sqlorContext(dbname) as sor:
await sor.U('distributors', {'id': distributor_id, 'api_key': new_key})
result['success'] = True
result['key'] = new_key
result['distributor_id'] = distributor_id
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)
async def calculate_price(params_kw):
"""Calculate effective price for a distributor-pipeline combination.
Returns the effective price considering custom pricing and markup.
If no custom pricing exists, returns None (caller should use base price).
"""
result = {'success': False, 'message': '', 'price': None}
try:
distributor_id = params_kw.get('distributor_id')
pipeline_id = params_kw.get('pipeline_id')
if not distributor_id or not pipeline_id:
result['message'] = '缺少distributor_id或pipeline_id'
return json.dumps(result, ensure_ascii=False, default=str)
db, dbname = _get_sor()
async with db.sqlorContext(dbname) as sor:
rows = await sor.sqlExe(
'select * from distributor_pipeline where distributor_id = :distributor_id and pipeline_id = :pipeline_id',
{'distributor_id': distributor_id, 'pipeline_id': pipeline_id}
)
if not rows:
result['success'] = True
result['price'] = None
return json.dumps(result, ensure_ascii=False, default=str)
dp = rows[0] if isinstance(rows, list) else rows
# If custom_price is set, use it as base
base_price = None
custom_price = dp.get('custom_price') if hasattr(dp, 'get') else getattr(dp, 'custom_price', None)
if custom_price is not None:
base_price = custom_price
# Apply markup if configured
markup_type = dp.get('markup_type') if hasattr(dp, 'get') else getattr(dp, 'markup_type', None)
markup_value = dp.get('markup_value') if hasattr(dp, 'get') else getattr(dp, 'markup_value', None)
if markup_type and markup_value is not None and base_price is not None:
if markup_type == 'fixed':
result['price'] = base_price + markup_value
elif markup_type == 'percentage':
result['price'] = base_price * (1 + markup_value / 100)
else:
result['price'] = base_price
result['success'] = True
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)
def load_pipeline_dist():
"""注册函数到 ServerEnv"""
env = ServerEnv()
# Distributors
env.distributors_create = distributors_create
env.distributors_creates = distributors_create
env.distributors_update = distributors_update
env.distributors_updates = distributors_update
env.distributors_delete = distributors_delete
env.distributors_deletes = distributors_delete
# Distributor Pipeline
env.distributor_pipeline_create = distributor_pipeline_create
env.distributor_pipeline_creates = distributor_pipeline_create
env.distributor_pipeline_update = distributor_pipeline_update
env.distributor_pipeline_updates = distributor_pipeline_update
env.distributor_pipeline_delete = distributor_pipeline_delete
env.distributor_pipeline_deletes = distributor_pipeline_delete
# Utility functions
env.generate_api_key = generate_api_key
env.calculate_price = calculate_price
debug(f'[{MODULE_NAME}] module loaded')
return True