# Listing metadata from the original paste: 595 lines, 21 KiB, plaintext.
async def get_user_role(ns=None):
    """Look up the role name of a user.

    Reads the ``userrole`` table using the supplied filter plus
    ``del_flg = '0'``, takes the first match and resolves its ``roleid``
    against the ``role`` table.

    Args:
        ns: Filter dict. Must contain ``'sor'``, the database context whose
            ``R(table, filter)`` coroutine is awaited. The dict itself is
            not modified.

    Returns:
        The value of the ``role`` column on success, otherwise a dict
        ``{'status': False, 'msg': ...}`` naming the lookup that failed.

    Raises:
        KeyError: if ``ns`` has no ``'sor'`` entry.
    """
    ns = {} if ns is None else ns
    sor = ns['sor']

    # Query with a copy so that neither the caller's dict nor a shared
    # default argument is mutated by adding the soft-delete filter.
    # NOTE(review): the copy still carries the 'sor' entry, exactly like the
    # filter passed before - confirm sor.R ignores non-column keys.
    query = dict(ns, del_flg='0')
    user_roles = await sor.R('userrole', query)
    if not user_roles:
        return {
            'status': False,
            'msg': 'userrole table, user id can not find...',
        }

    roleid = user_roles[0].get('roleid')
    roles = await sor.R('role', {'id': roleid})
    if not roles:
        return {
            'status': False,
            'msg': 'role table, can not get role name',
        }
    return roles[0].get('role')
|
||
|
||
|
||
def _escape(value):
    """Escape a value for embedding inside a single-quoted SQL string literal.

    Args:
        value: Any value; it is converted with ``str``.

    Returns:
        ``None`` when ``value`` is ``None``, otherwise the text with
        backslashes and single quotes doubled.
    """
    if value is None:
        return None
    # Backslashes must be doubled first: MySQL treats "\" as an escape
    # character by default, so an input such as "\'" would otherwise turn the
    # doubled quote into an escaped quote plus a terminating quote.
    # NOTE(review): assumes the server does not run with NO_BACKSLASH_ESCAPES.
    return str(value).replace('\\', '\\\\').replace("'", "''")
|
||
|
||
|
||
def _parse_usage_content(raw):
    """Decode a ``usage_content`` value into a dict.

    Args:
        raw: A dict (returned unchanged), a JSON document, or an empty value.

    Returns:
        The decoded JSON object, or ``{}`` when the value is empty, is not
        valid JSON, or decodes to something other than an object.
    """
    if not raw:
        return {}
    if isinstance(raw, dict):
        return raw
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        return {}
    # Valid JSON may still be a list/number/string; callers use ``.get`` on
    # the result, so anything that is not an object is treated as empty.
    return parsed if isinstance(parsed, dict) else {}
|
||
|
||
|
||
def _resolve_time_range(ns):
    """Resolve the reporting window from request parameters.

    An explicit ``start_time`` + ``end_time`` pair takes precedence.
    Otherwise ``range`` (or ``range_type``) selects a window ending now:

        hour -> last 1 hour
        day  -> last 1 day
        week -> last 7 days

    The default is ``day``.

    Returns:
        ``(start_dt, end_dt, None)`` on success, or ``(None, None, message)``
        when the input is invalid.
    """
    shortcut = (ns.get('range') or ns.get('range_type') or 'day').lower()
    raw_start = ns.get('start_time')
    raw_end = ns.get('end_time')
    current = datetime.datetime.now()

    if raw_start and raw_end:
        begin = _parse_datetime(raw_start)
        # A date-only end value is stretched to the end of that day.
        finish = _parse_datetime(raw_end, end_of_day=True)
        if not (begin and finish):
            return None, None, 'start_time / end_time 格式无效,请使用 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS'
        if begin > finish:
            return None, None, 'start_time 不能晚于 end_time'
        return begin, finish, None

    windows = {
        'hour': datetime.timedelta(hours=1),
        'day': datetime.timedelta(days=1),
        'week': datetime.timedelta(weeks=1),
    }
    span = windows.get(shortcut)
    if span is None:
        return None, None, 'range 仅支持 hour / day / week'
    return current - span, current, None
