kboss/b/cntoai/model_usage_user_report.dspy
2026-06-08 17:32:45 +08:00

512 lines
19 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
def _parse_usage_content(raw):
if not raw:
return {}
if isinstance(raw, dict):
return raw
try:
return json.loads(raw)
except (TypeError, ValueError):
return {}
def _resolve_time_range(ns):
"""
解析时间范围。
优先使用 start_time + end_time否则按 range 快捷窗口:
hour -> 最近 1 小时
day -> 最近 1 天
week -> 最近 7 天
默认 day。
"""
start_time = ns.get('start_time')
end_time = ns.get('end_time')
range_type = (ns.get('range') or ns.get('range_type') or 'day').lower()
now = datetime.datetime.now()
if start_time and end_time:
start_dt = _parse_datetime(start_time)
end_dt = _parse_datetime(end_time, end_of_day=True)
if not start_dt or not end_dt:
return None, None, 'start_time / end_time 格式无效,请使用 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS'
if start_dt > end_dt:
return None, None, 'start_time 不能晚于 end_time'
return start_dt, end_dt, None
delta_map = {
'hour': datetime.timedelta(hours=1),
'day': datetime.timedelta(days=1),
'week': datetime.timedelta(weeks=1),
}
if range_type not in delta_map:
return None, None, 'range 仅支持 hour / day / week'
end_dt = now
start_dt = now - delta_map[range_type]
return start_dt, end_dt, None
def _parse_datetime(value, end_of_day=False):
if not value:
return None
text = str(value).strip()
for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
try:
dt = datetime.datetime.strptime(text, fmt)
if fmt == '%Y-%m-%d' and end_of_day:
dt = dt.replace(hour=23, minute=59, second=59)
return dt
except ValueError:
continue
return None
def _format_datetime(dt):
return dt.strftime('%Y-%m-%d %H:%M:%S')
def _group_key(dt, group_by):
if group_by == 'hour':
return dt.strftime('%Y-%m-%d %H:00:00')
if group_by == 'week':
monday = dt - datetime.timedelta(days=dt.weekday())
return monday.strftime('%Y-%m-%d')
return dt.strftime('%Y-%m-%d')
def _normalize_usage_row(row, bill_amount_map=None):
    """Flatten one ``model_usage`` row into the record shape used by reports.

    Args:
        row: Mapping holding the raw row (plus optional ``model_name`` /
            ``llm_model`` fallbacks for the model label).
        bill_amount_map: Optional mapping of order id to billed total.

    Returns:
        A dict with id, userid, llmid, model, token counts, amount (rounded to
        two decimals), bill_status, orderid and usage_time.
    """
    payload = _parse_usage_content(row.get('usage_content'))
    order_ref = row.get('orderid')

    price = float(row.get('original_price') or 0)
    billed = bool(bill_amount_map) and bool(order_ref) and order_ref in bill_amount_map
    if billed:
        # A falsy billed total (0 / None) keeps the row's original price.
        price = float(bill_amount_map[order_ref] or price)

    def token_count(field):
        return int(payload.get(field) or 0)

    # Model label preference: usage payload, then row-level fallbacks.
    model_label = payload.get('model') or row.get('model_name') or row.get('llm_model')

    record = {}
    record['id'] = row.get('id')
    record['userid'] = row.get('userid')
    record['llmid'] = row.get('llmid')
    record['model'] = model_label
    record['prompt_tokens'] = token_count('prompt_tokens')
    record['completion_tokens'] = token_count('completion_tokens')
    record['total_tokens'] = token_count('total_tokens')
    record['amount'] = round(price, 2)
    record['bill_status'] = row.get('bill_status')
    record['orderid'] = order_ref
    record['usage_time'] = row.get('created_at')
    return record
