sage/set_role_perm.py
2026-03-21 11:33:51 +08:00

73 lines
1.7 KiB
Python

import sys
import asyncio
from sqlor.dbpools import DBPools
from appPublic.jsonConfig import getConfig
from appPublic.dictObject import DictObject
from appPublic.uniqueID import getID
async def delete_anonymous_perm(sor, permid):
await sor.D('rolepermission', {
'roleid': 'anonymous',
'permid': permid
})
async def add_roleperm(sor, roleid, permid):
ns = {
'roleid': roleid,
'permid': permid
}
recs = await sor.R('rolepermission', ns.copy())
if recs:
print(f'{roleid}, {permid} 已经存在')
return
ns['id'] = getID()
await sor.C('rolepermission', ns)
print(f'{roleid}, {permid} perm add')
return
async def main():
config = getConfig('.')
db = DBPools(config.databases)
if len(sys.argv) < 3:
print(f'{sys.argv[0]} role path')
sys.exit(1)
role = sys.argv[1]
path = sys.argv[2]
async with db.sqlorContext('sage') as sor:
perms = None
if '%' in path:
perms = await sor.sqlExe("select * from permission where path like ${path}$", {'path': path})
else:
perms = await sor.R('permission', {'path': path})
if len(perms) < 1:
perms = [ DictObject(**{
'id': getID(),
'path': path
}) ]
await sor.C('permission', perms[0].copy())
if role in ['anonymous', 'any', 'logined']:
for p in perms:
await add_roleperm(sor, role, p.id)
return
orgtypeid, name = role.split('.')
ns = {
'orgtypeid': orgtypeid,
'name': name
}
roles = await sor.R('role', ns.copy())
if not roles:
ns['id'] = getID()
await sor.C('role', ns.copy())
else:
ns['id'] = roles[0].id
for p in perms:
await add_roleperm(sor, ns['id'], p.id)
await delete_anonymous_perm(sor, p.id)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())