|
||
|
||
|
||
def _parse_datetime(value, end_of_day=False):
    """Parse ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DD`` text into a datetime.

    Args:
        value: Value to parse; it is converted with ``str`` and stripped.
        end_of_day: When true and the text is date-only, the result is set
            to 23:59:59 of that day.

    Returns:
        The parsed ``datetime``, or ``None`` when the value is empty or
        matches neither format.
    """
    if not value:
        return None
    cleaned = str(value).strip()
    # Each candidate is (format, whether the format is date-only).
    candidates = (
        ('%Y-%m-%d %H:%M:%S', False),
        ('%Y-%m-%d', True),
    )
    for pattern, date_only in candidates:
        try:
            parsed = datetime.datetime.strptime(cleaned, pattern)
        except ValueError:
            continue
        if date_only and end_of_day:
            return parsed.replace(hour=23, minute=59, second=59)
        return parsed
    return None
|
||
|
||
|
||
def _format_datetime(dt):
    """Render ``dt`` as ``YYYY-MM-DD HH:MM:SS`` text via its ``strftime``."""
    pattern = '%Y-%m-%d %H:%M:%S'
    return dt.strftime(pattern)
|
||
|
||
|
||
def _group_key(dt, group_by):
    """Return the bucket label of ``dt`` for the given granularity.

    ``hour`` yields the hour start (``YYYY-MM-DD HH:00:00``), ``week`` the
    date of that week's Monday, and any other value the calendar date.
    """
    if group_by == 'week':
        # weekday() is 0 on Monday, so this steps back to the week start.
        week_start = dt - datetime.timedelta(days=dt.weekday())
        return week_start.strftime('%Y-%m-%d')
    pattern = '%Y-%m-%d %H:00:00' if group_by == 'hour' else '%Y-%m-%d'
    return dt.strftime(pattern)
|
||
|
||
|
||
def _normalize_usage_row(row, bill_amount_map=None):
    """Flatten one ``model_usage`` row into the report item shape.

    Args:
        row: Mapping holding the row columns (read with ``.get``).
        bill_amount_map: Optional ``{orderid: amount}`` mapping. A truthy
            entry for the row's order replaces ``original_price``.

    Returns:
        A dict with the keys id, userid, llmid, model, prompt_tokens,
        completion_tokens, total_tokens, amount, bill_status, orderid and
        usage_time.
    """
    usage = _parse_usage_content(row.get('usage_content'))
    orderid = row.get('orderid')

    def _tokens(field):
        # Missing or empty counters count as zero.
        return int(usage.get(field) or 0)

    amount = float(row.get('original_price') or 0)
    if bill_amount_map and orderid:
        # A missing or falsy billed amount keeps the original price.
        amount = float(bill_amount_map.get(orderid) or amount)

    # Model name preference: usage payload, then row columns.
    model = usage.get('model') or row.get('model_name') or row.get('llm_model')

    return {
        'id': row.get('id'),
        'userid': row.get('userid'),
        'llmid': row.get('llmid'),
        'model': model,
        'prompt_tokens': _tokens('prompt_tokens'),
        'completion_tokens': _tokens('completion_tokens'),
        'total_tokens': _tokens('total_tokens'),
        'amount': round(amount, 8),
        'bill_status': row.get('bill_status'),
        'orderid': orderid,
        'usage_time': row.get('created_at'),
    }
|
||
|
||
|
||
async def _fetch_bill_amount_map(sor, order_ids):
    """Sum the ``bill.amount`` values per order id.

    Each distinct, truthy order id is looked up in the ``bill`` table
    (``del_flg = '0'``). Orders without bill rows are left out of the result.

    Returns:
        ``{orderid: total_amount}`` with float totals.
    """
    totals = {}
    if not order_ids:
        return totals
    for oid in filter(None, set(order_ids)):
        bills = await sor.R('bill', {'orderid': oid, 'del_flg': '0'})
        if not bills:
            continue
        totals[oid] = sum(float(bill.get('amount') or 0) for bill in bills)
    return totals
