This commit is contained in:
yumoqing 2025-10-19 11:33:43 +08:00
parent dec22bbc76
commit 26ac4571f4

View File

@ -1,7 +1,7 @@
import asyncio import asyncio
from traceback import format_exc from traceback import format_exc
from functools import wraps from functools import wraps, partial
import codecs import codecs
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
@ -37,138 +37,38 @@ def sqlorFactory(dbdesc):
k = findSubclass(driver,SQLor) k = findSubclass(driver,SQLor)
if k is None: if k is None:
return SQLor(dbdesc=dbdesc) return SQLor(dbdesc=dbdesc)
return k(dbdesc=dbdesc) return k(dbdesc=dbdesc.kwargs)
def sqlorFromFile(dbdef_file,coding='utf8'): class SqlorPool:
dbdef = loadf(dbdef_file) def __init__(self, create_func, maxconn=100):
return sqlorFactory(dbdef) self.sema = asyncio.Semaphore(maxconn)
self.create_func = create_func
self.sqlors = []
class LifeConnect: async def _new_sqlor(self):
def __init__(self,connfunc,kw,use_max=1000,async_mode=False): sqlor = await self.create_func()
self.connfunc = connfunc sqlor.connect()
self.async_mode = async_mode x = DictObject(**{
self.use_max = use_max 'used': True,
self.kw = kw 'use_at': time.time(),
self.conn = None 'sqlor':sqlor
self.used = False })
self.sqlors.append(x)
return x
def print(self): @asynccontextmanager
print(self.use_max) async def context(self):
print(self.conn) async with self.sema:
yielded_sqlor = None
async def _mkconn(self): for s in self.sqlors:
if self.async_mode: if not s.used:
self.conn = await self.connfunc(**self.kw) yielded_sqlor = s
else: if not yielded_sqlor:
self.conn = self.connfunc(**self.kw) yielded_sqlor = self._new_sqlor()
self.use_cnt = 0 yielded_sqlor.used = True
yielded_sqlor.use_at = time.time()
async def use(self): yield yielded_sqlor.sqlor
if self.conn is None: yielded_sqlor.used = False
await self._mkconn()
wait_time = 0.2
loop_cnt = 4
while loop_cnt > 0:
if await self.testok():
return self.conn
await asyncio.sleep(wait_time)
wait_time = wait_time + 0.4
loop_cnt = loop_cnt - 1
try:
await self.conn.close()
except:
pass
self.conn = None
await self._mkconn()
raise Exception('database connect break')
async def free(self,conn):
self.use_cnt = self.use_cnt + 1
return
if self.use_cnt >= self.use_max:
await self.conn.close()
await self._mkcomm()
async def testok(self):
if self.async_mode:
async with self.conn.cursor() as cur:
try:
await cur.execute('select 1 as cnt')
return True
except:
return False
else:
cur = self.conn.cursor()
try:
cur.execute('select 1 as cnt')
r = cur.fetchall()
return True
except:
return False
finally:
cur.close()
class ConnectionPool(object):
def __init__(self,dbdesc,loop):
self.dbdesc = dbdesc
self.async_mode = dbdesc.get('async_mode',False)
self.loop = loop
self.driver = myImport(self.dbdesc['driver'])
self.maxconn = dbdesc.get('maxconn',5)
self.maxuse = dbdesc.get('maxuse',1000)
self._pool = asyncio.Queue(self.maxconn)
self.connectObject = {}
self.use_cnt = 0
self.max_use = 1000
self.e_except = None
# self.lock = asyncio.Lock()
# self.lockstatus()
def lockstatus(self):
return
self.loop.call_later(5,self.lockstatus)
print('--lock statu=',self.lock.locked(),
'--pool empty()=',self._pool.empty(),
'--full()=',self._pool.full()
)
async def _fillPool(self):
for i in range(self.maxconn):
lc = await self.connect()
i = i + 1
async def connect(self):
lc = LifeConnect(self.driver.connect,self.dbdesc['kwargs'],
use_max=self.maxuse,async_mode=self.async_mode)
await self._pool.put(lc)
return lc
def isEmpty(self):
return self._pool.empty()
def isFull(self):
return self._pool.full()
async def aquire(self):
lc = await self._pool.get()
conn = await lc.use()
"""
with await self.lock:
self.connectObject[lc.conn] = lc
"""
self.connectObject[lc.conn] = lc
return conn
async def release(self,conn):
lc = None
"""
with await self.lock:
lc = self.connectObject.get(conn,None)
del self.connectObject[conn]
"""
lc = self.connectObject.get(conn,None)
del self.connectObject[conn]
await self._pool.put(lc)
@SingletonDecorator @SingletonDecorator
class DBPools: class DBPools:
@ -177,8 +77,7 @@ class DBPools:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
self.loop = loop self.loop = loop
self.max_connect = max_connect self.max_connect = max_connect
self.sema = asyncio.Semaphore(max_connect) self._pools = {}
self._cpools = {}
self.databases = databases self.databases = databases
self.meta = {} self.meta = {}
@ -188,108 +87,35 @@ class DBPools:
return None return None
return desc.get('dbname') return desc.get('dbname')
def addDatabase(self,name,desc): def addDatabase(self, name, desc):
self.databases[name] = desc self.databases[name] = desc
async def getSqlor(self,name): async def getSqlor(self, name):
await self.sema.acquire()
desc = self.databases.get(name) desc = self.databases.get(name)
sor = sqlorFactory(desc) sor = sqlorFactory(desc)
sor.name = name await sor.connect()
a,conn,cur = await self._aquireConn(name)
sor.setCursor(a,conn,cur)
return sor return sor
async def freeSqlor(self,sor):
await self._releaseConn(sor.name,sor.conn,sor.cur)
self.sema.release()
@asynccontextmanager @asynccontextmanager
async def sqlorContext(self,name): async def sqlorContext(self, name):
pool = self._pools.get(name)
if pool is None:
f = partial(self.getSqlor, name)
pool = SqlorPool(f)
self._pools[name] = pool
self.e_except = None self.e_except = None
sqlor = await self.getSqlor(name)
try: try:
async with pool.context() as sqlor:
yield sqlor yield sqlor
if sqlor and sqlor.dataChanged:
await sqlor.commit()
except Exception as e: except Exception as e:
self.e_except = e self.e_except = e
cb = format_exc() cb = format_exc()
exception(f'sqlorContext():EXCEPTION{e}, {cb}') exception(f'sqlorContext():EXCEPTION{e}, {cb}')
if sqlor and sqlor.dataChanged: if sqlor and sqlor.dataChanged:
await sqlor.rollback() await sqlor.rollback()
finally:
if sqlor and sqlor.dataChanged:
await sqlor.commit()
await self.freeSqlor(sqlor)
def get_exception(self): def get_exception(self):
return self.e_except return self.e_except
async def _aquireConn(self,dbname):
"""
p = self._cpools.get(dbname)
if p == None:
p = ConnectionPool(self.databases.get(dbname),self.loop)
await p._fillPool()
self._cpools[dbname] = p
conn = await p.aquire()
if self.isAsyncDriver(dbname):
cur = await conn.cursor()
else:
cur = conn.cursor()
return self.isAsyncDriver(dbname),conn,cur
"""
dbdesc = self.databases.get(dbname)
driver = myImport(dbdesc['driver'])
conn = None
cur = None
desc = dbdesc['kwargs'].copy()
pw = desc.get('password')
if pw:
desc['password'] = unpassword(pw)
if self.isAsyncDriver(dbname):
if dbdesc['driver'] == 'sqlite3':
conn = await driver.connect(desc['dbname'])
else:
conn = await driver.connect(**desc)
cur = await conn.cursor()
return True,conn,cur
else:
if dbdesc['driver'] == 'sqlite3':
conn = driver.connect(desc['dbname'])
else:
conn = driver.connect(**desc)
cur = conn.cursor()
return False,conn,cur
def isAsyncDriver(self,dbname):
ret = self.databases[dbname].get('async_mode',False)
return ret
async def _releaseConn(self,dbname,conn,cur):
"""
if self.isAsyncDriver(dbname):
await cur.close()
else:
try:
cur.fetchall()
except:
pass
cur.close()
p = self._cpools.get(dbname)
if p == None:
raise Exception('database (%s) not connected'%dbname)
await p.release(conn)
"""
if self.isAsyncDriver(dbname):
try:
await cur.close()
except:
pass
else:
try:
cur.fetchall()
except:
pass
cur.close()
conn.close()