Hermes Agent 2a7be96ec9 fix: use accounting_status='accounted' instead of !='failed'
Only explicitly accounted records count as successful transactions.
2026-06-19 15:23:03 +08:00

616 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Dashboard data functions - exposed via load_dashboard() on ServerEnv"""
# Force re-sync: file was truncated in working copy on some deployments
from ahserver.serverenv import ServerEnv
from sqlor.dbpools import get_sor_context
from datetime import datetime, timedelta, date
async def get_today_usage(request):
    """Count today's llmusage rows whose accounting_status is 'accounted'.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        int: the number of matching rows, or 0 when the query returns nothing.
    """
    ns = request._run_ns
    params = {'today': ns.curDateString()}
    query = "SELECT COUNT(*) as cnt FROM llmusage WHERE use_date = ${today}$ AND accounting_status = 'accounted'"
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        if not rows:
            return 0
        return int(rows[0].get('cnt', 0))
async def get_today_amount(request):
    """Sum the ``amount`` of today's llmusage rows marked 'accounted'.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        float: the summed amount, or 0.0 when the query returns nothing.
    """
    ns = request._run_ns
    params = {'today': ns.curDateString()}
    query = "SELECT COALESCE(SUM(amount), 0) as total_amount FROM llmusage WHERE use_date = ${today}$ AND accounting_status = 'accounted'"
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        if not rows:
            return 0.0
        return float(rows[0].get('total_amount', 0))
async def get_usage_trend(request):
    """Compare today's accounted call count with yesterday's.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        dict: ``trend`` ('up' when the change exceeds +5%, 'down' when it is
        below -5%, otherwise 'flat'), ``percentage`` (absolute percent
        change; 0 when yesterday's count is 0) and ``value`` (today's count).
    """
    ns = request._run_ns
    day_now = ns.curDateString()
    # NOTE(review): "today" comes from the request namespace while
    # "yesterday" is derived from the system clock — confirm they agree.
    day_prev = (date.today() - timedelta(days=1)).isoformat()
    async with get_sor_context(ns, 'sage') as sor:
        # Today's count.
        sql_today = "SELECT COUNT(*) as cnt FROM llmusage WHERE use_date = ${today}$ AND accounting_status = 'accounted'"
        rows_now = await sor.sqlExe(sql_today, {'today': day_now})
        count_now = int(rows_now[0].get('cnt', 0)) if rows_now else 0

        # Yesterday's count.
        sql_yesterday = "SELECT COUNT(*) as cnt FROM llmusage WHERE use_date = ${yesterday}$ AND accounting_status = 'accounted'"
        rows_prev = await sor.sqlExe(sql_yesterday, {'yesterday': day_prev})
        count_prev = int(rows_prev[0].get('cnt', 0)) if rows_prev else 0

    # Without a baseline no percentage can be computed; report a flat trend.
    if count_prev == 0:
        return {'trend': 'flat', 'percentage': 0, 'value': count_now}

    delta_pct = ((count_now - count_prev) / count_prev) * 100
    direction = 'up' if delta_pct > 5 else ('down' if delta_pct < -5 else 'flat')
    return {'trend': direction, 'percentage': abs(delta_pct), 'value': count_now}
async def get_amount_trend(request):
    """Compare today's accounted amount with yesterday's.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        dict: ``trend`` ('up' when the change exceeds +5%, 'down' when it is
        below -5%, otherwise 'flat'), ``percentage`` (absolute percent
        change; 0 when yesterday's amount is 0) and ``value`` (today's
        amount).
    """
    ns = request._run_ns
    day_now = ns.curDateString()
    # NOTE(review): "today" comes from the request namespace while
    # "yesterday" is derived from the system clock — confirm they agree.
    day_prev = (date.today() - timedelta(days=1)).isoformat()
    async with get_sor_context(ns, 'sage') as sor:
        # Today's total.
        sql_today = "SELECT COALESCE(SUM(amount), 0) as total_amount FROM llmusage WHERE use_date = ${today}$ AND accounting_status = 'accounted'"
        rows_now = await sor.sqlExe(sql_today, {'today': day_now})
        amount_now = float(rows_now[0].get('total_amount', 0)) if rows_now else 0.0

        # Yesterday's total.
        sql_yesterday = "SELECT COALESCE(SUM(amount), 0) as total_amount FROM llmusage WHERE use_date = ${yesterday}$ AND accounting_status = 'accounted'"
        rows_prev = await sor.sqlExe(sql_yesterday, {'yesterday': day_prev})
        amount_prev = float(rows_prev[0].get('total_amount', 0)) if rows_prev else 0.0

    # Without a baseline no percentage can be computed; report a flat trend.
    if amount_prev == 0:
        return {'trend': 'flat', 'percentage': 0, 'value': amount_now}

    delta_pct = ((amount_now - amount_prev) / amount_prev) * 100
    direction = 'up' if delta_pct > 5 else ('down' if delta_pct < -5 else 'flat')
    return {'trend': direction, 'percentage': abs(delta_pct), 'value': amount_now}
async def get_total_users(request):
    """Return the number of rows in the ``users`` table.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        int: the user count, or 0 when the query returns nothing.
    """
    ns = request._run_ns
    query = "SELECT COUNT(*) as total_users FROM users"
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, {})
        if not rows:
            return 0
        return int(rows[0].get('total_users', 0))
async def get_concurrent_users(request):
    """Count distinct users with llmusage activity in the last five minutes.

    The date filter starts at the calendar day on which the five-minute
    window begins instead of requiring ``use_date = today``; otherwise rows
    written just before midnight are dropped during the first five minutes
    of a new day even though they fall inside the window.

    NOTE(review): assumes ``use_time`` holds a full 'YYYY-MM-DD HH:MM:SS'
    timestamp and ``use_date`` an ISO 'YYYY-MM-DD' date, as implied by the
    values they are compared with — confirm against the schema.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        int: number of distinct ``userid`` values, or 0 when the query
        returns nothing.
    """
    window_start = datetime.now() - timedelta(minutes=5)
    five_min_ago = window_start.strftime('%Y-%m-%d %H:%M:%S')
    env = request._run_ns
    today = env.curDateString()
    # Earliest day the window can touch; never later than the namespace's
    # own "today", so the filter is never narrower than it used to be.
    since_date = min(today, window_start.strftime('%Y-%m-%d'))
    async with get_sor_context(env, 'sage') as sor:
        sql = """
        SELECT COUNT(DISTINCT userid) as concurrent_users
        FROM llmusage
        WHERE use_date >= ${since_date}$
        AND use_time >= ${five_min_ago}$
        """
        recs = await sor.sqlExe(sql, {'since_date': since_date, 'five_min_ago': five_min_ago})
        cnt = int(recs[0].get('concurrent_users', 0)) if recs else 0
    return cnt
async def get_top_models(request):
    """Return today's five most-called models as a list of records.

    The query limits the result to 5 rows ordered by call count. Because the
    model name comes from a LEFT JOIN it can be NULL; such rows are reported
    as 'Unknown' (a plain ``dict.get`` default would not apply, since the key
    is present with a ``None`` value).

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        list[dict]: one dict per model with ``model_name``, ``cnt`` (int)
        and ``total_amount`` (float).
    """
    env = request._run_ns
    today = env.curDateString()
    async with get_sor_context(env, 'sage') as sor:
        sql = """
        SELECT
        b.name as model_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN llm b ON a.llmid = b.id
        WHERE a.use_date = ${today}$
        GROUP BY a.llmid, b.name
        ORDER BY cnt DESC
        LIMIT 5
        """
        recs = await sor.sqlExe(sql, {'today': today})
        return [
            {
                # ``or`` also covers a NULL name from the LEFT JOIN.
                'model_name': r.get('model_name') or 'Unknown',
                'cnt': int(r.get('cnt', 0)),
                'total_amount': float(r.get('total_amount', 0)),
            }
            for r in recs
        ]
async def get_accounting_errors(request):
    """Count today's llmusage rows whose accounting_status is 'failed'.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        int: the number of failed rows, or 0 when the query returns nothing.
    """
    ns = request._run_ns
    params = {'today': ns.curDateString()}
    query = "SELECT COUNT(*) as cnt FROM llmusage WHERE use_date = ${today}$ AND accounting_status = 'failed'"
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        if not rows:
            return 0
        return int(rows[0].get('cnt', 0))
async def get_top_users_by_amount(request):
    """Return the five users with the highest total amount (no date filter).

    Usage rows without a matching ``users`` row yield a NULL ``user_name``
    from the LEFT JOIN; those are reported as 'Unknown' (a plain ``dict.get``
    default would not apply, since the key is present with a ``None`` value).

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        list[dict]: dicts with ``user_name``, ``total_amount`` (rounded to
        2 decimals) and ``cnt`` (int), ordered by amount descending.
    """
    env = request._run_ns
    async with get_sor_context(env, 'sage') as sor:
        sql = """
        SELECT
        COALESCE(b.nick_name, b.username) as user_name,
        COALESCE(SUM(a.amount), 0) as total_amount,
        COUNT(*) as cnt
        FROM llmusage a
        LEFT JOIN users b ON a.userid = b.id
        GROUP BY a.userid, b.nick_name, b.username
        ORDER BY total_amount DESC
        LIMIT 5
        """
        recs = await sor.sqlExe(sql, {})
        return [
            {
                # ``or`` also covers a NULL name from the LEFT JOIN.
                'user_name': r.get('user_name') or 'Unknown',
                'total_amount': round(float(r.get('total_amount', 0)), 2),
                'cnt': int(r.get('cnt', 0)),
            }
            for r in recs
        ]
async def get_top_users_by_count(request):
    """Return the five users with the most usage rows (no date filter).

    Usage rows without a matching ``users`` row yield a NULL ``user_name``
    from the LEFT JOIN; those are reported as 'Unknown' (a plain ``dict.get``
    default would not apply, since the key is present with a ``None`` value).

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        list[dict]: dicts with ``user_name``, ``cnt`` (int) and
        ``total_amount`` (rounded to 2 decimals), ordered by count descending.
    """
    env = request._run_ns
    async with get_sor_context(env, 'sage') as sor:
        sql = """
        SELECT
        COALESCE(b.nick_name, b.username) as user_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN users b ON a.userid = b.id
        GROUP BY a.userid, b.nick_name, b.username
        ORDER BY cnt DESC
        LIMIT 5
        """
        recs = await sor.sqlExe(sql, {})
        return [
            {
                # ``or`` also covers a NULL name from the LEFT JOIN.
                'user_name': r.get('user_name') or 'Unknown',
                'cnt': int(r.get('cnt', 0)),
                'total_amount': round(float(r.get('total_amount', 0)), 2),
            }
            for r in recs
        ]
async def get_top_providers_by_amount(request):
    """Return the five model providers with the highest total amount.

    No date filter is applied. Providers are resolved through
    ``llm.providerid`` to ``organization.orgname``; the SQL substitutes
    'Unknown' when no organization name is available.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        list[dict]: dicts with ``provider_name``, ``total_amount`` (rounded
        to 2 decimals) and ``cnt`` (int), ordered by amount descending.
    """
    ns = request._run_ns
    query = """
        SELECT
        COALESCE(c.orgname, 'Unknown') as provider_name,
        COALESCE(SUM(a.amount), 0) as total_amount,
        COUNT(*) as cnt
        FROM llmusage a
        LEFT JOIN llm b ON a.llmid = b.id
        LEFT JOIN organization c ON b.providerid = c.id
        GROUP BY b.providerid, c.orgname
        ORDER BY total_amount DESC
        LIMIT 5
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, {})
        return [
            {
                'provider_name': row.get('provider_name', 'Unknown'),
                'total_amount': round(float(row.get('total_amount', 0)), 2),
                'cnt': int(row.get('cnt', 0)),
            }
            for row in rows
        ]
