Compare commits

..

No commits in common. "main" and "plugable" have entirely different histories.

14 changed files with 773 additions and 649 deletions

177
README.md
View File

@ -23,7 +23,7 @@ SQLOR is a database api for python3, it is base on the python's DBAPI2
* clickhouse(clickhouse-connect)
* Other driver can be easy integreated
## Supported Database Types
## Support Database Types
* oracle
* mysql, mariadb
* TiDB
@ -83,120 +83,87 @@ password in json data is encrypted by aes.
## Using
### sqlor setup
First, Specified a server_path folder, under the server_path folder, need a named by "conf" subfolder.
and a config.json file must be in the conf.
the config.json need "password_key" and "databases" attributes, like:
```
{
.......
"password_key":"tfyugihjo245g7g642yubv24g534",
"databases":{
.......
import asyncio
from sqlor.dbpools import DBPools, sqlorContext
dbs={
"mydb":{
"driver":"mysql",
"kwargs":{
"user":"test",
"db":"database_name_in_your_database_engine",
"password":"encoded_password_string",
"db":"cfae",
"password":"test123",
"host":"localhost"
}
},
"stock":{
"driver":"aiopg",
"async_mode":True,
"codeing":"utf-8",
"dbname":"stock",
"kwargs":{
"dbname":"stock",
"user":"test",
"password":"test123",
"host":"127.0.0.1"
}
},
"cfae":{
"driver":"mysql.connector",
"coding":"utf8",
"dbname":"cfae",
"kwargs":{
"user":"test",
"db":"cfae",
"password":"test123",
"host":"localhost"
}
}
}
}
```
### generates encoded password string
sqlor has a dbpassword script let you generate the "encoded_password_string" in config.json
dbpassword usage:
```
dbpassword server_path database_password_of_user
```
### script to use sqlor
```
import asyncio
from appPublic.worker import get_event_loop
from appPublic.jsonConfig import getConfig
from sqlor.dbpools import DBPools, sqlorContext
loop = get_event_loop()
config = getConfig(server_path)
pool = DBPools(config.databases, loop=loop)
loop = asyncio.get_event_loop()
pool = DBPools(dbs,loop=loop)
async def testfunc():
dbname = 'mydb'
db = DBPools()
async with sqlorContext('stock') as sor:
# start a transaction
# if exception happended, all change to database will rollback
# else will commit
sql = "select * from stock where stock_num = ${stock_num}"
recs = await sor.sqlExe(sql, {'stock_num':'688888'})
# return a list of DictObject instance
sql1 = "select * from stock"
recs = await sor.sqlPaging(sql, {'pagerows':50, 'page':1, 'sort':'stock_id'})
# return a dictionary {
# 'total': return all reocords count.
# 'rows': list of DictObject instance
# }
ns = {'id':'666667'} # filters dictionary, find all records which id = '666667'
recs = await sor.R('stock', ns)
# recs is a list of data in DictObject instance
ns = {'pagerows': 50, 'page':1, 'sort':stock_id'}
# find all record in table 'stock', return page 1 data which 50 max records
dic = await sor.R('stock', ns)
# return a dictionary {
# 'total': return all reocords count.
# 'rows': list of DictObject instance
# }
await sor.C('stock', {
'stock_id': '111111',
...
})
# add a record to 'stock' table
await sor.D('stock', {'stock_id':'111111'})
# delete a record in 'stock' table which stock_id = '111111'
await sor.U('stock', {'stock_id':'111111', 'name':'new_name'})
# update name field's value to 'new_name' which stock_id = '111111'
async with sqlorContext('stock') as sor:
# start a transaction
# if exception happended, all change to database will rollback
# else will commit
sql = "select * from stock where stock_num = ${stock_num}"
recs = await sor.sqlExe(sql, {'stock_num':'688888'})
# return a list of DictObject instance
sql1 = "select * from stock"
recs = await sor.sqlPaging(sql, {'pagerows':50, 'page':1, 'sort':'stock_id'})
# return a dictionary {
# 'total': return all reocords count.
# 'rows': list of DictObject instance
# }
ns = {'id':'666667'} # filters dictionary, find all records which id = '666667'
recs = await sor.R('stock', ns)
# recs is a list of data in DictObject instance
ns = {'pagerows': 50, 'page':1, 'sort':stock_id'}
# find all record in table 'stock', return page 1 data which 50 max records
dic = await sor.R('stock', ns)
# return a dictionary {
# 'total': return all reocords count.
# 'rows': list of DictObject instance
# }
await sor.C('stock', {
'stock_id': '111111',
...
})
# add a record to 'stock' table
await sor.D('stock', {'stock_id':'111111'})
# delete a record in 'stock' table which stock_id = '111111'
await sor.U('stock', {'stock_id':'111111', 'name':'new_name'})
# update name field's value to 'new_name' which stock_id = '111111'
loop.run_until_complete(testfunc())
```
## scripts
### dbpassword
generate a encoded password for the password of the user of database
* Syntax
dbpassword server_path password_text
1 server_path is folder where server_path/conf/config.json file have specifies password_key and databases
2 password_text is the password of the specified user of database
* description
dbpassword encodes second argument password to a base64 based cyber, this cyber need to write into the password attribute under kwargs, and print the cyber to stdout
### dbloader
load data in xlsx file to your database
* Syntax
dbloader server_path dbname xlsxfile
1 server_path is folder where server_path/conf/config.json file have specifies password_key and databases
2 dbname is database where the data will insert into
3 xlsxfile is a data file, can contains many sheets, sheet name is tablename, first rows of sheet contains the field name of the table, fields without data not need to appear in the sheet.
* dbloader get data from each named sheet in xlsxfile, and insert them to database parellally,
## API
@ -208,23 +175,25 @@ how many databases and what database will using, and them connection parameters
dbdesc data is a dict data, format of the dbdesc as follow:
```
{
"mydb1":{ # name to identify a database connect
"driver":"mysql", # database dbapi2 driver package name
"aiocfae":{ # name to identify a database connect
"driver":"aiomysql", # database dbapi2 driver package name
"async_mode":True, # indicte this connection is asynchronous mode
"coding":"utf8", # charset coding
"dbname":"cfae", # database real name
"kwargs":{ # connection parameters
"user":"test",
"db":"cfae",
"password":"encoded_password",
"password":"test123",
"host":"localhost"
}
},
"mydb2":{
"driver":"postgresql",
"cfae":{
"driver":"mysql.connector",
"coding":"utf8",
"dbname":"cfae",
"kwargs":{
"user":"test",
"dbname":"cfae",
"db":"cfae",
"password":"test123",
"host":"localhost"
}

View File

@ -34,7 +34,7 @@
## 概述
`SQLor` 提供了一个统一接口来执行 SQL 查询和命令,并通过命名参数 `${var}$` 支持模板化 SQL。其主要特点包括
`SQLor` 提供了一个统一接口来执行 SQL 查询和命令,并通过命名参数 `${var}` 支持模板化 SQL。其主要特点包括
- 异步/同步双模式支持。
- 参数自动替换与安全绑定。
@ -89,7 +89,7 @@ os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
| 功能 | 描述 |
|------|------|
| 参数化 SQL | 使用 `${var}$` 占位符进行变量注入 |
| 参数化 SQL | 使用 `${var}` 占位符进行变量注入 |
| 异步支持 | 支持 `async/await` 模式运行 SQL |
| 分页查询 | 自动生成分页 SQL需子类实现模型 |
| 排序与过滤 | 支持动态排序字段和条件表达式 |
@ -100,6 +100,289 @@ os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
---
## 主要类与方法说明
### `db_type_2_py_type(o)`
将特定数据库类型转换为 Python 原生类型。
#### 参数
- `o`: 数据库字段值
#### 返回值
| 输入类型 | 输出 |
|--------|-------|
| `decimal.Decimal` | `float` |
| `datetime.datetime` | 字符串格式 `'YYYY-MM-DD HH:MM:SS'` |
| `datetime.date` | `'YYYY-MM-DD'` |
| 其他 | 原值 |
#### 示例
```python
db_type_2_py_type(decimal.Decimal('3.14')) → 3.14
```
---
### `SQLorException`
自定义异常类,封装 SQL 执行错误。
> ❗ 当前代码存在拼写错误:`__int__` 应为 `__init__`,且 `supper` 应为 `super`
#### 属性
- `response`: `"error"`
- `errtype`: `"SQLor"`
- `errmsg`: 异常消息字符串
#### 示例输出
```
errtype:SQLor,errmsg=database error...
```
> ✅ **建议修复**
```python
class SQLException(Exception):
def __init__(self, **kvs):
super(SQLException, self).__init__(**kvs)
self.dic = {
'response': 'error',
'errtype': 'SQLor',
'errmsg': str(self)
}
def __str__(self):
return 'errtype:%s,errmsg=%s' % (self.dic['errtype'], self.dic['errmsg'])
```
---
### `setValues(params, ns)`
从命名空间 `ns` 或系统环境变量中查找参数值。
#### 参数
- `params`: 参数名(字符串)
- `ns`: 命名空间字典
#### 返回
优先从 `ns` 中取值,若无则尝试 `os.getenv(params)`
---
### `findNamedParameters(sql)`
解析 SQL 字符串中的所有 `${...}` 形式的命名参数。
#### 示例
```python
sql = "SELECT * FROM user WHERE id = ${uid}$ AND name = ${uname}$"
findNamedParameters(sql)
# 返回 ['${uid}$', '${uname}$']
```
---
### `uniParams(params1)`
去重保留唯一参数名列表。
#### 示例
```python
uniParams(['${a}$', '${b}$', '${a}$']) → ['${a}$', '${b}$']
```
---
### `readsql(fn)`
读取指定文件路径的 SQL 文件内容UTF-8 编码)。
#### 参数
- `fn`: 文件路径
#### 返回
文件内容字符串。
---
## `SQLor`
### 初始化 `__init__`
#### 参数
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `dbdesc` | None | 数据库描述字典,至少含 `dbname` |
| `sqltp`, `sqlts` | `$[`, `]$` | 模板占位符开始/结束符号 |
| `sqlvp`, `sqlvs` | `${`, `}$` | 变量占位符开始/结束符号 |
#### 初始化行为
- 设置连接状态(`conn`, `cur`
- 初始化元数据缓存 `metadatas`
- 创建 `ConditionConvert` 实例用于条件表达式处理
---
### 元数据管理
#### `setMeta(tablename, meta)`
保存表的元数据到内存缓存。
#### `getMeta(tablename)`
获取指定表的缓存元数据。
#### `removeMeta(tablename)`
清除指定表的元数据缓存。
---
### 连接与游标控制
#### `setCursor(async_mode, conn, cur)`
绑定数据库连接和游标对象。
#### `getConn()`
返回当前数据库连接对象。
#### `cursor()`
返回当前游标对象。
---
### 类型转换支持
#### `setConvertFunction(typ, func)`
注册用户自定义类型转换函数。
#### `convert(typ, value)`
调用注册的转换函数处理值。
---
### SQL 类型判断
#### `getSqlType(sql)`
判断 SQL 语句类型。
| 类型 | 条件 |
|------|------|
| `"qry"` | 以 `SELECT` 开头 |
| `"dml"` | `INSERT`, `UPDATE`, `DELETE` |
| `"ddl"` | 其他(如 `CREATE`, `DROP` |
---
### SQL 模板处理
#### `maskingSQL(org_sql, NS)`
将命名参数 `${var}$` 替换为 `?` 并生成参数列表。
##### 步骤
1. 先替换 `$[...]$` 模板片段(如条件块)
2. 提取 `${var}$` 参数名
3. 替换为 `?` 占位符
4. 返回 `(标记后的SQL, 参数值列表)`
##### 特殊变量
- `__mainsql__`: 不参与参数绑定,用于嵌套 SQL 注入
---
### 执行方法
#### `execute(sql, value, callback, **kwargs)`
执行单条 SQL支持回调逐行处理结果。
##### 流程
- 调用 `runVarSQL` 执行 SQL
- 若是查询 (`qry`) 且有回调,则逐行调用 `callback(DictObject(row))`
- 若是 DML标记 `dataChanged=True`
#### `executemany(sql, values)`
批量执行 SQL适用于 `INSERT` 等)
---
### 分页与排序
#### `sortSQL(sql, NS)`
添加 `ORDER BY` 子句。
- 支持 `NS['sort']` 为字符串或列表
#### `pagingSQL(sql, paging, NS)`
构建分页 SQL需配合 `pagingSQLmodel()` 使用。
> 默认为空,需由子类重写提供具体数据库分页语法(如 MySQL 的 `LIMIT OFFSET` 或 Oracle 的 `ROWNUM`
##### 参数
- `pagename`: 页码参数名(默认 `'page'`
- `rowsname`: 每页行数(默认 `'rows'`
- `sortname`: 排序字段(可选)
##### 示例 NS
```python
{
'page': 2,
'rows': 20,
'sort': 'id desc'
}
```
#### `recordCnt(sql)`
包装 SQL 查询总记录数:
```sql
SELECT COUNT(*) rcnt FROM (your_sql) rowcount_table
```
---
### 过滤器支持
#### `filterSQL(sql, filters, NS)`
将一组条件过滤器合并进原 SQL。
每个 filter 是一个带 `${}` 的模板字符串,若其中变量未在 `NS` 中存在,则忽略此条件(替换为 `1=1`)。
最终生成:
```sql
SELECT * FROM (original_sql) filter_table WHERE f1 AND f2 ...
```
---
### Pivot 表格转换
#### `pivotSQL(tablename, rowFields, columnFields, valueFields)`
生成透视表 SQL。
##### 逻辑
1. 查询 `columnFields` 的所有唯一值作为列头
2. 使用 `CASE WHEN` 构造每列聚合
3. 使用 `SUM` 聚合数值字段
##### 示例输出
```sql
SELECT dept,
SUM(salary_0) salary_0, -- 北京
SUM(salary_1) salary_1 -- 上海
FROM (
SELECT dept,
CASE WHEN city='北京' THEN salary ELSE 0 END AS salary_0,
CASE WHEN city='上海' THEN salary ELSE 0 END AS salary_1
FROM employees
) GROUP BY dept;
```
#### `pivot(desc, ...)`
执行 pivot 查询并返回结果列表。
---
### 元数据查询方法
#### `tables()`
返回数据库中所有表的列表。

View File

@ -1,82 +0,0 @@
# python使用sqlor数据操作规范
## 安装依赖包
```
pip install apppublic
pip install sqlor
```
## 使用规范
### 包含所需的包
```
from sqlor.dbpools import DBPools
```
### 数据库操作
获得数据库连接
```
db = DBPools() 获得数据库连接
···
### 使用数据库操作上下文
使用数据库操作上下文的好处:
* 自动事务开始和结束,整体提交或整体失败
* 资源自动回收
```
async with db.sqlorContext(dbname) as sor:
....
```
### 执行数据库操作
sqlor所有的数据库操作都必须在sqlorContext函数生成的上下文中执行以提供事务一致性并自动回收资源提供两种模式的数据库操作sqlExe函数可以执行dml ddl支持多表数据操作而CRUD只针对单表提供增产改查
#### runSQL
写符合SQL标准的SQL语句数据变量用“${varname}$"作为占位符并在参数ns中给定参数
以下是一个查询用户数据的语句机构id作为参数传入
```
sql = "select * from users where where orgid = ${myorgid}$ and delflg='0'"
await sor.sqlExe(sql, {'myorgid' : '12122'})
```
返回一个数据,数组中的每一个元素为一个用户数据
以下例子返回分页数据按照给定的page不同返回不同页的数据pagerows定义每页数据量```
sql1 = "select * from stock"
dic = await sor.sqlPaging(sql, {'pagerows':50, 'page':1, 'sort':'stock_id'})
```
dic是个DictObject从dict继承有两个属性total查询结果总记录数 rowspage指定页的数据
#### CRUD
CRUD针对单表操作提供对数据表的增删改查CDU三个函数在ns参数中必须提供主键数据。
* C
向表中添加数据
```
await sor.C('stock', {
'stock_id': '111111',
...
})
```
* R
从表中读取数据,支持分页字典数据返回和不分页数组数据返回
分页例子
```
ns = {'pagerows': 50, 'page':1, 'sort':stock_id'}
dic = await sor.R('stock', ns)
```
dic是个DictObject从dict继承有两个属性total查询结果总记录数 rowspage指定页的数据
不分页例子
```
ns = {'id':'666667'} # filters dictionary, find all records which id = '666667'
recs = await sor.R('stock', ns)
```
* U
修改数据表数据以主键必须在ns中出现作为修改数据条件修改ns中其他字段的值ns中不出现的字段不会被修改
```
await sor.U('stock', {'stock_id':'111111', 'name':'new_name'})
```
* D
删除表中数据以主键必须在ns中出现作为删除数据条件从数据库中删除主键值与参数ns中主键值相同的记录
```
await sor.D('stock', {'stock_id':'111111'})
```

