From d8b7e1806e1bb3c185c4048346301f6aea2bec82 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Wed, 16 Jul 2025 14:18:38 +0800 Subject: [PATCH] first commit --- README.md | 160 ++++++++ build.sh | 4 + databases.json | 26 ++ docs/postgresql.md | 20 + pyproject.toml | 4 + setup.cfg | 19 + sqlor/__init__.py | 0 sqlor/aiomysqlor.py | 7 + sqlor/aiopostgresqlor.py | 10 + sqlor/aiosqliteor.py | 7 + sqlor/const.py | 1 + sqlor/crud.py | 373 +++++++++++++++++ sqlor/dbpools.py | 497 ++++++++++++++++++++++ sqlor/ddl_template_mysql.py | 58 +++ sqlor/ddl_template_oracle.py | 47 +++ sqlor/ddl_template_postgresql.py | 45 ++ sqlor/ddl_template_sqlite3.py | 37 ++ sqlor/ddl_template_sqlserver.py | 49 +++ sqlor/filter.py | 180 ++++++++ sqlor/mongo.desc | 12 + sqlor/mssqlor.py | 174 ++++++++ sqlor/mysqlor.py | 143 +++++++ sqlor/oracleor.py | 130 ++++++ sqlor/postgresqlor.py | 171 ++++++++ sqlor/records.py | 27 ++ sqlor/runsql.py | 35 ++ sqlor/sor.py | 682 +++++++++++++++++++++++++++++++ sqlor/sqlite3or.py | 100 +++++ sqlor/version.py | 2 + test/primary.py | 47 +++ test/sqlite3_sql.py | 35 ++ test/t1.py | 44 ++ test/t2.py | 64 +++ test/test.py | 60 +++ 34 files changed, 3270 insertions(+) create mode 100755 README.md create mode 100755 build.sh create mode 100755 databases.json create mode 100755 docs/postgresql.md create mode 100644 pyproject.toml create mode 100644 setup.cfg create mode 100755 sqlor/__init__.py create mode 100755 sqlor/aiomysqlor.py create mode 100755 sqlor/aiopostgresqlor.py create mode 100755 sqlor/aiosqliteor.py create mode 100755 sqlor/const.py create mode 100755 sqlor/crud.py create mode 100755 sqlor/dbpools.py create mode 100755 sqlor/ddl_template_mysql.py create mode 100755 sqlor/ddl_template_oracle.py create mode 100755 sqlor/ddl_template_postgresql.py create mode 100755 sqlor/ddl_template_sqlite3.py create mode 100755 sqlor/ddl_template_sqlserver.py create mode 100755 sqlor/filter.py create mode 100755 sqlor/mongo.desc create mode 100755 sqlor/mssqlor.py create mode 100755 sqlor/mysqlor.py create mode 100755 sqlor/oracleor.py create mode 100755 sqlor/postgresqlor.py create mode 100755 sqlor/records.py create mode 100755 sqlor/runsql.py create mode 100755 sqlor/sor.py create mode 100755 sqlor/sqlite3or.py create mode 100755 sqlor/version.py create mode 100755 test/primary.py create mode 100755 test/sqlite3_sql.py create mode 100755 test/t1.py create mode 100755 test/t2.py create mode 100755 test/test.py diff --git a/README.md b/README.md new file mode 100755 index 0000000..dd01447 --- /dev/null +++ b/README.md @@ -0,0 +1,160 @@ +# SQLOR + +SQLOR is a database api for python3, it is base on the python's DBAPI2 + +## Features + +* Multiple database supported(Oracle, MySql, Postgresql, SQL Server +* Both asynchronous API & synchronous API supported +* Connection pools +* Connection life cycle managements +* Easy using API +* Resources(connection object, cursor object) automatic recycled + + +## requirements + +* python 3.5 or above +* asyncio +* Oracle DBAPI2 driver(cx_Oracle) +* MySQL DBAPI2 driver(mysql-connector) +* Postgresql DBAPI2 driver(psycopg2-binrary) +* Asynchronous MySQL driver(aiomysql) +* Asynchronous Postgresql driver(aiopg) +* Other driver can be easy integreated + +## Using + +``` +import asyncio + +from sqlor.dbpools import DBPools + +dbs={ + "aiocfae":{ + "driver":"aiomysql", + "async_mode":True, + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + }, + "stock":{ + "driver":"aiopg", + "async_mode":True, + "codeing":"utf-8", + "dbname":"stock", + "kwargs":{ + "dbname":"stock", + "user":"test", + "password":"test123", + "host":"127.0.0.1" + } + }, + "cfae":{ + "driver":"mysql.connector", + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + } +} + +loop = asyncio.get_event_loop() +pool = DBPools(dbs,loop=loop) + +async def testfunc(): + db = DBPools() + async with db.sqlorContext('stock') as sor: + # start a transaction + # if exception happended, all change to database will rollback + # else will commit + sql = "select * from stock where stock_num = ${stock_num}" + recs = await sor.sqlExe(sql, {'stock_num':'688888'}) + # return a list of DictObject instance + sql1 = "select * from stock" + recs = await sor.sqlPaging(sql, {'pagerows':50, 'page':1, 'sort':'stock_id'}) + # return a dictionary { + # 'total': return all reocords count. + # 'rows': list of DictObject instance + # } + ns = {'id':'666667'} # filters dictionary, find all records which id = '666667' + recs = await sor.R('stock', ns) + # recs is a list of data in DictObject instance + ns = {'pagerows': 50, 'page':1, 'sort':stock_id'} + # find all record in table 'stock', return page 1 data which 50 max records + dic = await sor.R('stock', ns) + # return a dictionary { + # 'total': return all reocords count. + # 'rows': list of DictObject instance + # } + await sor.C('stock', { + 'stock_id': '111111', + ... + }) + # add a record to 'stock' table + await sor.D('stock', {'stock_id':'111111'}) + # delete a record in 'stock' table which stock_id = '111111' + await sor.U('stock', {'stock_id':'111111', 'name':'new_name'}) + # update name field's value to 'new_name' which stock_id = '111111' + + +loop.run_until_complete(testfunc()) +``` + +## API + + +### Databases description data(dbdesc) + +sqlor uses a dbdesc data(databases description data) which description +how many databases and what database will using, and them connection parameters to create a dbpools objects + +dbdesc data is a dict data, format of the dbdesc as follow: +``` +{ + "aiocfae":{ # name to identify a database connect + "driver":"aiomysql", # database dbapi2 driver package name + "async_mode":True, # indicte this connection is asynchronous mode + "coding":"utf8", # charset coding + "dbname":"cfae", # database real name + "kwargs":{ # connection parameters + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + }, + "cfae":{ + "driver":"mysql.connector", + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + } +} + +``` +sqlor can using multiple databases and difference databases by using difference database driver + +### sql description data + + +## class + +### DBPools + +### SQLor + diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..2607c2b --- /dev/null +++ b/build.sh @@ -0,0 +1,4 @@ +rm dist/*.whl +python setup.py install +python setup.py bdist_wheel +python -m twine upload --repository-url https://upload.pypi.org/legacy/ dist/*.whl diff --git a/databases.json b/databases.json new file mode 100755 index 0000000..0aa8269 --- /dev/null +++ b/databases.json @@ -0,0 +1,26 @@ +{ + "mysql":{ + "driver":"aiomysql", + "async_mode":true, + "coding":"utf8", + "dbname":"homedata", + "maxconn":100, + "kwargs":{ + "user":"test", + "db":"homedata", + "password":"test123", + "host":"localhost" + } + }, + "sqlite3":{ + "driver":"aiosqlite", + "async_mode":true, + "coding":"utf8", + "dbname":"testdb", + "maxconn":100, + "kwargs":{ + "database":"./ttttt.db" + } + } +} + diff --git a/docs/postgresql.md b/docs/postgresql.md new file mode 100755 index 0000000..39d3b26 --- /dev/null +++ b/docs/postgresql.md @@ -0,0 +1,20 @@ +# POSTGRESQL + +## create database +create database testdb; + +## create user + +create role guest with login; + +## grat privileges + +grant all privileges on database testdb to guest; + +### grant table privileges + +psql -d testdb + +grant all privileges on all tables in schema public to guest; + + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..59514a1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[build-system] +requires = ["setuptools>=61", "wheel"] +build-backend = "setuptools.build_meta" + diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..22c38fc --- /dev/null +++ b/setup.cfg @@ -0,0 +1,19 @@ +[metadata] +name=sqlor +version = 1.0.3 +description = a wrap for DBAPI, to make python run sql easy and safe +authors = yu moqing +author_email = yumoqing@gmail.com +readme = README.md +license = {text = MIT} + +[options] +packages = find: +requires_python = >=3.8 +install_requires = + aiomysql + PyMySQL + aiosqlite + asyncio + jinja2 + diff --git a/sqlor/__init__.py b/sqlor/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/sqlor/aiomysqlor.py b/sqlor/aiomysqlor.py new file mode 100755 index 0000000..26c6fb7 --- /dev/null +++ b/sqlor/aiomysqlor.py @@ -0,0 +1,7 @@ +from .mysqlor import MySqlor + +class AioMysqlor(MySqlor): + @classmethod + def isMe(self,name): + return name=='aiomysql' + diff --git a/sqlor/aiopostgresqlor.py b/sqlor/aiopostgresqlor.py new file mode 100755 index 0000000..1a9f8f4 --- /dev/null +++ b/sqlor/aiopostgresqlor.py @@ -0,0 +1,10 @@ + +from .postgresqlor import PostgreSQLor +class AioPostgresqlor(PostgreSQLor): + @classmethod + def isMe(self,name): + return name=='aiopg' + + async def commit(self): + pass + diff --git a/sqlor/aiosqliteor.py b/sqlor/aiosqliteor.py new file mode 100755 index 0000000..5de89c8 --- /dev/null +++ b/sqlor/aiosqliteor.py @@ -0,0 +1,7 @@ +import re +from .sqlite3or import SQLite3or + +class Aiosqliteor(SQLite3or): + @classmethod + def isMe(self,name): + return name=='aiosqlite' diff --git a/sqlor/const.py b/sqlor/const.py new file mode 100755 index 0000000..988523c --- /dev/null +++ b/sqlor/const.py @@ -0,0 +1 @@ +ROWS=100 diff --git a/sqlor/crud.py b/sqlor/crud.py new file mode 100755 index 0000000..b9b9f22 --- /dev/null +++ b/sqlor/crud.py @@ -0,0 +1,373 @@ +# -*- coding:utf8 -*- +from .dbpools import DBPools +from .const import ROWS +from .filter import DBFilter +from appPublic.objectAction import ObjectAction +from appPublic.dictObject import DictObject +from appPublic.timeUtils import date2str,time2str,str2Date +from appPublic.uniqueID import getID +toStringFuncs={ + 'char':None, + 'str':None, + 'short':str, + 'long':str, + 'float':str, + 'date':date2str, + 'time':time2str, +} +fromStringFuncs={ + 'short':int, + 'long':int, + 'float':float, + 'date':str2Date, + 'time':str2Date +} + +class DatabaseNotfound(Exception): + def __init__(self,dbname): + Exception.__init__(self) + self.dbname = dbname + + def __str__(self): + return f'{self.dbname} not found' + +class CRUD(object): + def __init__(self,dbname,tablename,rows=ROWS): + self.pool = DBPools() + if dbname not in self.pool.databases.keys(): + raise DatabaseNotfound(dbname) + self.dbname = dbname + self.tablename = tablename + self.rows = rows + self.primary_data = None + self.oa = ObjectAction() + + async def primaryKey(self,**kw): + if self.primary_data is None: + self.primary_data = await self.pool.getTablePrimaryKey(self.dbname, + self.tablename,**kw) + + return self.primary_data + + async def forignKeys(self,**kw): + data = self.pool.getTableForignKeys(self.dbname,self.tablename,**kw) + return data + + async def I(self,**kw): + """ + fields information + """ + @self.pool.inSqlor + async def main(dbname,NS,**kw): + pkdata = await self.primaryKey(**kw) + pks = [ i.field_name for i in pkdata ] + data = await self.pool.getTableFields(self.dbname,self.tablename,**kw) + for d in data: + if d.name in pks: + d.update({'primarykey':True}) + data = self.oa.execute(self.dbname+'_'+self.tablename,'tableInfo',data) + return data + + return await main(self.dbname,{},**kw) + + async def fromStr(self,data): + fields = await self.pool.getTableFields(self.dbname,self.tablename) + ret = {} + for k in data: + v = None if data[k] == '' else data[k] + for f in fields: + if k == f.name: + ret[k] = v + f = fromStringFuncs.get(f.type,None) + if f is not None and v is not None: + ret[k] = f(v) + return ret + + async def toStr(self,data): + fields = await self.pool.getTableFields(self.dbname,self.tablename) + ret = {} + for k in data: + for f in fields: + if k == f.name: + ret[k] = data[k] + f = toStringFuncs.get(f.type,None) + if f is not None and data[k] is not None: + ret[k] = f(data[k]) + return ret + + async def datagrid(self,request,targeti,**kw): + fields = await self.I() + fs = [ self.defaultIOField(f) for f in fields ] + id = self.dbname+':'+ self.tablename + pk = await self.primaryKey(**kw) + idField = pk[0]['field_name'] + data = { + "tmplname":"widget_js.tmpl", + "data":{ + "__ctmpl__":"datagrid", + "__target__":target, + "data":{ + "name":id, + "icon-conv":"icon-table", + "title":tablename, + "url":absurl('./RP.dspy?id=%s' % id), + "deleteUrl":absurl('./D.dspy?id=%s' % id), + "addUrl":absurl('./C.dspy?id=%s' % id), + "updateUrl":absurl('./U.dspy?id=%s' % id), + "idField":idField, + "dnd":True, + "view":"scrollview", + "fields":fs, + "toolbar":{ + "tools":[ + { + "name":"add", + "icon":"icon-add", + "label":"add ball" + }, + { + "name":"delete", + "icon":"icon-delete", + "label":"delete ball" + }, + { + "name":"moveup", + "icon":"icon-up", + "label":"moveup ball" + }, + { + "name":"movedown", + "icon":"icon-down", + "label":"movedown ball" + } + ] + }, + "options":{ + "pageSize":50, + "pagination":False + } + } + } + } + data = self.oa.execute(id,'datagrid',data) + return data + + def defaultIOField(self,f): + if f.type in ['str']: + return { + "primarykey":f.get('primarykey',False), + "name":f.name, + "hidden":False, + "sortable":True, + "label":f.title, + "align":"center", + "iotype":"text" + } + if f.type in ['float','short','long']: + return { + "primarykey":f.get('primarykey',False), + "sortable":True, + "name":f.name, + "hidden":False, + "label":f.title, + "align":"right", + "iotype":"text" + } + return { + "primarykey":f.get('primarykey',False), + "name":f.name, + "sortable":True, + "hidden":False, + "label":f.title, + "align":"center", + "iotype":"text" + } + + async def C(self,rec,**kw): + """ + create new data + """ + @self.pool.runSQL + async def addSQL(dbname,data,**kw): + fns = kw['fns'] + vfs = kw['vfs'] + sqldesc={ + "sql_string" : """ + insert into %s (%s) values (%s) + """ % (self.tablename,fns,vfs), + } + return sqldesc + + @self.pool.inSqlor + async def main(dbname,NS,**kw): + fields = await self.pool.getTableFields(self.dbname,self.tablename,**kw) + flist = [ f['name'] for f in fields ] + fns = ','.join(flist) + vfs = ','.join([ '${' + f + '}$' for f in flist ]) + data = {} + [ data.update({k.lower():v}) for k,v in NS.items() ] + pk = await self.primaryKey(**kw) + k = pk[0]['field_name'] + if not data.get(k): + v = getID() + data[k] = v + data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeAdd',data) + kwargs = kw.copy() + kwargs['fns'] = fns + kwargs['vfs'] = vfs + await addSQL(self.dbname,data,**kwargs) + data = self.oa.execute(self.dbname+'_'+self.tablename,'afterAdd',data) + return {k:data[k]} + + return await main(self.dbname,rec,**kw) + + async def defaultFilter(self,NS,**kw): + fields = await self.pool.getTableFields(self.dbname,self.tablename,**kw) + d = [ '%s = ${%s}$' % (f['name'],f['name']) for f in fields if f['name'] in NS.keys() ] + if len(d) == 0: + return '' + ret = ' and ' + ' and '.join(d) + return ret + + async def R(self,filters=None,NS={},**kw): + """ + retrieve data + """ + @self.pool.runSQL + async def retrieve(dbname,data,**kw): + fstr = '' + if filters is not None: + fstr = ' and ' + dbf = DBFilter(filters) + fstr = fstr + dbf.genFilterString() + else: + fstr = await self.defaultFilter(NS,**kw) + sqldesc = { + "sql_string":"""select * from %s where 1=1 %s""" % (self.tablename,fstr), + } + return sqldesc + + @self.pool.runSQLPaging + async def pagingdata(dbname,data,filters=None,**kw): + fstr = "" + if filters is not None: + fstr = ' and ' + dbf = DBFilter(filters) + fstr = fstr + dbf.genFilterString() + else: + fstr = await self.defaultFilter(NS,**kw) + + sqldesc = { + "sql_string":"""select * from %s where 1=1 %s""" % (self.tablename,fstr), + "default":{'rows':self.rows} + } + return sqldesc + + @self.pool.inSqlor + async def main(dbname,NS,**kw): + p = await self.primaryKey(**kw) + if NS.get('__id') is not None: + NS[p[0]['field_name']] = NS['__id'] + del NS['__id'] + if NS.get('page'): + del NS['page'] + + if NS.get('page'): + if NS.get('sort',None) is None: + NS['sort'] = p[0]['field_name'] + + data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeRetrieve',NS) + if NS.get('page'): + data = await pagingdata(self.dbname,data,**kw) + else: + data = await retrieve(self.dbname,data,**kw) + data = self.oa.execute(self.dbname+'_'+self.tablename,'afterRetrieve',data) + return data + + return await main(self.dbname,NS,**kw) + + async def U(self,data, **kw): + """ + update data + """ + @self.pool.runSQL + async def update(dbname,NS,**kw): + condi = [ i['field_name'] for i in self.primary_data ] + newData = [ i for i in NS.keys() if i not in condi ] + c = [ '%s = ${%s}$' % (i,i) for i in condi ] + u = [ '%s = ${%s}$' % (i,i) for i in newData ] + cs = ' and '.join(c) + us = ','.join(u) + sqldesc = { + "sql_string":"""update %s set %s where %s""" % (self.tablename,us,cs) + } + return sqldesc + + @self.pool.inSqlor + async def main(dbname,NS,**kw): + pk = await self.primaryKey(**kw) + pkfields = [k.field_name for k in pk ] + newData = [ k for k in data if k not in pkfields ] + data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeUpdate',data) + await update(self.dbname,data,**kw) + data = self.oa.execute(self.dbname+'_'+self.tablename,'afterUpdate',data) + return data + return await main(self.dbname,data,**kw) + + async def D(self,data,**kw): + """ + delete data + """ + @self.pool.runSQL + def delete(dbname,data,**kw): + pnames = [ i['field_name'] for i in self.primary_data ] + c = [ '%s = ${%s}$' % (i,i) for i in pnames ] + cs = ' and '.join(c) + sqldesc = { + "sql_string":"delete from %s where %s" % (self.tablename,cs) + } + return sqldesc + + @self.pool.inSqlor + async def main(dbname,NS,**kw): + data = self.oa.execute(self.dbname+'_'+self.tablename,'beforeDelete',data) + await delete(self.dbname,data,pkfields,**kw) + data = self.oa.execute(self.dbname+'_'+self.tablename,'afterDelete',data) + return data + return await main(self.dbname,data,**kw) + +if __name__ == '__main__': + DBPools({ + "ambi":{ + "driver":"pymssql", + "coding":"utf-8", + "dbname":"ambi", + "kwargs":{ + "user":"ymq", + "database":"ambi", + "password":"ymq123", + "host":"localhost" + } + }, + "metadb":{ + "driver":"pymssql", + "coding":"utf-8", + "dbname":"metadb", + "kwargs":{ + "user":"ymq", + "database":"metadb", + "password":"ymq123", + "host":"localhost" + } + } + }) + crud = CRUD('ambi') + #fields = crud.I('cashflow') + #for i in fields: + # print(i) + + data = crud.RP('cashflow') + print(data.total) + for i in data.rows: + print(i.balance,i.asid) + diff --git a/sqlor/dbpools.py b/sqlor/dbpools.py new file mode 100755 index 0000000..a80be8d --- /dev/null +++ b/sqlor/dbpools.py @@ -0,0 +1,497 @@ + +import asyncio +from traceback import format_exc +from functools import wraps +import codecs + +from contextlib import asynccontextmanager + +from appPublic.myImport import myImport +from appPublic.dictObject import DictObject +from appPublic.Singleton import SingletonDecorator +from appPublic.myjson import loadf +from appPublic.jsonConfig import getConfig +from appPublic.rc4 import unpassword +from appPublic.log import exception + +import threading +from .sor import SQLor +from .mssqlor import MsSqlor +from .oracleor import Oracleor +from .sqlite3or import SQLite3or +from .aiosqliteor import Aiosqliteor +from .mysqlor import MySqlor +from .aiomysqlor import AioMysqlor +from .aiopostgresqlor import AioPostgresqlor + +def sqlorFactory(dbdesc): + driver = dbdesc.get('driver',dbdesc) + def findSubclass(name,klass): + for k in klass.__subclasses__(): + if k.isMe(name): + return k + k1 = findSubclass(name,k) + if k1 is not None: + return k1 + return None + k = findSubclass(driver,SQLor) + if k is None: + return SQLor(dbdesc=dbdesc) + return k(dbdesc=dbdesc) + +def sqlorFromFile(dbdef_file,coding='utf8'): + dbdef = loadf(dbdef_file) + return sqlorFactory(dbdef) + +class LifeConnect: + def __init__(self,connfunc,kw,use_max=1000,async_mode=False): + self.connfunc = connfunc + self.async_mode = async_mode + self.use_max = use_max + self.kw = kw + self.conn = None + self.used = False + + def print(self): + print(self.use_max) + print(self.conn) + + async def _mkconn(self): + if self.async_mode: + self.conn = await self.connfunc(**self.kw) + else: + self.conn = self.connfunc(**self.kw) + self.use_cnt = 0 + + async def use(self): + if self.conn is None: + await self._mkconn() + wait_time = 0.2 + loop_cnt = 4 + while loop_cnt > 0: + if await self.testok(): + return self.conn + await asyncio.sleep(wait_time) + wait_time = wait_time + 0.4 + loop_cnt = loop_cnt - 1 + try: + await self.conn.close() + except: + pass + self.conn = None + await self._mkconn() + raise Exception('database connect break') + + async def free(self,conn): + self.use_cnt = self.use_cnt + 1 + return + if self.use_cnt >= self.use_max: + await self.conn.close() + await self._mkcomm() + + async def testok(self): + if self.async_mode: + async with self.conn.cursor() as cur: + try: + await cur.execute('select 1 as cnt') + return True + except: + return False + else: + cur = self.conn.cursor() + try: + cur.execute('select 1 as cnt') + r = cur.fetchall() + return True + except: + return False + finally: + cur.close() + +class ConnectionPool(object): + def __init__(self,dbdesc,loop): + self.dbdesc = dbdesc + self.async_mode = dbdesc.get('async_mode',False) + self.loop = loop + self.driver = myImport(self.dbdesc['driver']) + self.maxconn = dbdesc.get('maxconn',5) + self.maxuse = dbdesc.get('maxuse',1000) + self._pool = asyncio.Queue(self.maxconn) + self.connectObject = {} + self.use_cnt = 0 + self.max_use = 1000 + self.e_except = None + # self.lock = asyncio.Lock() + # self.lockstatus() + + def lockstatus(self): + return + self.loop.call_later(5,self.lockstatus) + print('--lock statu=',self.lock.locked(), + '--pool empty()=',self._pool.empty(), + '--full()=',self._pool.full() + ) + + async def _fillPool(self): + for i in range(self.maxconn): + lc = await self.connect() + i = i + 1 + + async def connect(self): + lc = LifeConnect(self.driver.connect,self.dbdesc['kwargs'], + use_max=self.maxuse,async_mode=self.async_mode) + await self._pool.put(lc) + return lc + + def isEmpty(self): + return self._pool.empty() + + def isFull(self): + return self._pool.full() + + async def aquire(self): + lc = await self._pool.get() + conn = await lc.use() + """ + with await self.lock: + self.connectObject[lc.conn] = lc + """ + self.connectObject[lc.conn] = lc + return conn + + async def release(self,conn): + lc = None + """ + with await self.lock: + lc = self.connectObject.get(conn,None) + del self.connectObject[conn] + """ + lc = self.connectObject.get(conn,None) + del self.connectObject[conn] + await self._pool.put(lc) + +@SingletonDecorator +class DBPools: + def __init__(self,databases={},max_connect=100,loop=None): + if loop is None: + loop = asyncio.get_event_loop() + self.loop = loop + self.max_connect = max_connect + self.sema = asyncio.Semaphore(max_connect) + self._cpools = {} + self.databases = databases + self.meta = {} + + def get_dbname(self, name): + desc = self.database.get(name) + if not desc: + return None + return desc.get('dbname') + + def addDatabase(self,name,desc): + self.databases[name] = desc + + async def getSqlor(self,name): + await self.sema.acquire() + desc = self.databases.get(name) + sor = sqlorFactory(desc) + sor.name = name + a,conn,cur = await self._aquireConn(name) + sor.setCursor(a,conn,cur) + return sor + + async def freeSqlor(self,sor): + await self._releaseConn(sor.name,sor.conn,sor.cur) + self.sema.release() + + @asynccontextmanager + async def sqlorContext(self,name): + self.e_except = None + sqlor = await self.getSqlor(name) + try: + yield sqlor + except Exception as e: + self.e_except = e + cb = format_exc() + exception(f'sqlorContext():EXCEPTION{e}, {cb}') + if sqlor and sqlor.dataChanged: + await sqlor.rollback() + finally: + if sqlor and sqlor.dataChanged: + await sqlor.commit() + await self.freeSqlor(sqlor) + + async def _aquireConn(self,dbname): + """ + p = self._cpools.get(dbname) + if p == None: + p = ConnectionPool(self.databases.get(dbname),self.loop) + await p._fillPool() + self._cpools[dbname] = p + conn = await p.aquire() + if self.isAsyncDriver(dbname): + cur = await conn.cursor() + else: + cur = conn.cursor() + return self.isAsyncDriver(dbname),conn,cur + """ + dbdesc = self.databases.get(dbname) + driver = myImport(dbdesc['driver']) + conn = None + cur = None + desc = dbdesc['kwargs'].copy() + pw = desc.get('password') + if pw: + desc['password'] = unpassword(pw) + if self.isAsyncDriver(dbname): + if dbdesc['driver'] == 'sqlite3': + conn = await driver.connect(desc['dbname']) + else: + conn = await driver.connect(**desc) + cur = await conn.cursor() + return True,conn,cur + else: + if dbdesc['driver'] == 'sqlite3': + conn = driver.connect(desc['dbname']) + else: + conn = driver.connect(**desc) + cur = conn.cursor() + return False,conn,cur + + def isAsyncDriver(self,dbname): + ret = self.databases[dbname].get('async_mode',False) + return ret + + async def _releaseConn(self,dbname,conn,cur): + """ + if self.isAsyncDriver(dbname): + await cur.close() + else: + try: + cur.fetchall() + except: + pass + cur.close() + p = self._cpools.get(dbname) + if p == None: + raise Exception('database (%s) not connected'%dbname) + await p.release(conn) + """ + if self.isAsyncDriver(dbname): + try: + await cur.close() + except: + pass + else: + try: + cur.fetchall() + except: + pass + cur.close() + conn.close() + + async def useOrGetSor(self,dbname,**kw): + commit = False + if kw.get('sor'): + sor = kw['sor'] + else: + sor = await self.getSqlor(dbname) + commit = True + return sor, commit + + def inSqlor(self,func): + @wraps(func) + async def wrap_func(dbname,NS,*args,**kw): + sor, commit = await self.useOrGetSor(dbname, **kw) + kw['sor'] = sor + try: + ret = await func(dbname,NS,*args,**kw) + if not commit: + return ret + try: + await sor.conn.commit() + except: + pass + return ret + except Exception as e: + print('error',sor) + if not commit: + raise e + try: + await sor.conn.rollback() + except: + pass + raise e + finally: + if commit: + await self.freeSqlor(sor) + + return wrap_func + + def runSQL(self,func): + @wraps(func) + async def wrap_func(dbname,NS,*args,**kw): + sor, commit = await self.useOrGetSor(dbname,**kw) + kw['sor'] = sor + ret = None + try: + desc = await func(dbname,NS,*args,**kw) + callback = kw.get('callback',None) + kw1 = {} + [ kw1.update({k:v}) for k,v in kw.items() if k!='callback' ] + ret = await sor.runSQL(desc,NS,callback,**kw1) + if commit: + try: + await sor.conn.commit() + except: + pass + if NS.get('dummy'): + return NS['dummy'] + else: + return [] + except Exception as e: + print('error:',e) + if not commit: + raise e + try: + await sor.conn.rollback() + except: + pass + raise e + finally: + if commit: + await self.freeSqlor(sor) + return wrap_func + + def runSQLPaging(self,func): + @wraps(func) + async def wrap_func(dbname,NS,*args,**kw): + sor, commit = await self.useOrGetSor(dbname,**kw) + kw['sor'] = sor + try: + desc = await func(dbname,NS,*args,**kw) + return await sor.runSQLPaging(desc,NS) + except Exception as e: + print('error',e) + raise e + finally: + if commit: + await self.freeSqlor(sor) + return wrap_func + + async def runSQLResultFields(self, func): + @wraps(func) + async def wrap_func(dbname,NS,*args,**kw): + sor, commit = await self.useOrGetSor(dbname,**kw) + kw['sor'] = sor + try: + desc = await func(dbname,NS,*args,**kw) + ret = await sor.resultFields(desc,NS) + return ret + except Exception as e: + print('error=',e) + raise e + finally: + if commit: + await self.freeSqlor(sor) + return wrap_func + + def setMeta(self,key,n,v): + if not self.meta.get(key): + self.meta[key] = {} + self.meta[key][n] = v + + def getMeta(self,key,n=None): + if not self.meta.get(key): + self.meta[key] = {} + + if n is None: + return self.meta[key] + return self.meta[key].get(n,None) + + async def getTables(self,dbname,**kw): + @self.inSqlor + async def _getTables(dbname,NS,**kw): + sor = kw['sor'] + ret = await sor.tables() + return ret + return await _getTables(dbname,{},**kw) + + async def getTableFields(self,dbname,tblname,**kw): + @self.inSqlor + async def _getTableFields(dbname,NS,tblname,**kw): + key = '%s:%s' % (dbname,tblname) + fields = self.getMeta(key, 'fields') + if fields: + return fields + sor = kw['sor'] + ret = await sor.fields(tblname) + self.setMeta(key,'fields',ret) + return ret + return await _getTableFields(dbname,{},tblname,**kw) + + async def getTablePrimaryKey(self,dbname,tblname,**kw): + @self.inSqlor + async def _getTablePrimaryKey(dbname,NS,tblname,**kw): + key = '%s:%s' % (dbname,tblname) + pri = self.getMeta(key,'primarykey') + if pri: + return pri + sor = kw['sor'] + ret = await sor.primary(tblname) + self.setMeta(key,'primarykey',ret) + return ret + return await _getTablePrimaryKey(dbname,{},tblname,**kw) + + async def getTableIndexes(self,dbname,tblname,**kw): + @self.inSqlor + async def _getTablePrimaryKey(dbname,NS,tblname,**kw): + key = '%s:%s' % (dbname,tblname) + idxes = self.getMeta(key,'indexes') + if idxes: + return idxes + sor = kw['sor'] + ret = await sor.indexes(tblname) + self.setMeta(key,'indexes',ret) + return ret + return await _getTablePrimaryKey(dbname,{},tblname,**kw) + + async def getTableForignKeys(self,dbname,tblname,**kw): + @self.inSqlor + async def _getTableForignKeys(dbname,NS,tblname,**kw): + key = '%s:%s' % (dbname,tblname) + forignkeys = self.getMeta(key,'forignkeys') + if forignkeys: + return forignkeys + sor = kw['sor'] + ret = await sor.fkeys(tblname) + self.setMeta(key,'forignkeys',forignkeys) + return ret + return await _getTableForignKeys(dbname,{},tblname,**kw) + +async def runSQL(dbname,sql,ns={},sor=None): + pool = DBPools() + @pool.runSQL + async def desc(dbname,ns,*args,**kw): + return { + "sql_string":sql + } + kw = { + 'sor':sor + } + x = await desc(dbname,ns,**kw) + return x + +async def runSQLPaging(dbname,sql,ns={},sor=None): + pool = DBPools() + @pool.runSQLPaging + async def desc(dbname,ns,*args,**kw): + return { + "sql_string":sql + } + kw = { + "sor":sor + } + x = await desc(dbname, ns, **kw) + return x + diff --git a/sqlor/ddl_template_mysql.py b/sqlor/ddl_template_mysql.py new file mode 100755 index 0000000..70c3815 --- /dev/null +++ b/sqlor/ddl_template_mysql.py @@ -0,0 +1,58 @@ +mysql_ddl_tmpl = """{% macro typeStr(type,len,dec) %} +{%- if type=='str' -%} +VARCHAR({{len}}) +{%- elif type=='char' -%} +CHAR({{len}}) +{%- elif type=='long' or type=='int' or type=='short' -%} +int +{%- elif type=='long' -%} +bigint +{%- elif type=='float' or type=='double' or type=='ddouble' -%} +double({{len}},{{dec}}) +{%- elif type=='date' -%} +date +{%- elif type=='time' -%} +time +{%- elif type=='datetime' -%} +datetime +{%- elif type=='timestamp' -%} +TIMESTAMP DEFAULT CURRENT_TIMESTAMP +{%- elif type=='text' -%} +longtext +{%- elif type=='bin' -%} +longblob +{%- else -%} +{{type}} +{%- endif %} +{%- endmacro %} + +{%- macro defaultValue(defaultv) %} +{%- if defaultv %} DEFAULT '{{defaultv}}'{%- endif -%} +{%- endmacro %} + +{% macro nullStr(nullable) %} +{%- if nullable=='no' -%} +NOT NULL +{%- endif -%} +{% endmacro %} +{% macro primary() %} +,primary key({{','.join(summary[0].primary)}}) +{% endmacro %} +drop table if exists {{summary[0].name}}; +CREATE TABLE {{summary[0].name}} +( +{% for field in fields %} + `{{field.name}}` {{typeStr(field.type,field.length,field.dec)}} {{nullStr(field.nullable)}} {{defaultValue(field.default)}} {%if field.title -%} comment '{{field.title}}'{%- endif %}{%- if not loop.last -%},{%- endif -%} +{% endfor %} +{% if summary[0].primary and len(summary[0].primary)>0 %} +{{primary()}} +{% endif %} +) +engine=innodb +default charset=utf8 +{% if summary[0].title %}comment '{{summary[0].title}}'{% endif %} +; +{% for v in indexes %} +CREATE {% if v.idxtype=='unique' %}UNIQUE{% endif %} INDEX {{summary[0].name}}_{{v.name}} ON {{summary[0].name}}({{",".join(v.idxfields)}}); +{%- endfor -%} +""" diff --git a/sqlor/ddl_template_oracle.py b/sqlor/ddl_template_oracle.py new file mode 100755 index 0000000..d71ca84 --- /dev/null +++ b/sqlor/ddl_template_oracle.py @@ -0,0 +1,47 @@ +oracle_ddl_tmpl = """{% macro typeStr(type,len,dec) %} +{%- if type=='str' -%} +VARCHAR2({{len}}) +{%- elif type=='char' -%} +CHAR({{len}}) +{%- elif type=='long' or type=='int' or type=='short' -%} +NUMBER +{%- elif type=='float' or type=='double' or type=='ddouble' -%} +NUMBER({{len}},{{dec}}) +{%- elif type=='date' or type=='time' -%} +DATE +{%- elif type=='timestamp' -%} +TIMESTAMP +{%- elif type=='text' -%} +CLOB +{%- elif type=='bin' -%} +BLOB +{%- else -%} +{{type}} +{%- endif %} +{%- endmacro %} +{% macro nullStr(nullable) %} +{%- if nullable=='no' -%} +NOT NULL +{%- endif -%} +{% endmacro %} +{% macro primary() %} +,primary key({{','.join(summary[0].primary)}}) +{% endmacro %} +drop table {{summary[0].name}}; +CREATE TABLE {{summary[0].name}} +( +{% for field in fields %} + {{field.name}} {{typeStr(field.type,field.length,field.dec)}} {{nullStr(field.nullable)}}{%- if not loop.last -%},{%- endif -%} +{% endfor %} +{% if summary[0].primary and len(summary[0].primary)>0 %} +{{primary()}} +{% endif %} +); +{% for v in indexes %} +CREATE {% if v.idxtype=='unique' %}UNIQUE{% endif %} INDEX {{summary[0].name}}_{{v.name}} ON {{summary[0].name}}({{",".join(v.idxfields)}}); +{%- endfor -%} +COMMENT ON TABLE {{summary[0].name}} IS '{{summary[0].title}}'; +{% for field in fields %} +COMMENT ON COLUMN {{summary[0].name}}.{{field.name}} is '{{field.title}}'; +{% endfor %} +""" diff --git a/sqlor/ddl_template_postgresql.py b/sqlor/ddl_template_postgresql.py new file mode 100755 index 0000000..a6c13b4 --- /dev/null +++ b/sqlor/ddl_template_postgresql.py @@ -0,0 +1,45 @@ +postgresql_ddl_tmpl = """{% macro typeStr(type,len,dec) %} +{%- if type=='str' -%} +VARCHAR({{len}}) +{%- elif type=='char' -%} +CHAR({{len}}) +{%- elif type=='long' or type=='int' or type=='short' -%} +NUMERIC(30,0) +{%- elif type=='float' or type=='double' or type=='ddouble' -%} +NUMERIC({{len}},{{dec}}) +{%- elif type=='date' -%} +DATE +{%- elif type=='time' -%} +TIME +{%- elif type=='timestamp' -%} +TIMESTAMP +{%- else -%} +{{type}} +{%- endif %} +{%- endmacro %} +{% macro nullStr(nullable) %} +{%- if nullable=='no' -%} +NOT NULL +{%- endif -%} +{% endmacro %} +{% macro primary() %} +,PRIMARY KEY({{','.join(summary[0].primary)}}) +{% endmacro %} +DROP TABLE IF EXISTS {{summary[0].name}}; +CREATE TABLE {{summary[0].name}} +( +{% for field in fields %} + {{field.name}} {{typeStr(field.type,field.length,field.dec)}} {{nullStr(field.nullable)}}{%- if not loop.last -%},{%- endif -%} +{% endfor %} +{% if summary[0].primary and len(summary[0].primary)>0 %} +{{primary()}} +{% endif %} +); +{% for v in indexes %} +CREATE {% if v.idxtype=='unique' %}UNIQUE{% endif %} INDEX {{summary[0].name}}_{{v.name}} ON {{summary[0].name}}({{",".join(v.idxfields)}}); +{%- endfor -%} +COMMENT ON TABLE {{summary[0].name}} IS '{{summary[0].title}}'; +{% for field in fields %} +COMMENT ON COLUMN {{summary[0].name}}.{{field.name}} is '{{field.title}}'; +{% endfor %} +""" diff --git a/sqlor/ddl_template_sqlite3.py b/sqlor/ddl_template_sqlite3.py new file mode 100755 index 0000000..6d6eca6 --- /dev/null +++ b/sqlor/ddl_template_sqlite3.py @@ -0,0 +1,37 @@ +sqlite3_ddl_tmpl = """{% macro typeStr(type,len,dec) %} +{%- if type in ['str', 'char', 'date', 'time', 'datetime', 'timestamp'] -%} +TEXT +{%- elif type in ['long', 'int', 'short', 'longlong' ] -%} +int +{%- elif type in ['float', 'double', 'ddouble'] -%} +real +{%- elif type=='bin' -%} +blob +{%- else -%} +{{type}} +{%- endif %} +{%- endmacro %} +{% macro nullStr(nullable) %} +{%- if nullable=='no' -%} +NOT NULL +{%- endif -%} +{% endmacro %} +{% macro primary() %} +,primary key({{','.join(summary[0].primary)}}) +{% endmacro %} +drop table if exists {{summary[0].name}}; +CREATE TABLE {{summary[0].name}} +( +{% for field in fields %} + `{{field.name}}` {{typeStr(field.type,field.length,field.dec)}} {{nullStr(field.nullable)}}{%- if not loop.last -%},{%- endif -%} {%if field.title -%} -- {{field.title}}{%- endif %} +{% endfor %} +{% if summary[0].primary and len(summary[0].primary)>0 %} +{{primary()}} +{% endif %} +) +{% if summary[0].title %} --{{summary[0].title}}{% endif %} +; +{% for v in indexes %} +CREATE {% if v.idxtype=='unique' %}UNIQUE{% endif %} INDEX {{summary[0].name}}_{{v.name}} ON {{summary[0].name}}({{",".join(v.idxfields)}}); +{%- endfor -%} +""" diff --git a/sqlor/ddl_template_sqlserver.py b/sqlor/ddl_template_sqlserver.py new file mode 100755 index 0000000..8bc7f6a --- /dev/null +++ b/sqlor/ddl_template_sqlserver.py @@ -0,0 +1,49 @@ +sqlserver_ddl_tmpl = """{% macro typeStr(type,len,dec) %} +{%- if type=='str' -%} +NVARCHAR({{len}}) +{%- elif type=='char' -%} +CHAR({{len}}) +{%- elif type=='long' or type=='int' or type=='short' -%} +NUMERIC +{%- elif type=='float' or type=='double' or type=='ddouble' -%} +numeric({{len}},{{dec}}) +{%- elif type=='date' or type=='time' -%} +DATE +{%- elif type=='timestamp' -%} +TIMESTAMP +{%- elif type=='text' -%} +NVARCHAR(MAX) +{%- elif type=='bin' -%} +IMAGE +{%- else -%} +{{type}} +{%- endif %} +{%- endmacro %} +{% macro nullStr(nullable) %} +{%- if nullable=='no' -%} +NOT NULL +{%- endif -%} +{% endmacro %} + +{% macro primary() %} +,primary key({{','.join(summary[0].primary)}}) +{% endmacro %} + +drop table dbo.{{summary[0].name}}; +CREATE TABLE dbo.{{summary[0].name}} +( +{% for field in fields %} + {{field.name}} {{typeStr(field.type,field.length,field.dec)}} {{nullStr(field.nullable)}}{%- if not loop.last -%},{%- endif -%} +{% endfor %} +{% if summary[0].primary and len(summary[0].primary)>0 %} +{{primary()}} +{% endif %} +) +{% for v in indexes %} +CREATE {% if v.idxtype=='unique' %}UNIQUE{% endif %} INDEX {{summary[0].name}}_{{v.name}} ON {{summary[0].name}}({{",".join(v.idxfields)}}); +{%- endfor -%} +EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'{{summary[0].title}}' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'{{summary[0].name}}' +{% for field in fields %} +EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'{{field.title}}' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'{{summary[0].name}}', @level2type=N'COLUMN',@level2name=N'{{field.name}}' +{% endfor %} +""" diff --git a/sqlor/filter.py b/sqlor/filter.py new file mode 100755 index 0000000..1bbd827 --- /dev/null +++ b/sqlor/filter.py @@ -0,0 +1,180 @@ +# -*- coding:utf8 -*- +""" +过滤器解释器 +解释json格式的SQL查询过滤 +过滤支持: +双目关系运算符: + AND + OR +单目关系运算符: + NOT +表达式: + 表达关系有:=,<>,>,<,>=,<=,in,not in + a=b格式的 +{ + "and":[ + { + "field":'field1' + "op":"=", + "const":1 + "var":"var1" + }, + { + "or":[...] + } + { + "not":{...} + } + ] +} +""" +try: + import ujson as json +except: + import json + +class DBFilter(object): + operators = [ + "=", + "!=", + "IN", + "NOT IN", + ">", + ">=", + "<", + "<=", + "IS NULL", + "IS NOT NULL", + "LIKE", + "NOT LIKE" + ] + logic = [ + "AND", + "OR", + "NOT" + ] + def __init__(self,filterjson): + if isinstance(filterjson, str): + filterjson = json.loads(filterjson) + self.filterjson = filterjson + + def gen(self, ns={}): + self.consts = {} + ret = self._genFilterSQL(self.filterjson, ns) + ns.update(self.consts) + return ret + + def get_variables(self): + return self.get_filters_variables(self.filterjson) + + def get_filters_variables(self, filters): + vs = {} + keys = [ k for k in filters.keys() ] + if len(keys) == 1: + fs = filters[keys[0]] + if isinstance(fs, list): + for f in fs: + v = self.get_filters_variables(f) + if v: + vs.update(v) + return vs + else: + v = self.get_filters_variables(fs) + if v: + vs.update(v) + return vs + if 'var' in keys: + return { + filters['var']:filters['field'] + } + return None + + def _genFilterSQL(self,fj, ns): + keys = [ i for i in fj.keys()] + if len(keys) == 1: + key = keys[0] + if key.lower() in ['and','or']: + if type(fj[key]) != type([]) or len(fj[key])<2: + raise Exception(key + ':' + json.dumps(fj[key]) + ':is not a array, or array length < 2') + subsqls = [self._genFilterSQL(f, ns) for f in fj[key]] + subsqls = [ s for s in subsqls if s is not None] + if len(subsqls) < 1: + return None + if len(subsqls) < 2: + return subsqls[0] + + a = ' %s ' % key + sql = a.join(subsqls) + if key == 'or': + return ' (%s) ' % sql + return sql + if key.lower() == 'not': + if type(fj[key]) != type({}): + raise Exception(key + ':' + json.dumps(fj[key]) + ':is not a dict') + a = ' %s ' % key + sql = self._genFilterSQL(fj[key], ns) + if not sql: + return None + return ' not (%s) ' % sql + return self._genFilterItems(fj, ns) + + def _genFilterItems(self,fj, ns): + keys = fj.keys() + assert 'field' in keys + assert 'op' in keys + assert 'const' in keys or 'var' in keys or fj['op'].upper() in ['IS NULL', 'IS NOT NULL'] + op = fj.get('op').upper() + assert op in self.operators + + if op in ['IS NULL', 'IS NOT NULL']: + return f"{fj['field']} {op}" + + var = fj.get('var') + if var and not var in ns.keys(): + return None + + if 'const' in keys: + cnt = len(self.consts.keys()) + name = f'filter_const_{cnt}' + self.consts.update({name:fj.get('const')}) + sql = '%s %s ${%s}$' % (fj.get('field'), fj.get('op'), name) + return sql + + sql = '%s %s ${%s}$' % (fj.get('field'), fj.get('op'), fj.get('var')) + return sql + +def default_filterjson(fields: list, ns: dict): + fj = {} + c = [ { + 'field':k, + 'op':'=', + 'var':k + } for k in ns.keys() if k in fields ] + print(f'default_filterjson():{c=}') + if len(c) < 1: + return None + if len(c) < 2: + return c[0] + return { + 'AND':c + } + +if __name__ == '__main__': + fj = { + "AND":[ + { + "field":'field1', + "op":"=", + "var":"name" + }, + { + "field":'del_flg', + "op":"=", + "const":"0" + }] + } + + dbf = DBFilter(fj) + print(fj) + print(dbf.gen({'name':'joe'})) + print(f'{dbf.consts=}') diff --git a/sqlor/mongo.desc b/sqlor/mongo.desc new file mode 100755 index 0000000..aebcc1d --- /dev/null +++ b/sqlor/mongo.desc @@ -0,0 +1,12 @@ +{ + "driver":"motor", + "async_mode":true, + "dbname":"test", + "kwargs":{ + "host":"localhost", + "port":20017, + "user":"test", + "password":"test123", + "dbname":"test" + } +} diff --git a/sqlor/mssqlor.py b/sqlor/mssqlor.py new file mode 100755 index 0000000..83d6fcd --- /dev/null +++ b/sqlor/mssqlor.py @@ -0,0 +1,174 @@ +# -*- coding:utf8 -*- +from .sor import SQLor +from .ddl_template_sqlserver import sqlserver_ddl_tmpl + +class MsSqlor(SQLor): + ddl_template = sqlserver_ddl_tmpl + db2modelTypeMapping = { + 'bit':'short', + 'tinyint':'short', + 'date':'date', + 'bigint':'long', + 'smallint':'short', + 'int':'long', + 'decimal':'float', + 'numeric':'float', + 'smallmoney':'float', + 'money':'float', + 'real':'float', + 'float':'float', + 'datetime':'date', + 'timestamp':'timestamp', + 'uniqueidentifier':'timestamp', + 'char':'char', + 'varchar':'str', + 'text':'text', + 'nchar':'str', + 'nvarchar':'str', + 'ntext':'text', + 'binary':'str', + 'varbinary':'str', + 'image':'file', + } + model2dbTypemapping = { + 'date':'datetime', + 'time':'date', + 'timestamp':'timestamp', + 'str':'nvarchar', + 'char':'char', + 'short':'int', + 'long':'numeric', + 'float':'numeric', + 'text':'ntext', + 'file':'image', + } + @classmethod + def isMe(self,name): + return name=='pymssql' + + def grammar(self): + return { + 'select':select_stmt, + } + + def placeHolder(self,varname,pos=None): + if varname=='__mainsql__' : + return '' + return '%s' + + def dataConvert(self,dataList): + if type(dataList) == type({}): + d = [ i for i in dataList.values()] + else: + d = [ i['value'] for i in dataList] + return tuple(d) + + def pagingSQLmodel(self): + return u"""select * +from ( + select row_number() over(order by $[sort]$) as _row_id,page_s.* + from (%s) page_s + ) A +where _row_id >= $[from_line]$ and _row_id < $[end_line]$""" + + def tablesSQL(self): + sqlcmd = u"""select + lower(d.name) as name, + lower(cast(Isnull(f.VALUE,d.name) as nvarchar )) title + from sysobjects d + left join sys.extended_properties f on d.id = f.major_id and f.minor_id = 0 + where d.xtype = 'U'""" + return sqlcmd + + def fieldsSQL(self,tablename=None): + sqlcmd=u"""SELECT name = lower(a.name) + ,type = b.name + ,length = Columnproperty(a.id,a.name,'PRECISION') + ,dec = Isnull(Columnproperty(a.id,a.name,'Scale'),null) + ,nullable = CASE + WHEN a.isnullable = 1 THEN 'yes' + ELSE 'no' + END + ,title = lower(cast(Isnull(g.[value],a.name) as nvarchar) ) + ,table_name = lower(d.name) + FROM syscolumns a + LEFT JOIN systypes b + ON a.xusertype = b.xusertype + INNER JOIN sysobjects d + ON (a.id = d.id) + AND (d.xtype = 'U') + AND (d.name <> 'dtproperties') + INNER JOIN sys.all_objects c + ON d.id=c.object_id + AND schema_name(schema_id)='dbo' + LEFT JOIN sys.extended_properties g + ON (a.id = g.major_id) + AND (a.colid = g.minor_id) + LEFT JOIN sys.extended_properties f + ON (d.id = f.major_id) + AND (f.minor_id = 0)""" + if tablename is not None: + sqlcmd = sqlcmd + """ where lower(d.name)='%s' + ORDER BY a.id,a.colorder""" % tablename.lower() + else: + sqlcmd = sqlcmd + """ ORDER BY a.id,a.colorder""" + return sqlcmd + + def fkSQL(self,tablename=None): + sqlcmd = u"""select + MainCol.name AS field -- [主表列名] + ,oSub.name AS fk_table -- [子表名称], + ,SubCol.name AS fk_field -- [子表列名], +from + sys.foreign_keys fk + JOIN sys.all_objects oSub + ON (fk.parent_object_id = oSub.object_id) + JOIN sys.all_objects oMain + ON (fk.referenced_object_id = oMain.object_id) + JOIN sys.foreign_key_columns fkCols + ON (fk.object_id = fkCols.constraint_object_id) + JOIN sys.columns SubCol + ON (oSub.object_id = SubCol.object_id + AND fkCols.parent_column_id = SubCol.column_id) + JOIN sys.columns MainCol + ON (oMain.object_id = MainCol.object_id + AND fkCols.referenced_column_id = MainCol.column_id)""" + if tablename is not None: + sqlcmd = sqlcmd + """ where lower(oMain.name) = '%s'""" % tablename.lower() + + return sqlcmd + + def pkSQL(self,tablename=None): + sqlcmd = u"""select + lower(a.table_name) as table_name, + lower(b.column_name) as field_name + from information_schema.table_constraints a + inner join information_schema.constraint_column_usage b + on a.constraint_name = b.constraint_name + where a.constraint_type = 'PRIMARY KEY'""" + if tablename is not None: + sqlcmd = sqlcmd + """ and lower(a.table_name) = '%s'""" % tablename.lower() + return sqlcmd + + def indexesSQL(self,tablename=None): + sqlcmd = """SELECT +index_name=lower(IDX.Name), +index_type=IDX.is_unique, +column_name=lower(C.Name) +FROM sys.indexes IDX +INNER JOIN sys.index_columns IDXC +ON IDX.[object_id]=IDXC.[object_id] +AND IDX.index_id=IDXC.index_id +LEFT JOIN sys.key_constraints KC +ON IDX.[object_id]=KC.[parent_object_id] +AND IDX.index_id=KC.unique_index_id +INNER JOIN sys.objects O +ON O.[object_id]=IDX.[object_id] +INNER JOIN sys.columns C +ON O.[object_id]=C.[object_id] +AND O.type='U' +AND O.is_ms_shipped=0 +AND IDXC.Column_id=C.Column_id""" + if tablename is not None: + sqlcmd = sqlcmd + """ where lower(O.name)='%s'""" % tablename.lower() + return sqlcmd diff --git a/sqlor/mysqlor.py b/sqlor/mysqlor.py new file mode 100755 index 0000000..c864602 --- /dev/null +++ b/sqlor/mysqlor.py @@ -0,0 +1,143 @@ +# -*- coding:utf8 -*- +from appPublic.argsConvert import ArgsConvert,ConditionConvert + +from .sor import SQLor +from .const import ROWS +from .ddl_template_mysql import mysql_ddl_tmpl +class MySqlor(SQLor): + ddl_template = mysql_ddl_tmpl + db2modelTypeMapping = { + 'tinyint':'short', + 'smallint':'short', + 'mediumint':'long', + 'int':'long', + 'bigint':'long', + 'decimal':'float', + 'double':'float', + 'float':'float', + 'char':'char', + 'varchar':'str', + 'tinyblob':'text', + 'tinytext':'text', + 'mediumblob':'text', + 'mediumtext':'text', + 'blob':'text', + 'text':'text', + 'mediumblob':'text', + 'mediumtext':'text', + 'longblob':'bin', + 'longtext':'text', + 'barbinary':'text', + 'binary':'text', + 'date':'date', + 'time':'time', + 'datetime':'datetime', + 'timestamp':'datestamp', + 'year':'short', + } + model2dbTypemapping = { + 'date':'date', + 'time':'date', + 'timestamp':'timestamp', + 'str':'varchar', + 'char':'char', + 'short':'int', + 'long':'bigint', + 'float':'double', + 'text':'longtext', + 'bin':'longblob', + 'file':'longblob', + } + @classmethod + def isMe(self,name): + if name=='pymysql': + return True + return False + + def grammar(self): + return { + 'select':select_stmt, + } + + def placeHolder(self,varname,pos=None): + if varname=='__mainsql__' : + return '' + return '%s' + + def dataConvert(self,dataList): + if type(dataList) == type({}): + d = [ i for i in dataList.values()] + else: + d = [ i['value'] for i in dataList] + return tuple(d) + + def pagingSQLmodel(self): + return """select *, row_number() over(order by $[sort]$) as row_num_ from (%s) A limit $[from_line]$, $[rows]$""" + return """select * from (%s) A order by $[sort]$ +limit $[from_line]$,$[rows]$""" + + def tablesSQL(self): + sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbdesc.get('dbname','unknown') + return sqlcmd + + def fieldsSQL(self,tablename=None): + sqlcmd=""" + select + lower(column_name) as name, + data_type as type, + case when character_maximum_length is null then NUMERIC_PRECISION + else character_maximum_length end + as length, + NUMERIC_SCALE as 'dec', + lower(is_nullable) as nullable, + column_comment as title, + lower(table_name) as table_name + from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbdesc.get('dbname','unknown').lower() + if tablename is not None: + sqlcmd = sqlcmd + """and lower(table_name)='%s';""" % tablename.lower() + return sqlcmd + + def fkSQL(self,tablename=None): + sqlcmd = """SELECT C.TABLE_SCHEMA 拥有者, + C.REFERENCED_TABLE_NAME 父表名称 , + C.REFERENCED_COLUMN_NAME 父表字段 , + C.TABLE_NAME 子表名称, + C.COLUMN_NAME 子表字段, + C.CONSTRAINT_NAME 约束名, + T.TABLE_COMMENT 表注释, + R.UPDATE_RULE 约束更新规则, + R.DELETE_RULE 约束删除规则 + FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE C + JOIN INFORMATION_SCHEMA. TABLES T + ON T.TABLE_NAME = C.TABLE_NAME + JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS R + ON R.TABLE_NAME = C.TABLE_NAME + AND R.CONSTRAINT_NAME = C.CONSTRAINT_NAME + AND R.REFERENCED_TABLE_NAME = C.REFERENCED_TABLE_NAME + WHERE C.REFERENCED_TABLE_NAME IS NOT NULL ; + and C.TABLE_SCHEMA = '%s' +""" % self.dbdesc.get('dbname','unknown').lower() + if tablename is not None: + sqlcmd = sqlcmd + " and C.REFERENCED_TABLE_NAME = '%s'" % tablename.lower() + return sqlcmd + + def pkSQL(self,tablename=None): + sqlcmd = """SELECT distinct column_name as name FROM INFORMATION_SCHEMA.`KEY_COLUMN_USAGE` WHERE table_name='%s' AND constraint_name='PRIMARY' +""" % tablename.lower() + return sqlcmd + + def indexesSQL(self,tablename=None): + sqlcmd = """SELECT DISTINCT + lower(index_name) as index_name, + case NON_UNIQUE + when 1 then 'unique' + else '' + end as is_unique, + lower(column_name) as column_name +FROM + information_schema.statistics +WHERE + table_schema = '%s'""" % self.dbdesc.get('dbname','unknown') + if tablename is not None: + sqlcmd = sqlcmd + """ AND table_name = '%s'""" % tablename.lower() + return sqlcmd diff --git a/sqlor/oracleor.py b/sqlor/oracleor.py new file mode 100755 index 0000000..d34b0c4 --- /dev/null +++ b/sqlor/oracleor.py @@ -0,0 +1,130 @@ +from .sor import SQLor +from .ddl_template_oracle import oracle_ddl_tmpl +class Oracleor(SQLor): + ddl_template = oracle_ddl_tmpl + db2modelTypeMapping = { + 'char':'char', + 'nchar':'str', + 'varchar':'str', + 'varchar2':'str', + 'nvarchar2':'str', + 'number':'long', + 'integer':'long', + 'binary_float':'float', + 'binary_double':'float', + 'float':'float', + 'timestamp':'timestamp', + 'timestamp with time zone':'timestamp', + 'timestamp with local time zone':'timestamp', + 'interval year to moth':'date', + 'interval day to second':'timestamp', + 'clob':'text', + 'nclob':'text', + 'blob':'file', + 'bfile':'file', + 'date':'date', + } + model2dbTypemapping = { + 'date':'date', + 'time':'date', + 'timestamp':'date', + 'str':'varchar2', + 'char':'char', + 'short':'number', + 'long':'number', + 'float':'number', + 'text':'nclob', + 'file':'blob', + } + @classmethod + def isMe(self,name): + return name=='cx_Oracle' + + def grammar(self): + return { + 'select':select_stmt, + } + + def placeHolder(self,varname,pos=None): + if varname=='__mainsql__' : + return '' + return ':%s' % varname + + def dataConvert(self,dataList): + if type(dataList) == type({}): + return dataList + d = {} + [ d.update({i['name']:i['value']}) for i in dataList ] + return d + + def pagingSQLmodel(self): + return u"""select * +from ( + select page_s.*,rownum row_id + from (%s) page_s + order by $[sort]$ + ) +where row_id >=$[from_line]$ and row_id < $[end_line]$""" + + def tablesSQL(self): + sqlcmd = """select +lower(table_name) as name, +lower(decode(comments,null,table_name,comments)) as title +from USER_TAB_COMMENTS where table_type = 'TABLE'""" + return sqlcmd + + def fieldsSQL(self,tablename=None): + sqlcmd="""select lower(utc.COLUMN_NAME) name + ,utc.DATA_TYPE type + ,utc.DATA_LENGTH length + ,utc.data_scale dec + ,case when utc.nullable = 'Y' then 'yes' else 'no' end nullable + ,lower(nvl(ucc.comments,utc.COLUMN_NAME)) title + ,lower(utc.table_name) as table_name + from user_tab_cols utc left join USER_COL_COMMENTS ucc on utc.table_name = ucc.table_name and utc.COLUMN_NAME = ucc.COLUMN_NAME""" + if tablename is not None: + sqlcmd = sqlcmd + """ where lower(utc.table_name) = '%s'""" % tablename.lower() + return sqlcmd + + def fkSQL(self,tablename=None): + tablename = tablename.lower() + sqlcmd = """select + distinct(ucc.column_name) as field,rela.table_name as fk_table,rela.column_name as fk_field +from + user_constraints uc,user_cons_columns ucc, + ( + select t2.table_name,t2.column_name,t1.r_constraint_name + from user_constraints t1,user_cons_columns t2 + where t1.r_constraint_name=t2.constraint_name +) rela +where + uc.constraint_name=ucc.constraint_name + and uc.r_constraint_name=rela.r_constraint_name""" + if tablename is not None: + sqlcmd = sqlcmd + """ and lower(uc.table_name)='%s'""" % tablename.lower() + return sqlcmd + + def pkSQL(self,tablename=None): + sqlcmd = """ +select + lower(col.table_name) table_name, + lower(col.column_name) as field_name +from + user_constraints con,user_cons_columns col +where + con.constraint_name=col.constraint_name and con.constraint_type='P'""" + if tablename is not None: + sqlcmd = sqlcmd + """ and lower(col.table_name)='%s'""" % tablename.lower() + return sqlcmd + + def indexesSQL(self,tablename=None): + sqlcmd = """select + lower(a.index_name) index_name, + lower(a.UNIQUENESS) is_unique, + lower(b.column_name) column_name +from user_indexes a, user_ind_columns b +where a.index_name = b.index_name""" + if tablename is not None: + sqlcmd += """ and lower(a.table_name) = lower('%s')""" % tablename.lower() + return sqlcmd + diff --git a/sqlor/postgresqlor.py b/sqlor/postgresqlor.py new file mode 100755 index 0000000..7abe110 --- /dev/null +++ b/sqlor/postgresqlor.py @@ -0,0 +1,171 @@ +from .sor import SQLor +from .ddl_template_postgresql import postgresql_ddl_tmpl + +class PostgreSQLor(SQLor): + ddl_template = postgresql_ddl_tmpl + db2modelTypeMapping = { + 'smallint':'short', + 'integer':'long', + 'bigint':'llong', + 'decimal':'float', + 'numeric':'float', + 'real':'float', + 'double':'float', + 'serial':'long', + 'bigserial':'llong', + 'char':'char', + 'character':'char', + 'varchar':'str', + 'character varying':'str', + 'text':'text', + 'timestamp':'timestamp', + 'date':'date', + 'time':'time', + 'boolean':'char', + 'bytea':'file' + } + model2dbTypemapping = { + 'date':'date', + 'time':'date', + 'timestamp':'timestamp', + 'str':'varchar', + 'char':'char', + 'short':'smallint', + 'long':'integer', + 'float':'numeric', + 'text':'text', + 'file':'bytea', + } + @classmethod + def isMe(self,name): + return name=='psycopg2' or name=='pyguass' + + def grammar(self): + return { + 'select':select_stmt, + } + + def placeHolder(self,varname,i): + if varname=='__mainsql__' : + return '' + return '%%(%s)s' % varname + + def dataConvert(self,dataList): + if type(dataList) == type({}): + return dataList + d = { i['name']:i['value'] for i in dataList } + return d + + def pagingSQLmodel(self): + return u"""select * +from ( + select page_s.*,rownum row_id + from (%s) page_s + order by $[sort]$ + ) +where row_id >=$[from_line]$ and row_id < $[end_line]$""" + + def tablesSQL(self): + sqlcmd = """select x.name, y.description as title +from +(select a.name,c.oid + from + (select lower(tablename) as name from pg_tables where schemaname='public') a, + pg_class c +where a.name = c.relname) x +left join pg_description y +on x.oid=y.objoid + and y.objsubid='0'""" + return sqlcmd + + def fieldsSQL(self,tablename=None): + sqlcmd="""SELECT + a.attname AS name, + t.typname AS type, + case t.typname + when 'varchar' then a.atttypmod - 4 + when 'numeric' then (a.atttypmod - 4) / 65536 + else null + end as length, + case t.typname + when 'numeric' then (a.atttypmod - 4) %% 65536 + else null + end as dec, + case a.attnotnull + when 't' then 'no' + when 'f' then 'yes' + end as nullable, + b.description AS title +FROM pg_class c, pg_attribute a + LEFT JOIN pg_description b + ON a.attrelid = b.objoid + AND a.attnum = b.objsubid, pg_type t +WHERE lower(c.relname) = '%s' + AND a.attnum > 0 + AND a.attrelid = c.oid + AND a.atttypid = t.oid +ORDER BY a.attnum; + """ % tablename.lower() + return sqlcmd + + def fkSQL(self,tablename=None): + tablename = tablename.lower() + sqlcmd = """select + distinct(ucc.column_name) as field,rela.table_name as fk_table,rela.column_name as fk_field +from + user_constraints uc,user_cons_columns ucc, + ( + select t2.table_name,t2.column_name,t1.r_constraint_name + from user_constraints t1,user_cons_columns t2 + where t1.r_constraint_name=t2.constraint_name +) rela +where + uc.constraint_name=ucc.constraint_name + and uc.r_constraint_name=rela.r_constraint_name""" + if tablename is not None: + sqlcmd = sqlcmd + """ and lower(uc.table_name)='%s'""" % tablename.lower() + return sqlcmd + + def pkSQL(self,tablename=None): + sqlcmd=""" + select + pg_attribute.attname as field_name, + lower(pg_class.relname) as table_name +from pg_constraint + inner join pg_class + on pg_constraint.conrelid = pg_class.oid + inner join pg_attribute + on pg_attribute.attrelid = pg_class.oid + and pg_attribute.attnum = pg_constraint.conkey[1] + inner join pg_type + on pg_type.oid = pg_attribute.atttypid +where lower(pg_class.relname) = '%s' +and pg_constraint.contype='p' + """ % tablename.lower() + return sqlcmd + + def indexesSQL(self,tablename=None): + sqlcmd = """select + i.relname as index_name, + case ix.INDISUNIQUE + when 't' then 'unique' + else '' + end as is_unique, + a.attname as column_name +from + pg_class t, + pg_class i, + pg_index ix, + pg_attribute a +where + t.oid = ix.indrelid + and i.oid = ix.indexrelid + and a.attrelid = t.oid + and a.attnum = ANY(ix.indkey) + and t.relkind = 'r' + and lower(t.relname) = '%s' +order by + t.relname, + i.relname""" % tablename.lower() + return sqlcmd + diff --git a/sqlor/records.py b/sqlor/records.py new file mode 100755 index 0000000..f52b026 --- /dev/null +++ b/sqlor/records.py @@ -0,0 +1,27 @@ +from appPublic.dictObject import DictObject + +class Records: + def __init__(self,klass=DictObject): + self._records = [] + self.klass = klass + + def add(self,rec): + obj = self.klass(**rec) + self._records.append(obj) + + def get(self): + return self._records + + + def __iter__(self): + self.start = 0 + self.end = len(self._records) + return self + + def __next__(self): + if self.start < self.end: + d = self._records[self.start] + self.start = self.start + 1 + return d + else: + raise StopIteration diff --git a/sqlor/runsql.py b/sqlor/runsql.py new file mode 100755 index 0000000..93b98d1 --- /dev/null +++ b/sqlor/runsql.py @@ -0,0 +1,35 @@ +#!/usr/bin/python3 +import sys +import codecs +from sqlor.dbpools import runSQL +import asyncio + +def appinit(): + if len(sys.argv) < 4: + print(f'usage:\n {sys.argv[0]} path dbname sqlfile [k=v ...] \n') + sys.exit(1) + p = ProgramPath() + if len(sys.argv) > 1: + p = sys.argv[1] + config = getConfig(p) + DBPools(config.databases) + + +async def run(ns): + with codecs.open(sys.argv[3], 'r', 'utf-8') as f: + sql = f.read() + await runSQL(sys.argv[2], sql, ns) + +if __name__ == '__main__': + ns = {} + for x in sys.argv[3:]: + try: + k,v = x.split('=') + ns.update({k:v}) + except Exception as e: + print(x, 'key-value pair expected') + print(e) + + appinit() + loop = asyncio.get_event_loop() + loop.run_until_complete(run(ns)) diff --git a/sqlor/sor.py b/sqlor/sor.py new file mode 100755 index 0000000..c108f92 --- /dev/null +++ b/sqlor/sor.py @@ -0,0 +1,682 @@ +from traceback import format_exc +import os +import decimal +os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' +import sys +from datetime import datetime, date +import codecs +import re +import json +from appPublic.myImport import myImport +from appPublic.dictObject import DictObject +from appPublic.unicoding import uDict +from appPublic.myTE import MyTemplateEngine +from appPublic.objectAction import ObjectAction +from appPublic.argsConvert import ArgsConvert,ConditionConvert +from appPublic.registerfunction import RegisterFunction +from appPublic.log import info, exception, debug +from .filter import DBFilter + +def db_type_2_py_type(o): + if isinstance(o,decimal.Decimal): + return float(o) + if isinstance(o,datetime): + # return '%020d' % int(o.timestamp() * 1000) + return str(o) + if isinstance(o, date): + return '%04d-%02d-%02d' % (o.year, o.month, o.day) + return o + +class SQLorException(Exception,object): + def __int__(self,**kvs): + supper(SQLException,self).__init__(self,**kvs) + self.dic = { + 'response':'error', + 'errtype':'SQLor', + 'errmsg':supper(SQLException,self).message, + } + + def __str__(self): + return 'errtype:%s,errmsg=%s' % (self.dic['errtype'],self.dic['errmsg']) + +def setValues(params,ns): + r = ns.get(params,os.getenv(params)) + return r + +def findNamedParameters(sql): + """ + return a list of named parameters + """ + re1 = '\$\{[_a-zA-Z_][a-zA-Z_0-9]*\}' + params1 = re.findall(re1,sql) + return params1 + + +def uniParams(params1): + ret = [] + for i in params1: + if i not in ret: + ret.append(i) + return ret + +def readsql(fn): + f = codecs.open(fn,'r','utf-8') + b = f.read() + f.close() + return b + +class SQLor(object): + def __init__(self,dbdesc=None,sqltp = '$[',sqlts = ']$',sqlvp = '${',sqlvs = '}$'): + self.conn = None + self.cur = None + self.async_mode = False + self.sqltp = sqltp + self.sqlts = sqlts + self.sqlvp = sqlvp + self.sqlvs = sqlvs + self.dbdesc = dbdesc + self.dbname = self.dbdesc.get('dbname') + if self.dbname: + self.dbname = self.dbname.lower() + self.writer = None + self.convfuncs = {} + self.cc = ConditionConvert() + self.dataChanged = False + self.metadatas={} + + async def get_schema(self): + def concat_idx_info(idxs): + x = [] + n = None + for i in idxs: + if not n or n.index_name != i.index_name: + if n: + x.append(n) + n = i + n.column_name = [i.column_name] + else: + n.column_name.append(i.column_name) + return x + + tabs = await self.tables() + schemas = [] + for t in tabs: + primary = await self.primary(t.name) + # print('primary=', primary) + indexes = concat_idx_info(await self.indexes(t.name)) + fields = await self.fields(t.name) + primary_fields = [f.field_name for f in primary] + if len(primary_fields)>0: + t.primary = [f.field_name for f in primary] + x = {} + x['summary'] = [t] + x['indexes'] = indexes + x['fields'] = fields + schemas.append(x) + return schemas + + def setMeta(self,tablename,meta): + self.metadatas[tablename.lower()] = meta + + def getMeta(self,tablename): + return self.metadatas.get(tablename.lower(),None) + + def removeMeta(self,tablename): + if getMeta(self.tablename): + del self.metadatas[tablename.lower()] + + def setCursor(self,async_mode,conn,cur): + self.async_mode = async_mode + self.conn = conn + self.cur = cur + + def getConn(self): + return self.conn + + def setConvertFunction(self,typ,func): + self.convfuncs.update({typ:func}) + + def convert(self,typ,value): + if self.convfuncs.get(typ,None) is not None: + return self.convfuncs[typ](value) + return value + @classmethod + def isMe(self,name): + return name=='sqlor' + + def pagingSQLmodel(self): + return "" + + def placeHolder(self,varname,pos=None): + if varname=='__mainsql__' : + return '' + return '?' + + def dataConvert(self,dataList): + return [ i.get('value',None) for i in dataList] + + def dataList(self,k,v): + a = [] + a.append({'name':k,'value':v}) + return a + + def cursor(self): + return self.cur + + def recordCnt(self,sql): + ret = u"""select count(*) rcnt from (%s) rowcount_table""" % sql + return ret + + def sortSQL(self, sql, NS): + sort = NS.get('sort',None) + if sort is None: + return sql + if isinstance(sort, list): + sort = ','.join(sort) + return sql + ' ORDER BY ' + sort + + def pagingSQL(self,sql,paging,NS): + """ + default it not support paging + """ + page = int(NS.get(paging['pagename'],1)) + rows = int(NS.get(paging['rowsname'],80)) + sort = NS.get(paging.get('sortname','sort'),None) + if isinstance(sort, list): + sort = ','.join(sort) + if not sort: + return sql + if page < 1: + page = 1 + from_line = (page - 1) * rows + end_line = page * rows + psql = self.pagingSQLmodel() + ns={ + 'from_line':from_line, + 'end_line':end_line, + 'rows':rows, + 'sort':sort + } + ac = ArgsConvert('$[',']$') + psql = ac.convert(psql,ns) + retSQL=psql % sql + return retSQL + + def filterSQL(self,sql,filters,NS): + ac = ArgsConvert('$[',']$') + fbs = [] + for f in filters: + vars = ac.findAllVariables(f) + if len(vars) > 0: + ignoreIt = False + for v in vars: + if not NS.get(v,False): + ignoreIt = True + if not ignoreIt: + f = ac.convert(f,NS) + else: + f = '1=1' + fbs.append(f) + fb = ' '.join(fbs) + retsql = u"""select * from (%s) filter_table where %s""" % (sql,fb) + return retsql + + async def runVarSQL(self,cursor,sql,NS): + """ + using a opened cursor to run a SQL statment with variable, the variable is setup in NS namespace + return a cursor with data + """ + markedSQL, datas = self.maskingSQL(sql,NS) + datas = self.dataConvert(datas) + try: + if self.async_mode: + return await cursor.execute(markedSQL,datas) + else: + return cursor.execute(markedSQL,datas) + + except Exception as e: + fe = format_exc() + exception(f"{markedSQL=},{datas=}, {e=}, {fe=}") + raise e + + def maskingSQL(self,org_sql,NS): + """ + replace all ${X}$ format variable exception named by '__mainsql__' in sql with '%s', + and return the marked sql sentent and variable list + sql is a sql statment with variable formated in '${X}$ + the '__mainsql__' variable use to identify the main sql will outout data. + NS is the name space the variable looking for, it is a variable dictionary + return (MarkedSQL,list_of_variable) + """ + sqltextAC = ArgsConvert(self.sqltp,self.sqlts) + sqlargsAC = ArgsConvert(self.sqlvp,self.sqlvs) + sql1 = sqltextAC.convert(org_sql,NS) + cc = ConditionConvert() + sql1 = cc.convert(sql1,NS) + vars = sqlargsAC.findAllVariables(sql1) + phnamespace = {} + [phnamespace.update({v:self.placeHolder(v,i)}) for i,v in enumerate(vars)] + m_sql = sqlargsAC.convert(sql1,phnamespace) + newdata = [] + for v in vars: + if v != '__mainsql__': + value = sqlargsAC.getVarValue(v,NS,None) + newdata += self.dataList(v,value) + + return (m_sql,newdata) + + def getSqlType(self,sql): + """ + return one of "qry", "dml" and "ddl" + ddl change the database schema + dml change the database data + qry query data + """ + + a = sql.lstrip(' \t\n\r') + a = a.lower() + al = a.split(' ') + if al[0] == 'select': + return 'qry' + if al[0] in ['update','delete','insert']: + return 'dml' + return 'ddl' + + async def execute(self,sql,value,callback,**kwargs): + sqltype = self.getSqlType(sql) + cur = self.cursor() + ret = await self.runVarSQL(cur,sql,value) + if sqltype == 'qry' and callback is not None: + fields = [ i[0].lower() for i in cur.description ] + rec = None + if self.async_mode: + rec = await cur.fetchone() + else: + rec = cur.fetchone() + + while rec is not None: + dic = {} + for i in range(len(fields)): + dic.update({fields[i] : db_type_2_py_type(rec[i])}) + dic = DictObject(**dic) + callback(dic,**kwargs) + if self.async_mode: + rec = await cur.fetchone() + else: + rec = cur.fetchone() + if sqltype == 'dml': + self.dataChanged = True + return ret + + async def executemany(self,sql,values): + cur = self.cursor() + markedSQL,datas = self.maskingSQL(sql,{}) + datas = [ self.dataConvert(d) for d in values ] + if self.async_mode: + await cur.executemany(markedSQL,datas) + else: + cur.executemany(markedSQL,datas) + + def pivotSQL(self,tablename,rowFields,columnFields,valueFields): + def maxValue(columnFields,valueFields,cfvalues): + sql = '' + for f in valueFields: + i = 0 + for field in columnFields: + for v in cfvalues[field]: + sql += """ + ,sum(%s_%d) %s_%d""" % (f,i,f,i) + i+=1 + return sql + def casewhen(columnFields,valueFields,cfvalues): + sql = '' + for f in valueFields: + i = 0 + for field in columnFields: + for v in cfvalues[field]: + if v is None: + sql += """,case when %s is null then %s + else 0 end as %s_%d -- %s + """ % (field,f,f,i,v) + else: + sql += """,case when trim(%s) = trim('%s') then %s + else 0 end as %s_%d -- %s + """ % (field,v,f,f,i,v) + + i += 1 + return sql + + cfvalues={} + for field in columnFields: + sqlstring = 'select distinct %s from %s' % (field,tablename) + v = [] + self.execute(sqlstring,{},lambda x: v.append(x)) + cfvalues[field] = [ i[field] for i in v ] + + sql =""" + select """ + ','.join(rowFields) + sql += maxValue(columnFields,valueFields,cfvalues) + sql += """ from + (select """ + ','.join(rowFields) + sql += casewhen(columnFields,valueFields,cfvalues) + sql += """ + from %s) + group by %s""" % (tablename,','.join(rowFields)) + return sql + + async def pivot(self,desc,tablename,rowFields,columnFields,valueFields): + sql = self.pivotSQL(tablename,rowFields,columnFields,valueFields) + desc['sql_string'] = sql + ret = [] + return await self.execute(sql,{},lambda x:ret.append(x)) + + def isSelectSql(self,sql): + return self.getSqlType(sql) == 'qry' + + def getSQLfromDesc(self,desc): + sql = '' + if 'sql_file' in desc.keys(): + sql = readsql(desc['sql_file']) + else: + sql = desc['sql_string'] + return sql + + async def record_count(self,desc,NS): + cnt_desc = {} + cnt_desc.update(desc) + sql = self.getSQLfromDesc(desc) + if desc.get('sql_file',False): + del cnt_desc['sql_file'] + cnt_desc['sql_string'] = self.recordCnt(sql) + class Cnt: + def __init__(self): + self.recs = [] + def handler(self,rec): + self.recs.append(rec) + + c = Cnt() + await self.runSQL(cnt_desc,NS,c.handler) + t = c.recs[0]['rcnt'] + return t + + async def runSQLPaging(self,desc,NS): + total = await self.record_count(desc,NS) + recs = await self.pagingdata(desc,NS) + data = { + "total":total, + "rows":recs + } + return data + + async def pagingdata(self,desc,NS): + paging_desc = {} + paging_desc.update(desc) + paging_desc.update( + { + "paging":{ + "rowsname":"rows", + "pagename":"page", + "sortname":"sort" + } + }) + if desc.get('sortfield',False): + NS['sort'] = desc.get('sortfield') + sql = self.getSQLfromDesc(desc) + if desc.get('sql_file',False): + del cnt_desc['sql_file'] + paging_desc['sql_string'] = self.pagingSQL(sql, + paging_desc.get('paging'),NS) + + class Cnt: + def __init__(self): + self.recs = [] + def handler(self,rec): + self.recs.append(rec) + + c = Cnt() + await self.runSQL(paging_desc,NS,c.handler) + return c.recs + + async def resultFields(self,desc,NS): + NS.update(rows=1,page=1) + r = await self.pagingdata(desc,NS) + ret = [ {'name':i[0],'type':i[1]} for i in self.cur.description ] + return ret + + async def runSQL(self,desc,NS,callback,**kw): + class RecordHandler: + def __init__(self,ns,name): + self.ns = ns + self.name = name + self.ns[name] = [] + + def handler(self,rec): + self.ns[self.name].append(rec) + + cur = self.cursor() + sql = self.getSQLfromDesc(desc) + if self.isSelectSql(sql): + if callback is None: + klass = desc.get('dataname','dummy') + if klass is not None: + rh = RecordHandler(NS,klass) + callback = rh.handler + else: + callback = None + await self.execute(sql,NS,callback) + + async def sqlExecute(self,desc,NS): + return await self.execute(desc,NS,None) + + async def sqlExe(self,sql,ns): + ret = [] + r = await self.execute(sql,ns, + callback=lambda x:ret.append(x)) + sqltype = self.getSqlType(sql) + if sqltype == 'dml': + return r + + return ret + + async def sqlPaging(self,sql,ns): + ret = [] + dic = { + "sql_string":sql + } + page = ns.get('page') + if not page: + ns['page'] = 1 + + total = await self.record_count(dic,ns) + rows = await self.pagingdata(dic,ns) + return { + 'total':total, + 'rows':rows + } + + async def tables(self): + sqlstring = self.tablesSQL() + ret = [] + await self.execute(sqlstring,{},lambda x:ret.append(x)) + return ret + + def indexesSQL(self,tablename): + """ + record of { + index_name, + index_type, + table_name, + column_name + } + """ + return None + + async def indexes(self,tablename=None): + sqlstring = self.indexesSQL(tablename.lower()) + if sqlstring is None: + return [] + recs = [] + await self.execute(sqlstring,{},lambda x:recs.append(x)) + return recs + + async def fields(self,tablename=None): + sqlstring = self.fieldsSQL(tablename) + recs = [] + await self.execute(sqlstring,{},lambda x:recs.append(x)) + ret = [] + for r in recs: + r.update({'type':self.db2modelTypeMapping.get(r['type'].lower(),'unknown')}) + r.update({'name':r['name'].lower()}) + ret.append(r) + return ret + + async def primary(self,tablename): + sqlstring = self.pkSQL(tablename) + recs = [] + await self.execute(sqlstring,{},lambda x:recs.append(x)) + # print('sql=', sqlstring, 'recs=', recs) + return recs + + async def fkeys(self,tablename): + sqlstring = self.fkSQL(tablename) + recs = [] + await self.execute(sqlstring,{},lambda x:recs.append(x)) + return recs + + async def createTable(self,tabledesc): + te = MyTemplateEngine([],'utf8','utf8') + desc = { + "sql_string":te.renders(self.ddl_template,tabledesc) + } + return await self.sqlExecute(desc,{}) + + async def getTableDesc(self,tablename): + desc = self.getMeta(tablename) + if desc: + return desc + desc = {} + summary = [ i for i in await self.tables() if tablename.lower() == i['name'].lower() ] + pris = await self.primary(tablename) + primary = [i['name'] for i in pris ] + summary[0]['primary'] = primary + desc['summary'] = summary + desc['fields'] = await self.fields(tablename=tablename) + desc['indexes'] = [] + idx = {} + idxrecs = await self.indexes(tablename) + for idxrec in idxrecs: + if idxrec['index_name'] == 'primary': + continue + if idxrec['index_name'] != idx.get('name',None): + if idx != {}: + desc['indexes'].append(idx) + idx = { + } + idx['name'] = idxrec['index_name'] + idx['idxtype'] = 'unique' if idxrec['is_unique'] else 'index' + idx['idxfields'] = [] + idx['idxfields'].append(idxrec['column_name']) + if idx != {}: + desc['indexes'].append(idx) + self.setMeta(tablename,desc) + return desc + + async def rollback(self): + if self.async_mode: + await self.conn.rollback() + else: + self.conn.rollback() + self.dataChanged = False + + async def commit(self): + if self.async_mode: + await self.conn.commit() + else: + self.conn.commit() + self.datachanged = False + + async def I(self,tablename): + return await self.getTableDesc(tablename) + + async def C(self,tablename,ns): + desc = await self.I(tablename) + keys = ns.keys() + fields = [ i['name'] for i in desc['fields'] if i['name'] in keys ] + fns = ','.join(fields) + vfns = ','.join(['${%s}$' % n for n in fields ]) + sql = 'insert into %s.%s (%s) values (%s)' % (self.dbname, tablename,fns,vfns) + rf = RegisterFunction() + rfname = f'{self.dbname}:{tablename}:c:before' + ret = await rf.exe(rfname, ns) + if isinstance(ret, dict): + ns.update(ret) + r = await self.runSQL({'sql_string':sql},ns.copy(), None) + await rf.exe(f'{self.dbname}:{tablename}:c:after', ns) + return r + + async def R(self,tablename,ns,filters=None): + desc = await self.I(tablename) + sql = 'select * from %s.%s' % (self.dbname, tablename.lower()) + if filters: + dbf = DBFilter(filters) + sub = dbf.genFilterString(ns) + if sub: + sql = '%s where %s' % (sql, sub) + + else: + fields = [ i['name'] for i in desc['fields'] ] + c = [ '%s=${%s}$' % (k,k) for k in ns.keys() if k in fields ] + if len(c) > 0: + sql = '%s where %s' % (sql,' and '.join(c)) + + if 'page' in ns.keys(): + if not 'sort' in ns.keys(): + ns['sort'] = desc['summary'][0]['primary'][0] + dic = { + "sql_string":sql + } + total = await self.record_count(dic,ns) + rows = await self.pagingdata(dic,ns) + return { + 'total':total, + 'rows':rows + } + else: + if ns.get('sort'): + sql = self.sortSQL(sql, ns) + return await self.sqlExe(sql,ns) + + async def U(self,tablename,ns): + desc = await self.I(tablename) + fields = [ i['name'] for i in desc['fields']] + condi = [ i for i in desc['summary'][0]['primary']] + newData = [ i for i in ns.keys() if i not in condi and i in fields] + c = [ '%s = ${%s}$' % (i,i) for i in condi ] + u = [ '%s = ${%s}$' % (i,i) for i in newData ] + c_str = ' and '.join(c) + u_str = ','.join(u) + sql = 'update %s.%s set %s where %s' % (self.dbname, tablename, + u_str,c_str) + rf = RegisterFunction() + ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns) + if isinstance(ret, dict): + ns.update(ret) + r = await self.runSQL({'sql_string':sql},ns.copy() ,None) + await rf.exe(f'{self.dbname}:{tablename}:u:after',ns) + return r + + async def D(self,tablename,ns): + desc = await self.I(tablename) + fields = [ i['name'] for i in desc['fields']] + condi = [ i for i in desc['summary'][0]['primary']] + c = [ '%s = ${%s}$' % (i,i) for i in condi ] + c_str = ' and '.join(c) + sql = 'delete from %s.%s where %s' % (self.dbname, tablename,c_str) + rf = RegisterFunction() + ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns) + if isinstance(ret, dict): + ns.update(ret) + r = await self.runSQL({'sql_string':sql},ns,None) + ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns) + return r + diff --git a/sqlor/sqlite3or.py b/sqlor/sqlite3or.py new file mode 100755 index 0000000..959a5f4 --- /dev/null +++ b/sqlor/sqlite3or.py @@ -0,0 +1,100 @@ +import re +from .sor import SQLor + +class SQLite3or(SQLor): + db2modelTypeMapping = { + 'text':'str', + 'blob':'file', + 'int':'long', + 'integer':'long', + 'real':'float', + } + model2dbTypemapping = { + 'date':'text', + 'time':'text', + 'timestamp':'text', + 'str':'text', + 'char':'text', + 'short':'int', + 'long':'int', + 'float':'real', + 'text':'text', + 'file':'blob', + } + @classmethod + def isMe(self,name): + return name=='sqlite3' + + def placeHolder(self,varname,pos=None): + if varname=='__mainsql__' : + return '' + return '?' + + def dataConvert(self,dataList): + if type(dataList) == type({}): + d = [ i for i in dataList.values()] + else: + d = [ i['value'] for i in dataList] + return tuple(d) + + def pagingSQLmodel(self): + sql = u"""select * from (%s) order by $[sort]$ limit $[from_line]$,$[end_line]$""" + return sql + + def tablesSQL(self): + sqlcmd = u"""select name, tbl_name as title from sqlite_master where upper(type) = 'TABLE'""" + return sqlcmd + + def fieldsSQL(self,tablename): + # sqlcmd="""PRAGMA table_info('%s')""" % tablename.lower() + return sqlcmd + + def fields(self,tablename): + m = u'(\w+)\(((\d+)(,(\d+)){0,1})\){0,1}' + k = re.compile(m) + def typesplit(typ): + d = k.search(typ) + if d is None: + return typ,0,0 + + return d.group(1),int(d.group(3) if d.group(3) is not None else 0 ),int(d.group(5) if d.group(5) is not None else 0) + + sqlstring = self.fieldsSQL(tablename) + recs = [] + self.execute(sqlstring,callback=lambda x:recs.append(x)) + for r in recs: + t,l,d = typesplit(r['type']) + r['type'] = t + r['length'] = int(l) + r['dec'] = int(d) + r['title'] = r['name'] + ret = [] + for r in recs: + r.update({'type':self.db2modelTypeMapping.get(r['type'].lower(),'text')}) + r.update({'name':r['name'].lower()}) + ret.append(r) + return ret + + def fkSQL(self,tablename): + sqlcmd = "" + return sqlcmd + + def fkeys(self,tablename): + return [] + + def primary(self,tablename): + recs = self.fields(tablename) + ret = [ {'field':r['name']} for r in recs if r['pk'] == 1 ] + return ret + + def pkSQL(self,tablename): + sqlcmd = "" + return sqlcmd + + def indexesSQL(self,tablename=None): + sqlcmd = """select * from sqlite_master +where lower(type) = 'index' + """ + if tablename: + sqlcmd += "and lower(tbl_name)='" + tablename.lower() + "' " + return sqlcmd diff --git a/sqlor/version.py b/sqlor/version.py new file mode 100755 index 0000000..fc9509b --- /dev/null +++ b/sqlor/version.py @@ -0,0 +1,2 @@ +# fixed sor.py C function bug. +__version__ = "0.1.3" diff --git a/test/primary.py b/test/primary.py new file mode 100755 index 0000000..5c3c890 --- /dev/null +++ b/test/primary.py @@ -0,0 +1,47 @@ +import asyncio + +from sqlor.dbpools import DBPools + +dbs={ + "aiocfae":{ + "driver":"aiomysql", + "async_mode":True, + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + }, + "cfae":{ + "driver":"mysql.connector", + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + } +} + +loop = asyncio.get_event_loop() +pool = DBPools(dbs,loop=loop) +async def printTables(dbname): + r = await pool.getTables(dbname) + print('tables=',r) + +async def printFields(dbname,tablename): + r = await pool.getTableFields(dbname,tablename) + print(dbname,tablename,'fields=',r) + +async def printPrimary(dbname,tablename): + r = await pool.getTablePrimaryKey(dbname,tablename) + print(dbname,tablename,'primary key=',r) + +loop.run_until_complete(printTables('cfae')) +loop.run_until_complete(printFields('cfae','product')) +loop.run_until_complete(printPrimary('cfae','product')) diff --git a/test/sqlite3_sql.py b/test/sqlite3_sql.py new file mode 100755 index 0000000..680a016 --- /dev/null +++ b/test/sqlite3_sql.py @@ -0,0 +1,35 @@ +import sys +import os +import asyncio +import codecs +from sqlor.dbpools import DBPools + +async def exesqls(sqllines): + sqls = sqllines.split(';') + async with DBPools().sqlorContext('db') as sor: + for sql in sqls: + await sor.sqlExe(sql, {}) + +async def main(): + with codecs.open(sys.argv[2],'r', 'utf-8') as f: + txt = f.read() + await exesqls(txt) + + +if __name__ == '__main__': + if len(sys.argv) < 3: + print(f'Usage:{sys.argv[0]} sqlite3_db_file sqlfile') + sys.exit(1) + dbs = { + "db":{ + "driver":"aiosqlite", + "kwargs":{ + "dbname":sys.argv[1] + } + } + } + DBPools(dbs) + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + + diff --git a/test/t1.py b/test/t1.py new file mode 100755 index 0000000..15de10d --- /dev/null +++ b/test/t1.py @@ -0,0 +1,44 @@ +import asyncio + +from sqlor.dbpools import DBPools + +dbs={ + "aiocfae":{ + "driver":"aiomysql", + "async_mode":True, + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + }, + "cfae":{ + "driver":"mysql.connector", + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + } +} + +loop = asyncio.get_event_loop() +pool = DBPools(dbs,loop=loop) + +async def paging(): + @pool.runSQLPaging + def sql(db,ns): + return { + "sql_string":"select * from product", + } + x = await sql('aiocfae',{'rows':5,'page':1,"sort":"productid"}) + print('x=',x['total'],len(x['rows'])) + + +loop.run_until_complete(paging()) diff --git a/test/t2.py b/test/t2.py new file mode 100755 index 0000000..ac09b60 --- /dev/null +++ b/test/t2.py @@ -0,0 +1,64 @@ +import asyncio + +from sqlor.dbpools import DBPools + +dbs={ + "tasks":{ + "driver":"aiomysql", + "async_mode":True, + "coding":"utf8", + "dbname":"tasks", + "kwargs":{ + "user":"test", + "db":"tasks", + "password":"test123", + "host":"localhost" + } + }, + "aiocfae":{ + "driver":"aiomysql", + "async_mode":True, + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + }, + "cfae":{ + "driver":"mysql.connector", + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + } +} + +loop = asyncio.get_event_loop() +pool = DBPools(dbs,loop=loop) + +async def testfunc(): + @pool.runSQL + def sql(db,ns,callback): + return { + "sql_string":"select * from product", + } + x = await sql('cfae',{},print) + +async def testfunc1(): + @pool.runSQL + def sql(db,ns,callback): + return { + "sql_string":"select * from timeobjects", + } + print('testfunc1(),test tasks database select') + x = await sql('tasks',{},print) + +loop.run_until_complete(testfunc()) +loop.run_until_complete(testfunc1()) diff --git a/test/test.py b/test/test.py new file mode 100755 index 0000000..5cf717f --- /dev/null +++ b/test/test.py @@ -0,0 +1,60 @@ +import asyncio + +from sqlor.dbpools import DBPools +from sqlor.records import Records + +dbs={ + "aiocfae":{ + "driver":"aiomysql", + "async_mode":True, + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + }, + "cfae":{ + "driver":"mysql.connector", + "coding":"utf8", + "dbname":"cfae", + "kwargs":{ + "user":"test", + "db":"cfae", + "password":"test123", + "host":"localhost" + } + } +} + +loop = asyncio.get_event_loop() +pool = DBPools(dbs,loop=loop) + +async def testfunc1(): + @pool.runSQL + def sql(db,ns,callback): + return { + "sql_string":"select * from product", + } + recs = Records() + x = await sql('cfae',{},recs.add) + print('--------------%s---------------' % x) + for r in recs: + print(type(r),r) + +async def testfunc2(): + @pool.runSQLResultFields + def sql(db,ns,callback=None): + return { + "sql_string":"select * from product", + } + recs = Records() + x = await sql('cfae',{},callback=recs.add) + print('-------%s------' % x) + for i in recs._records: + print("--",i) + +loop.run_until_complete(testfunc1()) +loop.run_until_complete(testfunc2())