sageapi/sageapi/sync/pricing_sync.py
Hermes Agent 5c65c78752 feat: sageapi initial scaffold
- 36 files: module structure following module-development-spec
- 7 table definitions: users_cache, pricing_cache, llmage_cache, uapi_cache, sync_state, customer_balance, accounting_records
- Auth: dapi_auth + uapi_sign
- Sync: base_sync + entity-specific sync modules (users/pricing/uapi/llmage)
- Cache: LRU cache manager with TTL
- API: balance, accounting, users, pricing, health handlers
- Config: YAML config loader with env overrides
- Utils: HTTP client, crypto helpers
2026-05-20 17:53:53 +08:00

66 lines
2.1 KiB
Python

"""Pricing data synchronization for SageAPI.
Syncs pricing program and timing data from the upstream Sage
pricing module into the local pricing_cache table.
"""
from __future__ import annotations
from typing import Any
from appPublic.log import debug, info
from .base_sync import BaseSync
class PricingSync(BaseSync):
"""Incremental sync for pricing data from Sage upstream."""
def __init__(self, batch_size: int = 500) -> None:
super().__init__(sync_name='pricing', batch_size=batch_size)
async def fetch_incremental(self, since_timestamp: str | None = None) -> list[dict[str, Any]]:
"""Fetch pricing data updated since the last sync checkpoint.
TODO: Implement upstream API call to Sage /api/pricing endpoint.
"""
debug(f'PricingSync: fetching incremental data since {since_timestamp}')
# Placeholder: call upstream Sage API
return []
async def persist(self, records: list[dict[str, Any]]) -> int:
"""Upsert pricing records into pricing_cache table.
TODO: Implement database upsert logic.
"""
if not records:
return 0
info(f'PricingSync: persisting {len(records)} pricing records')
# Placeholder: upsert into pricing_cache
return len(records)
def get_latest_timestamp(self, records: list[dict[str, Any]]) -> str | None:
"""Extract the maximum updated_at from the record batch."""
if not records:
return None
timestamps = [r.get('updated_at') for r in records if r.get('updated_at')]
return max(timestamps) if timestamps else None
_pricing_sync_instance: PricingSync | None = None
def get_pricing_sync() -> PricingSync:
"""Get or create the PricingSync singleton."""
global _pricing_sync_instance
if _pricing_sync_instance is None:
_pricing_sync_instance = PricingSync()
return _pricing_sync_instance
async def sync_pricing(since_timestamp: str | None = None) -> dict[str, Any]:
"""Run a pricing data sync cycle."""
syncer = get_pricing_sync()
if since_timestamp:
await syncer._save_checkpoint(since_timestamp)
return await syncer.run()