"""
LLM Client for Hermes Agent - Calls supplier LLM APIs via an OpenAI-compatible interface.

This module provides a client-side implementation for calling external LLM
providers (OpenAI, DashScope, DeepSeek, etc.) through their OpenAI-compatible
/v1/chat/completions API.

Usage:
    # From .dspy files:
    result = await llm_chat(messages=[{"role": "user", "content": "Hello"}])

    # With explicit config:
    result = await llm_chat(
        messages=[{"role": "user", "content": "Hello"}],
        model="qwen3-max",
        temperature=0.7,
    )

    # Stream mode:
    async for chunk in llm_chat_stream(messages=[...]):
        print(chunk.get("delta", ""))
"""
import json
import time
import asyncio
from typing import Dict, Any, List, Optional, AsyncGenerator
from datetime import datetime

try:
    from aiohttp import ClientSession, ClientTimeout, ClientError
    _AIOHTTP_AVAILABLE = True
except ImportError:
    # Keep the names bound so the module still imports. Every function that
    # needs HTTP checks _AIOHTTP_AVAILABLE and reports a configuration error
    # instead of failing later with a NameError.
    ClientSession = None
    ClientTimeout = None
    ClientError = Exception
    _AIOHTTP_AVAILABLE = False

try:
    from ahserver.serverenv import ServerEnv
    from sqlor.dbpools import DBPools
    from appPublic.log import info, debug, warning, error, exception
except ImportError:
    # Minimal stand-ins so the module can be imported outside the server.
    # The stub ServerEnv/DBPools lack the real methods, so _get_llm_config
    # falls through to its "config not found" path and returns {}.
    class ServerEnv:
        pass

    class DBPools:
        pass

    def info(*a, **kw):
        print(*a)

    def debug(*a, **kw):
        pass

    def warning(*a, **kw):
        print(*a)

    def error(*a, **kw):
        print(*a)

    def exception(*a, **kw):
        pass


# ============================================================
# LLM Provider configurations
# ============================================================

LLM_PROVIDERS = {
    'openai': {
        'url': 'https://api.openai.com/v1',
        'model_default': 'gpt-4o',
    },
    'dashscope': {
        'url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
        'model_default': 'qwen-plus',
    },
    'deepseek': {
        'url': 'https://api.deepseek.com/v1',
        'model_default': 'deepseek-chat',
    },
    'siliconflow': {
        'url': 'https://api.siliconflow.cn/v1',
        'model_default': 'Qwen/Qwen2.5-72B-Instruct',
    },
}

# Optional request fields forwarded from **extra into the request body
# (only when present and not None); any other extra keys are ignored.
_EXTRA_BODY_KEYS = (
    'stop', 'presence_penalty', 'frequency_penalty',
    'tools', 'tool_choice', 'response_format', 'seed',
)

_MAX_RETRIES = 3
_BASE_DELAY = 2          # seconds; backoff is _BASE_DELAY * 2**attempt (2, 4, ...)
_MAX_RETRY_AFTER = 60    # cap (seconds) on a server-supplied Retry-After
# Server-side statuses treated as transient and retried with backoff.
_RETRYABLE_STATUSES = frozenset({500, 502, 503, 504})
# Sentinel returned by _parse_sse_line for the terminating "data: [DONE]".
_SSE_DONE = object()


# ============================================================
# Config retrieval
# ============================================================

async def _get_llm_config() -> Dict[str, Any]:
    """Get LLM client configuration from the harnessed_agent_config table.

    The module's own database (as reported by ServerEnv) is tried first, then
    'default'. The most recently updated row (``updated_at DESC``) from the
    first database that has one is returned. Errors are logged and the next
    database is tried.

    Returns:
        The config row, or {} when no database yields a row.
    """
    dbnames_to_try = ['default']
    try:
        env = ServerEnv()
        module_db = env.get_module_dbname('harnessed_agent')
        # Skip an empty/None db name rather than trying it as a pool name.
        if module_db and module_db not in dbnames_to_try:
            dbnames_to_try.insert(0, module_db)
    except Exception:
        # Best effort: without a ServerEnv/module db only 'default' is tried.
        pass

    for dbname in dbnames_to_try:
        try:
            async with DBPools().sqlorContext(dbname) as sor:
                rows = await sor.R('harnessed_agent_config', {}, orderby='updated_at DESC', limit=1)
                if rows:
                    info(f"Loaded LLM config from DB '{dbname}'")
                    return rows[0]
                warning(f"No rows in harnessed_agent_config in DB '{dbname}'")
        except Exception as e:
            error(f"Failed to fetch LLM config from DB '{dbname}': {e}")

    error("LLM config not found in any database")
    return {}