|
||
|
||
|
||
async def _fetch_llm_model_map(sor, llm_ids):
    """Map each distinct, truthy llm id to the ``model`` of its ``llm`` row.

    Ids with no matching row are left out of the result.
    """
    models = {}
    for lid in filter(None, set(llm_ids)):
        found = await sor.R('llm', {'id': lid})
        if found:
            models[lid] = found[0].get('model')
    return models
|
||
|
||
|
||
def _build_usage_where(userid=None, userids=None, start_dt=None, end_dt=None, model_filter=None):
    """Build the single-table ``model_usage`` filter conditions.

    No JOIN is used, for compatibility with the sqlExe environment.

    Args:
        userid: Single user id; takes precedence over ``userids``.
        userids: Iterable of user ids; falsy entries are skipped and an
            all-falsy list yields the never-true condition ``1 = 0``.
        start_dt, end_dt: Window bounds; applied only when both are set.
        model_filter: Value compared with ``usage_content`` ``$.model``.

    Returns:
        A list of SQL condition strings, to be joined by the caller.
    """
    clauses = []

    if userid:
        clauses.append("userid = '%s'" % _escape(userid))
    elif userids:
        quoted = ','.join("'%s'" % _escape(uid) for uid in userids if uid)
        clauses.append('userid IN (%s)' % quoted if quoted else '1 = 0')

    if start_dt and end_dt:
        window = (_format_datetime(start_dt), _format_datetime(end_dt))
        clauses.append("created_at BETWEEN '%s' AND '%s'" % window)

    if model_filter:
        model_clause = "JSON_UNQUOTE(JSON_EXTRACT(usage_content, '$.model')) = '%s'"
        clauses.append(model_clause % _escape(model_filter))

    return clauses
|
||
|
||
|
||
async def _query_model_usage_rows(sor, conditions, limit=None, offset=None):
    """Fetch ``model_usage`` rows matching ``conditions``, newest first.

    Rows whose ``bill_status`` is ``'0'`` are always excluded.

    Args:
        sor: Database context providing the ``sqlExe(sql, params)`` coroutine.
        conditions: SQL condition strings, AND-ed together; an empty list
            matches everything.
        limit: Optional page size; when given a LIMIT/OFFSET clause is added.
        offset: Row offset used with ``limit`` (treated as 0 when falsy).

    Returns:
        Whatever ``sor.sqlExe`` returns for the query.
    """
    predicate = ' AND '.join(conditions) if conditions else '1 = 1'
    statement = (
        '\n'
        'SELECT id, userid, llmid, original_price, orderid, bill_status, usage_content, created_at\n'
        'FROM model_usage\n'
        "WHERE %s AND bill_status != '0'\n"
        'ORDER BY created_at DESC\n'
    ) % predicate
    if limit is not None:
        paging = ' LIMIT %d OFFSET %d' % (int(limit), int(offset or 0))
        statement = statement + paging
    return await sor.sqlExe(statement, {})
|
||
|
||
|
||
async def _count_model_usage(sor, conditions):
    """Count ``model_usage`` rows matching ``conditions``.

    Rows whose ``bill_status`` is ``'0'`` are excluded, using the same
    single-quoted literal as the row query so both statements filter
    identically regardless of the server's quoting mode.

    Args:
        sor: Database context providing the ``sqlExe(sql, params)`` coroutine.
        conditions: SQL condition strings, AND-ed together; an empty list
            matches everything.

    Returns:
        The ``total_count`` value of the first result row, or ``0`` when the
        query returns no rows.
    """
    where_clause = ' AND '.join(conditions) if conditions else '1 = 1'
    sql = (
        "SELECT COUNT(*) AS total_count FROM model_usage "
        "WHERE %s AND bill_status != '0'"
    ) % where_clause
    rows = await sor.sqlExe(sql, {})
    # Guard against an empty result instead of failing with IndexError.
    if not rows:
        return 0
    return rows[0]['total_count']
