- Fix health check sqlExe calls: add missing ns parameter - Fix accounting ID generation: use getID() instead of uuid4 (VARCHAR length) - Fix accounting request_id: normalize empty to NULL to avoid UNIQUE constraint violation - Fix test_server balance/update route: was incorrectly pointing to accounting handler - Add test_server.py: standalone aiohttp test server for local development - Update conf/config.yaml: local MySQL credentials (test/test) - Update db/schema.sql and scripts/generate_ddl.py for local testing - Fix router sync_status sqlExe call: add missing ns parameter - Fix sync uapi_sync: use correct table/column names
172 lines
7.1 KiB
Python
172 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Standalone test server for SageAPI.
|
|
|
|
Uses aiohttp (already in Sage venv) to expose all SageAPI endpoints
|
|
without requiring the full Sage framework. Uses local MySQL (127.0.0.1).
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
# Setup paths
|
|
# Repository locations. The defaults are the original developer-machine paths;
# the environment variables let the server run against other checkouts
# without editing this file.
SAGE_DIR = os.environ.get('SAGEAPI_TEST_SAGE_DIR', '/home/hermesai/repos/sage')
SAGEAPI_DIR = os.environ.get('SAGEAPI_TEST_SAGEAPI_DIR', '/home/hermesai/repos/sageapi')

# Both inserts target index 0, so SAGE_DIR ends up ahead of SAGEAPI_DIR on
# the import path.
sys.path.insert(0, SAGEAPI_DIR)
sys.path.insert(0, SAGE_DIR)

# The configuration is later loaded with getConfig('.'), so the working
# directory must be the Sage checkout.
os.chdir(SAGE_DIR)
|
|
|
|
from aiohttp import web
|
|
|
|
from appPublic.folderUtils import ProgramPath
|
|
ProgramPath()
|
|
|
|
from appPublic.jsonConfig import getConfig
|
|
from appPublic.dictObject import DictObject
|
|
from sqlor.dbpools import DBPools
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build a local database config that works on this machine
|
|
# sage DB: override host from 'db' to '127.0.0.1'
|
|
# sageapi_db: add our test database
|
|
# ---------------------------------------------------------------------------
|
|
# Connection values shared by both local database entries.
_LOCAL_DB_HOST = '127.0.0.1'
# Encrypted form of 'test'; per the original notes, sqlor decrypts the
# password, so it must be stored encrypted.
_ENCRYPTED_TEST_PASSWORD = 'xGatnL1idCnFRCe4FaIWRQ=='

config = getConfig('.')

# Point the existing 'sage' database entry at the local MySQL instance.
# Per the original notes, the password that ships in the config decodes to an
# empty string, so it is replaced as well.
sage_db = config.databases.get('sage')
if sage_db and 'kwargs' in sage_db:
    sage_db['kwargs']['host'] = _LOCAL_DB_HOST
    sage_db['kwargs']['password'] = _ENCRYPTED_TEST_PASSWORD

# Register the sageapi test database. Both levels are DictObject instances so
# that attribute-style access (e.g. ``.kwargs``) works on the nested mapping.
_sageapi_db_kwargs = DictObject(
    host=_LOCAL_DB_HOST,
    port=3306,
    user='test',
    password=_ENCRYPTED_TEST_PASSWORD,
    db='sageapi_db',
    charset='utf8mb4',
)
sageapi_db_cfg = DictObject(driver='mysql', kwargs=_sageapi_db_kwargs)
config.databases['sageapi_db'] = sageapi_db_cfg

# Create the connection pools from the adjusted database definitions.
DBPools(config.databases)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Monkey-patch ServerEnv.get_module_dbname
|
|
# ---------------------------------------------------------------------------
|
|
from ahserver.serverenv import ServerEnv
|
|
# Class to patch: ServerEnv.klass when that attribute exists, else ServerEnv.
# NOTE(review): presumably ServerEnv is a singleton wrapper that exposes the
# real class as `klass` — confirm against ahserver.
_serverenv_cls = getattr(ServerEnv, 'klass', ServerEnv)
|
|
|
|
def _patched_get_module_dbname(self, module):
|
|
if module == 'sageapi':
|
|
return 'sageapi_db'
|
|
return 'sage'
|
|
|
|
# Install the replacement on the class itself so that lookups made through
# any instance resolve to the fixed module-to-database mapping.
_serverenv_cls.get_module_dbname = _patched_get_module_dbname
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import sageapi modules (must be after DBPools init and patching)
|
|
# ---------------------------------------------------------------------------
|
|
from sageapi.api.health import health_check, readiness_check
|
|
from sageapi.api.balance import get_customer_balance, update_customer_balance
|
|
from sageapi.api.accounting import create_accounting_record, query_accounting_records
|
|
from sageapi.api.users import query_users, get_user_by_id
|
|
from sageapi.api.pricing import query_pricing, get_pricing_by_llmid
|
|
|
|
# Wall-clock time (seconds since the epoch) captured when this module loads.
# NOTE(review): not referenced anywhere in the visible part of this file.
_START_TIME = time.time()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Route handlers
|
|
# ---------------------------------------------------------------------------
|
|
async def handle_health(request):
    """Serve ``GET /api/v1/health`` by relaying the result of ``health_check()``.

    Args:
        request: Incoming aiohttp request; not inspected.

    Returns:
        web.Response: JSON response carrying the health-check result.
    """
    result = await health_check()
    # The other handlers in this file accept either a pre-serialized string
    # or a plain object. Do the same here so a non-string result is
    # serialized instead of being rejected by web.Response(text=...).
    body = result if isinstance(result, str) else json.dumps(result)
    return web.Response(text=body, content_type='application/json')
