This commit is contained in:
yumoqing 2025-10-31 08:35:53 +08:00
parent bd15c2fb8d
commit d3c9c060ea
2 changed files with 329 additions and 0 deletions

184
sqlor/pgsqlor.py Normal file
View File

@ -0,0 +1,184 @@
# -*- coding:utf8 -*-
import asyncpg
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_pgsql import pgsql_ddl_tmpl # 需要提供 PostgreSQL DDL 模板
class PgSqlor(SQLor):
ddl_template = pgsql_ddl_tmpl
db2modelTypeMapping = {
'smallint': 'short',
'integer': 'long',
'bigint': 'long',
'decimal': 'float',
'numeric': 'float',
'real': 'float',
'double precision': 'float',
'serial': 'long',
'bigserial': 'long',
'varchar': 'str',
'char': 'char',
'text': 'text',
'bytea': 'bin',
'date': 'date',
'time': 'time',
'timestamp': 'datetime',
'timestamptz': 'datestamp',
'boolean': 'short',
'json': 'text',
'jsonb': 'text',
}
model2dbTypemapping = {
'date': 'date',
'time': 'time',
'timestamp': 'timestamp',
'str': 'varchar',
'char': 'char',
'short': 'smallint',
'long': 'bigint',
'float': 'double precision',
'text': 'text',
'bin': 'bytea',
'file': 'bytea',
}
@classmethod
def isMe(cls, name):
return name.lower() in ['pgsql', 'postgres', 'postgresql']
def grammar(self):
return {
# PostgreSQL 支持 window/CTE 等,可在这里扩展
'select': 'select',
}
def placeHolder(self, varname, pos=None):
"""PostgreSQL 使用 $1, $2 作为占位符"""
if varname == '__mainsql__':
return ''
if pos is not None:
return f"${pos + 1}"
return '$1'
def dataConvert(self, dataList):
if isinstance(dataList, dict):
return tuple(dataList.values())
else:
return tuple(i['value'] for i in dataList)
def pagingSQLmodel(self):
"""分页模板"""
return """select * from (%s) as A order by $[sort]$ offset $[from_line]$ limit $[rows]$"""
def tablesSQL(self):
sqlcmd = f"""
SELECT
lower(tablename) as name,
obj_description(format('%s.%s', schemaname, tablename)::regclass) as title
FROM pg_tables
WHERE schemaname NOT IN ('pg_catalog', 'information_schema');
"""
return sqlcmd
def fieldsSQL(self, tablename=None):
sqlcmd = f"""
SELECT
lower(a.attname) as name,
format_type(a.atttypid, a.atttypmod) as type,
CASE
WHEN a.atttypmod > 0 THEN a.atttypmod - 4
ELSE NULL
END as length,
NULL as "dec",
CASE WHEN a.attnotnull THEN 'no' ELSE 'yes' END as nullable,
col_description(a.attrelid, a.attnum) as title,
lower(c.relname) as table_name
FROM pg_attribute a
JOIN pg_class c ON a.attrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE a.attnum > 0 AND NOT a.attisdropped
AND n.nspname NOT IN ('pg_catalog', 'information_schema')
"""
if tablename:
sqlcmd += f" AND lower(c.relname) = '{tablename.lower()}'"
return sqlcmd
def fkSQL(self, tablename=None):
sqlcmd = f"""
SELECT
con.conname AS constraint_name,
nsp.nspname AS schema_name,
rel.relname AS child_table,
att.attname AS child_column,
frel.relname AS parent_table,
fatt.attname AS parent_column,
con.confupdtype AS update_rule,
con.confdeltype AS delete_rule
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_class frel ON frel.oid = con.confrelid
JOIN pg_attribute att ON att.attrelid = con.conrelid AND att.attnum = ANY(con.conkey)
JOIN pg_attribute fatt ON fatt.attrelid = con.confrelid AND fatt.attnum = ANY(con.confkey)
JOIN pg_namespace nsp ON nsp.oid = con.connamespace
WHERE con.contype = 'f'
"""
if tablename:
sqlcmd += f" AND lower(rel.relname) = '{tablename.lower()}'"
return sqlcmd
def pkSQL(self, tablename=None):
sqlcmd = f"""
SELECT a.attname as name
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = '{tablename.lower()}'::regclass
AND i.indisprimary;
"""
return sqlcmd
def indexesSQL(self, tablename=None):
sqlcmd = f"""
SELECT
lower(i.relname) as index_name,
CASE WHEN ix.indisunique THEN 'unique' ELSE '' END as is_unique,
lower(a.attname) as column_name
FROM pg_class t,
pg_class i,
pg_index ix,
pg_attribute a
WHERE
t.oid = ix.indrelid
AND i.oid = ix.indexrelid
AND a.attrelid = t.oid
AND a.attnum = ANY(ix.indkey)
AND t.relkind = 'r'
"""
if tablename:
sqlcmd += f" AND lower(t.relname) = '{tablename.lower()}'"
return sqlcmd
async def connect(self):
"""建立数据库连接"""
dbdesc = self.dbdesc
self.conn = await asyncpg.connect(
user=dbdesc.get('user'),
password=dbdesc.get('password'),
database=dbdesc.get('db'),
host=dbdesc.get('host', 'localhost'),
port=dbdesc.get('port', 5432),
)
self.dbname = dbdesc.get('db')
async def close(self):
await self.conn.close()
async def enter(self):
"""开启事务或获取游标asyncpg 无游标,直接用连接执行)"""
self.cur = self.conn # 保留接口一致性
async def exit(self):
"""释放资源"""
self.cur = None

