Compare commits
44 Commits
11945861b2 ... 62514a2398
| Author | SHA1 | Date |
|---|---|---|
| | 62514a2398 | |
| | 6655fc6fde | |
| | be7faa09ed | |
| | 92f2857ce6 | |
| | b56afa920e | |
| | 2708692f17 | |
| | 0a75d57225 | |
| | 08ffd2ef08 | |
| | a66f93c4f7 | |
| | f6eb868e77 | |
| | 85f3980582 | |
| | f2efc672d8 | |
| | bb3e0bfecd | |
| | 33b99b3acc | |
| | 6c88be2071 | |
| | 42d799bb49 | |
| | 9166698797 | |
| | 9fb0f25bf0 | |
| | 917a52f940 | |
| | 89a9756b00 | |
| | accd07844d | |
| | 4d86499372 | |
| | 5c58c04440 | |
| | 98fe6e2b0e | |
| | 906073a516 | |
| | fc0735478d | |
| | 72c17f6c12 | |
| | 93db80ac52 | |
| | 67571f1af8 | |
| | 91e93d778d | |
| | 0223b5f660 | |
| | 99ef9b20b2 | |
| | 5ea4bdf171 | |
| | f1de7bd13e | |
| | 49f866a1aa | |
| | 3426bda470 | |
| | 91e717d80f | |
| | 9f12e041ce | |
| | 26ac4571f4 | |
| | dec22bbc76 | |
| | 212751bdfb | |
| | e10b8c5f16 | |
| | cb4bb508d3 | |
| | a7f651888b | |
76 README.md

@@ -14,28 +14,83 @@ SQLOR is a database api for python3, it is base on the python's DBAPI2

## requirements

* python 3.5 or above
* python 3.9 or above
* asyncio
* Oracle DBAPI2 driver(cx_Oracle)
* MySQL DBAPI2 driver(mysql-connector)
* Postgresql DBAPI2 driver(psycopg2-binrary)
* Postgresql DBAPI2 driver(aiopg)
* Asynchronous MySQL driver(aiomysql)
* Asynchronous Postgresql driver(aiopg)
* clickhouse(clickhouse-connect)
* Other driver can be easy integreated

## Support Database Types

* oracle
* mysql, mariadb
* TiDB
* clickHouse
* DuckDB
* PostgreSQL
* MsSQL

## Database Description json format

password in json data is encrypted by aes.

* mysql, tidb, mariadb

```
{
    "databases":{
        "mydb":{
            "driver":"mysql",
            "kwargs":{
                "user":"test",
                "db":"cfae",
                "password":"test123",
                "host":"localhost"
            }
        }
    }
}
```

* PostgreSQL

```
{
    "databases":{
        "mydb":{
            "driver":"postgresql",
            "kwargs":{
                "user":"test",
                "dbname":"cfae",
                "password":"test123",
                "host":"localhost"
            }
        }
    }
}
```

* duckdb

```
{
    "databases":{
        "mydb":{
            "driver":"duckdb",
            "kwargs":{
                "dbfile":"ttt.ddb"
            }
        }
    }
}
```

## Using

```
import asyncio

from sqlor.dbpools import DBPools
from sqlor.dbpools import DBPools, sqlorContext

dbs={
    "aiocfae":{
        "driver":"aiomysql",
        "async_mode":True,
        "coding":"utf8",
        "dbname":"cfae",
    "mydb":{
        "driver":"mysql",
        "kwargs":{
            "user":"test",
            "db":"cfae",

@@ -72,8 +127,7 @@ loop = asyncio.get_event_loop()
pool = DBPools(dbs,loop=loop)

async def testfunc():
    db = DBPools()
    async with db.sqlorContext('stock') as sor:
    async with sqlorContext('stock') as sor:
        # start a transaction
        # if exception happended, all change to database will rollback
        # else will commit
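Taken together with the description format above, a minimal end-to-end sketch could look like the following. DBPools, sqlorContext and sqlExe come from the diffs on this page; the database key mydb, its credentials and the users table are hypothetical.

```
import asyncio
from sqlor.dbpools import DBPools, sqlorContext

# hypothetical database description; real deployments keep the password AES-encrypted
dbs = {
    "mydb": {
        "driver": "aiomysql",
        "async_mode": True,
        "kwargs": {
            "user": "test",
            "db": "cfae",
            "password": "test123",
            "host": "localhost"
        }
    }
}

loop = asyncio.get_event_loop()
pool = DBPools(dbs, loop=loop)

async def testfunc():
    # sqlorContext starts a transaction: commit on success, rollback on exception
    async with sqlorContext('mydb') as sor:
        recs = await sor.sqlExe('select * from users', {})
        for r in recs:
            print(r)

loop.run_until_complete(testfunc())
```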
@@ -1,7 +1,7 @@
[metadata]
name=sqlor
version = 1.1.1
description = a wrap for DBAPI, to make python run sql easy and safe
version = 2.0.0
description = a new version of sqlor, each db's sor need to plugin to sqlor, and dbdriver now a isolated module
authors = yu moqing
author_email = yumoqing@gmail.com
readme = README.md

@@ -12,8 +12,9 @@ packages = find:
requires_python = >=3.8
install_requires =
    aiomysql
    aiopg
    duckdb
    clickhouse-connect
    clickhouse-driver
    PyMySQL
    aiosqlite
    asyncio
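The new description above says each database's sor has to plug in to sqlor. A hedged sketch of such a plugin, modelled on the SQLor subclasses in the diffs below (a classmethod isMe() that sqlorFactory() matches against the configured driver name via findSubclass(), plus async connect()/close()); the driver name mydriver and the ellipses are placeholders, not part of the project.

```
from sqlor.sor import SQLor

class MyDriveror(SQLor):
    # hypothetical plugin; sqlorFactory() uses findSubclass(driver, SQLor) and
    # picks the subclass whose isMe() accepts the "driver" value from the config
    @classmethod
    def isMe(cls, name):
        return name == 'mydriver'

    async def connect(self):
        # self.dbdesc holds the "kwargs" section of the database description
        self.conn = ...   # open the underlying connection here
        self.cur = ...    # keep a cursor for execute()/runVarSQL()

    async def close(self):
        self.conn.close()
```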
@@ -1,7 +0,0 @@
from .mysqlor import MySqlor

class AioMysqlor(MySqlor):
    @classmethod
    def isMe(self,name):
        return name=='aiomysql'
@@ -1,4 +1,6 @@
# -*- coding:utf8 -*-
# pip install clickhouse-driver
from clickhouse_driver import Client
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .ddl_template_clickhouse import clickhouse_ddl_tmpl

@@ -95,3 +97,10 @@ WHERE database = '%s' AND table = '%s' AND is_in_primary_key = 1;
        # ClickHouse does not support foreign keys
        return "SELECT 'ClickHouse does not support foreign keys' AS msg;"

    async def connect(self):
        self.conn = Client(**self.dbdesc)
        self.cur = self.conn

    async def close(self):
        self.conn.close()
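The README above has no clickhouse entry yet. Since the clickhouse sor's connect() passes self.dbdesc straight to clickhouse_driver.Client, a plausible description entry might look like the sketch below; the driver name string and every value here are assumptions, and the kwargs are simply Client() parameters.

```
{
    "databases":{
        "mych":{
            "driver":"clickhouse",
            "kwargs":{
                "host":"localhost",
                "port":9000,
                "user":"default",
                "password":"test123",
                "database":"cfae"
            }
        }
    }
}
```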
266 sqlor/dbpools.py

@@ -1,7 +1,7 @@

import time
import asyncio
from traceback import format_exc
from functools import wraps
from functools import wraps, partial
import codecs

from contextlib import asynccontextmanager

@@ -21,7 +21,6 @@ from .oracleor import Oracleor
from .sqlite3or import SQLite3or
from .aiosqliteor import Aiosqliteor
from .mysqlor import MySqlor
from .aiomysqlor import AioMysqlor
from .aiopostgresqlor import AioPostgresqlor

def sqlorFactory(dbdesc):

@@ -37,139 +36,39 @@ def sqlorFactory(dbdesc):
    k = findSubclass(driver,SQLor)
    if k is None:
        return SQLor(dbdesc=dbdesc)
    return k(dbdesc=dbdesc)
    return k(dbdesc=dbdesc.kwargs)

def sqlorFromFile(dbdef_file,coding='utf8'):
    dbdef = loadf(dbdef_file)
    return sqlorFactory(dbdef)

class SqlorPool:
    def __init__(self, create_func, maxconn=100):
        self.sema = asyncio.Semaphore(maxconn)
        self.create_func = create_func
        self.sqlors = []

class LifeConnect:
    def __init__(self,connfunc,kw,use_max=1000,async_mode=False):
        self.connfunc = connfunc
        self.async_mode = async_mode
        self.use_max = use_max
        self.kw = kw
        self.conn = None
        self.used = False

    def print(self):
        print(self.use_max)
        print(self.conn)

    async def _new_sqlor(self):
        sqlor = await self.create_func()
        await sqlor.connect()
        x = DictObject(**{
            'used': True,
            'use_at': time.time(),
            'sqlor':sqlor
        })
        self.sqlors.append(x)
        return x

    async def _mkconn(self):
        if self.async_mode:
            self.conn = await self.connfunc(**self.kw)
        else:
            self.conn = self.connfunc(**self.kw)
        self.use_cnt = 0

    @asynccontextmanager
    async def context(self):
        async with self.sema:
            yielded_sqlor = None
            for s in self.sqlors:
                if not s.used:
                    yielded_sqlor = s
            if not yielded_sqlor:
                yielded_sqlor = await self._new_sqlor()
            yielded_sqlor.used = True
            yielded_sqlor.use_at = time.time()
            yield yielded_sqlor.sqlor
            yielded_sqlor.used = False

    async def use(self):
        if self.conn is None:
            await self._mkconn()
        wait_time = 0.2
        loop_cnt = 4
        while loop_cnt > 0:
            if await self.testok():
                return self.conn
            await asyncio.sleep(wait_time)
            wait_time = wait_time + 0.4
            loop_cnt = loop_cnt - 1
        try:
            await self.conn.close()
        except:
            pass
        self.conn = None
        await self._mkconn()
        raise Exception('database connect break')

    async def free(self,conn):
        self.use_cnt = self.use_cnt + 1
        return
        if self.use_cnt >= self.use_max:
            await self.conn.close()
            await self._mkcomm()

    async def testok(self):
        if self.async_mode:
            async with self.conn.cursor() as cur:
                try:
                    await cur.execute('select 1 as cnt')
                    return True
                except:
                    return False
        else:
            cur = self.conn.cursor()
            try:
                cur.execute('select 1 as cnt')
                r = cur.fetchall()
                return True
            except:
                return False
            finally:
                cur.close()

class ConnectionPool(object):
    def __init__(self,dbdesc,loop):
        self.dbdesc = dbdesc
        self.async_mode = dbdesc.get('async_mode',False)
        self.loop = loop
        self.driver = myImport(self.dbdesc['driver'])
        self.maxconn = dbdesc.get('maxconn',5)
        self.maxuse = dbdesc.get('maxuse',1000)
        self._pool = asyncio.Queue(self.maxconn)
        self.connectObject = {}
        self.use_cnt = 0
        self.max_use = 1000
        self.e_except = None
        # self.lock = asyncio.Lock()
        # self.lockstatus()

    def lockstatus(self):
        return
        self.loop.call_later(5,self.lockstatus)
        print('--lock statu=',self.lock.locked(),
            '--pool empty()=',self._pool.empty(),
            '--full()=',self._pool.full()
        )

    async def _fillPool(self):
        for i in range(self.maxconn):
            lc = await self.connect()
            i = i + 1

    async def connect(self):
        lc = LifeConnect(self.driver.connect,self.dbdesc['kwargs'],
                use_max=self.maxuse,async_mode=self.async_mode)
        await self._pool.put(lc)
        return lc

    def isEmpty(self):
        return self._pool.empty()

    def isFull(self):
        return self._pool.full()

    async def aquire(self):
        lc = await self._pool.get()
        conn = await lc.use()
        """
        with await self.lock:
            self.connectObject[lc.conn] = lc
        """
        self.connectObject[lc.conn] = lc
        return conn

    async def release(self,conn):
        lc = None
        """
        with await self.lock:
            lc = self.connectObject.get(conn,None)
            del self.connectObject[conn]
        """
        lc = self.connectObject.get(conn,None)
        del self.connectObject[conn]
        await self._pool.put(lc)

@SingletonDecorator
class DBPools:
    def __init__(self,databases={},max_connect=100,loop=None):
@@ -177,8 +76,7 @@ class DBPools:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.max_connect = max_connect
        self.sema = asyncio.Semaphore(max_connect)
        self._cpools = {}
        self._pools = {}
        self.databases = databases
        self.meta = {}

@@ -188,108 +86,36 @@ class DBPools:
            return None
        return desc.get('dbname')

    def addDatabase(self,name,desc):
    def addDatabase(self, name, desc):
        self.databases[name] = desc

    async def getSqlor(self,name):
        await self.sema.acquire()
    async def getSqlor(self, name):
        desc = self.databases.get(name)
        sor = sqlorFactory(desc)
        sor.name = name
        a,conn,cur = await self._aquireConn(name)
        sor.setCursor(a,conn,cur)
        await sor.connect()
        return sor

    async def freeSqlor(self,sor):
        await self._releaseConn(sor.name,sor.conn,sor.cur)
        self.sema.release()

    @asynccontextmanager
    async def sqlorContext(self,name):
    async def sqlorContext(self, name):
        pool = self._pools.get(name)
        if pool is None:
            f = partial(self.getSqlor, name)
            pool = SqlorPool(f)
            self._pools[name] = pool
        self.e_except = None
        sqlor = await self.getSqlor(name)
        sqlor = None
        try:
            yield sqlor
            async with pool.context() as sqlor:
                yield sqlor
                if sqlor and sqlor.dataChanged:
                    await sqlor.commit()
        except Exception as e:
            self.e_except = e
            cb = format_exc()
            exception(f'sqlorContext():EXCEPTION{e}, {cb}')
            if sqlor and sqlor.dataChanged:
                await sqlor.rollback()
        finally:
            if sqlor and sqlor.dataChanged:
                await sqlor.commit()
            await self.freeSqlor(sqlor)

    def get_exception(self):
        return self.e_except

    async def _aquireConn(self,dbname):
        """
        p = self._cpools.get(dbname)
        if p == None:
            p = ConnectionPool(self.databases.get(dbname),self.loop)
            await p._fillPool()
            self._cpools[dbname] = p
        conn = await p.aquire()
        if self.isAsyncDriver(dbname):
            cur = await conn.cursor()
        else:
            cur = conn.cursor()
        return self.isAsyncDriver(dbname),conn,cur
        """
        dbdesc = self.databases.get(dbname)
        driver = myImport(dbdesc['driver'])
        conn = None
        cur = None
        desc = dbdesc['kwargs'].copy()
        pw = desc.get('password')
        if pw:
            desc['password'] = unpassword(pw)
        if self.isAsyncDriver(dbname):
            if dbdesc['driver'] == 'sqlite3':
                conn = await driver.connect(desc['dbname'])
            else:
                conn = await driver.connect(**desc)
            cur = await conn.cursor()
            return True,conn,cur
        else:
            if dbdesc['driver'] == 'sqlite3':
                conn = driver.connect(desc['dbname'])
            else:
                conn = driver.connect(**desc)
            cur = conn.cursor()
            return False,conn,cur

    def isAsyncDriver(self,dbname):
        ret = self.databases[dbname].get('async_mode',False)
        return ret

    async def _releaseConn(self,dbname,conn,cur):
        """
        if self.isAsyncDriver(dbname):
            await cur.close()
        else:
            try:
                cur.fetchall()
            except:
                pass
            cur.close()
        p = self._cpools.get(dbname)
        if p == None:
            raise Exception('database (%s) not connected'%dbname)
        await p.release(conn)
        """
        if self.isAsyncDriver(dbname):
            try:
                await cur.close()
            except:
                pass
        else:
            try:
                cur.fetchall()
            except:
                pass
            cur.close()
        conn.close()
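sqlorContext() above rolls back on failure and stores the error on the pool instead of re-raising, so callers that need to know whether the transaction failed check get_exception() afterwards. A small hedged sketch of that pattern; the mydb database and users table are hypothetical and the pool is assumed to be configured elsewhere.

```
from sqlor.dbpools import DBPools, sqlorContext

async def save_row():
    db = DBPools()   # DBPools is a singleton, created elsewhere with the real descriptions
    async with sqlorContext('mydb') as sor:
        # on any exception here the change is rolled back and the error is kept on the pool
        await sor.sqlExe("insert into users (name) values ('tom')", {})
    e = db.get_exception()
    if e:
        print('transaction rolled back:', e)
```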
@@ -1,4 +1,5 @@
# -*- coding:utf8 -*-
import duckdb
from appPublic.argsConvert import ArgsConvert, ConditionConvert
from .sor import SQLor
from .const import ROWS

@@ -117,3 +118,15 @@ class DuckDBor(SQLor):
        sqlcmd += f" AND lower(table_name) = '{tablename.lower()}'"
        return sqlcmd

    async def connect(self):
        self.conn = duckdb.connect(self.dbdesc.dbfile)
        self.cur = self.conn
        self.dbname = None

    async def close(self):
        self.conn.close()

    def unpassword(self):
        pass
@@ -1,8 +1,7 @@
# -*- coding:utf8 -*-
from appPublic.argsConvert import ArgsConvert,ConditionConvert

import aiomysql
from .sor import SQLor
from .const import ROWS
from .ddl_template_mysql import mysql_ddl_tmpl
class MySqlor(SQLor):
    ddl_template = mysql_ddl_tmpl

@@ -50,7 +49,7 @@ class MySqlor(SQLor):
    }
    @classmethod
    def isMe(self,name):
        if name=='pymysql':
        if name in ['mysql', 'aiosqlor', 'tidb']:
            return True
        return False

@@ -77,7 +76,7 @@ class MySqlor(SQLor):
limit $[from_line]$,$[rows]$"""

    def tablesSQL(self):
        sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbdesc.get('dbname','unknown')
        sqlcmd = """SELECT lower(TABLE_NAME) as name, lower(TABLE_COMMENT) as title FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s'""" % self.dbname
        return sqlcmd

    def fieldsSQL(self,tablename=None):

@@ -92,7 +91,7 @@ limit $[from_line]$,$[rows]$"""
            lower(is_nullable) as nullable,
            column_comment as title,
            lower(table_name) as table_name
            from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbdesc.get('dbname','unknown').lower()
            from information_schema.columns where lower(TABLE_SCHEMA) = '%s' """ % self.dbname
        if tablename is not None:
            sqlcmd = sqlcmd + """and lower(table_name)='%s';""" % tablename.lower()
        return sqlcmd

@@ -116,7 +115,7 @@ limit $[from_line]$,$[rows]$"""
            AND R.REFERENCED_TABLE_NAME = C.REFERENCED_TABLE_NAME
            WHERE C.REFERENCED_TABLE_NAME IS NOT NULL ;
            and C.TABLE_SCHEMA = '%s'
            """ % self.dbdesc.get('dbname','unknown').lower()
            """ % self.dbname
        if tablename is not None:
            sqlcmd = sqlcmd + " and C.REFERENCED_TABLE_NAME = '%s'" % tablename.lower()
        return sqlcmd

@@ -137,7 +136,26 @@ limit $[from_line]$,$[rows]$"""
        FROM
            information_schema.statistics
        WHERE
            table_schema = '%s'""" % self.dbdesc.get('dbname','unknown')
            table_schema = '%s'""" % self.dbname
        if tablename is not None:
            sqlcmd = sqlcmd + """ AND table_name = '%s'""" % tablename.lower()
        return sqlcmd

    async def connect(self):
        """
        kwargs:
            host:
            port:
            user:
            password:
            db:
        """
        dbdesc = self.dbdesc
        self.conn = await aiomysql.connect(**dbdesc)
        self.cur = await self.conn.cursor()
        self.dbname = dbdesc.get('db')

    async def close(self):
        await self.cursor.close()
        await self.conn.close()
@@ -1,3 +1,4 @@
import aiopg
from .sor import SQLor
from .ddl_template_postgresql import postgresql_ddl_tmpl

@@ -169,3 +170,22 @@ order by
    i.relname""" % tablename.lower()
        return sqlcmd

    async def connect():
        """
        kwargs:
            dbname:
            user:
            password:
            host:
            port:
        """
        kwargs = self.dbdesc
        dns = ' '.join([f'{k}={v}' for k, v in kwargs.items()])
        self.conn = await self.connect(dns)
        self.cur = await self.conn.cursor()
        self.dbname = kwargs.dbname.lower()

    async def close():
        await self.cur.close()
        await self.conn.close()
307 sqlor/sor.py

@@ -1,3 +1,4 @@
import asyncio
from traceback import format_exc
import os
import decimal

@@ -7,7 +8,10 @@ from datetime import datetime, date
import codecs
import re
import json
import inspect
from appPublic.worker import awaitify
from appPublic.myImport import myImport
from appPublic.jsonConfig import getConfig
from appPublic.dictObject import DictObject
from appPublic.unicoding import uDict
from appPublic.myTE import MyTemplateEngine

@@ -15,8 +19,10 @@ from appPublic.objectAction import ObjectAction
from appPublic.argsConvert import ArgsConvert,ConditionConvert
from appPublic.registerfunction import RegisterFunction
from appPublic.log import info, exception, debug
from appPublic.aes import aes_decode_b64
from .filter import DBFilter


def db_type_2_py_type(o):
    if isinstance(o,decimal.Decimal):
        return float(o)

@@ -74,16 +80,20 @@ class SQLor(object):
        self.sqlts = sqlts
        self.sqlvp = sqlvp
        self.sqlvs = sqlvs
        self.dbdesc = dbdesc
        self.dbname = self.dbdesc.get('dbname')
        if self.dbname:
            self.dbname = self.dbname.lower()
        self.dbdesc = dbdesc.copy()
        self.unpassword()
        self.dbname = None
        self.writer = None
        self.convfuncs = {}
        self.cc = ConditionConvert()
        self.dataChanged = False
        self.metadatas={}

    def unpassword(self):
        if self.dbdesc.password:
            key=getConfig().password_key
            self.dbdesc.password = aes_decode_b64(key, self.dbdesc.password)
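unpassword() above is where the AES-encrypted password promised in the README is decoded: the key comes from getConfig().password_key and the stored value is decoded with aes_decode_b64. A minimal sketch of the same call outside the class, assuming the json config has already been loaded; how the encrypted value is produced is not shown in this diff.

```
from appPublic.jsonConfig import getConfig
from appPublic.aes import aes_decode_b64

def reveal_password(encrypted_b64):
    # mirrors SQLor.unpassword(): password_key is read from the loaded json config
    key = getConfig().password_key
    return aes_decode_b64(key, encrypted_b64)
```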
    async def get_schema(self):
        def concat_idx_info(idxs):
            x = []

@@ -164,7 +174,7 @@ class SQLor(object):
        return self.cur

    def recordCnt(self,sql):
        ret = u"""select count(*) rcnt from (%s) rowcount_table""" % sql
        ret = """select count(*) rcnt from (%s) rowcount_table""" % sql
        return ret

    def sortSQL(self, sql, NS):

@@ -221,7 +231,23 @@ class SQLor(object):
        retsql = u"""select * from (%s) filter_table where %s""" % (sql,fb)
        return retsql

    async def runVarSQL(self,cursor,sql,NS):
    async def cur_executemany(self, cur, sql, ns):
        if inspect.iscoroutinefunction(cur.executemany):
            return await cur.executemany(sql, ns)
        f = awaitify(cur.executemany)
        return await f(sql, ns)

    async def cur_execute(self, cur, sql, ns):
        if inspect.iscoroutinefunction(cur.execute):
            ret = await cur.execute(sql, ns)
            # debug(f'-------coroutine--{ret}-{cur}----')
            return ret
        f = awaitify(cur.execute)
        ret = await f(sql, ns)
        # debug(f'------function--{ret}------')
        return ret

    async def runVarSQL(self, cursor, sql, NS):
        """
        using a opened cursor to run a SQL statment with variable, the variable is setup in NS namespace
        return a cursor with data

@@ -229,10 +255,9 @@
        markedSQL, datas = self.maskingSQL(sql,NS)
        datas = self.dataConvert(datas)
        try:
            if self.async_mode:
                return await cursor.execute(markedSQL,datas)
            else:
                return cursor.execute(markedSQL,datas)
            return await self.cur_execute(cursor,
                markedSQL,
                datas)

        except Exception as e:
            fe = format_exc()
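cur_execute() and cur_executemany() above let one code path drive both coroutine cursors (aiomysql, aiopg) and blocking DB-API cursors: a coroutine method is awaited directly, anything else is wrapped with awaitify from appPublic.worker. A stand-in sketch of such a wrapper, assuming it simply offloads the blocking call to the default executor; this is not appPublic's actual implementation, just the idea.

```
import asyncio
import functools

def awaitify_sketch(func):
    # hypothetical helper: make a blocking function awaitable by running it
    # in the event loop's default thread-pool executor
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
    return wrapper
```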
@@ -282,40 +307,55 @@
            return 'dml'
        return 'ddl'

    async def execute(self,sql,value,callback,**kwargs):
    async def fetchone(self, cur):
        if inspect.iscoroutinefunction(cur.fetchone):
            ret = await cur.fetchone()
            # debug(f'coro:sor.fetchone()={ret}, {type(ret)}')
            return ret
        ret = await cur.fetchone()
        # debug(f'func:sor.fetchone()={ret}, {type(ret)}')
        if isinstance(ret, asyncio.Future):
            ret = ret.result()
        return ret

    async def execute(self, sql, value):
        sqltype = self.getSqlType(sql)
        cur = self.cursor()
        ret = await self.runVarSQL(cur,sql,value)
        if sqltype == 'qry' and callback is not None:
            fields = [ i[0].lower() for i in cur.description ]
            rec = None
            if self.async_mode:
                rec = await cur.fetchone()
            else:
                rec = cur.fetchone()

            while rec is not None:
                dic = {}
                for i in range(len(fields)):
                    dic.update({fields[i] : db_type_2_py_type(rec[i])})
                dic = DictObject(**dic)
                callback(dic,**kwargs)
                if self.async_mode:
                    rec = await cur.fetchone()
                else:
                    rec = cur.fetchone()
        ret = await self.runVarSQL(cur, sql, value)
        if sqltype == 'dml':
            self.dataChanged = True
            return ret
        return ret

    async def _get_data(self, sql, ns):
        cur = self.cursor()
        sqltype = self.getSqlType(sql)
        if sqltype != 'qry':
            raise Exception('not select sql')
        ret = await self.execute(sql, ns)
        fields = [i[0].lower() for i in cur.description]
        while True:
            rec = await self.fetchone(cur)
            if rec is None:
                break
            if rec is None:
                break
            if isinstance(rec, dict):
                rec = rec.values()
            dic = {}
            for i in range(len(fields)):
                v = db_type_2_py_type(rec[i])
                dic.update({fields[i]: v})
            dic = DictObject(**dic)
            yield dic

    async def executemany(self,sql,values):
        sqltype = self.getSqlType(sql)
        if sqltype != 'dml':
            raise Exception('no dml sql')
        cur = self.cursor()
        markedSQL,datas = self.maskingSQL(sql,{})
        markedSQL, _ = self.maskingSQL(sql,{})
        datas = [ self.dataConvert(d) for d in values ]
        if self.async_mode:
            await cur.executemany(markedSQL,datas)
        else:
            cur.executemany(markedSQL,datas)
        await self.cur_exectutemany(cur, markedSQL, datas)

    def pivotSQL(self,tablename,rowFields,columnFields,valueFields):
        def maxValue(columnFields,valueFields,cfvalues):

@@ -366,76 +406,33 @@

    async def pivot(self,desc,tablename,rowFields,columnFields,valueFields):
        sql = self.pivotSQL(tablename,rowFields,columnFields,valueFields)
        desc['sql_string'] = sql
        ret = []
        return await self.execute(sql,{},lambda x:ret.append(x))
        return await self.execute(sql,{})

    def isSelectSql(self,sql):
        return self.getSqlType(sql) == 'qry'

    def getSQLfromDesc(self,desc):
        sql = ''
        if 'sql_file' in desc.keys():
            sql = readsql(desc['sql_file'])
        else:
            sql = desc['sql_string']
        return sql

    async def record_count(self,desc,NS):
        cnt_desc = {}
        cnt_desc.update(desc)
        sql = self.getSQLfromDesc(desc)
        if desc.get('sql_file',False):
            del cnt_desc['sql_file']
        cnt_desc['sql_string'] = self.recordCnt(sql)
        class Cnt:
            def __init__(self):
                self.recs = []
            def handler(self,rec):
                self.recs.append(rec)
    async def record_count(self, sql, NS):
        sql = self.recordCnt(sql)
        async for r in self._get_data(sql, NS):
            t = r.rcnt
            return t
        return None

        c = Cnt()
        await self.runSQL(cnt_desc,NS,c.handler)
        t = c.recs[0]['rcnt']
        return t

    async def runSQLPaging(self,desc,NS):
        total = await self.record_count(desc,NS)
        recs = await self.pagingdata(desc,NS)
        data = {
            "total":total,
            "rows":recs
    async def pagingdata(self, sql, NS):
        paging = {
            "rowsname":"rows",
            "pagename":"page",
            "sortname":"sort"
        }
        return data

    async def pagingdata(self,desc,NS):
        paging_desc = {}
        paging_desc.update(desc)
        paging_desc.update(
            {
                "paging":{
                    "rowsname":"rows",
                    "pagename":"page",
                    "sortname":"sort"
                }
            })
        if desc.get('sortfield',False):
            NS['sort'] = desc.get('sortfield')
        sql = self.getSQLfromDesc(desc)
        if desc.get('sql_file',False):
            del cnt_desc['sql_file']
        paging_desc['sql_string'] = self.pagingSQL(sql,
            paging_desc.get('paging'),NS)
        if not NS.get('sort'):
            NS['sort'] = "id"

        class Cnt:
            def __init__(self):
                self.recs = []
            def handler(self,rec):
                self.recs.append(rec)

        c = Cnt()
        await self.runSQL(paging_desc,NS,c.handler)
        return c.recs
        sql = self.pagingSQL(sql, paging, NS)
        recs = []
        async for r in self._get_data(sql, NS):
            recs.append(r)
        return recs

    async def resultFields(self,desc,NS):
        NS.update(rows=1,page=1)
@@ -443,52 +440,30 @@
        ret = [ {'name':i[0],'type':i[1]} for i in self.cur.description ]
        return ret

    async def runSQL(self,desc,NS,callback,**kw):
        class RecordHandler:
            def __init__(self,ns,name):
                self.ns = ns
                self.name = name
                self.ns[name] = []

            def handler(self,rec):
                self.ns[self.name].append(rec)

        cur = self.cursor()
        sql = self.getSQLfromDesc(desc)
        if self.isSelectSql(sql):
            if callback is None:
                klass = desc.get('dataname','dummy')
                if klass is not None:
                    rh = RecordHandler(NS,klass)
                    callback = rh.handler
        else:
            callback = None
        await self.execute(sql,NS,callback)

    async def sqlExecute(self,desc,NS):
        return await self.execute(desc,NS,None)

    async def sqlExe(self,sql,ns):
        ret = []
        r = await self.execute(sql,ns,
            callback=lambda x:ret.append(x))
    async def sqlExe(self, sql, ns):
        sqltype = self.getSqlType(sql)
        if sqltype == 'dml':
        if sqltype != 'qry':
            r = await self.execute(sql, ns)
            return r

        if 'page' in ns.keys():
            cnt = await self.record_count(sql, ns)
            rows = await self.pagingdata(sql, ns)
            return {
                'total': cnt,
                'rows': rows
            }
        ret = []
        async for r in self._get_data(sql, ns):
            ret.append(r)
        return ret

    async def sqlPaging(self,sql,ns):
        ret = []
        dic = {
            "sql_string":sql
        }
        page = ns.get('page')
        if not page:
            ns['page'] = 1

        total = await self.record_count(dic,ns)
        rows = await self.pagingdata(dic,ns)
        total = await self.record_count(sql,ns)
        rows = await self.pagingdata(sql,ns)
        return {
            'total':total,
            'rows':rows
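With the new sqlExe() and sqlPaging(), a query namespace that carries page is answered with a {'total': ..., 'rows': [...]} dict built from record_count() and pagingdata(); rows, page and sort are the paging keys, and sort falls back to "id". A hedged usage sketch, assuming an already-open sor and a hypothetical users table:

```
async def list_users(sor):
    # paging a query; 'users' and the page size are hypothetical
    ns = {'page': 1, 'rows': 20, 'sort': 'id'}
    result = await sor.sqlPaging('select * from users', ns)
    print(result['total'], len(result['rows']))
    # sqlExe() behaves the same way when the namespace carries a 'page' key
    return await sor.sqlExe('select * from users', {'page': 2, 'rows': 20})
```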
@@ -497,7 +472,9 @@
    async def tables(self):
        sqlstring = self.tablesSQL()
        ret = []
        await self.execute(sqlstring,{},lambda x:ret.append(x))
        async for r in self._get_data(sqlstring,{}):
            r.name = r.name.lower()
            ret.append(r)
        return ret

    def indexesSQL(self,tablename):

@@ -516,13 +493,16 @@
        if sqlstring is None:
            return []
        recs = []
        await self.execute(sqlstring,{},lambda x:recs.append(x))
        async for r in self._get_data(sqlstring, {}):
            recs.append(r)
        return recs

    async def fields(self,tablename=None):
        sqlstring = self.fieldsSQL(tablename)
        sql = self.fieldsSQL(tablename)
        recs = []
        await self.execute(sqlstring,{},lambda x:recs.append(x))
        async for r in self._get_data(sql, {}):
            recs.append(r)

        ret = []
        for r in recs:
            r.update({'type':self.db2modelTypeMapping.get(r['type'].lower(),'unknown')})

@@ -531,31 +511,38 @@
        return ret

    async def primary(self,tablename):
        sqlstring = self.pkSQL(tablename)
        sql = self.pkSQL(tablename)
        recs = []
        await self.execute(sqlstring,{},lambda x:recs.append(x))
        # print('sql=', sqlstring, 'recs=', recs)
        async for r in self._get_data(sql, {}):
            recs.append(r)
        # debug(f'primary("{tablename}")={recs}, {sql}')
        return recs

    async def fkeys(self,tablename):
        sqlstring = self.fkSQL(tablename)
        recs = []
        await self.execute(sqlstring,{},lambda x:recs.append(x))
        async for r in self._get_data(sqlstring, {}):
            recs.append(r)

        return recs

    async def createTable(self,tabledesc):
        te = MyTemplateEngine([],'utf8','utf8')
        desc = {
            "sql_string":te.renders(self.ddl_template,tabledesc)
        }
        return await self.sqlExecute(desc,{})
        sql = te.renders(self.ddl_template,tabledesc)
        return await self.execute(sql, {})

    async def getTableDesc(self,tablename):
        tablename = tablename.lower()
        desc = self.getMeta(tablename)
        if desc:
            return desc
        desc = {}
        summary = [ i for i in await self.tables() if tablename.lower() == i['name'].lower() ]
        tables = await self.tables()
        summary = [i for i in tables if tablename == i.name]
        if not summary:
            e = Exception(f'table({tablename}) not exist')
            exception(f'{e}{format_exc()}')
            raise e
        pris = await self.primary(tablename)
        primary = [i['name'] for i in pris ]
        summary[0]['primary'] = primary

@@ -582,14 +569,14 @@
        return desc

    async def rollback(self):
        if self.async_mode:
        if inspect.iscoroutinefunction(self.conn.rollback):
            await self.conn.rollback()
        else:
            self.conn.rollback()
        self.dataChanged = False

    async def commit(self):
        if self.async_mode:
        if inspect.iscoroutinefunction(self.conn.commit):
            await self.conn.commit()
        else:
            self.conn.commit()

@@ -610,7 +597,7 @@
        ret = await rf.exe(rfname, ns)
        if isinstance(ret, dict):
            ns.update(ret)
        r = await self.runSQL({'sql_string':sql},ns.copy(), None)
        r = await self.execute(sql,ns.copy())
        await rf.exe(f'{self.dbname}:{tablename}:c:after', ns)
        return r

@@ -632,11 +619,8 @@
        if 'page' in ns.keys():
            if not 'sort' in ns.keys():
                ns['sort'] = desc['summary'][0]['primary'][0]
            dic = {
                "sql_string":sql
            }
            total = await self.record_count(dic,ns)
            rows = await self.pagingdata(dic,ns)
            total = await self.record_count(sql, ns)
            rows = await self.pagingdata(sql,ns)
            return {
                'total':total,
                'rows':rows

@@ -661,11 +645,11 @@
        ret = await rf.exe(f'{self.dbname}:{tablename}:u:before',ns)
        if isinstance(ret, dict):
            ns.update(ret)
        r = await self.runSQL({'sql_string':sql},ns.copy() ,None)
        r = await self.execute(sql, ns.copy())
        await rf.exe(f'{self.dbname}:{tablename}:u:after',ns)
        return r

    async def D(self,tablename,ns):
    async def D(self,tablename, ns):
        desc = await self.I(tablename)
        fields = [ i['name'] for i in desc['fields']]
        condi = [ i for i in desc['summary'][0]['primary']]

@@ -676,7 +660,12 @@
        ret = await rf.exe(f'{self.dbname}:{tablename}:d:before', ns)
        if isinstance(ret, dict):
            ns.update(ret)
        r = await self.runSQL({'sql_string':sql},ns,None)
        r = await self.execute(sql, ns)
        ns = await rf.exe(f'{self.dbname}:{tablename}:d:after', ns)
        return r

    async def connect(self):
        raise Exception('Not Implemented')

    async def close(self):
        raise Exception('Not Implemented')
@@ -1,2 +0,0 @@
# fixed sor.py C function bug.
__version__ = "0.1.3"