sageapi/test_server.py
Hermes Agent 40c480e488 fix: sageapi local deployment and test server
- Fix health check sqlExe calls: add missing ns parameter
- Fix accounting ID generation: use getID() instead of uuid4 (VARCHAR length)
- Fix accounting request_id: normalize empty to NULL to avoid UNIQUE constraint violation
- Fix test_server balance/update route: was incorrectly pointing to accounting handler
- Add test_server.py: standalone aiohttp test server for local development
- Update conf/config.yaml: local MySQL credentials (test/test)
- Update db/schema.sql and scripts/generate_ddl.py for local testing
- Fix router sync_status sqlExe call: add missing ns parameter
- Fix sync uapi_sync: use correct table/column names
2026-05-20 22:46:05 +08:00

172 lines
7.1 KiB
Python

#!/usr/bin/env python3
"""Standalone test server for SageAPI.
Uses aiohttp (already in Sage venv) to expose all SageAPI endpoints
without requiring the full Sage framework. Uses local MySQL (127.0.0.1).
"""
import asyncio
import json
import os
import sys
import time
# Setup paths
SAGE_DIR = '/home/hermesai/repos/sage'
SAGEAPI_DIR = '/home/hermesai/repos/sageapi'
sys.path.insert(0, SAGEAPI_DIR)
sys.path.insert(0, SAGE_DIR)
os.chdir(SAGE_DIR)
from aiohttp import web
from appPublic.folderUtils import ProgramPath
ProgramPath()
from appPublic.jsonConfig import getConfig
from appPublic.dictObject import DictObject
from sqlor.dbpools import DBPools
# ---------------------------------------------------------------------------
# Build a local database config that works on this machine
# sage DB: override host from 'db' to '127.0.0.1'
# sageapi_db: add our test database
# ---------------------------------------------------------------------------
config = getConfig('.')
# config.databases is already a DictObject from JsonConfig, so .kwargs access works
# Just override the sage DB host to use local MySQL
sage_db = config.databases.get('sage')
if sage_db and 'kwargs' in sage_db:
sage_db['kwargs']['host'] = '127.0.0.1'
# Override password to encrypted 'test' (the original encrypted pw decodes to empty)
sage_db['kwargs']['password'] = 'xGatnL1idCnFRCe4FaIWRQ=='
# Add sageapi_db as DictObject (nested dicts need DictObject for .kwargs access)
# Password must be encrypted (sqlor.SQLor.unpassword decrypts it)
sageapi_db_cfg = DictObject(
driver='mysql',
kwargs=DictObject(
host='127.0.0.1',
port=3306,
user='test',
password='xGatnL1idCnFRCe4FaIWRQ==', # encrypted 'test'
db='sageapi_db',
charset='utf8mb4',
)
)
config.databases['sageapi_db'] = sageapi_db_cfg
# Initialize DBPools with the (now modified) config
DBPools(config.databases)
# ---------------------------------------------------------------------------
# Monkey-patch ServerEnv.get_module_dbname
# ---------------------------------------------------------------------------
from ahserver.serverenv import ServerEnv
_serverenv_cls = getattr(ServerEnv, 'klass', ServerEnv)
def _patched_get_module_dbname(self, module):
if module == 'sageapi':
return 'sageapi_db'
return 'sage'
_serverenv_cls.get_module_dbname = _patched_get_module_dbname
# ---------------------------------------------------------------------------
# Import sageapi modules (must be after DBPools init and patching)
# ---------------------------------------------------------------------------
from sageapi.api.health import health_check, readiness_check
from sageapi.api.balance import get_customer_balance, update_customer_balance
from sageapi.api.accounting import create_accounting_record, query_accounting_records
from sageapi.api.users import query_users, get_user_by_id
from sageapi.api.pricing import query_pricing, get_pricing_by_llmid
_START_TIME = time.time()
# ---------------------------------------------------------------------------
# Route handlers
# ---------------------------------------------------------------------------
async def handle_health(request):
result = await health_check()
return web.Response(text=result, content_type='application/json')
async def handle_readiness(request):
result = await readiness_check()
return web.Response(text=result, content_type='application/json')
async def handle_balance_get(request):
customer_id = request.query.get('customer_id', '')
result = await get_customer_balance(customer_id=customer_id)
return web.Response(text=result if isinstance(result, str) else json.dumps(result), content_type='application/json')
async def handle_accounting_create(request):
body = await request.json()
result = await create_accounting_record(**body)
return web.Response(text=result if isinstance(result, str) else json.dumps(result), content_type='application/json')
async def handle_balance_update(request):
body = await request.json()
customer_id = body.get('customer_id', '')
balance = body.get('balance', 0.0)
result = await update_customer_balance(customer_id=customer_id, balance=balance)
return web.Response(text=result if isinstance(result, str) else json.dumps(result), content_type='application/json')
async def handle_accounting_query(request):
customer_id = request.query.get('customer_id', '')
result = await query_accounting_records(customer_id=customer_id)
return web.Response(text=result if isinstance(result, str) else json.dumps(result), content_type='application/json')
async def handle_users_query(request):
result = await query_users()
return web.Response(text=result if isinstance(result, str) else json.dumps(result), content_type='application/json')
async def handle_pricing_query(request):
result = await query_pricing()
return web.Response(text=result if isinstance(result, str) else json.dumps(result), content_type='application/json')
async def handle_sync_trigger(request):
from sageapi.sync.base_sync import run_all_syncs
result = await run_all_syncs()
return web.Response(text=result if isinstance(result, str) else json.dumps(result), content_type='application/json')
async def handle_sync_status(request):
result = {'success': False, 'data': []}
try:
env = ServerEnv()
dbname = env.get_module_dbname('sageapi')
sql = "SELECT entity_type, sync_status, last_sync_time, error_msg FROM sync_state ORDER BY entity_type"
async with DBPools().sqlorContext(dbname) as sor:
rows = await sor.sqlExe(sql, {})
result['data'] = rows if isinstance(rows, list) else rows.get('rows', [])
result['success'] = True
except Exception as e:
result['error'] = str(e)
return web.Response(text=json.dumps(result, ensure_ascii=False, default=str), content_type='application/json')
# ---------------------------------------------------------------------------
# Setup routes
# ---------------------------------------------------------------------------
def setup_app():
app = web.Application()
app.router.add_get('/api/v1/health', handle_health)
app.router.add_get('/api/v1/health/ready', handle_readiness)
app.router.add_get('/api/v1/balance', handle_balance_get)
app.router.add_post('/api/v1/balance/update', handle_balance_update)
app.router.add_post('/api/v1/accounting', handle_accounting_create)
app.router.add_get('/api/v1/accounting', handle_accounting_query)
app.router.add_get('/api/v1/users', handle_users_query)
app.router.add_get('/api/v1/pricing', handle_pricing_query)
app.router.add_post('/api/v1/admin/sync', handle_sync_trigger)
app.router.add_get('/api/v1/admin/sync/status', handle_sync_status)
return app
if __name__ == '__main__':
app = setup_app()
print("Starting SageAPI test server on http://127.0.0.1:18080")
print("Endpoints:")
for route in app.router.routes():
methods = route.method if hasattr(route, 'method') else 'GET'
path = route.resource.canonical if route.resource else str(route)
print(f" {methods} {path}")
web.run_app(app, host='127.0.0.1', port=18080, print=None)