async def get_top_providers_by_count(request):
    """Return the five model providers with the most usage rows.

    No date filter is applied. Providers are resolved through
    ``llm.providerid`` to ``organization.orgname``; the SQL substitutes
    'Unknown' when no organization name is available.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        list[dict]: dicts with ``provider_name``, ``cnt`` (int) and
        ``total_amount`` (rounded to 2 decimals), ordered by count descending.
    """
    ns = request._run_ns
    query = """
        SELECT
        COALESCE(c.orgname, 'Unknown') as provider_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN llm b ON a.llmid = b.id
        LEFT JOIN organization c ON b.providerid = c.id
        GROUP BY b.providerid, c.orgname
        ORDER BY cnt DESC
        LIMIT 5
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, {})
        return [
            {
                'provider_name': row.get('provider_name', 'Unknown'),
                'cnt': int(row.get('cnt', 0)),
                'total_amount': round(float(row.get('total_amount', 0)), 2),
            }
            for row in rows
        ]
async def get_top_users_combined(request):
    """Return the top five users by amount, with their call counts.

    Intended for a combined ChartBar. No date filter is applied. Usage rows
    without a matching ``users`` row yield a NULL ``user_name`` from the
    LEFT JOIN; those are reported as 'Unknown' (a plain ``dict.get`` default
    would not apply, since the key is present with a ``None`` value).

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        list[dict]: dicts with ``user_name``, ``cnt`` (int) and
        ``total_amount`` (rounded to 2 decimals), ordered by amount
        descending.
    """
    env = request._run_ns
    async with get_sor_context(env, 'sage') as sor:
        sql = """
        SELECT
        COALESCE(b.nick_name, b.username) as user_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN users b ON a.userid = b.id
        GROUP BY a.userid, b.nick_name, b.username
        ORDER BY total_amount DESC
        LIMIT 5
        """
        recs = await sor.sqlExe(sql, {})
        return [
            {
                # ``or`` also covers a NULL name from the LEFT JOIN.
                'user_name': r.get('user_name') or 'Unknown',
                'cnt': int(r.get('cnt', 0)),
                'total_amount': round(float(r.get('total_amount', 0)), 2),
            }
            for r in recs
        ]
async def get_top_providers_combined(request):
    """Return the top five providers by amount, with their call counts.

    Intended for a combined ChartBar. No date filter is applied; the SQL
    substitutes 'Unknown' when no organization name is available.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        list[dict]: dicts with ``provider_name``, ``cnt`` (int) and
        ``total_amount`` (rounded to 2 decimals), ordered by amount
        descending.
    """
    ns = request._run_ns
    query = """
        SELECT
        COALESCE(c.orgname, 'Unknown') as provider_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN llm b ON a.llmid = b.id
        LEFT JOIN organization c ON b.providerid = c.id
        GROUP BY b.providerid, c.orgname
        ORDER BY total_amount DESC
        LIMIT 5
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, {})
        return [
            {
                'provider_name': row.get('provider_name', 'Unknown'),
                'cnt': int(row.get('cnt', 0)),
                'total_amount': round(float(row.get('total_amount', 0)), 2),
            }
            for row in rows
        ]
