From 57c8723070ab1390a8233bced2b5bd14bacaeb50 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Mon, 25 May 2026 11:48:21 +0800 Subject: [PATCH] update --- b/cntoai/model_usage_admin_report.dspy | 512 +++++++++++++++++++++++++ b/user/logintype.dspy | 2 +- 2 files changed, 513 insertions(+), 1 deletion(-) create mode 100644 b/cntoai/model_usage_admin_report.dspy diff --git a/b/cntoai/model_usage_admin_report.dspy b/b/cntoai/model_usage_admin_report.dspy new file mode 100644 index 0000000..254aa3e --- /dev/null +++ b/b/cntoai/model_usage_admin_report.dspy @@ -0,0 +1,512 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +def _parse_usage_content(raw): + if not raw: + return {} + if isinstance(raw, dict): + return raw + try: + return json.loads(raw) + except (TypeError, ValueError): + return {} + + +def _resolve_time_range(ns): + """ + 解析时间范围。 + + 优先使用 start_time + end_time;否则按 range 快捷窗口: + hour -> 最近 1 小时 + day -> 最近 1 天 + week -> 最近 7 天 + 默认 day。 + """ + start_time = ns.get('start_time') + end_time = ns.get('end_time') + range_type = (ns.get('range') or ns.get('range_type') or 'day').lower() + + now = datetime.datetime.now() + if start_time and end_time: + start_dt = _parse_datetime(start_time) + end_dt = _parse_datetime(end_time, end_of_day=True) + if not start_dt or not end_dt: + return None, None, 'start_time / end_time 格式无效,请使用 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS' + if start_dt > end_dt: + return None, None, 'start_time 不能晚于 end_time' + return start_dt, end_dt, None + + delta_map = { + 'hour': datetime.timedelta(hours=1), + 'day': datetime.timedelta(days=1), + 'week': datetime.timedelta(weeks=1), + } + if range_type not in delta_map: + return None, None, 'range 仅支持 hour / day / week' + + end_dt = now + start_dt = now - delta_map[range_type] + return start_dt, end_dt, None + + +def _parse_datetime(value, end_of_day=False): + if not value: + return None + text = str(value).strip() + for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'): + try: + dt = datetime.datetime.strptime(text, fmt) + if fmt == '%Y-%m-%d' and end_of_day: + dt = dt.replace(hour=23, minute=59, second=59) + return dt + except ValueError: + continue + return None + + +def _format_datetime(dt): + return dt.strftime('%Y-%m-%d %H:%M:%S') + + +def _group_key(dt, group_by): + if group_by == 'hour': + return dt.strftime('%Y-%m-%d %H:00:00') + if group_by == 'week': + monday = dt - datetime.timedelta(days=dt.weekday()) + return monday.strftime('%Y-%m-%d') + return dt.strftime('%Y-%m-%d') + + +def _normalize_usage_row(row, bill_amount_map=None): + usage = _parse_usage_content(row.get('usage_content')) + orderid = row.get('orderid') + amount = float(row.get('original_price') or 0) + if bill_amount_map and orderid and orderid in bill_amount_map: + amount = float(bill_amount_map[orderid] or amount) + + return { + 'id': row.get('id'), + 'userid': row.get('userid'), + 'llmid': row.get('llmid'), + 'model': usage.get('model') or row.get('model_name') or row.get('llm_model'), + 'prompt_tokens': int(usage.get('prompt_tokens') or 0), + 'completion_tokens': int(usage.get('completion_tokens') or 0), + 'total_tokens': int(usage.get('total_tokens') or 0), + 'amount': round(amount, 8), + 'bill_status': row.get('bill_status'), + 'orderid': orderid, + 'usage_time': row.get('created_at'), + } + + +async def _fetch_bill_amount_map(sor, order_ids): + if not order_ids: + return {} + result = {} + for orderid in set(order_ids): + if not orderid: + continue + rows = await sor.R('bill', {'orderid': orderid, 'del_flg': '0'}) + if rows: + result[orderid] = sum(float(r.get('amount') or 0) for r in rows) + return result + + +async def _fetch_llm_model_map(sor, llm_ids): + result = {} + for llmid in set(llm_ids): + if not llmid: + continue + rows = await sor.R('llm', {'id': llmid}) + if rows: + result[llmid] = rows[0].get('model') + return result + + +def _build_usage_where(userid=None, userids=None, start_dt=None, end_dt=None, model_filter=None): + """构建 model_usage 单表查询条件(不使用 JOIN,兼容 sqlExe 环境)。""" + conditions = [] + if userid: + conditions.append("userid = '%s'" % _escape(userid)) + elif userids: + escaped = ["'%s'" % _escape(uid) for uid in userids if uid] + if not escaped: + conditions.append('1 = 0') + else: + conditions.append('userid IN (%s)' % ','.join(escaped)) + if start_dt and end_dt: + conditions.append( + "created_at BETWEEN '%s' AND '%s'" + % (_format_datetime(start_dt), _format_datetime(end_dt)) + ) + if model_filter: + conditions.append( + "JSON_UNQUOTE(JSON_EXTRACT(usage_content, '$.model')) = '%s'" + % _escape(model_filter) + ) + return conditions + + +async def _query_model_usage_rows(sor, conditions, limit=None, offset=None): + where_clause = ' AND '.join(conditions) if conditions else '1 = 1' + sql = """ + SELECT id, userid, llmid, original_price, orderid, bill_status, usage_content, created_at + FROM model_usage + WHERE %s + ORDER BY created_at DESC + """ % where_clause + if limit is not None: + sql += ' LIMIT %d OFFSET %d' % (int(limit), int(offset or 0)) + return await sor.sqlExe(sql, {}) + + +async def _count_model_usage(sor, conditions): + where_clause = ' AND '.join(conditions) if conditions else '1 = 1' + sql = 'SELECT COUNT(*) AS total_count FROM model_usage WHERE %s' % where_clause + return (await sor.sqlExe(sql, {}))[0]['total_count'] + + +async def _enrich_usage_rows(sor, rows): + order_ids = [row.get('orderid') for row in rows if row.get('orderid')] + llm_ids = [row.get('llmid') for row in rows if row.get('llmid')] + bill_amount_map = await _fetch_bill_amount_map(sor, order_ids) + llm_model_map = await _fetch_llm_model_map(sor, llm_ids) + items = [] + for row in rows: + enriched = dict(row) + enriched['llm_model'] = llm_model_map.get(row.get('llmid')) + items.append(_normalize_usage_row(enriched, bill_amount_map)) + return items + + +async def _fetch_customer_users(sor, orgid, customerid=None): + """获取机构下客户及其用户映射。""" + org_rows = await sor.R('organization', {'parentid': orgid, 'del_flg': '0'}) + if customerid: + org_rows = [row for row in org_rows if row.get('id') == customerid] + org_map = {row['id']: row for row in org_rows} + + user_map = {} + for oid in org_map: + user_rows = await sor.R('users', {'orgid': oid, 'del_flg': '0'}) + for user in user_rows: + user_map[user['id']] = user + return org_map, user_map + + +def _aggregate_admin_summary(items, user_map, org_map): + buckets = {} + for item in items: + user = user_map.get(item.get('userid'), {}) + org = org_map.get(user.get('orgid'), {}) + model = item.get('model') or 'unknown' + key = (org.get('id'), item.get('userid'), model) + if key not in buckets: + buckets[key] = { + 'customerid': org.get('id'), + 'customer_name': org.get('orgname'), + 'userid': item.get('userid'), + 'username': user.get('username'), + 'user_name': user.get('name'), + 'model': model, + 'prompt_tokens': 0, + 'completion_tokens': 0, + 'total_tokens': 0, + 'amount': 0.0, + 'request_count': 0, + 'first_usage_time': item.get('usage_time'), + 'last_usage_time': item.get('usage_time'), + } + bucket = buckets[key] + bucket['prompt_tokens'] += item.get('prompt_tokens') or 0 + bucket['completion_tokens'] += item.get('completion_tokens') or 0 + bucket['total_tokens'] += item.get('total_tokens') or 0 + bucket['amount'] = round(bucket['amount'] + float(item.get('amount') or 0), 8) + bucket['request_count'] += 1 + usage_time = item.get('usage_time') + if usage_time: + bucket['last_usage_time'] = usage_time + if ( + not bucket.get('first_usage_time') + or str(usage_time) < str(bucket['first_usage_time']) + ): + bucket['first_usage_time'] = usage_time + return sorted(buckets.values(), key=lambda x: x['amount'], reverse=True) + + +def _aggregate_items(items, group_by=None): + if not group_by: + return items + + buckets = {} + for item in items: + usage_time = item.get('usage_time') + if isinstance(usage_time, str): + dt = _parse_datetime(usage_time) + else: + dt = usage_time + if not dt: + key = 'unknown' + else: + key = _group_key(dt, group_by) + + if key not in buckets: + buckets[key] = { + 'period': key, + 'model': item.get('model'), + 'prompt_tokens': 0, + 'completion_tokens': 0, + 'total_tokens': 0, + 'amount': 0.0, + 'request_count': 0, + } + bucket = buckets[key] + bucket['prompt_tokens'] += item.get('prompt_tokens') or 0 + bucket['completion_tokens'] += item.get('completion_tokens') or 0 + bucket['total_tokens'] += item.get('total_tokens') or 0 + bucket['amount'] = round(bucket['amount'] + float(item.get('amount') or 0), 8) + bucket['request_count'] += 1 + + return sorted(buckets.values(), key=lambda x: x['period'], reverse=True) + + +def _summarize(items): + return { + 'request_count': len(items), + 'prompt_tokens': sum(i.get('prompt_tokens') or 0 for i in items), + 'completion_tokens': sum(i.get('completion_tokens') or 0 for i in items), + 'total_tokens': sum(i.get('total_tokens') or 0 for i in items), + 'amount': round(sum(float(i.get('amount') or 0) for i in items), 8), + } + + +async def model_usage_user_report(ns={}): + """ + 用户查看自己的模型使用记录。 + + 参数: + userid 可选,默认当前登录用户 + start_time 开始时间,格式 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS + end_time 结束时间 + range 快捷范围:hour / day / week(未传 start/end 时生效,默认 day) + model 按模型标识筛选(usage_content.model) + group_by 聚合粒度:hour / day / week,不传则返回明细 + current_page 页码,默认 1 + page_size 每页条数,默认 20 + + 返回字段: + model, prompt_tokens, completion_tokens, total_tokens, amount, usage_time + """ + if ns.get('userid'): + userid = ns.get('userid') + else: + userid = await get_user() + if not userid: + server_error(401) + + start_dt, end_dt, err = _resolve_time_range(ns) + if err: + return {'status': False, 'msg': err} + + model_filter = ns.get('model') + group_by = (ns.get('group_by') or '').lower() or None + if group_by and group_by not in ('hour', 'day', 'week'): + return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'} + + page_size = int(ns.get('page_size', 20)) + current_page = int(ns.get('current_page', 1)) + offset = (current_page - 1) * page_size + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'}) + if not user_rows: + return {'status': False, 'msg': '用户不存在'} + + conditions = _build_usage_where( + userid=userid, + start_dt=start_dt, + end_dt=end_dt, + model_filter=model_filter, + ) + total_count = await _count_model_usage(sor, conditions) + + if group_by: + all_rows = await _query_model_usage_rows(sor, conditions) + all_items = await _enrich_usage_rows(sor, all_rows) + grouped = _aggregate_items(all_items, group_by) + return { + 'status': True, + 'msg': '查询成功', + 'data': { + 'userid': userid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize(all_items), + 'group_by': group_by, + 'groups': grouped, + }, + } + + all_rows = await _query_model_usage_rows(sor, conditions) + all_items = await _enrich_usage_rows(sor, all_rows) + page_rows = await _query_model_usage_rows( + sor, conditions, limit=page_size, offset=offset, + ) + items = await _enrich_usage_rows(sor, page_rows) + + return { + 'status': True, + 'msg': '查询成功', + 'data': { + 'userid': userid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize(all_items), + 'total_count': total_count, + 'current_page': current_page, + 'page_size': page_size, + 'items': items, + }, + } + except Exception as e: + return {'status': False, 'msg': '查询失败, %s' % str(e)} + + +async def model_usage_admin_report(ns={}): + """ + 管理员查看当前机构下所有客户的模型使用汇总。 + + 参数: + userid 可选,默认当前登录用户(须为机构管理员) + start_time 开始时间 + end_time 结束时间 + range 快捷范围:hour / day / week(未传 start/end 时生效,默认 day) + customerid 可选,按客户机构 id 筛选 + model 可选,按模型标识筛选 + group_by 聚合粒度:hour / day / week;不传则按客户+模型汇总 + current_page 页码,默认 1 + page_size 每页条数,默认 20 + + 返回字段: + customerid, customer_name, userid, username, model, + prompt_tokens, completion_tokens, total_tokens, amount, usage_time + """ + if ns.get('userid'): + userid = ns.get('userid') + else: + userid = await get_user() + if not userid: + server_error(401) + + start_dt, end_dt, err = _resolve_time_range(ns) + if err: + return {'status': False, 'msg': err} + + customerid = ns.get('customerid') + model_filter = ns.get('model') + group_by = (ns.get('group_by') or '').lower() or None + if group_by and group_by not in ('hour', 'day', 'week'): + return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'} + + page_size = int(ns.get('page_size', 20)) + current_page = int(ns.get('current_page', 1)) + offset = (current_page - 1) * page_size + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'}) + if not user_rows: + return {'status': False, 'msg': '用户不存在'} + + orgid = user_rows[0].get('orgid') + user_role = await get_user_role({'userid': userid, 'sor': sor}) + if user_role not in ('管理员', '运营', '运营管理员'): + return {'status': False, 'msg': '无权限,仅机构管理员可查看'} + + org_map, user_map = await _fetch_customer_users(sor, orgid, customerid) + user_ids = list(user_map.keys()) + if not user_ids: + empty_data = { + 'orgid': orgid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize([]), + 'total_count': 0, + 'current_page': current_page, + 'page_size': page_size, + 'items': [], + } + if group_by: + empty_data = { + 'orgid': orgid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize([]), + 'group_by': group_by, + 'groups': [], + } + return {'status': True, 'msg': '查询成功', 'data': empty_data} + + conditions = _build_usage_where( + userids=user_ids, + start_dt=start_dt, + end_dt=end_dt, + model_filter=model_filter, + ) + all_rows = await _query_model_usage_rows(sor, conditions) + all_items = await _enrich_usage_rows(sor, all_rows) + + if group_by: + enriched_items = [] + for item in all_items: + user = user_map.get(item.get('userid'), {}) + org = org_map.get(user.get('orgid'), {}) + enriched = dict(item) + enriched['customerid'] = org.get('id') + enriched['customer_name'] = org.get('orgname') + enriched['username'] = user.get('username') + enriched['user_name'] = user.get('name') + enriched_items.append(enriched) + grouped = _aggregate_items(enriched_items, group_by) + return { + 'status': True, + 'msg': '查询成功', + 'data': { + 'orgid': orgid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize(all_items), + 'group_by': group_by, + 'groups': grouped, + }, + } + + summary_items = _aggregate_admin_summary(all_items, user_map, org_map) + total_count = len(summary_items) + page_items = summary_items[offset:offset + page_size] + + return { + 'status': True, + 'msg': '查询成功', + 'data': { + 'orgid': orgid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize(all_items), + 'total_count': total_count, + 'current_page': current_page, + 'page_size': page_size, + 'items': page_items, + }, + } + except Exception as e: + return {'status': False, 'msg': '查询失败, %s' % str(e)} + +ret = await model_usage_admin_report(params_kw) +return ret \ No newline at end of file diff --git a/b/user/logintype.dspy b/b/user/logintype.dspy index da8958d..5d71bde 100644 --- a/b/user/logintype.dspy +++ b/b/user/logintype.dspy @@ -141,7 +141,7 @@ async def logintype(ns): domain_name = ns.get('domain_name') # if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527', 'www.ncmatch.cn'] and ns.get('username') not in ['开元云(北京)科技有限公司', 'admin', 'kyy_root', 'kyy_kaiyuan', 'kyacloud', 'kyy_运营', 'kyy_销售', 'kyy_财务', '测试用户', 'kycloud']: - if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527', 'www.ncmatch.cn'] and not ns.get('username'): + if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527', 'www.ncmatch.cn'] and not ns.get('username') and not ns.get('codeid'): # 登录失败次数限制 login_allowed = await check_login_allowed(ns.get('username'))