def _coerce_float(value: Any, default: float) -> float:
    """Convert a config value to float, falling back to *default*.

    A NULL column (None) or an unparseable value yields *default*, so a
    missing setting is never sent to the API as JSON ``null``. Numeric
    non-float values are converted so the request body stays JSON-serializable.
    """
    if value is None:
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _resolve_provider(config: Dict[str, Any]) -> Dict[str, str]:
    """Resolve base URL, API key and model from config, applying provider presets.

    NULL/missing columns are treated as empty strings. When ``llm_provider``
    (case-insensitive) names an entry in LLM_PROVIDERS, that preset fills in
    an empty ``llm_service_url`` and/or ``default_model``.

    Returns:
        Dict with 'service_url' (trailing '/' removed), 'api_key', 'model'
        and 'provider' (lower-cased).
    """
    provider = str(config.get('llm_provider') or '').strip().lower()
    service_url = str(config.get('llm_service_url') or '')
    api_key = str(config.get('llm_api_key') or '')
    model = str(config.get('default_model') or '')

    preset = LLM_PROVIDERS.get(provider)
    if preset:
        service_url = service_url or preset['url']
        model = model or preset['model_default']

    return {
        'service_url': service_url.rstrip('/'),
        'api_key': api_key,
        'model': model,
        'provider': provider,
    }


def _mask_api_key(api_key: Optional[str]) -> str:
    """Return a display-safe form of *api_key*.

    Keys of 8 characters or fewer are fully masked, because showing their
    last 4 characters would reveal most or all of the key.
    """
    if not api_key:
        return '(not set)'
    if len(api_key) <= 8:
        return '***'
    return '***' + api_key[-4:]


def _error_message(err: Any, default: str = 'Unknown error') -> str:
    """Extract a readable message from an 'error' field that may be a dict or a plain value."""
    if isinstance(err, dict):
        return err.get('message', default)
    return str(err) if err else default


# ============================================================
# Core LLM client
# ============================================================

def _build_headers(api_key: str) -> Dict[str, str]:
    """Build the JSON + Bearer-auth headers for an OpenAI-compatible request."""
    return {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {api_key}',
    }


def _build_chat_body(
    model: str,
    messages: List[Dict[str, str]],
    temperature: float,
    max_tokens: Optional[int],
    top_p: float,
    stream: bool,
    extra: Optional[Dict],
) -> Dict[str, Any]:
    """Build a /chat/completions request body.

    ``max_tokens`` is included only when not None. Only keys from
    _EXTRA_BODY_KEYS with non-None values are copied from *extra*.
    """
    body = {
        'model': model,
        'messages': messages,
        'temperature': temperature,
        'top_p': top_p,
        'stream': stream,
    }
    if max_tokens is not None:
        body['max_tokens'] = max_tokens
    if extra:
        body.update({
            key: extra[key]
            for key in _EXTRA_BODY_KEYS
            if extra.get(key) is not None
        })
    return body


def _parse_retry_after(header_value: Optional[str], fallback: float) -> float:
    """Parse a Retry-After header given in seconds, returning a sleep duration.

    Non-numeric values (e.g. the HTTP-date form), negative, NaN or infinite
    values yield *fallback*. Valid values are capped at _MAX_RETRY_AFTER.
    """
    if header_value is None:
        return fallback
    try:
        seconds = float(header_value)
    except (TypeError, ValueError):
        return fallback
    # Comparisons with NaN are False, so NaN, inf and negatives all fall back.
    if not (0 <= seconds < float('inf')):
        return fallback
    return min(seconds, _MAX_RETRY_AFTER)