async def _fetch_bill_amount_map(sor, order_ids):
if not order_ids:
return {}
result = {}
for orderid in set(order_ids):
if not orderid:
continue
rows = await sor.R('bill', {'orderid': orderid, 'del_flg': '0'})
if rows:
result[orderid] = sum(float(r.get('amount') or 0) for r in rows)
return result
async def _fetch_llm_model_map(sor, llm_ids):
result = {}
for llmid in set(llm_ids):
if not llmid:
continue
rows = await sor.R('llm', {'id': llmid})
if rows:
result[llmid] = rows[0].get('model')
return result
def _build_usage_where(userid=None, userids=None, start_dt=None, end_dt=None, model_filter=None):
"""构建 model_usage 单表查询条件(不使用 JOIN兼容 sqlExe 环境)。"""
conditions = []
if userid:
conditions.append("userid = '%s'" % _escape(userid))
elif userids:
escaped = ["'%s'" % _escape(uid) for uid in userids if uid]
if not escaped:
conditions.append('1 = 0')
else:
conditions.append('userid IN (%s)' % ','.join(escaped))
if start_dt and end_dt:
conditions.append(
"created_at BETWEEN '%s' AND '%s'"
% (_format_datetime(start_dt), _format_datetime(end_dt))
)
if model_filter:
conditions.append(
"JSON_UNQUOTE(JSON_EXTRACT(usage_content, '$.model')) = '%s'"
% _escape(model_filter)
)
return conditions
async def _query_model_usage_rows(sor, conditions, limit=None, offset=None):
where_clause = ' AND '.join(conditions) if conditions else '1 = 1'
sql = """
SELECT id, userid, llmid, original_price, orderid, bill_status, usage_content, created_at
FROM model_usage
WHERE %s AND bill_status != '0'
ORDER BY created_at DESC
""" % where_clause
if limit is not None:
sql += ' LIMIT %d OFFSET %d' % (int(limit), int(offset or 0))
return await sor.sqlExe(sql, {})
async def _count_model_usage(sor, conditions):
where_clause = ' AND '.join(conditions) if conditions else '1 = 1'
sql = 'SELECT COUNT(*) AS total_count FROM model_usage WHERE %s AND bill_status != "0"' % where_clause
return (await sor.sqlExe(sql, {}))[0]['total_count']
async def _enrich_usage_rows(sor, rows):
    """Turn raw ``model_usage`` rows into normalized report items.

    Billed totals and llm model names are looked up for the order ids and
    llm ids present in *rows*; each row is then copied, tagged with its
    ``llm_model`` and normalized.

    Returns:
        List of normalized items, one per input row, in input order.
    """
    order_refs = list(filter(None, (record.get('orderid') for record in rows)))
    llm_refs = list(filter(None, (record.get('llmid') for record in rows)))

    bill_totals = await _fetch_bill_amount_map(sor, order_refs)
    model_names = await _fetch_llm_model_map(sor, llm_refs)

    def build(record):
        # Work on a copy so the caller's row is not modified.
        tagged = dict(record)
        tagged['llm_model'] = model_names.get(record.get('llmid'))
        return _normalize_usage_row(tagged, bill_totals)

    return [build(record) for record in rows]