|
||
|
||
|
||
async def _enrich_usage_rows(sor, rows):
    """Turn raw ``model_usage`` rows into normalized report items.

    Bill totals (by ``orderid``) and llm model names (by ``llmid``) are
    fetched once for all rows. Each row is then copied, given an
    ``llm_model`` entry and passed through ``_normalize_usage_row``.
    """
    wanted_orders = []
    wanted_llms = []
    for record in rows:
        if record.get('orderid'):
            wanted_orders.append(record.get('orderid'))
        if record.get('llmid'):
            wanted_llms.append(record.get('llmid'))

    bill_amounts = await _fetch_bill_amount_map(sor, wanted_orders)
    llm_models = await _fetch_llm_model_map(sor, wanted_llms)

    return [
        _normalize_usage_row(
            dict(record, llm_model=llm_models.get(record.get('llmid'))),
            bill_amounts,
        )
        for record in rows
    ]
|
||
|
||
|
||
async def _resolve_scope_orgid(sor, user_orgid, user_role):
    """Resolve the institution id that bounds a report.

    Operations / operations-admin roles: ``users.orgid`` already is the
    institution id. Other administrators: ``users.orgid`` is the user's own
    organization and the institution is that organization's ``parentid``.
    When no parent is found, ``user_orgid`` is returned.
    """
    if user_role == '运营' or user_role == '运营管理员':
        return user_orgid

    matches = await sor.R('organization', {'id': user_orgid, 'del_flg': '0'})
    parent = matches[0].get('parentid') if matches else None
    return parent or user_orgid
|
||
|
||
|
||
async def _fetch_customer_users(sor, institution_orgid, customerid=None):
    """Fetch the users that belong to an institution.

    Institution scope, as expressed by the query:
      * ordinary users: ``users.orgid = organization.id`` with
        ``organization.parentid`` equal to the institution id;
      * managers / operations: ``users.orgid`` equals the institution id;
      * invited registrations may sit in a second-level organization whose
        ``parentid`` points at a customer organization of the institution.

    Args:
        sor: Database context providing the ``sqlExe(sql, params)`` coroutine.
        institution_orgid: Institution id bounding the result.
        customerid: Optional customer organization id. When given and
            different from the institution id, the result is narrowed to
            users of that organization or of its direct children.

    Returns:
        ``(org_map, user_map)``: ``org_map`` maps organization id to
        ``{'id', 'orgname', 'parentid'}`` and ``user_map`` maps user id to
        ``{'id', 'username', 'name', 'orgid'}``.
    """
    inst_esc = _escape(institution_orgid)
    institution_scope = """(
            o.parentid = '%s'
            OR o.parentid IN (
                SELECT id FROM organization WHERE parentid = '%s' AND del_flg = '0'
            )
            OR u.orgid = '%s'
        )""" % (inst_esc, inst_esc, inst_esc)

    # Compare as text so an int/str mismatch between the request value and
    # the stored id does not send the institution itself down the customer
    # branch.
    if customerid and str(customerid) != str(institution_orgid):
        cid_esc = _escape(customerid)
        # The customer filter is AND-ed with the institution scope: without
        # it a caller could pass any organization id and read users outside
        # their own institution.
        user_scope = "(u.orgid = '%s' OR o.parentid = '%s') AND %s" % (
            cid_esc, cid_esc, institution_scope,
        )
    else:
        user_scope = institution_scope

    sql = """
        SELECT u.id, u.username, u.name, u.orgid, o.orgname, o.parentid AS org_parentid
        FROM users u
        INNER JOIN organization o ON u.orgid = o.id AND o.del_flg = '0'
        WHERE u.del_flg = '0' AND %s
    """ % user_scope
    rows = await sor.sqlExe(sql, {})

    org_map = {}
    user_map = {}
    for row in rows:
        uid = row['id']
        oid = row.get('orgid')
        user_map[uid] = {
            'id': uid,
            'username': row.get('username'),
            'name': row.get('name'),
            'orgid': oid,
        }
        # The first row seen for an organization defines its entry.
        if oid and oid not in org_map:
            org_map[oid] = {
                'id': oid,
                'orgname': row.get('orgname'),
                'parentid': row.get('org_parentid'),
            }
    return org_map, user_map
