async def get_user_role(ns={}): sor = ns['sor'] ns['del_flg'] = '0' res_role = await sor.R('userrole', ns) if res_role: user_role = res_role[0] else: return { 'status': False, 'msg': 'userrole table, user id can not find...', } roleid = user_role.get('roleid') role_name = await sor.R('role', {'id': roleid}) if role_name: role = role_name[0].get('role') else: return { 'status': False, 'msg': 'role table, can not get role name', } return role def _escape(value): if value is None: return None return str(value).replace("'", "''") def _parse_usage_content(raw): if not raw: return {} if isinstance(raw, dict): return raw try: return json.loads(raw) except (TypeError, ValueError): return {} def _resolve_time_range(ns): """ 解析时间范围。 优先使用 start_time + end_time;否则按 range 快捷窗口: hour -> 最近 1 小时 day -> 最近 1 天 week -> 最近 7 天 默认 day。 """ start_time = ns.get('start_time') end_time = ns.get('end_time') range_type = (ns.get('range') or ns.get('range_type') or 'day').lower() now = datetime.datetime.now() if start_time and end_time: start_dt = _parse_datetime(start_time) end_dt = _parse_datetime(end_time, end_of_day=True) if not start_dt or not end_dt: return None, None, 'start_time / end_time 格式无效,请使用 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS' if start_dt > end_dt: return None, None, 'start_time 不能晚于 end_time' return start_dt, end_dt, None delta_map = { 'hour': datetime.timedelta(hours=1), 'day': datetime.timedelta(days=1), 'week': datetime.timedelta(weeks=1), } if range_type not in delta_map: return None, None, 'range 仅支持 hour / day / week' end_dt = now start_dt = now - delta_map[range_type] return start_dt, end_dt, None def _parse_datetime(value, end_of_day=False): if not value: return None text = str(value).strip() for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'): try: dt = datetime.datetime.strptime(text, fmt) if fmt == '%Y-%m-%d' and end_of_day: dt = dt.replace(hour=23, minute=59, second=59) return dt except ValueError: continue return None def _format_datetime(dt): return dt.strftime('%Y-%m-%d %H:%M:%S') def _group_key(dt, group_by): if group_by == 'hour': return dt.strftime('%Y-%m-%d %H:00:00') if group_by == 'week': monday = dt - datetime.timedelta(days=dt.weekday()) return monday.strftime('%Y-%m-%d') return dt.strftime('%Y-%m-%d') def _round_amount(value): return round(float(value or 0), 8) def _format_amount(value): return '%.8f' % _round_amount(value) def _format_amount_item(item): formatted = dict(item) formatted['amount'] = _format_amount(formatted.get('amount')) return formatted def _format_amount_items(items): return [_format_amount_item(item) for item in items] def _format_amount_summary(summary): formatted = dict(summary) formatted['amount'] = _format_amount(formatted.get('amount')) return formatted def _usage_time_sort_key(item): return str(item.get('last_usage_time') or '') def _normalize_usage_row(row, bill_amount_map=None): usage = _parse_usage_content(row.get('usage_content')) orderid = row.get('orderid') amount = float(row.get('original_price') or 0) if bill_amount_map and orderid and orderid in bill_amount_map: amount = float(bill_amount_map[orderid] or amount) return { 'id': row.get('id'), 'userid': row.get('userid'), 'llmid': row.get('llmid'), 'model': usage.get('model') or row.get('model_name') or row.get('llm_model'), 'prompt_tokens': int(usage.get('prompt_tokens') or 0), 'completion_tokens': int(usage.get('completion_tokens') or 0), 'total_tokens': int(usage.get('total_tokens') or 0), 'amount': _round_amount(amount), 'bill_status': row.get('bill_status'), 'orderid': orderid, 'usage_time': row.get('created_at'), } async def _fetch_bill_amount_map(sor, order_ids): if not order_ids: return {} result = {} for orderid in set(order_ids): if not orderid: continue rows = await sor.R('bill', {'orderid': orderid, 'del_flg': '0'}) if rows: result[orderid] = sum(float(r.get('amount') or 0) for r in rows) return result async def _fetch_llm_model_map(sor, llm_ids): result = {} for llmid in set(llm_ids): if not llmid: continue rows = await sor.R('llm', {'id': llmid}) if rows: result[llmid] = rows[0].get('model') return result def _build_usage_where(userid=None, userids=None, start_dt=None, end_dt=None, model_filter=None): """构建 model_usage 单表查询条件(不使用 JOIN,兼容 sqlExe 环境)。""" conditions = [] if userid: conditions.append("userid = '%s'" % _escape(userid)) elif userids: escaped = ["'%s'" % _escape(uid) for uid in userids if uid] if not escaped: conditions.append('1 = 0') else: conditions.append('userid IN (%s)' % ','.join(escaped)) if start_dt and end_dt: conditions.append( "created_at BETWEEN '%s' AND '%s'" % (_format_datetime(start_dt), _format_datetime(end_dt)) ) if model_filter: conditions.append( "JSON_UNQUOTE(JSON_EXTRACT(usage_content, '$.model')) = '%s'" % _escape(model_filter) ) return conditions async def _query_model_usage_rows(sor, conditions, limit=None, offset=None): where_clause = ' AND '.join(conditions) if conditions else '1 = 1' sql = """ SELECT id, userid, llmid, original_price, orderid, bill_status, usage_content, created_at FROM model_usage WHERE %s AND bill_status != '0' ORDER BY created_at DESC """ % where_clause if limit is not None: sql += ' LIMIT %d OFFSET %d' % (int(limit), int(offset or 0)) return await sor.sqlExe(sql, {}) async def _count_model_usage(sor, conditions): where_clause = ' AND '.join(conditions) if conditions else '1 = 1' sql = 'SELECT COUNT(*) AS total_count FROM model_usage WHERE %s AND bill_status != "0"' % where_clause return (await sor.sqlExe(sql, {}))[0]['total_count'] async def _enrich_usage_rows(sor, rows): order_ids = [row.get('orderid') for row in rows if row.get('orderid')] llm_ids = [row.get('llmid') for row in rows if row.get('llmid')] bill_amount_map = await _fetch_bill_amount_map(sor, order_ids) llm_model_map = await _fetch_llm_model_map(sor, llm_ids) items = [] for row in rows: enriched = dict(row) enriched['llm_model'] = llm_model_map.get(row.get('llmid')) items.append(_normalize_usage_row(enriched, bill_amount_map)) return items async def _resolve_scope_orgid(sor, user_orgid, user_role): """ 解析报表统计范围的机构 id。 运营/运营管理员:users.orgid 即所在机构 id。 其他管理员:users.orgid 为用户机构 id,所在机构为 organization.parentid。 """ if user_role in ('运营', '运营管理员'): return user_orgid org_rows = await sor.R('organization', {'id': user_orgid, 'del_flg': '0'}) if org_rows and org_rows[0].get('parentid'): return org_rows[0]['parentid'] return user_orgid async def _fetch_customer_users(sor, institution_orgid, customerid=None): """ 获取机构下辖用户。 普通用户:users.orgid = organization.id,organization.parentid = 机构 id。 管理/运营:users.orgid 直接等于机构 id。 邀请注册可能存在二级 organization(parentid 指向上级客户机构)。 """ inst_esc = _escape(institution_orgid) if customerid: cid_esc = _escape(customerid) if customerid == institution_orgid: user_scope = """( o.parentid = '%s' OR o.parentid IN ( SELECT id FROM organization WHERE parentid = '%s' AND del_flg = '0' ) OR u.orgid = '%s' )""" % (inst_esc, inst_esc, inst_esc) else: user_scope = "(u.orgid = '%s' OR o.parentid = '%s')" % (cid_esc, cid_esc) else: user_scope = """( o.parentid = '%s' OR o.parentid IN ( SELECT id FROM organization WHERE parentid = '%s' AND del_flg = '0' ) OR u.orgid = '%s' )""" % (inst_esc, inst_esc, inst_esc) sql = """ SELECT u.id, u.username, u.name, u.orgid, o.orgname, o.parentid AS org_parentid FROM users u INNER JOIN organization o ON u.orgid = o.id AND o.del_flg = '0' WHERE u.del_flg = '0' AND %s """ % user_scope rows = await sor.sqlExe(sql, {}) org_map = {} user_map = {} for row in rows: uid = row['id'] oid = row.get('orgid') user_map[uid] = { 'id': uid, 'username': row.get('username'), 'name': row.get('name'), 'orgid': oid, } if oid and oid not in org_map: org_map[oid] = { 'id': oid, 'orgname': row.get('orgname'), 'parentid': row.get('org_parentid'), } return org_map, user_map def _aggregate_admin_summary(items, user_map, org_map): buckets = {} for item in items: user = user_map.get(item.get('userid'), {}) org = org_map.get(user.get('orgid'), {}) model = item.get('model') or 'unknown' key = (org.get('id'), item.get('userid'), model) if key not in buckets: buckets[key] = { 'customerid': org.get('id'), 'customer_name': org.get('orgname'), 'userid': item.get('userid'), 'username': user.get('username'), 'user_name': user.get('name'), 'model': model, 'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0, 'amount': 0.0, 'request_count': 0, 'first_usage_time': item.get('usage_time'), 'last_usage_time': item.get('usage_time'), } bucket = buckets[key] bucket['prompt_tokens'] += item.get('prompt_tokens') or 0 bucket['completion_tokens'] += item.get('completion_tokens') or 0 bucket['total_tokens'] += item.get('total_tokens') or 0 bucket['amount'] = _round_amount(bucket['amount'] + float(item.get('amount') or 0)) bucket['request_count'] += 1 usage_time = item.get('usage_time') if usage_time: if ( not bucket.get('last_usage_time') or str(usage_time) > str(bucket['last_usage_time']) ): bucket['last_usage_time'] = usage_time if ( not bucket.get('first_usage_time') or str(usage_time) < str(bucket['first_usage_time']) ): bucket['first_usage_time'] = usage_time return sorted(buckets.values(), key=_usage_time_sort_key, reverse=True) def _aggregate_items(items, group_by=None): if not group_by: return items buckets = {} for item in items: usage_time = item.get('usage_time') if isinstance(usage_time, str): dt = _parse_datetime(usage_time) else: dt = usage_time if not dt: key = 'unknown' else: key = _group_key(dt, group_by) if key not in buckets: buckets[key] = { 'period': key, 'model': item.get('model'), 'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0, 'amount': 0.0, 'request_count': 0, } bucket = buckets[key] bucket['prompt_tokens'] += item.get('prompt_tokens') or 0 bucket['completion_tokens'] += item.get('completion_tokens') or 0 bucket['total_tokens'] += item.get('total_tokens') or 0 bucket['amount'] = _round_amount(bucket['amount'] + float(item.get('amount') or 0)) bucket['request_count'] += 1 return sorted(buckets.values(), key=lambda x: x['period'], reverse=True) def _summarize(items): return { 'request_count': len(items), 'prompt_tokens': sum(i.get('prompt_tokens') or 0 for i in items), 'completion_tokens': sum(i.get('completion_tokens') or 0 for i in items), 'total_tokens': sum(i.get('total_tokens') or 0 for i in items), 'amount': _round_amount(sum(float(i.get('amount') or 0) for i in items)), } async def model_usage_user_report(ns={}): """ 用户查看自己的模型使用记录。 参数: userid 可选,默认当前登录用户 start_time 开始时间,格式 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS end_time 结束时间 range 快捷范围:hour / day / week(未传 start/end 时生效,默认 day) model 按模型标识筛选(usage_content.model) group_by 聚合粒度:hour / day / week,不传则返回明细 current_page 页码,默认 1 page_size 每页条数,默认 20 返回字段: model, prompt_tokens, completion_tokens, total_tokens, amount, usage_time """ if ns.get('userid'): userid = ns.get('userid') else: userid = await get_user() if not userid: server_error(401) start_dt, end_dt, err = _resolve_time_range(ns) if err: return {'status': False, 'msg': err} model_filter = ns.get('model') group_by = (ns.get('group_by') or '').lower() or None if group_by and group_by not in ('hour', 'day', 'week'): return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'} page_size = int(ns.get('page_size', 20)) current_page = int(ns.get('current_page', 1)) offset = (current_page - 1) * page_size db = DBPools() async with db.sqlorContext('kboss') as sor: try: user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'}) if not user_rows: return {'status': False, 'msg': '用户不存在'} conditions = _build_usage_where( userid=userid, start_dt=start_dt, end_dt=end_dt, model_filter=model_filter, ) total_count = await _count_model_usage(sor, conditions) if group_by: all_rows = await _query_model_usage_rows(sor, conditions) all_items = await _enrich_usage_rows(sor, all_rows) grouped = _aggregate_items(all_items, group_by) return { 'status': True, 'msg': '查询成功', 'data': { 'userid': userid, 'start_time': _format_datetime(start_dt), 'end_time': _format_datetime(end_dt), 'summary': _format_amount_summary(_summarize(all_items)), 'group_by': group_by, 'groups': _format_amount_items(grouped), }, } all_rows = await _query_model_usage_rows(sor, conditions) all_items = await _enrich_usage_rows(sor, all_rows) page_rows = await _query_model_usage_rows( sor, conditions, limit=page_size, offset=offset, ) items = await _enrich_usage_rows(sor, page_rows) return { 'status': True, 'msg': '查询成功', 'data': { 'userid': userid, 'start_time': _format_datetime(start_dt), 'end_time': _format_datetime(end_dt), 'summary': _format_amount_summary(_summarize(all_items)), 'total_count': total_count, 'current_page': current_page, 'page_size': page_size, 'items': _format_amount_items(items), }, } except Exception as e: return {'status': False, 'msg': '查询失败, %s' % str(e)} async def model_usage_admin_report(ns={}): """ 管理员查看当前机构下所有客户的模型使用汇总。 参数: userid 可选,默认当前登录用户(须为机构管理员) start_time 开始时间 end_time 结束时间 range 快捷范围:hour / day / week(未传 start/end 时生效,默认 day) customerid 可选,按客户机构 id 筛选 model 可选,按模型标识筛选 group_by 聚合粒度:hour / day / week;不传则按客户+模型汇总 current_page 页码,默认 1 page_size 每页条数,默认 20 返回字段: customerid, customer_name, userid, username, model, prompt_tokens, completion_tokens, total_tokens, amount, usage_time """ if ns.get('userid'): userid = ns.get('userid') else: userid = await get_user() if not userid: server_error(401) start_dt, end_dt, err = _resolve_time_range(ns) if err: return {'status': False, 'msg': err} customerid = ns.get('customerid') model_filter = ns.get('model') group_by = (ns.get('group_by') or '').lower() or None if group_by and group_by not in ('hour', 'day', 'week'): return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'} page_size = int(ns.get('page_size')) if ns.get('page_size') else 20 current_page = int(ns.get('current_page')) if ns.get('current_page') else 1 offset = (current_page - 1) * page_size db = DBPools() async with db.sqlorContext('kboss') as sor: try: user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'}) if not user_rows: return {'status': False, 'msg': '用户不存在'} user_orgid = user_rows[0].get('orgid') user_role = await get_user_role({'userid': userid, 'sor': sor}) if user_role not in ('管理员', '运营', '运营管理员'): return {'status': False, 'msg': '无权限,仅机构管理员可查看'} orgid = await _resolve_scope_orgid(sor, user_orgid, user_role) org_map, user_map = await _fetch_customer_users(sor, orgid, customerid) user_ids = list(user_map.keys()) if not user_ids: empty_data = { 'orgid': orgid, 'start_time': _format_datetime(start_dt), 'end_time': _format_datetime(end_dt), 'summary': _format_amount_summary(_summarize([])), 'total_count': 0, 'current_page': current_page, 'page_size': page_size, 'items': [], } if group_by: empty_data = { 'orgid': orgid, 'start_time': _format_datetime(start_dt), 'end_time': _format_datetime(end_dt), 'summary': _format_amount_summary(_summarize([])), 'group_by': group_by, 'groups': [], } return {'status': True, 'msg': '查询成功', 'data': empty_data} conditions = _build_usage_where( userids=user_ids, start_dt=start_dt, end_dt=end_dt, model_filter=model_filter, ) all_rows = await _query_model_usage_rows(sor, conditions) all_items = await _enrich_usage_rows(sor, all_rows) if group_by: enriched_items = [] for item in all_items: user = user_map.get(item.get('userid'), {}) org = org_map.get(user.get('orgid'), {}) enriched = dict(item) enriched['customerid'] = org.get('id') enriched['customer_name'] = org.get('orgname') enriched['username'] = user.get('username') enriched['user_name'] = user.get('name') enriched_items.append(enriched) grouped = _aggregate_items(enriched_items, group_by) return { 'status': True, 'msg': '查询成功', 'data': { 'orgid': orgid, 'start_time': _format_datetime(start_dt), 'end_time': _format_datetime(end_dt), 'summary': _format_amount_summary(_summarize(all_items)), 'group_by': group_by, 'groups': _format_amount_items(grouped), }, } summary_items = _aggregate_admin_summary(all_items, user_map, org_map) total_count = len(summary_items) page_items = summary_items[offset:offset + page_size] return { 'status': True, 'msg': '查询成功', 'data': { 'orgid': orgid, 'start_time': _format_datetime(start_dt), 'end_time': _format_datetime(end_dt), 'summary': _format_amount_summary(_summarize(all_items)), 'total_count': total_count, 'current_page': current_page, 'page_size': page_size, 'items': _format_amount_items(page_items), }, } except Exception as e: return {'status': False, 'msg': '查询失败, %s' % str(e)} ret = await model_usage_admin_report(params_kw) return ret