async def get_active_users_today(request):
    """Count distinct users that have at least one llmusage row today.

    The window is ``[today, tomorrow)`` on ``use_date``; "today" comes from
    the request namespace and "tomorrow" from the system clock.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        int: number of distinct ``userid`` values, or 0 when the query
        returns nothing.
    """
    ns = request._run_ns
    day_start = ns.curDateString()
    day_end = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
    query = """
        SELECT COUNT(DISTINCT userid) as cnt FROM llmusage
        WHERE use_date >= ${today}$ AND use_date < ${tomorrow}$
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, {'today': day_start, 'tomorrow': day_end})
        if not rows:
            return 0
        return int(rows[0].get('cnt', 0))
async def get_new_users_month(request):
    """Count users whose ``created_at`` is on or after the 1st of this month.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        int: the number of matching users, or 0 when the query returns
        nothing.
    """
    ns = request._run_ns
    first_of_month = datetime.now().strftime('%Y-%m-01')
    query = """
        SELECT COUNT(*) as cnt FROM users WHERE created_at >= ${month_start}$
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, {'month_start': first_of_month})
        if not rows:
            return 0
        return int(rows[0].get('cnt', 0))
async def get_total_orgs(request):
    """Return the number of rows in the ``organization`` table.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``.

    Returns:
        int: the organization count, or 0 when the query returns nothing.
    """
    ns = request._run_ns
    query = "SELECT COUNT(*) as cnt FROM organization"
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, {})
        if not rows:
            return 0
        return int(rows[0].get('cnt', 0))
