diff --git a/sqlor/sor.py b/sqlor/sor.py index beb5194..0876378 100755 --- a/sqlor/sor.py +++ b/sqlor/sor.py @@ -7,7 +7,10 @@ from datetime import datetime, date import codecs import re import json +import inspect +from appPublic.worker import awaitify from appPublic.myImport import myImport +from appPublic.jsonConfig import getConfig from appPublic.dictObject import DictObject from appPublic.unicoding import uDict from appPublic.myTE import MyTemplateEngine @@ -15,8 +18,10 @@ from appPublic.objectAction import ObjectAction from appPublic.argsConvert import ArgsConvert,ConditionConvert from appPublic.registerfunction import RegisterFunction from appPublic.log import info, exception, debug +from appPublic.aes import aes_decode_b64 from .filter import DBFilter + def db_type_2_py_type(o): if isinstance(o,decimal.Decimal): return float(o) @@ -75,15 +80,18 @@ class SQLor(object): self.sqlvp = sqlvp self.sqlvs = sqlvs self.dbdesc = dbdesc - self.dbname = self.dbdesc.get('dbname') - if self.dbname: - self.dbname = self.dbname.lower() + self.unpassword() + self.dbname = None self.writer = None self.convfuncs = {} self.cc = ConditionConvert() self.dataChanged = False self.metadatas={} + def unpassword(self): + key=getConfig().password_key + self.desc.kwargs.password = aes_decode_b64(key, self.desc.kwargs.password) + async def get_schema(self): def concat_idx_info(idxs): x = [] @@ -221,6 +229,18 @@ class SQLor(object): retsql = u"""select * from (%s) filter_table where %s""" % (sql,fb) return retsql + async def cur_executemany(self, cur, sql, ns): + if inspect.iscoroutinefunction(cur.executemany): + return await cur.executemany(sql, ns) + f = awaitify(cur.executemany) + return await f(sql, ns) + + async def cur_execute(self, cur, sql, ns): + if inspect.iscoroutinefunction(cur.execute): + return await cur.execute(sql, ns) + f = awaitify(cur.execute) + return await f(sql, ns) + async def runVarSQL(self,cursor,sql,NS): """ using a opened cursor to run a SQL statment with variable, the variable is setup in NS namespace @@ -229,10 +249,9 @@ class SQLor(object): markedSQL, datas = self.maskingSQL(sql,NS) datas = self.dataConvert(datas) try: - if self.async_mode: - return await cursor.execute(markedSQL,datas) - else: - return cursor.execute(markedSQL,datas) + return await self.cur_execute(cursor, + sql, + datas) except Exception as e: fe = format_exc() @@ -282,40 +301,56 @@ class SQLor(object): return 'dml' return 'ddl' - async def execute(self,sql,value,callback,**kwargs): + async def fetchone(self, cur): + if inspect.iscoroutinefunction(cur.fetchone): + return await cur.fetchone() + f = awaitify(cur.fetchone) + return await f() + + async def execute(self, sql, value): sqltype = self.getSqlType(sql) cur = self.cursor() ret = await self.runVarSQL(cur,sql,value) if sqltype == 'qry' and callback is not None: fields = [ i[0].lower() for i in cur.description ] - rec = None - if self.async_mode: - rec = await cur.fetchone() - else: - rec = cur.fetchone() - - while rec is not None: + while True: + rec = await self.fetchone(cur) + if rec is None: + break dic = {} for i in range(len(fields)): dic.update({fields[i] : db_type_2_py_type(rec[i])}) dic = DictObject(**dic) - callback(dic,**kwargs) - if self.async_mode: - rec = await cur.fetchone() - else: - rec = cur.fetchone() + yield dic if sqltype == 'dml': self.dataChanged = True return ret + async def _get_data(self, sql, ns): + sqltype = self.getSqlType(sql) + if sqltype != 'qry': + raise Exception('not select sql') + ret = self.execute(sql, ns) + fields = [i[0].lower() for i in cur.description] + while True: + rec = await self.fetchone(cur) + if rec is None: + break + dic = {} + for i in range(len(fields)): + v = db_type_2_py_type(rec[i]) + dic.update({fields[i]: v}) + dic = DictObject(**dic) + yield dic + async def executemany(self,sql,values): + sqltype = self.getSqlType(sql) + if sqltype != 'dml': + raise Exception('no dml sql') cur = self.cursor() - markedSQL,datas = self.maskingSQL(sql,{}) + markedSQL, _ = self.maskingSQL(sql,{}) datas = [ self.dataConvert(d) for d in values ] - if self.async_mode: - await cur.executemany(markedSQL,datas) - else: - cur.executemany(markedSQL,datas) + await self.cur_exectutemany(cur, markedSQL, datas) def pivotSQL(self,tablename,rowFields,columnFields,valueFields): def maxValue(columnFields,valueFields,cfvalues): @@ -366,76 +401,33 @@ class SQLor(object): async def pivot(self,desc,tablename,rowFields,columnFields,valueFields): sql = self.pivotSQL(tablename,rowFields,columnFields,valueFields) - desc['sql_string'] = sql ret = [] - return await self.execute(sql,{},lambda x:ret.append(x)) + return await self.execute(sql,{}) def isSelectSql(self,sql): return self.getSqlType(sql) == 'qry' - def getSQLfromDesc(self,desc): - sql = '' - if 'sql_file' in desc.keys(): - sql = readsql(desc['sql_file']) - else: - sql = desc['sql_string'] - return sql - - async def record_count(self,desc,NS): - cnt_desc = {} - cnt_desc.update(desc) - sql = self.getSQLfromDesc(desc) - if desc.get('sql_file',False): - del cnt_desc['sql_file'] - cnt_desc['sql_string'] = self.recordCnt(sql) - class Cnt: - def __init__(self): - self.recs = [] - def handler(self,rec): - self.recs.append(rec) + async def record_count(self,sql,NS): + sql = self.recordCnt(sql) + async for r in self._get_data(sql, NS): + t = r.rcnt + return t + return None - c = Cnt() - await self.runSQL(cnt_desc,NS,c.handler) - t = c.recs[0]['rcnt'] - return t - - async def runSQLPaging(self,desc,NS): - total = await self.record_count(desc,NS) - recs = await self.pagingdata(desc,NS) - data = { - "total":total, - "rows":recs + async def pagingdata(self, sql, NS): + paging = { + "rowsname":"rows", + "pagename":"page", + "sortname":"sort" } - return data - - async def pagingdata(self,desc,NS): - paging_desc = {} - paging_desc.update(desc) - paging_desc.update( - { - "paging":{ - "rowsname":"rows", - "pagename":"page", - "sortname":"sort" - } - }) - if desc.get('sortfield',False): - NS['sort'] = desc.get('sortfield') - sql = self.getSQLfromDesc(desc) - if desc.get('sql_file',False): - del cnt_desc['sql_file'] - paging_desc['sql_string'] = self.pagingSQL(sql, - paging_desc.get('paging'),NS) + if not NS.get('sort'): + NS['sort'] = "id" - class Cnt: - def __init__(self): - self.recs = [] - def handler(self,rec): - self.recs.append(rec) - - c = Cnt() - await self.runSQL(paging_desc,NS,c.handler) - return c.recs + sql = self.pagingSQL(sql, paging, NS) + recs = [] + async for r in self._get_data(sql, NS): + recs.append(r) + return recs async def resultFields(self,desc,NS): NS.update(rows=1,page=1) @@ -443,28 +435,6 @@ class SQLor(object): ret = [ {'name':i[0],'type':i[1]} for i in self.cur.description ] return ret - async def runSQL(self,desc,NS,callback,**kw): - class RecordHandler: - def __init__(self,ns,name): - self.ns = ns - self.name = name - self.ns[name] = [] - - def handler(self,rec): - self.ns[self.name].append(rec) - - cur = self.cursor() - sql = self.getSQLfromDesc(desc) - if self.isSelectSql(sql): - if callback is None: - klass = desc.get('dataname','dummy') - if klass is not None: - rh = RecordHandler(NS,klass) - callback = rh.handler - else: - callback = None - await self.execute(sql,NS,callback) - async def sqlExecute(self,desc,NS): return await self.execute(desc,NS,None) @@ -479,16 +449,12 @@ class SQLor(object): return ret async def sqlPaging(self,sql,ns): - ret = [] - dic = { - "sql_string":sql - } page = ns.get('page') if not page: ns['page'] = 1 - total = await self.record_count(dic,ns) - rows = await self.pagingdata(dic,ns) + total = await self.record_count(sql,ns) + rows = await self.pagingdata(sql,ns) return { 'total':total, 'rows':rows @@ -545,10 +511,8 @@ class SQLor(object): async def createTable(self,tabledesc): te = MyTemplateEngine([],'utf8','utf8') - desc = { - "sql_string":te.renders(self.ddl_template,tabledesc) - } - return await self.sqlExecute(desc,{}) + sql = te.renders(self.ddl_template,tabledesc) + return await self.execute(sql, {}) async def getTableDesc(self,tablename): desc = self.getMeta(tablename) @@ -610,7 +574,7 @@ class SQLor(object): ret = await rf.exe(rfname, ns) if isinstance(ret, dict): ns.update(ret) - r = await self.runSQL({'sql_string':sql},ns.copy(), None) + r = await self.execute(sql,ns.copy()) await rf.exe(f'{self.dbname}:{tablename}:c:after', ns) return r @@ -632,11 +596,8 @@ class SQLor(object): if 'page' in ns.keys(): if not 'sort' in ns.keys(): ns['sort'] = desc['summary'][0]['primary'][0] - dic = { - "sql_string":sql - } - total = await self.record_count(dic,ns) - rows = await self.pagingdata(dic,ns) + total = await self.record_count(sql, ns) + rows = await self.pagingdata(sql,ns) return { 'total':total, 'rows':rows @@ -661,11 +622,11 @@ class SQLor(object): ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns) if isinstance(ret, dict): ns.update(ret) - r = await self.runSQL({'sql_string':sql},ns.copy() ,None) + r = await self.execute(sql, ns.copy()) await rf.exe(f'{self.dbname}:{tablename}:u:after',ns) return r - async def D(self,tablename,ns): + async def D(self,tablename, ns): desc = await self.I(tablename) fields = [ i['name'] for i in desc['fields']] condi = [ i for i in desc['summary'][0]['primary']] @@ -676,9 +637,12 @@ class SQLor(object): ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns) if isinstance(ret, dict): ns.update(ret) - r = await self.runSQL({'sql_string':sql},ns,None) + r = await self.execute(sql, ns) ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns) return r - async def connect(desc): + async def connect(self): + raise Exception('Not Implemented') + + async def close(self) raise Exception('Not Implemented')