Compare commits

..

No commits in common. "62514a2398696cc10e627d67ce2351023004c133" and "11945861b2d376b03da8a0bfb075518ec54ee76e" have entirely different histories.

10 changed files with 410 additions and 331 deletions

View File

@ -14,83 +14,28 @@ SQLOR is a database api for python3, it is base on the python's DBAPI2
## requirements
* python 3.9 or above
* python 3.5 or above
* asyncio
* Oracle DBAPI2 driver(cx_Oracle)
* Postgresql DBAPI2 driver(aiopg)
* MySQL DBAPI2 driver(mysql-connector)
* Postgresql DBAPI2 driver(psycopg2-binrary)
* Asynchronous MySQL driver(aiomysql)
* Asynchronous Postgresql driver(aiopg)
* clickhouse(clickhouse-connect)
* Other driver can be easy integreated
## Support Database Types
* oracle
* mysql, mariadb
* TiDB
* clickHouse
* DuckDB
* PostgreSQL
* MsSQL
## Database Description json format
password in json data is encrypted by aes.
* mysql, tidb, mariadb
```
{
"databases":{
"mydb":{
"driver":"mysql",
"kwargs":{
"user":"test",
"db":"cfae",
"password":"test123",
"host":"localhost"
}
}
}
}
```
* PostgreSQL
```
{
"databases":{
"mydb":{
"driver":"postgresql",
"kwargs":{
"user":"test",
"dbname":"cfae",
"password":"test123",
"host":"localhost"
}
}
}
}
```
* duckdb
```
{
"databases":{
"mydb":{
"driver":"duckdb",
"kwargs":{
"dbfile":"ttt.ddb"
}
}
}
}
```
## Using
```
import asyncio
from sqlor.dbpools import DBPools, sqlorContext
from sqlor.dbpools import DBPools
dbs={
"mydb":{
"driver":"mysql",
"aiocfae":{
"driver":"aiomysql",
"async_mode":True,
"coding":"utf8",
"dbname":"cfae",
"kwargs":{
"user":"test",
"db":"cfae",
@ -127,7 +72,8 @@ loop = asyncio.get_event_loop()
pool = DBPools(dbs,loop=loop)
async def testfunc():
async with sqlorContext('stock') as sor:
db = DBPools()
async with db.sqlorContext('stock') as sor:
# start a transaction
# if exception happended, all change to database will rollback
# else will commit

View File

@ -1,7 +1,7 @@
[metadata]
name=sqlor
version = 2.0.0
description = a new version of sqlor, each db's sor need to plugin to sqlor, and dbdriver now a isolated module
version = 1.1.1
description = a wrap for DBAPI, to make python run sql easy and safe
authors = yu moqing
author_email = yumoqing@gmail.com
readme = README.md
@ -12,9 +12,8 @@ packages = find:
requires_python = >=3.8
install_requires =
aiomysql
aiopg
duckdb
clickhouse-driver
clickhouse-connect
PyMySQL
aiosqlite
asyncio

7
sqlor/aiomysqlor.py Executable file
View File

@ -0,0 +1,7 @@
from .mysqlor import MySqlor
class AioMysqlor(MySqlor):
@classmethod
def isMe(self,name):
return name=='aiomysql'

View File

@ -1,6 +1,4 @@
# -*- coding:utf8 -*-
# pip install clickhouse-driver
from clickhouse_driver import Client
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_clickhouse import clickhouse_ddl_tmpl
@ -97,10 +95,3 @@ WHERE database = '%s' AND table = '%s' AND is_in_primary_key = 1;
# ClickHouse 不支持外键
return "SELECT 'ClickHouse does not support foreign keys' AS msg;"
async def connect(self):
self.conn = Client(**self.dbdesc)
self.cur = self.conn
async def close(self):
self.conn.close()

View File

@ -1,7 +1,7 @@
import time
import asyncio
from traceback import format_exc
from functools import wraps, partial
from functools import wraps
import codecs
from contextlib import asynccontextmanager
@ -21,6 +21,7 @@ from .oracleor import Oracleor
from .sqlite3or import SQLite3or
from .aiosqliteor import Aiosqliteor
from .mysqlor import MySqlor
from .aiomysqlor import AioMysqlor
from .aiopostgresqlor import AioPostgresqlor
def sqlorFactory(dbdesc):
@ -36,39 +37,139 @@ def sqlorFactory(dbdesc):
k = findSubclass(driver,SQLor)
if k is None:
return SQLor(dbdesc=dbdesc)
return k(dbdesc=dbdesc.kwargs)
return k(dbdesc=dbdesc)
class SqlorPool:
def __init__(self, create_func, maxconn=100):
self.sema = asyncio.Semaphore(maxconn)
self.create_func = create_func
self.sqlors = []
def sqlorFromFile(dbdef_file,coding='utf8'):
dbdef = loadf(dbdef_file)
return sqlorFactory(dbdef)
async def _new_sqlor(self):
sqlor = await self.create_func()
await sqlor.connect()
x = DictObject(**{
'used': True,
'use_at': time.time(),
'sqlor':sqlor
})
self.sqlors.append(x)
return x
class LifeConnect:
def __init__(self,connfunc,kw,use_max=1000,async_mode=False):
self.connfunc = connfunc
self.async_mode = async_mode
self.use_max = use_max
self.kw = kw
self.conn = None
self.used = False
def print(self):
print(self.use_max)
print(self.conn)
@asynccontextmanager
async def context(self):
async with self.sema:
yielded_sqlor = None
for s in self.sqlors:
if not s.used:
yielded_sqlor = s
if not yielded_sqlor:
yielded_sqlor = await self._new_sqlor()
yielded_sqlor.used = True
yielded_sqlor.use_at = time.time()
yield yielded_sqlor.sqlor
yielded_sqlor.used = False
async def _mkconn(self):
if self.async_mode:
self.conn = await self.connfunc(**self.kw)
else:
self.conn = self.connfunc(**self.kw)
self.use_cnt = 0
async def use(self):
if self.conn is None:
await self._mkconn()
wait_time = 0.2
loop_cnt = 4
while loop_cnt > 0:
if await self.testok():
return self.conn
await asyncio.sleep(wait_time)
wait_time = wait_time + 0.4
loop_cnt = loop_cnt - 1
try:
await self.conn.close()
except:
pass
self.conn = None
await self._mkconn()
raise Exception('database connect break')
async def free(self,conn):
self.use_cnt = self.use_cnt + 1
return
if self.use_cnt >= self.use_max:
await self.conn.close()
await self._mkcomm()
async def testok(self):
if self.async_mode:
async with self.conn.cursor() as cur:
try:
await cur.execute('select 1 as cnt')
return True
except:
return False
else:
cur = self.conn.cursor()
try:
cur.execute('select 1 as cnt')
r = cur.fetchall()
return True
except:
return False
finally:
cur.close()
class ConnectionPool(object):
def __init__(self,dbdesc,loop):
self.dbdesc = dbdesc
self.async_mode = dbdesc.get('async_mode',False)
self.loop = loop
self.driver = myImport(self.dbdesc['driver'])
self.maxconn = dbdesc.get('maxconn',5)
self.maxuse = dbdesc.get('maxuse',1000)
self._pool = asyncio.Queue(self.maxconn)
self.connectObject = {}
self.use_cnt = 0
self.max_use = 1000
self.e_except = None
# self.lock = asyncio.Lock()
# self.lockstatus()
def lockstatus(self):
return
self.loop.call_later(5,self.lockstatus)
print('--lock statu=',self.lock.locked(),
'--pool empty()=',self._pool.empty(),
'--full()=',self._pool.full()
)
async def _fillPool(self):
for i in range(self.maxconn):
lc = await self.connect()
i = i + 1
async def connect(self):
lc = LifeConnect(self.driver.connect,self.dbdesc['kwargs'],
use_max=self.maxuse,async_mode=self.async_mode)
await self._pool.put(lc)
return lc
def isEmpty(self):
return self._pool.empty()
def isFull(self):
return self._pool.full()
async def aquire(self):
lc = await self._pool.get()
conn = await lc.use()
"""
with await self.lock:
self.connectObject[lc.conn] = lc
"""
self.connectObject[lc.conn] = lc
return conn
async def release(self,conn):
lc = None
"""
with await self.lock:
lc = self.connectObject.get(conn,None)
del self.connectObject[conn]
"""
lc = self.connectObject.get(conn,None)
del self.connectObject[conn]
await self._pool.put(lc)
@SingletonDecorator
class DBPools:
def __init__(self,databases={},max_connect=100,loop=None):
@ -76,7 +177,8 @@ class DBPools:
loop = asyncio.get_event_loop()
self.loop = loop
self.max_connect = max_connect
self._pools = {}
self.sema = asyncio.Semaphore(max_connect)
self._cpools = {}
self.databases = databases
self.meta = {}
@ -86,36 +188,108 @@ class DBPools:
return None
return desc.get('dbname')
def addDatabase(self, name, desc):
def addDatabase(self,name,desc):
self.databases[name] = desc
async def getSqlor(self, name):
async def getSqlor(self,name):
await self.sema.acquire()
desc = self.databases.get(name)
sor = sqlorFactory(desc)
await sor.connect()
sor.name = name
a,conn,cur = await self._aquireConn(name)
sor.setCursor(a,conn,cur)
return sor
async def freeSqlor(self,sor):
await self._releaseConn(sor.name,sor.conn,sor.cur)
self.sema.release()
@asynccontextmanager
async def sqlorContext(self, name):
pool = self._pools.get(name)
if pool is None:
f = partial(self.getSqlor, name)
pool = SqlorPool(f)
self._pools[name] = pool
async def sqlorContext(self,name):
self.e_except = None
sqlor = None
sqlor = await self.getSqlor(name)
try:
async with pool.context() as sqlor:
yield sqlor
if sqlor and sqlor.dataChanged:
await sqlor.commit()
yield sqlor
except Exception as e:
self.e_except = e
cb = format_exc()
exception(f'sqlorContext():EXCEPTION{e}, {cb}')
if sqlor and sqlor.dataChanged:
await sqlor.rollback()
finally:
if sqlor and sqlor.dataChanged:
await sqlor.commit()
await self.freeSqlor(sqlor)
def get_exception(self):
return self.e_except
async def _aquireConn(self,dbname):
"""
p = self._cpools.get(dbname)
if p == None:
p = ConnectionPool(self.databases.get(dbname),self.loop)
await p._fillPool()
self._cpools[dbname] = p
conn = await p.aquire()
if self.isAsyncDriver(dbname):
cur = await conn.cursor()
else:
cur = conn.cursor()
return self.isAsyncDriver(dbname),conn,cur
"""
dbdesc = self.databases.get(dbname)
driver = myImport(dbdesc['driver'])
conn = None
cur = None
desc = dbdesc['kwargs'].copy()
pw = desc.get('password')
if pw:
desc['password'] = unpassword(pw)
if self.isAsyncDriver(dbname):
if dbdesc['driver'] == 'sqlite3':
conn = await driver.connect(desc['dbname'])
else:
conn = await driver.connect(**desc)
cur = await conn.cursor()
return True,conn,cur
else:
if dbdesc['driver'] == 'sqlite3':
conn = driver.connect(desc['dbname'])
else:
conn = driver.connect(**desc)
cur = conn.cursor()
return False,conn,cur
def isAsyncDriver(self,dbname):
ret = self.databases[dbname].get('async_mode',False)
return ret
async def _releaseConn(self,dbname,conn,cur):
"""
if self.isAsyncDriver(dbname):
await cur.close()
else:
try:
cur.fetchall()
except:
pass
cur.close()
p = self._cpools.get(dbname)
if p == None:
raise Exception('database (%s) not connected'%dbname)
await p.release(conn)
"""
if self.isAsyncDriver(dbname):
try:
await cur.close()
except:
pass
else:
try:
cur.fetchall()
except:
pass
cur.close()
conn.close()

View File

@ -1,5 +1,4 @@
# -*- coding:utf8 -*-
import duckdb
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .const import ROWS
@ -118,15 +117,3 @@ class DuckDBor(SQLor):
sqlcmd += f" AND lower(table_name) = '{tablename.lower()}'"
return sqlcmd
async def connect(self):
self.conn = duckdb.connect(self.dbdesc.dbfile)
self.cur = self.conn
self.dbname = None
async def close(self):
self.conn.close()
def unpassword(self):
pass

View File

@ -1,7 +1,8 @@
# -*- coding:utf8 -*-
from appPublic.argsConvert import ArgsConvert,ConditionConvert
import aiomysql
from .sor import SQLor
from .const import ROWS
from .ddl_template_mysql import mysql_ddl_tmpl
class MySqlor(SQLor):
ddl_template = mysql_ddl_tmpl
@ -49,7 +50,7 @@ class MySqlor(SQLor):
}
@classmethod
def isMe(self,name):
if name in ['mysql', 'aiosqlor', 'tidb']:
if name=='pymysql':
return True
return False
@ -76,7 +77,7 @@ class MySqlor(SQLor):
limit $[from_line]$,$[rows]$"""
def tablesSQL(self):
sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbname
sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbdesc.get('dbname','unknown')
return sqlcmd
def fieldsSQL(self,tablename=None):
@ -91,7 +92,7 @@ limit $[from_line]$,$[rows]$"""
lower(is_nullable) as nullable,
column_comment as title,
lower(table_name) as table_name
from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbname
from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbdesc.get('dbname','unknown').lower()
if tablename is not None:
sqlcmd = sqlcmd + """and lower(table_name)='%s';""" % tablename.lower()
return sqlcmd
@ -115,7 +116,7 @@ limit $[from_line]$,$[rows]$"""
AND R.REFERENCED_TABLE_NAME = C.REFERENCED_TABLE_NAME
WHERE C.REFERENCED_TABLE_NAME IS NOT NULL ;
and C.TABLE_SCHEMA = '%s'
""" % self.dbname
""" % self.dbdesc.get('dbname','unknown').lower()
if tablename is not None:
sqlcmd = sqlcmd + " and C.REFERENCED_TABLE_NAME = '%s'" % tablename.lower()
return sqlcmd
@ -136,26 +137,7 @@ limit $[from_line]$,$[rows]$"""
FROM
information_schema.statistics
WHERE
table_schema = '%s'""" % self.dbname
table_schema = '%s'""" % self.dbdesc.get('dbname','unknown')
if tablename is not None:
sqlcmd = sqlcmd + """ AND table_name = '%s'""" % tablename.lower()
return sqlcmd
async def connect(self):
"""
kwargs:
host:
port:
user:
password:
db:
"""
dbdesc = self.dbdesc
self.conn = await aiomysql.connect(**dbdesc)
self.cur = await self.conn.cursor()
self.dbname = dbdesc.get('db')
async def close(self):
await self.cursor.close()
await self.conn.close()

View File

@ -1,4 +1,3 @@
import aiopg
from .sor import SQLor
from .ddl_template_postgresql import postgresql_ddl_tmpl
@ -170,22 +169,3 @@ order by
i.relname""" % tablename.lower()
return sqlcmd
async def connect():
"""
kwargs:
dbname:
user:
password:
host:
port:
"""
kwargs = self.dbdesc
dns = ' '.join([f'{k}={v}' for k, v in kwargs.items()])
self.conn = await self.connect(dns)
self.cur = await self.conn.cursor()
self.dbname = kwargs.dbname.lower()
async def close():
await self.cur.close()
await self.conn.close()

View File

@ -1,4 +1,3 @@
import asyncio
from traceback import format_exc
import os
import decimal
@ -8,10 +7,7 @@ from datetime import datetime, date
import codecs
import re
import json
import inspect
from appPublic.worker import awaitify
from appPublic.myImport import myImport
from appPublic.jsonConfig import getConfig
from appPublic.dictObject import DictObject
from appPublic.unicoding import uDict
from appPublic.myTE import MyTemplateEngine
@ -19,10 +15,8 @@ from appPublic.objectAction import ObjectAction
from appPublic.argsConvert import ArgsConvert,ConditionConvert
from appPublic.registerfunction import RegisterFunction
from appPublic.log import info, exception, debug
from appPublic.aes import aes_decode_b64
from .filter import DBFilter
def db_type_2_py_type(o):
if isinstance(o,decimal.Decimal):
return float(o)
@ -80,20 +74,16 @@ class SQLor(object):
self.sqlts = sqlts
self.sqlvp = sqlvp
self.sqlvs = sqlvs
self.dbdesc = dbdesc.copy()
self.unpassword()
self.dbname = None
self.dbdesc = dbdesc
self.dbname = self.dbdesc.get('dbname')
if self.dbname:
self.dbname = self.dbname.lower()
self.writer = None
self.convfuncs = {}
self.cc = ConditionConvert()
self.dataChanged = False
self.metadatas={}
def unpassword(self):
if self.dbdesc.password:
key=getConfig().password_key
self.dbdesc.password = aes_decode_b64(key, self.dbdesc.password)
async def get_schema(self):
def concat_idx_info(idxs):
x = []
@ -174,7 +164,7 @@ class SQLor(object):
return self.cur
def recordCnt(self,sql):
ret = """select count(*) rcnt from (%s) rowcount_table""" % sql
ret = u"""select count(*) rcnt from (%s) rowcount_table""" % sql
return ret
def sortSQL(self, sql, NS):
@ -231,23 +221,7 @@ class SQLor(object):
retsql = u"""select * from (%s) filter_table where %s""" % (sql,fb)
return retsql
async def cur_executemany(self, cur, sql, ns):
if inspect.iscoroutinefunction(cur.executemany):
return await cur.executemany(sql, ns)
f = awaitify(cur.executemany)
return await f(sql, ns)
async def cur_execute(self, cur, sql, ns):
if inspect.iscoroutinefunction(cur.execute):
ret = await cur.execute(sql, ns)
# debug(f'-------coroutine--{ret}-{cur}----')
return ret
f = awaitify(cur.execute)
ret = await f(sql, ns)
# debug(f'------function--{ret}------')
return ret
async def runVarSQL(self, cursor, sql, NS):
async def runVarSQL(self,cursor,sql,NS):
"""
using a opened cursor to run a SQL statment with variable, the variable is setup in NS namespace
return a cursor with data
@ -255,9 +229,10 @@ class SQLor(object):
markedSQL, datas = self.maskingSQL(sql,NS)
datas = self.dataConvert(datas)
try:
return await self.cur_execute(cursor,
markedSQL,
datas)
if self.async_mode:
return await cursor.execute(markedSQL,datas)
else:
return cursor.execute(markedSQL,datas)
except Exception as e:
fe = format_exc()
@ -307,55 +282,40 @@ class SQLor(object):
return 'dml'
return 'ddl'
async def fetchone(self, cur):
if inspect.iscoroutinefunction(cur.fetchone):
ret = await cur.fetchone()
# debug(f'coro:sor.fetchone()={ret}, {type(ret)}')
return ret
ret = await cur.fetchone()
# debug(f'func:sor.fetchone()={ret}, {type(ret)}')
if isinstance(ret, asyncio.Future):
ret = ret.result()
return ret
async def execute(self, sql, value):
async def execute(self,sql,value,callback,**kwargs):
sqltype = self.getSqlType(sql)
cur = self.cursor()
ret = await self.runVarSQL(cur, sql, value)
ret = await self.runVarSQL(cur,sql,value)
if sqltype == 'qry' and callback is not None:
fields = [ i[0].lower() for i in cur.description ]
rec = None
if self.async_mode:
rec = await cur.fetchone()
else:
rec = cur.fetchone()
while rec is not None:
dic = {}
for i in range(len(fields)):
dic.update({fields[i] : db_type_2_py_type(rec[i])})
dic = DictObject(**dic)
callback(dic,**kwargs)
if self.async_mode:
rec = await cur.fetchone()
else:
rec = cur.fetchone()
if sqltype == 'dml':
self.dataChanged = True
return ret
async def _get_data(self, sql, ns):
cur = self.cursor()
sqltype = self.getSqlType(sql)
if sqltype != 'qry':
raise Exception('not select sql')
ret = await self.execute(sql, ns)
fields = [i[0].lower() for i in cur.description]
while True:
rec = await self.fetchone(cur)
if rec is None:
break
if rec is None:
break
if isinstance(rec, dict):
rec = rec.values()
dic = {}
for i in range(len(fields)):
v = db_type_2_py_type(rec[i])
dic.update({fields[i]: v})
dic = DictObject(**dic)
yield dic
return ret
async def executemany(self,sql,values):
sqltype = self.getSqlType(sql)
if sqltype != 'dml':
raise Exception('no dml sql')
cur = self.cursor()
markedSQL, _ = self.maskingSQL(sql,{})
markedSQL,datas = self.maskingSQL(sql,{})
datas = [ self.dataConvert(d) for d in values ]
await self.cur_exectutemany(cur, markedSQL, datas)
if self.async_mode:
await cur.executemany(markedSQL,datas)
else:
cur.executemany(markedSQL,datas)
def pivotSQL(self,tablename,rowFields,columnFields,valueFields):
def maxValue(columnFields,valueFields,cfvalues):
@ -406,33 +366,76 @@ class SQLor(object):
async def pivot(self,desc,tablename,rowFields,columnFields,valueFields):
sql = self.pivotSQL(tablename,rowFields,columnFields,valueFields)
desc['sql_string'] = sql
ret = []
return await self.execute(sql,{})
return await self.execute(sql,{},lambda x:ret.append(x))
def isSelectSql(self,sql):
return self.getSqlType(sql) == 'qry'
async def record_count(self, sql, NS):
sql = self.recordCnt(sql)
async for r in self._get_data(sql, NS):
t = r.rcnt
return t
return None
def getSQLfromDesc(self,desc):
sql = ''
if 'sql_file' in desc.keys():
sql = readsql(desc['sql_file'])
else:
sql = desc['sql_string']
return sql
async def record_count(self,desc,NS):
cnt_desc = {}
cnt_desc.update(desc)
sql = self.getSQLfromDesc(desc)
if desc.get('sql_file',False):
del cnt_desc['sql_file']
cnt_desc['sql_string'] = self.recordCnt(sql)
class Cnt:
def __init__(self):
self.recs = []
def handler(self,rec):
self.recs.append(rec)
async def pagingdata(self, sql, NS):
paging = {
"rowsname":"rows",
"pagename":"page",
"sortname":"sort"
c = Cnt()
await self.runSQL(cnt_desc,NS,c.handler)
t = c.recs[0]['rcnt']
return t
async def runSQLPaging(self,desc,NS):
total = await self.record_count(desc,NS)
recs = await self.pagingdata(desc,NS)
data = {
"total":total,
"rows":recs
}
if not NS.get('sort'):
NS['sort'] = "id"
return data
async def pagingdata(self,desc,NS):
paging_desc = {}
paging_desc.update(desc)
paging_desc.update(
{
"paging":{
"rowsname":"rows",
"pagename":"page",
"sortname":"sort"
}
})
if desc.get('sortfield',False):
NS['sort'] = desc.get('sortfield')
sql = self.getSQLfromDesc(desc)
if desc.get('sql_file',False):
del cnt_desc['sql_file']
paging_desc['sql_string'] = self.pagingSQL(sql,
paging_desc.get('paging'),NS)
sql = self.pagingSQL(sql, paging, NS)
recs = []
async for r in self._get_data(sql, NS):
recs.append(r)
return recs
class Cnt:
def __init__(self):
self.recs = []
def handler(self,rec):
self.recs.append(rec)
c = Cnt()
await self.runSQL(paging_desc,NS,c.handler)
return c.recs
async def resultFields(self,desc,NS):
NS.update(rows=1,page=1)
@ -440,30 +443,52 @@ class SQLor(object):
ret = [ {'name':i[0],'type':i[1]} for i in self.cur.description ]
return ret
async def sqlExe(self, sql, ns):
sqltype = self.getSqlType(sql)
if sqltype != 'qry':
r = await self.execute(sql, ns)
return r
if 'page' in ns.keys():
cnt = await self.record_count(sql, ns)
rows = await self.pagingdata(sql, ns)
return {
'total': cnt,
'rows': rows
}
async def runSQL(self,desc,NS,callback,**kw):
class RecordHandler:
def __init__(self,ns,name):
self.ns = ns
self.name = name
self.ns[name] = []
def handler(self,rec):
self.ns[self.name].append(rec)
cur = self.cursor()
sql = self.getSQLfromDesc(desc)
if self.isSelectSql(sql):
if callback is None:
klass = desc.get('dataname','dummy')
if klass is not None:
rh = RecordHandler(NS,klass)
callback = rh.handler
else:
callback = None
await self.execute(sql,NS,callback)
async def sqlExecute(self,desc,NS):
return await self.execute(desc,NS,None)
async def sqlExe(self,sql,ns):
ret = []
async for r in self._get_data(sql, ns):
ret.append(r)
r = await self.execute(sql,ns,
callback=lambda x:ret.append(x))
sqltype = self.getSqlType(sql)
if sqltype == 'dml':
return r
return ret
async def sqlPaging(self,sql,ns):
ret = []
dic = {
"sql_string":sql
}
page = ns.get('page')
if not page:
ns['page'] = 1
total = await self.record_count(sql,ns)
rows = await self.pagingdata(sql,ns)
total = await self.record_count(dic,ns)
rows = await self.pagingdata(dic,ns)
return {
'total':total,
'rows':rows
@ -472,9 +497,7 @@ class SQLor(object):
async def tables(self):
sqlstring = self.tablesSQL()
ret = []
async for r in self._get_data(sqlstring,{}):
r.name = r.name.lower()
ret.append(r)
await self.execute(sqlstring,{},lambda x:ret.append(x))
return ret
def indexesSQL(self,tablename):
@ -493,16 +516,13 @@ class SQLor(object):
if sqlstring is None:
return []
recs = []
async for r in self._get_data(sqlstring, {}):
recs.append(r)
await self.execute(sqlstring,{},lambda x:recs.append(x))
return recs
async def fields(self,tablename=None):
sql = self.fieldsSQL(tablename)
sqlstring = self.fieldsSQL(tablename)
recs = []
async for r in self._get_data(sql, {}):
recs.append(r)
await self.execute(sqlstring,{},lambda x:recs.append(x))
ret = []
for r in recs:
r.update({'type':self.db2modelTypeMapping.get(r['type'].lower(),'unknown')})
@ -511,38 +531,31 @@ class SQLor(object):
return ret
async def primary(self,tablename):
sql = self.pkSQL(tablename)
sqlstring = self.pkSQL(tablename)
recs = []
async for r in self._get_data(sql, {}):
recs.append(r)
# debug(f'primary("{tablename}")={recs}, {sql}')
await self.execute(sqlstring,{},lambda x:recs.append(x))
# print('sql=', sqlstring, 'recs=', recs)
return recs
async def fkeys(self,tablename):
sqlstring = self.fkSQL(tablename)
recs = []
async for r in self._get_data(sqlstring, {}):
recs.append(r)
await self.execute(sqlstring,{},lambda x:recs.append(x))
return recs
async def createTable(self,tabledesc):
te = MyTemplateEngine([],'utf8','utf8')
sql = te.renders(self.ddl_template,tabledesc)
return await self.execute(sql, {})
desc = {
"sql_string":te.renders(self.ddl_template,tabledesc)
}
return await self.sqlExecute(desc,{})
async def getTableDesc(self,tablename):
tablename = tablename.lower()
desc = self.getMeta(tablename)
if desc:
return desc
desc = {}
tables = await self.tables()
summary = [i for i in tables if tablename == i.name]
if not summary:
e = Exception(f'table({tablename}) not exist')
exception(f'{e}{format_exc()}')
raise e
summary = [ i for i in await self.tables() if tablename.lower() == i['name'].lower() ]
pris = await self.primary(tablename)
primary = [i['name'] for i in pris ]
summary[0]['primary'] = primary
@ -569,14 +582,14 @@ class SQLor(object):
return desc
async def rollback(self):
if inspect.iscoroutinefunction(self.conn.rollback):
if self.async_mode:
await self.conn.rollback()
else:
self.conn.rollback()
self.dataChanged = False
async def commit(self):
if inspect.iscoroutinefunction(self.conn.commit):
if self.async_mode:
await self.conn.commit()
else:
self.conn.commit()
@ -597,7 +610,7 @@ class SQLor(object):
ret = await rf.exe(rfname, ns)
if isinstance(ret, dict):
ns.update(ret)
r = await self.execute(sql,ns.copy())
r = await self.runSQL({'sql_string':sql},ns.copy(), None)
await rf.exe(f'{self.dbname}:{tablename}:c:after', ns)
return r
@ -619,8 +632,11 @@ class SQLor(object):
if 'page' in ns.keys():
if not 'sort' in ns.keys():
ns['sort'] = desc['summary'][0]['primary'][0]
total = await self.record_count(sql, ns)
rows = await self.pagingdata(sql,ns)
dic = {
"sql_string":sql
}
total = await self.record_count(dic,ns)
rows = await self.pagingdata(dic,ns)
return {
'total':total,
'rows':rows
@ -645,11 +661,11 @@ class SQLor(object):
ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns)
if isinstance(ret, dict):
ns.update(ret)
r = await self.execute(sql, ns.copy())
r = await self.runSQL({'sql_string':sql},ns.copy() ,None)
await rf.exe(f'{self.dbname}:{tablename}:u:after',ns)
return r
async def D(self,tablename, ns):
async def D(self,tablename,ns):
desc = await self.I(tablename)
fields = [ i['name'] for i in desc['fields']]
condi = [ i for i in desc['summary'][0]['primary']]
@ -660,12 +676,7 @@ class SQLor(object):
ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns)
if isinstance(ret, dict):
ns.update(ret)
r = await self.execute(sql, ns)
r = await self.runSQL({'sql_string':sql},ns,None)
ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns)
return r
async def connect(self):
raise Exception('Not Implemented')
async def close(self):
raise Exception('Not Implemented')

2
sqlor/version.py Executable file
View File

@ -0,0 +1,2 @@
# fixed sor.py C function bug.
__version__ = "0.1.3"