|
|
|
|
async def handle_readiness(request):
    """Serve ``GET /api/v1/health/ready`` by relaying ``readiness_check()``.

    Args:
        request: Incoming aiohttp request; not inspected.

    Returns:
        web.Response: JSON response carrying the readiness-check result.
    """
    result = await readiness_check()
    # The other handlers in this file accept either a pre-serialized string
    # or a plain object. Do the same here so a non-string result is
    # serialized instead of being rejected by web.Response(text=...).
    body = result if isinstance(result, str) else json.dumps(result)
    return web.Response(text=body, content_type='application/json')
|
|
|
|
async def handle_balance_get(request):
    """Serve ``GET /api/v1/balance``.

    Args:
        request: Incoming aiohttp request. The ``customer_id`` query
            parameter is forwarded; an empty string is used when absent.

    Returns:
        web.Response: JSON response with the result of
        ``get_customer_balance``; string results are sent as-is, anything
        else is serialized with ``json.dumps``.
    """
    balance = await get_customer_balance(
        customer_id=request.query.get('customer_id', ''),
    )
    if isinstance(balance, str):
        payload = balance
    else:
        payload = json.dumps(balance)
    return web.Response(text=payload, content_type='application/json')
|
|
|
|
async def handle_accounting_create(request):
    """Serve ``POST /api/v1/accounting``.

    The JSON object in the request body is expanded into keyword arguments
    for ``create_accounting_record``.

    Args:
        request: Incoming aiohttp request whose body must be a JSON object.

    Returns:
        web.Response: JSON response with the handler result, or a 400
        response describing the problem when the body is not a JSON object.
    """
    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; report a client error rather than a server failure.
        error = {'success': False, 'error': 'request body must be valid JSON'}
        return web.Response(status=400, text=json.dumps(error), content_type='application/json')

    if not isinstance(body, dict):
        # ``**body`` only works for a mapping; arrays and scalars are rejected.
        error = {'success': False, 'error': 'request body must be a JSON object'}
        return web.Response(status=400, text=json.dumps(error), content_type='application/json')

    result = await create_accounting_record(**body)
    payload = result if isinstance(result, str) else json.dumps(result)
    return web.Response(text=payload, content_type='application/json')
|
|
|
|
async def handle_balance_update(request):
    """Serve ``POST /api/v1/balance/update``.

    Args:
        request: Incoming aiohttp request whose body must be a JSON object
            with optional ``customer_id`` and ``balance`` members.

    Returns:
        web.Response: JSON response with the result of
        ``update_customer_balance``, or a 400 response describing the
        problem when the body is not a JSON object.
    """
    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; report a client error rather than a server failure.
        error = {'success': False, 'error': 'request body must be valid JSON'}
        return web.Response(status=400, text=json.dumps(error), content_type='application/json')

    if not isinstance(body, dict):
        # The ``.get`` lookups below require a mapping.
        error = {'success': False, 'error': 'request body must be a JSON object'}
        return web.Response(status=400, text=json.dumps(error), content_type='application/json')

    customer_id = body.get('customer_id', '')
    # NOTE(review): a request that omits ``balance`` is forwarded as 0.0,
    # which presumably resets the stored balance — confirm this is intended.
    balance = body.get('balance', 0.0)
    result = await update_customer_balance(customer_id=customer_id, balance=balance)
    payload = result if isinstance(result, str) else json.dumps(result)
    return web.Response(text=payload, content_type='application/json')
|
|
|
|
async def handle_accounting_query(request):
    """Serve ``GET /api/v1/accounting``.

    Args:
        request: Incoming aiohttp request. The ``customer_id`` query
            parameter is forwarded; an empty string is used when absent.

    Returns:
        web.Response: JSON response with the result of
        ``query_accounting_records``; string results are sent as-is,
        anything else is serialized with ``json.dumps``.
    """
    records = await query_accounting_records(
        customer_id=request.query.get('customer_id', ''),
    )
    if isinstance(records, str):
        payload = records
    else:
        payload = json.dumps(records)
    return web.Response(text=payload, content_type='application/json')