145
sqlor/sqliteor.py Normal file
View File

@ -0,0 +1,145 @@
# -*- coding:utf8 -*-
import aiosqlite
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_sqlite import sqlite_ddl_tmpl
class SQLiteor(SQLor):
ddl_template = sqlite_ddl_tmpl
db2modelTypeMapping = {
'integer': 'long',
'int': 'long',
'tinyint': 'short',
'smallint': 'short',
'mediumint': 'long',
'bigint': 'long',
'decimal': 'float',
'numeric': 'float',
'real': 'float',
'double': 'float',
'float': 'float',
'char': 'char',
'varchar': 'str',
'text': 'text',
'clob': 'text',
'blob': 'bin',
'date': 'date',
'datetime': 'datetime',
'boolean': 'short',
}
model2dbTypemapping = {
'date': 'date',
'time': 'time',
'timestamp': 'datetime',
'str': 'text',
'char': 'char',
'short': 'integer',
'long': 'integer',
'float': 'real',
'text': 'text',
'bin': 'blob',
'file': 'blob',
}
@classmethod
def isMe(cls, name):
return name.lower() in ['sqlite', 'sqlite3']
def grammar(self):
return {
'select': 'select',
}
def placeHolder(self, varname, pos=None):
"""SQLite 使用 ? 占位符"""
if varname == '__mainsql__':
return ''
return '?'
def dataConvert(self, dataList):
if isinstance(dataList, dict):
return tuple(dataList.values())
else:
return tuple(i['value'] for i in dataList)
def pagingSQLmodel(self):
"""分页 SQL 模板"""
return """select * from (%s) as A order by $[sort]$ limit $[rows]$ offset $[from_line]$"""
def tablesSQL(self):
"""获取表名和备注"""
sqlcmd = """
SELECT
lower(name) AS name,
'' AS title
FROM sqlite_master
WHERE type='table'
AND name NOT LIKE 'sqlite_%';
"""
return sqlcmd
def fieldsSQL(self, tablename=None):
"""获取字段信息"""
sqlcmd = f"""
PRAGMA table_info({tablename});
"""
# SQLite 无信息架构表,直接返回 PRAGMA 命令
return sqlcmd
def fkSQL(self, tablename=None):
"""获取外键信息"""
if tablename:
sqlcmd = f"PRAGMA foreign_key_list({tablename});"
else:
sqlcmd = "-- SQLite 需逐表获取外键信息"
return sqlcmd
def pkSQL(self, tablename=None):
"""获取主键信息"""
sqlcmd = f"""
SELECT name
FROM pragma_table_info('{tablename}')
WHERE pk != 0;
"""
return sqlcmd
def indexesSQL(self, tablename=None):
"""获取索引信息"""
if not tablename:
return "SELECT name as index_name, '' as is_unique, '' as column_name FROM sqlite_master WHERE type='index';"
sqlcmd = f"""
PRAGMA index_list('{tablename}');
"""
return sqlcmd
async def connect(self):
"""
连接 SQLite 数据库
dbdesc:
path: 数据库文件路径 :memory:
"""
dbdesc = self.dbdesc
self.dbpath = dbdesc.get('path', ':memory:')
self.conn = await aiosqlite.connect(self.dbpath)
self.conn.row_factory = aiosqlite.Row # 支持 dict 访问
self.dbname = self.dbpath
async def close(self):
await self.conn.close()
async def enter(self):
"""开启事务"""
self.cur = await self.conn.cursor()
async def exit(self):
"""释放 cursor"""
try:
await self.cur.close()
except:
pass
self.cur = None