def _parse_sse_line(raw_line: Any) -> Any:
    """Parse one SSE line from a streaming chat response.

    Returns:
        _SSE_DONE for ``data: [DONE]``. None for lines to skip: blanks,
        comments/keep-alives, non-``data:`` fields, invalid JSON, or chunks
        without choices. Otherwise a dict ``{delta, finish_reason, raw}``
        built from the first choice.
    """
    if isinstance(raw_line, (bytes, bytearray)):
        raw_line = raw_line.decode('utf-8', errors='replace')
    line = raw_line.strip()
    if not line.startswith('data:'):
        return None
    data_str = line[5:].strip()
    if data_str == '[DONE]':
        return _SSE_DONE
    try:
        chunk = json.loads(data_str)
    except json.JSONDecodeError:
        return None
    if not isinstance(chunk, dict):
        return None
    choices = chunk.get('choices') or []
    if not choices:
        return None
    choice = choices[0]
    # Some providers send "delta": null (e.g. on the final chunk).
    delta = choice.get('delta') or {}
    return {
        'delta': delta.get('content') or '',
        'finish_reason': choice.get('finish_reason'),
        'raw': chunk,
    }


async def _post_chat_completions(
    service_url: str,
    api_key: str,
    model: str,
    messages: List[Dict[str, str]],
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    stream: bool = False,
    top_p: float = 1.0,
    extra: Optional[Dict] = None,
) -> Dict[str, Any]:
    """
    Call the OpenAI-compatible /chat/completions endpoint (non-streaming).

    The ``stream`` flag is copied into the request body, but the response is
    parsed as a single JSON document, so callers should pass False.
    llm_chat_stream() handles streaming.

    The function tries up to _MAX_RETRIES times. Rate limiting (429) honours
    Retry-After. Statuses in _RETRYABLE_STATUSES, timeouts and connection
    errors use exponential backoff.

    Returns:
        The provider's JSON response, or ``{'error': {...}}`` with
        'message', 'type' and usually 'code'.
    """
    if not service_url:
        return {'error': {'message': 'llm_service_url not configured', 'type': 'configuration_error'}}
    if not api_key:
        return {'error': {'message': 'llm_api_key not configured', 'type': 'configuration_error'}}
    if not _AIOHTTP_AVAILABLE:
        return {'error': {'message': 'aiohttp is not installed', 'type': 'configuration_error'}}

    url = f"{service_url}/chat/completions"
    headers = _build_headers(api_key)
    body = _build_chat_body(model, messages, temperature, max_tokens, top_p, stream, extra)

    for attempt in range(_MAX_RETRIES):
        is_last = attempt == _MAX_RETRIES - 1
        backoff = _BASE_DELAY * (2 ** attempt)
        # Set by every path that wants another attempt. The sleep happens
        # after the session is closed, so no connection is held while waiting.
        retry_delay = None
        try:
            async with ClientSession() as session:
                async with session.post(url, headers=headers, json=body,
                                        timeout=ClientTimeout(total=300)) as resp:
                    if resp.status == 429:
                        if is_last:
                            return {'error': {'message': 'Rate limited by LLM provider',
                                              'type': 'rate_limit_error', 'code': 429}}
                        retry_delay = _parse_retry_after(resp.headers.get('Retry-After'), backoff)
                        warning(f"LLM rate limited (429), retrying in {retry_delay}s "
                                f"(attempt {attempt+1}/{_MAX_RETRIES})")
                    elif resp.status in _RETRYABLE_STATUSES and not is_last:
                        retry_delay = backoff
                        warning(f"LLM server error ({resp.status}), retrying in {retry_delay}s "
                                f"(attempt {attempt+1}/{_MAX_RETRIES})")
                    elif resp.status != 200:
                        err_text = await resp.text()
                        return {
                            'error': {
                                'message': f'LLM API error: HTTP {resp.status}',
                                'type': 'api_error',
                                'code': resp.status,
                                'detail': err_text[:500],
                            }
                        }
                    else:
                        try:
                            return await resp.json()
                        except ValueError as e:
                            # A 200 whose body is not valid JSON: report it
                            # instead of letting the decode error escape.
                            return {'error': {'message': f'LLM API returned invalid JSON: {e}',
                                              'type': 'api_error', 'code': 502}}
        except asyncio.TimeoutError:
            if is_last:
                return {'error': {'message': 'LLM API request timed out', 'type': 'timeout_error', 'code': 504}}
            retry_delay = backoff
            warning(f"LLM API timeout, retrying in {retry_delay}s (attempt {attempt+1}/{_MAX_RETRIES})")
        except ClientError as e:
            if is_last:
                return {'error': {'message': f'LLM connection failed: {str(e)}',
                                  'type': 'connection_error', 'code': 502}}
            retry_delay = backoff
            warning(f"LLM connection error: {e}, retrying in {retry_delay}s")

        await asyncio.sleep(retry_delay)

    # Safety net; every final attempt above returns explicitly.
    return {'error': {'message': 'LLM API failed after all retries', 'type': 'server_error', 'code': 500}}