|
|
|
|
async def handle_users_query(request):
    """Serve ``GET /api/v1/users`` with the result of ``query_users()``.

    Args:
        request: Incoming aiohttp request; not inspected.

    Returns:
        web.Response: JSON response; string results are sent as-is,
        anything else is serialized with ``json.dumps``.
    """
    users = await query_users()
    if isinstance(users, str):
        payload = users
    else:
        payload = json.dumps(users)
    return web.Response(text=payload, content_type='application/json')
|
|
|
|
async def handle_pricing_query(request):
    """Serve ``GET /api/v1/pricing`` with the result of ``query_pricing()``.

    Args:
        request: Incoming aiohttp request; not inspected.

    Returns:
        web.Response: JSON response; string results are sent as-is,
        anything else is serialized with ``json.dumps``.
    """
    pricing = await query_pricing()
    if isinstance(pricing, str):
        payload = pricing
    else:
        payload = json.dumps(pricing)
    return web.Response(text=payload, content_type='application/json')
|
|
|
|
async def handle_sync_trigger(request):
    """Serve ``POST /api/v1/admin/sync`` by running ``run_all_syncs()``.

    Args:
        request: Incoming aiohttp request; not inspected.

    Returns:
        web.Response: JSON response; string results are sent as-is,
        anything else is serialized with ``json.dumps``.
    """
    # Imported on each call rather than at module load, as in the original.
    from sageapi.sync.base_sync import run_all_syncs

    outcome = await run_all_syncs()
    if isinstance(outcome, str):
        payload = outcome
    else:
        payload = json.dumps(outcome)
    return web.Response(text=payload, content_type='application/json')
|
|
|
|
async def handle_sync_status(request):
    """Serve ``GET /api/v1/admin/sync/status``.

    Reads every row of the ``sync_state`` table, ordered by entity type,
    from the database that ``get_module_dbname('sageapi')`` resolves to.

    Args:
        request: Incoming aiohttp request; not inspected.

    Returns:
        web.Response: JSON object with ``success`` and ``data`` members.
        When any step raises, the exception text is added under ``error``
        and the response is still returned normally.
    """
    payload = {'success': False, 'data': []}
    query = "SELECT entity_type, sync_status, last_sync_time, error_msg FROM sync_state ORDER BY entity_type"
    try:
        dbname = ServerEnv().get_module_dbname('sageapi')
        async with DBPools().sqlorContext(dbname) as sor:
            rows = await sor.sqlExe(query, {})
            # sqlExe may return the row list directly or a mapping that
            # holds it under 'rows'; accept both.
            if isinstance(rows, list):
                payload['data'] = rows
            else:
                payload['data'] = rows.get('rows', [])
            payload['success'] = True
    except Exception as exc:
        # Best-effort endpoint: report the failure in the body, don't raise.
        payload['error'] = str(exc)
    # default=str stringifies values json cannot encode natively.
    serialized = json.dumps(payload, ensure_ascii=False, default=str)
    return web.Response(text=serialized, content_type='application/json')
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Setup routes
|
|
# ---------------------------------------------------------------------------
|
|
def setup_app():
    """Create the aiohttp application and register every SageAPI route.

    Returns:
        web.Application: Application with all handlers attached.
    """
    app = web.Application()
    router = app.router

    # (registration call, URL path, handler), registered in this exact order.
    route_table = (
        (router.add_get, '/api/v1/health', handle_health),
        (router.add_get, '/api/v1/health/ready', handle_readiness),
        (router.add_get, '/api/v1/balance', handle_balance_get),
        (router.add_post, '/api/v1/balance/update', handle_balance_update),
        (router.add_post, '/api/v1/accounting', handle_accounting_create),
        (router.add_get, '/api/v1/accounting', handle_accounting_query),
        (router.add_get, '/api/v1/users', handle_users_query),
        (router.add_get, '/api/v1/pricing', handle_pricing_query),
        (router.add_post, '/api/v1/admin/sync', handle_sync_trigger),
        (router.add_get, '/api/v1/admin/sync/status', handle_sync_status),
    )
    for register, url_path, handler in route_table:
        register(url_path, handler)

    return app
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the app, list its routes, then serve locally.
    app = setup_app()

    print("Starting SageAPI test server on http://127.0.0.1:18080")
    print("Endpoints:")
    for route in app.router.routes():
        # Fall back to 'GET' for route objects without a ``method`` attribute.
        methods = getattr(route, 'method', 'GET')
        if route.resource:
            path = route.resource.canonical
        else:
            path = str(route)
        print(f" {methods} {path}")

    # print=None turns off run_app's own startup output; the banner and
    # route list above replace it.
    web.run_app(app, host='127.0.0.1', port=18080, print=None)
|