# 输入邮件地址,口令和 POP3 服务器地址 email = "billing-specific@kaiyuancloud.cn" password = "KYY@1234" # 在对于的邮箱设置的SMTP/POP3里,找到对应的服务地址 pop3_server = "pop.qiye.aliyun.com" # 连接到POP3服务器: server = poplib.POP3(pop3_server) # 可以打开或关闭调试信息: server.set_debuglevel(1) # 可选:打印POP3服务器的欢迎文字: print(server.getwelcome().decode('utf-8')) # 身份认证 server.user(email) server.pass_(password) # stat()返回邮件数量和占用空间: print('邮件数量: %s. 大小: %s' % server.stat()) # list()返回所有邮件的编号: resp, mails, octets = server.list() # 获取最新一封邮件, 注意索引号从1开始: index = len(mails) resp, lines, octets = server.retr(index) # lines存储了邮件的原始文本的每一行, # 可以获得整个邮件的原始文本: msg_content = b'\r\n'.join(lines).decode('utf-8') # 稍后解析出邮件,即完成下载邮件 msg = Parser().parsestr(msg_content) # 接下来解析文件 async def decode_str(s): value, charset = decode_header(s)[0] if charset: value = value.decode(charset) return value async def guess_charset(msg): charset = msg.get_charset() if charset is None: content_type = msg.get('Content-Type', '').lower() pos = content_type.find('charset=') if pos >= 0: charset = content_type[pos + 8:].strip() return charset # indent用于缩进显示: async def email_info(msg, indent=0): db = DBPools() async with db.sqlorContext('kboss') as sor: if indent == 0: for header in ['From', 'To', 'Subject']: value = msg.get(header, '') if value: if header=='Subject': value = await decode_str(value) else: hdr, addr = parseaddr(value) name = await decode_str(hdr) value = u'%s <%s>' % (name, addr) print('%s%s: %s' % (' ' * indent, header, value)) if (msg.is_multipart()): parts = msg.get_payload() for n, part in enumerate(parts): print('%spart %s' % (' ' * indent, n)) print('%s--------------------' % (' ' * indent)) await email_info(part, indent + 1) else: content_type = msg.get_content_type() if content_type=='text/plain' or content_type=='text/html': content = msg.get_payload(decode=True) charset = await guess_charset(msg) if charset: content = content.decode(charset) try: #解析交易金额 start_index = content.find("交易金额:") + len("交易金额:") end_index = content.find("\n", start_index) find_data = content[start_index:end_index] price = find_data.strip()[1:] #解析交易id码 start_index = content.find("摘要:") + len("摘要:") end_index = content.find("\n", start_index) find_data = content[start_index:end_index] index = find_data.index("<") name = find_data[:index] if price and name: mail_code_sql = """SELECT * FROM mail_code WHERE LOCATE(mailcode, name) > 0 and del_flg = '0';""" mail_code = await sor.sqlExe(mail_code_sql, {}) # mail_code = await sor.R('mail_code',{'mailcode':name,'del_flg':'0'}) date = await get_business_date(sor=None) recharge_log = {'customerid': mail_code[0]['customer_id'], 'recharge_amt': price, 'action': 'RECHARGE', 'recharge_path': '2', 'recharge_date': date} try: ra = RechargeAccounting(recharge_log) await ra.accounting(sor) await sor.C('recharge_log', {'id': uuid(), 'customerid': mail_code[0]['customer_id'], 'recharge_date': datetime.datetime.now().strftime("%Y-%m-%d"), 'recharge_path': '2', 'recharge_amt': price, 'recharge_timestamp': date, 'action': 'RECHARGE', # 'op_userid': apv[0]['apv_sender'], }) await sor.U('mail_code', {'id': mail_code[0]['id'], 'del_flg': '1'}) return {'status': True, 'msg': '充值成功'} except Exception as e: raise e except: pass else: print('%sAttachment: %s' % (' ' * indent, content_type)) return {"code":'200'} msg = Parser().parsestr(msg_content) ret = await email_info(msg) return ret