refactor: get_llm() uses get_llmage_llm() + cached uapi/uapiio lookups

Replace 6-table JOIN with 3-step approach:
1. get_llmage_llm() for base info (llm + llm_api_map + llmcatelog)
2. Cached uapi lookup (ioid, stream, callbackurl)
3. Cached uapiio lookup (input_fields)

Benefits:
- Code reuse: eliminates duplicate SQL
- Performance: uapi/uapiio cached with 5min TTL
- Maintainability: separate concerns for model info vs API config
- Adds invalidate_uapi_cache() for config changes
This commit is contained in:
yumoqing 2026-05-30 12:19:42 +08:00
parent 3ba1c50eb6
commit 7e4069f3b6
2 changed files with 85 additions and 55 deletions

View File

@ -17,6 +17,7 @@ from .utils import (
get_llmproviders, get_llmproviders,
get_llm, get_llm,
get_llmage_llm, get_llmage_llm,
invalidate_uapi_cache,
) )
from .llmclient import ( from .llmclient import (
@ -54,6 +55,7 @@ def load_llmage():
env.query_task_status = query_task_status env.query_task_status = query_task_status
env.get_llm = get_llm env.get_llm = get_llm
env.get_llmage_llm = get_llmage_llm env.get_llmage_llm = get_llmage_llm
env.invalidate_uapi_cache = invalidate_uapi_cache
env.inference = inference env.inference = inference
env.inference_generator = inference_generator env.inference_generator = inference_generator
env.get_llms_by_catelog = get_llms_by_catelog env.get_llms_by_catelog = get_llms_by_catelog

View File

@ -1,21 +1,68 @@
import json import json
import time
import asyncio import asyncio
import aiofiles import aiofiles
from random import randint from random import randint
from functools import partial from functools import partial
from traceback import format_exc from traceback import format_exc
from time import time
from sqlor.dbpools import DBPools, get_sor_context from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, error, critical from appPublic.log import debug, exception, error, critical
from appPublic.uniqueID import getID from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr from appPublic.timeUtils import curDateString, timestampstr
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi, get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage from ahserver.filestorage import FileStorage
from appPublic.jsonConfig import getConfig from appPublic.jsonConfig import getConfig
from appPublic.streamhttpclient import StreamHttpClient from appPublic.streamhttpclient import StreamHttpClient
# =============================================================
# Process-level cache for uapi/uapiio (static config, rarely changes)
# =============================================================
_UAPI_CACHE_TTL = 300 # 5 minutes
_uapi_cache = {} # key: "upappid:apiname" -> {data, ts}
_uapiio_cache = {} # key: "ioid" -> {data, ts}
async def _get_uapi_cached(upappid, apiname):
"""Get uapi record with process-level cache (uapi config rarely changes)"""
global _uapi_cache
cache_key = f"{upappid}:{apiname}"
cached = _uapi_cache.get(cache_key)
if cached and (time() - cached['ts']) < _UAPI_CACHE_TTL:
return cached['data']
uapi_rec = await get_uapi(upappid, apiname)
_uapi_cache[cache_key] = {'data': uapi_rec, 'ts': time()}
return uapi_rec
async def _get_uapiio_cached(ioid):
"""Get uapiio record with process-level cache (io config rarely changes)"""
global _uapiio_cache
if ioid is None:
return None
cached = _uapiio_cache.get(ioid)
if cached and (time() - cached['ts']) < _UAPI_CACHE_TTL:
return cached['data']
env = ServerEnv()
uapi_dbname = get_serverenv('get_module_dbname')('uapi')
async with DBPools().sqlorContext(uapi_dbname) as sor:
recs = await sor.R('uapiio', {'id': ioid})
result = recs[0] if recs else None
_uapiio_cache[ioid] = {'data': result, 'ts': time()}
return result
def invalidate_uapi_cache(upappid=None, apiname=None):
"""Invalidate uapi/uapiio cache entries. Call when uapi config changes."""
global _uapi_cache, _uapiio_cache
if upappid and apiname:
_uapi_cache.pop(f"{upappid}:{apiname}", None)
else:
_uapi_cache.clear()
_uapiio_cache.clear()
async def update_llmusage(ns): async def update_llmusage(ns):
env = ServerEnv() env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor: async with get_sor_context(env, 'llmage') as sor:
@ -314,59 +361,40 @@ async def get_llms_by_catelog(catelogid=None, orderby='providerid'):
return [] return []
async def get_llm(llmid, catelogid=None): async def get_llm(llmid, catelogid=None):
today = curDateString() """Get LLM with full uapi info for vendor API calls.
env = ServerEnv() Refactored to use get_llmage_llm() + cached uapi/uapiio lookups
async with get_sor_context(env, 'llmage') as sor: instead of a 6-table JOIN.
sql = """select a.id,
a.name, Returns DictObject with merged fields:
a.model, From get_llmage_llm: id, name, model, providerid, description,
a.providerid, iconid, upappid, ownerid, min_balance, status, llmcatelogid,
a.description, apiname, query_apiname, query_period, ppid, isdefaultcatelog,
a.iconid, catelogname
a.upappid, From uapi (cached): ioid, stream, callbackurl
a.ownerid, From uapiio (cached): input_fields
a.min_balance, """
m.llmcatelogid, # Step 1: Get base info from get_llmage_llm (3-table JOIN: llm + llm_api_map + llmcatelog)
m.apiname, llm = await get_llmage_llm(llmid, catelogid)
m.query_apiname, if not llm:
m.query_period, debug(f'{llmid=} not found via get_llmage_llm')
m.ppid, return None
e.ioid,
e.stream, # Step 2: Get uapi info (cached, keyed by upappid:apiname)
e.callbackurl, uapi = await _get_uapi_cached(llm.upappid, llm.apiname)
f.input_fields, if not uapi:
lc.name as catelogname debug(f'uapi not found: upappid={llm.upappid}, apiname={llm.apiname}')
from llm a return None
,llm_api_map m
,llmcatelog lc # Step 3: Get uapiio info (cached, keyed by ioid)
,upapp c uapiio = await _get_uapiio_cached(uapi.ioid)
,uapi e
,uapiio f # Merge uapi fields into llm result
where a.id = m.llmid llm.ioid = uapi.ioid
and a.upappid = c.id llm.stream = uapi.stream
and c.id = e.upappid llm.callbackurl = uapi.callbackurl
and m.apiname = e.name llm.input_fields = uapiio.input_fields if uapiio else '{}'
and e.ioid = f.id
and a.id = ${llmid}$ return llm
and a.status = 'published'
and a.expired_date > ${today}$
and a.enabled_date <= ${today}$
"""
ns = {'llmid': llmid, 'today': today}
if catelogid:
sql += ' and m.llmcatelogid = ${catelogid}$ '
ns['catelogid'] = catelogid
else:
sql += " and m.isdefaultcatelog = '1'"
recs = await sor.sqlExe(sql, ns.copy())
if len(recs) > 0:
r = recs[0]
return r
else:
debug(f'{llmid=} not found, {ns=}, {sql=}')
return None
exception(f'Error: {format_exc()}')
return None
async def write_llmusage(llmusage): async def write_llmusage(llmusage):