12 KiB
以下是为提供的 Python 异步数据库连接与操作代码编写的 Markdown 格式技术文档,涵盖模块功能、类结构、核心方法说明及使用示例。
📚 dbpool 模块技术文档
基于异步 I/O 的通用数据库连接池与 SQL 执行封装系统
支持多种数据库驱动(SQLite、MySQL、PostgreSQL、Oracle、SQL Server)的同步/异步模式
🔧 概述
本模块提供了一个可扩展、高性能的异步数据库访问框架,主要特性包括:
- ✅ 多数据库支持:通过插件式设计支持多种数据库
- ✅ 异步连接池管理(基于
asyncio.Queue) - ✅ 自动重连与健康检测机制
- ✅ 单例全局连接池管理器
DBPools - ✅ 装饰器简化 SQL 执行流程
- ✅ 元数据缓存优化表结构查询性能
- ✅ 事务自动提交/回滚控制
适用于高并发 Web 应用或微服务中对数据库资源进行统一管理和高效复用。
📦 导入依赖
import asyncio
from functools import wraps
from contextlib import asynccontextmanager
# 内部工具库
from appPublic.myImport import myImport
from appPublic.dictObject import DictObject
from appPublic.Singleton import SingletonDecorator
from appPublic.myjson import loadf
from appPublic.jsonConfig import getConfig
from appPublic.rc4 import unpassword
from appPublic.log import exception
外部依赖:
aiosqlite,aiomysql,asyncpg等异步驱动需根据实际数据库安装
🏗️ 核心组件架构
| 组件 | 功能 |
|---|---|
SQLor 子类 |
数据库抽象层,封装增删改查逻辑 |
sqlorFactory() |
工厂函数,根据配置创建对应的 SQLor 实例 |
LifeConnect |
封装单个连接的生命期与可用性测试 |
ConnectionPool |
连接池实现,维护固定数量的活跃连接 |
DBPools |
单例全局连接池管理器,支持多数据库 |
装饰器 (@runSQL, @inSqlor) |
简化业务函数中的数据库调用 |
🧱 抽象接口:SQLor 及其子类
所有数据库适配器继承自基类 SQLor,并实现以下关键方法:
class SQLor:
def isMe(driver_name: str) -> bool: ...
async def tables() -> List[str]: ...
async def fields(table: str) -> List[FieldInfo]: ...
async def primary(table: str) -> List[str]: ...
async def indexes(table: str) -> List[IndexInfo]: ...
async def fkeys(table: str) -> List[ForeignKeyInfo]: ...
async def runSQL(desc: dict, ns: dict) -> Any: ...
async def runSQLPaging(desc: dict, ns: dict) -> Dict: ...
当前支持的数据库驱动
| 驱动名 | 类 | 文件 |
|---|---|---|
| sqlite3 | SQLite3or / Aiosqliteor |
.sqlite3or, .aiosqliteor |
| mysql | MySqlor / AioMysqlor |
.mysqlor, .aiomysqlor |
| postgresql | AioPostgresqlor |
.aiopostgresqlor |
| mssql | MsSqlor |
.mssqlor |
| oracle | Oracleor |
.oracleor |
同步和异步版本分别处理阻塞/非阻塞场景。
⚙️ 工厂函数
sqlorFactory(dbdesc) → SQLor
根据数据库描述字典动态选择合适的 SQLor 子类实例。
参数
| 参数 | 类型 | 说明 |
|---|---|---|
dbdesc |
dict |
包含 'driver' 和 'kwargs' 的数据库连接信息 |
示例
dbdesc = {
"driver": "aiomysql",
"kwargs": {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "enc:xxxxx",
"database": "test"
},
"async_mode": True
}
sor = sqlorFactory(dbdesc)
若未找到匹配驱动,则返回默认
SQLor实例。
sqlorFromFile(dbdef_file, coding='utf8') → SQLor
从 JSON 文件加载数据库配置并初始化 SQLor 实例。
参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
dbdef_file |
str |
- | JSON 配置文件路径 |
coding |
str |
'utf8' |
文件编码格式 |
示例文件内容 (config/db.json)
{
"driver": "sqlite3",
"kwargs": {
"dbname": "/tmp/test.db"
}
}
使用方式
sor = await sqlorFromFile("config/db.json")
🔌 连接生命期管理:LifeConnect
封装单个数据库连接的生命周期,具备自动重建能力。
初始化参数
| 属性 | 说明 |
|---|---|
connfunc |
创建连接的函数(如 aiomysql.connect) |
kw |
传递给 connfunc 的关键字参数 |
use_max |
最大使用次数后强制关闭 |
async_mode |
是否启用异步模式 |
方法
| 方法 | 返回类型 | 说明 |
|---|---|---|
use() |
awaitable<connection> |
获取一个有效连接,失败时尝试重建 |
free(conn) |
None |
标记连接已释放(暂不关闭) |
testok() |
awaitable<bool> |
测试连接是否正常(执行 SELECT 1) |
_mkconn() |
awaitable |
内部创建新连接 |
在获取连接时会自动探测断连并尝试重新建立最多 4 次。
🔄 连接池:ConnectionPool
每个数据库拥有独立的连接池,基于 asyncio.Queue 实现先进先出调度。
初始化参数
| 参数 | 说明 |
|---|---|
dbdesc |
数据库配置对象 |
loop |
事件循环引用 |
属性
| 属性 | 说明 |
|---|---|
maxconn |
最大连接数(默认 5) |
maxuse |
单连接最大使用次数(默认 1000) |
_pool |
存放 LifeConnect 实例的队列 |
关键方法
| 方法 | 说明 |
|---|---|
connect() |
创建新的 LifeConnect 并放入池中 |
aquire() |
异步获取可用连接 |
release(conn) |
释放连接回池中 |
isEmpty() / isFull() |
查询池状态 |
注意:当前未启用锁保护共享状态,建议在单线程协程环境下运行。
🌐 全局连接池管理器:DBPools(单例)
使用 @SingletonDecorator 保证全局唯一实例。
构造参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
databases |
dict |
{} |
名称映射到数据库配置 |
max_connect |
int |
100 |
全局限流信号量 |
loop |
asyncio.AbstractEventLoop |
get_event_loop() |
事件循环 |
示例配置
databases = {
'default': {
'driver': 'aiomysql',
'async_mode': True,
'kwargs': { ... }
},
'local_sqlite': {
'driver': 'sqlite3',
'async_mode': False,
'kwargs': {'dbname': '/tmp/app.db'}
}
}
pools = DBPools(databases=databases)
💡 核心 API 方法
getSqlor(name) → SQLor
获取指定名称的数据库操作对象(带连接绑定)。
sor = await pools.getSqlor('default')
自动申请连接、游标,并设置上下文。
freeSqlor(sor)
释放 SQLor 占用的连接资源。
await pools.freeSqlor(sor)
@sqlorContext(name) → Async Context Manager
推荐使用的上下文管理器,自动处理连接获取、事务提交/回滚、异常捕获与释放。
示例
async with pools.sqlorContext('default') as sor:
ret = await sor.runSQL({"sql_string": "SELECT * FROM users"})
- 出现异常且有数据变更 → 自动
rollback- 正常退出 → 自动
commit- 总是释放连接
useOrGetSor(dbname, **kw) → (sor, commit_flag)
内部工具函数:若传入了 sor 则复用,否则新建。
用于嵌套调用时避免重复连接开销。
🎯 装饰器 API(简化开发)
@inSqlor(func)
装饰业务函数,自动注入 sor 对象,支持事务控制。
@pools.inSqlor
async def get_user(dbname, NS, user_id, **kw):
sor = kw['sor']
return await sor.selectOne("users", {"id": user_id})
调用方式:
ret = await get_user('default', {}, user_id=123)
若未传
sor,则自动申请;否则复用传入的sor
@runSQL(func)
装饰函数返回 SQL 描述对象,然后执行它。
@pools.runSQL
async def list_users(dbname, NS, dept_id, **kw):
return {
"sql_string": "SELECT * FROM users WHERE dept_id=?",
"args": [dept_id]
}
# 调用
result = await list_users('default', {}, dept_id=5)
自动执行 SQL、提交事务(如需)、释放连接。
@runSQLPaging(func)
分页查询专用装饰器,结合 sor.runSQLPaging() 使用。
@pools.runSQLPaging
async def search_users_paged(dbname, NS, keyword, page=1, pagesize=10):
return {
"sql_string": "SELECT id,name FROM users WHERE name LIKE ?",
"args": [f"%{keyword}%"]
}
返回格式:
{
"page": 1,
"pagesize": 10,
"total": 87,
"data": [...]
}
@runSQLResultFields(func)
获取查询结果字段定义(元数据),常用于动态表单生成。
@pools.runSQLResultFields
async def describe_query(dbname, NS, table):
return {"sql_string": f"SELECT * FROM {table} LIMIT 1"}
🔍 元数据查询接口
提升性能:结果缓存在内存中。
| 方法 | 作用 |
|---|---|
getTables(dbname) |
获取所有表名列表 |
getTableFields(dbname, tblname) |
获取某表字段结构 |
getTablePrimaryKey(dbname, tblname) |
获取主键字段 |
getTableIndexes(dbname, tblname) |
获取索引信息 |
getTableForignKeys(dbname, tblname) |
获取外键关系 |
所有方法均带本地缓存,默认以
"dbname:tablename"为键。
🛠️ 辅助函数
runSQL(dbname, sql, ns={}, sor=None)
快速执行一条原始 SQL。
result = await runSQL(
dbname='default',
sql='SELECT COUNT(*) AS cnt FROM users'
)
推荐仅用于简单脚本或调试。
runSQLPaging(dbname, sql, ns={}, sor=None)
快速执行分页 SQL。
paged = await runSQLPaging(
dbname='default',
sql='SELECT id,name FROM users ORDER BY id',
ns={'page': 2, 'pagesize': 10}
)
🔐 安全相关
- 密码字段支持加密标记:
"password": "enc:xxxxxx" - 使用
unpassword(pw)解密(RC4 加密算法,需确保密钥一致) - 不建议生产环境明文存储密码
📐 配置结构规范(JSON)
{
"driver": "aiomysql",
"async_mode": true,
"maxconn": 10,
"maxuse": 500,
"kwargs": {
"host": "127.0.0.1",
"port": 3306,
"user": "appuser",
"password": "enc:xxxxxxxx",
"database": "myapp_db"
}
}
支持字段:
driver: 必须匹配导入路径下的模块名async_mode: 控制异步行为kwargs: 透传给底层驱动
🧪 使用示例
1. 初始化连接池
from your_module import DBPools
config = loadf("db.json") # 加载配置
pools = DBPools(databases={"prod": config})
2. 执行普通查询
async def main():
async with pools.sqlorContext("prod") as sor:
users = await sor.select("users", where={"status": 1})
print(users)
3. 分页查询用户
@pools.runSQLPaging
async def query_active_users(dbname, NS, status=1):
return {
"sql_string": "SELECT id,name,email FROM users WHERE status=?",
"args": [status]
}
result = await query_active_users("prod", {"page": 1, "pagesize": 20})
print(result)
⚠️ 注意事项
- 异步模式必须使用异步驱动
- 如
aiomysql,asyncpg,aiosqlite
- 如
- 不要手动调用
close()游标或连接- 应由
DBPools统一管理
- 应由
- 避免长时间持有
sor实例- 建议使用
sqlorContext上下文管理器
- 建议使用
- 元数据缓存不会自动刷新
- 表结构变更后需重启服务或手动清理
meta
- 表结构变更后需重启服务或手动清理
📈 性能建议
- 设置合理的
maxconn(通常 ≤ CPU 核心数 × 2) maxuse可防止长连接老化导致的问题- 生产环境建议开启日志监控连接健康状况
📎 附录 A:错误处理策略
| 场景 | 处理方式 |
|---|---|
| 连接中断 | 尝试重建最多 4 次 |
| 查询异常 | 记录 traceback 日志 |
| 事务失败 | 自动 rollback 并抛出原异常 |
| 密码解密失败 | 抛出异常(需检查 RC4 密钥) |
📎 附录 B:未来改进方向
- 添加连接池监控指标(Prometheus)
- 支持读写分离
- 更细粒度的超时控制
- SQL 拦截器(审计/日志)
- 连接泄漏检测机制
✅ 文档版本:v1.0
📅 更新时间:2025-04-05
© 2025 Your Company. All Rights Reserved.
📌 提示:将此文档保存为 README.md 或集成至 Sphinx/Wiki 中便于团队查阅。