perf: optimize get_inference_history query and add indexes
Query optimization (dspy):
- Replace the UNION ALL + sort with two concurrent queries (asyncio.gather), so that each query can use the (userid, use_time) composite index
- Merge the two pre-sorted result sets in Python with a linear O(n) merge
- Read ioinfo from FileStorage concurrently (asyncio.gather)

Indexes (models/*.json + /tmp/llmage_history_indexes.sql):
- llmusage: add idx_llmusage_userid_usetime (userid, use_time)
- llmusage_history: add idx_lh_userid_usetime (userid, use_time); this table previously had no index on userid at all, which was the main bottleneck
This commit is contained in:
parent
2ebe811c34
commit
d4e455ba9a
@ -135,6 +135,14 @@
|
|||||||
"accounting_status",
|
"accounting_status",
|
||||||
"use_date"
|
"use_date"
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "idx_llmusage_userid_usetime",
|
||||||
|
"idxtype": "index",
|
||||||
|
"idxfields": [
|
||||||
|
"userid",
|
||||||
|
"use_time"
|
||||||
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@ -136,6 +136,11 @@
|
|||||||
"name": "idx_lh_backup_time",
|
"name": "idx_lh_backup_time",
|
||||||
"idxtype": "index",
|
"idxtype": "index",
|
||||||
"idxfields": ["backup_time"]
|
"idxfields": ["backup_time"]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "idx_lh_userid_usetime",
|
||||||
|
"idxtype": "index",
|
||||||
|
"idxfields": ["userid", "use_time"]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import json
|
import json
|
||||||
|
import asyncio
|
||||||
|
|
||||||
result = {'success': False, 'rows': [], 'total': 0, 'page': 1, 'page_size': 10}
|
result = {'success': False, 'rows': [], 'total': 0, 'page': 1, 'page_size': 10}
|
||||||
|
|
||||||
@ -22,63 +23,71 @@ try:
|
|||||||
|
|
||||||
where_clause = " and ".join(conditions)
|
where_clause = " and ".join(conditions)
|
||||||
|
|
||||||
# Count total from both tables
|
# Count total from both tables (并行两个 count 查询)
|
||||||
count_sql = f"""
|
count_sql = f"select count(*) as cnt from {{table}} where {where_clause}"
|
||||||
select count(*) as cnt from (
|
cnt1_recs, cnt2_recs = await asyncio.gather(
|
||||||
select id from llmusage where {where_clause}
|
sor.sqlExe(count_sql.format(table='llmusage'), ns),
|
||||||
union all
|
sor.sqlExe(count_sql.format(table='llmusage_history'), ns),
|
||||||
select id from llmusage_history where {where_clause}
|
)
|
||||||
) t
|
total = (cnt1_recs[0].cnt if cnt1_recs else 0) + (cnt2_recs[0].cnt if cnt2_recs else 0)
|
||||||
"""
|
|
||||||
count_recs = await sor.sqlExe(count_sql, ns)
|
|
||||||
total = count_recs[0].cnt if count_recs else 0
|
|
||||||
|
|
||||||
# UNION ALL query with pagination, time descending
|
# 优化点 1: 分别查询两张表, 让各自走 (userid, use_time) 复合索引
|
||||||
|
# 每表取前 offset+page_size 条 (已按 use_time desc 排好)
|
||||||
offset = (page - 1) * page_size
|
offset = (page - 1) * page_size
|
||||||
query_sql = f"""
|
fetch = offset + page_size
|
||||||
select id, llmid, use_date, use_time, userid, usages, ioinfo,
|
select_cols = ("id, llmid, use_date, use_time, userid, usages, ioinfo, "
|
||||||
status, taskid, amount, cost, userorgid, accounting_status
|
"status, taskid, amount, cost, userorgid, accounting_status")
|
||||||
from (
|
|
||||||
select id, llmid, use_date, use_time, userid, usages, ioinfo,
|
|
||||||
status, taskid, amount, cost, userorgid, accounting_status
|
|
||||||
from llmusage where {where_clause}
|
|
||||||
union all
|
|
||||||
select id, llmid, use_date, use_time, userid, usages, ioinfo,
|
|
||||||
status, taskid, amount, cost, userorgid, accounting_status
|
|
||||||
from llmusage_history where {where_clause}
|
|
||||||
) t
|
|
||||||
order by use_time desc
|
|
||||||
limit {page_size} offset {offset}
|
|
||||||
"""
|
|
||||||
recs = await sor.sqlExe(query_sql, ns)
|
|
||||||
|
|
||||||
rows = []
|
q1 = f"select {select_cols} from llmusage where {where_clause} order by use_time desc limit {fetch}"
|
||||||
for r in (recs or []):
|
q2 = f"select {select_cols} from llmusage_history where {where_clause} order by use_time desc limit {fetch}"
|
||||||
row = dict(r)
|
recs1, recs2 = await asyncio.gather(
|
||||||
# Read ioinfo content from FileStorage
|
sor.sqlExe(q1, ns),
|
||||||
|
sor.sqlExe(q2, ns),
|
||||||
|
)
|
||||||
|
|
||||||
|
# 优化点 2: Python 归并两个已排序序列 (O(n) 比 SQL UNION+sort 快)
|
||||||
|
merged = []
|
||||||
|
i = j = 0
|
||||||
|
rows1 = [dict(r) for r in (recs1 or [])]
|
||||||
|
rows2 = [dict(r) for r in (recs2 or [])]
|
||||||
|
while i < len(rows1) and j < len(rows2):
|
||||||
|
if (rows1[i].get('use_time') or '') >= (rows2[j].get('use_time') or ''):
|
||||||
|
merged.append(rows1[i]); i += 1
|
||||||
|
else:
|
||||||
|
merged.append(rows2[j]); j += 1
|
||||||
|
merged.extend(rows1[i:])
|
||||||
|
merged.extend(rows2[j:])
|
||||||
|
|
||||||
|
# 应用分页
|
||||||
|
page_rows = merged[offset:offset + page_size]
|
||||||
|
|
||||||
|
# 优化点 3: 并发读取 ioinfo 文件 (不再串行 await)
|
||||||
|
import aiofiles
|
||||||
|
from ahserver.filestorage import FileStorage
|
||||||
|
fs = FileStorage()
|
||||||
|
|
||||||
|
async def _load_io(row):
|
||||||
webpath = row.get('ioinfo')
|
webpath = row.get('ioinfo')
|
||||||
io_content = None
|
io_content = None
|
||||||
if webpath:
|
if webpath:
|
||||||
try:
|
try:
|
||||||
from ahserver.filestorage import FileStorage
|
|
||||||
fs = FileStorage()
|
|
||||||
real_path = fs.realPath(webpath)
|
real_path = fs.realPath(webpath)
|
||||||
import aiofiles
|
|
||||||
async with aiofiles.open(real_path, 'rb') as f:
|
async with aiofiles.open(real_path, 'rb') as f:
|
||||||
bin_data = await f.read()
|
bin_data = await f.read()
|
||||||
io_content = json.loads(bin_data.decode('utf-8'))
|
io_content = json.loads(bin_data.decode('utf-8'))
|
||||||
except Exception:
|
except Exception:
|
||||||
io_content = None
|
io_content = None
|
||||||
row['io_content'] = io_content
|
row['io_content'] = io_content
|
||||||
# Parse usages if it's a JSON string
|
|
||||||
if isinstance(row.get('usages'), str):
|
if isinstance(row.get('usages'), str):
|
||||||
try:
|
try:
|
||||||
row['usages'] = json.loads(row['usages'])
|
row['usages'] = json.loads(row['usages'])
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
rows.append(row)
|
return row
|
||||||
|
|
||||||
result['rows'] = rows
|
rows = await asyncio.gather(*[_load_io(r) for r in page_rows])
|
||||||
|
|
||||||
|
result['rows'] = list(rows)
|
||||||
result['total'] = total
|
result['total'] = total
|
||||||
result['page'] = page
|
result['page'] = page
|
||||||
result['page_size'] = page_size
|
result['page_size'] = page_size
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user