|
||
|
||
|
||
def _aggregate_admin_summary(items, user_map, org_map):
    """Roll usage items up per (customer organization, user, model).

    Args:
        items: Normalized usage items.
        user_map: ``{userid: user info}``; supplies username, name and orgid.
        org_map: ``{orgid: org info}``; supplies the customer id and name.

    Returns:
        One dict per group with summed token counts, summed amount (rounded
        to 8 decimals after every addition), request count and first/last
        usage time, sorted by amount in descending order.
    """
    grouped = {}
    for entry in items:
        uid = entry.get('userid')
        owner = user_map.get(uid, {})
        customer = org_map.get(owner.get('orgid'), {})
        model = entry.get('model') or 'unknown'
        seen_at = entry.get('usage_time')

        group_id = (customer.get('id'), uid, model)
        try:
            slot = grouped[group_id]
        except KeyError:
            slot = grouped[group_id] = {
                'customerid': customer.get('id'),
                'customer_name': customer.get('orgname'),
                'userid': uid,
                'username': owner.get('username'),
                'user_name': owner.get('name'),
                'model': model,
                'prompt_tokens': 0,
                'completion_tokens': 0,
                'total_tokens': 0,
                'amount': 0.0,
                'request_count': 0,
                'first_usage_time': seen_at,
                'last_usage_time': seen_at,
            }

        for counter in ('prompt_tokens', 'completion_tokens', 'total_tokens'):
            slot[counter] += entry.get(counter) or 0
        slot['amount'] = round(slot['amount'] + float(entry.get('amount') or 0), 8)
        slot['request_count'] += 1

        if not seen_at:
            continue
        # Times are compared through their string form.
        stamp = str(seen_at)
        latest = slot.get('last_usage_time')
        if not latest or stamp > str(latest):
            slot['last_usage_time'] = seen_at
        earliest = slot.get('first_usage_time')
        if not earliest or stamp < str(earliest):
            slot['first_usage_time'] = seen_at

    ranked = list(grouped.values())
    ranked.sort(key=lambda group: group['amount'], reverse=True)
    return ranked
|
||
|
||
|
||
def _aggregate_items(items, group_by=None):
    """Roll usage items up per time period.

    Args:
        items: Normalized usage items; ``usage_time`` may be text (parsed
            with ``_parse_datetime``) or a datetime-like object.
        group_by: Granularity passed to ``_group_key``. When falsy the items
            are returned untouched.

    Returns:
        One dict per period with summed token counts, summed amount
        (rounded to 8 decimals after every addition) and request count,
        sorted by period in descending order. Items without a usable time
        land in the ``'unknown'`` period.
    """
    if not group_by:
        return items

    periods = {}
    for entry in items:
        stamp = entry.get('usage_time')
        moment = _parse_datetime(stamp) if isinstance(stamp, str) else stamp
        period = _group_key(moment, group_by) if moment else 'unknown'

        slot = periods.get(period)
        if slot is None:
            slot = periods[period] = {
                'period': period,
                # NOTE(review): buckets are keyed by period only, so this is
                # the model of the first item seen in the period.
                'model': entry.get('model'),
                'prompt_tokens': 0,
                'completion_tokens': 0,
                'total_tokens': 0,
                'amount': 0.0,
                'request_count': 0,
            }

        for counter in ('prompt_tokens', 'completion_tokens', 'total_tokens'):
            slot[counter] += entry.get(counter) or 0
        slot['amount'] = round(slot['amount'] + float(entry.get('amount') or 0), 8)
        slot['request_count'] += 1

    ordered = list(periods.values())
    ordered.sort(key=lambda bucket: bucket['period'], reverse=True)
    return ordered
|
||
|
||
|
||
def _summarize(items):
    """Total the request count, token counters and amount of ``items``.

    Missing or falsy counters count as zero; the amount is summed as float
    and rounded to 8 decimals.
    """
    def _total(field):
        return sum(entry.get(field) or 0 for entry in items)

    money = sum(float(entry.get('amount') or 0) for entry in items)
    return {
        'request_count': len(items),
        'prompt_tokens': _total('prompt_tokens'),
        'completion_tokens': _total('completion_tokens'),
        'total_tokens': _total('total_tokens'),
        'amount': round(money, 8),
    }
