From 7e4069f3b6fc668cc2f6e1e07049ee601a8146b7 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Sat, 30 May 2026 12:19:42 +0800 Subject: [PATCH] refactor: get_llm() uses get_llmage_llm() + cached uapi/uapiio lookups Replace 6-table JOIN with 3-step approach: 1. get_llmage_llm() for base info (llm + llm_api_map + llmcatelog) 2. Cached uapi lookup (ioid, stream, callbackurl) 3. Cached uapiio lookup (input_fields) Benefits: - Code reuse: eliminates duplicate SQL - Performance: uapi/uapiio cached with 5min TTL - Maintainability: separate concerns for model info vs API config - Adds invalidate_uapi_cache() for config changes --- llmage/init.py | 2 + llmage/utils.py | 138 +++++++++++++++++++++++++++++------------------- 2 files changed, 85 insertions(+), 55 deletions(-) diff --git a/llmage/init.py b/llmage/init.py index 93487c6..e65715b 100644 --- a/llmage/init.py +++ b/llmage/init.py @@ -17,6 +17,7 @@ from .utils import ( get_llmproviders, get_llm, get_llmage_llm, + invalidate_uapi_cache, ) from .llmclient import ( @@ -54,6 +55,7 @@ def load_llmage(): env.query_task_status = query_task_status env.get_llm = get_llm env.get_llmage_llm = get_llmage_llm + env.invalidate_uapi_cache = invalidate_uapi_cache env.inference = inference env.inference_generator = inference_generator env.get_llms_by_catelog = get_llms_by_catelog diff --git a/llmage/utils.py b/llmage/utils.py index beb29c4..8a60499 100644 --- a/llmage/utils.py +++ b/llmage/utils.py @@ -1,21 +1,68 @@ import json -import time import asyncio import aiofiles from random import randint from functools import partial from traceback import format_exc +from time import time from sqlor.dbpools import DBPools, get_sor_context from appPublic.log import debug, exception, error, critical from appPublic.uniqueID import getID from appPublic.dictObject import DictObject from appPublic.timeUtils import curDateString, timestampstr -from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi +from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi, get_uapi from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.filestorage import FileStorage from appPublic.jsonConfig import getConfig from appPublic.streamhttpclient import StreamHttpClient +# ============================================================= +# Process-level cache for uapi/uapiio (static config, rarely changes) +# ============================================================= +_UAPI_CACHE_TTL = 300 # 5 minutes +_uapi_cache = {} # key: "upappid:apiname" -> {data, ts} +_uapiio_cache = {} # key: "ioid" -> {data, ts} + + +async def _get_uapi_cached(upappid, apiname): + """Get uapi record with process-level cache (uapi config rarely changes)""" + global _uapi_cache + cache_key = f"{upappid}:{apiname}" + cached = _uapi_cache.get(cache_key) + if cached and (time() - cached['ts']) < _UAPI_CACHE_TTL: + return cached['data'] + uapi_rec = await get_uapi(upappid, apiname) + _uapi_cache[cache_key] = {'data': uapi_rec, 'ts': time()} + return uapi_rec + + +async def _get_uapiio_cached(ioid): + """Get uapiio record with process-level cache (io config rarely changes)""" + global _uapiio_cache + if ioid is None: + return None + cached = _uapiio_cache.get(ioid) + if cached and (time() - cached['ts']) < _UAPI_CACHE_TTL: + return cached['data'] + env = ServerEnv() + uapi_dbname = get_serverenv('get_module_dbname')('uapi') + async with DBPools().sqlorContext(uapi_dbname) as sor: + recs = await sor.R('uapiio', {'id': ioid}) + result = recs[0] if recs else None + _uapiio_cache[ioid] = {'data': result, 'ts': time()} + return result + + +def invalidate_uapi_cache(upappid=None, apiname=None): + """Invalidate uapi/uapiio cache entries. Call when uapi config changes.""" + global _uapi_cache, _uapiio_cache + if upappid and apiname: + _uapi_cache.pop(f"{upappid}:{apiname}", None) + else: + _uapi_cache.clear() + _uapiio_cache.clear() + + async def update_llmusage(ns): env = ServerEnv() async with get_sor_context(env, 'llmage') as sor: @@ -314,59 +361,40 @@ async def get_llms_by_catelog(catelogid=None, orderby='providerid'): return [] async def get_llm(llmid, catelogid=None): - today = curDateString() - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - sql = """select a.id, -a.name, -a.model, -a.providerid, -a.description, -a.iconid, -a.upappid, -a.ownerid, -a.min_balance, -m.llmcatelogid, -m.apiname, -m.query_apiname, -m.query_period, -m.ppid, -e.ioid, -e.stream, -e.callbackurl, -f.input_fields, -lc.name as catelogname -from llm a -,llm_api_map m -,llmcatelog lc -,upapp c -,uapi e -,uapiio f -where a.id = m.llmid -and a.upappid = c.id -and c.id = e.upappid -and m.apiname = e.name -and e.ioid = f.id -and a.id = ${llmid}$ -and a.status = 'published' -and a.expired_date > ${today}$ -and a.enabled_date <= ${today}$ -""" - ns = {'llmid': llmid, 'today': today} - if catelogid: - sql += ' and m.llmcatelogid = ${catelogid}$ ' - ns['catelogid'] = catelogid - else: - sql += " and m.isdefaultcatelog = '1'" - recs = await sor.sqlExe(sql, ns.copy()) - if len(recs) > 0: - r = recs[0] - return r - else: - debug(f'{llmid=} not found, {ns=}, {sql=}') - return None - exception(f'Error: {format_exc()}') - return None + """Get LLM with full uapi info for vendor API calls. + Refactored to use get_llmage_llm() + cached uapi/uapiio lookups + instead of a 6-table JOIN. + + Returns DictObject with merged fields: + From get_llmage_llm: id, name, model, providerid, description, + iconid, upappid, ownerid, min_balance, status, llmcatelogid, + apiname, query_apiname, query_period, ppid, isdefaultcatelog, + catelogname + From uapi (cached): ioid, stream, callbackurl + From uapiio (cached): input_fields + """ + # Step 1: Get base info from get_llmage_llm (3-table JOIN: llm + llm_api_map + llmcatelog) + llm = await get_llmage_llm(llmid, catelogid) + if not llm: + debug(f'{llmid=} not found via get_llmage_llm') + return None + + # Step 2: Get uapi info (cached, keyed by upappid:apiname) + uapi = await _get_uapi_cached(llm.upappid, llm.apiname) + if not uapi: + debug(f'uapi not found: upappid={llm.upappid}, apiname={llm.apiname}') + return None + + # Step 3: Get uapiio info (cached, keyed by ioid) + uapiio = await _get_uapiio_cached(uapi.ioid) + + # Merge uapi fields into llm result + llm.ioid = uapi.ioid + llm.stream = uapi.stream + llm.callbackurl = uapi.callbackurl + llm.input_fields = uapiio.input_fields if uapiio else '{}' + + return llm async def write_llmusage(llmusage):