async def get_user_today_models(request):
    """Per-model call count and amount for the current user, today only.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``get_user()`` and ``curDateString()`` and is handed to
            ``get_sor_context``.

    Returns:
        list[dict]: dicts with ``model_name``, ``cnt`` (int) and
        ``total_amount`` (rounded to 4 decimals), ordered by count
        descending; an empty list when ``get_user()`` yields a falsy value.
    """
    ns = request._run_ns
    current_user = await ns.get_user()
    if not current_user:
        # No identified user: nothing to report.
        return []
    params = {'today': ns.curDateString(), 'userid': current_user}
    query = """
        SELECT
        COALESCE(b.name, 'Unknown') as model_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN llm b ON a.llmid = b.id
        WHERE a.use_date = ${today}$
        AND a.userid = ${userid}$
        GROUP BY a.llmid, b.name
        ORDER BY cnt DESC
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        return [
            {
                'model_name': row.get('model_name', 'Unknown'),
                'cnt': int(row.get('cnt', 0)),
                'total_amount': round(float(row.get('total_amount', 0)), 4),
            }
            for row in rows
        ]
async def get_all_today_models(request):
    """Per-model call count and amount across all users, today only.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``.

    Returns:
        list[dict]: dicts with ``model_name``, ``cnt`` (int) and
        ``total_amount`` (rounded to 4 decimals), ordered by count descending.
    """
    ns = request._run_ns
    params = {'today': ns.curDateString()}
    query = """
        SELECT
        COALESCE(b.name, 'Unknown') as model_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN llm b ON a.llmid = b.id
        WHERE a.use_date = ${today}$
        GROUP BY a.llmid, b.name
        ORDER BY cnt DESC
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        return [
            {
                'model_name': row.get('model_name', 'Unknown'),
                'cnt': int(row.get('cnt', 0)),
                'total_amount': round(float(row.get('total_amount', 0)), 4),
            }
            for row in rows
        ]
