This commit is contained in:
ping 2026-05-25 11:48:21 +08:00
parent 8b221aeccd
commit 57c8723070
2 changed files with 513 additions and 1 deletions

View File

@ -0,0 +1,512 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
def _parse_usage_content(raw):
if not raw:
return {}
if isinstance(raw, dict):
return raw
try:
return json.loads(raw)
except (TypeError, ValueError):
return {}
def _resolve_time_range(ns):
"""
解析时间范围。
优先使用 start_time + end_time否则按 range 快捷窗口:
hour -> 最近 1 小时
day -> 最近 1 天
week -> 最近 7 天
默认 day。
"""
start_time = ns.get('start_time')
end_time = ns.get('end_time')
range_type = (ns.get('range') or ns.get('range_type') or 'day').lower()
now = datetime.datetime.now()
if start_time and end_time:
start_dt = _parse_datetime(start_time)
end_dt = _parse_datetime(end_time, end_of_day=True)
if not start_dt or not end_dt:
return None, None, 'start_time / end_time 格式无效,请使用 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS'
if start_dt > end_dt:
return None, None, 'start_time 不能晚于 end_time'
return start_dt, end_dt, None
delta_map = {
'hour': datetime.timedelta(hours=1),
'day': datetime.timedelta(days=1),
'week': datetime.timedelta(weeks=1),
}
if range_type not in delta_map:
return None, None, 'range 仅支持 hour / day / week'
end_dt = now
start_dt = now - delta_map[range_type]
return start_dt, end_dt, None
def _parse_datetime(value, end_of_day=False):
if not value:
return None
text = str(value).strip()
for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
try:
dt = datetime.datetime.strptime(text, fmt)
if fmt == '%Y-%m-%d' and end_of_day:
dt = dt.replace(hour=23, minute=59, second=59)
return dt
except ValueError:
continue
return None
def _format_datetime(dt):
return dt.strftime('%Y-%m-%d %H:%M:%S')
def _group_key(dt, group_by):
if group_by == 'hour':
return dt.strftime('%Y-%m-%d %H:00:00')
if group_by == 'week':
monday = dt - datetime.timedelta(days=dt.weekday())
return monday.strftime('%Y-%m-%d')
return dt.strftime('%Y-%m-%d')
def _normalize_usage_row(row, bill_amount_map=None):
usage = _parse_usage_content(row.get('usage_content'))
orderid = row.get('orderid')
amount = float(row.get('original_price') or 0)
if bill_amount_map and orderid and orderid in bill_amount_map:
amount = float(bill_amount_map[orderid] or amount)
return {
'id': row.get('id'),
'userid': row.get('userid'),
'llmid': row.get('llmid'),
'model': usage.get('model') or row.get('model_name') or row.get('llm_model'),
'prompt_tokens': int(usage.get('prompt_tokens') or 0),
'completion_tokens': int(usage.get('completion_tokens') or 0),
'total_tokens': int(usage.get('total_tokens') or 0),
'amount': round(amount, 8),
'bill_status': row.get('bill_status'),
'orderid': orderid,
'usage_time': row.get('created_at'),
}
async def _fetch_bill_amount_map(sor, order_ids):
if not order_ids:
return {}
result = {}
for orderid in set(order_ids):
if not orderid:
continue
rows = await sor.R('bill', {'orderid': orderid, 'del_flg': '0'})
if rows:
result[orderid] = sum(float(r.get('amount') or 0) for r in rows)
return result
async def _fetch_llm_model_map(sor, llm_ids):
result = {}
for llmid in set(llm_ids):
if not llmid:
continue
rows = await sor.R('llm', {'id': llmid})
if rows:
result[llmid] = rows[0].get('model')
return result
def _build_usage_where(userid=None, userids=None, start_dt=None, end_dt=None, model_filter=None):
"""构建 model_usage 单表查询条件(不使用 JOIN兼容 sqlExe 环境)。"""
conditions = []
if userid:
conditions.append("userid = '%s'" % _escape(userid))
elif userids:
escaped = ["'%s'" % _escape(uid) for uid in userids if uid]
if not escaped:
conditions.append('1 = 0')
else:
conditions.append('userid IN (%s)' % ','.join(escaped))
if start_dt and end_dt:
conditions.append(
"created_at BETWEEN '%s' AND '%s'"
% (_format_datetime(start_dt), _format_datetime(end_dt))
)
if model_filter:
conditions.append(
"JSON_UNQUOTE(JSON_EXTRACT(usage_content, '$.model')) = '%s'"
% _escape(model_filter)
)
return conditions
async def _query_model_usage_rows(sor, conditions, limit=None, offset=None):
where_clause = ' AND '.join(conditions) if conditions else '1 = 1'
sql = """
SELECT id, userid, llmid, original_price, orderid, bill_status, usage_content, created_at
FROM model_usage
WHERE %s
ORDER BY created_at DESC
""" % where_clause
if limit is not None:
sql += ' LIMIT %d OFFSET %d' % (int(limit), int(offset or 0))
return await sor.sqlExe(sql, {})
async def _count_model_usage(sor, conditions):
where_clause = ' AND '.join(conditions) if conditions else '1 = 1'
sql = 'SELECT COUNT(*) AS total_count FROM model_usage WHERE %s' % where_clause
return (await sor.sqlExe(sql, {}))[0]['total_count']
async def _enrich_usage_rows(sor, rows):
order_ids = [row.get('orderid') for row in rows if row.get('orderid')]
llm_ids = [row.get('llmid') for row in rows if row.get('llmid')]
bill_amount_map = await _fetch_bill_amount_map(sor, order_ids)
llm_model_map = await _fetch_llm_model_map(sor, llm_ids)
items = []
for row in rows:
enriched = dict(row)
enriched['llm_model'] = llm_model_map.get(row.get('llmid'))
items.append(_normalize_usage_row(enriched, bill_amount_map))
return items
async def _fetch_customer_users(sor, orgid, customerid=None):
"""获取机构下客户及其用户映射。"""
org_rows = await sor.R('organization', {'parentid': orgid, 'del_flg': '0'})
if customerid:
org_rows = [row for row in org_rows if row.get('id') == customerid]
org_map = {row['id']: row for row in org_rows}
user_map = {}
for oid in org_map:
user_rows = await sor.R('users', {'orgid': oid, 'del_flg': '0'})
for user in user_rows:
user_map[user['id']] = user
return org_map, user_map
def _aggregate_admin_summary(items, user_map, org_map):
buckets = {}
for item in items:
user = user_map.get(item.get('userid'), {})
org = org_map.get(user.get('orgid'), {})
model = item.get('model') or 'unknown'
key = (org.get('id'), item.get('userid'), model)
if key not in buckets:
buckets[key] = {
'customerid': org.get('id'),
'customer_name': org.get('orgname'),
'userid': item.get('userid'),
'username': user.get('username'),
'user_name': user.get('name'),
'model': model,
'prompt_tokens': 0,
'completion_tokens': 0,
'total_tokens': 0,
'amount': 0.0,
'request_count': 0,
'first_usage_time': item.get('usage_time'),
'last_usage_time': item.get('usage_time'),
}
bucket = buckets[key]
bucket['prompt_tokens'] += item.get('prompt_tokens') or 0
bucket['completion_tokens'] += item.get('completion_tokens') or 0
bucket['total_tokens'] += item.get('total_tokens') or 0
bucket['amount'] = round(bucket['amount'] + float(item.get('amount') or 0), 8)
bucket['request_count'] += 1
usage_time = item.get('usage_time')
if usage_time:
bucket['last_usage_time'] = usage_time
if (
not bucket.get('first_usage_time')
or str(usage_time) < str(bucket['first_usage_time'])
):
bucket['first_usage_time'] = usage_time
return sorted(buckets.values(), key=lambda x: x['amount'], reverse=True)
def _aggregate_items(items, group_by=None):
if not group_by:
return items
buckets = {}
for item in items:
usage_time = item.get('usage_time')
if isinstance(usage_time, str):
dt = _parse_datetime(usage_time)
else:
dt = usage_time
if not dt:
key = 'unknown'
else:
key = _group_key(dt, group_by)
if key not in buckets:
buckets[key] = {
'period': key,
'model': item.get('model'),
'prompt_tokens': 0,
'completion_tokens': 0,
'total_tokens': 0,
'amount': 0.0,
'request_count': 0,
}
bucket = buckets[key]
bucket['prompt_tokens'] += item.get('prompt_tokens') or 0
bucket['completion_tokens'] += item.get('completion_tokens') or 0
bucket['total_tokens'] += item.get('total_tokens') or 0
bucket['amount'] = round(bucket['amount'] + float(item.get('amount') or 0), 8)
bucket['request_count'] += 1
return sorted(buckets.values(), key=lambda x: x['period'], reverse=True)
def _summarize(items):
return {
'request_count': len(items),
'prompt_tokens': sum(i.get('prompt_tokens') or 0 for i in items),
'completion_tokens': sum(i.get('completion_tokens') or 0 for i in items),
'total_tokens': sum(i.get('total_tokens') or 0 for i in items),
'amount': round(sum(float(i.get('amount') or 0) for i in items), 8),
}
async def model_usage_user_report(ns={}):
"""
用户查看自己的模型使用记录。
参数:
userid 可选,默认当前登录用户
start_time 开始时间,格式 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS
end_time 结束时间
range 快捷范围hour / day / week未传 start/end 时生效,默认 day
model 按模型标识筛选usage_content.model
group_by 聚合粒度hour / day / week不传则返回明细
current_page 页码,默认 1
page_size 每页条数,默认 20
返回字段:
model, prompt_tokens, completion_tokens, total_tokens, amount, usage_time
"""
if ns.get('userid'):
userid = ns.get('userid')
else:
userid = await get_user()
if not userid:
server_error(401)
start_dt, end_dt, err = _resolve_time_range(ns)
if err:
return {'status': False, 'msg': err}
model_filter = ns.get('model')
group_by = (ns.get('group_by') or '').lower() or None
if group_by and group_by not in ('hour', 'day', 'week'):
return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'}
page_size = int(ns.get('page_size', 20))
current_page = int(ns.get('current_page', 1))
offset = (current_page - 1) * page_size
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'})
if not user_rows:
return {'status': False, 'msg': '用户不存在'}
conditions = _build_usage_where(
userid=userid,
start_dt=start_dt,
end_dt=end_dt,
model_filter=model_filter,
)
total_count = await _count_model_usage(sor, conditions)
if group_by:
all_rows = await _query_model_usage_rows(sor, conditions)
all_items = await _enrich_usage_rows(sor, all_rows)
grouped = _aggregate_items(all_items, group_by)
return {
'status': True,
'msg': '查询成功',
'data': {
'userid': userid,
'start_time': _format_datetime(start_dt),
'end_time': _format_datetime(end_dt),
'summary': _summarize(all_items),
'group_by': group_by,
'groups': grouped,
},
}
all_rows = await _query_model_usage_rows(sor, conditions)
all_items = await _enrich_usage_rows(sor, all_rows)
page_rows = await _query_model_usage_rows(
sor, conditions, limit=page_size, offset=offset,
)
items = await _enrich_usage_rows(sor, page_rows)
return {
'status': True,
'msg': '查询成功',
'data': {
'userid': userid,
'start_time': _format_datetime(start_dt),
'end_time': _format_datetime(end_dt),
'summary': _summarize(all_items),
'total_count': total_count,
'current_page': current_page,
'page_size': page_size,
'items': items,
},
}
except Exception as e:
return {'status': False, 'msg': '查询失败, %s' % str(e)}
async def model_usage_admin_report(ns={}):
"""
管理员查看当前机构下所有客户的模型使用汇总。
参数:
userid 可选,默认当前登录用户(须为机构管理员)
start_time 开始时间
end_time 结束时间
range 快捷范围hour / day / week未传 start/end 时生效,默认 day
customerid 可选,按客户机构 id 筛选
model 可选,按模型标识筛选
group_by 聚合粒度hour / day / week不传则按客户+模型汇总
current_page 页码,默认 1
page_size 每页条数,默认 20
返回字段:
customerid, customer_name, userid, username, model,
prompt_tokens, completion_tokens, total_tokens, amount, usage_time
"""
if ns.get('userid'):
userid = ns.get('userid')
else:
userid = await get_user()
if not userid:
server_error(401)
start_dt, end_dt, err = _resolve_time_range(ns)
if err:
return {'status': False, 'msg': err}
customerid = ns.get('customerid')
model_filter = ns.get('model')
group_by = (ns.get('group_by') or '').lower() or None
if group_by and group_by not in ('hour', 'day', 'week'):
return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'}
page_size = int(ns.get('page_size', 20))
current_page = int(ns.get('current_page', 1))
offset = (current_page - 1) * page_size
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'})
if not user_rows:
return {'status': False, 'msg': '用户不存在'}
orgid = user_rows[0].get('orgid')
user_role = await get_user_role({'userid': userid, 'sor': sor})
if user_role not in ('管理员', '运营', '运营管理员'):
return {'status': False, 'msg': '无权限,仅机构管理员可查看'}
org_map, user_map = await _fetch_customer_users(sor, orgid, customerid)
user_ids = list(user_map.keys())
if not user_ids:
empty_data = {
'orgid': orgid,
'start_time': _format_datetime(start_dt),
'end_time': _format_datetime(end_dt),
'summary': _summarize([]),
'total_count': 0,
'current_page': current_page,
'page_size': page_size,
'items': [],
}
if group_by:
empty_data = {
'orgid': orgid,
'start_time': _format_datetime(start_dt),
'end_time': _format_datetime(end_dt),
'summary': _summarize([]),
'group_by': group_by,
'groups': [],
}
return {'status': True, 'msg': '查询成功', 'data': empty_data}
conditions = _build_usage_where(
userids=user_ids,
start_dt=start_dt,
end_dt=end_dt,
model_filter=model_filter,
)
all_rows = await _query_model_usage_rows(sor, conditions)
all_items = await _enrich_usage_rows(sor, all_rows)
if group_by:
enriched_items = []
for item in all_items:
user = user_map.get(item.get('userid'), {})
org = org_map.get(user.get('orgid'), {})
enriched = dict(item)
enriched['customerid'] = org.get('id')
enriched['customer_name'] = org.get('orgname')
enriched['username'] = user.get('username')
enriched['user_name'] = user.get('name')
enriched_items.append(enriched)
grouped = _aggregate_items(enriched_items, group_by)
return {
'status': True,
'msg': '查询成功',
'data': {
'orgid': orgid,
'start_time': _format_datetime(start_dt),
'end_time': _format_datetime(end_dt),
'summary': _summarize(all_items),
'group_by': group_by,
'groups': grouped,
},
}
summary_items = _aggregate_admin_summary(all_items, user_map, org_map)
total_count = len(summary_items)
page_items = summary_items[offset:offset + page_size]
return {
'status': True,
'msg': '查询成功',
'data': {
'orgid': orgid,
'start_time': _format_datetime(start_dt),
'end_time': _format_datetime(end_dt),
'summary': _summarize(all_items),
'total_count': total_count,
'current_page': current_page,
'page_size': page_size,
'items': page_items,
},
}
except Exception as e:
return {'status': False, 'msg': '查询失败, %s' % str(e)}
ret = await model_usage_admin_report(params_kw)
return ret

View File

@ -141,7 +141,7 @@ async def logintype(ns):
domain_name = ns.get('domain_name')
# if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527', 'www.ncmatch.cn'] and ns.get('username') not in ['开元云(北京)科技有限公司', 'admin', 'kyy_root', 'kyy_kaiyuan', 'kyacloud', 'kyy_运营', 'kyy_销售', 'kyy_财务', '测试用户', 'kycloud']:
if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527', 'www.ncmatch.cn'] and not ns.get('username'):
if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527', 'www.ncmatch.cn'] and not ns.get('username') and not ns.get('codeid'):
# 登录失败次数限制
login_allowed = await check_login_allowed(ns.get('username'))