async def _fetch_customer_users(sor, orgid, customerid=None):
"""获取机构下客户及其用户映射。"""
org_rows = await sor.R('organization', {'parentid': orgid, 'del_flg': '0'})
if customerid:
org_rows = [row for row in org_rows if row.get('id') == customerid]
org_map = {row['id']: row for row in org_rows}
user_map = {}
for oid in org_map:
user_rows = await sor.R('users', {'orgid': oid, 'del_flg': '0'})
for user in user_rows:
user_map[user['id']] = user
return org_map, user_map
def _aggregate_admin_summary(items, user_map, org_map):
buckets = {}
for item in items:
user = user_map.get(item.get('userid'), {})
org = org_map.get(user.get('orgid'), {})
model = item.get('model') or 'unknown'
key = (org.get('id'), item.get('userid'), model)
if key not in buckets:
buckets[key] = {
'customerid': org.get('id'),
'customer_name': org.get('orgname'),
'userid': item.get('userid'),
'username': user.get('username'),
'user_name': user.get('name'),
'model': model,
'prompt_tokens': 0,
'completion_tokens': 0,
'total_tokens': 0,
'amount': 0.0,
'request_count': 0,
'first_usage_time': item.get('usage_time'),
'last_usage_time': item.get('usage_time'),
}
bucket = buckets[key]
bucket['prompt_tokens'] += item.get('prompt_tokens') or 0
bucket['completion_tokens'] += item.get('completion_tokens') or 0
bucket['total_tokens'] += item.get('total_tokens') or 0
bucket['amount'] = round(bucket['amount'] + float(item.get('amount') or 0), 4)
bucket['request_count'] += 1
usage_time = item.get('usage_time')
if usage_time:
bucket['last_usage_time'] = usage_time
if (
not bucket.get('first_usage_time')
or str(usage_time) < str(bucket['first_usage_time'])
):
bucket['first_usage_time'] = usage_time
return sorted(buckets.values(), key=lambda x: x['amount'], reverse=True)
def _aggregate_items(items, group_by=None):
if not group_by:
return items
buckets = {}
for item in items:
usage_time = item.get('usage_time')
if isinstance(usage_time, str):
dt = _parse_datetime(usage_time)
else:
dt = usage_time
if not dt:
key = 'unknown'
else:
key = _group_key(dt, group_by)
if key not in buckets:
buckets[key] = {
'period': key,
'model': item.get('model'),
'prompt_tokens': 0,
'completion_tokens': 0,
'total_tokens': 0,
'amount': 0.0,
'request_count': 0,
}
bucket = buckets[key]
bucket['prompt_tokens'] += item.get('prompt_tokens') or 0
bucket['completion_tokens'] += item.get('completion_tokens') or 0
bucket['total_tokens'] += item.get('total_tokens') or 0
bucket['amount'] = round(bucket['amount'] + float(item.get('amount') or 0), 4)
bucket['request_count'] += 1
return sorted(buckets.values(), key=lambda x: x['period'], reverse=True)
def _summarize(items):
return {
'request_count': len(items),
'prompt_tokens': sum(i.get('prompt_tokens') or 0 for i in items),
'completion_tokens': sum(i.get('completion_tokens') or 0 for i in items),
'total_tokens': sum(i.get('total_tokens') or 0 for i in items),
'amount': round(sum(float(i.get('amount') or 0) for i in items), 4),
}
async def model_usage_user_report(ns=None):
    """Report a single user's model usage records.

    Parameters (keys of *ns*):
        userid        optional; defaults to the current logged-in user
        start_time    window start, ``YYYY-MM-DD`` or ``YYYY-MM-DD HH:MM:SS``
        end_time      window end
        range         shortcut ``hour`` / ``day`` / ``week``; used when
                      start/end are not both given, default ``day``
        model         filter by model identifier (``usage_content.model``)
        group_by      ``hour`` / ``day`` / ``week``; omit for detail rows
        current_page  page number, default 1
        page_size     rows per page, default 20

    Returns:
        ``{'status', 'msg', 'data'}``. ``data`` always carries ``userid``,
        ``start_time``, ``end_time`` and ``summary``; with ``group_by`` it
        adds ``group_by`` and ``groups``, otherwise ``total_count``,
        ``current_page``, ``page_size`` and ``items``. Validation and query
        failures return ``status`` False with a message.
    """
    ns = ns or {}
    # NOTE(review): a caller-supplied ``userid`` is used without checking it
    # against the session user -- confirm this entry point is only reachable
    # by callers allowed to read other users' usage.
    if ns.get('userid'):
        userid = ns.get('userid')
    else:
        userid = await get_user()
    if not userid:
        # presumably aborts the request; verify server_error() raises
        server_error(401)

    start_dt, end_dt, err = _resolve_time_range(ns)
    if err:
        return {'status': False, 'msg': err}

    model_filter = ns.get('model')
    group_by = (ns.get('group_by') or '').lower() or None
    if group_by and group_by not in ('hour', 'day', 'week'):
        return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'}

    # Reject malformed paging up front rather than crashing on int() or
    # producing a negative OFFSET later.
    try:
        page_size = int(ns.get('page_size') or 20)
        current_page = int(ns.get('current_page') or 1)
    except (TypeError, ValueError):
        return {'status': False, 'msg': 'page_size / current_page 必须为正整数'}
    if page_size < 1 or current_page < 1:
        return {'status': False, 'msg': 'page_size / current_page 必须为正整数'}
    offset = (current_page - 1) * page_size

    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        try:
            user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'})
            if not user_rows:
                return {'status': False, 'msg': '用户不存在'}

            conditions = _build_usage_where(
                userid=userid,
                start_dt=start_dt,
                end_dt=end_dt,
                model_filter=model_filter,
            )
            # The summary needs every matching row, so the full set is loaded
            # and enriched once; the detail page is sliced from it instead of
            # being queried and enriched a second time.
            all_rows = await _query_model_usage_rows(sor, conditions)
            all_items = await _enrich_usage_rows(sor, all_rows)

            data = {
                'userid': userid,
                'start_time': _format_datetime(start_dt),
                'end_time': _format_datetime(end_dt),
                'summary': _summarize(all_items),
            }
            if group_by:
                data['group_by'] = group_by
                data['groups'] = _aggregate_items(all_items, group_by)
            else:
                data['total_count'] = len(all_items)
                data['current_page'] = current_page
                data['page_size'] = page_size
                data['items'] = all_items[offset:offset + page_size]
            return {'status': True, 'msg': '查询成功', 'data': data}
        except Exception as e:
            # Endpoint boundary: report the failure in the response envelope.
            return {'status': False, 'msg': '查询失败, %s' % str(e)}