async def _stream_chat_completions(
    service_url: str,
    api_key: str,
    model: str,
    messages: List[Dict[str, str]],
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    top_p: float = 1.0,
    extra: Optional[Dict] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Stream an LLM response via SSE, yielding parsed chunk dicts.

    Each yielded dict contains:
        - delta: str (the text chunk, '' when none)
        - finish_reason: str or None
        - raw: dict (the raw SSE chunk data)

    Failures (missing config, non-200 status, timeout, connection error)
    are reported as one final chunk with ``finish_reason == 'error'`` and
    ``raw['error']`` describing the problem, rather than raised. There is no
    retry here, because chunks may already have been delivered.
    """
    if not service_url:
        yield {'delta': '', 'finish_reason': 'error', 'raw': {'error': 'llm_service_url not configured'}}
        return
    if not api_key:
        yield {'delta': '', 'finish_reason': 'error', 'raw': {'error': 'llm_api_key not configured'}}
        return
    if not _AIOHTTP_AVAILABLE:
        yield {'delta': '', 'finish_reason': 'error', 'raw': {'error': 'aiohttp is not installed'}}
        return

    url = f"{service_url}/chat/completions"
    headers = _build_headers(api_key)
    body = _build_chat_body(model, messages, temperature, max_tokens, top_p, True, extra)

    try:
        async with ClientSession() as session:
            async with session.post(url, headers=headers, json=body,
                                    timeout=ClientTimeout(total=600)) as resp:
                if resp.status != 200:
                    err_text = await resp.text()
                    yield {
                        'delta': '',
                        'finish_reason': 'error',
                        'raw': {'error': f'HTTP {resp.status}', 'detail': err_text[:300]},
                    }
                    return

                async for raw_line in resp.content:
                    parsed = _parse_sse_line(raw_line)
                    if parsed is _SSE_DONE:
                        break
                    if parsed is not None:
                        yield parsed
    except asyncio.TimeoutError:
        yield {'delta': '', 'finish_reason': 'error', 'raw': {'error': 'LLM API request timed out'}}
    except ClientError as e:
        yield {'delta': '', 'finish_reason': 'error', 'raw': {'error': f'LLM connection failed: {e}'}}


# ============================================================
# Public API functions (registered to ServerEnv)
# ============================================================

async def llm_chat(
    messages: List[Dict[str, str]],
    model: str = None,
    temperature: float = None,
    max_tokens: int = None,
    stream: bool = False,
    top_p: float = None,
    **extra,
) -> Dict[str, Any]:
    """
    Call the LLM provider and get a chat completion.

    This is the primary function for AI calls within harnessed_agent. It
    reads provider config from the harnessed_agent_config table on every call.

    Args:
        messages: List of {role, content} dicts (OpenAI format).
        model: Override model name (uses config default_model if not set).
        temperature: Override temperature (uses config default_temperature,
            else 0.7, if not set).
        max_tokens: Max response tokens.
        stream: Accepted for backward compatibility but ignored; the
            response is always non-streamed. Use llm_chat_stream() for streaming.
        top_p: Override top_p (uses config top_p, else 1.0, if not set).
        **extra: Other OpenAI-compatible params (stop, tools, etc.); only
            keys in _EXTRA_BODY_KEYS are forwarded.

    Returns:
        Dict matching the OpenAI chat completion response, or an error dict.
    """
    if stream:
        warning("llm_chat(stream=True) is not supported; returning a non-streamed response. "
                "Use llm_chat_stream() for streaming.")

    config = await _get_llm_config()
    provider = _resolve_provider(config)

    resolved_model = model or provider['model']
    resolved_temp = temperature if temperature is not None else _coerce_float(config.get('default_temperature'), 0.7)
    resolved_top_p = top_p if top_p is not None else _coerce_float(config.get('top_p'), 1.0)

    start_time = time.time()
    info(f"LLM chat: model={resolved_model}, temp={resolved_temp}, messages={len(messages)}")

    result = await _post_chat_completions(
        service_url=provider['service_url'],
        api_key=provider['api_key'],
        model=resolved_model,
        messages=messages,
        temperature=resolved_temp,
        max_tokens=max_tokens,
        stream=False,
        top_p=resolved_top_p,
        extra=extra if extra else None,
    )

    elapsed = time.time() - start_time
    if 'error' in result:
        error(f"LLM chat error: model={resolved_model}, elapsed={elapsed:.2f}s, "
              f"error={_error_message(result['error'])}")
    else:
        # Some providers send "usage": null.
        usage = result.get('usage') or {}
        info(f"LLM chat done: model={resolved_model}, elapsed={elapsed:.2f}s, "
             f"prompt_tokens={usage.get('prompt_tokens')}, completion_tokens={usage.get('completion_tokens')}")

    return result


async def llm_chat_stream(
    messages: List[Dict[str, str]],
    model: str = None,
    temperature: float = None,
    max_tokens: int = None,
    top_p: float = None,
    **extra,
) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Stream an LLM chat response.

    Yields dicts with {delta: str, finish_reason: str|None, raw: dict}.
    Errors arrive as a chunk with finish_reason == 'error'.

    Example:
        async for chunk in llm_chat_stream(messages=[{"role": "user", "content": "Hello"}]):
            print(chunk['delta'], end='', flush=True)
    """
    config = await _get_llm_config()
    provider = _resolve_provider(config)

    resolved_model = model or provider['model']
    resolved_temp = temperature if temperature is not None else _coerce_float(config.get('default_temperature'), 0.7)
    resolved_top_p = top_p if top_p is not None else _coerce_float(config.get('top_p'), 1.0)

    info(f"LLM chat stream: model={resolved_model}, temp={resolved_temp}, messages={len(messages)}")

    async for chunk in _stream_chat_completions(
        service_url=provider['service_url'],
        api_key=provider['api_key'],
        model=resolved_model,
        messages=messages,
        temperature=resolved_temp,
        max_tokens=max_tokens,
        top_p=resolved_top_p,
        extra=extra if extra else None,
    ):
        yield chunk


async def llm_list_models() -> Dict[str, Any]:
    """List models available from the configured LLM provider.

    Returns:
        The provider's /models JSON on HTTP 200, otherwise ``{'error': str}``.
    """
    config = await _get_llm_config()
    provider = _resolve_provider(config)

    if not provider['service_url'] or not provider['api_key']:
        return {'error': 'LLM not configured'}
    if not _AIOHTTP_AVAILABLE:
        return {'error': 'aiohttp is not installed'}

    # Try to call the /v1/models endpoint.
    url = f"{provider['service_url']}/models"
    headers = _build_headers(provider['api_key'])

    try:
        async with ClientSession() as session:
            async with session.get(url, headers=headers, timeout=ClientTimeout(total=30)) as resp:
                if resp.status == 200:
                    return await resp.json()
                return {'error': f'HTTP {resp.status}'}
    except Exception as e:
        return {'error': str(e)}


async def llm_simple(prompt: str, system: str = None, **kwargs) -> str:
    """
    Simplified LLM call that returns just the response text.

    Args:
        prompt: User message text.
        system: Optional system prompt.
        **kwargs: Passed to llm_chat.

    Returns:
        The first choice's message content ('' when there is none, e.g.
        a tool-call reply), or "Error: <message>" on failure.
    """
    messages = []
    if system:
        messages.append({'role': 'system', 'content': system})
    messages.append({'role': 'user', 'content': prompt})

    result = await llm_chat(messages=messages, **kwargs)

    if 'error' in result:
        return f"Error: {_error_message(result['error'])}"

    choices = result.get('choices') or []
    if not choices:
        return ''
    message = choices[0].get('message') or {}
    # content is null when the model answers with tool calls only.
    return message.get('content') or ''


async def llm_get_config() -> Dict[str, Any]:
    """Get the current LLM client configuration, with the API key masked."""
    config = await _get_llm_config()
    provider = _resolve_provider(config)
    return {
        'provider': provider['provider'],
        'service_url': provider['service_url'],
        'api_key': _mask_api_key(provider['api_key']),
        'default_model': provider['model'],
        'default_temperature': _coerce_float(config.get('default_temperature'), 0.7),
    }