sageapi/sageapi/api/pricing.py
Hermes Agent 5c65c78752 feat: sageapi initial scaffold
- 36 files: module structure following module-development-spec
- 7 table definitions: users_cache, pricing_cache, llmage_cache, uapi_cache, sync_state, customer_balance, accounting_records
- Auth: dapi_auth + uapi_sign
- Sync: base_sync + entity-specific sync modules (users/pricing/uapi/llmage)
- Cache: LRU cache manager with TTL
- API: balance, accounting, users, pricing, health handlers
- Config: YAML config loader with env overrides
- Utils: HTTP client, crypto helpers
2026-05-20 17:53:53 +08:00

83 lines
2.6 KiB
Python

"""Pricing query API handler.
Provides the RESTful endpoint for querying pricing information.
Reads from the local pricing_cache table synced from Sage.
"""
from __future__ import annotations
import json
from typing import Any
from appPublic.log import debug, error
from sqlor.dbpools import DBPools
def _coerce_page_args(limit: Any, offset: Any) -> tuple[int, int]:
    """Return ``(limit, offset)`` as non-negative ints, or raise ValueError."""
    try:
        page_size, skip = int(limit), int(offset)
    except (TypeError, ValueError):
        raise ValueError('limit and offset must be integers') from None
    if page_size < 0 or skip < 0:
        raise ValueError('limit and offset must be non-negative')
    return page_size, skip


async def query_pricing(
    program_id: str | None = None,
    model: str | None = None,
    limit: int = 200,
    offset: int = 0,
) -> str:
    """Query pricing information from the local cache.

    Runs a COUNT query against ``pricing_cache`` and, when it reports at
    least one match, a paged SELECT ordered by ``program_id, model``.
    Failures are reported inside the returned payload instead of raised.

    Args:
        program_id: Exact-match filter on ``program_id``; ignored when falsy.
        model: Substring filter on ``model`` (``LIKE '%<model>%'``); ignored
            when falsy. ``%`` and ``_`` inside the value are not escaped, so
            they act as SQL wildcards.
        limit: Maximum number of records to return. Must be a non-negative
            integer (values accepted by ``int()`` such as ``"50"`` are
            coerced).
        offset: Number of records to skip. Same validation as ``limit``.

    Returns:
        JSON string with keys ``success``, ``data`` (list of row dicts),
        ``total`` (rows matching the filters, before paging) and, on
        failure, ``error``.
    """
    result: dict[str, Any] = {'success': False, 'data': [], 'total': 0}

    # Reject bad paging values up front: they would otherwise be bound into
    # LIMIT/OFFSET and surface as an opaque database error.
    try:
        limit, offset = _coerce_page_args(limit, offset)
    except ValueError as exc:
        result['error'] = str(exc)
        return json.dumps(result, ensure_ascii=False, default=str)

    try:
        from ahserver.serverenv import ServerEnv

        env = ServerEnv()
        dbname = env.get_module_dbname('sageapi')
        if not dbname:
            result['error'] = 'No database configured for sageapi module'
            return json.dumps(result, ensure_ascii=False, default=str)

        conditions: list[str] = []
        params: dict[str, Any] = {'limit': limit, 'offset': offset}
        if program_id:
            conditions.append('program_id = ${program_id}$')
            params['program_id'] = program_id
        if model:
            conditions.append('model LIKE ${model}$')
            params['model'] = f'%{model}%'
        where_clause = 'WHERE ' + ' AND '.join(conditions) if conditions else ''

        count_sql = f'SELECT COUNT(*) as cnt FROM pricing_cache {where_clause}'
        # Deliberately NOT an f-string: inside one, ``${limit}$`` would be
        # interpolated by Python into e.g. ``$200$`` and the named
        # placeholders would never reach the database layer.
        data_sql = (
            'SELECT program_id, model, input_price, output_price, '
            'unit, currency, updated_at '
            'FROM pricing_cache '
            + where_clause
            + ' ORDER BY program_id, model'
            ' LIMIT ${limit}$ OFFSET ${offset}$'
        )

        async with DBPools().sqlorContext(dbname) as sor:
            count_rows = await sor.sqlExe(count_sql, params)
            # Normalise to int so ``default=str`` cannot turn a
            # driver-specific numeric type into a JSON string.
            total = int(count_rows[0]['cnt'] or 0) if count_rows else 0
            result['total'] = total
            if total > 0:
                rows = await sor.sqlExe(data_sql, params)
                result['data'] = [dict(r) for r in (rows or [])]

        result['success'] = True
        debug(
            f'Pricing query: returned {len(result["data"])} '
            f'of {result["total"]} records'
        )
    except Exception as e:
        # Boundary handler: the caller always receives a JSON payload.
        error(f'Pricing query failed: {e}')
        result['error'] = str(e)
    return json.dumps(result, ensure_ascii=False, default=str)