# ── Customer (org-level) monitoring functions ──
async def _get_org_id(request):
    """Return the current user's organization id, or '0' when none is set.

    Args:
        request: request object whose ``_run_ns`` namespace provides the
            awaitable ``get_userorgid()``.

    Returns:
        The value from ``get_userorgid()`` when it is truthy, else the
        string '0'.
    """
    ns = request._run_ns
    current_org = await ns.get_userorgid()
    if current_org:
        return current_org
    return '0'
async def get_customer_daily_models(request):
    """Per-model call count and amount for the caller's organization, today.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``; the
            organization id is resolved with ``_get_org_id``.

    Returns:
        list[dict]: dicts with ``model_name``, ``cnt`` (int) and
        ``total_amount`` (rounded to 4 decimals), ordered by count descending.
    """
    ns = request._run_ns
    owner_org = await _get_org_id(request)
    params = {'today': ns.curDateString(), 'org_id': owner_org}
    query = """
        SELECT
        COALESCE(b.name, 'Unknown') as model_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN llm b ON a.llmid = b.id
        WHERE a.use_date = ${today}$
        AND a.userorgid = ${org_id}$
        GROUP BY a.llmid, b.name
        ORDER BY cnt DESC
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        return [
            {
                'model_name': row.get('model_name', 'Unknown'),
                'cnt': int(row.get('cnt', 0)),
                'total_amount': round(float(row.get('total_amount', 0)), 4),
            }
            for row in rows
        ]
async def get_customer_monthly_models(request):
    """Per-model call count and amount for the caller's organization, this month.

    The window is ``[first day of this month, first day of next month)`` on
    ``use_date``, computed from the system clock.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``; the organization id is resolved with
            ``_get_org_id``.

    Returns:
        list[dict]: dicts with ``model_name``, ``cnt`` (int) and
        ``total_amount`` (rounded to 4 decimals), ordered by count descending.
    """
    ns = request._run_ns
    owner_org = await _get_org_id(request)
    moment = datetime.now()
    # divmod rolls December (12) over to January of the following year.
    year_carry, month_index = divmod(moment.month, 12)
    params = {
        'month_start': moment.strftime('%Y-%m-01'),
        'month_end': f'{moment.year + year_carry}-{month_index + 1:02d}-01',
        'org_id': owner_org,
    }
    query = """
        SELECT
        COALESCE(b.name, 'Unknown') as model_name,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        LEFT JOIN llm b ON a.llmid = b.id
        WHERE a.use_date >= ${month_start}$
        AND a.use_date < ${month_end}$
        AND a.userorgid = ${org_id}$
        GROUP BY a.llmid, b.name
        ORDER BY cnt DESC
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        return [
            {
                'model_name': row.get('model_name', 'Unknown'),
                'cnt': int(row.get('cnt', 0)),
                'total_amount': round(float(row.get('total_amount', 0)), 4),
            }
            for row in rows
        ]
