Compare commits

...

44 Commits

Author SHA1 Message Date
62514a2398 bugfix 2025-10-20 16:38:58 +08:00
6655fc6fde bugfix 2025-10-20 14:22:21 +08:00
be7faa09ed bugfix 2025-10-20 11:48:58 +08:00
92f2857ce6 bugfix 2025-10-20 10:17:51 +08:00
yumoqing
b56afa920e bugfix 2025-10-19 20:12:33 +08:00
yumoqing
2708692f17 bugfix 2025-10-19 20:04:45 +08:00
yumoqing
0a75d57225 bugfix 2025-10-19 18:19:17 +08:00
yumoqing
08ffd2ef08 bugfix 2025-10-19 18:09:02 +08:00
yumoqing
a66f93c4f7 bugfix 2025-10-19 18:08:13 +08:00
yumoqing
f6eb868e77 bugfix 2025-10-19 18:06:00 +08:00
yumoqing
85f3980582 bugfix 2025-10-19 17:52:49 +08:00
yumoqing
f2efc672d8 bugfix 2025-10-19 17:51:15 +08:00
yumoqing
bb3e0bfecd bugfix 2025-10-19 17:42:12 +08:00
yumoqing
33b99b3acc bugfix 2025-10-19 17:38:59 +08:00
yumoqing
6c88be2071 bugfix 2025-10-19 17:36:10 +08:00
yumoqing
42d799bb49 bugfix 2025-10-19 17:25:14 +08:00
yumoqing
9166698797 bugfix 2025-10-19 17:22:08 +08:00
yumoqing
9fb0f25bf0 bugfix 2025-10-19 17:21:21 +08:00
yumoqing
917a52f940 bugfix 2025-10-19 17:08:55 +08:00
yumoqing
89a9756b00 bugfix 2025-10-19 17:04:42 +08:00
yumoqing
accd07844d bugfix 2025-10-19 16:55:42 +08:00
yumoqing
4d86499372 bugfix 2025-10-19 16:52:34 +08:00
yumoqing
5c58c04440 bugfix 2025-10-19 16:44:37 +08:00
yumoqing
98fe6e2b0e bugfix 2025-10-19 16:42:57 +08:00
yumoqing
906073a516 bugfix 2025-10-19 16:42:19 +08:00
yumoqing
fc0735478d bugfix 2025-10-19 16:38:54 +08:00
yumoqing
72c17f6c12 bugfix 2025-10-19 16:34:57 +08:00
yumoqing
93db80ac52 bugfix 2025-10-19 12:08:55 +08:00
yumoqing
67571f1af8 bugfix 2025-10-19 12:08:16 +08:00
yumoqing
91e93d778d bugfix 2025-10-19 12:06:32 +08:00
yumoqing
0223b5f660 bugfix 2025-10-19 12:04:26 +08:00
yumoqing
99ef9b20b2 bugfix 2025-10-19 12:03:40 +08:00
yumoqing
5ea4bdf171 bugfix 2025-10-19 12:03:03 +08:00
yumoqing
f1de7bd13e bugfix 2025-10-19 12:01:06 +08:00
yumoqing
49f866a1aa bugfix 2025-10-19 12:00:04 +08:00
yumoqing
3426bda470 bugfix 2025-10-19 11:47:08 +08:00
yumoqing
91e717d80f bugfix 2025-10-19 11:42:14 +08:00
yumoqing
9f12e041ce bugfix 2025-10-19 11:38:59 +08:00
yumoqing
26ac4571f4 bugfix 2025-10-19 11:33:43 +08:00
yumoqing
dec22bbc76 bugfix 2025-10-19 10:29:15 +08:00
yumoqing
212751bdfb bugfix 2025-10-18 19:43:13 +08:00
yumoqing
e10b8c5f16 bugfix 2025-10-18 19:41:23 +08:00
yumoqing
cb4bb508d3 bugfix 2025-10-18 12:05:44 +08:00
a7f651888b bugfix 2025-10-17 18:15:20 +08:00
10 changed files with 330 additions and 409 deletions

View File

