async def user_get_bill(nss={}, extra_info={}): ns = { 'user_orgids': list(nss.keys()), 'current_page': extra_info.get('current_page') or 1, 'start_time': extra_info.get('start_time'), 'end_time': extra_info.get('end_time'), 'page_size': extra_info.get('page_size') } start_time = ns['start_time'] if ns.get('start_time') else '2010-10-01' end_time = ns['end_time'] if ns.get('end_time') else '2099-12-01' page_size = int(ns['page_size']) if ns.get('page_size') else 20000 offset = page_size * (int(ns['current_page']) - 1) db = DBPools() async with db.sqlorContext('kboss') as sor: try: # 根据userid获取用户orgid # user_orgid_li = await sor.R('users', {'id': ns['userid']}) # user_orgid = user_orgid_li[0]['orgid'] user_orgids = ns['user_orgids'] # 计算总量 user_tuple = tuple(user_orgids) all_user_set = ', '.join(list(map(lambda x: "'%s'", user_orgids))) bill_count_sql = """select count(*) as total from bill where customerid in (%s) and bill_date >= '%s' and bill_date <= '%s';""" % \ (all_user_set, start_time, end_time) bill_sum_sql = """select sum(amount) as bill_sum from bill where customerid in (%s) and business_op = 'BUY' and bill_date >= '%s' and bill_date <= '%s';""" % \ (all_user_set, start_time, end_time) bill_count_li = await sor.sqlExe(bill_count_sql % user_tuple, {}) bill_sum = (await sor.sqlExe(bill_sum_sql % user_tuple, {}))[0]['bill_sum'] if bill_count_li: total_num = bill_count_li[0]['total'] total_page = total_num / page_size if total_num % page_size == 0 else int(total_num / page_size) + 1 bill_sql = """select * from bill where customerid in (%s) and bill_date >= '%s' and bill_date <= '%s' order by bill_date desc limit %s OFFSET %s;""" % \ (all_user_set, start_time, end_time, page_size, offset) bill_li = await sor.sqlExe(bill_sql % user_tuple, {}) new_bill_li = [] for bill_detail in bill_li: product_info = await sor.R('product', {'id': bill_detail['productid']}) bill_detail['productname'] = product_info[0]['name'] if product_info else '' bill_detail.update(nss[bill_detail['customerid']]) if bill_detail.get('business_op') == 'RECHARGE': bill_detail['productname'] = '线下充值' bill_detail['bill_state'] = 1 else: if bill_detail.get('resource_type'): type_parts = bill_detail['resource_type'].split('-') bill_detail['type_one'] = type_parts[0] bill_detail['type_two'] = type_parts[1] if len(type_parts) > 1 else None bill_detail['type_three'] = type_parts[2] if len(type_parts) > 2 else None else: bill_detail['type_one'] = None bill_detail['type_two'] = None bill_detail['type_three'] = None new_bill_li.append(bill_detail) return { 'status': True, 'msg': '获取账单成功', 'data': { 'total_page': total_page, 'total_num': total_num, 'bill_sum': bill_sum, 'page_size': page_size, 'current_page': ns['current_page'], 'bill_list': new_bill_li } } except Exception as e: return { 'status': False, 'msg': '获取账单报错, %s' % e } async def getUserInfo(ns={}): # ns = { # # 'userid': 'user_abcd', # 'orgid': 'APPbNUfV5Im_KlZNnBRjd' # } db = DBPools() async with db.sqlorContext('kboss') as sor: try: if not ns.get('orgid'): # find orgid info from users org_info = await sor.R('users', {'id': ns.get('userid'), 'del_flg': '0'}) if org_info: orgid = org_info[0].get('orgid') else: return { 'status': False, 'msg': 'can not find orgid from users' } else: orgid = ns.get('orgid') orgs = await sor.R('organization', {'id': orgid, 'del_flg': '0'}) org = orgs[0] if orgs else {} nss = {} nss['orgid'] = orgid nss['fullName'] = org.get('orgname') or '' nss['name'] = org.get('orgname') or '' nss['identityCode'] = org.get('orgcode') or '' nss['address'] = org.get('address') or '' nss['contact'] = org.get('contactor') or '' nss['tel'] = org.get('contactor_phone') or '' nss['email'] = org.get('emailaddress') or '' # find salemanid info from customers cust = await sor.R('customer', {'customerid': orgid, 'del_flg': '0'}) customer = cust[0] if cust else {} salemanid = customer.get('salemanid') org_name = '' if salemanid: saleman = await sor.R('users', {'id': salemanid, 'del_flg': '0'}) saleman_user = saleman[0] if saleman else {} salemanorgid = saleman_user.get('orgid') if salemanorgid: org_name_li = await sor.R('organization', {'id': salemanorgid, 'del_flg': '0'}) if org_name_li: org_name = org_name_li[0].get('orgname') or '' else: saleman_user = {} nss['saleMgrName'] = saleman_user.get('username') or '' nss['saleMgrEmail'] = saleman_user.get('email') or '' nss['saleMgrTel'] = saleman_user.get('mobile') or '' # add saleman org name nss['saleorgname'] = org_name # find invoice attr from orgid invoice_attr = await sor.R('invoice_attr', {'customerid': orgid, 'del_flg': '0'}) invoice = invoice_attr[0] if invoice_attr else {} nss['bankName'] = invoice.get('bank_name') or '' nss['bankAcc'] = invoice.get('bank_account') or '' nss['identityName'] = invoice.get('invoice_title') or '' nss['taxTel'] = invoice.get('phone') or '' nss['taxpayerIdentity'] = invoice.get('tax_no') or '' nss['taxAddress'] = invoice.get('address') or '' # find zj_users from orgid zj_attr = await sor.R('zj_users', {'orgid': orgid, 'del_flg': '0'}) zj = zj_attr[0] if zj_attr else {} nss['userAc'] = zj.get('userac') or ('kboss_' + orgid) nss['userPhone'] = zj.get('userphone') or nss['tel'] nss['remark'] = zj.get('remark') or '' nss['ak'] = zj.get('ak') or '' nss['sk'] = zj.get('sk') or '' nss['thirdId'] = zj.get('thirdid') or '' nss['appId'] = zj.get('appid') or '' nss['supId'] = zj.get('supid') or '' nss['sync_status'] = zj.get('sync_status') return { 'status': True, 'msg': 'get user info success', 'data': nss } except Exception as e: raise e return { 'status': False, 'msg': 'get user info failed', 'data': '' } async def saleGetUsers(ns={}): """ get users id * :param ns: :return: """ db = DBPools() async with db.sqlorContext('kboss') as sor: # if not await get_user(): # return { # "status": False, # "msg": "user id is empty, please check", # } try: org_list = [] lizhi_list = await sor.R('transfer_record', {'change_id': ns.get('userid')}) ids = list(map(lambda x: x['customer_id'], lizhi_list)) cus_list = await sor.R('customer', {'salemanid': ns.get('userid')}) for cus in cus_list: customerid = cus.get('customerid') if ns.get('kv') == 'zj': # 判断中金users是否存在,存在就查询 exists_zj_users = await sor.R('zj_users', {'orgid': customerid}) if exists_zj_users: res_dict = await getUserInfo({'orgid': customerid}) if res_dict.get('status') and res_dict.get('data'): org_list.append(res_dict.get('data')) elif ns.get('kv') == 'jncs': # 获取济南超算id jncs_li = await sor.R('organization', {'orgname': '济南超算', 'del_flg': '0'}) jscs_id = jncs_li[0].get('id') # 获取购买过济南超算产品的用户 产品持有表 ress = await sor.R('customer_goods', {'customerid': customerid, 'providerrid': jscs_id}) if ress: org_list.append(ress) else: org_content_li = await sor.R('organization', {'id': customerid}) org_content = org_content_li[0] if org_content_li else [] if org_content: if customerid in ids: org_content["type"] = '转接客户' transfer_info = sorted(await sor.R('transfer_record', {'customer_id': customerid}), key=lambda x: x['create_at'], reverse=True) transfer_time = transfer_info[0]['create_at'] if transfer_info else None previous_sales = None for transfer in transfer_info: # print('111111137219047923042141', transfer_info) if transfer['change_id'] == ns.get('userid'): previous_sales = transfer['dimission_id'] previous_id = await sor.R('users', {'id': previous_sales}) previous_name = await sor.R('users', {'username': previous_id[0]['username']}) if previous_name: previous_sales_name = previous_name[0]['username'] else: previous_sales_name = None break org_content["transfer_time"] = transfer_time org_content["previous_sales"] = previous_sales_name org_list.append(org_content) else: org_content["type"] = '直接客户' org_content["transfer_time"] = None org_content["previous_sales"] = None org_list.append(org_content) # for org_ in org_list: # org_['bill'] = await user_get_bill({'user_orgid': org_['id'], 'current_page': 1}) # org_['salemanid'] = ns.get('salemanid') # org_['salemanname'] = ns.get('salemanname') return { "status": True, "msg": "get user success", "data": org_list } except Exception as e: return { "status": False, "msg": "get user failed", "data": str(e) } # 查看所有销售 async def operator_get_all_bill(ns={}): """ 获取所有销售人员id 名称 :param ns: :return: """ db = DBPools() async with db.sqlorContext('kboss') as sor: if ns.get('sor'): sor = ns.get('sor') users_li_sql = """select u.id, u.username, u.orgid, ur.userid, ur.roleid, r.id, r.role from users u, userrole ur, role r where u.orgid = '%s' and u.id = ur.userid and ur.roleid = r.id and r.role = '销售';""" % ns.get('orgid') users_li = await sor.sqlExe(users_li_sql, {}) saleman_all_list = [{'label': item['username'], 'value': item['userid']} for item in users_li] users = [(item['userid'], item['username']) for item in users_li] # 查询指定销售 # 忽略销售 查询所有客户 target_users = users if ns.get('salemanid'): users = [user for user in users if user[0]==ns['salemanid']] user_dic = {} customer_all_list = [] # 获取所有客户 for user in target_users: res_user = await saleGetUsers({'userid': user[0]}) for k_user in res_user['data']: customer_all_list.append({'label': k_user['orgname'], 'value': k_user['id']}) if ns.get('user_orgid'): users=target_users for user in users: res = await saleGetUsers({'userid': user[0]}) for u_user in res['data']: additional_info = { 'orgname': u_user['orgname'], 'orgcode': u_user['orgcode'], 'address': u_user['address'], 'salemanid': user[0], 'salemanname': user[1], } user_dic[u_user['id']] = additional_info target_user_dic = None # 查询指定的客户 new_user_dic = {} if ns.get('user_orgid'): new_user_dic[ns['user_orgid']] = user_dic.pop(ns['user_orgid']) # 如果没有找到对应的销售或者客户 if not user_dic: return { 'status': True, 'msg': '获取账单成功', 'saleman_all_list': saleman_all_list, 'customer_all_list': customer_all_list, 'data': { 'total_page': 0, 'total_num': 0, 'bill_sum': 0, 'page_size': 20, 'current_page': 1, 'bill_list': [] } } elif new_user_dic: target_user_dic = new_user_dic else: target_user_dic = user_dic res_bill = await user_get_bill(nss=target_user_dic, extra_info=ns) return { 'status': True, 'msg': '获取账单成功', 'saleman_all_list': saleman_all_list, 'customer_all_list': customer_all_list, 'data': res_bill['data'] } async def get_product_volume_rank(ns={}): # ns = { # 'target': 'month' # } db = DBPools() async with db.sqlorContext('kboss') as sor: if not ns.get('target'): count_sql = """ SELECT p.id, p.name AS product_name, SUM(b.amount) AS total_sales FROM bill b JOIN product p ON b.productid = p.id WHERE b.business_op = 'BUY' GROUP BY p.id, p.name ORDER BY total_sales DESC LIMIT 20;""" else: count_sql = """ SELECT p.id, p.name AS product_name, DATE_FORMAT(b.bill_date, '%%Y-%%m') AS month, SUM(b.amount) AS total_sales FROM bill b JOIN product p ON b.productid = p.id WHERE b.business_op = 'BUY' GROUP BY p.id, month ORDER BY month, total_sales; -- 可根据需求调整排序 """ count_li = await sor.sqlExe(count_sql, {}) # for index, sale in enumerate(count_li): # sale['total_sales'] = round(sale['total_sales'] + 4 * 10000 + (index + 1) * 10024, 2) return { 'status': True, 'msg': '获取产品销售额成功', 'data': count_li } async def large_screen_first_page(ns={}): orgid = ns.get('orgid') if ns.get('orgid') else 'mIWUHBeeDM8mwAFPIQ8pS' n = 9 # 获取当前时间戳 now = time.time() # 获取当前年月 current_time = time.localtime(now) current_year = current_time.tm_year current_month = current_time.tm_mon x = [] y = [] for i in range(0, n + 1): # 计算之前的月份 month = current_month - i year = current_year if month <= 0: # 处理年份的变更 year -= (abs(month) // 12) + 1 if month % 12 == 0: month = 12 else: month = (month % 12) x.append(f"{year}-{month:02d}") y.append(int(f"{year}{month}") * 16 + 78390) # 统计销售额 res_bill = await operator_get_all_bill({'orgid': orgid}) # 销售额按月 # 使用 defaultdict 来存储每个月的金额总和 amount_by_month = defaultdict(int) # 处理数据 for entry in res_bill['data']['bill_list']: # 提取年月 year_month = datetime.datetime.strptime(entry["bill_date"], "%Y-%m-%d").strftime("%Y-%m") amount_by_month[year_month] += entry["amount"] # 将年月按升序排序 sorted_data = sorted(amount_by_month.items(), key=lambda x: x[0]) # 输出结果 year_month_li = [] total_amount_li = [] for year_month, total_amount in sorted_data: year_month_li.append(year_month) total_amount_li.append(total_amount) # print(f"Year-Month: {year_month}, Total Amount: {total_amount}") if ns.get('flag') == '1': LeftUp = { 'configname': '供应商销售额', 'x': year_month_li[-10:], 'y': total_amount_li[-10:] } else: LeftUp = { 'configname': '供应商销售额', 'x': x[::-1], 'y': [1502350, 1701232, 1602326, 1902310, 1802294, 4802278, 2262262, 2302246, 3115382, 3315366, 3631550] } # 统计用户量 user_count_sql = f""" SELECT DATE_FORMAT(create_at, '%%Y-%%m') AS month, COUNT(*) AS new_users_count FROM organization WHERE parentid = '{orgid}' GROUP BY month ORDER BY month ASC; """ user_count_time_li = [] user_count_li = [] db = DBPools() async with db.sqlorContext('kboss') as sor: user_count = await sor.sqlExe(user_count_sql, {}) for i in user_count: user_count_time_li.append(i['month']) user_count_li.append(i['new_users_count']) if ns.get('flag') == '1': LeftDown = { 'configname': '用户量', 'x': user_count_time_li[-10:], 'y': user_count_li[-10:] } else: LeftDown = { 'configname': '用户量', 'x': x[::-1], 'y': [502, 701, 602, 702, 1002, 3102, 1602, 1302, 1511, 1791, 1331] } # 分销商销售额 reseller_sum_sql = """ SELECT SUM(b.amount) AS total_sales_amount, o1.orgname AS distributor_name FROM bill b JOIN organization o2 ON b.customerid = o2.id -- 连接bill表和普通用户 JOIN organization o1 ON o2.parentid = o1.id -- 连接组织表,获取分销商 WHERE o1.parentid = '%s' -- 当前用户 AND o1.org_type = 1 -- 分销商 AND o1.del_flg = '0' -- 有效的分销商 AND o2.org_type = 2 -- 普通用户 AND b.business_op = 'BUY' -- 只统计购买操作 GROUP BY o1.orgname ORDER BY total_sales_amount; -- 按月份升序排列 """ % orgid reseller_name_li = [] reseller_sale_li = [] db = DBPools() async with db.sqlorContext('kboss') as sor: reseller_sum_li = await sor.sqlExe(reseller_sum_sql, {}) for i in reseller_sum_li: reseller_name_li.append(i['distributor_name']) reseller_sale_li.append(i['total_sales_amount']) # print(reseller_sum_li) if ns.get('flag') == '1': RightUp = { 'configname': '分销商销售额', 'x': reseller_name_li, 'y': reseller_sale_li } else: RightUp = { 'configname': '分销商销售额', 'x': ['叮*科技', '科行*科技', '深圳梵*科技', '广州高*科技*', '郑州白*科技'], 'y': [102232, 301057, 100407, 1315366, 231550] } # 产品销售额排名 product_rank_dic = await get_product_volume_rank() product_rank = [{"name": item['product_name'], "value": item['total_sales'], "children":[]} for item in product_rank_dic['data']] if ns.get('flag') == '1': RightDown = { 'configname': '产品排名', 'data': product_rank } else: RightDown = { 'configname': '产品排名', 'data': [{"name":"IDC","value":"220000","children":[]},{"name":"MSP","value":"540004","children":[]},{"name":"集团项目","value":"73375104.22","children":[{"name":"云平台","value":"56822324.22","children":[]},{"name":"技术服务","value":"16552780","children":[]}]},{"name":"算力","value":"49383708.31","children":[{"name":"公有云","value":"28295520.99","children":[{"name":"agency","value":"5734428.4","children":[]},{"name":"融合云","value":"16279947.35","children":[]},{"name":"reseller","value":"6180593.54","children":[]},{"name":"返佣","value":"74214.5974","children":[]},{"name":"短信","value":"26337.1","children":[]}]},{"name":"超算","value":"62560","children":[]},{"name":"智算","value":"21025627.32","children":[{"name":"A800","value":"20309100.84","children":[]},{"name":"A100","value":"716526.48","children":[]}]}]},{"name":"网业务","value":"2328264.29","children":[{"name":"互联网接入","value":"1743835.83","children":[]},{"name":"DCI","value":"584428.46","children":[]}]},{"name":"系统集成","value":"109880","children":[]}] } return { 'LeftUp': LeftUp, 'LeftDown': LeftDown, 'RightUp': RightUp, 'RightDown': RightDown } ret = await large_screen_first_page(params_kw) return ret