View File

@ -1,6 +1,6 @@
[metadata]
name=sqlor
version = 2.0.3
version = 2.0.0
description = a new version of sqlor, each db's sor need to plugin to sqlor, and dbdriver now a isolated module
authors = yu moqing
author_email = yumoqing@gmail.com
@ -20,8 +20,3 @@ install_requires =
asyncio
jinja2
[options.entry_points]
console_scripts =
dbpassword = sqlor.dbpassword:main
dbloader = sqlor.dbloader:main

373
sqlor/crud.py Executable file
View File

@ -0,0 +1,373 @@
# -*- coding:utf8 -*-
from .dbpools import DBPools
from .const import ROWS
from .filter import DBFilter
from appPublic.objectAction import ObjectAction
from appPublic.dictObject import DictObject
from appPublic.timeUtils import date2str,time2str,str2Date
from appPublic.uniqueID import getID
toStringFuncs={
'char':None,
'str':None,
'short':str,
'long':str,
'float':str,
'date':date2str,
'time':time2str,
}
fromStringFuncs={
'short':int,
'long':int,
'float':float,
'date':str2Date,
'time':str2Date
}
class DatabaseNotfound(Exception):
def __init__(self,dbname):
Exception.__init__(self)
self.dbname = dbname
def __str__(self):
return f'{self.dbname} not found'
class CRUD(object):
def __init__(self,dbname,tablename,rows=ROWS):
self.pool = DBPools()
if dbname not in self.pool.databases.keys():
raise DatabaseNotfound(dbname)
self.dbname = dbname
self.tablename = tablename
self.rows = rows
self.primary_data = None
self.oa = ObjectAction()
async def primaryKey(self,**kw):
if self.primary_data is None:
self.primary_data = await self.pool.getTablePrimaryKey(self.dbname,
self.tablename,**kw)
return self.primary_data
async def forignKeys(self,**kw):
data = self.pool.getTableForignKeys(self.dbname,self.tablename,**kw)
return data
async def I(self,**kw):
"""
fields information
"""
@self.pool.inSqlor
async def main(dbname,NS,**kw):
pkdata = await self.primaryKey(**kw)
pks = [ i.field_name for i in pkdata ]
data = await self.pool.getTableFields(self.dbname,self.tablename,**kw)
for d in data:
if d.name in pks:
d.update({'primarykey':True})
data = self.oa.execute(self.dbname+'_'+self.tablename,'tableInfo',data)
return data
return await main(self.dbname,{},**kw)
async def fromStr(self,data):
fields = await self.pool.getTableFields(self.dbname,self.tablename)
ret = {}
for k in data:
v = None if data[k] == '' else data[k]
for f in fields:
if k == f.name:
ret[k] = v
f = fromStringFuncs.get(f.type,None)
if f is not None and v is not None:
ret[k] = f(v)
return ret
async def toStr(self,data):
fields = await self.pool.getTableFields(self.dbname,self.tablename)
ret = {}
for k in data:
for f in fields:
if k == f.name:
ret[k] = data[k]
f = toStringFuncs.get(f.type,None)
if f is not None and data[k] is not None:
ret[k] = f(data[k])
return ret
async def datagrid(self,request,targeti,**kw):
fields = await self.I()
fs = [ self.defaultIOField(f) for f in fields ]
id = self.dbname+':'+ self.tablename
pk = await self.primaryKey(**kw)
idField = pk[0]['field_name']
data = {
"tmplname":"widget_js.tmpl",
"data":{
"__ctmpl__":"datagrid",
"__target__":target,
"data":{
"name":id,
"icon-conv":"icon-table",
"title":tablename,
"url":absurl('./RP.dspy?id=%s' % id),
"deleteUrl":absurl('./D.dspy?id=%s' % id),
"addUrl":absurl('./C.dspy?id=%s' % id),
"updateUrl":absurl('./U.dspy?id=%s' % id),
"idField":idField,
"dnd":True,
"view":"scrollview",
"fields":fs,
"toolbar":{
"tools":[
{
"name":"add",
"icon":"icon-add",
"label":"add ball"
},
{
"name":"delete",
"icon":"icon-delete",
"label":"delete ball"
},
{
"name":"moveup",
"icon":"icon-up",
"label":"moveup ball"
},
{
"name":"movedown",
"icon":"icon-down",
"label":"movedown ball"
}
]
},
"options":{
"pageSize":50,
"pagination":False
}
}
}
}
data = self.oa.execute(id,'datagrid',data)
return data
def defaultIOField(self,f):
if f.type in ['str']:
return {
"primarykey":f.get('primarykey',False),
"name":f.name,
"hidden":False,
"sortable":True,
"label":f.title,
"align":"center",
"iotype":"text"
}
if f.type in ['float','short','long']:
return {
"primarykey":f.get('primarykey',False),
"sortable":True,
"name":f.name,
"hidden":False,
"label":f.title,
"align":"right",
"iotype":"text"
}
return {
"primarykey":f.get('primarykey',False),
"name":f.name,
"sortable":True,
"hidden":False,
"label":f.title,
"align":"center",
"iotype":"text"
}
async def C(self,rec,**kw):
"""
create new data
"""
@self.pool.runSQL
async def addSQL(dbname,data,**kw):
fns = kw['fns']
vfs = kw['vfs']
sqldesc={
"sql_string" : """
insert into %s (%s) values (%s)
""" % (self.tablename,fns,vfs),
}
return sqldesc
@self.pool.inSqlor
async def main(dbname,NS,**kw):
fields = await self.pool.getTableFields(self.dbname,self.tablename,**kw)
flist = [ f['name'] for f in fields ]
fns = ','.join(flist)
vfs = ','.join([ '${' + f + '}$' for f in flist ])
data = {}
[ data.update({k.lower():v}) for k,v in NS.items() ]
pk = await self.primaryKey(**kw)
k = pk[0]['field_name']
if not data.get(k):
v = getID()
data[k] = v
data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeAdd',data)
kwargs = kw.copy()
kwargs['fns'] = fns
kwargs['vfs'] = vfs
await addSQL(self.dbname,data,**kwargs)
data = self.oa.execute(self.dbname+'_'+self.tablename,'afterAdd',data)
return {k:data[k]}
return await main(self.dbname,rec,**kw)
async def defaultFilter(self,NS,**kw):
fields = await self.pool.getTableFields(self.dbname,self.tablename,**kw)
d = [ '%s = ${%s}$' % (f['name'],f['name']) for f in fields if f['name'] in NS.keys() ]
if len(d) == 0:
return ''
ret = ' and ' + ' and '.join(d)
return ret
async def R(self,filters=None,NS={},**kw):
"""
retrieve data
"""
@self.pool.runSQL
async def retrieve(dbname,data,**kw):
fstr = ''
if filters is not None:
fstr = ' and '
dbf = DBFilter(filters)
fstr = fstr + dbf.genFilterString()
else:
fstr = await self.defaultFilter(NS,**kw)
sqldesc = {
"sql_string":"""select * from %s where 1=1 %s""" % (self.tablename,fstr),
}
return sqldesc
@self.pool.runSQLPaging
async def pagingdata(dbname,data,filters=None,**kw):
fstr = ""
if filters is not None:
fstr = ' and '
dbf = DBFilter(filters)
fstr = fstr + dbf.genFilterString()
else:
fstr = await self.defaultFilter(NS,**kw)
sqldesc = {
"sql_string":"""select * from %s where 1=1 %s""" % (self.tablename,fstr),
"default":{'rows':self.rows}
}
return sqldesc
@self.pool.inSqlor
async def main(dbname,NS,**kw):
p = await self.primaryKey(**kw)
if NS.get('__id') is not None:
NS[p[0]['field_name']] = NS['__id']
del NS['__id']
if NS.get('page'):
del NS['page']
if NS.get('page'):
if NS.get('sort',None) is None:
NS['sort'] = p[0]['field_name']
data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeRetrieve',NS)
if NS.get('page'):
data = await pagingdata(self.dbname,data,**kw)
else:
data = await retrieve(self.dbname,data,**kw)
data = self.oa.execute(self.dbname+'_'+self.tablename,'afterRetrieve',data)
return data
return await main(self.dbname,NS,**kw)
async def U(self,data, **kw):
"""
update data
"""
@self.pool.runSQL
async def update(dbname,NS,**kw):
condi = [ i['field_name'] for i in self.primary_data ]
newData = [ i for i in NS.keys() if i not in condi ]
c = [ '%s = ${%s}$' % (i,i) for i in condi ]
u = [ '%s = ${%s}$' % (i,i) for i in newData ]
cs = ' and '.join(c)
us = ','.join(u)
sqldesc = {
"sql_string":"""update %s set %s where %s""" % (self.tablename,us,cs)
}
return sqldesc
@self.pool.inSqlor
async def main(dbname,NS,**kw):
pk = await self.primaryKey(**kw)
pkfields = [k.field_name for k in pk ]
newData = [ k for k in data if k not in pkfields ]
data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeUpdate',data)
await update(self.dbname,data,**kw)
data = self.oa.execute(self.dbname+'_'+self.tablename,'afterUpdate',data)
return data
return await main(self.dbname,data,**kw)
async def D(self,data,**kw):
"""
delete data
"""
@self.pool.runSQL
def delete(dbname,data,**kw):
pnames = [ i['field_name'] for i in self.primary_data ]
c = [ '%s = ${%s}$' % (i,i) for i in pnames ]
cs = ' and '.join(c)
sqldesc = {
"sql_string":"delete from %s where %s" % (self.tablename,cs)
}
return sqldesc
@self.pool.inSqlor
async def main(dbname,NS,**kw):
data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeDelete',data)
await delete(self.dbname,data,pkfields,**kw)
data = self.oa.execute(self.dbname+'_'+self.tablename,'afterDelete',data)
return data
return await main(self.dbname,data,**kw)
if __name__ == '__main__':
DBPools({
"ambi":{
"driver":"pymssql",
"coding":"utf-8",
"dbname":"ambi",
"kwargs":{
"user":"ymq",
"database":"ambi",
"password":"ymq123",
"host":"localhost"
}
},
"metadb":{
"driver":"pymssql",
"coding":"utf-8",
"dbname":"metadb",
"kwargs":{
"user":"ymq",
"database":"metadb",
"password":"ymq123",
"host":"localhost"
}
}
})
crud = CRUD('ambi')
#fields = crud.I('cashflow')
#for i in fields:
# print(i)
data = crud.RP('cashflow')
print(data.total)
for i in data.rows:
print(i.balance,i.asid)