async def model_usage_admin_report(ns=None):
    """Report model usage for all customers under the caller's organization.

    Parameters (keys of *ns*):
        userid        optional; defaults to the current logged-in user
                      (must hold an administrator / operations role)
        start_time    window start
        end_time      window end
        range         shortcut ``hour`` / ``day`` / ``week``; used when
                      start/end are not both given, default ``day``
        customerid    optional; restrict to one customer organization id
        model         optional; filter by model identifier
        group_by      ``hour`` / ``day`` / ``week``; omit to summarize per
                      customer + user + model
        current_page  page number, default 1
        page_size     rows per page, default 20

    Returns:
        ``{'status', 'msg', 'data'}``. ``data`` always carries ``orgid``,
        ``start_time``, ``end_time`` and ``summary``; with ``group_by`` it
        adds ``group_by`` and ``groups``, otherwise ``total_count``,
        ``current_page``, ``page_size`` and ``items``. Validation, permission
        and query failures return ``status`` False with a message.
    """
    ns = ns or {}
    # NOTE(review): the role check below is applied to this id, so a
    # caller-supplied ``userid`` is trusted as the acting identity -- confirm
    # it cannot be set by unauthenticated or unprivileged callers.
    if ns.get('userid'):
        userid = ns.get('userid')
    else:
        userid = await get_user()
    if not userid:
        # presumably aborts the request; verify server_error() raises
        server_error(401)

    start_dt, end_dt, err = _resolve_time_range(ns)
    if err:
        return {'status': False, 'msg': err}

    customerid = ns.get('customerid')
    model_filter = ns.get('model')
    group_by = (ns.get('group_by') or '').lower() or None
    if group_by and group_by not in ('hour', 'day', 'week'):
        return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'}

    # ``or`` defaults also cover keys that are present but None / ''.
    try:
        page_size = int(ns.get('page_size') or 20)
        current_page = int(ns.get('current_page') or 1)
    except (TypeError, ValueError):
        return {'status': False, 'msg': 'page_size / current_page 必须为正整数'}
    if page_size < 1 or current_page < 1:
        return {'status': False, 'msg': 'page_size / current_page 必须为正整数'}
    offset = (current_page - 1) * page_size

    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        try:
            user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'})
            if not user_rows:
                return {'status': False, 'msg': '用户不存在'}
            orgid = user_rows[0].get('orgid')

            user_role = await get_user_role({'userid': userid, 'sor': sor})
            if user_role not in ('管理员', '运营', '运营管理员'):
                return {'status': False, 'msg': '无权限,仅机构管理员可查看'}

            org_map, user_map = await _fetch_customer_users(sor, orgid, customerid)
            user_ids = list(user_map.keys())

            if user_ids:
                conditions = _build_usage_where(
                    userids=user_ids,
                    start_dt=start_dt,
                    end_dt=end_dt,
                    model_filter=model_filter,
                )
                all_rows = await _query_model_usage_rows(sor, conditions)
                all_items = await _enrich_usage_rows(sor, all_rows)
            else:
                # No customers/users in scope: skip the query and fall
                # through to an empty report of the usual shape.
                all_items = []

            data = {
                'orgid': orgid,
                'start_time': _format_datetime(start_dt),
                'end_time': _format_datetime(end_dt),
                'summary': _summarize(all_items),
            }
            if group_by:
                # _aggregate_items only reads the model, token, amount and
                # usage_time fields, so no per-customer enrichment is needed.
                data['group_by'] = group_by
                data['groups'] = _aggregate_items(all_items, group_by)
            else:
                summary_items = _aggregate_admin_summary(all_items, user_map, org_map)
                data['total_count'] = len(summary_items)
                data['current_page'] = current_page
                data['page_size'] = page_size
                data['items'] = summary_items[offset:offset + page_size]
            return {'status': True, 'msg': '查询成功', 'data': data}
        except Exception as e:
            # Endpoint boundary: report the failure in the response envelope.
            return {'status': False, 'msg': '查询失败, %s' % str(e)}
ret = await model_usage_user_report(params_kw)
return ret