bugfix
This commit is contained in:
parent
cb4bb508d3
commit
e10b8c5f16
212
sqlor/sor.py
212
sqlor/sor.py
@ -7,7 +7,10 @@ from datetime import datetime, date
|
||||
import codecs
|
||||
import re
|
||||
import json
|
||||
import inspect
|
||||
from appPublic.worker import awaitify
|
||||
from appPublic.myImport import myImport
|
||||
from appPublic.jsonConfig import getConfig
|
||||
from appPublic.dictObject import DictObject
|
||||
from appPublic.unicoding import uDict
|
||||
from appPublic.myTE import MyTemplateEngine
|
||||
@ -15,8 +18,10 @@ from appPublic.objectAction import ObjectAction
|
||||
from appPublic.argsConvert import ArgsConvert,ConditionConvert
|
||||
from appPublic.registerfunction import RegisterFunction
|
||||
from appPublic.log import info, exception, debug
|
||||
from appPublic.aes import aes_decode_b64
|
||||
from .filter import DBFilter
|
||||
|
||||
|
||||
def db_type_2_py_type(o):
|
||||
if isinstance(o,decimal.Decimal):
|
||||
return float(o)
|
||||
@ -75,15 +80,18 @@ class SQLor(object):
|
||||
self.sqlvp = sqlvp
|
||||
self.sqlvs = sqlvs
|
||||
self.dbdesc = dbdesc
|
||||
self.dbname = self.dbdesc.get('dbname')
|
||||
if self.dbname:
|
||||
self.dbname = self.dbname.lower()
|
||||
self.unpassword()
|
||||
self.dbname = None
|
||||
self.writer = None
|
||||
self.convfuncs = {}
|
||||
self.cc = ConditionConvert()
|
||||
self.dataChanged = False
|
||||
self.metadatas={}
|
||||
|
||||
def unpassword(self):
|
||||
key=getConfig().password_key
|
||||
self.desc.kwargs.password = aes_decode_b64(key, self.desc.kwargs.password)
|
||||
|
||||
async def get_schema(self):
|
||||
def concat_idx_info(idxs):
|
||||
x = []
|
||||
@ -221,6 +229,18 @@ class SQLor(object):
|
||||
retsql = u"""select * from (%s) filter_table where %s""" % (sql,fb)
|
||||
return retsql
|
||||
|
||||
async def cur_executemany(self, cur, sql, ns):
|
||||
if inspect.iscoroutinefunction(cur.executemany):
|
||||
return await cur.executemany(sql, ns)
|
||||
f = awaitify(cur.executemany)
|
||||
return await f(sql, ns)
|
||||
|
||||
async def cur_execute(self, cur, sql, ns):
|
||||
if inspect.iscoroutinefunction(cur.execute):
|
||||
return await cur.execute(sql, ns)
|
||||
f = awaitify(cur.execute)
|
||||
return await f(sql, ns)
|
||||
|
||||
async def runVarSQL(self,cursor,sql,NS):
|
||||
"""
|
||||
using a opened cursor to run a SQL statment with variable, the variable is setup in NS namespace
|
||||
@ -229,10 +249,9 @@ class SQLor(object):
|
||||
markedSQL, datas = self.maskingSQL(sql,NS)
|
||||
datas = self.dataConvert(datas)
|
||||
try:
|
||||
if self.async_mode:
|
||||
return await cursor.execute(markedSQL,datas)
|
||||
else:
|
||||
return cursor.execute(markedSQL,datas)
|
||||
return await self.cur_execute(cursor,
|
||||
sql,
|
||||
datas)
|
||||
|
||||
except Exception as e:
|
||||
fe = format_exc()
|
||||
@ -282,40 +301,56 @@ class SQLor(object):
|
||||
return 'dml'
|
||||
return 'ddl'
|
||||
|
||||
async def execute(self,sql,value,callback,**kwargs):
|
||||
async def fetchone(self, cur):
|
||||
if inspect.iscoroutinefunction(cur.fetchone):
|
||||
return await cur.fetchone()
|
||||
f = awaitify(cur.fetchone)
|
||||
return await f()
|
||||
|
||||
async def execute(self, sql, value):
|
||||
sqltype = self.getSqlType(sql)
|
||||
cur = self.cursor()
|
||||
ret = await self.runVarSQL(cur,sql,value)
|
||||
if sqltype == 'qry' and callback is not None:
|
||||
fields = [ i[0].lower() for i in cur.description ]
|
||||
rec = None
|
||||
if self.async_mode:
|
||||
rec = await cur.fetchone()
|
||||
else:
|
||||
rec = cur.fetchone()
|
||||
|
||||
while rec is not None:
|
||||
while True:
|
||||
rec = await self.fetchone(cur)
|
||||
if rec is None:
|
||||
break
|
||||
dic = {}
|
||||
for i in range(len(fields)):
|
||||
dic.update({fields[i] : db_type_2_py_type(rec[i])})
|
||||
dic = DictObject(**dic)
|
||||
callback(dic,**kwargs)
|
||||
if self.async_mode:
|
||||
rec = await cur.fetchone()
|
||||
else:
|
||||
rec = cur.fetchone()
|
||||
yield dic
|
||||
if sqltype == 'dml':
|
||||
self.dataChanged = True
|
||||
return ret
|
||||
|
||||
async def _get_data(self, sql, ns):
|
||||
sqltype = self.getSqlType(sql)
|
||||
if sqltype != 'qry':
|
||||
raise Exception('not select sql')
|
||||
ret = self.execute(sql, ns)
|
||||
fields = [i[0].lower() for i in cur.description]
|
||||
while True:
|
||||
rec = await self.fetchone(cur)
|
||||
if rec is None:
|
||||
break
|
||||
dic = {}
|
||||
for i in range(len(fields)):
|
||||
v = db_type_2_py_type(rec[i])
|
||||
dic.update({fields[i]: v})
|
||||
dic = DictObject(**dic)
|
||||
yield dic
|
||||
|
||||
async def executemany(self,sql,values):
|
||||
sqltype = self.getSqlType(sql)
|
||||
if sqltype != 'dml':
|
||||
raise Exception('no dml sql')
|
||||
cur = self.cursor()
|
||||
markedSQL,datas = self.maskingSQL(sql,{})
|
||||
markedSQL, _ = self.maskingSQL(sql,{})
|
||||
datas = [ self.dataConvert(d) for d in values ]
|
||||
if self.async_mode:
|
||||
await cur.executemany(markedSQL,datas)
|
||||
else:
|
||||
cur.executemany(markedSQL,datas)
|
||||
await self.cur_exectutemany(cur, markedSQL, datas)
|
||||
|
||||
def pivotSQL(self,tablename,rowFields,columnFields,valueFields):
|
||||
def maxValue(columnFields,valueFields,cfvalues):
|
||||
@ -366,76 +401,33 @@ class SQLor(object):
|
||||
|
||||
async def pivot(self,desc,tablename,rowFields,columnFields,valueFields):
|
||||
sql = self.pivotSQL(tablename,rowFields,columnFields,valueFields)
|
||||
desc['sql_string'] = sql
|
||||
ret = []
|
||||
return await self.execute(sql,{},lambda x:ret.append(x))
|
||||
return await self.execute(sql,{})
|
||||
|
||||
def isSelectSql(self,sql):
|
||||
return self.getSqlType(sql) == 'qry'
|
||||
|
||||
def getSQLfromDesc(self,desc):
|
||||
sql = ''
|
||||
if 'sql_file' in desc.keys():
|
||||
sql = readsql(desc['sql_file'])
|
||||
else:
|
||||
sql = desc['sql_string']
|
||||
return sql
|
||||
|
||||
async def record_count(self,desc,NS):
|
||||
cnt_desc = {}
|
||||
cnt_desc.update(desc)
|
||||
sql = self.getSQLfromDesc(desc)
|
||||
if desc.get('sql_file',False):
|
||||
del cnt_desc['sql_file']
|
||||
cnt_desc['sql_string'] = self.recordCnt(sql)
|
||||
class Cnt:
|
||||
def __init__(self):
|
||||
self.recs = []
|
||||
def handler(self,rec):
|
||||
self.recs.append(rec)
|
||||
|
||||
c = Cnt()
|
||||
await self.runSQL(cnt_desc,NS,c.handler)
|
||||
t = c.recs[0]['rcnt']
|
||||
async def record_count(self,sql,NS):
|
||||
sql = self.recordCnt(sql)
|
||||
async for r in self._get_data(sql, NS):
|
||||
t = r.rcnt
|
||||
return t
|
||||
return None
|
||||
|
||||
async def runSQLPaging(self,desc,NS):
|
||||
total = await self.record_count(desc,NS)
|
||||
recs = await self.pagingdata(desc,NS)
|
||||
data = {
|
||||
"total":total,
|
||||
"rows":recs
|
||||
}
|
||||
return data
|
||||
|
||||
async def pagingdata(self,desc,NS):
|
||||
paging_desc = {}
|
||||
paging_desc.update(desc)
|
||||
paging_desc.update(
|
||||
{
|
||||
"paging":{
|
||||
async def pagingdata(self, sql, NS):
|
||||
paging = {
|
||||
"rowsname":"rows",
|
||||
"pagename":"page",
|
||||
"sortname":"sort"
|
||||
}
|
||||
})
|
||||
if desc.get('sortfield',False):
|
||||
NS['sort'] = desc.get('sortfield')
|
||||
sql = self.getSQLfromDesc(desc)
|
||||
if desc.get('sql_file',False):
|
||||
del cnt_desc['sql_file']
|
||||
paging_desc['sql_string'] = self.pagingSQL(sql,
|
||||
paging_desc.get('paging'),NS)
|
||||
if not NS.get('sort'):
|
||||
NS['sort'] = "id"
|
||||
|
||||
class Cnt:
|
||||
def __init__(self):
|
||||
self.recs = []
|
||||
def handler(self,rec):
|
||||
self.recs.append(rec)
|
||||
|
||||
c = Cnt()
|
||||
await self.runSQL(paging_desc,NS,c.handler)
|
||||
return c.recs
|
||||
sql = self.pagingSQL(sql, paging, NS)
|
||||
recs = []
|
||||
async for r in self._get_data(sql, NS):
|
||||
recs.append(r)
|
||||
return recs
|
||||
|
||||
async def resultFields(self,desc,NS):
|
||||
NS.update(rows=1,page=1)
|
||||
@ -443,28 +435,6 @@ class SQLor(object):
|
||||
ret = [ {'name':i[0],'type':i[1]} for i in self.cur.description ]
|
||||
return ret
|
||||
|
||||
async def runSQL(self,desc,NS,callback,**kw):
|
||||
class RecordHandler:
|
||||
def __init__(self,ns,name):
|
||||
self.ns = ns
|
||||
self.name = name
|
||||
self.ns[name] = []
|
||||
|
||||
def handler(self,rec):
|
||||
self.ns[self.name].append(rec)
|
||||
|
||||
cur = self.cursor()
|
||||
sql = self.getSQLfromDesc(desc)
|
||||
if self.isSelectSql(sql):
|
||||
if callback is None:
|
||||
klass = desc.get('dataname','dummy')
|
||||
if klass is not None:
|
||||
rh = RecordHandler(NS,klass)
|
||||
callback = rh.handler
|
||||
else:
|
||||
callback = None
|
||||
await self.execute(sql,NS,callback)
|
||||
|
||||
async def sqlExecute(self,desc,NS):
|
||||
return await self.execute(desc,NS,None)
|
||||
|
||||
@ -479,16 +449,12 @@ class SQLor(object):
|
||||
return ret
|
||||
|
||||
async def sqlPaging(self,sql,ns):
|
||||
ret = []
|
||||
dic = {
|
||||
"sql_string":sql
|
||||
}
|
||||
page = ns.get('page')
|
||||
if not page:
|
||||
ns['page'] = 1
|
||||
|
||||
total = await self.record_count(dic,ns)
|
||||
rows = await self.pagingdata(dic,ns)
|
||||
total = await self.record_count(sql,ns)
|
||||
rows = await self.pagingdata(sql,ns)
|
||||
return {
|
||||
'total':total,
|
||||
'rows':rows
|
||||
@ -545,10 +511,8 @@ class SQLor(object):
|
||||
|
||||
async def createTable(self,tabledesc):
|
||||
te = MyTemplateEngine([],'utf8','utf8')
|
||||
desc = {
|
||||
"sql_string":te.renders(self.ddl_template,tabledesc)
|
||||
}
|
||||
return await self.sqlExecute(desc,{})
|
||||
sql = te.renders(self.ddl_template,tabledesc)
|
||||
return await self.execute(sql, {})
|
||||
|
||||
async def getTableDesc(self,tablename):
|
||||
desc = self.getMeta(tablename)
|
||||
@ -610,7 +574,7 @@ class SQLor(object):
|
||||
ret = await rf.exe(rfname, ns)
|
||||
if isinstance(ret, dict):
|
||||
ns.update(ret)
|
||||
r = await self.runSQL({'sql_string':sql},ns.copy(), None)
|
||||
r = await self.execute(sql,ns.copy())
|
||||
await rf.exe(f'{self.dbname}:{tablename}:c:after', ns)
|
||||
return r
|
||||
|
||||
@ -632,11 +596,8 @@ class SQLor(object):
|
||||
if 'page' in ns.keys():
|
||||
if not 'sort' in ns.keys():
|
||||
ns['sort'] = desc['summary'][0]['primary'][0]
|
||||
dic = {
|
||||
"sql_string":sql
|
||||
}
|
||||
total = await self.record_count(dic,ns)
|
||||
rows = await self.pagingdata(dic,ns)
|
||||
total = await self.record_count(sql, ns)
|
||||
rows = await self.pagingdata(sql,ns)
|
||||
return {
|
||||
'total':total,
|
||||
'rows':rows
|
||||
@ -661,7 +622,7 @@ class SQLor(object):
|
||||
ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns)
|
||||
if isinstance(ret, dict):
|
||||
ns.update(ret)
|
||||
r = await self.runSQL({'sql_string':sql},ns.copy() ,None)
|
||||
r = await self.execute(sql, ns.copy())
|
||||
await rf.exe(f'{self.dbname}:{tablename}:u:after',ns)
|
||||
return r
|
||||
|
||||
@ -676,9 +637,12 @@ class SQLor(object):
|
||||
ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns)
|
||||
if isinstance(ret, dict):
|
||||
ns.update(ret)
|
||||
r = await self.runSQL({'sql_string':sql},ns,None)
|
||||
r = await self.execute(sql, ns)
|
||||
ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns)
|
||||
return r
|
||||
|
||||
async def connect(desc):
|
||||
async def connect(self):
|
||||
raise Exception('Not Implemented')
|
||||
|
||||
async def close(self)
|
||||
raise Exception('Not Implemented')
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user