View File

@ -1,50 +0,0 @@
import os
import sys
import asyncio
from appPublic.worker import get_event_loop
from appPublic.jsonConfig import getConfig
from appPublic.worker import AsyncWorker
from sqlor.dbpools import DBPools
from appPublic.dictObject import DictObject
from xls2ddl.xlsxData import XLSXData
def chunked(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i+n]
async def load_tabledata(dbname, tblname, data):
db = DBPools()
async with db.sqlorContext(dbname) as sor:
for r in data:
try:
await sor.D(tblname, {'id': r.id})
except:
pass
await sor.C(tblname, r.copy())
async def load_data():
if len(sys.argv) < 4:
print(f'{sys.argv[0]} server_path dbname datafile')
return 1
runpath = sys.argv[1]
dbname = sys.argv[2]
datafile = sys.argv[3]
config = getConfig(runpath)
db = DBPools(config.databases)
xlsx = XLSXData(datafile)
worker = AsyncWorker(maxtask=100)
tasks = []
for i,s in enumerate(xlsx.book.worksheets):
tblname = xlsx.book.sheetnames[i]
dic = xlsx.readRecords(tblname, s)
for chunk in chunked(dic[tblname], 100):
t = asyncio.create_task(load_tabledata(dbname, tblname, chunk))
tasks.append(t)
await asyncio.wait(tasks)
return 0
def main():
get_event_loop().run_until_complete(load_data())
if __name__ == '__main__':
main()

