diff --git a/harnessed_agent/core.py b/harnessed_agent/core.py index 6d0f42e..ce7c20b 100644 --- a/harnessed_agent/core.py +++ b/harnessed_agent/core.py @@ -358,53 +358,44 @@ class HermesAgent: try: async with self.db.sqlorContext('default') as sor: - # First, get high priority memories (priority >= high_priority_threshold) - high_priority_filters = {'user_id': user_id, 'priority__gte': self.config.high_priority_threshold} - # Fallback to sqlExe for ordering - sql = "SELECT * FROM hermes_memory WHERE user_id = :user_id AND priority >= :priority ORDER BY priority DESC, last_accessed DESC" - high_priority_memories = await sor.sqlExe(sql, {'user_id': user_id, 'priority': self.config.high_priority_threshold}) + # High priority memories (priority >= high_priority_threshold), sorted by priority DESC, last_accessed DESC + hp_rows = await sor.R('hermes_memory', {'user_id': user_id, 'priority__gte': self.config.high_priority_threshold}, ns={'sort': 'priority desc,last_accessed desc'}) + high_priority_memories = hp_rows or [] - # Add high priority memories first (they're always included if within token limit) for memory in high_priority_memories: memory_tokens = self._estimate_tokens(memory['content']) if current_tokens + memory_tokens <= max_tokens: selected_memories.append(memory) current_tokens += memory_tokens - # Update access statistics - await self._update_memory_access_stats(memory['id']) + await self._update_memory_access_stats(user_id, memory['id']) else: break - # If we have remaining tokens, add medium priority memories remaining_tokens = max_tokens - current_tokens if remaining_tokens > 0: - sql_medium = "SELECT * FROM hermes_memory WHERE user_id = :user_id AND priority >= :low_p AND priority < :high_p ORDER BY last_accessed DESC, priority DESC" - medium_priority_memories = await sor.sqlExe(sql_medium, { - 'user_id': user_id, 'low_p': self.config.low_priority_threshold, - 'high_p': self.config.high_priority_threshold - }) + mp_rows = await sor.R('hermes_memory', {'user_id': user_id, 'priority__gte': self.config.low_priority_threshold, 'priority__lt': self.config.high_priority_threshold}, ns={'sort': 'last_accessed desc,priority desc'}) + medium_priority_memories = mp_rows or [] for memory in medium_priority_memories: memory_tokens = self._estimate_tokens(memory['content']) if current_tokens + memory_tokens <= max_tokens: selected_memories.append(memory) current_tokens += memory_tokens - await self._update_memory_access_stats(memory['id']) + await self._update_memory_access_stats(user_id, memory['id']) else: break - # Finally, if still tokens available, add low priority recent memories remaining_tokens = max_tokens - current_tokens if remaining_tokens > 0: - sql_low = "SELECT * FROM hermes_memory WHERE user_id = :user_id AND priority < :low_p ORDER BY last_accessed DESC" - low_priority_memories = await sor.sqlExe(sql_low, {'user_id': user_id, 'low_p': self.config.low_priority_threshold}) + lp_rows = await sor.R('hermes_memory', {'user_id': user_id, 'priority__lt': self.config.low_priority_threshold}, ns={'sort': 'last_accessed desc'}) + low_priority_memories = lp_rows or [] for memory in low_priority_memories: memory_tokens = self._estimate_tokens(memory['content']) if current_tokens + memory_tokens <= max_tokens: selected_memories.append(memory) current_tokens += memory_tokens - await self._update_memory_access_stats(memory['id']) + await self._update_memory_access_stats(user_id, memory['id']) else: break @@ -414,23 +405,22 @@ class HermesAgent: # Return empty list on error to avoid breaking the main flow return [] - async def _update_memory_access_stats(self, memory_id: str): + async def _update_memory_access_stats(self, user_id: str, memory_id: str): """Update memory access statistics""" try: async with self.db.sqlorContext('default') as sor: - # Get current access count - memories = await sor.R('hermes_memory', {'id': memory_id}) + memories = await sor.R('hermes_memory', {'id': memory_id, 'user_id': user_id}) if memories: current_count = memories[0].get('access_count', 0) update_data = { 'id': memory_id, + 'user_id': user_id, 'access_count': current_count + 1, 'last_accessed': datetime.now(), 'updated_at': datetime.now() } await sor.U('hermes_memory', update_data) except Exception: - # Silently ignore stats update errors pass async def _cleanup_old_memories(self, user_id: str): @@ -665,8 +655,8 @@ class HermesAgent: {'tags': {'$like': f'%{query}%'}} ] - sql_sess = f"SELECT * FROM hermes_sessions WHERE user_id = :user_id ORDER BY started_at DESC LIMIT {limit}" - sessions = await sor.sqlExe(sql_sess, {'user_id': user_id}) + sessions = await sor.R('hermes_sessions', filters, ns={'sort': 'started_at desc'}) + sessions = (sessions or [])[:limit] return { "success": True, "sessions": sessions, @@ -872,7 +862,8 @@ class HermesAgent: filters['enabled'] = kwargs['enabled'] sql_skills = "SELECT * FROM harnessed_remote_skills WHERE user_id = :user_id ORDER BY name ASC" - skills = await sor.sqlExe(sql_skills, {'user_id': user_id}) + skills = await sor.R('harnessed_remote_skills', filters, ns={'sort': 'name asc'}) + skills = skills or [] return {"success": True, "skills": skills, "user_id": user_id} elif action == "deploy": @@ -1187,9 +1178,8 @@ class HermesAgent: user_id = self._get_current_user_id(context) if context else "anonymous" try: async with self.db.sqlorContext('default') as sor: - sql_wf = "SELECT * FROM hermes_workflows WHERE user_id = :user_id ORDER BY created_at DESC" - workflows = await sor.sqlExe(sql_wf, {'user_id': user_id}) - return {"success": True, "workflows": workflows, "user_id": user_id} + workflows = await sor.R('hermes_workflows', {'user_id': user_id}, ns={'sort': 'created_at desc'}) + return {"success": True, "workflows": workflows or [], "user_id": user_id} except Exception as e: return {"success": False, "error": str(e), "user_id": user_id} @@ -1204,8 +1194,9 @@ class HermesAgent: if workflow_id: filters['workflow_id'] = workflow_id - sql_exec = f"SELECT * FROM hermes_executions WHERE user_id = :user_id ORDER BY created_at DESC LIMIT {limit} OFFSET {offset}" - executions = await sor.sqlExe(sql_exec, {'user_id': user_id}) + executions = await sor.R('hermes_executions', filters, ns={'sort': 'created_at desc'}) + executions = executions or [] + executions = executions[offset:offset + limit] return {"success": True, "executions": executions, "user_id": user_id} except Exception as e: return {"success": False, "error": str(e), "user_id": user_id} diff --git a/harnessed_agent/llm_client.py b/harnessed_agent/llm_client.py index 4a538c6..487d4c6 100644 --- a/harnessed_agent/llm_client.py +++ b/harnessed_agent/llm_client.py @@ -76,27 +76,39 @@ LLM_PROVIDERS = { # ============================================================ async def _get_llm_config() -> Dict[str, Any]: - """Get LLM client configuration from harnessed_agent_config table. - - Uses 'default' database only, as LLM config is global and shared across modules. - Do NOT use get_module_dbname() which returns the calling module's DB (e.g. crm_db), - which may have stale or incorrect config data. - """ + """Get LLM client configuration from harnessed_agent_config table for current user.""" dbnames_to_try = ['default'] + env = None + user_id = None + try: + env = ServerEnv() + user_id = getattr(env, 'user_id', None) or getattr(env, 'userid', None) + info(f"[llm_config] user_id={repr(user_id)}") + module_db = env.get_module_dbname('harnessed_agent') + if module_db and module_db not in dbnames_to_try: + dbnames_to_try.insert(0, module_db) + except Exception as e: + error(f"[llm_config] Exception: {type(e).__name__}: {e}") + + info(f"[llm_config] dbnames_to_try={dbnames_to_try}") for dbname in dbnames_to_try: try: async with DBPools().sqlorContext(dbname) as sor: - sql = "SELECT * FROM harnessed_agent_config ORDER BY updated_at DESC LIMIT 1" - rows = await sor.sqlExe(sql, {}) + where = {} + if user_id: + where['user_id'] = user_id + rows = await sor.R('harnessed_agent_config', where if where else None, ns={'sort': 'updated_at desc'}) + info(f"[llm_config] DB='{dbname}' sor.R returned {len(rows) if rows else 0} rows") if rows: - info(f"Loaded LLM config from DB '{dbname}'") - return rows[0] + row = rows[0] + info(f"[llm_config] DB='{dbname}' config: llm_provider={repr(row.get('llm_provider'))}, default_model={repr(row.get('default_model'))}, llm_service_url={repr(row.get('llm_service_url'))}") + return row else: - warning(f"No rows in harnessed_agent_config in DB '{dbname}'") + warning(f"No rows in harnessed_agent_config in DB '{dbname}' for user_id={repr(user_id)}") except Exception as e: - error(f"Failed to fetch LLM config from DB '{dbname}': {e}") - + error(f"Failed to fetch LLM config from DB '{dbname}': {type(e).__name__}: {e}") + error("LLM config not found in any database") return {} diff --git a/harnessed_agent/orchestrator.py b/harnessed_agent/orchestrator.py index f816c89..4c9cabf 100644 --- a/harnessed_agent/orchestrator.py +++ b/harnessed_agent/orchestrator.py @@ -192,7 +192,7 @@ class HermesOrchestrator: tasks = await sor.R('hermes_tasks', { 'workflow_id': workflow_id, 'user_id': user_id - }, orderby='order_index ASC') + }, ns={'sort': 'order_index asc'}) # Convert to TaskDefinition objects task_definitions = [] @@ -412,7 +412,7 @@ class HermesOrchestrator: result = await self._execute_single_task(task, user_id, context) # Record successful execution - await self._record_execution_end(execution_id, "completed", result, None, attempt) + await self._record_execution_end(execution_id, user_id, "completed", result, None, attempt) return result except Exception as e: @@ -421,7 +421,7 @@ class HermesOrchestrator: continue else: # Record failed execution - await self._record_execution_end(execution_id, "failed", None, last_error, attempt) + await self._record_execution_end(execution_id, user_id, "failed", None, last_error, attempt) return {"success": False, "error": last_error, "task_id": task.id, "attempts": attempt + 1} # This should never be reached @@ -486,7 +486,7 @@ class HermesOrchestrator: # Silently ignore recording errors pass - async def _record_execution_end(self, execution_id: str, status: str, + async def _record_execution_end(self, execution_id: str, user_id: str, status: str, result: Dict[str, Any], error: str, retry_count: int): """Record execution end in database""" try: @@ -494,6 +494,7 @@ class HermesOrchestrator: end_time = datetime.now() data = { 'id': execution_id, + 'user_id': user_id, 'execution_status': status, 'end_time': end_time, 'duration_seconds': None, # Will be calculated @@ -503,7 +504,7 @@ class HermesOrchestrator: 'updated_at': end_time } # Get start time to calculate duration - executions = await sor.R('hermes_executions', {'id': execution_id}) + executions = await sor.R('hermes_executions', {'id': execution_id, 'user_id': user_id}) if executions and executions[0].get('start_time'): start_time = executions[0]['start_time'] if isinstance(start_time, str):