async def get_customer_daily_summary(request):
    """Today's call count and total amount for the caller's organization.

    Args:
        request: request object whose ``_run_ns`` namespace provides
            ``curDateString()`` and is handed to ``get_sor_context``; the
            organization id is resolved with ``_get_org_id``.

    Returns:
        dict: ``cnt`` (int) and ``total_amount`` (rounded to 4 decimals);
        ``{'cnt': 0, 'total_amount': 0}`` when the query returns nothing.
    """
    ns = request._run_ns
    owner_org = await _get_org_id(request)
    params = {'today': ns.curDateString(), 'org_id': owner_org}
    query = """
        SELECT
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        WHERE a.use_date = ${today}$
        AND a.userorgid = ${org_id}$
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        if not rows:
            return {'cnt': 0, 'total_amount': 0}
        head = rows[0]
        return {
            'cnt': int(head.get('cnt', 0)),
            'total_amount': round(float(head.get('total_amount', 0)), 4),
        }
async def get_customer_month_summary(request):
    """This month's call count and total amount for the caller's organization.

    The window is ``[first day of this month, first day of next month)`` on
    ``use_date``, computed from the system clock.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``; the organization id is resolved with
            ``_get_org_id``.

    Returns:
        dict: ``cnt`` (int) and ``total_amount`` (rounded to 4 decimals);
        ``{'cnt': 0, 'total_amount': 0}`` when the query returns nothing.
    """
    ns = request._run_ns
    owner_org = await _get_org_id(request)
    moment = datetime.now()
    # divmod rolls December (12) over to January of the following year.
    year_carry, month_index = divmod(moment.month, 12)
    params = {
        'month_start': moment.strftime('%Y-%m-01'),
        'month_end': f'{moment.year + year_carry}-{month_index + 1:02d}-01',
        'org_id': owner_org,
    }
    query = """
        SELECT
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        WHERE a.use_date >= ${month_start}$
        AND a.use_date < ${month_end}$
        AND a.userorgid = ${org_id}$
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        if not rows:
            return {'cnt': 0, 'total_amount': 0}
        head = rows[0]
        return {
            'cnt': int(head.get('cnt', 0)),
            'total_amount': round(float(head.get('total_amount', 0)), 4),
        }
async def get_customer_daily_trend(request):
    """Day-by-day call count and amount for the caller's organization this month.

    The window is ``[first day of this month, first day of next month)`` on
    ``use_date``, computed from the system clock. Only days that have usage
    rows appear in the result.

    Args:
        request: request object whose ``_run_ns`` namespace is handed to
            ``get_sor_context``; the organization id is resolved with
            ``_get_org_id``.

    Returns:
        list[dict]: dicts with ``date`` (the row's ``use_date``), ``cnt``
        (int) and ``total_amount`` (rounded to 4 decimals), ordered by date
        ascending.
    """
    ns = request._run_ns
    owner_org = await _get_org_id(request)
    moment = datetime.now()
    # divmod rolls December (12) over to January of the following year.
    year_carry, month_index = divmod(moment.month, 12)
    params = {
        'month_start': moment.strftime('%Y-%m-01'),
        'month_end': f'{moment.year + year_carry}-{month_index + 1:02d}-01',
        'org_id': owner_org,
    }
    query = """
        SELECT
        a.use_date as date,
        COUNT(*) as cnt,
        COALESCE(SUM(a.amount), 0) as total_amount
        FROM llmusage a
        WHERE a.use_date >= ${month_start}$
        AND a.use_date < ${month_end}$
        AND a.userorgid = ${org_id}$
        GROUP BY a.use_date
        ORDER BY a.use_date ASC
        """
    async with get_sor_context(ns, 'sage') as sor:
        rows = await sor.sqlExe(query, params)
        return [
            {
                'date': row.get('date', ''),
                'cnt': int(row.get('cnt', 0)),
                'total_amount': round(float(row.get('total_amount', 0)), 4),
            }
            for row in rows
        ]
def load_dashboard():
    """Attach every dashboard data function to a ``ServerEnv()`` instance.

    Each function is stored as an attribute named after the function itself.
    """
    g = ServerEnv()
    exported = (
        get_today_usage,
        get_today_amount,
        get_usage_trend,
        get_amount_trend,
        get_total_users,
        get_concurrent_users,
        get_top_models,
        get_accounting_errors,
        get_top_users_by_amount,
        get_top_users_by_count,
        get_top_providers_by_amount,
        get_top_providers_by_count,
        get_top_users_combined,
        get_top_providers_combined,
        get_active_users_today,
        get_new_users_month,
        get_total_orgs,
        get_user_today_models,
        get_all_today_models,
        get_customer_daily_models,
        get_customer_monthly_models,
        get_customer_daily_summary,
        get_customer_month_summary,
        get_customer_daily_trend,
    )
    for func in exported:
        # The attribute name is the function's own name.
        setattr(g, func.__name__, func)