@ -14,28 +14,83 @@ SQLOR is a database api for python3, it is base on the python's DBAPI2
## requirements
* python 3.5 or above
* python 3.9 or above
* asyncio
* Oracle DBAPI2 driver(cx_Oracle)
* MySQL DBAPI2 driver(mysql-connector)
* Postgresql DBAPI2 driver(psycopg2-binrary)
* Postgresql DBAPI2 driver(aiopg)
* Asynchronous MySQL driver(aiomysql)
* Asynchronous Postgresql driver(aiopg)
* clickhouse(clickhouse-connect)
* Other driver can be easy integreated
## Support Database Types
* oracle
* mysql, mariadb
* TiDB
* clickHouse
* DuckDB
* PostgreSQL
* MsSQL
## Database Description json format
password in json data is encrypted by aes.
* mysql, tidb, mariadb
```
{
"databases":{
"mydb":{
"driver":"mysql",
"kwargs":{
"user":"test",
"db":"cfae",
"password":"test123",
"host":"localhost"
}
}
}
}
```
* PostgreSQL
```
{
"databases":{
"mydb":{
"driver":"postgresql",
"kwargs":{
"user":"test",
"dbname":"cfae",
"password":"test123",
"host":"localhost"
}
}
}
}
```
* duckdb
```
{
"databases":{
"mydb":{
"driver":"duckdb",
"kwargs":{
"dbfile":"ttt.ddb"
}
}
}
}
```
## Using
```
import asyncio
from sqlor.dbpools import DBPools
from sqlor.dbpools import DBPools, sqlorContext
dbs={
"aiocfae":{
"driver":"aiomysql",
"async_mode":True,
"coding":"utf8",
"dbname":"cfae",
"mydb":{
"driver":"mysql",
"kwargs":{
"user":"test",
"db":"cfae",
@ -72,8 +127,7 @@ loop = asyncio.get_event_loop()
pool = DBPools(dbs,loop=loop)
async def testfunc():
db = DBPools()
async with db.sqlorContext('stock') as sor:
async with sqlorContext('stock') as sor:
# start a transaction
# if exception happended, all change to database will rollback
# else will commit

View File

@ -1,7 +1,7 @@
[metadata]
name=sqlor
version = 1.1.1
description = a wrap for DBAPI, to make python run sql easy and safe
version = 2.0.0
description = a new version of sqlor, each db's sor need to plugin to sqlor, and dbdriver now a isolated module
authors = yu moqing
author_email = yumoqing@gmail.com
readme = README.md
@ -12,8 +12,9 @@ packages = find:
requires_python = >=3.8
install_requires =
aiomysql
aiopg
duckdb
clickhouse-connect
clickhouse-driver
PyMySQL
aiosqlite
asyncio

View File

@ -1,7 +0,0 @@
from .mysqlor import MySqlor
class AioMysqlor(MySqlor):
@classmethod
def isMe(self,name):
return name=='aiomysql'

View File

@ -1,4 +1,6 @@
# -*- coding:utf8 -*-
# pip install clickhouse-driver
from clickhouse_driver import Client
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_clickhouse import clickhouse_ddl_tmpl
@ -95,3 +97,10 @@ WHERE database = '%s' AND table = '%s' AND is_in_primary_key = 1;
# ClickHouse 不支持外键
return "SELECT 'ClickHouse does not support foreign keys' AS msg;"
async def connect(self):
self.conn = Client(**self.dbdesc)
self.cur = self.conn
async def close(self):
self.conn.close()

View File

@ -1,7 +1,7 @@
import time
import asyncio
from traceback import format_exc
from functools import wraps
from functools import wraps, partial
import codecs
from contextlib import asynccontextmanager
@ -21,7 +21,6 @@ from .oracleor import Oracleor
from .sqlite3or import SQLite3or
from .aiosqliteor import Aiosqliteor
from .mysqlor import MySqlor
from .aiomysqlor import AioMysqlor
from .aiopostgresqlor import AioPostgresqlor
def sqlorFactory(dbdesc):
@ -37,138 +36,38 @@ def sqlorFactory(dbdesc):
k = findSubclass(driver,SQLor)
if k is None:
return SQLor(dbdesc=dbdesc)
return k(dbdesc=dbdesc)
return k(dbdesc=dbdesc.kwargs)
def sqlorFromFile(dbdef_file,coding='utf8'):
dbdef = loadf(dbdef_file)
return sqlorFactory(dbdef)
class SqlorPool:
def __init__(self, create_func, maxconn=100):
self.sema = asyncio.Semaphore(maxconn)
self.create_func = create_func
self.sqlors = []
class LifeConnect:
def __init__(self,connfunc,kw,use_max=1000,async_mode=False):
self.connfunc = connfunc
self.async_mode = async_mode
self.use_max = use_max
self.kw = kw
self.conn = None
self.used = False
async def _new_sqlor(self):
sqlor = await self.create_func()
await sqlor.connect()
x = DictObject(**{
'used': True,
'use_at': time.time(),
'sqlor':sqlor
})
self.sqlors.append(x)
return x
def print(self):
print(self.use_max)
print(self.conn)
async def _mkconn(self):
if self.async_mode:
self.conn = await self.connfunc(**self.kw)
else:
self.conn = self.connfunc(**self.kw)
self.use_cnt = 0
async def use(self):
if self.conn is None:
await self._mkconn()
wait_time = 0.2
loop_cnt = 4
while loop_cnt > 0:
if await self.testok():
return self.conn
await asyncio.sleep(wait_time)
wait_time = wait_time + 0.4
loop_cnt = loop_cnt - 1
try:
await self.conn.close()
except:
pass
self.conn = None
await self._mkconn()
raise Exception('database connect break')
async def free(self,conn):
self.use_cnt = self.use_cnt + 1
return
if self.use_cnt >= self.use_max:
await self.conn.close()
await self._mkcomm()
async def testok(self):
if self.async_mode:
async with self.conn.cursor() as cur:
try:
await cur.execute('select 1 as cnt')
return True
except:
return False
else:
cur = self.conn.cursor()
try:
cur.execute('select 1 as cnt')
r = cur.fetchall()
return True
except:
return False
finally:
cur.close()
class ConnectionPool(object):
def __init__(self,dbdesc,loop):
self.dbdesc = dbdesc
self.async_mode = dbdesc.get('async_mode',False)
self.loop = loop
self.driver = myImport(self.dbdesc['driver'])
self.maxconn = dbdesc.get('maxconn',5)
self.maxuse = dbdesc.get('maxuse',1000)
self._pool = asyncio.Queue(self.maxconn)
self.connectObject = {}
self.use_cnt = 0
self.max_use = 1000
self.e_except = None
# self.lock = asyncio.Lock()
# self.lockstatus()
def lockstatus(self):
return
self.loop.call_later(5,self.lockstatus)
print('--lock statu=',self.lock.locked(),
'--pool empty()=',self._pool.empty(),
'--full()=',self._pool.full()
)
async def _fillPool(self):
for i in range(self.maxconn):
lc = await self.connect()
i = i + 1
async def connect(self):
lc = LifeConnect(self.driver.connect,self.dbdesc['kwargs'],
use_max=self.maxuse,async_mode=self.async_mode)
await self._pool.put(lc)
return lc
def isEmpty(self):
return self._pool.empty()
def isFull(self):
return self._pool.full()
async def aquire(self):
lc = await self._pool.get()
conn = await lc.use()
"""
with await self.lock:
self.connectObject[lc.conn] = lc
"""
self.connectObject[lc.conn] = lc
return conn
async def release(self,conn):
lc = None
"""
with await self.lock:
lc = self.connectObject.get(conn,None)
del self.connectObject[conn]
"""
lc = self.connectObject.get(conn,None)
del self.connectObject[conn]
await self._pool.put(lc)
@asynccontextmanager
async def context(self):
async with self.sema:
yielded_sqlor = None
for s in self.sqlors:
if not s.used:
yielded_sqlor = s
if not yielded_sqlor:
yielded_sqlor = await self._new_sqlor()
yielded_sqlor.used = True
yielded_sqlor.use_at = time.time()
yield yielded_sqlor.sqlor
yielded_sqlor.used = False
@SingletonDecorator
class DBPools:
@ -177,8 +76,7 @@ class DBPools:
loop = asyncio.get_event_loop()
self.loop = loop
self.max_connect = max_connect
self.sema = asyncio.Semaphore(max_connect)
self._cpools = {}
self._pools = {}
self.databases = databases
self.meta = {}
@ -188,108 +86,36 @@ class DBPools:
return None
return desc.get('dbname')
def addDatabase(self,name,desc):
def addDatabase(self, name, desc):
self.databases[name] = desc
async def getSqlor(self,name):
await self.sema.acquire()
async def getSqlor(self, name):
desc = self.databases.get(name)
sor = sqlorFactory(desc)
sor.name = name
a,conn,cur = await self._aquireConn(name)
sor.setCursor(a,conn,cur)
await sor.connect()
return sor
async def freeSqlor(self,sor):
await self._releaseConn(sor.name,sor.conn,sor.cur)
self.sema.release()
@asynccontextmanager
async def sqlorContext(self,name):
async def sqlorContext(self, name):
pool = self._pools.get(name)
if pool is None:
f = partial(self.getSqlor, name)
pool = SqlorPool(f)
self._pools[name] = pool
self.e_except = None
sqlor = await self.getSqlor(name)
sqlor = None
try:
async with pool.context() as sqlor:
yield sqlor
if sqlor and sqlor.dataChanged:
await sqlor.commit()
except Exception as e:
self.e_except = e
cb = format_exc()
exception(f'sqlorContext():EXCEPTION{e}, {cb}')
if sqlor and sqlor.dataChanged:
await sqlor.rollback()
finally:
if sqlor and sqlor.dataChanged:
await sqlor.commit()
await self.freeSqlor(sqlor)
def get_exception(self):
return self.e_except
async def _aquireConn(self,dbname):
"""
p = self._cpools.get(dbname)
if p == None:
p = ConnectionPool(self.databases.get(dbname),self.loop)
await p._fillPool()
self._cpools[dbname] = p
conn = await p.aquire()
if self.isAsyncDriver(dbname):
cur = await conn.cursor()
else:
cur = conn.cursor()
return self.isAsyncDriver(dbname),conn,cur
"""
dbdesc = self.databases.get(dbname)
driver = myImport(dbdesc['driver'])
conn = None
cur = None
desc = dbdesc['kwargs'].copy()
pw = desc.get('password')
if pw:
desc['password'] = unpassword(pw)
if self.isAsyncDriver(dbname):
if dbdesc['driver'] == 'sqlite3':
conn = await driver.connect(desc['dbname'])
else:
conn = await driver.connect(**desc)
cur = await conn.cursor()
return True,conn,cur
else:
if dbdesc['driver'] == 'sqlite3':
conn = driver.connect(desc['dbname'])
else:
conn = driver.connect(**desc)
cur = conn.cursor()
return False,conn,cur
def isAsyncDriver(self,dbname):
ret = self.databases[dbname].get('async_mode',False)
return ret
async def _releaseConn(self,dbname,conn,cur):
"""
if self.isAsyncDriver(dbname):
await cur.close()
else:
try:
cur.fetchall()
except:
pass
cur.close()
p = self._cpools.get(dbname)
if p == None:
raise Exception('database (%s) not connected'%dbname)
await p.release(conn)
"""
if self.isAsyncDriver(dbname):
try:
await cur.close()
except:
pass
else:
try:
cur.fetchall()
except:
pass
cur.close()
conn.close()

View File

@ -1,4 +1,5 @@
# -*- coding:utf8 -*-
import duckdb
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .const import ROWS
@ -117,3 +118,15 @@ class DuckDBor(SQLor):
sqlcmd += f" AND lower(table_name) = '{tablename.lower()}'"
return sqlcmd
async def connect(self):
self.conn = duckdb.connect(self.dbdesc.dbfile)
self.cur = self.conn
self.dbname = None
async def close(self):
self.conn.close()
def unpassword(self):
pass

View File

@ -1,8 +1,7 @@
# -*- coding:utf8 -*-
from appPublic.argsConvert import ArgsConvert,ConditionConvert
import aiomysql
from .sor import SQLor
from .const import ROWS
from .ddl_template_mysql import mysql_ddl_tmpl
class MySqlor(SQLor):
ddl_template = mysql_ddl_tmpl
@ -50,7 +49,7 @@ class MySqlor(SQLor):
}
@classmethod
def isMe(self,name):
if name=='pymysql':
if name in ['mysql', 'aiosqlor', 'tidb']:
return True
return False
@ -77,7 +76,7 @@ class MySqlor(SQLor):
limit $[from_line]$,$[rows]$"""
def tablesSQL(self):
sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbdesc.get('dbname','unknown')
sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbname
return sqlcmd
def fieldsSQL(self,tablename=None):
@ -92,7 +91,7 @@ limit $[from_line]$,$[rows]$"""
lower(is_nullable) as nullable,
column_comment as title,
lower(table_name) as table_name
from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbdesc.get('dbname','unknown').lower()
from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbname
if tablename is not None:
sqlcmd = sqlcmd + """and lower(table_name)='%s';""" % tablename.lower()
return sqlcmd
@ -116,7 +115,7 @@ limit $[from_line]$,$[rows]$"""
AND R.REFERENCED_TABLE_NAME = C.REFERENCED_TABLE_NAME
WHERE C.REFERENCED_TABLE_NAME IS NOT NULL ;
and C.TABLE_SCHEMA = '%s'
""" % self.dbdesc.get('dbname','unknown').lower()
""" % self.dbname
if tablename is not None:
sqlcmd = sqlcmd + " and C.REFERENCED_TABLE_NAME = '%s'" % tablename.lower()
return sqlcmd
@ -137,7 +136,26 @@ limit $[from_line]$,$[rows]$"""
FROM
information_schema.statistics
WHERE
table_schema = '%s'""" % self.dbdesc.get('dbname','unknown')
table_schema = '%s'""" % self.dbname
if tablename is not None:
sqlcmd = sqlcmd + """ AND table_name = '%s'""" % tablename.lower()
return sqlcmd
async def connect(self):
"""
kwargs:
host:
port:
user:
password:
db:
"""
dbdesc = self.dbdesc
self.conn = await aiomysql.connect(**dbdesc)
self.cur = await self.conn.cursor()
self.dbname = dbdesc.get('db')
async def close(self):
await self.cursor.close()
await self.conn.close()

View File

@ -1,3 +1,4 @@
import aiopg
from .sor import SQLor
from .ddl_template_postgresql import postgresql_ddl_tmpl
@ -169,3 +170,22 @@ order by
i.relname""" % tablename.lower()
return sqlcmd
async def connect():
"""
kwargs:
dbname:
user:
password:
host:
port:
"""
kwargs = self.dbdesc
dns = ' '.join([f'{k}={v}' for k, v in kwargs.items()])
self.conn = await self.connect(dns)
self.cur = await self.conn.cursor()
self.dbname = kwargs.dbname.lower()
async def close():
await self.cur.close()
await self.conn.close()

View File

@ -1,3 +1,4 @@
import asyncio
from traceback import format_exc
import os
import decimal
@ -7,7 +8,10 @@ from datetime import datetime, date
import codecs
import re
import json
import inspect
from appPublic.worker import awaitify
from appPublic.myImport import myImport
from appPublic.jsonConfig import getConfig
from appPublic.dictObject import DictObject
from appPublic.unicoding import uDict
from appPublic.myTE import MyTemplateEngine
@ -15,8 +19,10 @@ from appPublic.objectAction import ObjectAction
from appPublic.argsConvert import ArgsConvert,ConditionConvert
from appPublic.registerfunction import RegisterFunction
from appPublic.log import info, exception, debug
from appPublic.aes import aes_decode_b64
from .filter import DBFilter
def db_type_2_py_type(o):
if isinstance(o,decimal.Decimal):
return float(o)
@ -74,16 +80,20 @@ class SQLor(object):
self.sqlts = sqlts
self.sqlvp = sqlvp
self.sqlvs = sqlvs
self.dbdesc = dbdesc
self.dbname = self.dbdesc.get('dbname')
if self.dbname:
self.dbname = self.dbname.lower()
self.dbdesc = dbdesc.copy()
self.unpassword()
self.dbname = None
self.writer = None
self.convfuncs = {}
self.cc = ConditionConvert()
self.dataChanged = False
self.metadatas={}
def unpassword(self):
if self.dbdesc.password:
key=getConfig().password_key
self.dbdesc.password = aes_decode_b64(key, self.dbdesc.password)
async def get_schema(self):
def concat_idx_info(idxs):
x = []
@ -164,7 +174,7 @@ class SQLor(object):
return self.cur
def recordCnt(self,sql):
ret = u"""select count(*) rcnt from (%s) rowcount_table""" % sql
ret = """select count(*) rcnt from (%s) rowcount_table""" % sql
return ret
def sortSQL(self, sql, NS):
@ -221,7 +231,23 @@ class SQLor(object):
retsql = u"""select * from (%s) filter_table where %s""" % (sql,fb)
return retsql
async def runVarSQL(self,cursor,sql,NS):
async def cur_executemany(self, cur, sql, ns):
if inspect.iscoroutinefunction(cur.executemany):
return await cur.executemany(sql, ns)
f = awaitify(cur.executemany)
return await f(sql, ns)
async def cur_execute(self, cur, sql, ns):
if inspect.iscoroutinefunction(cur.execute):
ret = await cur.execute(sql, ns)
# debug(f'-------coroutine--{ret}-{cur}----')
return ret
f = awaitify(cur.execute)
ret = await f(sql, ns)
# debug(f'------function--{ret}------')
return ret
async def runVarSQL(self, cursor, sql, NS):
"""
using a opened cursor to run a SQL statment with variable, the variable is setup in NS namespace
return a cursor with data
@ -229,10 +255,9 @@ class SQLor(object):
markedSQL, datas = self.maskingSQL(sql,NS)
datas = self.dataConvert(datas)
try:
if self.async_mode:
return await cursor.execute(markedSQL,datas)
else:
return cursor.execute(markedSQL,datas)
return await self.cur_execute(cursor,
markedSQL,
datas)
except Exception as e:
fe = format_exc()
@ -282,40 +307,55 @@ class SQLor(object):
return 'dml'
return 'ddl'
async def execute(self,sql,value,callback,**kwargs):
async def fetchone(self, cur):
if inspect.iscoroutinefunction(cur.fetchone):
ret = await cur.fetchone()
# debug(f'coro:sor.fetchone()={ret}, {type(ret)}')
return ret
ret = await cur.fetchone()
# debug(f'func:sor.fetchone()={ret}, {type(ret)}')
if isinstance(ret, asyncio.Future):
ret = ret.result()
return ret
async def execute(self, sql, value):
sqltype = self.getSqlType(sql)
cur = self.cursor()
ret = await self.runVarSQL(cur,sql,value)
if sqltype == 'qry' and callback is not None:
fields = [ i[0].lower() for i in cur.description ]
rec = None
if self.async_mode:
rec = await cur.fetchone()
else:
rec = cur.fetchone()
while rec is not None:
dic = {}
for i in range(len(fields)):
dic.update({fields[i] : db_type_2_py_type(rec[i])})
dic = DictObject(**dic)
callback(dic,**kwargs)
if self.async_mode:
rec = await cur.fetchone()
else:
rec = cur.fetchone()
ret = await self.runVarSQL(cur, sql, value)
if sqltype == 'dml':
self.dataChanged = True
return ret
async def executemany(self,sql,values):
async def _get_data(self, sql, ns):
cur = self.cursor()
markedSQL,datas = self.maskingSQL(sql,{})
sqltype = self.getSqlType(sql)
if sqltype != 'qry':
raise Exception('not select sql')
ret = await self.execute(sql, ns)
fields = [i[0].lower() for i in cur.description]
while True:
rec = await self.fetchone(cur)
if rec is None:
break
if rec is None:
break
if isinstance(rec, dict):
rec = rec.values()
dic = {}
for i in range(len(fields)):
v = db_type_2_py_type(rec[i])
dic.update({fields[i]: v})
dic = DictObject(**dic)
yield dic
async def executemany(self,sql,values):
sqltype = self.getSqlType(sql)
if sqltype != 'dml':
raise Exception('no dml sql')
cur = self.cursor()
markedSQL, _ = self.maskingSQL(sql,{})
datas = [ self.dataConvert(d) for d in values ]
if self.async_mode:
await cur.executemany(markedSQL,datas)
else:
cur.executemany(markedSQL,datas)
await self.cur_exectutemany(cur, markedSQL, datas)
def pivotSQL(self,tablename,rowFields,columnFields,valueFields):
def maxValue(columnFields,valueFields,cfvalues):
@ -366,76 +406,33 @@ class SQLor(object):
async def pivot(self,desc,tablename,rowFields,columnFields,valueFields):
sql = self.pivotSQL(tablename,rowFields,columnFields,valueFields)
desc['sql_string'] = sql
ret = []
return await self.execute(sql,{},lambda x:ret.append(x))
return await self.execute(sql,{})
def isSelectSql(self,sql):
return self.getSqlType(sql) == 'qry'
def getSQLfromDesc(self,desc):
sql = ''
if 'sql_file' in desc.keys():
sql = readsql(desc['sql_file'])
else:
sql = desc['sql_string']
return sql
async def record_count(self,desc,NS):
cnt_desc = {}
cnt_desc.update(desc)
sql = self.getSQLfromDesc(desc)
if desc.get('sql_file',False):
del cnt_desc['sql_file']
cnt_desc['sql_string'] = self.recordCnt(sql)
class Cnt:
def __init__(self):
self.recs = []
def handler(self,rec):
self.recs.append(rec)
c = Cnt()
await self.runSQL(cnt_desc,NS,c.handler)
t = c.recs[0]['rcnt']
async def record_count(self, sql, NS):
sql = self.recordCnt(sql)
async for r in self._get_data(sql, NS):
t = r.rcnt
return t
return None
async def runSQLPaging(self,desc,NS):
total = await self.record_count(desc,NS)
recs = await self.pagingdata(desc,NS)
data = {
"total":total,
"rows":recs
}
return data
async def pagingdata(self,desc,NS):
paging_desc = {}
paging_desc.update(desc)
paging_desc.update(
{
"paging":{
async def pagingdata(self, sql, NS):
paging = {
"rowsname":"rows",
"pagename":"page",
"sortname":"sort"
}
})
if desc.get('sortfield',False):
NS['sort'] = desc.get('sortfield')
sql = self.getSQLfromDesc(desc)
if desc.get('sql_file',False):
del cnt_desc['sql_file']
paging_desc['sql_string'] = self.pagingSQL(sql,
paging_desc.get('paging'),NS)
if not NS.get('sort'):
NS['sort'] = "id"
class Cnt:
def __init__(self):
self.recs = []
def handler(self,rec):
self.recs.append(rec)
c = Cnt()
await self.runSQL(paging_desc,NS,c.handler)
return c.recs
sql = self.pagingSQL(sql, paging, NS)
recs = []
async for r in self._get_data(sql, NS):
recs.append(r)
return recs
async def resultFields(self,desc,NS):
NS.update(rows=1,page=1)
@ -443,52 +440,30 @@ class SQLor(object):
ret = [ {'name':i[0],'type':i[1]} for i in self.cur.description ]
return ret
async def runSQL(self,desc,NS,callback,**kw):
class RecordHandler:
def __init__(self,ns,name):
self.ns = ns
self.name = name
self.ns[name] = []
def handler(self,rec):
self.ns[self.name].append(rec)
cur = self.cursor()
sql = self.getSQLfromDesc(desc)
if self.isSelectSql(sql):
if callback is None:
klass = desc.get('dataname','dummy')
if klass is not None:
rh = RecordHandler(NS,klass)
callback = rh.handler
else:
callback = None
await self.execute(sql,NS,callback)
async def sqlExecute(self,desc,NS):
return await self.execute(desc,NS,None)
async def sqlExe(self,sql,ns):
ret = []
r = await self.execute(sql,ns,
callback=lambda x:ret.append(x))
async def sqlExe(self, sql, ns):
sqltype = self.getSqlType(sql)
if sqltype == 'dml':
if sqltype != 'qry':
r = await self.execute(sql, ns)
return r
if 'page' in ns.keys():
cnt = await self.record_count(sql, ns)
rows = await self.pagingdata(sql, ns)
return {
'total': cnt,
'rows': rows
}
ret = []
async for r in self._get_data(sql, ns):
ret.append(r)
return ret
async def sqlPaging(self,sql,ns):
ret = []
dic = {
"sql_string":sql
}
page = ns.get('page')
if not page:
ns['page'] = 1
total = await self.record_count(dic,ns)
rows = await self.pagingdata(dic,ns)
total = await self.record_count(sql,ns)
rows = await self.pagingdata(sql,ns)
return {
'total':total,
'rows':rows
@ -497,7 +472,9 @@ class SQLor(object):
async def tables(self):
sqlstring = self.tablesSQL()
ret = []
await self.execute(sqlstring,{},lambda x:ret.append(x))
async for r in self._get_data(sqlstring,{}):
r.name = r.name.lower()
ret.append(r)
return ret
def indexesSQL(self,tablename):
@ -516,13 +493,16 @@ class SQLor(object):
if sqlstring is None:
return []
recs = []
await self.execute(sqlstring,{},lambda x:recs.append(x))
async for r in self._get_data(sqlstring, {}):
recs.append(r)
return recs
async def fields(self,tablename=None):
sqlstring = self.fieldsSQL(tablename)
sql = self.fieldsSQL(tablename)
recs = []
await self.execute(sqlstring,{},lambda x:recs.append(x))
async for r in self._get_data(sql, {}):
recs.append(r)
ret = []
for r in recs:
r.update({'type':self.db2modelTypeMapping.get(r['type'].lower(),'unknown')})
@ -531,31 +511,38 @@ class SQLor(object):
return ret
async def primary(self,tablename):
sqlstring = self.pkSQL(tablename)
sql = self.pkSQL(tablename)
recs = []
await self.execute(sqlstring,{},lambda x:recs.append(x))
# print('sql=', sqlstring, 'recs=', recs)
async for r in self._get_data(sql, {}):
recs.append(r)
# debug(f'primary("{tablename}")={recs}, {sql}')
return recs
async def fkeys(self,tablename):
sqlstring = self.fkSQL(tablename)
recs = []
await self.execute(sqlstring,{},lambda x:recs.append(x))
async for r in self._get_data(sqlstring, {}):
recs.append(r)
return recs
async def createTable(self,tabledesc):
te = MyTemplateEngine([],'utf8','utf8')
desc = {
"sql_string":te.renders(self.ddl_template,tabledesc)
}
return await self.sqlExecute(desc,{})
sql = te.renders(self.ddl_template,tabledesc)
return await self.execute(sql, {})
async def getTableDesc(self,tablename):
tablename = tablename.lower()
desc = self.getMeta(tablename)
if desc:
return desc
desc = {}
summary = [ i for i in await self.tables() if tablename.lower() == i['name'].lower() ]
tables = await self.tables()
summary = [i for i in tables if tablename == i.name]
if not summary:
e = Exception(f'table({tablename}) not exist')
exception(f'{e}{format_exc()}')
raise e
pris = await self.primary(tablename)
primary = [i['name'] for i in pris ]
summary[0]['primary'] = primary
@ -582,14 +569,14 @@ class SQLor(object):
return desc
async def rollback(self):
if self.async_mode:
if inspect.iscoroutinefunction(self.conn.rollback):
await self.conn.rollback()
else:
self.conn.rollback()
self.dataChanged = False
async def commit(self):
if self.async_mode:
if inspect.iscoroutinefunction(self.conn.commit):
await self.conn.commit()
else:
self.conn.commit()
@ -610,7 +597,7 @@ class SQLor(object):
ret = await rf.exe(rfname, ns)
if isinstance(ret, dict):
ns.update(ret)
r = await self.runSQL({'sql_string':sql},ns.copy(), None)
r = await self.execute(sql,ns.copy())
await rf.exe(f'{self.dbname}:{tablename}:c:after', ns)
return r
@ -632,11 +619,8 @@ class SQLor(object):
if 'page' in ns.keys():
if not 'sort' in ns.keys():
ns['sort'] = desc['summary'][0]['primary'][0]
dic = {
"sql_string":sql
}
total = await self.record_count(dic,ns)
rows = await self.pagingdata(dic,ns)
total = await self.record_count(sql, ns)
rows = await self.pagingdata(sql,ns)
return {
'total':total,
'rows':rows
@ -661,11 +645,11 @@ class SQLor(object):
ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns)
if isinstance(ret, dict):
ns.update(ret)
r = await self.runSQL({'sql_string':sql},ns.copy() ,None)
r = await self.execute(sql, ns.copy())
await rf.exe(f'{self.dbname}:{tablename}:u:after',ns)
return r
async def D(self,tablename,ns):
async def D(self,tablename, ns):
desc = await self.I(tablename)
fields = [ i['name'] for i in desc['fields']]
condi = [ i for i in desc['summary'][0]['primary']]
@ -676,7 +660,12 @@ class SQLor(object):
ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns)
if isinstance(ret, dict):
ns.update(ret)
r = await self.runSQL({'sql_string':sql},ns,None)
r = await self.execute(sql, ns)
ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns)
return r
async def connect(self):
raise Exception('Not Implemented')
async def close(self):
raise Exception('Not Implemented')

View File

@ -1,2 +0,0 @@
# fixed sor.py C function bug.
__version__ = "0.1.3"