|
||
|
||
|
||
async def model_usage_user_report(ns=None):
    """Report a user's own model usage records.

    Parameters (keys of ``ns``):
        userid        optional, defaults to the currently logged-in user
        start_time    start time, ``YYYY-MM-DD`` or ``YYYY-MM-DD HH:MM:SS``
        end_time      end time
        range         shortcut window: hour / day / week (used when
                      start/end are not both given, default day)
        model         filter by model identifier (``usage_content.model``)
        group_by      aggregation granularity: hour / day / week; when
                      omitted the detail records are returned
        current_page  page number, default 1
        page_size     records per page, default 20

    Returns:
        ``{'status', 'msg', 'data'}``. ``data`` holds userid, start_time,
        end_time and summary, plus either ``group_by``/``groups`` or
        ``total_count``/``current_page``/``page_size``/``items``. Item
        fields: model, prompt_tokens, completion_tokens, total_tokens,
        amount, usage_time. On failure ``{'status': False, 'msg': ...}``.
    """
    ns = {} if ns is None else ns

    # NOTE(review): a caller-supplied userid is used as-is, so whoever can
    # reach this entry point can read another user's records - confirm this
    # is only exposed to trusted callers.
    userid = ns.get('userid') or await get_user()
    if not userid:
        server_error(401)

    start_dt, end_dt, err = _resolve_time_range(ns)
    if err:
        return {'status': False, 'msg': err}

    model_filter = ns.get('model')
    group_by = (ns.get('group_by') or '').lower() or None
    if group_by and group_by not in ('hour', 'day', 'week'):
        return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'}

    # Paging values come from the request: reject anything that is not a
    # positive integer instead of raising or producing a negative offset.
    try:
        page_size = int(ns.get('page_size') or 20)
        current_page = int(ns.get('current_page') or 1)
    except (TypeError, ValueError):
        return {'status': False, 'msg': 'current_page / page_size 必须为正整数'}
    if page_size < 1 or current_page < 1:
        return {'status': False, 'msg': 'current_page / page_size 必须为正整数'}
    offset = (current_page - 1) * page_size

    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        try:
            user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'})
            if not user_rows:
                return {'status': False, 'msg': '用户不存在'}

            conditions = _build_usage_where(
                userid=userid,
                start_dt=start_dt,
                end_dt=end_dt,
                model_filter=model_filter,
            )
            # The summary needs every matching row, so the full set is
            # loaded and enriched once; the detail page is sliced from it
            # rather than queried and enriched a second time.
            all_rows = await _query_model_usage_rows(sor, conditions)
            all_items = await _enrich_usage_rows(sor, all_rows)

            data = {
                'userid': userid,
                'start_time': _format_datetime(start_dt),
                'end_time': _format_datetime(end_dt),
                'summary': _summarize(all_items),
            }
            if group_by:
                data['group_by'] = group_by
                data['groups'] = _aggregate_items(all_items, group_by)
            else:
                data['total_count'] = len(all_items)
                data['current_page'] = current_page
                data['page_size'] = page_size
                data['items'] = all_items[offset:offset + page_size]

            return {'status': True, 'msg': '查询成功', 'data': data}
        except Exception as e:
            # Boundary handler: report the failure in the response envelope.
            return {'status': False, 'msg': '查询失败, %s' % str(e)}
