Compare commits

..

No commits in common. "main" and "plugable" have entirely different histories.

30 changed files with 836 additions and 952 deletions

177
README.md
View File

@ -23,7 +23,7 @@ SQLOR is a database api for python3, it is base on the python's DBAPI2
* clickhouse(clickhouse-connect) * clickhouse(clickhouse-connect)
* Other driver can be easy integreated * Other driver can be easy integreated
## Supported Database Types ## Support Database Types
* oracle * oracle
* mysql, mariadb * mysql, mariadb
* TiDB * TiDB
@ -83,120 +83,87 @@ password in json data is encrypted by aes.
## Using ## Using
### sqlor setup
First, Specified a server_path folder, under the server_path folder, need a named by "conf" subfolder.
and a config.json file must be in the conf.
the config.json need "password_key" and "databases" attributes, like:
``` ```
{ import asyncio
.......
"password_key":"tfyugihjo245g7g642yubv24g534", from sqlor.dbpools import DBPools, sqlorContext
"databases":{
....... dbs={
"mydb":{ "mydb":{
"driver":"mysql", "driver":"mysql",
"kwargs":{ "kwargs":{
"user":"test", "user":"test",
"db":"database_name_in_your_database_engine", "db":"cfae",
"password":"encoded_password_string", "password":"test123",
"host":"localhost"
}
},
"stock":{
"driver":"aiopg",
"async_mode":True,
"codeing":"utf-8",
"dbname":"stock",
"kwargs":{
"dbname":"stock",
"user":"test",
"password":"test123",
"host":"127.0.0.1"
}
},
"cfae":{
"driver":"mysql.connector",
"coding":"utf8",
"dbname":"cfae",
"kwargs":{
"user":"test",
"db":"cfae",
"password":"test123",
"host":"localhost" "host":"localhost"
} }
} }
}
} }
```
### generates encoded password string
sqlor has a dbpassword script let you generate the "encoded_password_string" in config.json
dbpassword usage:
```
dbpassword server_path database_password_of_user
```
### script to use sqlor loop = asyncio.get_event_loop()
pool = DBPools(dbs,loop=loop)
```
import asyncio
from appPublic.worker import get_event_loop
from appPublic.jsonConfig import getConfig
from sqlor.dbpools import DBPools, sqlorContext
loop = get_event_loop()
config = getConfig(server_path)
pool = DBPools(config.databases, loop=loop)
async def testfunc(): async def testfunc():
dbname = 'mydb' async with sqlorContext('stock') as sor:
db = DBPools() # start a transaction
async with sqlorContext('stock') as sor: # if exception happended, all change to database will rollback
# start a transaction # else will commit
# if exception happended, all change to database will rollback sql = "select * from stock where stock_num = ${stock_num}"
# else will commit recs = await sor.sqlExe(sql, {'stock_num':'688888'})
sql = "select * from stock where stock_num = ${stock_num}" # return a list of DictObject instance
recs = await sor.sqlExe(sql, {'stock_num':'688888'}) sql1 = "select * from stock"
# return a list of DictObject instance recs = await sor.sqlPaging(sql, {'pagerows':50, 'page':1, 'sort':'stock_id'})
sql1 = "select * from stock" # return a dictionary {
recs = await sor.sqlPaging(sql, {'pagerows':50, 'page':1, 'sort':'stock_id'}) # 'total': return all reocords count.
# return a dictionary { # 'rows': list of DictObject instance
# 'total': return all reocords count. # }
# 'rows': list of DictObject instance ns = {'id':'666667'} # filters dictionary, find all records which id = '666667'
# } recs = await sor.R('stock', ns)
ns = {'id':'666667'} # filters dictionary, find all records which id = '666667' # recs is a list of data in DictObject instance
recs = await sor.R('stock', ns) ns = {'pagerows': 50, 'page':1, 'sort':stock_id'}
# recs is a list of data in DictObject instance # find all record in table 'stock', return page 1 data which 50 max records
ns = {'pagerows': 50, 'page':1, 'sort':stock_id'} dic = await sor.R('stock', ns)
# find all record in table 'stock', return page 1 data which 50 max records # return a dictionary {
dic = await sor.R('stock', ns) # 'total': return all reocords count.
# return a dictionary { # 'rows': list of DictObject instance
# 'total': return all reocords count. # }
# 'rows': list of DictObject instance await sor.C('stock', {
# } 'stock_id': '111111',
await sor.C('stock', { ...
'stock_id': '111111', })
... # add a record to 'stock' table
}) await sor.D('stock', {'stock_id':'111111'})
# add a record to 'stock' table # delete a record in 'stock' table which stock_id = '111111'
await sor.D('stock', {'stock_id':'111111'}) await sor.U('stock', {'stock_id':'111111', 'name':'new_name'})
# delete a record in 'stock' table which stock_id = '111111' # update name field's value to 'new_name' which stock_id = '111111'
await sor.U('stock', {'stock_id':'111111', 'name':'new_name'})
# update name field's value to 'new_name' which stock_id = '111111'
loop.run_until_complete(testfunc()) loop.run_until_complete(testfunc())
``` ```
## scripts
### dbpassword
generate a encoded password for the password of the user of database
* Syntax
dbpassword server_path password_text
1 server_path is folder where server_path/conf/config.json file have specifies password_key and databases
2 password_text is the password of the specified user of database
* description
dbpassword encodes second argument password to a base64 based cyber, this cyber need to write into the password attribute under kwargs, and print the cyber to stdout
### dbloader
load data in xlsx file to your database
* Syntax
dbloader server_path dbname xlsxfile
1 server_path is folder where server_path/conf/config.json file have specifies password_key and databases
2 dbname is database where the data will insert into
3 xlsxfile is a data file, can contains many sheets, sheet name is tablename, first rows of sheet contains the field name of the table, fields without data not need to appear in the sheet.
* dbloader get data from each named sheet in xlsxfile, and insert them to database parellally,
## API ## API
@ -208,23 +175,25 @@ how many databases and what database will using, and them connection parameters
dbdesc data is a dict data, format of the dbdesc as follow: dbdesc data is a dict data, format of the dbdesc as follow:
``` ```
{ {
"mydb1":{ # name to identify a database connect "aiocfae":{ # name to identify a database connect
"driver":"mysql", # database dbapi2 driver package name "driver":"aiomysql", # database dbapi2 driver package name
"async_mode":True, # indicte this connection is asynchronous mode "async_mode":True, # indicte this connection is asynchronous mode
"coding":"utf8", # charset coding "coding":"utf8", # charset coding
"dbname":"cfae", # database real name "dbname":"cfae", # database real name
"kwargs":{ # connection parameters "kwargs":{ # connection parameters
"user":"test", "user":"test",
"db":"cfae", "db":"cfae",
"password":"encoded_password", "password":"test123",
"host":"localhost" "host":"localhost"
} }
}, },
"mydb2":{ "cfae":{
"driver":"postgresql", "driver":"mysql.connector",
"coding":"utf8",
"dbname":"cfae",
"kwargs":{ "kwargs":{
"user":"test", "user":"test",
"dbname":"cfae", "db":"cfae",
"password":"test123", "password":"test123",
"host":"localhost" "host":"localhost"
} }

View File

@ -34,7 +34,7 @@
## 概述 ## 概述
`SQLor` 提供了一个统一接口来执行 SQL 查询和命令,并通过命名参数 `${var}$` 支持模板化 SQL。其主要特点包括 `SQLor` 提供了一个统一接口来执行 SQL 查询和命令,并通过命名参数 `${var}` 支持模板化 SQL。其主要特点包括
- 异步/同步双模式支持。 - 异步/同步双模式支持。
- 参数自动替换与安全绑定。 - 参数自动替换与安全绑定。
@ -89,7 +89,7 @@ os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
| 功能 | 描述 | | 功能 | 描述 |
|------|------| |------|------|
| 参数化 SQL | 使用 `${var}$` 占位符进行变量注入 | | 参数化 SQL | 使用 `${var}` 占位符进行变量注入 |
| 异步支持 | 支持 `async/await` 模式运行 SQL | | 异步支持 | 支持 `async/await` 模式运行 SQL |
| 分页查询 | 自动生成分页 SQL需子类实现模型 | | 分页查询 | 自动生成分页 SQL需子类实现模型 |
| 排序与过滤 | 支持动态排序字段和条件表达式 | | 排序与过滤 | 支持动态排序字段和条件表达式 |
@ -100,6 +100,289 @@ os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
--- ---
## 主要类与方法说明
### `db_type_2_py_type(o)`
将特定数据库类型转换为 Python 原生类型。
#### 参数
- `o`: 数据库字段值
#### 返回值
| 输入类型 | 输出 |
|--------|-------|
| `decimal.Decimal` | `float` |
| `datetime.datetime` | 字符串格式 `'YYYY-MM-DD HH:MM:SS'` |
| `datetime.date` | `'YYYY-MM-DD'` |
| 其他 | 原值 |
#### 示例
```python
db_type_2_py_type(decimal.Decimal('3.14')) → 3.14
```
---
### `SQLorException`
自定义异常类,封装 SQL 执行错误。
> ❗ 当前代码存在拼写错误:`__int__` 应为 `__init__`,且 `supper` 应为 `super`
#### 属性
- `response`: `"error"`
- `errtype`: `"SQLor"`
- `errmsg`: 异常消息字符串
#### 示例输出
```
errtype:SQLor,errmsg=database error...
```
> ✅ **建议修复**
```python
class SQLException(Exception):
def __init__(self, **kvs):
super(SQLException, self).__init__(**kvs)
self.dic = {
'response': 'error',
'errtype': 'SQLor',
'errmsg': str(self)
}
def __str__(self):
return 'errtype:%s,errmsg=%s' % (self.dic['errtype'], self.dic['errmsg'])
```
---
### `setValues(params, ns)`
从命名空间 `ns` 或系统环境变量中查找参数值。
#### 参数
- `params`: 参数名(字符串)
- `ns`: 命名空间字典
#### 返回
优先从 `ns` 中取值,若无则尝试 `os.getenv(params)`
---
### `findNamedParameters(sql)`
解析 SQL 字符串中的所有 `${...}` 形式的命名参数。
#### 示例
```python
sql = "SELECT * FROM user WHERE id = ${uid}$ AND name = ${uname}$"
findNamedParameters(sql)
# 返回 ['${uid}$', '${uname}$']
```
---
### `uniParams(params1)`
去重保留唯一参数名列表。
#### 示例
```python
uniParams(['${a}$', '${b}$', '${a}$']) → ['${a}$', '${b}$']
```
---
### `readsql(fn)`
读取指定文件路径的 SQL 文件内容UTF-8 编码)。
#### 参数
- `fn`: 文件路径
#### 返回
文件内容字符串。
---
## `SQLor`
### 初始化 `__init__`
#### 参数
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `dbdesc` | None | 数据库描述字典,至少含 `dbname` |
| `sqltp`, `sqlts` | `$[`, `]$` | 模板占位符开始/结束符号 |
| `sqlvp`, `sqlvs` | `${`, `}$` | 变量占位符开始/结束符号 |
#### 初始化行为
- 设置连接状态(`conn`, `cur`
- 初始化元数据缓存 `metadatas`
- 创建 `ConditionConvert` 实例用于条件表达式处理
---
### 元数据管理
#### `setMeta(tablename, meta)`
保存表的元数据到内存缓存。
#### `getMeta(tablename)`
获取指定表的缓存元数据。
#### `removeMeta(tablename)`
清除指定表的元数据缓存。
---
### 连接与游标控制
#### `setCursor(async_mode, conn, cur)`
绑定数据库连接和游标对象。
#### `getConn()`
返回当前数据库连接对象。
#### `cursor()`
返回当前游标对象。
---
### 类型转换支持
#### `setConvertFunction(typ, func)`
注册用户自定义类型转换函数。
#### `convert(typ, value)`
调用注册的转换函数处理值。
---
### SQL 类型判断
#### `getSqlType(sql)`
判断 SQL 语句类型。
| 类型 | 条件 |
|------|------|
| `"qry"` | 以 `SELECT` 开头 |
| `"dml"` | `INSERT`, `UPDATE`, `DELETE` |
| `"ddl"` | 其他(如 `CREATE`, `DROP` |
---
### SQL 模板处理
#### `maskingSQL(org_sql, NS)`
将命名参数 `${var}$` 替换为 `?` 并生成参数列表。
##### 步骤
1. 先替换 `$[...]$` 模板片段(如条件块)
2. 提取 `${var}$` 参数名
3. 替换为 `?` 占位符
4. 返回 `(标记后的SQL, 参数值列表)`
##### 特殊变量
- `__mainsql__`: 不参与参数绑定,用于嵌套 SQL 注入
---
### 执行方法
#### `execute(sql, value, callback, **kwargs)`
执行单条 SQL支持回调逐行处理结果。
##### 流程
- 调用 `runVarSQL` 执行 SQL
- 若是查询 (`qry`) 且有回调,则逐行调用 `callback(DictObject(row))`
- 若是 DML标记 `dataChanged=True`
#### `executemany(sql, values)`
批量执行 SQL适用于 `INSERT` 等)
---
### 分页与排序
#### `sortSQL(sql, NS)`
添加 `ORDER BY` 子句。
- 支持 `NS['sort']` 为字符串或列表
#### `pagingSQL(sql, paging, NS)`
构建分页 SQL需配合 `pagingSQLmodel()` 使用。
> 默认为空,需由子类重写提供具体数据库分页语法(如 MySQL 的 `LIMIT OFFSET` 或 Oracle 的 `ROWNUM`
##### 参数
- `pagename`: 页码参数名(默认 `'page'`
- `rowsname`: 每页行数(默认 `'rows'`
- `sortname`: 排序字段(可选)
##### 示例 NS
```python
{
'page': 2,
'rows': 20,
'sort': 'id desc'
}
```
#### `recordCnt(sql)`
包装 SQL 查询总记录数:
```sql
SELECT COUNT(*) rcnt FROM (your_sql) rowcount_table
```
---
### 过滤器支持
#### `filterSQL(sql, filters, NS)`
将一组条件过滤器合并进原 SQL。
每个 filter 是一个带 `${}` 的模板字符串,若其中变量未在 `NS` 中存在,则忽略此条件(替换为 `1=1`)。
最终生成:
```sql
SELECT * FROM (original_sql) filter_table WHERE f1 AND f2 ...
```
---
### Pivot 表格转换
#### `pivotSQL(tablename, rowFields, columnFields, valueFields)`
生成透视表 SQL。
##### 逻辑
1. 查询 `columnFields` 的所有唯一值作为列头
2. 使用 `CASE WHEN` 构造每列聚合
3. 使用 `SUM` 聚合数值字段
##### 示例输出
```sql
SELECT dept,
SUM(salary_0) salary_0, -- 北京
SUM(salary_1) salary_1 -- 上海
FROM (
SELECT dept,
CASE WHEN city='北京' THEN salary ELSE 0 END AS salary_0,
CASE WHEN city='上海' THEN salary ELSE 0 END AS salary_1
FROM employees
) GROUP BY dept;
```
#### `pivot(desc, ...)`
执行 pivot 查询并返回结果列表。
---
### 元数据查询方法
#### `tables()` #### `tables()`
返回数据库中所有表的列表。 返回数据库中所有表的列表。
@ -309,4 +592,4 @@ def pagingSQLmodel(self):
📝 **版本**: v1.0 📝 **版本**: v1.0
📅 **最后更新**: 2025-04-05 📅 **最后更新**: 2025-04-05
👨‍💻 **作者**: Auto-generated Documentation Tool 👨‍💻 **作者**: Auto-generated Documentation Tool

View File

@ -1,82 +0,0 @@
# python使用sqlor数据操作规范
## 安装依赖包
```
pip install apppublic
pip install sqlor
```
## 使用规范
### 包含所需的包
```
from sqlor.dbpools import DBPools
```
### 数据库操作
获得数据库连接
```
db = DBPools() 获得数据库连接
···
### 使用数据库操作上下文
使用数据库操作上下文的好处:
* 自动事务开始和结束,整体提交或整体失败
* 资源自动回收
```
async with db.sqlorContext(dbname) as sor:
....
```
### 执行数据库操作
sqlor所有的数据库操作都必须在sqlorContext函数生成的上下文中执行以提供事务一致性并自动回收资源提供两种模式的数据库操作sqlExe函数可以执行dml ddl支持多表数据操作而CRUD只针对单表提供增产改查
#### runSQL
写符合SQL标准的SQL语句数据变量用“${varname}$"作为占位符并在参数ns中给定参数
以下是一个查询用户数据的语句机构id作为参数传入
```
sql = "select * from users where where orgid = ${myorgid}$ and delflg='0'"
await sor.sqlExe(sql, {'myorgid' : '12122'})
```
返回一个数据,数组中的每一个元素为一个用户数据
以下例子返回分页数据按照给定的page不同返回不同页的数据pagerows定义每页数据量```
sql1 = "select * from stock"
dic = await sor.sqlPaging(sql, {'pagerows':50, 'page':1, 'sort':'stock_id'})
```
dic是个DictObject从dict继承有两个属性total查询结果总记录数 rowspage指定页的数据
#### CRUD
CRUD针对单表操作提供对数据表的增删改查CDU三个函数在ns参数中必须提供主键数据。
* C
向表中添加数据
```
await sor.C('stock', {
'stock_id': '111111',
...
})
```
* R
从表中读取数据,支持分页字典数据返回和不分页数组数据返回
分页例子
```
ns = {'pagerows': 50, 'page':1, 'sort':stock_id'}
dic = await sor.R('stock', ns)
```
dic是个DictObject从dict继承有两个属性total查询结果总记录数 rowspage指定页的数据
不分页例子
```
ns = {'id':'666667'} # filters dictionary, find all records which id = '666667'
recs = await sor.R('stock', ns)
```
* U
修改数据表数据以主键必须在ns中出现作为修改数据条件修改ns中其他字段的值ns中不出现的字段不会被修改
```
await sor.U('stock', {'stock_id':'111111', 'name':'new_name'})
```
* D
删除表中数据以主键必须在ns中出现作为删除数据条件从数据库中删除主键值与参数ns中主键值相同的记录
```
await sor.D('stock', {'stock_id':'111111'})
```

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name=sqlor name=sqlor
version = 2.1.2 version = 2.0.0
description = a new version of sqlor, each db's sor need to plugin to sqlor, and dbdriver now a isolated module description = a new version of sqlor, each db's sor need to plugin to sqlor, and dbdriver now a isolated module
authors = yu moqing authors = yu moqing
author_email = yumoqing@gmail.com author_email = yumoqing@gmail.com
@ -20,8 +20,3 @@ install_requires =
asyncio asyncio
jinja2 jinja2
[options.entry_points]
console_scripts =
dbpassword = sqlor.dbpassword:main
dbloader = sqlor.dbloader:main

0
sqlor/__init__.py Normal file → Executable file
View File

0
sqlor/aiopostgresqlor.py Normal file → Executable file
View File

84
sqlor/aiosqliteor.py Normal file → Executable file
View File

@ -1,85 +1,7 @@
import aiosqlite import re
from .sor import SQLor from .sqlite3or import SQLite3or
select_stmt = """ class Aiosqliteor(SQLite3or):
select
$[fields]$
from
$[tables]$
$[whereclause]$
$[groupbyclause]$
$[havingclause]$
$[orderbyclause]$
"""
class Aiosqliteor(SQLor):
@classmethod @classmethod
def isMe(self,name): def isMe(self,name):
return name=='aiosqlite' return name=='aiosqlite'
async def connect(self):
"""
dbdesc:
path: 数据库文件路径 :memory:
"""
dbdesc = self.dbdesc
self.dbpath = dbdesc.get('path', ':memory:')
self.conn = await aiosqlite.connect(self.dbpath)
self.conn.row_factory = aiosqlite.Row # 支持 dict 访问
self.dbname = self.dbpath
async def close(self):
await self.conn.close()
async def enter(self):
"""开启事务"""
self.cur = await self.conn.cursor()
async def exit(self):
"""释放 cursor"""
try:
await self.cur.close()
except:
pass
self.cur = None
def grammar(self):
return {
'select':select_stmt,
}
def placeHolder(self,varname,i):
if varname=='__mainsql__' :
return ''
return '?'
def dataConvert(self,dataList):
if type(dataList) == type({}):
return dataList
d = { i['name']:i['value'] for i in dataList }
return d
def pagingSQLmodel(self):
return u"""select * from (%s) page_s limit $[page_size]$ offset $[offset]$"""
def tablesSQL(self):
sqlcmd = """SELECT name,sql as title FROM sqlite_master WHERE type='table'"""
return sqlcmd
def fieldsSQL(self,tablename=None):
sqlcmd="""PRAGMA table_info(%s);""" % tablename
return sqlcmd
def fkSQL(self,tablename=None):
tablename = tablename.lower()
sqlcmd = """PRAGMA foreign_key_list('%s');""" % tablename
return sqlcmd
def pkSQL(self,tablename=None):
tablename = tablename.lower()
sqlcmd="""PRAGMA table_info('%s');""" % tablename
return sqlcmd
def indexesSQL(self,tablename=None):
sqlcmd = """PRAGMA index_list('%s');""" % tablename
return sqlcmd

0
sqlor/const.py Normal file → Executable file
View File

373
sqlor/crud.py Executable file
View File

@ -0,0 +1,373 @@
# -*- coding:utf8 -*-
from .dbpools import DBPools
from .const import ROWS
from .filter import DBFilter
from appPublic.objectAction import ObjectAction
from appPublic.dictObject import DictObject
from appPublic.timeUtils import date2str,time2str,str2Date
from appPublic.uniqueID import getID
toStringFuncs={
'char':None,
'str':None,
'short':str,
'long':str,
'float':str,
'date':date2str,
'time':time2str,
}
fromStringFuncs={
'short':int,
'long':int,
'float':float,
'date':str2Date,
'time':str2Date
}
class DatabaseNotfound(Exception):
def __init__(self,dbname):
Exception.__init__(self)
self.dbname = dbname
def __str__(self):
return f'{self.dbname} not found'
class CRUD(object):
def __init__(self,dbname,tablename,rows=ROWS):
self.pool = DBPools()
if dbname not in self.pool.databases.keys():
raise DatabaseNotfound(dbname)
self.dbname = dbname
self.tablename = tablename
self.rows = rows
self.primary_data = None
self.oa = ObjectAction()
async def primaryKey(self,**kw):
if self.primary_data is None:
self.primary_data = await self.pool.getTablePrimaryKey(self.dbname,
self.tablename,**kw)
return self.primary_data
async def forignKeys(self,**kw):
data = self.pool.getTableForignKeys(self.dbname,self.tablename,**kw)
return data
async def I(self,**kw):
"""
fields information
"""
@self.pool.inSqlor
async def main(dbname,NS,**kw):
pkdata = await self.primaryKey(**kw)
pks = [ i.field_name for i in pkdata ]
data = await self.pool.getTableFields(self.dbname,self.tablename,**kw)
for d in data:
if d.name in pks:
d.update({'primarykey':True})
data = self.oa.execute(self.dbname+'_'+self.tablename,'tableInfo',data)
return data
return await main(self.dbname,{},**kw)
async def fromStr(self,data):
fields = await self.pool.getTableFields(self.dbname,self.tablename)
ret = {}
for k in data:
v = None if data[k] == '' else data[k]
for f in fields:
if k == f.name:
ret[k] = v
f = fromStringFuncs.get(f.type,None)
if f is not None and v is not None:
ret[k] = f(v)
return ret
async def toStr(self,data):
fields = await self.pool.getTableFields(self.dbname,self.tablename)
ret = {}
for k in data:
for f in fields:
if k == f.name:
ret[k] = data[k]
f = toStringFuncs.get(f.type,None)
if f is not None and data[k] is not None:
ret[k] = f(data[k])
return ret
async def datagrid(self,request,targeti,**kw):
fields = await self.I()
fs = [ self.defaultIOField(f) for f in fields ]
id = self.dbname+':'+ self.tablename
pk = await self.primaryKey(**kw)
idField = pk[0]['field_name']
data = {
"tmplname":"widget_js.tmpl",
"data":{
"__ctmpl__":"datagrid",
"__target__":target,
"data":{
"name":id,
"icon-conv":"icon-table",
"title":tablename,
"url":absurl('./RP.dspy?id=%s' % id),
"deleteUrl":absurl('./D.dspy?id=%s' % id),
"addUrl":absurl('./C.dspy?id=%s' % id),
"updateUrl":absurl('./U.dspy?id=%s' % id),
"idField":idField,
"dnd":True,
"view":"scrollview",
"fields":fs,
"toolbar":{
"tools":[
{
"name":"add",
"icon":"icon-add",
"label":"add ball"
},
{
"name":"delete",
"icon":"icon-delete",
"label":"delete ball"
},
{
"name":"moveup",
"icon":"icon-up",
"label":"moveup ball"
},
{
"name":"movedown",
"icon":"icon-down",
"label":"movedown ball"
}
]
},
"options":{
"pageSize":50,
"pagination":False
}
}
}
}
data = self.oa.execute(id,'datagrid',data)
return data
def defaultIOField(self,f):
if f.type in ['str']:
return {
"primarykey":f.get('primarykey',False),
"name":f.name,
"hidden":False,
"sortable":True,
"label":f.title,
"align":"center",
"iotype":"text"
}
if f.type in ['float','short','long']:
return {
"primarykey":f.get('primarykey',False),
"sortable":True,
"name":f.name,
"hidden":False,
"label":f.title,
"align":"right",
"iotype":"text"
}
return {
"primarykey":f.get('primarykey',False),
"name":f.name,
"sortable":True,
"hidden":False,
"label":f.title,
"align":"center",
"iotype":"text"
}
async def C(self,rec,**kw):
"""
create new data
"""
@self.pool.runSQL
async def addSQL(dbname,data,**kw):
fns = kw['fns']
vfs = kw['vfs']
sqldesc={
"sql_string" : """
insert into %s (%s) values (%s)
""" % (self.tablename,fns,vfs),
}
return sqldesc
@self.pool.inSqlor
async def main(dbname,NS,**kw):
fields = await self.pool.getTableFields(self.dbname,self.tablename,**kw)
flist = [ f['name'] for f in fields ]
fns = ','.join(flist)
vfs = ','.join([ '${' + f + '}$' for f in flist ])
data = {}
[ data.update({k.lower():v}) for k,v in NS.items() ]
pk = await self.primaryKey(**kw)
k = pk[0]['field_name']
if not data.get(k):
v = getID()
data[k] = v
data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeAdd',data)
kwargs = kw.copy()
kwargs['fns'] = fns
kwargs['vfs'] = vfs
await addSQL(self.dbname,data,**kwargs)
data = self.oa.execute(self.dbname+'_'+self.tablename,'afterAdd',data)
return {k:data[k]}
return await main(self.dbname,rec,**kw)
async def defaultFilter(self,NS,**kw):
fields = await self.pool.getTableFields(self.dbname,self.tablename,**kw)
d = [ '%s = ${%s}$' % (f['name'],f['name']) for f in fields if f['name'] in NS.keys() ]
if len(d) == 0:
return ''
ret = ' and ' + ' and '.join(d)
return ret
async def R(self,filters=None,NS={},**kw):
"""
retrieve data
"""
@self.pool.runSQL
async def retrieve(dbname,data,**kw):
fstr = ''
if filters is not None:
fstr = ' and '
dbf = DBFilter(filters)
fstr = fstr + dbf.genFilterString()
else:
fstr = await self.defaultFilter(NS,**kw)
sqldesc = {
"sql_string":"""select * from %s where 1=1 %s""" % (self.tablename,fstr),
}
return sqldesc
@self.pool.runSQLPaging
async def pagingdata(dbname,data,filters=None,**kw):
fstr = ""
if filters is not None:
fstr = ' and '
dbf = DBFilter(filters)
fstr = fstr + dbf.genFilterString()
else:
fstr = await self.defaultFilter(NS,**kw)
sqldesc = {
"sql_string":"""select * from %s where 1=1 %s""" % (self.tablename,fstr),
"default":{'rows':self.rows}
}
return sqldesc
@self.pool.inSqlor
async def main(dbname,NS,**kw):
p = await self.primaryKey(**kw)
if NS.get('__id') is not None:
NS[p[0]['field_name']] = NS['__id']
del NS['__id']
if NS.get('page'):
del NS['page']
if NS.get('page'):
if NS.get('sort',None) is None:
NS['sort'] = p[0]['field_name']
data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeRetrieve',NS)
if NS.get('page'):
data = await pagingdata(self.dbname,data,**kw)
else:
data = await retrieve(self.dbname,data,**kw)
data = self.oa.execute(self.dbname+'_'+self.tablename,'afterRetrieve',data)
return data
return await main(self.dbname,NS,**kw)
async def U(self,data, **kw):
"""
update data
"""
@self.pool.runSQL
async def update(dbname,NS,**kw):
condi = [ i['field_name'] for i in self.primary_data ]
newData = [ i for i in NS.keys() if i not in condi ]
c = [ '%s = ${%s}$' % (i,i) for i in condi ]
u = [ '%s = ${%s}$' % (i,i) for i in newData ]
cs = ' and '.join(c)
us = ','.join(u)
sqldesc = {
"sql_string":"""update %s set %s where %s""" % (self.tablename,us,cs)
}
return sqldesc
@self.pool.inSqlor
async def main(dbname,NS,**kw):
pk = await self.primaryKey(**kw)
pkfields = [k.field_name for k in pk ]
newData = [ k for k in data if k not in pkfields ]
data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeUpdate',data)
await update(self.dbname,data,**kw)
data = self.oa.execute(self.dbname+'_'+self.tablename,'afterUpdate',data)
return data
return await main(self.dbname,data,**kw)
async def D(self,data,**kw):
"""
delete data
"""
@self.pool.runSQL
def delete(dbname,data,**kw):
pnames = [ i['field_name'] for i in self.primary_data ]
c = [ '%s = ${%s}$' % (i,i) for i in pnames ]
cs = ' and '.join(c)
sqldesc = {
"sql_string":"delete from %s where %s" % (self.tablename,cs)
}
return sqldesc
@self.pool.inSqlor
async def main(dbname,NS,**kw):
data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeDelete',data)
await delete(self.dbname,data,pkfields,**kw)
data = self.oa.execute(self.dbname+'_'+self.tablename,'afterDelete',data)
return data
return await main(self.dbname,data,**kw)
if __name__ == '__main__':
DBPools({
"ambi":{
"driver":"pymssql",
"coding":"utf-8",
"dbname":"ambi",
"kwargs":{
"user":"ymq",
"database":"ambi",
"password":"ymq123",
"host":"localhost"
}
},
"metadb":{
"driver":"pymssql",
"coding":"utf-8",
"dbname":"metadb",
"kwargs":{
"user":"ymq",
"database":"metadb",
"password":"ymq123",
"host":"localhost"
}
}
})
crud = CRUD('ambi')
#fields = crud.I('cashflow')
#for i in fields:
# print(i)
data = crud.RP('cashflow')
print(data.total)
for i in data.rows:
print(i.balance,i.asid)

View File

@ -1,50 +0,0 @@
import os
import sys
import asyncio
from appPublic.worker import get_event_loop
from appPublic.jsonConfig import getConfig
from appPublic.worker import AsyncWorker
from sqlor.dbpools import DBPools
from appPublic.dictObject import DictObject
from xls2ddl.xlsxData import XLSXData
def chunked(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i+n]
async def load_tabledata(dbname, tblname, data):
db = DBPools()
async with db.sqlorContext(dbname) as sor:
for r in data:
try:
await sor.D(tblname, {'id': r.id})
except:
pass
await sor.C(tblname, r.copy())
async def load_data():
if len(sys.argv) < 4:
print(f'{sys.argv[0]} server_path dbname datafile')
return 1
runpath = sys.argv[1]
dbname = sys.argv[2]
datafile = sys.argv[3]
config = getConfig(runpath)
db = DBPools(config.databases)
xlsx = XLSXData(datafile)
worker = AsyncWorker(maxtask=100)
tasks = []
for i,s in enumerate(xlsx.book.worksheets):
tblname = xlsx.book.sheetnames[i]
dic = xlsx.readRecords(tblname, s)
for chunk in chunked(dic[tblname], 100):
t = asyncio.create_task(load_tabledata(dbname, tblname, chunk))
tasks.append(t)
await asyncio.wait(tasks)
return 0
def main():
get_event_loop().run_until_complete(load_data())
if __name__ == '__main__':
main()

View File

@ -1,17 +0,0 @@
import os, sys
from appPublic.aes import aes_encode_b64
from appPublic.jsonConfig import getConfig
def main():
if len(sys.argv) < 3:
print(f'{sys.argv[0]} server_path dbuser_password')
sys.exit(1)
runpath = sys.argv[1]
password = sys.argv[2]
config = getConfig(runpath)
cyber = aes_encode_b64(config.password_key, sys.argv[2])
print(f'{password} encoded is {cyber}')
sys.exit(0)
if __name__ == '__main__':
main()

0
sqlor/dbpools.old.py Normal file → Executable file
View File

85
sqlor/dbpools.py Normal file → Executable file
View File

@ -6,7 +6,6 @@ import codecs
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from appPublic.worker import get_event_loop
from appPublic.myImport import myImport from appPublic.myImport import myImport
from appPublic.dictObject import DictObject from appPublic.dictObject import DictObject
from appPublic.Singleton import SingletonDecorator from appPublic.Singleton import SingletonDecorator
@ -14,7 +13,6 @@ from appPublic.myjson import loadf
from appPublic.jsonConfig import getConfig from appPublic.jsonConfig import getConfig
from appPublic.rc4 import unpassword from appPublic.rc4 import unpassword
from appPublic.log import exception from appPublic.log import exception
from appPublic.event_dispatcher import EventDispatcher
import threading import threading
from .sor import SQLor from .sor import SQLor
@ -57,58 +55,25 @@ class SqlorPool:
self.sqlors.append(x) self.sqlors.append(x)
return x return x
async def _del_sqlor(self, sor):
try:
await sor.exit()
except:
pass
try:
await sor.close()
except:
pass
async def test_sqlor(self, sor):
try:
await sor.enter()
await sor.execute(sor.test_sqlstr, {})
await sor.exit()
return True
except:
await sor.exit()
return False
@asynccontextmanager @asynccontextmanager
async def context(self): async def context(self):
async with self.sema: async with self.sema:
sqlors = [s for s in self.sqlors]
yielded_sqlor = None yielded_sqlor = None
for s in sqlors: for s in self.sqlors:
if not s.used: if not s.used:
flg = await self.test_sqlor(s.sqlor) yielded_sqlor = s
if flg:
yielded_sqlor = s
else:
await self._del_sqlor(s.sqlor)
self.sqlors = [ x for x in self.sqlors if x != s ]
if not yielded_sqlor: if not yielded_sqlor:
yielded_sqlor = await self._new_sqlor() yielded_sqlor = await self._new_sqlor()
yielded_sqlor.used = True yielded_sqlor.used = True
yielded_sqlor.use_at = time.time() yielded_sqlor.use_at = time.time()
try: yield yielded_sqlor.sqlor
yield yielded_sqlor.sqlor yielded_sqlor.used = False
yielded_sqlor.used = False
return
except Exception as e:
yielded_sqlor.used = False
raise e
@SingletonDecorator @SingletonDecorator
class DBPools(EventDispatcher): class DBPools:
def __init__(self,databases={},max_connect=100,loop=None): def __init__(self,databases={},max_connect=100,loop=None):
if loop is None: if loop is None:
loop = get_event_loop() loop = asyncio.get_event_loop()
super().__init__()
self.loop = loop self.loop = loop
self.max_connect = max_connect self.max_connect = max_connect
self._pools = {} self._pools = {}
@ -139,36 +104,18 @@ class DBPools(EventDispatcher):
self._pools[name] = pool self._pools[name] = pool
self.e_except = None self.e_except = None
sqlor = None sqlor = None
async with pool.context() as sqlor: try:
try: async with pool.context() as sqlor:
sqlor.dbpools = self
await sqlor.enter()
yield sqlor yield sqlor
if sqlor and sqlor.dataChanged: if sqlor and sqlor.dataChanged:
await sqlor.commit() await sqlor.commit()
await sqlor.exit() except Exception as e:
except Exception as e: self.e_except = e
self.e_except = e cb = format_exc()
cb = format_exc() exception(f'sqlorContext():EXCEPTION{e}, {cb}')
exception(f'sqlorContext():EXCEPTION{e}, {cb}') if sqlor and sqlor.dataChanged:
try: await sqlor.rollback()
await sqlor.rollback()
except:
pass
await sqlor.exit()
raise e
def get_exception(self): def get_exception(self):
return self.e_except return self.e_except
@asynccontextmanager
async def get_sor_context(env, modulename, errorback=None):
db = DBPools()
dbname = env.get_module_dbname(modulename)
async with db.sqlorContext(dbname) as sor:
yield sor
if db.e_except:
if errorback:
await errorback(db.e_except)
else:
raise db.e_except

7
sqlor/ddl_template_mysql.py Normal file → Executable file
View File

@ -18,7 +18,7 @@ datetime
{%- elif type=='timestamp' -%} {%- elif type=='timestamp' -%}
TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIMESTAMP DEFAULT CURRENT_TIMESTAMP
{%- elif type=='text' -%} {%- elif type=='text' -%}
longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci longtext
{%- elif type=='bin' -%} {%- elif type=='bin' -%}
longblob longblob
{%- else -%} {%- else -%}
@ -38,8 +38,6 @@ NOT NULL
{% macro primary() %} {% macro primary() %}
,primary key({{','.join(summary[0].primary)}}) ,primary key({{','.join(summary[0].primary)}})
{% endmacro %} {% endmacro %}
-- 建库时请用以下语句支持emoji字符
-- CREATE DATABASE mydb CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
drop table if exists {{summary[0].name}}; drop table if exists {{summary[0].name}};
CREATE TABLE {{summary[0].name}} CREATE TABLE {{summary[0].name}}
( (
@ -50,9 +48,8 @@ CREATE TABLE {{summary[0].name}}
{{primary()}} {{primary()}}
{% endif %} {% endif %}
) )
CHARACTER SET utf8mb4
COLLATE utf8mb4_unicode_ci
engine=innodb engine=innodb
default charset=utf8
{% if summary[0].title %}comment '{{summary[0].title}}'{% endif %} {% if summary[0].title %}comment '{{summary[0].title}}'{% endif %}
; ;
{% for v in indexes %} {% for v in indexes %}

0
sqlor/ddl_template_oracle.py Normal file → Executable file
View File

0
sqlor/ddl_template_postgresql.py Normal file → Executable file
View File

0
sqlor/ddl_template_sqlite3.py Normal file → Executable file
View File

0
sqlor/ddl_template_sqlserver.py Normal file → Executable file
View File

0
sqlor/filter.py Normal file → Executable file
View File

53
sqlor/mssqlor.py Normal file → Executable file
View File

@ -2,17 +2,6 @@
from .sor import SQLor from .sor import SQLor
from .ddl_template_sqlserver import sqlserver_ddl_tmpl from .ddl_template_sqlserver import sqlserver_ddl_tmpl
select_stmt = """
select
$[fields]$
from
$[tables]$
$[whereclause]$
$[groupbyclause]$
$[havingclause]$
$[orderbyclause]$
"""
class MsSqlor(SQLor): class MsSqlor(SQLor):
ddl_template = sqlserver_ddl_tmpl ddl_template = sqlserver_ddl_tmpl
db2modelTypeMapping = { db2modelTypeMapping = {
@ -56,7 +45,7 @@ class MsSqlor(SQLor):
@classmethod @classmethod
def isMe(self,name): def isMe(self,name):
return name=='pymssql' return name=='pymssql'
def grammar(self): def grammar(self):
return { return {
'select':select_stmt, 'select':select_stmt,
@ -66,7 +55,7 @@ class MsSqlor(SQLor):
if varname=='__mainsql__' : if varname=='__mainsql__' :
return '' return ''
return '%s' return '%s'
def dataConvert(self,dataList): def dataConvert(self,dataList):
if type(dataList) == type({}): if type(dataList) == type({}):
d = [ i for i in dataList.values()] d = [ i for i in dataList.values()]
@ -97,9 +86,9 @@ where _row_id >= $[from_line]$ and _row_id < $[end_line]$"""
,length = Columnproperty(a.id,a.name,'PRECISION') ,length = Columnproperty(a.id,a.name,'PRECISION')
,dec = Isnull(Columnproperty(a.id,a.name,'Scale'),null) ,dec = Isnull(Columnproperty(a.id,a.name,'Scale'),null)
,nullable = CASE ,nullable = CASE
WHEN a.isnullable = 1 THEN 'yes' WHEN a.isnullable = 1 THEN 'yes'
ELSE 'no' ELSE 'no'
END END
,title = lower(cast(Isnull(g.[value],a.name) as nvarchar) ) ,title = lower(cast(Isnull(g.[value],a.name) as nvarchar) )
,table_name = lower(d.name) ,table_name = lower(d.name)
FROM syscolumns a FROM syscolumns a
@ -109,7 +98,7 @@ where _row_id >= $[from_line]$ and _row_id < $[end_line]$"""
ON (a.id = d.id) ON (a.id = d.id)
AND (d.xtype = 'U') AND (d.xtype = 'U')
AND (d.name <> 'dtproperties') AND (d.name <> 'dtproperties')
INNER JOIN sys.all_objects c INNER JOIN sys.all_objects c
ON d.id=c.object_id ON d.id=c.object_id
AND schema_name(schema_id)='dbo' AND schema_name(schema_id)='dbo'
LEFT JOIN sys.extended_properties g LEFT JOIN sys.extended_properties g
@ -183,33 +172,3 @@ AND IDXC.Column_id=C.Column_id"""
if tablename is not None: if tablename is not None:
sqlcmd = sqlcmd + """ where lower(O.name)='%s'""" % tablename.lower() sqlcmd = sqlcmd + """ where lower(O.name)='%s'""" % tablename.lower()
return sqlcmd return sqlcmd
async def connect(self):
"""
Note: pymssql is synchronous. For async support, consider using aioodbc.
This implementation uses threading for now.
"""
import asyncio
import pymssql
dbdesc = self.dbdesc
# pymssql uses different parameter names
conn_params = {
'server': dbdesc.get('host', 'localhost'),
'user': dbdesc.get('user'),
'password': dbdesc.get('password'),
'database': dbdesc.get('db'),
'port': dbdesc.get('port', 1433)
}
# Remove None values
conn_params = {k: v for k, v in conn_params.items() if v is not None}
# Use thread pool for synchronous pymssql
loop = asyncio.get_event_loop()
self.conn = await loop.run_in_executor(None, pymssql.connect, **conn_params)
self.dbname = dbdesc.get('db', '')
async def close(self):
import asyncio
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.conn.close)

14
sqlor/mysqlor.py Normal file → Executable file
View File

@ -49,7 +49,7 @@ class MySqlor(SQLor):
} }
@classmethod @classmethod
def isMe(self,name): def isMe(self,name):
if name in ['mysql', 'aiosqlor', 'tidb']: if name=='mysql':
return True return True
return False return False
@ -152,20 +152,10 @@ WHERE
""" """
dbdesc = self.dbdesc dbdesc = self.dbdesc
self.conn = await aiomysql.connect(**dbdesc) self.conn = await aiomysql.connect(**dbdesc)
self.cur = await self.conn.cursor()
self.dbname = dbdesc.get('db') self.dbname = dbdesc.get('db')
async def close(self): async def close(self):
await self.cursor.close() await self.cursor.close()
await self.conn.close() await self.conn.close()
async def enter(self):
self.cur = await self.conn.cursor()
async def exit(self):
try:
await self.cur.fetchall()
await self.cur.close()
except:
pass
self.cur = None

50
sqlor/oracleor.py Normal file → Executable file
View File

@ -1,17 +1,5 @@
from .sor import SQLor from .sor import SQLor
from .ddl_template_oracle import oracle_ddl_tmpl from .ddl_template_oracle import oracle_ddl_tmpl
select_stmt = """
select
$[fields]$
from
$[tables]$
$[whereclause]$
$[groupbyclause]$
$[havingclause]$
$[orderbyclause]$
"""
class Oracleor(SQLor): class Oracleor(SQLor):
ddl_template = oracle_ddl_tmpl ddl_template = oracle_ddl_tmpl
db2modelTypeMapping = { db2modelTypeMapping = {
@ -61,7 +49,7 @@ class Oracleor(SQLor):
if varname=='__mainsql__' : if varname=='__mainsql__' :
return '' return ''
return ':%s' % varname return ':%s' % varname
def dataConvert(self,dataList): def dataConvert(self,dataList):
if type(dataList) == type({}): if type(dataList) == type({}):
return dataList return dataList
@ -78,9 +66,6 @@ from (
) )
where row_id >=$[from_line]$ and row_id < $[end_line]$""" where row_id >=$[from_line]$ and row_id < $[end_line]$"""
def test_sqlstr(self):
return "select 1 from dual"
def tablesSQL(self): def tablesSQL(self):
sqlcmd = """select sqlcmd = """select
lower(table_name) as name, lower(table_name) as name,
@ -100,7 +85,7 @@ from USER_TAB_COMMENTS where table_type = 'TABLE'"""
if tablename is not None: if tablename is not None:
sqlcmd = sqlcmd + """ where lower(utc.table_name) = '%s'""" % tablename.lower() sqlcmd = sqlcmd + """ where lower(utc.table_name) = '%s'""" % tablename.lower()
return sqlcmd return sqlcmd
def fkSQL(self,tablename=None): def fkSQL(self,tablename=None):
tablename = tablename.lower() tablename = tablename.lower()
sqlcmd = """select sqlcmd = """select
@ -118,7 +103,7 @@ where
if tablename is not None: if tablename is not None:
sqlcmd = sqlcmd + """ and lower(uc.table_name)='%s'""" % tablename.lower() sqlcmd = sqlcmd + """ and lower(uc.table_name)='%s'""" % tablename.lower()
return sqlcmd return sqlcmd
def pkSQL(self,tablename=None): def pkSQL(self,tablename=None):
sqlcmd = """ sqlcmd = """
select select
@ -142,33 +127,4 @@ where a.index_name = b.index_name"""
if tablename is not None: if tablename is not None:
sqlcmd += """ and lower(a.table_name) = lower('%s')""" % tablename.lower() sqlcmd += """ and lower(a.table_name) = lower('%s')""" % tablename.lower()
return sqlcmd return sqlcmd
async def connect(self):
"""
Note: cx_Oracle is synchronous. Using thread pool for async support.
"""
import asyncio
import cx_Oracle
dbdesc = self.dbdesc
# cx_Oracle connection string format
dsn = cx_Oracle.makedsn(
host=dbdesc.get('host', 'localhost'),
port=dbdesc.get('port', 1521),
service_name=dbdesc.get('service_name', dbdesc.get('sid', 'XE'))
)
loop = asyncio.get_event_loop()
self.conn = await loop.run_in_executor(
None,
cx_Oracle.connect,
dbdesc.get('user'),
dbdesc.get('password'),
dsn
)
self.dbname = dbdesc.get('service_name', dbdesc.get('sid', ''))
async def close(self):
import asyncio
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.conn.close)

View File

@ -1,184 +0,0 @@
# -*- coding:utf8 -*-
import asyncpg
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_pgsql import pgsql_ddl_tmpl # 需要提供 PostgreSQL DDL 模板
class PgSqlor(SQLor):
ddl_template = pgsql_ddl_tmpl
db2modelTypeMapping = {
'smallint': 'short',
'integer': 'long',
'bigint': 'long',
'decimal': 'float',
'numeric': 'float',
'real': 'float',
'double precision': 'float',
'serial': 'long',
'bigserial': 'long',
'varchar': 'str',
'char': 'char',
'text': 'text',
'bytea': 'bin',
'date': 'date',
'time': 'time',
'timestamp': 'datetime',
'timestamptz': 'datestamp',
'boolean': 'short',
'json': 'text',
'jsonb': 'text',
}
model2dbTypemapping = {
'date': 'date',
'time': 'time',
'timestamp': 'timestamp',
'str': 'varchar',
'char': 'char',
'short': 'smallint',
'long': 'bigint',
'float': 'double precision',
'text': 'text',
'bin': 'bytea',
'file': 'bytea',
}
@classmethod
def isMe(cls, name):
return name.lower() in ['pgsql', 'postgres', 'postgresql']
def grammar(self):
return {
# PostgreSQL 支持 window/CTE 等,可在这里扩展
'select': 'select',
}
def placeHolder(self, varname, pos=None):
"""PostgreSQL 使用 $1, $2 作为占位符"""
if varname == '__mainsql__':
return ''
if pos is not None:
return f"${pos + 1}"
return '$1'
def dataConvert(self, dataList):
if isinstance(dataList, dict):
return tuple(dataList.values())
else:
return tuple(i['value'] for i in dataList)
def pagingSQLmodel(self):
"""分页模板"""
return """select * from (%s) as A order by $[sort]$ offset $[from_line]$ limit $[rows]$"""
def tablesSQL(self):
sqlcmd = f"""
SELECT
lower(tablename) as name,
obj_description(format('%s.%s', schemaname, tablename)::regclass) as title
FROM pg_tables
WHERE schemaname NOT IN ('pg_catalog', 'information_schema');
"""
return sqlcmd
def fieldsSQL(self, tablename=None):
sqlcmd = f"""
SELECT
lower(a.attname) as name,
format_type(a.atttypid, a.atttypmod) as type,
CASE
WHEN a.atttypmod > 0 THEN a.atttypmod - 4
ELSE NULL
END as length,
NULL as "dec",
CASE WHEN a.attnotnull THEN 'no' ELSE 'yes' END as nullable,
col_description(a.attrelid, a.attnum) as title,
lower(c.relname) as table_name
FROM pg_attribute a
JOIN pg_class c ON a.attrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE a.attnum > 0 AND NOT a.attisdropped
AND n.nspname NOT IN ('pg_catalog', 'information_schema')
"""
if tablename:
sqlcmd += f" AND lower(c.relname) = '{tablename.lower()}'"
return sqlcmd
def fkSQL(self, tablename=None):
sqlcmd = f"""
SELECT
con.conname AS constraint_name,
nsp.nspname AS schema_name,
rel.relname AS child_table,
att.attname AS child_column,
frel.relname AS parent_table,
fatt.attname AS parent_column,
con.confupdtype AS update_rule,
con.confdeltype AS delete_rule
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_class frel ON frel.oid = con.confrelid
JOIN pg_attribute att ON att.attrelid = con.conrelid AND att.attnum = ANY(con.conkey)
JOIN pg_attribute fatt ON fatt.attrelid = con.confrelid AND fatt.attnum = ANY(con.confkey)
JOIN pg_namespace nsp ON nsp.oid = con.connamespace
WHERE con.contype = 'f'
"""
if tablename:
sqlcmd += f" AND lower(rel.relname) = '{tablename.lower()}'"
return sqlcmd
def pkSQL(self, tablename=None):
sqlcmd = f"""
SELECT a.attname as name
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = '{tablename.lower()}'::regclass
AND i.indisprimary;
"""
return sqlcmd
def indexesSQL(self, tablename=None):
sqlcmd = f"""
SELECT
lower(i.relname) as index_name,
CASE WHEN ix.indisunique THEN 'unique' ELSE '' END as is_unique,
lower(a.attname) as column_name
FROM pg_class t,
pg_class i,
pg_index ix,
pg_attribute a
WHERE
t.oid = ix.indrelid
AND i.oid = ix.indexrelid
AND a.attrelid = t.oid
AND a.attnum = ANY(ix.indkey)
AND t.relkind = 'r'
"""
if tablename:
sqlcmd += f" AND lower(t.relname) = '{tablename.lower()}'"
return sqlcmd
async def connect(self):
"""建立数据库连接"""
dbdesc = self.dbdesc
self.conn = await asyncpg.connect(
user=dbdesc.get('user'),
password=dbdesc.get('password'),
database=dbdesc.get('db'),
host=dbdesc.get('host', 'localhost'),
port=dbdesc.get('port', 5432),
)
self.dbname = dbdesc.get('db')
async def close(self):
await self.conn.close()
async def enter(self):
"""开启事务或获取游标asyncpg 无游标,直接用连接执行)"""
self.cur = self.conn # 保留接口一致性
async def exit(self):
"""释放资源"""
self.cur = None

25
sqlor/postgresqlor.py Normal file → Executable file
View File

@ -170,7 +170,7 @@ order by
i.relname""" % tablename.lower() i.relname""" % tablename.lower()
return sqlcmd return sqlcmd
async def connect(self): async def connect():
""" """
kwargs: kwargs:
dbname: dbname:
@ -179,22 +179,13 @@ order by
host: host:
port: port:
""" """
dbdesc = self.dbdesc kwargs = self.dbdesc
self.conn = await aiopg.connect(**dbdesc) dns = ' '.join([f'{k}={v}' for k, v in kwargs.items()])
self.dbname = dbdesc.get('dbname', '').lower() self.conn = await self.connect(dns)
self.cur = await self.conn.cursor()
async def close(self): self.dbname = kwargs.dbname.lower()
async def close():
await self.cur.close() await self.cur.close()
await self.conn.close() await self.conn.close()
async def enter(self):
self.cur = await self.conn.cursor()
async def exit(self):
try:
await self.cur.fetchall()
await self.cur.close()
except:
pass
self.cur = None

0
sqlor/records.py Normal file → Executable file
View File

35
sqlor/runsql.py Executable file
View File

@ -0,0 +1,35 @@
#!/usr/bin/python3
import sys
import codecs
from sqlor.dbpools import runSQL
import asyncio
def appinit():
if len(sys.argv) < 4:
print(f'usage:\n {sys.argv[0]} path dbname sqlfile [k=v ...] \n')
sys.exit(1)
p = ProgramPath()
if len(sys.argv) > 1:
p = sys.argv[1]
config = getConfig(p)
DBPools(config.databases)
async def run(ns):
with codecs.open(sys.argv[3], 'r', 'utf-8') as f:
sql = f.read()
await runSQL(sys.argv[2], sql, ns)
if __name__ == '__main__':
ns = {}
for x in sys.argv[3:]:
try:
k,v = x.split('=')
ns.update({k:v})
except Exception as e:
print(x, 'key-value pair expected')
print(e)
appinit()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(ns))

61
sqlor/sor.py Normal file → Executable file
View File

@ -72,9 +72,6 @@ def readsql(fn):
return b return b
class SQLor(object): class SQLor(object):
"""
self.dbpools was set by DBPools
"""
def __init__(self,dbdesc=None,sqltp = '$[',sqlts = ']$',sqlvp = '${',sqlvs = '}$'): def __init__(self,dbdesc=None,sqltp = '$[',sqlts = ']$',sqlvp = '${',sqlvs = '}$'):
self.conn = None self.conn = None
self.cur = None self.cur = None
@ -92,20 +89,11 @@ class SQLor(object):
self.dataChanged = False self.dataChanged = False
self.metadatas={} self.metadatas={}
async def enter(self):
pass
async def exit(self):
pass
def unpassword(self): def unpassword(self):
if self.dbdesc.password: if self.dbdesc.password:
key=getConfig().password_key key=getConfig().password_key
self.dbdesc.password = aes_decode_b64(key, self.dbdesc.password) self.dbdesc.password = aes_decode_b64(key, self.dbdesc.password)
def test_sqlstr(self):
return "select 1"
async def get_schema(self): async def get_schema(self):
def concat_idx_info(idxs): def concat_idx_info(idxs):
x = [] x = []
@ -201,9 +189,9 @@ class SQLor(object):
""" """
default it not support paging default it not support paging
""" """
page = int(NS.get('page', 1)) page = int(NS.get(paging['pagename'],1))
rows = int(NS.get('pagerows', 80)) rows = int(NS.get(paging['rowsname'],80))
sort = NS.get('sort', 'id') sort = NS.get(paging.get('sortname','sort'),None)
if isinstance(sort, list): if isinstance(sort, list):
sort = ','.join(sort) sort = ','.join(sort)
if not sort: if not sort:
@ -309,10 +297,8 @@ class SQLor(object):
dml change the database data dml change the database data
qry query data qry query data
""" """
a = sql.lstrip(' \t\n\r') a = sql.lstrip(' \t\n\r')
a = ''.join(a.split('\r'))
a = ' '.join(a.split('\n'))
a = ' '.join(a.split('\t'))
a = a.lower() a = a.lower()
al = a.split(' ') al = a.split(' ')
if al[0] == 'select': if al[0] == 'select':
@ -462,8 +448,10 @@ class SQLor(object):
if 'page' in ns.keys(): if 'page' in ns.keys():
cnt = await self.record_count(sql, ns) cnt = await self.record_count(sql, ns)
rows = await self.pagingdata(sql, ns) rows = await self.pagingdata(sql, ns)
d = DictObject(total=cnt, rows=rows) return {
return d 'total': cnt,
'rows': rows
}
ret = [] ret = []
async for r in self._get_data(sql, ns): async for r in self._get_data(sql, ns):
ret.append(r) ret.append(r)
@ -476,8 +464,10 @@ class SQLor(object):
total = await self.record_count(sql,ns) total = await self.record_count(sql,ns)
rows = await self.pagingdata(sql,ns) rows = await self.pagingdata(sql,ns)
d = DictObject(total=total, rows=rows) return {
return d 'total':total,
'rows':rows
}
async def tables(self): async def tables(self):
sqlstring = self.tablesSQL() sqlstring = self.tablesSQL()
@ -604,14 +594,11 @@ class SQLor(object):
sql = 'insert into %s.%s (%s) values (%s)' % (self.dbname, tablename,fns,vfns) sql = 'insert into %s.%s (%s) values (%s)' % (self.dbname, tablename,fns,vfns)
rf = RegisterFunction() rf = RegisterFunction()
rfname = f'{self.dbname}:{tablename}:c:before' rfname = f'{self.dbname}:{tablename}:c:before'
await self.dbpools.dispatch(rfname, ns)
ret = await rf.exe(rfname, ns) ret = await rf.exe(rfname, ns)
if isinstance(ret, dict): if isinstance(ret, dict):
ns.update(ret) ns.update(ret)
r = await self.execute(sql,ns.copy()) r = await self.execute(sql,ns.copy())
rfname = f'{self.dbname}:{tablename}:c:after' await rf.exe(f'{self.dbname}:{tablename}:c:after', ns)
await self.dbpools.dispatch(rfname, ns)
await rf.exe(rfname, ns)
return r return r
async def R(self,tablename,ns,filters=None): async def R(self,tablename,ns,filters=None):
@ -634,7 +621,10 @@ class SQLor(object):
ns['sort'] = desc['summary'][0]['primary'][0] ns['sort'] = desc['summary'][0]['primary'][0]
total = await self.record_count(sql, ns) total = await self.record_count(sql, ns)
rows = await self.pagingdata(sql,ns) rows = await self.pagingdata(sql,ns)
return DictObject(total=total, rows=rows) return {
'total':total,
'rows':rows
}
else: else:
if ns.get('sort'): if ns.get('sort'):
sql = self.sortSQL(sql, ns) sql = self.sortSQL(sql, ns)
@ -652,16 +642,11 @@ class SQLor(object):
sql = 'update %s.%s set %s where %s' % (self.dbname, tablename, sql = 'update %s.%s set %s where %s' % (self.dbname, tablename,
u_str,c_str) u_str,c_str)
rf = RegisterFunction() rf = RegisterFunction()
rfname = f'{self.dbname}:{tablename}:u:before' ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns)
await self.dbpools.dispatch(rfname, ns)
ret = await rf.exe(rfname, ns)
if isinstance(ret, dict): if isinstance(ret, dict):
ns.update(ret) ns.update(ret)
r = await self.execute(sql, ns.copy()) r = await self.execute(sql, ns.copy())
rfname = f'{self.dbname}:{tablename}:u:after' await rf.exe(f'{self.dbname}:{tablename}:u:after',ns)
debug(f'Event({rfname}) dispatching with data:{ns}...')
await self.dbpools.dispatch(rfname, ns)
await rf.exe(rfname, ns)
return r return r
async def D(self,tablename, ns): async def D(self,tablename, ns):
@ -672,15 +657,11 @@ class SQLor(object):
c_str = ' and '.join(c) c_str = ' and '.join(c)
sql = 'delete from %s.%s where %s' % (self.dbname, tablename,c_str) sql = 'delete from %s.%s where %s' % (self.dbname, tablename,c_str)
rf = RegisterFunction() rf = RegisterFunction()
rfname = f'{self.dbname}:{tablename}:d:before' ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns)
await self.dbpools.dispatch(rfname, ns)
ret = await rf.exe(rfname, ns)
if isinstance(ret, dict): if isinstance(ret, dict):
ns.update(ret) ns.update(ret)
r = await self.execute(sql, ns) r = await self.execute(sql, ns)
rfname = f'{self.dbname}:{tablename}:d:after' ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns)
await self.dbpools.dispatch(rfname, ns)
ns = await rf.exe(rfname, ns)
return r return r
async def connect(self): async def connect(self):

0
sqlor/sqlite3or.py Normal file → Executable file
View File

View File

@ -1,145 +0,0 @@
# -*- coding:utf8 -*-
import aiosqlite
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_sqlite import sqlite_ddl_tmpl
class SQLiteor(SQLor):
ddl_template = sqlite_ddl_tmpl
db2modelTypeMapping = {
'integer': 'long',
'int': 'long',
'tinyint': 'short',
'smallint': 'short',
'mediumint': 'long',
'bigint': 'long',
'decimal': 'float',
'numeric': 'float',
'real': 'float',
'double': 'float',
'float': 'float',
'char': 'char',
'varchar': 'str',
'text': 'text',
'clob': 'text',
'blob': 'bin',
'date': 'date',
'datetime': 'datetime',
'boolean': 'short',
}
model2dbTypemapping = {
'date': 'date',
'time': 'time',
'timestamp': 'datetime',
'str': 'text',
'char': 'char',
'short': 'integer',
'long': 'integer',
'float': 'real',
'text': 'text',
'bin': 'blob',
'file': 'blob',
}
@classmethod
def isMe(cls, name):
return name.lower() in ['sqlite', 'sqlite3']
def grammar(self):
return {
'select': 'select',
}
def placeHolder(self, varname, pos=None):
"""SQLite 使用 ? 占位符"""
if varname == '__mainsql__':
return ''
return '?'
def dataConvert(self, dataList):
if isinstance(dataList, dict):
return tuple(dataList.values())
else:
return tuple(i['value'] for i in dataList)
def pagingSQLmodel(self):
"""分页 SQL 模板"""
return """select * from (%s) as A order by $[sort]$ limit $[rows]$ offset $[from_line]$"""
def tablesSQL(self):
"""获取表名和备注"""
sqlcmd = """
SELECT
lower(name) AS name,
'' AS title
FROM sqlite_master
WHERE type='table'
AND name NOT LIKE 'sqlite_%';
"""
return sqlcmd
def fieldsSQL(self, tablename=None):
"""获取字段信息"""
sqlcmd = f"""
PRAGMA table_info({tablename});
"""
# SQLite 无信息架构表,直接返回 PRAGMA 命令
return sqlcmd
def fkSQL(self, tablename=None):
"""获取外键信息"""
if tablename:
sqlcmd = f"PRAGMA foreign_key_list({tablename});"
else:
sqlcmd = "-- SQLite 需逐表获取外键信息"
return sqlcmd
def pkSQL(self, tablename=None):
"""获取主键信息"""
sqlcmd = f"""
SELECT name
FROM pragma_table_info('{tablename}')
WHERE pk != 0;
"""
return sqlcmd
def indexesSQL(self, tablename=None):
"""获取索引信息"""
if not tablename:
return "SELECT name as index_name, '' as is_unique, '' as column_name FROM sqlite_master WHERE type='index';"
sqlcmd = f"""
PRAGMA index_list('{tablename}');
"""
return sqlcmd
async def connect(self):
"""
连接 SQLite 数据库
dbdesc:
path: 数据库文件路径 :memory:
"""
dbdesc = self.dbdesc
self.dbpath = dbdesc.get('path', ':memory:')
self.conn = await aiosqlite.connect(self.dbpath)
self.conn.row_factory = aiosqlite.Row # 支持 dict 访问
self.dbname = self.dbpath
async def close(self):
await self.conn.close()
async def enter(self):
"""开启事务"""
self.cur = await self.conn.cursor()
async def exit(self):
"""释放 cursor"""
try:
await self.cur.close()
except:
pass
self.cur = None

View File

@ -4,17 +4,6 @@ from .sor import SQLor
from .const import ROWS from .const import ROWS
from .ddl_template_mysql import mysql_ddl_tmpl # ✅ 直接复用 MySQL 模板 from .ddl_template_mysql import mysql_ddl_tmpl # ✅ 直接复用 MySQL 模板
select_stmt = """
select
$[fields]$
from
$[tables]$
$[whereclause]$
$[groupbyclause]$
$[havingclause]$
$[orderbyclause]$
"""
class TiDBor(SQLor): class TiDBor(SQLor):
ddl_template = mysql_ddl_tmpl ddl_template = mysql_ddl_tmpl
@ -61,7 +50,7 @@ class TiDBor(SQLor):
def grammar(self): def grammar(self):
return { return {
'select': select_stmt, 'select': 'select * from {table} where {condition}',
} }
def placeHolder(self, varname, pos=None): def placeHolder(self, varname, pos=None):
@ -82,14 +71,14 @@ class TiDBor(SQLor):
def tablesSQL(self): def tablesSQL(self):
# ✅ TiDB 支持 INFORMATION_SCHEMA.TABLES # ✅ TiDB 支持 INFORMATION_SCHEMA.TABLES
dbname = self.dbdesc.get('db', 'unknown') # MySQL uses 'db', not 'dbname' dbname = self.dbdesc.get('dbname', 'unknown')
sqlcmd = f"""SELECT lower(TABLE_NAME) AS name, TABLE_COMMENT AS title sqlcmd = f"""SELECT lower(TABLE_NAME) AS name, TABLE_COMMENT AS title
FROM INFORMATION_SCHEMA.TABLES FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{dbname}'""" WHERE TABLE_SCHEMA = '{dbname}'"""
return sqlcmd return sqlcmd
def fieldsSQL(self, tablename=None): def fieldsSQL(self, tablename=None):
dbname = self.dbdesc.get('db', 'unknown').lower() # MySQL uses 'db' dbname = self.dbdesc.get('dbname', 'unknown').lower()
sqlcmd = f""" sqlcmd = f"""
SELECT SELECT
lower(column_name) AS name, lower(column_name) AS name,
@ -109,7 +98,7 @@ WHERE lower(TABLE_SCHEMA) = '{dbname}'
def fkSQL(self, tablename=None): def fkSQL(self, tablename=None):
# ✅ TiDB 兼容 MySQL 的 FK 元信息 # ✅ TiDB 兼容 MySQL 的 FK 元信息
dbname = self.dbdesc.get('db', 'unknown').lower() dbname = self.dbdesc.get('dbname', 'unknown').lower()
sqlcmd = f""" sqlcmd = f"""
SELECT SELECT
C.TABLE_SCHEMA AS owner, C.TABLE_SCHEMA AS owner,
@ -132,7 +121,7 @@ FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE table_name='{tablename.lower()}' AND constraint_name='PRIMARY'""" WHERE table_name='{tablename.lower()}' AND constraint_name='PRIMARY'"""
def indexesSQL(self, tablename=None): def indexesSQL(self, tablename=None):
dbname = self.dbdesc.get('db', 'unknown') dbname = self.dbdesc.get('dbname', 'unknown')
sqlcmd = f"""SELECT DISTINCT sqlcmd = f"""SELECT DISTINCT
lower(index_name) AS index_name, lower(index_name) AS index_name,
CASE NON_UNIQUE WHEN 0 THEN 'unique' ELSE '' END AS is_unique, CASE NON_UNIQUE WHEN 0 THEN 'unique' ELSE '' END AS is_unique,
@ -143,28 +132,3 @@ WHERE table_schema = '{dbname}'"""
sqlcmd += f" AND table_name = '{tablename.lower()}'" sqlcmd += f" AND table_name = '{tablename.lower()}'"
return sqlcmd return sqlcmd
async def connect(self):
"""
TiDB 兼容 MySQL 协议使用 aiomysql
"""
import aiomysql
dbdesc = self.dbdesc
# TiDB 使用与 MySQL 相同的连接参数
self.conn = await aiomysql.connect(**dbdesc)
self.dbname = dbdesc.get('db', '')
async def close(self):
await self.cur.close()
await self.conn.close()
async def enter(self):
self.cur = await self.conn.cursor()
async def exit(self):
try:
await self.cur.fetchall()
await self.cur.close()
except:
pass
self.cur = None