refactor: replace import_products_from_llmage with load_product_category_product

- New function: load_product_category_product(parent_category_id)
- Reads llmage catalogs + published models, writes sub-categories + products
  into product_management DB directly
- Removed env registration (called via importlib, not ServerEnv)
This commit is contained in:
Hermes Agent 2026-06-23 14:47:15 +08:00
parent df41478170
commit 510122a282

View File

@ -46,20 +46,24 @@ from .asyncinference import (
)
async def import_products_from_llmage(org_id=None, parent_category_id=None, user_id=None):
"""Import llmage categories (llmcatelog) and models (llm) as product categories and products.
async def load_product_category_product(parent_category_id):
"""Load llmage catalogs and published models as product sub-categories and products.
Registered on ServerEnv as env.import_products_from_llmage.
Called by product_management when resource_module='llmage'.
Called by product_management.import_categories_and_products() when resource_module='llmage'.
Responsible for reading source data AND writing to product_management tables.
"""
import time
from appPublic.uniqueID import getID
env = ServerEnv()
pm_dbname = env.get_module_dbname('product_management')
# Step 1: Read source data from llmage
async with get_sor_context(env, 'llmage') as sor:
# Get all catalogs
catelogs = await sor.R('llmcatelog', {})
if not catelogs:
return {'success': False, 'error': 'llmage中没有产品类别数据'}
# Get all published LLMs with catalog info
llm_sql = """select a.id, a.name, a.model, a.description, a.status,
m.llmcatelogid, lc.name as catelogname
from llm a
@ -69,52 +73,98 @@ where m.isdefaultcatelog = '1' and a.status = 'published'
order by lc.sort_order, lc.name, a.name"""
llms = await sor.sqlExe(llm_sql, {})
# Build import_data
categories = []
cat_ids_seen = set()
for c in catelogs:
if c.id in cat_ids_seen:
continue
cat_ids_seen.add(c.id)
categories.append({
'name': c.name,
'source_id': c.id,
'parent_source_id': None,
'sort_order': getattr(c, 'sort_order', 0) or 0,
'description': getattr(c, 'description', '') or '',
'has_product': '1',
'product_type': 'llm_model',
'product_type_title': '大模型按量',
'resource_module': 'llmage'
})
# Step 2: Get parent category's org_id from product_management
now = time.strftime('%Y-%m-%d %H:%M:%S')
async with DBPools().sqlorContext(pm_dbname) as sor:
parent_rows = await sor.sqlExe(
"SELECT org_id FROM product_category WHERE id = ${id}$",
{'id': parent_category_id}
)
if not parent_rows:
return {'success': False, 'error': f'父类别 {parent_category_id} 不存在'}
org_id = parent_rows[0].org_id
products = []
for llm in (llms or []):
products.append({
'product_code': llm.model,
'product_name': llm.name,
'category_source_id': llm.llmcatelogid,
'product_type': 'llm_model',
'brief_intro': getattr(llm, 'description', '') or '',
'price': 0,
'currency': 'CNY',
'sort_order': 0,
'status': '1'
})
# Step 3: Write sub-categories and products
source_to_id = {}
created_cats = 0
skipped_cats = 0
created_prods = 0
skipped_prods = 0
import_data = {
'categories': categories,
'products': products
async with DBPools().sqlorContext(pm_dbname) as sor:
for c in catelogs:
existing = await sor.sqlExe(
"""SELECT id FROM product_category
WHERE name = ${name}$ AND parent_id = ${parent_id}$ AND org_id = ${org_id}$""",
{'name': c.name, 'parent_id': parent_category_id, 'org_id': org_id}
)
if existing:
source_to_id[c.id] = existing[0].id
skipped_cats += 1
continue
new_id = getID()
source_to_id[c.id] = new_id
cat_data = {
'id': new_id,
'parent_id': parent_category_id,
'name': c.name,
'description': getattr(c, 'description', '') or '',
'has_product': '1',
'product_type': 'llm_model',
'product_type_title': '大模型按量',
'sort_order': str(getattr(c, 'sort_order', 0) or 0),
'icon': '',
'status': '1',
'resource_module': 'llmage',
'org_id': org_id,
'created_at': now,
'updated_at': now
}
await sor.C('product_category', cat_data)
created_cats += 1
for llm in (llms or []):
target_cat_id = source_to_id.get(llm.llmcatelogid)
if not target_cat_id:
skipped_prods += 1
continue
existing_prod = await sor.sqlExe(
"""SELECT id FROM product
WHERE product_code = ${code}$ AND org_id = ${org_id}$""",
{'code': llm.model, 'org_id': org_id}
)
if existing_prod:
skipped_prods += 1
continue
prod_id = getID()
prod_data = {
'id': prod_id,
'category_id': target_cat_id,
'product_code': llm.model,
'product_name': llm.name,
'product_type': 'llm_model',
'brief_intro': getattr(llm, 'description', '') or '',
'status': '1',
'price_type': '1',
'price': '0',
'currency': 'CNY',
'sort_order': '0',
'org_id': org_id,
'created_at': now,
'updated_at': now
}
await sor.C('product', prod_data)
created_prods += 1
return {
'success': True,
'message': f'llmage导入完成: 新增 {created_cats} 个子类别, {created_prods} 个产品; '
f'跳过 {skipped_cats} 个已存在类别, {skipped_prods} 个已存在产品'
}
# Call the generic import engine
return await env.import_categories_and_products(
org_id=org_id,
parent_category_id=parent_category_id,
user_id=user_id,
import_data=import_data
)
def _on_hot_reload(data=None):
"""Event handler for hot_reload — wraps invalidate_uapi_cache to accept dispatcher's data arg."""
@ -153,7 +203,6 @@ def load_llmage():
env.backup_accounted_llmusage = backup_accounted_llmusage
env.get_failed_accounting_records = get_failed_accounting_records
env.get_llmage_stats = get_llmage_stats
env.import_products_from_llmage = import_products_from_llmage
# Bind hot_reload event — module-level function, ref safe (module keeps it alive)
if hasattr(env, 'event_dispatcher'):
env.event_dispatcher.bind('hot_reload', _on_hot_reload)