opportunity_management/wwwroot/opportunity_list.dspy
yumoqing 2547fad996 sync: local modifications to opportunity_management
- Updated core.py, init.py, opportunity_core.py, mysql.ddl.sql
- Updated UI: base.ui, opportunity_management.ui
- Added: opportunity.json, opportunity_stage_history.json
- Added API files: opportunities CRUD, sales_stages_list, check_tables
- Added UI/DSPY: opportunity_edit, opportunity_list, sales_stages_list
2026-04-28 18:54:47 +08:00

59 lines
2.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""List opportunities"""
import json
result = {'success': False, 'rows': [], 'total': 0}
try:
dbname = get_module_dbname('opportunity_management')
async with DBPools().sqlorContext(dbname) as sor:
where_clauses = []
where_ns = {}
customer_id = params_kw.get('customer_id', '')
status = params_kw.get('status', '')
if customer_id:
where_clauses.append("customer_id=${customer_id}$")
where_ns['customer_id'] = customer_id
if status:
where_clauses.append("status=${status}$")
where_ns['status'] = status
where_sql = " AND ".join(where_clauses)
where_prefix = " WHERE " if where_clauses else ""
count_sql = f"SELECT count(*) rcnt FROM opportunities{where_prefix}{where_sql}"
count_rows = await sor.sqlExe(count_sql, where_ns)
total = 0
if count_rows and len(count_rows) > 0:
r = count_rows[0]
if hasattr(r, 'keys'):
total = r.get('rcnt', 0)
elif isinstance(r, dict):
total = r.get('rcnt', 0)
elif hasattr(r, 'rcnt'):
total = r.rcnt
if total > 0:
ns = {'page': int(params_kw.get('page', 1)), 'rows': int(params_kw.get('rows', 20)), 'sort': params_kw.get('sort', 'created_at')}
sql = f"SELECT id, opportunity_name, customer_id, customer_name, estimated_amount, probability, expected_close_date, current_stage, status, owner_id, owner_name, source_type, region, created_at, updated_at FROM opportunities{where_prefix}{where_sql}"
query_ns = dict(list(ns.items()) + list(where_ns.items()))
rows = await sor.sqlExe(sql, query_ns)
# sqlExe with pagination returns {'total': N, 'rows': [...]}
if isinstance(rows, dict):
result['rows'] = rows.get('rows', [])
result['total'] = rows.get('total', total)
elif rows:
result['rows'] = [dict(r) if hasattr(r, 'keys') else r for r in rows]
result['total'] = total
result['success'] = True
except Exception as e:
result['error'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)