Compare commits

..

No commits in common. "62514a2398696cc10e627d67ce2351023004c133" and "11945861b2d376b03da8a0bfb075518ec54ee76e" have entirely different histories.

10 changed files with 410 additions and 331 deletions

View File

@ -14,83 +14,28 @@ SQLOR is a database api for python3, it is base on the python's DBAPI2
## requirements ## requirements
* python 3.9 or above * python 3.5 or above
* asyncio * asyncio
* Oracle DBAPI2 driver(cx_Oracle) * Oracle DBAPI2 driver(cx_Oracle)
* Postgresql DBAPI2 driver(aiopg) * MySQL DBAPI2 driver(mysql-connector)
* Postgresql DBAPI2 driver(psycopg2-binrary)
* Asynchronous MySQL driver(aiomysql) * Asynchronous MySQL driver(aiomysql)
* Asynchronous Postgresql driver(aiopg) * Asynchronous Postgresql driver(aiopg)
* clickhouse(clickhouse-connect)
* Other driver can be easy integreated * Other driver can be easy integreated
## Support Database Types
* oracle
* mysql, mariadb
* TiDB
* clickHouse
* DuckDB
* PostgreSQL
* MsSQL
## Database Description json format
password in json data is encrypted by aes.
* mysql, tidb, mariadb
```
{
"databases":{
"mydb":{
"driver":"mysql",
"kwargs":{
"user":"test",
"db":"cfae",
"password":"test123",
"host":"localhost"
}
}
}
}
```
* PostgreSQL
```
{
"databases":{
"mydb":{
"driver":"postgresql",
"kwargs":{
"user":"test",
"dbname":"cfae",
"password":"test123",
"host":"localhost"
}
}
}
}
```
* duckdb
```
{
"databases":{
"mydb":{
"driver":"duckdb",
"kwargs":{
"dbfile":"ttt.ddb"
}
}
}
}
```
## Using ## Using
``` ```
import asyncio import asyncio
from sqlor.dbpools import DBPools, sqlorContext from sqlor.dbpools import DBPools
dbs={ dbs={
"mydb":{ "aiocfae":{
"driver":"mysql", "driver":"aiomysql",
"async_mode":True,
"coding":"utf8",
"dbname":"cfae",
"kwargs":{ "kwargs":{
"user":"test", "user":"test",
"db":"cfae", "db":"cfae",
@ -127,7 +72,8 @@ loop = asyncio.get_event_loop()
pool = DBPools(dbs,loop=loop) pool = DBPools(dbs,loop=loop)
async def testfunc(): async def testfunc():
async with sqlorContext('stock') as sor: db = DBPools()
async with db.sqlorContext('stock') as sor:
# start a transaction # start a transaction
# if exception happended, all change to database will rollback # if exception happended, all change to database will rollback
# else will commit # else will commit

View File

@ -1,7 +1,7 @@
[metadata] [metadata]
name=sqlor name=sqlor
version = 2.0.0 version = 1.1.1
description = a new version of sqlor, each db's sor need to plugin to sqlor, and dbdriver now a isolated module description = a wrap for DBAPI, to make python run sql easy and safe
authors = yu moqing authors = yu moqing
author_email = yumoqing@gmail.com author_email = yumoqing@gmail.com
readme = README.md readme = README.md
@ -12,9 +12,8 @@ packages = find:
requires_python = >=3.8 requires_python = >=3.8
install_requires = install_requires =
aiomysql aiomysql
aiopg
duckdb duckdb
clickhouse-driver clickhouse-connect
PyMySQL PyMySQL
aiosqlite aiosqlite
asyncio asyncio

7
sqlor/aiomysqlor.py Executable file
View File

@ -0,0 +1,7 @@
from .mysqlor import MySqlor
class AioMysqlor(MySqlor):
@classmethod
def isMe(self,name):
return name=='aiomysql'

View File

@ -1,6 +1,4 @@
# -*- coding:utf8 -*- # -*- coding:utf8 -*-
# pip install clickhouse-driver
from clickhouse_driver import Client
from appPublic.argsConvert import ArgsConvert, ConditionConvert from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor from .sor import SQLor
from .ddl_template_clickhouse import clickhouse_ddl_tmpl from .ddl_template_clickhouse import clickhouse_ddl_tmpl
@ -97,10 +95,3 @@ WHERE database = '%s' AND table = '%s' AND is_in_primary_key = 1;
# ClickHouse 不支持外键 # ClickHouse 不支持外键
return "SELECT 'ClickHouse does not support foreign keys' AS msg;" return "SELECT 'ClickHouse does not support foreign keys' AS msg;"
async def connect(self):
self.conn = Client(**self.dbdesc)
self.cur = self.conn
async def close(self):
self.conn.close()

View File

@ -1,7 +1,7 @@
import time
import asyncio import asyncio
from traceback import format_exc from traceback import format_exc
from functools import wraps, partial from functools import wraps
import codecs import codecs
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
@ -21,6 +21,7 @@ from .oracleor import Oracleor
from .sqlite3or import SQLite3or from .sqlite3or import SQLite3or
from .aiosqliteor import Aiosqliteor from .aiosqliteor import Aiosqliteor
from .mysqlor import MySqlor from .mysqlor import MySqlor
from .aiomysqlor import AioMysqlor
from .aiopostgresqlor import AioPostgresqlor from .aiopostgresqlor import AioPostgresqlor
def sqlorFactory(dbdesc): def sqlorFactory(dbdesc):
@ -36,39 +37,139 @@ def sqlorFactory(dbdesc):
k = findSubclass(driver,SQLor) k = findSubclass(driver,SQLor)
if k is None: if k is None:
return SQLor(dbdesc=dbdesc) return SQLor(dbdesc=dbdesc)
return k(dbdesc=dbdesc.kwargs) return k(dbdesc=dbdesc)
class SqlorPool: def sqlorFromFile(dbdef_file,coding='utf8'):
def __init__(self, create_func, maxconn=100): dbdef = loadf(dbdef_file)
self.sema = asyncio.Semaphore(maxconn) return sqlorFactory(dbdef)
self.create_func = create_func
self.sqlors = []
async def _new_sqlor(self): class LifeConnect:
sqlor = await self.create_func() def __init__(self,connfunc,kw,use_max=1000,async_mode=False):
await sqlor.connect() self.connfunc = connfunc
x = DictObject(**{ self.async_mode = async_mode
'used': True, self.use_max = use_max
'use_at': time.time(), self.kw = kw
'sqlor':sqlor self.conn = None
}) self.used = False
self.sqlors.append(x)
return x def print(self):
print(self.use_max)
print(self.conn)
@asynccontextmanager async def _mkconn(self):
async def context(self): if self.async_mode:
async with self.sema: self.conn = await self.connfunc(**self.kw)
yielded_sqlor = None else:
for s in self.sqlors: self.conn = self.connfunc(**self.kw)
if not s.used: self.use_cnt = 0
yielded_sqlor = s
if not yielded_sqlor:
yielded_sqlor = await self._new_sqlor()
yielded_sqlor.used = True
yielded_sqlor.use_at = time.time()
yield yielded_sqlor.sqlor
yielded_sqlor.used = False
async def use(self):
if self.conn is None:
await self._mkconn()
wait_time = 0.2
loop_cnt = 4
while loop_cnt > 0:
if await self.testok():
return self.conn
await asyncio.sleep(wait_time)
wait_time = wait_time + 0.4
loop_cnt = loop_cnt - 1
try:
await self.conn.close()
except:
pass
self.conn = None
await self._mkconn()
raise Exception('database connect break')
async def free(self,conn):
self.use_cnt = self.use_cnt + 1
return
if self.use_cnt >= self.use_max:
await self.conn.close()
await self._mkcomm()
async def testok(self):
if self.async_mode:
async with self.conn.cursor() as cur:
try:
await cur.execute('select 1 as cnt')
return True
except:
return False
else:
cur = self.conn.cursor()
try:
cur.execute('select 1 as cnt')
r = cur.fetchall()
return True
except:
return False
finally:
cur.close()
class ConnectionPool(object):
def __init__(self,dbdesc,loop):
self.dbdesc = dbdesc
self.async_mode = dbdesc.get('async_mode',False)
self.loop = loop
self.driver = myImport(self.dbdesc['driver'])
self.maxconn = dbdesc.get('maxconn',5)
self.maxuse = dbdesc.get('maxuse',1000)
self._pool = asyncio.Queue(self.maxconn)
self.connectObject = {}
self.use_cnt = 0
self.max_use = 1000
self.e_except = None
# self.lock = asyncio.Lock()
# self.lockstatus()
def lockstatus(self):
return
self.loop.call_later(5,self.lockstatus)
print('--lock statu=',self.lock.locked(),
'--pool empty()=',self._pool.empty(),
'--full()=',self._pool.full()
)
async def _fillPool(self):
for i in range(self.maxconn):
lc = await self.connect()
i = i + 1
async def connect(self):
lc = LifeConnect(self.driver.connect,self.dbdesc['kwargs'],
use_max=self.maxuse,async_mode=self.async_mode)
await self._pool.put(lc)
return lc
def isEmpty(self):
return self._pool.empty()
def isFull(self):
return self._pool.full()
async def aquire(self):
lc = await self._pool.get()
conn = await lc.use()
"""
with await self.lock:
self.connectObject[lc.conn] = lc
"""
self.connectObject[lc.conn] = lc
return conn
async def release(self,conn):
lc = None
"""
with await self.lock:
lc = self.connectObject.get(conn,None)
del self.connectObject[conn]
"""
lc = self.connectObject.get(conn,None)
del self.connectObject[conn]
await self._pool.put(lc)
@SingletonDecorator @SingletonDecorator
class DBPools: class DBPools:
def __init__(self,databases={},max_connect=100,loop=None): def __init__(self,databases={},max_connect=100,loop=None):
@ -76,7 +177,8 @@ class DBPools:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
self.loop = loop self.loop = loop
self.max_connect = max_connect self.max_connect = max_connect
self._pools = {} self.sema = asyncio.Semaphore(max_connect)
self._cpools = {}
self.databases = databases self.databases = databases
self.meta = {} self.meta = {}
@ -86,36 +188,108 @@ class DBPools:
return None return None
return desc.get('dbname') return desc.get('dbname')
def addDatabase(self, name, desc): def addDatabase(self,name,desc):
self.databases[name] = desc self.databases[name] = desc
async def getSqlor(self, name): async def getSqlor(self,name):
await self.sema.acquire()
desc = self.databases.get(name) desc = self.databases.get(name)
sor = sqlorFactory(desc) sor = sqlorFactory(desc)
await sor.connect() sor.name = name
a,conn,cur = await self._aquireConn(name)
sor.setCursor(a,conn,cur)
return sor return sor
async def freeSqlor(self,sor):
await self._releaseConn(sor.name,sor.conn,sor.cur)
self.sema.release()
@asynccontextmanager @asynccontextmanager
async def sqlorContext(self, name): async def sqlorContext(self,name):
pool = self._pools.get(name)
if pool is None:
f = partial(self.getSqlor, name)
pool = SqlorPool(f)
self._pools[name] = pool
self.e_except = None self.e_except = None
sqlor = None sqlor = await self.getSqlor(name)
try: try:
async with pool.context() as sqlor: yield sqlor
yield sqlor
if sqlor and sqlor.dataChanged:
await sqlor.commit()
except Exception as e: except Exception as e:
self.e_except = e self.e_except = e
cb = format_exc() cb = format_exc()
exception(f'sqlorContext():EXCEPTION{e}, {cb}') exception(f'sqlorContext():EXCEPTION{e}, {cb}')
if sqlor and sqlor.dataChanged: if sqlor and sqlor.dataChanged:
await sqlor.rollback() await sqlor.rollback()
finally:
if sqlor and sqlor.dataChanged:
await sqlor.commit()
await self.freeSqlor(sqlor)
def get_exception(self): def get_exception(self):
return self.e_except return self.e_except
async def _aquireConn(self,dbname):
"""
p = self._cpools.get(dbname)
if p == None:
p = ConnectionPool(self.databases.get(dbname),self.loop)
await p._fillPool()
self._cpools[dbname] = p
conn = await p.aquire()
if self.isAsyncDriver(dbname):
cur = await conn.cursor()
else:
cur = conn.cursor()
return self.isAsyncDriver(dbname),conn,cur
"""
dbdesc = self.databases.get(dbname)
driver = myImport(dbdesc['driver'])
conn = None
cur = None
desc = dbdesc['kwargs'].copy()
pw = desc.get('password')
if pw:
desc['password'] = unpassword(pw)
if self.isAsyncDriver(dbname):
if dbdesc['driver'] == 'sqlite3':
conn = await driver.connect(desc['dbname'])
else:
conn = await driver.connect(**desc)
cur = await conn.cursor()
return True,conn,cur
else:
if dbdesc['driver'] == 'sqlite3':
conn = driver.connect(desc['dbname'])
else:
conn = driver.connect(**desc)
cur = conn.cursor()
return False,conn,cur
def isAsyncDriver(self,dbname):
ret = self.databases[dbname].get('async_mode',False)
return ret
async def _releaseConn(self,dbname,conn,cur):
"""
if self.isAsyncDriver(dbname):
await cur.close()
else:
try:
cur.fetchall()
except:
pass
cur.close()
p = self._cpools.get(dbname)
if p == None:
raise Exception('database (%s) not connected'%dbname)
await p.release(conn)
"""
if self.isAsyncDriver(dbname):
try:
await cur.close()
except:
pass
else:
try:
cur.fetchall()
except:
pass
cur.close()
conn.close()

View File

@ -1,5 +1,4 @@
# -*- coding:utf8 -*- # -*- coding:utf8 -*-
import duckdb
from appPublic.argsConvert import ArgsConvert, ConditionConvert from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor from .sor import SQLor
from .const import ROWS from .const import ROWS
@ -118,15 +117,3 @@ class DuckDBor(SQLor):
sqlcmd += f" AND lower(table_name) = '{tablename.lower()}'" sqlcmd += f" AND lower(table_name) = '{tablename.lower()}'"
return sqlcmd return sqlcmd
async def connect(self):
self.conn = duckdb.connect(self.dbdesc.dbfile)
self.cur = self.conn
self.dbname = None
async def close(self):
self.conn.close()
def unpassword(self):
pass

View File

@ -1,7 +1,8 @@
# -*- coding:utf8 -*- # -*- coding:utf8 -*-
from appPublic.argsConvert import ArgsConvert,ConditionConvert from appPublic.argsConvert import ArgsConvert,ConditionConvert
import aiomysql
from .sor import SQLor from .sor import SQLor
from .const import ROWS
from .ddl_template_mysql import mysql_ddl_tmpl from .ddl_template_mysql import mysql_ddl_tmpl
class MySqlor(SQLor): class MySqlor(SQLor):
ddl_template = mysql_ddl_tmpl ddl_template = mysql_ddl_tmpl
@ -49,7 +50,7 @@ class MySqlor(SQLor):
} }
@classmethod @classmethod
def isMe(self,name): def isMe(self,name):
if name in ['mysql', 'aiosqlor', 'tidb']: if name=='pymysql':
return True return True
return False return False
@ -76,7 +77,7 @@ class MySqlor(SQLor):
limit $[from_line]$,$[rows]$""" limit $[from_line]$,$[rows]$"""
def tablesSQL(self): def tablesSQL(self):
sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbname sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbdesc.get('dbname','unknown')
return sqlcmd return sqlcmd
def fieldsSQL(self,tablename=None): def fieldsSQL(self,tablename=None):
@ -91,7 +92,7 @@ limit $[from_line]$,$[rows]$"""
lower(is_nullable) as nullable, lower(is_nullable) as nullable,
column_comment as title, column_comment as title,
lower(table_name) as table_name lower(table_name) as table_name
from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbname from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbdesc.get('dbname','unknown').lower()
if tablename is not None: if tablename is not None:
sqlcmd = sqlcmd + """and lower(table_name)='%s';""" % tablename.lower() sqlcmd = sqlcmd + """and lower(table_name)='%s';""" % tablename.lower()
return sqlcmd return sqlcmd
@ -115,7 +116,7 @@ limit $[from_line]$,$[rows]$"""
AND R.REFERENCED_TABLE_NAME = C.REFERENCED_TABLE_NAME AND R.REFERENCED_TABLE_NAME = C.REFERENCED_TABLE_NAME
WHERE C.REFERENCED_TABLE_NAME IS NOT NULL ; WHERE C.REFERENCED_TABLE_NAME IS NOT NULL ;
and C.TABLE_SCHEMA = '%s' and C.TABLE_SCHEMA = '%s'
""" % self.dbname """ % self.dbdesc.get('dbname','unknown').lower()
if tablename is not None: if tablename is not None:
sqlcmd = sqlcmd + " and C.REFERENCED_TABLE_NAME = '%s'" % tablename.lower() sqlcmd = sqlcmd + " and C.REFERENCED_TABLE_NAME = '%s'" % tablename.lower()
return sqlcmd return sqlcmd
@ -136,26 +137,7 @@ limit $[from_line]$,$[rows]$"""
FROM FROM
information_schema.statistics information_schema.statistics
WHERE WHERE
table_schema = '%s'""" % self.dbname table_schema = '%s'""" % self.dbdesc.get('dbname','unknown')
if tablename is not None: if tablename is not None:
sqlcmd = sqlcmd + """ AND table_name = '%s'""" % tablename.lower() sqlcmd = sqlcmd + """ AND table_name = '%s'""" % tablename.lower()
return sqlcmd return sqlcmd
async def connect(self):
"""
kwargs:
host:
port:
user:
password:
db:
"""
dbdesc = self.dbdesc
self.conn = await aiomysql.connect(**dbdesc)
self.cur = await self.conn.cursor()
self.dbname = dbdesc.get('db')
async def close(self):
await self.cursor.close()
await self.conn.close()

View File

@ -1,4 +1,3 @@
import aiopg
from .sor import SQLor from .sor import SQLor
from .ddl_template_postgresql import postgresql_ddl_tmpl from .ddl_template_postgresql import postgresql_ddl_tmpl
@ -170,22 +169,3 @@ order by
i.relname""" % tablename.lower() i.relname""" % tablename.lower()
return sqlcmd return sqlcmd
async def connect():
"""
kwargs:
dbname:
user:
password:
host:
port:
"""
kwargs = self.dbdesc
dns = ' '.join([f'{k}={v}' for k, v in kwargs.items()])
self.conn = await self.connect(dns)
self.cur = await self.conn.cursor()
self.dbname = kwargs.dbname.lower()
async def close():
await self.cur.close()
await self.conn.close()

View File

@ -1,4 +1,3 @@
import asyncio
from traceback import format_exc from traceback import format_exc
import os import os
import decimal import decimal
@ -8,10 +7,7 @@ from datetime import datetime, date
import codecs import codecs
import re import re
import json import json
import inspect
from appPublic.worker import awaitify
from appPublic.myImport import myImport from appPublic.myImport import myImport
from appPublic.jsonConfig import getConfig
from appPublic.dictObject import DictObject from appPublic.dictObject import DictObject
from appPublic.unicoding import uDict from appPublic.unicoding import uDict
from appPublic.myTE import MyTemplateEngine from appPublic.myTE import MyTemplateEngine
@ -19,10 +15,8 @@ from appPublic.objectAction import ObjectAction
from appPublic.argsConvert import ArgsConvert,ConditionConvert from appPublic.argsConvert import ArgsConvert,ConditionConvert
from appPublic.registerfunction import RegisterFunction from appPublic.registerfunction import RegisterFunction
from appPublic.log import info, exception, debug from appPublic.log import info, exception, debug
from appPublic.aes import aes_decode_b64
from .filter import DBFilter from .filter import DBFilter
def db_type_2_py_type(o): def db_type_2_py_type(o):
if isinstance(o,decimal.Decimal): if isinstance(o,decimal.Decimal):
return float(o) return float(o)
@ -80,20 +74,16 @@ class SQLor(object):
self.sqlts = sqlts self.sqlts = sqlts
self.sqlvp = sqlvp self.sqlvp = sqlvp
self.sqlvs = sqlvs self.sqlvs = sqlvs
self.dbdesc = dbdesc.copy() self.dbdesc = dbdesc
self.unpassword() self.dbname = self.dbdesc.get('dbname')
self.dbname = None if self.dbname:
self.dbname = self.dbname.lower()
self.writer = None self.writer = None
self.convfuncs = {} self.convfuncs = {}
self.cc = ConditionConvert() self.cc = ConditionConvert()
self.dataChanged = False self.dataChanged = False
self.metadatas={} self.metadatas={}
def unpassword(self):
if self.dbdesc.password:
key=getConfig().password_key
self.dbdesc.password = aes_decode_b64(key, self.dbdesc.password)
async def get_schema(self): async def get_schema(self):
def concat_idx_info(idxs): def concat_idx_info(idxs):
x = [] x = []
@ -174,7 +164,7 @@ class SQLor(object):
return self.cur return self.cur
def recordCnt(self,sql): def recordCnt(self,sql):
ret = """select count(*) rcnt from (%s) rowcount_table""" % sql ret = u"""select count(*) rcnt from (%s) rowcount_table""" % sql
return ret return ret
def sortSQL(self, sql, NS): def sortSQL(self, sql, NS):
@ -231,23 +221,7 @@ class SQLor(object):
retsql = u"""select * from (%s) filter_table where %s""" % (sql,fb) retsql = u"""select * from (%s) filter_table where %s""" % (sql,fb)
return retsql return retsql
async def cur_executemany(self, cur, sql, ns): async def runVarSQL(self,cursor,sql,NS):
if inspect.iscoroutinefunction(cur.executemany):
return await cur.executemany(sql, ns)
f = awaitify(cur.executemany)
return await f(sql, ns)
async def cur_execute(self, cur, sql, ns):
if inspect.iscoroutinefunction(cur.execute):
ret = await cur.execute(sql, ns)
# debug(f'-------coroutine--{ret}-{cur}----')
return ret
f = awaitify(cur.execute)
ret = await f(sql, ns)
# debug(f'------function--{ret}------')
return ret
async def runVarSQL(self, cursor, sql, NS):
""" """
using a opened cursor to run a SQL statment with variable, the variable is setup in NS namespace using a opened cursor to run a SQL statment with variable, the variable is setup in NS namespace
return a cursor with data return a cursor with data
@ -255,9 +229,10 @@ class SQLor(object):
markedSQL, datas = self.maskingSQL(sql,NS) markedSQL, datas = self.maskingSQL(sql,NS)
datas = self.dataConvert(datas) datas = self.dataConvert(datas)
try: try:
return await self.cur_execute(cursor, if self.async_mode:
markedSQL, return await cursor.execute(markedSQL,datas)
datas) else:
return cursor.execute(markedSQL,datas)
except Exception as e: except Exception as e:
fe = format_exc() fe = format_exc()
@ -307,55 +282,40 @@ class SQLor(object):
return 'dml' return 'dml'
return 'ddl' return 'ddl'
async def fetchone(self, cur): async def execute(self,sql,value,callback,**kwargs):
if inspect.iscoroutinefunction(cur.fetchone):
ret = await cur.fetchone()
# debug(f'coro:sor.fetchone()={ret}, {type(ret)}')
return ret
ret = await cur.fetchone()
# debug(f'func:sor.fetchone()={ret}, {type(ret)}')
if isinstance(ret, asyncio.Future):
ret = ret.result()
return ret
async def execute(self, sql, value):
sqltype = self.getSqlType(sql) sqltype = self.getSqlType(sql)
cur = self.cursor() cur = self.cursor()
ret = await self.runVarSQL(cur, sql, value) ret = await self.runVarSQL(cur,sql,value)
if sqltype == 'qry' and callback is not None:
fields = [ i[0].lower() for i in cur.description ]
rec = None
if self.async_mode:
rec = await cur.fetchone()
else:
rec = cur.fetchone()
while rec is not None:
dic = {}
for i in range(len(fields)):
dic.update({fields[i] : db_type_2_py_type(rec[i])})
dic = DictObject(**dic)
callback(dic,**kwargs)
if self.async_mode:
rec = await cur.fetchone()
else:
rec = cur.fetchone()
if sqltype == 'dml': if sqltype == 'dml':
self.dataChanged = True self.dataChanged = True
return ret return ret
async def _get_data(self, sql, ns):
cur = self.cursor()
sqltype = self.getSqlType(sql)
if sqltype != 'qry':
raise Exception('not select sql')
ret = await self.execute(sql, ns)
fields = [i[0].lower() for i in cur.description]
while True:
rec = await self.fetchone(cur)
if rec is None:
break
if rec is None:
break
if isinstance(rec, dict):
rec = rec.values()
dic = {}
for i in range(len(fields)):
v = db_type_2_py_type(rec[i])
dic.update({fields[i]: v})
dic = DictObject(**dic)
yield dic
async def executemany(self,sql,values): async def executemany(self,sql,values):
sqltype = self.getSqlType(sql)
if sqltype != 'dml':
raise Exception('no dml sql')
cur = self.cursor() cur = self.cursor()
markedSQL, _ = self.maskingSQL(sql,{}) markedSQL,datas = self.maskingSQL(sql,{})
datas = [ self.dataConvert(d) for d in values ] datas = [ self.dataConvert(d) for d in values ]
await self.cur_exectutemany(cur, markedSQL, datas) if self.async_mode:
await cur.executemany(markedSQL,datas)
else:
cur.executemany(markedSQL,datas)
def pivotSQL(self,tablename,rowFields,columnFields,valueFields): def pivotSQL(self,tablename,rowFields,columnFields,valueFields):
def maxValue(columnFields,valueFields,cfvalues): def maxValue(columnFields,valueFields,cfvalues):
@ -406,33 +366,76 @@ class SQLor(object):
async def pivot(self,desc,tablename,rowFields,columnFields,valueFields): async def pivot(self,desc,tablename,rowFields,columnFields,valueFields):
sql = self.pivotSQL(tablename,rowFields,columnFields,valueFields) sql = self.pivotSQL(tablename,rowFields,columnFields,valueFields)
desc['sql_string'] = sql
ret = [] ret = []
return await self.execute(sql,{}) return await self.execute(sql,{},lambda x:ret.append(x))
def isSelectSql(self,sql): def isSelectSql(self,sql):
return self.getSqlType(sql) == 'qry' return self.getSqlType(sql) == 'qry'
async def record_count(self, sql, NS): def getSQLfromDesc(self,desc):
sql = self.recordCnt(sql) sql = ''
async for r in self._get_data(sql, NS): if 'sql_file' in desc.keys():
t = r.rcnt sql = readsql(desc['sql_file'])
return t else:
return None sql = desc['sql_string']
return sql
async def record_count(self,desc,NS):
cnt_desc = {}
cnt_desc.update(desc)
sql = self.getSQLfromDesc(desc)
if desc.get('sql_file',False):
del cnt_desc['sql_file']
cnt_desc['sql_string'] = self.recordCnt(sql)
class Cnt:
def __init__(self):
self.recs = []
def handler(self,rec):
self.recs.append(rec)
async def pagingdata(self, sql, NS): c = Cnt()
paging = { await self.runSQL(cnt_desc,NS,c.handler)
"rowsname":"rows", t = c.recs[0]['rcnt']
"pagename":"page", return t
"sortname":"sort"
async def runSQLPaging(self,desc,NS):
total = await self.record_count(desc,NS)
recs = await self.pagingdata(desc,NS)
data = {
"total":total,
"rows":recs
} }
if not NS.get('sort'): return data
NS['sort'] = "id"
async def pagingdata(self,desc,NS):
paging_desc = {}
paging_desc.update(desc)
paging_desc.update(
{
"paging":{
"rowsname":"rows",
"pagename":"page",
"sortname":"sort"
}
})
if desc.get('sortfield',False):
NS['sort'] = desc.get('sortfield')
sql = self.getSQLfromDesc(desc)
if desc.get('sql_file',False):
del cnt_desc['sql_file']
paging_desc['sql_string'] = self.pagingSQL(sql,
paging_desc.get('paging'),NS)
sql = self.pagingSQL(sql, paging, NS) class Cnt:
recs = [] def __init__(self):
async for r in self._get_data(sql, NS): self.recs = []
recs.append(r) def handler(self,rec):
return recs self.recs.append(rec)
c = Cnt()
await self.runSQL(paging_desc,NS,c.handler)
return c.recs
async def resultFields(self,desc,NS): async def resultFields(self,desc,NS):
NS.update(rows=1,page=1) NS.update(rows=1,page=1)
@ -440,30 +443,52 @@ class SQLor(object):
ret = [ {'name':i[0],'type':i[1]} for i in self.cur.description ] ret = [ {'name':i[0],'type':i[1]} for i in self.cur.description ]
return ret return ret
async def sqlExe(self, sql, ns): async def runSQL(self,desc,NS,callback,**kw):
sqltype = self.getSqlType(sql) class RecordHandler:
if sqltype != 'qry': def __init__(self,ns,name):
r = await self.execute(sql, ns) self.ns = ns
return r self.name = name
if 'page' in ns.keys(): self.ns[name] = []
cnt = await self.record_count(sql, ns)
rows = await self.pagingdata(sql, ns) def handler(self,rec):
return { self.ns[self.name].append(rec)
'total': cnt,
'rows': rows cur = self.cursor()
} sql = self.getSQLfromDesc(desc)
if self.isSelectSql(sql):
if callback is None:
klass = desc.get('dataname','dummy')
if klass is not None:
rh = RecordHandler(NS,klass)
callback = rh.handler
else:
callback = None
await self.execute(sql,NS,callback)
async def sqlExecute(self,desc,NS):
return await self.execute(desc,NS,None)
async def sqlExe(self,sql,ns):
ret = [] ret = []
async for r in self._get_data(sql, ns): r = await self.execute(sql,ns,
ret.append(r) callback=lambda x:ret.append(x))
sqltype = self.getSqlType(sql)
if sqltype == 'dml':
return r
return ret return ret
async def sqlPaging(self,sql,ns): async def sqlPaging(self,sql,ns):
ret = []
dic = {
"sql_string":sql
}
page = ns.get('page') page = ns.get('page')
if not page: if not page:
ns['page'] = 1 ns['page'] = 1
total = await self.record_count(sql,ns) total = await self.record_count(dic,ns)
rows = await self.pagingdata(sql,ns) rows = await self.pagingdata(dic,ns)
return { return {
'total':total, 'total':total,
'rows':rows 'rows':rows
@ -472,9 +497,7 @@ class SQLor(object):
async def tables(self): async def tables(self):
sqlstring = self.tablesSQL() sqlstring = self.tablesSQL()
ret = [] ret = []
async for r in self._get_data(sqlstring,{}): await self.execute(sqlstring,{},lambda x:ret.append(x))
r.name = r.name.lower()
ret.append(r)
return ret return ret
def indexesSQL(self,tablename): def indexesSQL(self,tablename):
@ -493,16 +516,13 @@ class SQLor(object):
if sqlstring is None: if sqlstring is None:
return [] return []
recs = [] recs = []
async for r in self._get_data(sqlstring, {}): await self.execute(sqlstring,{},lambda x:recs.append(x))
recs.append(r)
return recs return recs
async def fields(self,tablename=None): async def fields(self,tablename=None):
sql = self.fieldsSQL(tablename) sqlstring = self.fieldsSQL(tablename)
recs = [] recs = []
async for r in self._get_data(sql, {}): await self.execute(sqlstring,{},lambda x:recs.append(x))
recs.append(r)
ret = [] ret = []
for r in recs: for r in recs:
r.update({'type':self.db2modelTypeMapping.get(r['type'].lower(),'unknown')}) r.update({'type':self.db2modelTypeMapping.get(r['type'].lower(),'unknown')})
@ -511,38 +531,31 @@ class SQLor(object):
return ret return ret
async def primary(self,tablename): async def primary(self,tablename):
sql = self.pkSQL(tablename) sqlstring = self.pkSQL(tablename)
recs = [] recs = []
async for r in self._get_data(sql, {}): await self.execute(sqlstring,{},lambda x:recs.append(x))
recs.append(r) # print('sql=', sqlstring, 'recs=', recs)
# debug(f'primary("{tablename}")={recs}, {sql}')
return recs return recs
async def fkeys(self,tablename): async def fkeys(self,tablename):
sqlstring = self.fkSQL(tablename) sqlstring = self.fkSQL(tablename)
recs = [] recs = []
async for r in self._get_data(sqlstring, {}): await self.execute(sqlstring,{},lambda x:recs.append(x))
recs.append(r)
return recs return recs
async def createTable(self,tabledesc): async def createTable(self,tabledesc):
te = MyTemplateEngine([],'utf8','utf8') te = MyTemplateEngine([],'utf8','utf8')
sql = te.renders(self.ddl_template,tabledesc) desc = {
return await self.execute(sql, {}) "sql_string":te.renders(self.ddl_template,tabledesc)
}
return await self.sqlExecute(desc,{})
async def getTableDesc(self,tablename): async def getTableDesc(self,tablename):
tablename = tablename.lower()
desc = self.getMeta(tablename) desc = self.getMeta(tablename)
if desc: if desc:
return desc return desc
desc = {} desc = {}
tables = await self.tables() summary = [ i for i in await self.tables() if tablename.lower() == i['name'].lower() ]
summary = [i for i in tables if tablename == i.name]
if not summary:
e = Exception(f'table({tablename}) not exist')
exception(f'{e}{format_exc()}')
raise e
pris = await self.primary(tablename) pris = await self.primary(tablename)
primary = [i['name'] for i in pris ] primary = [i['name'] for i in pris ]
summary[0]['primary'] = primary summary[0]['primary'] = primary
@ -569,14 +582,14 @@ class SQLor(object):
return desc return desc
async def rollback(self): async def rollback(self):
if inspect.iscoroutinefunction(self.conn.rollback): if self.async_mode:
await self.conn.rollback() await self.conn.rollback()
else: else:
self.conn.rollback() self.conn.rollback()
self.dataChanged = False self.dataChanged = False
async def commit(self): async def commit(self):
if inspect.iscoroutinefunction(self.conn.commit): if self.async_mode:
await self.conn.commit() await self.conn.commit()
else: else:
self.conn.commit() self.conn.commit()
@ -597,7 +610,7 @@ class SQLor(object):
ret = await rf.exe(rfname, ns) ret = await rf.exe(rfname, ns)
if isinstance(ret, dict): if isinstance(ret, dict):
ns.update(ret) ns.update(ret)
r = await self.execute(sql,ns.copy()) r = await self.runSQL({'sql_string':sql},ns.copy(), None)
await rf.exe(f'{self.dbname}:{tablename}:c:after', ns) await rf.exe(f'{self.dbname}:{tablename}:c:after', ns)
return r return r
@ -619,8 +632,11 @@ class SQLor(object):
if 'page' in ns.keys(): if 'page' in ns.keys():
if not 'sort' in ns.keys(): if not 'sort' in ns.keys():
ns['sort'] = desc['summary'][0]['primary'][0] ns['sort'] = desc['summary'][0]['primary'][0]
total = await self.record_count(sql, ns) dic = {
rows = await self.pagingdata(sql,ns) "sql_string":sql
}
total = await self.record_count(dic,ns)
rows = await self.pagingdata(dic,ns)
return { return {
'total':total, 'total':total,
'rows':rows 'rows':rows
@ -645,11 +661,11 @@ class SQLor(object):
ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns) ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns)
if isinstance(ret, dict): if isinstance(ret, dict):
ns.update(ret) ns.update(ret)
r = await self.execute(sql, ns.copy()) r = await self.runSQL({'sql_string':sql},ns.copy() ,None)
await rf.exe(f'{self.dbname}:{tablename}:u:after',ns) await rf.exe(f'{self.dbname}:{tablename}:u:after',ns)
return r return r
async def D(self,tablename, ns): async def D(self,tablename,ns):
desc = await self.I(tablename) desc = await self.I(tablename)
fields = [ i['name'] for i in desc['fields']] fields = [ i['name'] for i in desc['fields']]
condi = [ i for i in desc['summary'][0]['primary']] condi = [ i for i in desc['summary'][0]['primary']]
@ -660,12 +676,7 @@ class SQLor(object):
ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns) ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns)
if isinstance(ret, dict): if isinstance(ret, dict):
ns.update(ret) ns.update(ret)
r = await self.execute(sql, ns) r = await self.runSQL({'sql_string':sql},ns,None)
ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns) ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns)
return r return r
async def connect(self):
raise Exception('Not Implemented')
async def close(self):
raise Exception('Not Implemented')

2
sqlor/version.py Executable file
View File

@ -0,0 +1,2 @@
# fixed sor.py C function bug.
__version__ = "0.1.3"