|
||
|
||
async def model_usage_admin_report(ns=None):
    """Report model usage of all customers under the caller's institution.

    Parameters (keys of ``ns``):
        userid        optional, defaults to the currently logged-in user
                      (must be an institution administrator)
        start_time    start time
        end_time      end time
        range         shortcut window: hour / day / week (used when
                      start/end are not both given, default day)
        customerid    optional, filter by customer organization id
        model         optional, filter by model identifier
        group_by      aggregation granularity: hour / day / week; when
                      omitted the result is summarized per customer + model
        current_page  page number, default 1
        page_size     records per page, default 20

    Returns:
        ``{'status', 'msg', 'data'}``. ``data`` holds orgid, start_time,
        end_time and summary, plus either ``group_by``/``groups`` or
        ``total_count``/``current_page``/``page_size``/``items``. Item
        fields: customerid, customer_name, userid, username, model,
        prompt_tokens, completion_tokens, total_tokens, amount and
        first/last usage time. On failure ``{'status': False, 'msg': ...}``.
    """
    ns = {} if ns is None else ns

    # NOTE(review): a caller-supplied userid is used as-is, and the role
    # check below is made against that id - confirm this entry point is not
    # reachable with an arbitrary userid from untrusted clients.
    userid = ns.get('userid') or await get_user()
    if not userid:
        server_error(401)

    start_dt, end_dt, err = _resolve_time_range(ns)
    if err:
        return {'status': False, 'msg': err}

    customerid = ns.get('customerid')
    model_filter = ns.get('model')
    group_by = (ns.get('group_by') or '').lower() or None
    if group_by and group_by not in ('hour', 'day', 'week'):
        return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'}

    # Paging values come from the request: reject anything that is not a
    # positive integer instead of raising or slicing with a negative offset.
    try:
        page_size = int(ns.get('page_size') or 20)
        current_page = int(ns.get('current_page') or 1)
    except (TypeError, ValueError):
        return {'status': False, 'msg': 'current_page / page_size 必须为正整数'}
    if page_size < 1 or current_page < 1:
        return {'status': False, 'msg': 'current_page / page_size 必须为正整数'}
    offset = (current_page - 1) * page_size

    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        try:
            user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'})
            if not user_rows:
                return {'status': False, 'msg': '用户不存在'}

            user_orgid = user_rows[0].get('orgid')
            # get_user_role returns an error dict when a lookup fails; that
            # value is not in the allowed tuple, so it is rejected here too.
            user_role = await get_user_role({'userid': userid, 'sor': sor})
            if user_role not in ('管理员', '运营', '运营管理员'):
                return {'status': False, 'msg': '无权限,仅机构管理员可查看'}

            orgid = await _resolve_scope_orgid(sor, user_orgid, user_role)
            org_map, user_map = await _fetch_customer_users(sor, orgid, customerid)
            user_ids = list(user_map.keys())

            if user_ids:
                conditions = _build_usage_where(
                    userids=user_ids,
                    start_dt=start_dt,
                    end_dt=end_dt,
                    model_filter=model_filter,
                )
                all_rows = await _query_model_usage_rows(sor, conditions)
                all_items = await _enrich_usage_rows(sor, all_rows)
            else:
                # No users in scope: skip the usage query and fall through
                # with an empty item list, which yields the empty payload.
                all_items = []

            data = {
                'orgid': orgid,
                'start_time': _format_datetime(start_dt),
                'end_time': _format_datetime(end_dt),
                'summary': _summarize(all_items),
            }

            if group_by:
                enriched_items = []
                for item in all_items:
                    user = user_map.get(item.get('userid'), {})
                    org = org_map.get(user.get('orgid'), {})
                    enriched = dict(item)
                    enriched['customerid'] = org.get('id')
                    enriched['customer_name'] = org.get('orgname')
                    enriched['username'] = user.get('username')
                    enriched['user_name'] = user.get('name')
                    enriched_items.append(enriched)
                data['group_by'] = group_by
                data['groups'] = _aggregate_items(enriched_items, group_by)
            else:
                summary_items = _aggregate_admin_summary(all_items, user_map, org_map)
                data['total_count'] = len(summary_items)
                data['current_page'] = current_page
                data['page_size'] = page_size
                data['items'] = summary_items[offset:offset + page_size]

            return {'status': True, 'msg': '查询成功', 'data': data}
        except Exception as e:
            # Boundary handler: report the failure in the response envelope.
            return {'status': False, 'msg': '查询失败, %s' % str(e)}
|
||
|
||
ret = await model_usage_admin_report(params_kw)
|
||
return ret |