sqlor/aidocs/dbpools.old.md
2025-10-05 11:24:24 +08:00

12 KiB
Raw Blame History

以下是为提供的 Python 异步数据库连接与操作代码编写的 Markdown 格式技术文档,涵盖模块功能、类结构、核心方法说明及使用示例。


📚 dbpool 模块技术文档

基于异步 I/O 的通用数据库连接池与 SQL 执行封装系统
支持多种数据库驱动SQLite、MySQL、PostgreSQL、Oracle、SQL Server的同步/异步模式


🔧 概述

本模块提供了一个可扩展、高性能的异步数据库访问框架,主要特性包括:

  • 多数据库支持:通过插件式设计支持多种数据库
  • 异步连接池管理(基于 asyncio.Queue
  • 自动重连与健康检测机制
  • 单例全局连接池管理器 DBPools
  • 装饰器简化 SQL 执行流程
  • 元数据缓存优化表结构查询性能
  • 事务自动提交/回滚控制

适用于高并发 Web 应用或微服务中对数据库资源进行统一管理和高效复用。


📦 导入依赖

import asyncio
from functools import wraps
from contextlib import asynccontextmanager

# 内部工具库
from appPublic.myImport import myImport
from appPublic.dictObject import DictObject
from appPublic.Singleton import SingletonDecorator
from appPublic.myjson import loadf
from appPublic.jsonConfig import getConfig
from appPublic.rc4 import unpassword
from appPublic.log import exception

外部依赖:

  • aiosqlite, aiomysql, asyncpg 等异步驱动需根据实际数据库安装

🏗️ 核心组件架构

组件 功能
SQLor 子类 数据库抽象层,封装增删改查逻辑
sqlorFactory() 工厂函数,根据配置创建对应的 SQLor 实例
LifeConnect 封装单个连接的生命期与可用性测试
ConnectionPool 连接池实现,维护固定数量的活跃连接
DBPools 单例全局连接池管理器,支持多数据库
装饰器 (@runSQL, @inSqlor) 简化业务函数中的数据库调用

🧱 抽象接口:SQLor 及其子类

所有数据库适配器继承自基类 SQLor,并实现以下关键方法:

class SQLor:
    def isMe(driver_name: str) -> bool: ...
    async def tables() -> List[str]: ...
    async def fields(table: str) -> List[FieldInfo]: ...
    async def primary(table: str) -> List[str]: ...
    async def indexes(table: str) -> List[IndexInfo]: ...
    async def fkeys(table: str) -> List[ForeignKeyInfo]: ...
    async def runSQL(desc: dict, ns: dict) -> Any: ...
    async def runSQLPaging(desc: dict, ns: dict) -> Dict: ...

当前支持的数据库驱动

驱动名 文件
sqlite3 SQLite3or / Aiosqliteor .sqlite3or, .aiosqliteor
mysql MySqlor / AioMysqlor .mysqlor, .aiomysqlor
postgresql AioPostgresqlor .aiopostgresqlor
mssql MsSqlor .mssqlor
oracle Oracleor .oracleor

同步和异步版本分别处理阻塞/非阻塞场景。


⚙️ 工厂函数

sqlorFactory(dbdesc) → SQLor

根据数据库描述字典动态选择合适的 SQLor 子类实例。

参数

参数 类型 说明
dbdesc dict 包含 'driver''kwargs' 的数据库连接信息

示例

dbdesc = {
    "driver": "aiomysql",
    "kwargs": {
        "host": "localhost",
        "port": 3306,
        "user": "root",
        "password": "enc:xxxxx",
        "database": "test"
    },
    "async_mode": True
}
sor = sqlorFactory(dbdesc)

若未找到匹配驱动,则返回默认 SQLor 实例。


sqlorFromFile(dbdef_file, coding='utf8') → SQLor

从 JSON 文件加载数据库配置并初始化 SQLor 实例。

参数

参数 类型 默认值 说明
dbdef_file str - JSON 配置文件路径
coding str 'utf8' 文件编码格式

示例文件内容 (config/db.json)

{
  "driver": "sqlite3",
  "kwargs": {
    "dbname": "/tmp/test.db"
  }
}

使用方式

sor = await sqlorFromFile("config/db.json")

🔌 连接生命期管理:LifeConnect

封装单个数据库连接的生命周期,具备自动重建能力。

初始化参数

属性 说明
connfunc 创建连接的函数(如 aiomysql.connect
kw 传递给 connfunc 的关键字参数
use_max 最大使用次数后强制关闭
async_mode 是否启用异步模式

方法

方法 返回类型 说明
use() awaitable<connection> 获取一个有效连接,失败时尝试重建
free(conn) None 标记连接已释放(暂不关闭)
testok() awaitable<bool> 测试连接是否正常(执行 SELECT 1
_mkconn() awaitable 内部创建新连接

在获取连接时会自动探测断连并尝试重新建立最多 4 次。


🔄 连接池:ConnectionPool

每个数据库拥有独立的连接池,基于 asyncio.Queue 实现先进先出调度。

初始化参数

参数 说明
dbdesc 数据库配置对象
loop 事件循环引用

属性

属性 说明
maxconn 最大连接数(默认 5
maxuse 单连接最大使用次数(默认 1000
_pool 存放 LifeConnect 实例的队列

关键方法

方法 说明
connect() 创建新的 LifeConnect 并放入池中
aquire() 异步获取可用连接
release(conn) 释放连接回池中
isEmpty() / isFull() 查询池状态

注意:当前未启用锁保护共享状态,建议在单线程协程环境下运行。


🌐 全局连接池管理器:DBPools(单例)

使用 @SingletonDecorator 保证全局唯一实例。

构造参数

参数 类型 默认值 说明
databases dict {} 名称映射到数据库配置
max_connect int 100 全局限流信号量
loop asyncio.AbstractEventLoop get_event_loop() 事件循环

示例配置

databases = {
    'default': {
        'driver': 'aiomysql',
        'async_mode': True,
        'kwargs': { ... }
    },
    'local_sqlite': {
        'driver': 'sqlite3',
        'async_mode': False,
        'kwargs': {'dbname': '/tmp/app.db'}
    }
}
pools = DBPools(databases=databases)

💡 核心 API 方法

getSqlor(name) → SQLor

获取指定名称的数据库操作对象(带连接绑定)。

sor = await pools.getSqlor('default')

自动申请连接、游标,并设置上下文。


freeSqlor(sor)

释放 SQLor 占用的连接资源。

await pools.freeSqlor(sor)

@sqlorContext(name) → Async Context Manager

推荐使用的上下文管理器,自动处理连接获取、事务提交/回滚、异常捕获与释放。

示例

async with pools.sqlorContext('default') as sor:
    ret = await sor.runSQL({"sql_string": "SELECT * FROM users"})
  • 出现异常且有数据变更 → 自动 rollback
  • 正常退出 → 自动 commit
  • 总是释放连接

useOrGetSor(dbname, **kw) → (sor, commit_flag)

内部工具函数:若传入了 sor 则复用,否则新建。

用于嵌套调用时避免重复连接开销。


🎯 装饰器 API简化开发

@inSqlor(func)

装饰业务函数,自动注入 sor 对象,支持事务控制。

@pools.inSqlor
async def get_user(dbname, NS, user_id, **kw):
    sor = kw['sor']
    return await sor.selectOne("users", {"id": user_id})

调用方式:

ret = await get_user('default', {}, user_id=123)

若未传 sor,则自动申请;否则复用传入的 sor


@runSQL(func)

装饰函数返回 SQL 描述对象,然后执行它。

@pools.runSQL
async def list_users(dbname, NS, dept_id, **kw):
    return {
        "sql_string": "SELECT * FROM users WHERE dept_id=?",
        "args": [dept_id]
    }

# 调用
result = await list_users('default', {}, dept_id=5)

自动执行 SQL、提交事务如需、释放连接。


@runSQLPaging(func)

分页查询专用装饰器,结合 sor.runSQLPaging() 使用。

@pools.runSQLPaging
async def search_users_paged(dbname, NS, keyword, page=1, pagesize=10):
    return {
        "sql_string": "SELECT id,name FROM users WHERE name LIKE ?",
        "args": [f"%{keyword}%"]
    }

返回格式:

{
  "page": 1,
  "pagesize": 10,
  "total": 87,
  "data": [...]
}

@runSQLResultFields(func)

获取查询结果字段定义(元数据),常用于动态表单生成。

@pools.runSQLResultFields
async def describe_query(dbname, NS, table):
    return {"sql_string": f"SELECT * FROM {table} LIMIT 1"}

🔍 元数据查询接口

提升性能:结果缓存在内存中。

方法 作用
getTables(dbname) 获取所有表名列表
getTableFields(dbname, tblname) 获取某表字段结构
getTablePrimaryKey(dbname, tblname) 获取主键字段
getTableIndexes(dbname, tblname) 获取索引信息
getTableForignKeys(dbname, tblname) 获取外键关系

所有方法均带本地缓存,默认以 "dbname:tablename" 为键。


🛠️ 辅助函数

runSQL(dbname, sql, ns={}, sor=None)

快速执行一条原始 SQL。

result = await runSQL(
    dbname='default',
    sql='SELECT COUNT(*) AS cnt FROM users'
)

推荐仅用于简单脚本或调试。


runSQLPaging(dbname, sql, ns={}, sor=None)

快速执行分页 SQL。

paged = await runSQLPaging(
    dbname='default',
    sql='SELECT id,name FROM users ORDER BY id',
    ns={'page': 2, 'pagesize': 10}
)

🔐 安全相关

  • 密码字段支持加密标记:"password": "enc:xxxxxx"
  • 使用 unpassword(pw) 解密RC4 加密算法,需确保密钥一致)
  • 不建议生产环境明文存储密码

📐 配置结构规范JSON

{
  "driver": "aiomysql",
  "async_mode": true,
  "maxconn": 10,
  "maxuse": 500,
  "kwargs": {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "appuser",
    "password": "enc:xxxxxxxx",
    "database": "myapp_db"
  }
}

支持字段:

  • driver: 必须匹配导入路径下的模块名
  • async_mode: 控制异步行为
  • kwargs: 透传给底层驱动

🧪 使用示例

1. 初始化连接池

from your_module import DBPools

config = loadf("db.json")  # 加载配置
pools = DBPools(databases={"prod": config})

2. 执行普通查询

async def main():
    async with pools.sqlorContext("prod") as sor:
        users = await sor.select("users", where={"status": 1})
        print(users)

3. 分页查询用户

@pools.runSQLPaging
async def query_active_users(dbname, NS, status=1):
    return {
        "sql_string": "SELECT id,name,email FROM users WHERE status=?",
        "args": [status]
    }

result = await query_active_users("prod", {"page": 1, "pagesize": 20})
print(result)

⚠️ 注意事项

  1. 异步模式必须使用异步驱动
    • aiomysql, asyncpg, aiosqlite
  2. 不要手动调用 close() 游标或连接
    • 应由 DBPools 统一管理
  3. 避免长时间持有 sor 实例
    • 建议使用 sqlorContext 上下文管理器
  4. 元数据缓存不会自动刷新
    • 表结构变更后需重启服务或手动清理 meta

📈 性能建议

  • 设置合理的 maxconn(通常 ≤ CPU 核心数 × 2
  • maxuse 可防止长连接老化导致的问题
  • 生产环境建议开启日志监控连接健康状况

📎 附录 A错误处理策略

场景 处理方式
连接中断 尝试重建最多 4 次
查询异常 记录 traceback 日志
事务失败 自动 rollback 并抛出原异常
密码解密失败 抛出异常(需检查 RC4 密钥)

📎 附录 B未来改进方向

  • 添加连接池监控指标Prometheus
  • 支持读写分离
  • 更细粒度的超时控制
  • SQL 拦截器(审计/日志)
  • 连接泄漏检测机制

文档版本v1.0
📅 更新时间2025-04-05
© 2025 Your Company. All Rights Reserved.


📌 提示:将此文档保存为 README.md 或集成至 Sphinx/Wiki 中便于团队查阅。