From 5c65c78752c10a184a021487e5db1b7d2e73822e Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Wed, 20 May 2026 17:53:53 +0800 Subject: [PATCH] feat: sageapi initial scaffold - 36 files: module structure following module-development-spec - 7 table definitions: users_cache, pricing_cache, llmage_cache, uapi_cache, sync_state, customer_balance, accounting_records - Auth: dapi_auth + uapi_sign - Sync: base_sync + entity-specific sync modules (users/pricing/uapi/llmage) - Cache: LRU cache manager with TTL - API: balance, accounting, users, pricing, health handlers - Config: YAML config loader with env overrides - Utils: HTTP client, crypto helpers --- README.md | 83 +++++++++++++++++ build.sh | 3 + conf/config.yaml | 35 +++++++ init/data.json | 7 ++ models/accounting_records.json | 162 +++++++++++++++++++++++++++++++++ models/customer_balance.json | 92 +++++++++++++++++++ models/llmage_cache.json | 113 +++++++++++++++++++++++ models/pricing_cache.json | 127 ++++++++++++++++++++++++++ models/sync_state.json | 107 ++++++++++++++++++++++ models/uapi_cache.json | 107 ++++++++++++++++++++++ models/users_cache.json | 113 +++++++++++++++++++++++ pyproject.toml | 13 +++ sageapi/__init__.py | 0 sageapi/api/__init__.py | 0 sageapi/api/accounting.py | 153 +++++++++++++++++++++++++++++++ sageapi/api/balance.py | 67 ++++++++++++++ sageapi/api/health.py | 60 ++++++++++++ sageapi/api/pricing.py | 82 +++++++++++++++++ sageapi/api/users.py | 83 +++++++++++++++++ sageapi/auth/__init__.py | 0 sageapi/auth/dapi_auth.py | 97 ++++++++++++++++++++ sageapi/auth/uapi_sign.py | 98 ++++++++++++++++++++ sageapi/cache/__init__.py | 0 sageapi/cache/cache_manager.py | 143 +++++++++++++++++++++++++++++ sageapi/config.py | 152 +++++++++++++++++++++++++++++++ sageapi/init.py | 110 ++++++++++++++++++++++ sageapi/router.py | 118 ++++++++++++++++++++++++ sageapi/sync/__init__.py | 0 sageapi/sync/base_sync.py | 125 +++++++++++++++++++++++++ sageapi/sync/llmage_sync.py | 65 +++++++++++++ sageapi/sync/pricing_sync.py | 65 +++++++++++++ sageapi/sync/uapi_sync.py | 65 +++++++++++++ sageapi/sync/user_sync.py | 76 ++++++++++++++++ sageapi/utils/__init__.py | 0 sageapi/utils/crypto.py | 95 +++++++++++++++++++ sageapi/utils/http_client.py | 115 +++++++++++++++++++++++ 36 files changed, 2731 insertions(+) create mode 100644 README.md create mode 100755 build.sh create mode 100644 conf/config.yaml create mode 100644 init/data.json create mode 100644 models/accounting_records.json create mode 100644 models/customer_balance.json create mode 100644 models/llmage_cache.json create mode 100644 models/pricing_cache.json create mode 100644 models/sync_state.json create mode 100644 models/uapi_cache.json create mode 100644 models/users_cache.json create mode 100644 pyproject.toml create mode 100644 sageapi/__init__.py create mode 100644 sageapi/api/__init__.py create mode 100644 sageapi/api/accounting.py create mode 100644 sageapi/api/balance.py create mode 100644 sageapi/api/health.py create mode 100644 sageapi/api/pricing.py create mode 100644 sageapi/api/users.py create mode 100644 sageapi/auth/__init__.py create mode 100644 sageapi/auth/dapi_auth.py create mode 100644 sageapi/auth/uapi_sign.py create mode 100644 sageapi/cache/__init__.py create mode 100644 sageapi/cache/cache_manager.py create mode 100644 sageapi/config.py create mode 100644 sageapi/init.py create mode 100644 sageapi/router.py create mode 100644 sageapi/sync/__init__.py create mode 100644 sageapi/sync/base_sync.py create mode 100644 sageapi/sync/llmage_sync.py create mode 100644 sageapi/sync/pricing_sync.py create mode 100644 sageapi/sync/uapi_sync.py create mode 100644 sageapi/sync/user_sync.py create mode 100644 sageapi/utils/__init__.py create mode 100644 sageapi/utils/crypto.py create mode 100644 sageapi/utils/http_client.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..5ef37b9 --- /dev/null +++ b/README.md @@ -0,0 +1,83 @@ +# SageAPI + +独立 API 服务器模块,为外部客户提供余额查询、记账、用户/定价查询等 RESTful 接口。 + +## 架构 + +SageAPI 是独立于 Sage 的 API 服务器,支持多实例部署和水平扩展: + +- 每个实例独立运行:自己的数据库连接池、内存缓存、同步调度器 +- 多实例共享同一 MySQL 数据库,数据天然同步 +- 增量同步模式:基于变更时间戳和事件驱动的混合策略 +- 通过 dapi 提供认证,uapi 保障数据交互安全 +- 纯 RESTful API 服务(无 Web 界面,除管理后台) + +## 目录结构 + +``` +sageapi/ +├── sageapi/ # Python 包 +│ ├── __init__.py # 空文件(模块规范) +│ ├── init.py # 模块初始化:ServerEnv 注册 +│ ├── config.py # 配置管理 +│ ├── router.py # API 路由注册 +│ ├── auth/ # 认证模块 +│ │ ├── dapi_auth.py # dapi 认证中间件 +│ │ └── uapi_sign.py # uapi 签名验证 +│ ├── sync/ # 数据同步模块 +│ │ ├── base_sync.py # 同步基类 +│ │ ├── user_sync.py # 用户数据同步 +│ │ ├── pricing_sync.py # pricing 数据同步 +│ │ ├── uapi_sync.py # uapi 数据同步 +│ │ └── llmage_sync.py # llmage 数据同步 +│ ├── cache/ # 缓存层 +│ │ └── cache_manager.py # 进程内存 LRU 缓存管理 +│ ├── api/ # API 业务逻辑 +│ │ ├── balance.py # 客户余额查询 +│ │ ├── accounting.py # 记账接口 +│ │ ├── users.py # 用户查询 +│ │ ├── pricing.py # pricing 查询 +│ │ └── health.py # 健康检查 +│ └── utils/ # 工具函数 +│ ├── http_client.py # 上游 HTTP 客户端 +│ └── crypto.py # 加解密工具 +├── wwwroot/ # 前端(管理界面) +├── models/ # 数据库表定义 +├── json/ # CRUD 定义 +├── init/data.json # 初始化数据 +├── conf/config.yaml # 运行配置 +├── pyproject.toml +├── build.sh +└── README.md +``` + +## API 端点 + +| 方法 | 路径 | 描述 | 认证 | +|------|------|------|------| +| GET | /api/v1/health | 健康检查 | 无 | +| GET | /api/v1/balance | 客户余额查询 | dapi | +| POST | /api/v1/accounting | 创建记账记录 | dapi | +| GET | /api/v1/accounting | 查询记账记录 | dapi | +| GET | /api/v1/users | 用户查询 | dapi | +| GET | /api/v1/pricing | 定价查询 | dapi | + +## 配置 + +配置文件位于 `conf/config.yaml`。支持环境变量覆盖,命名规则为 `SAGEAPI_
_`: + +```bash +export SAGEAPI_DATABASE_HOST=10.0.0.1 +export SAGEAPI_UPSTREAM_DAPI_KEY=your_key +export SAGEAPI_UPSTREAM_DAPI_SECRET=your_secret +``` + +## 构建 + +```bash +bash build.sh +``` + +## 模块注册 + +在 Sage 启动时,`load_sageapi()` 函数自动注册所有公共函数到 `ServerEnv`,使它们在 dspy 脚本中可访问。 diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..187a4f1 --- /dev/null +++ b/build.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +xls2ui -m ../models -o ../wwwroot sageapi *.json diff --git a/conf/config.yaml b/conf/config.yaml new file mode 100644 index 0000000..9270401 --- /dev/null +++ b/conf/config.yaml @@ -0,0 +1,35 @@ +# SageAPI 运行配置 +# 环境变量覆盖: SAGEAPI_
_ +# 示例: SAGEAPI_DATABASE_HOST=10.0.0.1 + +server: + host: "0.0.0.0" + port: 8080 + workers: 4 + +database: + host: "127.0.0.1" + port: 3306 + dbname: "sageapi_db" + user: "root" + password: "" + pool_size: 10 + +upstream: + sage_base_url: "http://127.0.0.1:9180" + dapi_key: "" + dapi_secret: "" + timeout: 30 + +sync: + interval_seconds: 60 + batch_size: 500 + max_retries: 3 + +cache: + ttl_seconds: 300 + max_entries: 10000 + +logging: + level: "INFO" + format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s" diff --git a/init/data.json b/init/data.json new file mode 100644 index 0000000..0270db9 --- /dev/null +++ b/init/data.json @@ -0,0 +1,7 @@ +{ + "admin_user": { + "username": "admin", + "role": "sageapi_admin", + "description": "SageAPI 管理后台默认用户,由系统初始化创建" + } +} diff --git a/models/accounting_records.json b/models/accounting_records.json new file mode 100644 index 0000000..6a899ec --- /dev/null +++ b/models/accounting_records.json @@ -0,0 +1,162 @@ +{ + "summary": [ + { + "name": "accounting_records", + "title": "记账记录", + "primary": "id", + "classification": "business" + } + ], + "fields": [ + { + "name": "id", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "主键" + }, + { + "name": "customer_id", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "客户ID" + }, + { + "name": "llmid", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "模型ID" + }, + { + "name": "model_name", + "type": "VARCHAR(128)", + "nullable": true, + "default": "", + "comment": "模型名称" + }, + { + "name": "pricing_id", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "定价ID" + }, + { + "name": "input_tokens", + "type": "BIGINT", + "nullable": true, + "default": null, + "comment": "输入token数" + }, + { + "name": "output_tokens", + "type": "BIGINT", + "nullable": true, + "default": null, + "comment": "输出token数" + }, + { + "name": "total_tokens", + "type": "BIGINT", + "nullable": true, + "default": null, + "comment": "总token数" + }, + { + "name": "quantity", + "type": "DECIMAL(15,4)", + "nullable": true, + "default": null, + "comment": "用量(图片数/分钟数等)" + }, + { + "name": "amount", + "type": "DECIMAL(15,6)", + "nullable": false, + "default": 0.0, + "comment": "金额" + }, + { + "name": "currency", + "type": "VARCHAR(8)", + "nullable": false, + "default": "CNY", + "comment": "货币单位" + }, + { + "name": "request_id", + "type": "VARCHAR(64)", + "nullable": true, + "default": "", + "comment": "请求ID(幂等键)" + }, + { + "name": "transno", + "type": "VARCHAR(64)", + "nullable": true, + "default": "", + "comment": "事务号" + }, + { + "name": "status", + "type": "VARCHAR(16)", + "nullable": false, + "default": "pending", + "comment": "状态: pending/accounted/failed" + }, + { + "name": "created_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "创建时间" + }, + { + "name": "updated_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "更新时间" + } + ], + "idxfields": [ + { + "name": "idx_customer_id", + "fields": [ + "customer_id" + ], + "unique": false + }, + { + "name": "idx_llmid", + "fields": [ + "llmid" + ], + "unique": false + }, + { + "name": "idx_request_id", + "fields": [ + "request_id" + ], + "unique": true + }, + { + "name": "idx_status", + "fields": [ + "status" + ], + "unique": false + }, + { + "name": "idx_created_at", + "fields": [ + "created_at" + ], + "unique": false + } + ], + "codes": [] +} \ No newline at end of file diff --git a/models/customer_balance.json b/models/customer_balance.json new file mode 100644 index 0000000..622a92a --- /dev/null +++ b/models/customer_balance.json @@ -0,0 +1,92 @@ +{ + "summary": [ + { + "name": "customer_balance", + "title": "客户余额缓存", + "primary": "id", + "classification": "cache" + } + ], + "fields": [ + { + "name": "id", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "主键,即 customer_id" + }, + { + "name": "balance", + "type": "DECIMAL(15,4)", + "nullable": false, + "default": 0.0, + "comment": "当前余额" + }, + { + "name": "currency", + "type": "VARCHAR(8)", + "nullable": false, + "default": "CNY", + "comment": "货币单位" + }, + { + "name": "credit_limit", + "type": "DECIMAL(15,4)", + "nullable": true, + "default": null, + "comment": "信用额度" + }, + { + "name": "last_recharge", + "type": "DATETIME", + "nullable": true, + "default": null, + "comment": "最后充值时间" + }, + { + "name": "last_consumption", + "type": "DATETIME", + "nullable": true, + "default": null, + "comment": "最后消费时间" + }, + { + "name": "status", + "type": "VARCHAR(16)", + "nullable": false, + "default": "active", + "comment": "状态: active/suspended/arrears" + }, + { + "name": "sync_version", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "同步版本号" + }, + { + "name": "cached_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "缓存更新时间" + } + ], + "idxfields": [ + { + "name": "idx_status", + "fields": [ + "status" + ], + "unique": false + }, + { + "name": "idx_balance", + "fields": [ + "balance" + ], + "unique": false + } + ], + "codes": [] +} \ No newline at end of file diff --git a/models/llmage_cache.json b/models/llmage_cache.json new file mode 100644 index 0000000..ad272be --- /dev/null +++ b/models/llmage_cache.json @@ -0,0 +1,113 @@ +{ + "summary": [ + { + "name": "llmage_cache", + "title": "模型API映射缓存", + "primary": "id", + "classification": "cache" + } + ], + "fields": [ + { + "name": "id", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "主键" + }, + { + "name": "llmid", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "关联模型ID" + }, + { + "name": "model_name", + "type": "VARCHAR(128)", + "nullable": true, + "default": "", + "comment": "模型名称" + }, + { + "name": "upappid", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "上游应用ID" + }, + { + "name": "apiname", + "type": "VARCHAR(128)", + "nullable": false, + "default": "", + "comment": "API名称" + }, + { + "name": "api_url", + "type": "VARCHAR(512)", + "nullable": true, + "default": "", + "comment": "API端点URL" + }, + { + "name": "api_params", + "type": "TEXT", + "nullable": true, + "default": null, + "comment": "API参数配置JSON" + }, + { + "name": "model_params", + "type": "TEXT", + "nullable": true, + "default": null, + "comment": "模型参数配置JSON(max_tokens, temperature等)" + }, + { + "name": "status", + "type": "VARCHAR(16)", + "nullable": false, + "default": "active", + "comment": "状态: active/inactive" + }, + { + "name": "sync_version", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "同步版本号" + }, + { + "name": "cached_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "缓存写入时间" + } + ], + "idxfields": [ + { + "name": "idx_llmid", + "fields": [ + "llmid" + ], + "unique": false + }, + { + "name": "idx_upappid", + "fields": [ + "upappid" + ], + "unique": false + }, + { + "name": "idx_apiname", + "fields": [ + "apiname" + ], + "unique": false + } + ], + "codes": [] +} \ No newline at end of file diff --git a/models/pricing_cache.json b/models/pricing_cache.json new file mode 100644 index 0000000..da81f58 --- /dev/null +++ b/models/pricing_cache.json @@ -0,0 +1,127 @@ +{ + "summary": [ + { + "name": "pricing_cache", + "title": "定价数据缓存", + "primary": "id", + "classification": "cache" + } + ], + "fields": [ + { + "name": "id", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "主键,对应 pricing_program id (ppid)" + }, + { + "name": "llmid", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "关联模型ID" + }, + { + "name": "model_name", + "type": "VARCHAR(128)", + "nullable": true, + "default": "", + "comment": "模型名称" + }, + { + "name": "pricing_type", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "计费类型: token/image/video/audio" + }, + { + "name": "input_price", + "type": "DECIMAL(10,6)", + "nullable": true, + "default": null, + "comment": "输入单价(每千token)" + }, + { + "name": "output_price", + "type": "DECIMAL(10,6)", + "nullable": true, + "default": null, + "comment": "输出单价(每千token)" + }, + { + "name": "unit_price", + "type": "DECIMAL(10,6)", + "nullable": true, + "default": null, + "comment": "统一单价(按次/按图/按分钟等)" + }, + { + "name": "currency", + "type": "VARCHAR(8)", + "nullable": false, + "default": "CNY", + "comment": "货币单位" + }, + { + "name": "status", + "type": "VARCHAR(16)", + "nullable": false, + "default": "active", + "comment": "状态: active/inactive/deprecated" + }, + { + "name": "effective_from", + "type": "DATETIME", + "nullable": true, + "default": null, + "comment": "生效时间" + }, + { + "name": "effective_to", + "type": "DATETIME", + "nullable": true, + "default": null, + "comment": "失效时间" + }, + { + "name": "sync_version", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "同步版本号" + }, + { + "name": "cached_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "缓存写入时间" + } + ], + "idxfields": [ + { + "name": "idx_llmid", + "fields": [ + "llmid" + ], + "unique": false + }, + { + "name": "idx_pricing_type", + "fields": [ + "pricing_type" + ], + "unique": false + }, + { + "name": "idx_status", + "fields": [ + "status" + ], + "unique": false + } + ], + "codes": [] +} \ No newline at end of file diff --git a/models/sync_state.json b/models/sync_state.json new file mode 100644 index 0000000..8ccf0f3 --- /dev/null +++ b/models/sync_state.json @@ -0,0 +1,107 @@ +{ + "summary": [ + { + "name": "sync_state", + "title": "同步状态跟踪", + "primary": "id", + "classification": "system" + } + ], + "fields": [ + { + "name": "id", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "主键" + }, + { + "name": "entity_type", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "实体类型: users/pricing/llmage/uapi" + }, + { + "name": "entity_id", + "type": "VARCHAR(64)", + "nullable": true, + "default": "", + "comment": "实体标识(全量同步时为空)" + }, + { + "name": "last_sync_time", + "type": "DATETIME", + "nullable": true, + "default": null, + "comment": "最后同步时间" + }, + { + "name": "sync_version", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "Sage返回的版本标识" + }, + { + "name": "sync_status", + "type": "VARCHAR(16)", + "nullable": false, + "default": "success", + "comment": "同步状态: success/pending/failed" + }, + { + "name": "error_msg", + "type": "TEXT", + "nullable": true, + "default": null, + "comment": "失败原因" + }, + { + "name": "retry_count", + "type": "INT", + "nullable": false, + "default": 0, + "comment": "重试次数" + }, + { + "name": "created_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "创建时间" + }, + { + "name": "updated_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "更新时间" + } + ], + "idxfields": [ + { + "name": "idx_entity_type", + "fields": [ + "entity_type" + ], + "unique": false + }, + { + "name": "idx_entity_type_id", + "fields": [ + "entity_type", + "entity_id" + ], + "unique": true + }, + { + "name": "idx_sync_status", + "fields": [ + "sync_status" + ], + "unique": false + } + ], + "codes": [] +} \ No newline at end of file diff --git a/models/uapi_cache.json b/models/uapi_cache.json new file mode 100644 index 0000000..110ad83 --- /dev/null +++ b/models/uapi_cache.json @@ -0,0 +1,107 @@ +{ + "summary": [ + { + "name": "uapi_cache", + "title": "uapi定义缓存", + "primary": "id", + "classification": "cache" + } + ], + "fields": [ + { + "name": "id", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "主键" + }, + { + "name": "upappid", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "上游应用ID" + }, + { + "name": "apiname", + "type": "VARCHAR(128)", + "nullable": false, + "default": "", + "comment": "API名称" + }, + { + "name": "method", + "type": "VARCHAR(16)", + "nullable": true, + "default": "POST", + "comment": "HTTP方法" + }, + { + "name": "endpoint", + "type": "VARCHAR(512)", + "nullable": true, + "default": "", + "comment": "API端点" + }, + { + "name": "auth_type", + "type": "VARCHAR(32)", + "nullable": true, + "default": "bearer", + "comment": "认证类型" + }, + { + "name": "rate_limit", + "type": "INT", + "nullable": true, + "default": null, + "comment": "速率限制(次/分钟)" + }, + { + "name": "description", + "type": "TEXT", + "nullable": true, + "default": null, + "comment": "API描述" + }, + { + "name": "status", + "type": "VARCHAR(16)", + "nullable": false, + "default": "active", + "comment": "状态" + }, + { + "name": "sync_version", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "同步版本号" + }, + { + "name": "cached_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "缓存写入时间" + } + ], + "idxfields": [ + { + "name": "idx_upappid_apiname", + "fields": [ + "upappid", + "apiname" + ], + "unique": true + }, + { + "name": "idx_status", + "fields": [ + "status" + ], + "unique": false + } + ], + "codes": [] +} \ No newline at end of file diff --git a/models/users_cache.json b/models/users_cache.json new file mode 100644 index 0000000..4bd5c1b --- /dev/null +++ b/models/users_cache.json @@ -0,0 +1,113 @@ +{ + "summary": [ + { + "name": "users_cache", + "title": "用户数据缓存", + "primary": "id", + "classification": "cache" + } + ], + "fields": [ + { + "name": "id", + "type": "VARCHAR(32)", + "nullable": false, + "default": "", + "comment": "主键,对应 users 表 id" + }, + { + "name": "username", + "type": "VARCHAR(128)", + "nullable": false, + "default": "", + "comment": "用户名" + }, + { + "name": "orgid", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "组织ID" + }, + { + "name": "orgname", + "type": "VARCHAR(255)", + "nullable": true, + "default": "", + "comment": "组织名称" + }, + { + "name": "email", + "type": "VARCHAR(128)", + "nullable": true, + "default": "", + "comment": "邮箱" + }, + { + "name": "phone", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "手机号" + }, + { + "name": "status", + "type": "VARCHAR(16)", + "nullable": false, + "default": "active", + "comment": "状态: active/inactive/suspended" + }, + { + "name": "created_at", + "type": "DATETIME", + "nullable": true, + "default": null, + "comment": "创建时间" + }, + { + "name": "updated_at", + "type": "DATETIME", + "nullable": true, + "default": null, + "comment": "更新时间" + }, + { + "name": "sync_version", + "type": "VARCHAR(32)", + "nullable": true, + "default": "", + "comment": "同步版本号" + }, + { + "name": "cached_at", + "type": "DATETIME", + "nullable": false, + "default": "CURRENT_TIMESTAMP", + "comment": "缓存写入时间" + } + ], + "idxfields": [ + { + "name": "idx_username", + "fields": [ + "username" + ], + "unique": false + }, + { + "name": "idx_orgid", + "fields": [ + "orgid" + ], + "unique": false + }, + { + "name": "idx_sync_version", + "fields": [ + "sync_version" + ], + "unique": false + } + ], + "codes": [] +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1527957 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,13 @@ +[build-system] +requires = ["setuptools>=61", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "sageapi" +version = "0.1.0" +description = "SageAPI - 独立 API 服务器模块,提供客户余额查询、记账、用户/定价查询等 RESTful 接口" +requires-python = ">=3.10" +dependencies = [] + +[tool.setuptools.packages.find] +include = ["sageapi*"] diff --git a/sageapi/__init__.py b/sageapi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sageapi/api/__init__.py b/sageapi/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sageapi/api/accounting.py b/sageapi/api/accounting.py new file mode 100644 index 0000000..537149a --- /dev/null +++ b/sageapi/api/accounting.py @@ -0,0 +1,153 @@ +"""Accounting API handlers. + +Provides endpoints for creating and querying accounting records. +Writing goes directly to the accounting_records table; reads +are served from the same table with optional date range filtering. +""" + +from __future__ import annotations + +import json +import time +import uuid +from typing import Any + +from appPublic.log import debug, error +from sqlor.dbpools import DBPools + + +async def create_accounting_record( + customer_id: str, + amount: float, + record_type: str = 'charge', + description: str = '', + **extra: Any, +) -> str: + """Create a new accounting record. + + Args: + customer_id: The customer identifier. + amount: The accounting amount (positive for charges, negative for credits). + record_type: Type of record (charge, credit, adjustment, etc.). + description: Optional description of the transaction. + + Returns: + JSON string with success flag and the created record ID. + """ + result: dict[str, Any] = {'success': False, 'record_id': None} + + try: + from ahserver.serverenv import ServerEnv + env = ServerEnv() + dbname = env.get_module_dbname('sageapi') + + if not dbname: + result['error'] = 'No database configured for sageapi module' + return json.dumps(result, ensure_ascii=False, default=str) + + record_id = str(uuid.uuid4()) + now = time.strftime('%Y-%m-%d %H:%M:%S') + + sql = """ + INSERT INTO accounting_records + (id, customer_id, amount, record_type, description, created_at, extra) + VALUES + (${id}$, ${customer_id}$, ${amount}$, ${record_type}$, ${description}$, ${created_at}$, ${extra}$) + """ + + async with DBPools().sqlorContext(dbname) as sor: + await sor.sqlExe(sql, { + 'id': record_id, + 'customer_id': customer_id, + 'amount': amount, + 'record_type': record_type, + 'description': description, + 'created_at': now, + 'extra': json.dumps(extra, ensure_ascii=False) if extra else None, + }) + + result['success'] = True + result['record_id'] = record_id + debug(f'Accounting record created: id={record_id}, customer={customer_id}, amount={amount}') + + except Exception as e: + error(f'Accounting record creation failed: {e}') + result['error'] = str(e) + + return json.dumps(result, ensure_ascii=False, default=str) + + +async def query_accounting_records( + customer_id: str | None = None, + start_date: str | None = None, + end_date: str | None = None, + limit: int = 100, + offset: int = 0, +) -> str: + """Query accounting records with optional filters. + + Args: + customer_id: Filter by customer ID. + start_date: Filter records from this date (inclusive, YYYY-MM-DD). + end_date: Filter records up to this date (inclusive, YYYY-MM-DD). + limit: Maximum number of records to return. + offset: Number of records to skip. + + Returns: + JSON string with success flag and record data. + """ + result: dict[str, Any] = {'success': False, 'data': [], 'total': 0} + + try: + from ahserver.serverenv import ServerEnv + env = ServerEnv() + dbname = env.get_module_dbname('sageapi') + + if not dbname: + result['error'] = 'No database configured for sageapi module' + return json.dumps(result, ensure_ascii=False, default=str) + + conditions = [] + params: dict[str, Any] = {} + + if customer_id: + conditions.append('customer_id = ${customer_id}$') + params['customer_id'] = customer_id + if start_date: + conditions.append('created_at >= ${start_date}$') + params['start_date'] = start_date + if end_date: + conditions.append('created_at <= ${end_date}$') + params['end_date'] = end_date + + where_clause = 'WHERE ' + ' AND '.join(conditions) if conditions else '' + + # Count query + count_sql = f""" + SELECT COUNT(*) as cnt FROM accounting_records {where_clause} + """ + async with DBPools().sqlorContext(dbname) as sor: + count_rows = await sor.sqlExe(count_sql, params) + total = count_rows[0]['cnt'] if count_rows else 0 + result['total'] = total + + if total > 0: + data_sql = f""" + SELECT id, customer_id, amount, record_type, description, created_at, extra + FROM accounting_records + {where_clause} + ORDER BY created_at DESC + LIMIT ${limit}$ OFFSET ${offset}$ + """ + params['limit'] = limit + params['offset'] = offset + rows = await sor.sqlExe(data_sql, params) + result['data'] = [dict(r) for r in (rows or [])] + + result['success'] = True + + except Exception as e: + error(f'Accounting query failed: {e}') + result['error'] = str(e) + + return json.dumps(result, ensure_ascii=False, default=str) diff --git a/sageapi/api/balance.py b/sageapi/api/balance.py new file mode 100644 index 0000000..d541544 --- /dev/null +++ b/sageapi/api/balance.py @@ -0,0 +1,67 @@ +"""Customer balance query API handler. + +Provides the RESTful endpoint for querying customer account balances. +Reads from the local customer_balance cache table. +""" + +from __future__ import annotations + +import json +from typing import Any + +from appPublic.log import debug, error +from sqlor.dbpools import DBPools + + +async def get_customer_balance(customer_id: str | None = None) -> str: + """Query customer balance. + + Args: + customer_id: Optional customer ID filter. If not provided, + returns all customer balances. + + Returns: + JSON string with success flag and balance data. + """ + result: dict[str, Any] = {'success': False, 'data': [], 'total': 0} + + try: + from ahserver.serverenv import ServerEnv + env = ServerEnv() + dbname = env.get_module_dbname('sageapi') + + if not dbname: + result['error'] = 'No database configured for sageapi module' + return json.dumps(result, ensure_ascii=False, default=str) + + params: dict[str, Any] = {} + where_clause = '' + if customer_id: + where_clause = 'WHERE customer_id = ${customer_id}$' + params['customer_id'] = customer_id + + sql = f""" + SELECT customer_id, balance, currency, updated_at + FROM customer_balance + {where_clause} + ORDER BY customer_id + """ + + async with DBPools().sqlorContext(dbname) as sor: + data = await sor.sqlExe(sql, params) + if isinstance(data, dict): + result['total'] = data.get('total', 0) + result['data'] = [dict(r) for r in data.get('rows', [])] + else: + rows = [dict(r) for r in (data or [])] + result['data'] = rows + result['total'] = len(rows) + + result['success'] = True + debug(f'Balance query: returned {result["total"]} records') + + except Exception as e: + error(f'Balance query failed: {e}') + result['error'] = str(e) + + return json.dumps(result, ensure_ascii=False, default=str) diff --git a/sageapi/api/health.py b/sageapi/api/health.py new file mode 100644 index 0000000..1fed77b --- /dev/null +++ b/sageapi/api/health.py @@ -0,0 +1,60 @@ +"""Health check API handler. + +Provides a simple endpoint for load balancer health checks and +system status monitoring. No authentication required. +""" + +from __future__ import annotations + +import json +import time +from typing import Any + +from appPublic.log import debug +from sqlor.dbpools import DBPools + + +async def health_check() -> str: + """Health check endpoint. + + Returns system status including database connectivity, + cache stats, and uptime information. + + Returns: + JSON string with health status. + """ + result: dict[str, Any] = { + 'status': 'ok', + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + 'database': 'unknown', + 'cache': {}, + } + + # Check database connectivity + try: + from ahserver.serverenv import ServerEnv + env = ServerEnv() + dbname = env.get_module_dbname('sageapi') + + if dbname: + async with DBPools().sqlorContext(dbname) as sor: + await sor.sqlExe('SELECT 1') + result['database'] = 'connected' + else: + result['database'] = 'not_configured' + result['status'] = 'degraded' + + except Exception as e: + result['database'] = f'error: {str(e)}' + result['status'] = 'unhealthy' + + # Cache stats + try: + from ..cache.cache_manager import _get_cache_manager + cm = _get_cache_manager() + result['cache'] = cm.stats() + except Exception: + result['cache'] = {'error': 'cache not initialized'} + + debug(f'Health check: status={result["status"]}') + return json.dumps(result, ensure_ascii=False, default=str) diff --git a/sageapi/api/pricing.py b/sageapi/api/pricing.py new file mode 100644 index 0000000..7af1a9d --- /dev/null +++ b/sageapi/api/pricing.py @@ -0,0 +1,82 @@ +"""Pricing query API handler. + +Provides the RESTful endpoint for querying pricing information. +Reads from the local pricing_cache table synced from Sage. +""" + +from __future__ import annotations + +import json +from typing import Any + +from appPublic.log import debug, error +from sqlor.dbpools import DBPools + + +async def query_pricing( + program_id: str | None = None, + model: str | None = None, + limit: int = 200, + offset: int = 0, +) -> str: + """Query pricing information from the local cache. + + Args: + program_id: Filter by pricing program ID. + model: Filter by model name (partial match). + limit: Maximum number of records to return. + offset: Number of records to skip. + + Returns: + JSON string with success flag and pricing data. + """ + result: dict[str, Any] = {'success': False, 'data': [], 'total': 0} + + try: + from ahserver.serverenv import ServerEnv + env = ServerEnv() + dbname = env.get_module_dbname('sageapi') + + if not dbname: + result['error'] = 'No database configured for sageapi module' + return json.dumps(result, ensure_ascii=False, default=str) + + conditions = [] + params: dict[str, Any] = {'limit': limit, 'offset': offset} + + if program_id: + conditions.append('program_id = ${program_id}$') + params['program_id'] = program_id + if model: + conditions.append('model LIKE ${model}$') + params['model'] = f'%{model}%' + + where_clause = 'WHERE ' + ' AND '.join(conditions) if conditions else '' + + # Count query + count_sql = f'SELECT COUNT(*) as cnt FROM pricing_cache {where_clause}' + async with DBPools().sqlorContext(dbname) as sor: + count_rows = await sor.sqlExe(count_sql, params) + total = count_rows[0]['cnt'] if count_rows else 0 + result['total'] = total + + if total > 0: + data_sql = f""" + SELECT program_id, model, input_price, output_price, + unit, currency, updated_at + FROM pricing_cache + {where_clause} + ORDER BY program_id, model + LIMIT ${limit}$ OFFSET ${offset}$ + """ + rows = await sor.sqlExe(data_sql, params) + result['data'] = [dict(r) for r in (rows or [])] + + result['success'] = True + debug(f'Pricing query: returned {result["total"]} records') + + except Exception as e: + error(f'Pricing query failed: {e}') + result['error'] = str(e) + + return json.dumps(result, ensure_ascii=False, default=str) diff --git a/sageapi/api/users.py b/sageapi/api/users.py new file mode 100644 index 0000000..d0bd3b2 --- /dev/null +++ b/sageapi/api/users.py @@ -0,0 +1,83 @@ +"""User query API handler. + +Provides the RESTful endpoint for querying user information. +Reads from the local users_cache table synced from Sage. +""" + +from __future__ import annotations + +import json +from typing import Any + +from appPublic.log import debug, error +from sqlor.dbpools import DBPools + + +async def query_users( + user_id: str | None = None, + keyword: str | None = None, + limit: int = 100, + offset: int = 0, +) -> str: + """Query user information from the local cache. + + Args: + user_id: Filter by specific user ID. + keyword: Search keyword (matches username, email, or phone). + limit: Maximum number of records to return. + offset: Number of records to skip. + + Returns: + JSON string with success flag and user data. + """ + result: dict[str, Any] = {'success': False, 'data': [], 'total': 0} + + try: + from ahserver.serverenv import ServerEnv + env = ServerEnv() + dbname = env.get_module_dbname('sageapi') + + if not dbname: + result['error'] = 'No database configured for sageapi module' + return json.dumps(result, ensure_ascii=False, default=str) + + conditions = [] + params: dict[str, Any] = {'limit': limit, 'offset': offset} + + if user_id: + conditions.append('user_id = ${user_id}$') + params['user_id'] = user_id + if keyword: + conditions.append( + '(username LIKE ${keyword}$ OR email LIKE ${keyword}$ OR phone LIKE ${keyword}$)' + ) + params['keyword'] = f'%{keyword}%' + + where_clause = 'WHERE ' + ' AND '.join(conditions) if conditions else '' + + # Count query + count_sql = f'SELECT COUNT(*) as cnt FROM users_cache {where_clause}' + async with DBPools().sqlorContext(dbname) as sor: + count_rows = await sor.sqlExe(count_sql, params) + total = count_rows[0]['cnt'] if count_rows else 0 + result['total'] = total + + if total > 0: + data_sql = f""" + SELECT user_id, username, email, phone, status, updated_at + FROM users_cache + {where_clause} + ORDER BY user_id + LIMIT ${limit}$ OFFSET ${offset}$ + """ + rows = await sor.sqlExe(data_sql, params) + result['data'] = [dict(r) for r in (rows or [])] + + result['success'] = True + debug(f'User query: returned {result["total"]} records') + + except Exception as e: + error(f'User query failed: {e}') + result['error'] = str(e) + + return json.dumps(result, ensure_ascii=False, default=str) diff --git a/sageapi/auth/__init__.py b/sageapi/auth/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sageapi/auth/dapi_auth.py b/sageapi/auth/dapi_auth.py new file mode 100644 index 0000000..1fa126e --- /dev/null +++ b/sageapi/auth/dapi_auth.py @@ -0,0 +1,97 @@ +"""DAPI authentication middleware for SageAPI. + +Validates incoming API requests using dapi key/secret authentication. +All public API endpoints (except health check) require valid dapi credentials. +""" + +from __future__ import annotations + +import hashlib +import hmac +import time +from typing import Any + +from appPublic.log import debug, error + + +def dapi_auth_middleware(request: Any) -> dict[str, Any]: + """Authenticate an incoming request via dapi credentials. + + Validates the dapi_key, timestamp, and HMAC signature from + request headers. Returns an auth context dict on success, + or raises an exception on failure. + + Args: + request: The HTTP request object from the server framework. + + Returns: + dict with keys: caller_id, api_key, authenticated_at + + Raises: + ValueError: If authentication fails (missing headers, invalid signature, etc.) + """ + headers = getattr(request, 'headers', {}) + + api_key = headers.get('X-DAPI-Key', '') + timestamp = headers.get('X-DAPI-Timestamp', '') + signature = headers.get('X-DAPI-Signature', '') + + if not api_key: + raise ValueError('Missing X-DAPI-Key header') + if not timestamp: + raise ValueError('Missing X-DAPI-Timestamp header') + if not signature: + raise ValueError('Missing X-DAPI-Signature header') + + # Validate timestamp window (5 minutes) + try: + ts = int(timestamp) + except (ValueError, TypeError): + raise ValueError('Invalid X-DAPI-Timestamp format') + + if abs(time.time() - ts) > 300: + raise ValueError('Request timestamp expired') + + # Look up the secret for this api_key + secret = _get_api_secret(api_key) + if not secret: + raise ValueError('Invalid DAPI key') + + # Verify HMAC signature + string_to_sign = f'{api_key}:{timestamp}' + expected = hmac.new( + secret.encode('utf-8'), + string_to_sign.encode('utf-8'), + hashlib.sha256, + ).hexdigest() + + if not hmac.compare_digest(signature, expected): + raise ValueError('Invalid DAPI signature') + + caller_id = _resolve_caller_id(api_key) + debug(f'DAPI auth passed: caller_id={caller_id}') + + return { + 'caller_id': caller_id, + 'api_key': api_key, + 'authenticated_at': ts, + } + + +def _get_api_secret(api_key: str) -> str | None: + """Retrieve the secret key for a given DAPI key. + + TODO: Replace with actual lookup from database or config. + Currently returns None — integrate with Sage's DAPI credential store. + """ + # Placeholder: integrate with upstream DAPI credential lookup + return None + + +def _resolve_caller_id(api_key: str) -> str: + """Resolve the caller ID for a given DAPI key. + + TODO: Replace with actual lookup from database or config. + """ + # Placeholder: integrate with upstream caller resolution + return api_key diff --git a/sageapi/auth/uapi_sign.py b/sageapi/auth/uapi_sign.py new file mode 100644 index 0000000..613a79d --- /dev/null +++ b/sageapi/auth/uapi_sign.py @@ -0,0 +1,98 @@ +"""UAPI signature verification for SageAPI. + +Verifies upstream API request signatures when SageAPI calls +internal Sage services via uapi. Ensures data integrity and +authenticity of inter-service communication. +""" + +from __future__ import annotations + +import hashlib +import hmac +import time +from typing import Any +from urllib.parse import urlencode + +from appPublic.log import debug, error + + +def uapi_sign_verify( + app_id: str, + api_key: str, + secret_key: str, + params: dict[str, Any], + signature: str, + timestamp: str | None = None, +) -> bool: + """Verify a uapi request signature. + + Computes the expected HMAC-SHA256 signature for the given parameters + and compares it with the provided signature. + + Args: + app_id: The uapi application ID. + api_key: The uapi API key. + secret_key: The uapi secret key for signing. + params: Request parameters that were signed. + signature: The signature to verify. + timestamp: Optional timestamp for replay protection. + + Returns: + True if the signature is valid, False otherwise. + """ + if timestamp: + try: + ts = int(timestamp) + except (ValueError, TypeError): + error(f'UAPI sign verify: invalid timestamp format: {timestamp}') + return False + if abs(time.time() - ts) > 300: + error(f'UAPI sign verify: timestamp expired: {timestamp}') + return False + + # Build the string to sign: sorted params + app_id + api_key + sorted_params = urlencode(sorted(params.items())) + string_to_sign = f'{sorted_params}&app_id={app_id}&api_key={api_key}' + + expected = hmac.new( + secret_key.encode('utf-8'), + string_to_sign.encode('utf-8'), + hashlib.sha256, + ).hexdigest() + + result = hmac.compare_digest(signature, expected) + if result: + debug(f'UAPI signature verified for app_id={app_id}') + else: + error(f'UAPI signature mismatch for app_id={app_id}') + + return result + + +def uapi_sign( + app_id: str, + api_key: str, + secret_key: str, + params: dict[str, Any], +) -> str: + """Generate a uapi request signature. + + Used when SageAPI makes outbound calls to Sage uapi endpoints. + + Args: + app_id: The uapi application ID. + api_key: The uapi API key. + secret_key: The uapi secret key. + params: Request parameters to sign. + + Returns: + HMAC-SHA256 hex digest signature string. + """ + sorted_params = urlencode(sorted(params.items())) + string_to_sign = f'{sorted_params}&app_id={app_id}&api_key={api_key}' + + return hmac.new( + secret_key.encode('utf-8'), + string_to_sign.encode('utf-8'), + hashlib.sha256, + ).hexdigest() diff --git a/sageapi/cache/__init__.py b/sageapi/cache/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sageapi/cache/cache_manager.py b/sageapi/cache/cache_manager.py new file mode 100644 index 0000000..2a3366e --- /dev/null +++ b/sageapi/cache/cache_manager.py @@ -0,0 +1,143 @@ +"""Cache manager for SageAPI. + +Provides process-local LRU caching with TTL support for frequently +accessed data (pricing, user info, balance). Designed for multi-process +deployment — each SageAPI instance maintains its own cache. +""" + +from __future__ import annotations + +import time +from collections import OrderedDict +from threading import Lock +from typing import Any + +from appPublic.log import debug + + +class CacheManager: + """Thread-safe LRU cache with TTL support. + + Each SageAPI process instance gets its own cache. The cache + automatically evicts expired entries and respects the max + entry limit via LRU eviction. + """ + + def __init__(self, max_entries: int = 10000, default_ttl: int = 300) -> None: + self._max_entries = max_entries + self._default_ttl = default_ttl + self._store: OrderedDict[str, tuple[Any, float]] = OrderedDict() + self._lock = Lock() + + def get(self, key: str) -> Any | None: + """Get a cached value by key. + + Returns None if the key doesn't exist or the entry has expired. + On hit, moves the entry to the end (most recently used). + """ + with self._lock: + if key not in self._store: + return None + + value, expire_at = self._store[key] + if time.time() > expire_at: + del self._store[key] + debug(f'Cache: expired key={key}') + return None + + # Move to end (most recently used) + self._store.move_to_end(key) + return value + + def set(self, key: str, value: Any, ttl: int | None = None) -> None: + """Set a cache entry with optional TTL override. + + If the cache is full, evicts the least recently used entry. + """ + expire_at = time.time() + (ttl if ttl is not None else self._default_ttl) + + with self._lock: + if key in self._store: + # Update existing entry + self._store.move_to_end(key) + self._store[key] = (value, expire_at) + else: + # Evict LRU entry if at capacity + while len(self._store) >= self._max_entries: + evicted_key, _ = self._store.popitem(last=False) + debug(f'Cache: evicted key={evicted_key}') + + self._store[key] = (value, expire_at) + + def invalidate(self, key: str) -> bool: + """Remove a specific key from the cache. + + Returns True if the key existed and was removed. + """ + with self._lock: + if key in self._store: + del self._store[key] + return True + return False + + def clear(self) -> None: + """Clear all cached entries.""" + with self._lock: + self._store.clear() + debug('Cache: cleared all entries') + + @property + def size(self) -> int: + """Return the current number of cached entries.""" + return len(self._store) + + def stats(self) -> dict[str, Any]: + """Return cache statistics.""" + with self._lock: + now = time.time() + expired = sum(1 for _, exp in self._store.values() if now > exp) + return { + 'total_entries': len(self._store), + 'expired_entries': expired, + 'active_entries': len(self._store) - expired, + 'max_entries': self._max_entries, + 'default_ttl': self._default_ttl, + } + + # --- Named invalidation handlers for event binding --- + + @staticmethod + def invalidate_sync_state() -> None: + """Invalidate all sync-related cache entries. + + Called by the event dispatcher when sync_state table changes. + """ + cm = _get_cache_manager() + keys_to_remove = [k for k in cm._store if k.startswith('sync:')] + for k in keys_to_remove: + cm.invalidate(k) + debug(f'Cache: invalidated {len(keys_to_remove)} sync entries') + + @staticmethod + def invalidate_accounting() -> None: + """Invalidate all accounting-related cache entries. + + Called by the event dispatcher when accounting_records table changes. + """ + cm = _get_cache_manager() + keys_to_remove = [k for k in cm._store if k.startswith('accounting:')] + for k in keys_to_remove: + cm.invalidate(k) + debug(f'Cache: invalidated {len(keys_to_remove)} accounting entries') + + +# Module-level singleton +_cache_manager_instance: CacheManager | None = None + + +def _get_cache_manager() -> CacheManager: + """Get the module-level CacheManager singleton.""" + global _cache_manager_instance + if _cache_manager_instance is None: + _cache_manager_instance = CacheManager() + return _cache_manager_instance diff --git a/sageapi/config.py b/sageapi/config.py new file mode 100644 index 0000000..1ae5054 --- /dev/null +++ b/sageapi/config.py @@ -0,0 +1,152 @@ +"""SageAPI configuration management. + +Loads and validates runtime configuration from conf/config.yaml. +Provides typed access to upstream API endpoints, sync intervals, +cache TTLs, and database settings. +""" + +import os +from typing import Any + +import yaml + +# Default configuration values +_DEFAULT_CONFIG = { + 'server': { + 'host': '0.0.0.0', + 'port': 8080, + 'workers': 4, + }, + 'database': { + 'host': '127.0.0.1', + 'port': 3306, + 'dbname': 'sageapi_db', + 'user': 'root', + 'password': '', + 'pool_size': 10, + }, + 'upstream': { + 'sage_base_url': 'http://127.0.0.1:9180', + 'dapi_key': '', + 'dapi_secret': '', + 'timeout': 30, + }, + 'sync': { + 'interval_seconds': 60, + 'batch_size': 500, + 'max_retries': 3, + }, + 'cache': { + 'ttl_seconds': 300, + 'max_entries': 10000, + }, + 'logging': { + 'level': 'INFO', + 'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s', + }, +} + + +class Config: + """Immutable runtime configuration for SageAPI.""" + + def __init__(self, config_path: str | None = None) -> None: + self._config = dict(_DEFAULT_CONFIG) + if config_path: + self._load_file(config_path) + # Allow environment variable overrides + self._apply_env_overrides() + + def _load_file(self, path: str) -> None: + """Load configuration from a YAML file, merging into defaults.""" + if not os.path.exists(path): + return + with open(path, 'r', encoding='utf-8') as f: + file_config = yaml.safe_load(f) + if isinstance(file_config, dict): + self._deep_merge(self._config, file_config) + + def _apply_env_overrides(self) -> None: + """Apply environment variable overrides. + + Convention: SAGEAPI_
_ e.g. SAGEAPI_DATABASE_HOST + """ + prefix = 'SAGEAPI_' + for env_key, env_value in os.environ.items(): + if not env_key.startswith(prefix): + continue + parts = env_key[len(prefix):].lower().split('_', 1) + if len(parts) == 2: + section, key = parts + if section in self._config and isinstance(self._config[section], dict): + self._config[section][key] = env_value + + @staticmethod + def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> None: + """Recursively merge override into base in-place.""" + for key, value in override.items(): + if key in base and isinstance(base[key], dict) and isinstance(value, dict): + Config._deep_merge(base[key], value) + else: + base[key] = value + + def get(self, *keys: str, default: Any = None) -> Any: + """Get a nested configuration value by key path.""" + current: Any = self._config + for key in keys: + if isinstance(current, dict) and key in current: + current = current[key] + else: + return default + return current + + @property + def server(self) -> dict[str, Any]: + return self._config['server'] + + @property + def database(self) -> dict[str, Any]: + return self._config['database'] + + @property + def upstream(self) -> dict[str, Any]: + return self._config['upstream'] + + @property + def sync(self) -> dict[str, Any]: + return self._config['sync'] + + @property + def cache(self) -> dict[str, Any]: + return self._config['cache'] + + @property + def logging(self) -> dict[str, Any]: + return self._config['logging'] + + def to_dict(self) -> dict[str, Any]: + """Return the full configuration as a dictionary.""" + return dict(self._config) + + +# Module-level singleton, lazily initialized +_config_instance: Config | None = None + + +def get_config(config_path: str | None = None) -> Config: + """Get or create the module-level Config singleton.""" + global _config_instance + if _config_instance is None: + # Resolve default config path relative to this module + if config_path is None: + module_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.dirname(module_dir) + config_path = os.path.join(project_root, 'conf', 'config.yaml') + _config_instance = Config(config_path) + return _config_instance + + +def reset_config() -> None: + """Reset the configuration singleton (useful for testing).""" + global _config_instance + _config_instance = None diff --git a/sageapi/init.py b/sageapi/init.py new file mode 100644 index 0000000..0c02353 --- /dev/null +++ b/sageapi/init.py @@ -0,0 +1,110 @@ +"""SageAPI module initialization. + +Registers all public functions to ServerEnv so they are accessible +from dspy scripts and other modules via the global environment. +""" + +from appPublic.log import debug +from sqlor.dbpools import DBPools +from ahserver.serverenv import ServerEnv + +# --------------------------------------------------------------------------- +# Auth +# --------------------------------------------------------------------------- +from .auth.dapi_auth import dapi_auth_middleware +from .auth.uapi_sign import uapi_sign_verify + +# --------------------------------------------------------------------------- +# Sync +# --------------------------------------------------------------------------- +from .sync.base_sync import BaseSync +from .sync.user_sync import sync_users +from .sync.pricing_sync import sync_pricing +from .sync.uapi_sync import sync_uapi +from .sync.llmage_sync import sync_llmage + +# --------------------------------------------------------------------------- +# Cache +# --------------------------------------------------------------------------- +from .cache.cache_manager import CacheManager + +# --------------------------------------------------------------------------- +# API +# --------------------------------------------------------------------------- +from .api.balance import get_customer_balance +from .api.accounting import create_accounting_record, query_accounting_records +from .api.users import query_users +from .api.pricing import query_pricing +from .api.health import health_check + +# --------------------------------------------------------------------------- +# Utils +# --------------------------------------------------------------------------- +from .utils.http_client import SageHttpClient +from .utils.crypto import encrypt_payload, decrypt_payload + + +def _bind_sageapi_events(dbpools: DBPools, dbname: str) -> None: + """Bind database events to SageAPI cache invalidation handlers. + + When sync state or accounting records change in the database, + the corresponding cache entries are invalidated automatically. + """ + bindings = [ + # sync_state table: clear sync-related caches on change + (f'{dbname}:sync_state:c:after', CacheManager.invalidate_sync_state), + (f'{dbname}:sync_state:u:after', CacheManager.invalidate_sync_state), + (f'{dbname}:sync_state:d:after', CacheManager.invalidate_sync_state), + # accounting_records: clear accounting cache on change + (f'{dbname}:accounting_records:c:after', CacheManager.invalidate_accounting), + (f'{dbname}:accounting_records:u:after', CacheManager.invalidate_accounting), + (f'{dbname}:accounting_records:d:after', CacheManager.invalidate_accounting), + ] + for event_name, handler in bindings: + dbpools.bind(event_name, handler) + debug(f'SageAPI event bound: {event_name}') + + +def load_sageapi() -> None: + """Register all SageAPI functions into ServerEnv. + + Called by the Sage server during module loading phase. + All registered functions become available as globals in dspy scripts. + """ + env = ServerEnv() + + # Auth + env.dapi_auth_middleware = dapi_auth_middleware + env.uapi_sign_verify = uapi_sign_verify + + # Sync + env.sync_users = sync_users + env.sync_pricing = sync_pricing + env.sync_uapi = sync_uapi + env.sync_llmage = sync_llmage + env.BaseSync = BaseSync + + # Cache + env.cache_manager = CacheManager() + + # API + env.get_customer_balance = get_customer_balance + env.create_accounting_record = create_accounting_record + env.query_accounting_records = query_accounting_records + env.query_users = query_users + env.query_pricing = query_pricing + env.health_check = health_check + + # Utils + env.SageHttpClient = SageHttpClient + env.encrypt_payload = encrypt_payload + env.decrypt_payload = decrypt_payload + + # Bind database events for automatic cache invalidation + dbpools = DBPools() + dbname = env.get_module_dbname('sageapi') + if dbname: + _bind_sageapi_events(dbpools, dbname) + debug(f'SageAPI event listeners bound for database: {dbname}') + else: + debug('SageAPI event listeners skipped: no database configured for sageapi module') diff --git a/sageapi/router.py b/sageapi/router.py new file mode 100644 index 0000000..b926481 --- /dev/null +++ b/sageapi/router.py @@ -0,0 +1,118 @@ +"""SageAPI route registration. + +Defines all RESTful API endpoints and maps them to handler functions. +Routes are registered during module initialization. + +Endpoint pattern: /api/v1// +""" + +from __future__ import annotations + +from typing import Any, Callable + + +class Router: + """Simple route registry for SageAPI endpoints.""" + + def __init__(self) -> None: + self._routes: list[dict[str, Any]] = [] + + def register( + self, + method: str, + path: str, + handler: Callable, + auth: str = 'dapi', + description: str = '', + ) -> None: + """Register an API route. + + Args: + method: HTTP method (GET, POST, PUT, DELETE). + path: URL path pattern, e.g. '/api/v1/balance'. + handler: Callable that handles the request. + auth: Authentication method ('dapi', 'uapi', 'none'). + description: Human-readable description of the endpoint. + """ + self._routes.append({ + 'method': method.upper(), + 'path': path, + 'handler': handler, + 'auth': auth, + 'description': description, + }) + + def get_routes(self) -> list[dict[str, Any]]: + """Return all registered routes.""" + return list(self._routes) + + def match(self, method: str, path: str) -> dict[str, Any] | None: + """Find a route matching the given method and path.""" + method = method.upper() + for route in self._routes: + if route['method'] == method and route['path'] == path: + return route + return None + + +# Global router instance +router = Router() + + +def register_routes() -> None: + """Register all SageAPI API routes. + + Called during module initialization to populate the router + with all available endpoints. + """ + from .api.health import health_check + from .api.balance import get_customer_balance + from .api.accounting import create_accounting_record, query_accounting_records + from .api.users import query_users + from .api.pricing import query_pricing + + # Health check (no auth required) + router.register( + 'GET', '/api/v1/health', + handler=health_check, + auth='none', + description='Health check endpoint', + ) + + # Customer balance + router.register( + 'GET', '/api/v1/balance', + handler=get_customer_balance, + auth='dapi', + description='Query customer balance', + ) + + # Accounting + router.register( + 'POST', '/api/v1/accounting', + handler=create_accounting_record, + auth='dapi', + description='Create an accounting record', + ) + router.register( + 'GET', '/api/v1/accounting', + handler=query_accounting_records, + auth='dapi', + description='Query accounting records', + ) + + # Users + router.register( + 'GET', '/api/v1/users', + handler=query_users, + auth='dapi', + description='Query user information', + ) + + # Pricing + router.register( + 'GET', '/api/v1/pricing', + handler=query_pricing, + auth='dapi', + description='Query pricing information', + ) diff --git a/sageapi/sync/__init__.py b/sageapi/sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sageapi/sync/base_sync.py b/sageapi/sync/base_sync.py new file mode 100644 index 0000000..787c0b6 --- /dev/null +++ b/sageapi/sync/base_sync.py @@ -0,0 +1,125 @@ +"""Base synchronization class for SageAPI. + +Provides the foundation for all data sync workers. Handles common +concerns: checkpoint management, retry logic, batch processing, +and error reporting. +""" + +from __future__ import annotations + +import time +from abc import ABC, abstractmethod +from typing import Any + +from appPublic.log import debug, error, info + + +class BaseSync(ABC): + """Abstract base class for data synchronization workers. + + Each concrete sync subclass implements the data fetch and + persist logic for a specific upstream data source. + """ + + def __init__(self, sync_name: str, batch_size: int = 500) -> None: + self.sync_name = sync_name + self.batch_size = batch_size + self._last_checkpoint: dict[str, Any] = {} + + @abstractmethod + async def fetch_incremental(self, since_timestamp: str | None = None) -> list[dict[str, Any]]: + """Fetch incremental data from the upstream source. + + Args: + since_timestamp: Only fetch records modified after this timestamp. + None means full sync. + + Returns: + List of records to be persisted. + """ + ... + + @abstractmethod + async def persist(self, records: list[dict[str, Any]]) -> int: + """Persist fetched records to the local database. + + Args: + records: List of records to upsert. + + Returns: + Number of records successfully persisted. + """ + ... + + @abstractmethod + def get_latest_timestamp(self, records: list[dict[str, Any]]) -> str | None: + """Extract the latest modification timestamp from a batch of records. + + Used to advance the sync checkpoint after successful persist. + """ + ... + + async def _load_checkpoint(self) -> str | None: + """Load the last successful sync checkpoint timestamp. + + TODO: Implement checkpoint persistence (sync_state table). + """ + checkpoint = self._last_checkpoint.get(self.sync_name) + debug(f'Sync {self.sync_name}: loaded checkpoint = {checkpoint}') + return checkpoint + + async def _save_checkpoint(self, timestamp: str) -> None: + """Save the sync checkpoint after a successful run. + + TODO: Implement checkpoint persistence (sync_state table). + """ + self._last_checkpoint[self.sync_name] = timestamp + debug(f'Sync {self.sync_name}: saved checkpoint = {timestamp}') + + async def run(self) -> dict[str, Any]: + """Execute a full sync cycle. + + Returns: + dict with keys: success, records_fetched, records_persisted, + error (if any), duration_seconds + """ + start = time.time() + result: dict[str, Any] = { + 'sync_name': self.sync_name, + 'success': False, + 'records_fetched': 0, + 'records_persisted': 0, + 'error': None, + 'duration_seconds': 0.0, + } + + try: + checkpoint = await self._load_checkpoint() + info(f'Sync {self.sync_name}: starting (checkpoint={checkpoint})') + + records = await self.fetch_incremental(since_timestamp=checkpoint) + result['records_fetched'] = len(records) + + if records: + persisted = await self.persist(records) + result['records_persisted'] = persisted + + latest_ts = self.get_latest_timestamp(records) + if latest_ts: + await self._save_checkpoint(latest_ts) + + result['success'] = True + info( + f'Sync {self.sync_name}: completed — ' + f'fetched={result["records_fetched"]}, ' + f'persisted={result["records_persisted"]}' + ) + + except Exception as e: + error(f'Sync {self.sync_name}: failed with error: {e}') + result['error'] = str(e) + + finally: + result['duration_seconds'] = round(time.time() - start, 3) + + return result diff --git a/sageapi/sync/llmage_sync.py b/sageapi/sync/llmage_sync.py new file mode 100644 index 0000000..4deb9be --- /dev/null +++ b/sageapi/sync/llmage_sync.py @@ -0,0 +1,65 @@ +"""LLM image data synchronization for SageAPI. + +Syncs LLM catalog and provider data from the upstream Sage +llmage module into the local llmage_cache table. +""" + +from __future__ import annotations + +from typing import Any + +from appPublic.log import debug, info +from .base_sync import BaseSync + + +class LlmageSync(BaseSync): + """Incremental sync for llmage data from Sage upstream.""" + + def __init__(self, batch_size: int = 500) -> None: + super().__init__(sync_name='llmage', batch_size=batch_size) + + async def fetch_incremental(self, since_timestamp: str | None = None) -> list[dict[str, Any]]: + """Fetch llmage data updated since the last sync checkpoint. + + TODO: Implement upstream API call to Sage /api/llmage endpoint. + """ + debug(f'LlmageSync: fetching incremental data since {since_timestamp}') + # Placeholder: call upstream Sage API + return [] + + async def persist(self, records: list[dict[str, Any]]) -> int: + """Upsert llmage records into llmage_cache table. + + TODO: Implement database upsert logic. + """ + if not records: + return 0 + info(f'LlmageSync: persisting {len(records)} llmage records') + # Placeholder: upsert into llmage_cache + return len(records) + + def get_latest_timestamp(self, records: list[dict[str, Any]]) -> str | None: + """Extract the maximum updated_at from the record batch.""" + if not records: + return None + timestamps = [r.get('updated_at') for r in records if r.get('updated_at')] + return max(timestamps) if timestamps else None + + +_llmage_sync_instance: LlmageSync | None = None + + +def get_llmage_sync() -> LlmageSync: + """Get or create the LlmageSync singleton.""" + global _llmage_sync_instance + if _llmage_sync_instance is None: + _llmage_sync_instance = LlmageSync() + return _llmage_sync_instance + + +async def sync_llmage(since_timestamp: str | None = None) -> dict[str, Any]: + """Run a llmage data sync cycle.""" + syncer = get_llmage_sync() + if since_timestamp: + await syncer._save_checkpoint(since_timestamp) + return await syncer.run() diff --git a/sageapi/sync/pricing_sync.py b/sageapi/sync/pricing_sync.py new file mode 100644 index 0000000..61a9d13 --- /dev/null +++ b/sageapi/sync/pricing_sync.py @@ -0,0 +1,65 @@ +"""Pricing data synchronization for SageAPI. + +Syncs pricing program and timing data from the upstream Sage +pricing module into the local pricing_cache table. +""" + +from __future__ import annotations + +from typing import Any + +from appPublic.log import debug, info +from .base_sync import BaseSync + + +class PricingSync(BaseSync): + """Incremental sync for pricing data from Sage upstream.""" + + def __init__(self, batch_size: int = 500) -> None: + super().__init__(sync_name='pricing', batch_size=batch_size) + + async def fetch_incremental(self, since_timestamp: str | None = None) -> list[dict[str, Any]]: + """Fetch pricing data updated since the last sync checkpoint. + + TODO: Implement upstream API call to Sage /api/pricing endpoint. + """ + debug(f'PricingSync: fetching incremental data since {since_timestamp}') + # Placeholder: call upstream Sage API + return [] + + async def persist(self, records: list[dict[str, Any]]) -> int: + """Upsert pricing records into pricing_cache table. + + TODO: Implement database upsert logic. + """ + if not records: + return 0 + info(f'PricingSync: persisting {len(records)} pricing records') + # Placeholder: upsert into pricing_cache + return len(records) + + def get_latest_timestamp(self, records: list[dict[str, Any]]) -> str | None: + """Extract the maximum updated_at from the record batch.""" + if not records: + return None + timestamps = [r.get('updated_at') for r in records if r.get('updated_at')] + return max(timestamps) if timestamps else None + + +_pricing_sync_instance: PricingSync | None = None + + +def get_pricing_sync() -> PricingSync: + """Get or create the PricingSync singleton.""" + global _pricing_sync_instance + if _pricing_sync_instance is None: + _pricing_sync_instance = PricingSync() + return _pricing_sync_instance + + +async def sync_pricing(since_timestamp: str | None = None) -> dict[str, Any]: + """Run a pricing data sync cycle.""" + syncer = get_pricing_sync() + if since_timestamp: + await syncer._save_checkpoint(since_timestamp) + return await syncer.run() diff --git a/sageapi/sync/uapi_sync.py b/sageapi/sync/uapi_sync.py new file mode 100644 index 0000000..13c829f --- /dev/null +++ b/sageapi/sync/uapi_sync.py @@ -0,0 +1,65 @@ +"""UAPI data synchronization for SageAPI. + +Syncs uapi application and caller configuration from the upstream +Sage uapi module into the local uapi_cache table. +""" + +from __future__ import annotations + +from typing import Any + +from appPublic.log import debug, info +from .base_sync import BaseSync + + +class UAPISync(BaseSync): + """Incremental sync for uapi data from Sage upstream.""" + + def __init__(self, batch_size: int = 500) -> None: + super().__init__(sync_name='uapi', batch_size=batch_size) + + async def fetch_incremental(self, since_timestamp: str | None = None) -> list[dict[str, Any]]: + """Fetch uapi data updated since the last sync checkpoint. + + TODO: Implement upstream API call to Sage /api/uapi endpoint. + """ + debug(f'UAPISync: fetching incremental data since {since_timestamp}') + # Placeholder: call upstream Sage API + return [] + + async def persist(self, records: list[dict[str, Any]]) -> int: + """Upsert uapi records into uapi_cache table. + + TODO: Implement database upsert logic. + """ + if not records: + return 0 + info(f'UAPISync: persisting {len(records)} uapi records') + # Placeholder: upsert into uapi_cache + return len(records) + + def get_latest_timestamp(self, records: list[dict[str, Any]]) -> str | None: + """Extract the maximum updated_at from the record batch.""" + if not records: + return None + timestamps = [r.get('updated_at') for r in records if r.get('updated_at')] + return max(timestamps) if timestamps else None + + +_uapi_sync_instance: UAPISync | None = None + + +def get_uapi_sync() -> UAPISync: + """Get or create the UAPISync singleton.""" + global _uapi_sync_instance + if _uapi_sync_instance is None: + _uapi_sync_instance = UAPISync() + return _uapi_sync_instance + + +async def sync_uapi(since_timestamp: str | None = None) -> dict[str, Any]: + """Run a uapi data sync cycle.""" + syncer = get_uapi_sync() + if since_timestamp: + await syncer._save_checkpoint(since_timestamp) + return await syncer.run() diff --git a/sageapi/sync/user_sync.py b/sageapi/sync/user_sync.py new file mode 100644 index 0000000..f53e9f4 --- /dev/null +++ b/sageapi/sync/user_sync.py @@ -0,0 +1,76 @@ +"""User data synchronization for SageAPI. + +Syncs user data from the upstream Sage system into the local +users_cache table. Uses incremental sync based on updated_at +timestamp to minimize data transfer. +""" + +from __future__ import annotations + +from typing import Any + +from appPublic.log import debug, info +from .base_sync import BaseSync + + +class UserSync(BaseSync): + """Incremental sync for user data from Sage upstream.""" + + def __init__(self, batch_size: int = 500) -> None: + super().__init__(sync_name='users', batch_size=batch_size) + + async def fetch_incremental(self, since_timestamp: str | None = None) -> list[dict[str, Any]]: + """Fetch users updated since the last sync checkpoint. + + TODO: Implement upstream API call to Sage /api/users endpoint. + """ + debug(f'UserSync: fetching incremental data since {since_timestamp}') + # Placeholder: call upstream Sage API + # GET /api/users?updated_at_gt={since_timestamp}&limit={batch_size} + return [] + + async def persist(self, records: list[dict[str, Any]]) -> int: + """Upsert user records into users_cache table. + + TODO: Implement database upsert logic. + """ + if not records: + return 0 + info(f'UserSync: persisting {len(records)} user records') + # Placeholder: upsert into users_cache + return len(records) + + def get_latest_timestamp(self, records: list[dict[str, Any]]) -> str | None: + """Extract the maximum updated_at from the record batch.""" + if not records: + return None + timestamps = [r.get('updated_at') for r in records if r.get('updated_at')] + return max(timestamps) if timestamps else None + + +# Module-level singleton instance +_user_sync_instance: UserSync | None = None + + +def get_user_sync() -> UserSync: + """Get or create the UserSync singleton.""" + global _user_sync_instance + if _user_sync_instance is None: + _user_sync_instance = UserSync() + return _user_sync_instance + + +async def sync_users(since_timestamp: str | None = None) -> dict[str, Any]: + """Run a user data sync cycle. + + Args: + since_timestamp: Optional override for the checkpoint timestamp. + + Returns: + Sync result dict from BaseSync.run(). + """ + syncer = get_user_sync() + if since_timestamp: + # Override checkpoint for this run + await syncer._save_checkpoint(since_timestamp) + return await syncer.run() diff --git a/sageapi/utils/__init__.py b/sageapi/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sageapi/utils/crypto.py b/sageapi/utils/crypto.py new file mode 100644 index 0000000..bfe35fb --- /dev/null +++ b/sageapi/utils/crypto.py @@ -0,0 +1,95 @@ +"""Cryptographic utility functions for SageAPI. + +Provides encryption/decryption helpers for sensitive data +(API keys, secrets, tokens) stored in the local database. +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +from typing import Any + + +def encrypt_payload(data: str, key: str) -> str: + """Encrypt a string payload using HMAC-SHA256 keyed hash. + + This provides integrity protection. For confidentiality, + use a proper symmetric encryption scheme (AES-GCM). + + Args: + data: The plaintext data to encrypt/hash. + key: The secret key. + + Returns: + Base64-encoded HMAC hex digest. + """ + digest = hmac.new( + key.encode('utf-8'), + data.encode('utf-8'), + hashlib.sha256, + ).hexdigest() + return base64.b64encode(digest.encode('utf-8')).decode('utf-8') + + +def decrypt_payload(encoded: str, key: str, expected: str) -> bool: + """Verify that a payload matches its expected HMAC. + + Args: + encoded: The base64-encoded HMAC to verify. + key: The secret key used to compute the expected HMAC. + expected: The plaintext data whose HMAC we expect. + + Returns: + True if the HMAC matches, False otherwise. + """ + try: + stored_digest = base64.b64decode(encoded.encode('utf-8')).decode('utf-8') + except Exception: + return False + + expected_digest = hmac.new( + key.encode('utf-8'), + expected.encode('utf-8'), + hashlib.sha256, + ).hexdigest() + + return hmac.compare_digest(stored_digest, expected_digest) + + +def hash_password(password: str, salt: str = '') -> str: + """Hash a password using PBKDF2-SHA256. + + Args: + password: The plaintext password. + salt: Optional salt (auto-generated if empty). + + Returns: + Hex-encoded hash string. + """ + if not salt: + import os + salt = base64.b64encode(os.urandom(16)).decode('utf-8') + + dk = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt.encode('utf-8'), 100000) + return f'{salt}:${dk.hex()}' + + +def verify_password(password: str, stored: str) -> bool: + """Verify a password against a stored hash. + + Args: + password: The plaintext password to verify. + stored: The stored hash string (format: salt$hash). + + Returns: + True if the password matches. + """ + try: + salt, hash_value = stored.split('$', 1) + except ValueError: + return False + + dk = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt.encode('utf-8'), 100000) + return hmac.compare_digest(dk.hex(), hash_value) diff --git a/sageapi/utils/http_client.py b/sageapi/utils/http_client.py new file mode 100644 index 0000000..6773525 --- /dev/null +++ b/sageapi/utils/http_client.py @@ -0,0 +1,115 @@ +"""HTTP client for upstream Sage API calls. + +Provides a reusable async HTTP client with connection pooling, +retry logic, and automatic DAPI/UAPI authentication headers. +""" + +from __future__ import annotations + +import asyncio +from typing import Any + +from appPublic.log import debug, error + + +class SageHttpClient: + """Async HTTP client for calling upstream Sage APIs. + + Handles connection pooling, authentication headers, and + retry logic for transient failures. + """ + + def __init__( + self, + base_url: str = 'http://127.0.0.1:9180', + dapi_key: str = '', + dapi_secret: str = '', + timeout: float = 30.0, + max_retries: int = 3, + ) -> None: + self.base_url = base_url.rstrip('/') + self.dapi_key = dapi_key + self.dapi_secret = dapi_secret + self.timeout = timeout + self.max_retries = max_retries + + def _build_headers(self) -> dict[str, str]: + """Build request headers with DAPI authentication. + + TODO: Implement actual DAPI signature generation. + """ + import time + import hashlib + import hmac + + timestamp = str(int(time.time())) + string_to_sign = f'{self.dapi_key}:{timestamp}' + signature = hmac.new( + self.dapi_secret.encode('utf-8') if self.dapi_secret else b'', + string_to_sign.encode('utf-8'), + hashlib.sha256, + ).hexdigest() + + return { + 'Content-Type': 'application/json', + 'X-DAPI-Key': self.dapi_key, + 'X-DAPI-Timestamp': timestamp, + 'X-DAPI-Signature': signature, + } + + async def get( + self, + path: str, + params: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, + ) -> Any: + """Send a GET request to the upstream Sage API. + + Args: + path: API path (relative to base_url). + params: Optional query parameters. + headers: Optional additional headers. + + Returns: + Parsed JSON response. + """ + url = f'{self.base_url}{path}' + request_headers = {**self._build_headers(), **(headers or {})} + + debug(f'HTTP GET {url} params={params}') + + # TODO: Replace with actual async HTTP implementation + # using aiohttp or the framework's built-in HTTP client. + # This is a placeholder that will be filled in once the + # specific HTTP library choice is confirmed. + raise NotImplementedError( + 'SageHttpClient.get: HTTP library not yet integrated. ' + 'Implement with aiohttp or framework HTTP client.' + ) + + async def post( + self, + path: str, + data: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, + ) -> Any: + """Send a POST request to the upstream Sage API. + + Args: + path: API path (relative to base_url). + data: JSON body data. + headers: Optional additional headers. + + Returns: + Parsed JSON response. + """ + url = f'{self.base_url}{path}' + request_headers = {**self._build_headers(), **(headers or {})} + + debug(f'HTTP POST {url}') + + # TODO: Replace with actual async HTTP implementation. + raise NotImplementedError( + 'SageHttpClient.post: HTTP library not yet integrated. ' + 'Implement with aiohttp or framework HTTP client.' + )