View File

@ -1,17 +0,0 @@
import os, sys
from appPublic.aes import aes_encode_b64
from appPublic.jsonConfig import getConfig
def main():
if len(sys.argv) < 3:
print(f'{sys.argv[0]} server_path dbuser_password')
sys.exit(1)
runpath = sys.argv[1]
password = sys.argv[2]
config = getConfig(runpath)
cyber = aes_encode_b64(config.password_key, sys.argv[2])
print(f'{password} encoded is {cyber}')
sys.exit(0)
if __name__ == '__main__':
main()

View File

@ -6,7 +6,6 @@ import codecs
from contextlib import asynccontextmanager
from appPublic.worker import get_event_loop
from appPublic.myImport import myImport
from appPublic.dictObject import DictObject
from appPublic.Singleton import SingletonDecorator
@ -56,39 +55,13 @@ class SqlorPool:
self.sqlors.append(x)
return x
async def _del_sqlor(self, sor):
try:
await sor.exit()
except:
pass
try:
await sor.close()
except:
pass
async def test_sqlor(self, sor):
try:
await sor.enter()
await sor.execute(sor.test_sqlstr, {})
await sor.exit()
return True
except:
await sor.exit()
return False
@asynccontextmanager
async def context(self):
async with self.sema:
sqlors = [s for s in self.sqlors]
yielded_sqlor = None
for s in sqlors:
for s in self.sqlors:
if not s.used:
flg = await self.test_sqlor(s.sqlor)
if flg:
yielded_sqlor = s
else:
await self._del_sqlor(s.sqlor)
self.sqlors = [ x for x in self.sqlors if x != s ]
yielded_sqlor = s
if not yielded_sqlor:
yielded_sqlor = await self._new_sqlor()
yielded_sqlor.used = True
@ -96,12 +69,11 @@ class SqlorPool:
yield yielded_sqlor.sqlor
yielded_sqlor.used = False
@SingletonDecorator
class DBPools:
def __init__(self,databases={},max_connect=100,loop=None):
if loop is None:
loop = get_event_loop()
loop = asyncio.get_event_loop()
self.loop = loop
self.max_connect = max_connect
self._pools = {}
@ -134,9 +106,7 @@ class DBPools:
sqlor = None
try:
async with pool.context() as sqlor:
await sqlor.enter()
yield sqlor
await sqlor.exit()
if sqlor and sqlor.dataChanged:
await sqlor.commit()
except Exception as e:

View File

@ -49,7 +49,7 @@ class MySqlor(SQLor):
}
@classmethod
def isMe(self,name):
if name in ['mysql', 'aiosqlor', 'tidb']:
if name=='mysql':
return True
return False
@ -152,20 +152,10 @@ WHERE
"""
dbdesc = self.dbdesc
self.conn = await aiomysql.connect(**dbdesc)
self.cur = await self.conn.cursor()
self.dbname = dbdesc.get('db')
async def close(self):
await self.cursor.close()
await self.conn.close()
async def enter(self):
self.cur = await self.conn.cursor()
async def exit(self):
try:
await self.cur.fetchall()
await self.cur.close()
except:
pass
self.cur = None

View File

@ -66,9 +66,6 @@ from (
)
where row_id >=$[from_line]$ and row_id < $[end_line]$"""
def test_sqlstr(self):
return "select 1 from dual"
def tablesSQL(self):
sqlcmd = """select
lower(table_name) as name,

View File

@ -1,184 +0,0 @@
# -*- coding:utf8 -*-
import asyncpg
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_pgsql import pgsql_ddl_tmpl # 需要提供 PostgreSQL DDL 模板
class PgSqlor(SQLor):
ddl_template = pgsql_ddl_tmpl
db2modelTypeMapping = {
'smallint': 'short',
'integer': 'long',
'bigint': 'long',
'decimal': 'float',
'numeric': 'float',
'real': 'float',
'double precision': 'float',
'serial': 'long',
'bigserial': 'long',
'varchar': 'str',
'char': 'char',
'text': 'text',
'bytea': 'bin',
'date': 'date',
'time': 'time',
'timestamp': 'datetime',
'timestamptz': 'datestamp',
'boolean': 'short',
'json': 'text',
'jsonb': 'text',
}
model2dbTypemapping = {
'date': 'date',
'time': 'time',
'timestamp': 'timestamp',
'str': 'varchar',
'char': 'char',
'short': 'smallint',
'long': 'bigint',
'float': 'double precision',
'text': 'text',
'bin': 'bytea',
'file': 'bytea',
}
@classmethod
def isMe(cls, name):
return name.lower() in ['pgsql', 'postgres', 'postgresql']
def grammar(self):
return {
# PostgreSQL 支持 window/CTE 等,可在这里扩展
'select': 'select',
}
def placeHolder(self, varname, pos=None):
"""PostgreSQL 使用 $1, $2 作为占位符"""
if varname == '__mainsql__':
return ''
if pos is not None:
return f"${pos + 1}"
return '$1'
def dataConvert(self, dataList):
if isinstance(dataList, dict):
return tuple(dataList.values())
else:
return tuple(i['value'] for i in dataList)
def pagingSQLmodel(self):
"""分页模板"""
return """select * from (%s) as A order by $[sort]$ offset $[from_line]$ limit $[rows]$"""
def tablesSQL(self):
sqlcmd = f"""
SELECT
lower(tablename) as name,
obj_description(format('%s.%s', schemaname, tablename)::regclass) as title
FROM pg_tables
WHERE schemaname NOT IN ('pg_catalog', 'information_schema');
"""
return sqlcmd
def fieldsSQL(self, tablename=None):
sqlcmd = f"""
SELECT
lower(a.attname) as name,
format_type(a.atttypid, a.atttypmod) as type,
CASE
WHEN a.atttypmod > 0 THEN a.atttypmod - 4
ELSE NULL
END as length,
NULL as "dec",
CASE WHEN a.attnotnull THEN 'no' ELSE 'yes' END as nullable,
col_description(a.attrelid, a.attnum) as title,
lower(c.relname) as table_name
FROM pg_attribute a
JOIN pg_class c ON a.attrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE a.attnum > 0 AND NOT a.attisdropped
AND n.nspname NOT IN ('pg_catalog', 'information_schema')
"""
if tablename:
sqlcmd += f" AND lower(c.relname) = '{tablename.lower()}'"
return sqlcmd
def fkSQL(self, tablename=None):
sqlcmd = f"""
SELECT
con.conname AS constraint_name,
nsp.nspname AS schema_name,
rel.relname AS child_table,
att.attname AS child_column,
frel.relname AS parent_table,
fatt.attname AS parent_column,
con.confupdtype AS update_rule,
con.confdeltype AS delete_rule
FROM pg_constraint con
JOIN pg_class rel ON rel.oid = con.conrelid
JOIN pg_class frel ON frel.oid = con.confrelid
JOIN pg_attribute att ON att.attrelid = con.conrelid AND att.attnum = ANY(con.conkey)
JOIN pg_attribute fatt ON fatt.attrelid = con.confrelid AND fatt.attnum = ANY(con.confkey)
JOIN pg_namespace nsp ON nsp.oid = con.connamespace
WHERE con.contype = 'f'
"""
if tablename:
sqlcmd += f" AND lower(rel.relname) = '{tablename.lower()}'"
return sqlcmd
def pkSQL(self, tablename=None):
sqlcmd = f"""
SELECT a.attname as name
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = '{tablename.lower()}'::regclass
AND i.indisprimary;
"""
return sqlcmd
def indexesSQL(self, tablename=None):
sqlcmd = f"""
SELECT
lower(i.relname) as index_name,
CASE WHEN ix.indisunique THEN 'unique' ELSE '' END as is_unique,
lower(a.attname) as column_name
FROM pg_class t,
pg_class i,
pg_index ix,
pg_attribute a
WHERE
t.oid = ix.indrelid
AND i.oid = ix.indexrelid
AND a.attrelid = t.oid
AND a.attnum = ANY(ix.indkey)
AND t.relkind = 'r'
"""
if tablename:
sqlcmd += f" AND lower(t.relname) = '{tablename.lower()}'"
return sqlcmd
async def connect(self):
"""建立数据库连接"""
dbdesc = self.dbdesc
self.conn = await asyncpg.connect(
user=dbdesc.get('user'),
password=dbdesc.get('password'),
database=dbdesc.get('db'),
host=dbdesc.get('host', 'localhost'),
port=dbdesc.get('port', 5432),
)
self.dbname = dbdesc.get('db')
async def close(self):
await self.conn.close()
async def enter(self):
"""开启事务或获取游标asyncpg 无游标,直接用连接执行)"""
self.cur = self.conn # 保留接口一致性
async def exit(self):
"""释放资源"""
self.cur = None

35
sqlor/runsql.py Executable file
View File

@ -0,0 +1,35 @@
#!/usr/bin/python3
import sys
import codecs
from sqlor.dbpools import runSQL
import asyncio
def appinit():
if len(sys.argv) < 4:
print(f'usage:\n {sys.argv[0]} path dbname sqlfile [k=v ...] \n')
sys.exit(1)
p = ProgramPath()
if len(sys.argv) > 1:
p = sys.argv[1]
config = getConfig(p)
DBPools(config.databases)
async def run(ns):
with codecs.open(sys.argv[3], 'r', 'utf-8') as f:
sql = f.read()
await runSQL(sys.argv[2], sql, ns)
if __name__ == '__main__':
ns = {}
for x in sys.argv[3:]:
try:
k,v = x.split('=')
ns.update({k:v})
except Exception as e:
print(x, 'key-value pair expected')
print(e)
appinit()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(ns))

View File

@ -82,7 +82,6 @@ class SQLor(object):
self.sqlvs = sqlvs
self.dbdesc = dbdesc.copy()
self.unpassword()
debug(f'--------{self.dbdesc=}----')
self.dbname = None
self.writer = None
self.convfuncs = {}
@ -90,20 +89,11 @@ class SQLor(object):
self.dataChanged = False
self.metadatas={}
async def enter(self):
pass
async def exit(self):
pass
def unpassword(self):
if self.dbdesc.password:
key=getConfig().password_key
self.dbdesc.password = aes_decode_b64(key, self.dbdesc.password)
def test_sqlstr(self):
return "select 1"
async def get_schema(self):
def concat_idx_info(idxs):
x = []

View File

@ -1,145 +0,0 @@
# -*- coding:utf8 -*-
import aiosqlite
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_sqlite import sqlite_ddl_tmpl
class SQLiteor(SQLor):
ddl_template = sqlite_ddl_tmpl
db2modelTypeMapping = {
'integer': 'long',
'int': 'long',
'tinyint': 'short',
'smallint': 'short',
'mediumint': 'long',
'bigint': 'long',
'decimal': 'float',
'numeric': 'float',
'real': 'float',
'double': 'float',
'float': 'float',
'char': 'char',
'varchar': 'str',
'text': 'text',
'clob': 'text',
'blob': 'bin',
'date': 'date',
'datetime': 'datetime',
'boolean': 'short',
}
model2dbTypemapping = {
'date': 'date',
'time': 'time',
'timestamp': 'datetime',
'str': 'text',
'char': 'char',
'short': 'integer',
'long': 'integer',
'float': 'real',
'text': 'text',
'bin': 'blob',
'file': 'blob',
}
@classmethod
def isMe(cls, name):
return name.lower() in ['sqlite', 'sqlite3']
def grammar(self):
return {
'select': 'select',
}
def placeHolder(self, varname, pos=None):
"""SQLite 使用 ? 占位符"""
if varname == '__mainsql__':
return ''
return '?'
def dataConvert(self, dataList):
if isinstance(dataList, dict):
return tuple(dataList.values())
else:
return tuple(i['value'] for i in dataList)
def pagingSQLmodel(self):
"""分页 SQL 模板"""
return """select * from (%s) as A order by $[sort]$ limit $[rows]$ offset $[from_line]$"""
def tablesSQL(self):
"""获取表名和备注"""
sqlcmd = """
SELECT
lower(name) AS name,
'' AS title
FROM sqlite_master
WHERE type='table'
AND name NOT LIKE 'sqlite_%';
"""
return sqlcmd
def fieldsSQL(self, tablename=None):
"""获取字段信息"""
sqlcmd = f"""
PRAGMA table_info({tablename});
"""
# SQLite 无信息架构表,直接返回 PRAGMA 命令
return sqlcmd
def fkSQL(self, tablename=None):
"""获取外键信息"""
if tablename:
sqlcmd = f"PRAGMA foreign_key_list({tablename});"
else:
sqlcmd = "-- SQLite 需逐表获取外键信息"
return sqlcmd
def pkSQL(self, tablename=None):
"""获取主键信息"""
sqlcmd = f"""
SELECT name
FROM pragma_table_info('{tablename}')
WHERE pk != 0;
"""
return sqlcmd
def indexesSQL(self, tablename=None):
"""获取索引信息"""
if not tablename:
return "SELECT name as index_name, '' as is_unique, '' as column_name FROM sqlite_master WHERE type='index';"
sqlcmd = f"""
PRAGMA index_list('{tablename}');
"""
return sqlcmd
async def connect(self):
"""
连接 SQLite 数据库
dbdesc:
path: 数据库文件路径 :memory:
"""
dbdesc = self.dbdesc
self.dbpath = dbdesc.get('path', ':memory:')
self.conn = await aiosqlite.connect(self.dbpath)
self.conn.row_factory = aiosqlite.Row # 支持 dict 访问
self.dbname = self.dbpath
async def close(self):
await self.conn.close()
async def enter(self):
"""开启事务"""
self.cur = await self.conn.cursor()
async def exit(self):
"""释放 cursor"""
try:
await self.cur.close()
except:
pass
self.cur = None