fix: add env.get_module_dbname() for dynamic database lookup

This commit is contained in:
yumoqing 2026-05-08 15:34:15 +08:00
parent bc7cb9676f
commit 2e8630c5ab
2 changed files with 84 additions and 14 deletions

View File

@ -19,12 +19,17 @@ async def create_opportunity(
region: str = None region: str = None
) -> Dict: ) -> Dict:
"""创建商机""" """创建商机"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
opportunity_id = str(uuid.uuid4()).replace('-', '') opportunity_id = str(uuid.uuid4()).replace('-', '')
# 验证客户是否存在 # 验证客户是否存在
@ -75,12 +80,17 @@ async def update_opportunity_stage(
notes: str = None notes: str = None
) -> Dict: ) -> Dict:
"""更新商机阶段""" """更新商机阶段"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
if not opportunity_records or len(opportunity_records) == 0: if not opportunity_records or len(opportunity_records) == 0:
raise ValueError("商机不存在") raise ValueError("商机不存在")
@ -131,12 +141,17 @@ async def assign_opportunity(
new_owner_id: str new_owner_id: str
) -> Dict: ) -> Dict:
"""分配商机给销售人员""" """分配商机给销售人员"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
if not opportunity_records or len(opportunity_records) == 0: if not opportunity_records or len(opportunity_records) == 0:
raise ValueError("商机不存在") raise ValueError("商机不存在")
@ -176,12 +191,17 @@ async def get_opportunity_funnel(
region: str = None region: str = None
) -> Dict: ) -> Dict:
"""获取销售漏斗数据""" """获取销售漏斗数据"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
filters = [] filters = []
if start_date and end_date: if start_date and end_date:
filters.append({"field": "created_at", "op": ">=", "value": start_date}) filters.append({"field": "created_at", "op": ">=", "value": start_date})
@ -227,12 +247,17 @@ async def get_sales_performance(
owner_id: str = None owner_id: str = None
) -> Dict: ) -> Dict:
"""获取销售业绩数据""" """获取销售业绩数据"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
# 查询已关闭的商机(赢单) # 查询已关闭的商机(赢单)
won_filters = [ won_filters = [
{"field": "status", "op": "=", "value": "won"}, {"field": "status", "op": "=", "value": "won"},

View File

@ -23,12 +23,17 @@ async def create_opportunity(
region: str = None region: str = None
) -> Dict: ) -> Dict:
"""创建商机""" """创建商机"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
opportunity_id = str(uuid.uuid4()).replace('-', '') opportunity_id = str(uuid.uuid4()).replace('-', '')
# 验证客户是否存在 # 验证客户是否存在
@ -79,12 +84,17 @@ async def update_opportunity_stage(
notes: str = None notes: str = None
) -> Dict: ) -> Dict:
"""更新商机阶段""" """更新商机阶段"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
if not opportunity_records or len(opportunity_records) == 0: if not opportunity_records or len(opportunity_records) == 0:
raise ValueError("商机不存在") raise ValueError("商机不存在")
@ -135,12 +145,17 @@ async def assign_opportunity(
new_owner_id: str new_owner_id: str
) -> Dict: ) -> Dict:
"""分配商机给销售人员""" """分配商机给销售人员"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) opportunity_records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
if not opportunity_records or len(opportunity_records) == 0: if not opportunity_records or len(opportunity_records) == 0:
raise ValueError("商机不存在") raise ValueError("商机不存在")
@ -180,12 +195,17 @@ async def get_opportunity_funnel(
region: str = None region: str = None
) -> Dict: ) -> Dict:
"""获取销售漏斗数据""" """获取销售漏斗数据"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
filters = [] filters = []
if start_date and end_date: if start_date and end_date:
filters.append({"field": "created_at", "op": ">=", "value": start_date}) filters.append({"field": "created_at", "op": ">=", "value": start_date})
@ -231,12 +251,17 @@ async def get_sales_performance(
owner_id: str = None owner_id: str = None
) -> Dict: ) -> Dict:
"""获取销售业绩数据""" """获取销售业绩数据"""
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
# 查询已关闭的商机(赢单) # 查询已关闭的商机(赢单)
won_filters = [ won_filters = [
{"field": "status", "op": "=", "value": "won"}, {"field": "status", "op": "=", "value": "won"},
@ -292,12 +317,17 @@ async def predict_revenue(start_date: str, end_date: str):
return {"predicted_revenue": performance["weighted_pipeline_value"]} return {"predicted_revenue": performance["weighted_pipeline_value"]}
async def update_opportunity(opportunity_id: str, **updates): async def update_opportunity(opportunity_id: str, **updates):
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
updates["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") updates["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
await sor.U( await sor.U(
"opportunities", "opportunities",
@ -307,12 +337,17 @@ async def update_opportunity(opportunity_id: str, **updates):
return {"opportunity_id": opportunity_id, "updated": True} return {"opportunity_id": opportunity_id, "updated": True}
async def delete_opportunity(opportunity_id: str): async def delete_opportunity(opportunity_id: str):
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
await sor.D( await sor.D(
"opportunities", "opportunities",
{"filters": [{"field": "id", "op": "=", "value": opportunity_id}]} {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}
@ -320,22 +355,32 @@ async def delete_opportunity(opportunity_id: str):
return {"opportunity_id": opportunity_id, "deleted": True} return {"opportunity_id": opportunity_id, "deleted": True}
async def get_opportunity_by_id(opportunity_id: str): async def get_opportunity_by_id(opportunity_id: str):
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]}) records = await sor.R("opportunities", {"filters": [{"field": "id", "op": "=", "value": opportunity_id}]})
return records[0] if records else None return records[0] if records else None
async def list_opportunities(**filters): async def list_opportunities(**filters):
env = ServerEnv()
dbname = env.get_module_dbname('opportunity_management')
config = getConfig() config = getConfig()
db = DBPools() db = DBPools()
db.databases = config.databases db.databases = config.databases
async with db.sqlorContext('opportunity_management') as sor:
async with db.sqlorContext(dbname) as sor:
filter_list = [] filter_list = []
for field, value in filters.items(): for field, value in filters.items():
filter_list.append({"field": field, "op": "=", "value": value}) filter_list.append({"field": field, "op": "=", "value": value})