fix: multi-user isolation and sqlor-compatible queries

- Replace all sor.sqlExe(ORDER BY/LIMIT) with sor.R(ns={sort:...}) pattern
- Add user_id filter to all database queries for multi-user isolation
- Replace invalid sor.R(orderby=...) with ns={sort:...}
- _get_llm_config: use sor.R with user_id filter and ns sort
- core.py: fix _get_intelligent_memory_context, search_sessions,
  list_workflows, list_executions, manage_remote_skills (list)
- core.py: _update_memory_access_stats now requires user_id param
- orchestrator.py: fix _load_workflow_definition ns sort,
  _record_execution_end requires user_id param
- Add debug logging for config lookup (user_id, dbname, row data)
This commit is contained in:
yumoqing 2026-05-07 16:39:07 +08:00
parent 8e10a24d55
commit a88c964666
3 changed files with 53 additions and 49 deletions

View File

@ -358,53 +358,44 @@ class HermesAgent:
try:
async with self.db.sqlorContext('default') as sor:
# First, get high priority memories (priority >= high_priority_threshold)
high_priority_filters = {'user_id': user_id, 'priority__gte': self.config.high_priority_threshold}
# Fallback to sqlExe for ordering
sql = "SELECT * FROM hermes_memory WHERE user_id = :user_id AND priority >= :priority ORDER BY priority DESC, last_accessed DESC"
high_priority_memories = await sor.sqlExe(sql, {'user_id': user_id, 'priority': self.config.high_priority_threshold})
# High priority memories (priority >= high_priority_threshold), sorted by priority DESC, last_accessed DESC
hp_rows = await sor.R('hermes_memory', {'user_id': user_id, 'priority__gte': self.config.high_priority_threshold}, ns={'sort': 'priority desc,last_accessed desc'})
high_priority_memories = hp_rows or []
# Add high priority memories first (they're always included if within token limit)
for memory in high_priority_memories:
memory_tokens = self._estimate_tokens(memory['content'])
if current_tokens + memory_tokens <= max_tokens:
selected_memories.append(memory)
current_tokens += memory_tokens
# Update access statistics
await self._update_memory_access_stats(memory['id'])
await self._update_memory_access_stats(user_id, memory['id'])
else:
break
# If we have remaining tokens, add medium priority memories
remaining_tokens = max_tokens - current_tokens
if remaining_tokens > 0:
sql_medium = "SELECT * FROM hermes_memory WHERE user_id = :user_id AND priority >= :low_p AND priority < :high_p ORDER BY last_accessed DESC, priority DESC"
medium_priority_memories = await sor.sqlExe(sql_medium, {
'user_id': user_id, 'low_p': self.config.low_priority_threshold,
'high_p': self.config.high_priority_threshold
})
mp_rows = await sor.R('hermes_memory', {'user_id': user_id, 'priority__gte': self.config.low_priority_threshold, 'priority__lt': self.config.high_priority_threshold}, ns={'sort': 'last_accessed desc,priority desc'})
medium_priority_memories = mp_rows or []
for memory in medium_priority_memories:
memory_tokens = self._estimate_tokens(memory['content'])
if current_tokens + memory_tokens <= max_tokens:
selected_memories.append(memory)
current_tokens += memory_tokens
await self._update_memory_access_stats(memory['id'])
await self._update_memory_access_stats(user_id, memory['id'])
else:
break
# Finally, if still tokens available, add low priority recent memories
remaining_tokens = max_tokens - current_tokens
if remaining_tokens > 0:
sql_low = "SELECT * FROM hermes_memory WHERE user_id = :user_id AND priority < :low_p ORDER BY last_accessed DESC"
low_priority_memories = await sor.sqlExe(sql_low, {'user_id': user_id, 'low_p': self.config.low_priority_threshold})
lp_rows = await sor.R('hermes_memory', {'user_id': user_id, 'priority__lt': self.config.low_priority_threshold}, ns={'sort': 'last_accessed desc'})
low_priority_memories = lp_rows or []
for memory in low_priority_memories:
memory_tokens = self._estimate_tokens(memory['content'])
if current_tokens + memory_tokens <= max_tokens:
selected_memories.append(memory)
current_tokens += memory_tokens
await self._update_memory_access_stats(memory['id'])
await self._update_memory_access_stats(user_id, memory['id'])
else:
break
@ -414,23 +405,22 @@ class HermesAgent:
# Return empty list on error to avoid breaking the main flow
return []
async def _update_memory_access_stats(self, memory_id: str):
async def _update_memory_access_stats(self, user_id: str, memory_id: str):
"""Update memory access statistics"""
try:
async with self.db.sqlorContext('default') as sor:
# Get current access count
memories = await sor.R('hermes_memory', {'id': memory_id})
memories = await sor.R('hermes_memory', {'id': memory_id, 'user_id': user_id})
if memories:
current_count = memories[0].get('access_count', 0)
update_data = {
'id': memory_id,
'user_id': user_id,
'access_count': current_count + 1,
'last_accessed': datetime.now(),
'updated_at': datetime.now()
}
await sor.U('hermes_memory', update_data)
except Exception:
# Silently ignore stats update errors
pass
async def _cleanup_old_memories(self, user_id: str):
@ -665,8 +655,8 @@ class HermesAgent:
{'tags': {'$like': f'%{query}%'}}
]
sql_sess = f"SELECT * FROM hermes_sessions WHERE user_id = :user_id ORDER BY started_at DESC LIMIT {limit}"
sessions = await sor.sqlExe(sql_sess, {'user_id': user_id})
sessions = await sor.R('hermes_sessions', filters, ns={'sort': 'started_at desc'})
sessions = (sessions or [])[:limit]
return {
"success": True,
"sessions": sessions,
@ -872,7 +862,8 @@ class HermesAgent:
filters['enabled'] = kwargs['enabled']
sql_skills = "SELECT * FROM harnessed_remote_skills WHERE user_id = :user_id ORDER BY name ASC"
skills = await sor.sqlExe(sql_skills, {'user_id': user_id})
skills = await sor.R('harnessed_remote_skills', filters, ns={'sort': 'name asc'})
skills = skills or []
return {"success": True, "skills": skills, "user_id": user_id}
elif action == "deploy":
@ -1187,9 +1178,8 @@ class HermesAgent:
user_id = self._get_current_user_id(context) if context else "anonymous"
try:
async with self.db.sqlorContext('default') as sor:
sql_wf = "SELECT * FROM hermes_workflows WHERE user_id = :user_id ORDER BY created_at DESC"
workflows = await sor.sqlExe(sql_wf, {'user_id': user_id})
return {"success": True, "workflows": workflows, "user_id": user_id}
workflows = await sor.R('hermes_workflows', {'user_id': user_id}, ns={'sort': 'created_at desc'})
return {"success": True, "workflows": workflows or [], "user_id": user_id}
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
@ -1204,8 +1194,9 @@ class HermesAgent:
if workflow_id:
filters['workflow_id'] = workflow_id
sql_exec = f"SELECT * FROM hermes_executions WHERE user_id = :user_id ORDER BY created_at DESC LIMIT {limit} OFFSET {offset}"
executions = await sor.sqlExe(sql_exec, {'user_id': user_id})
executions = await sor.R('hermes_executions', filters, ns={'sort': 'created_at desc'})
executions = executions or []
executions = executions[offset:offset + limit]
return {"success": True, "executions": executions, "user_id": user_id}
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}

View File

@ -76,27 +76,39 @@ LLM_PROVIDERS = {
# ============================================================
async def _get_llm_config() -> Dict[str, Any]:
"""Get LLM client configuration from harnessed_agent_config table.
Uses 'default' database only, as LLM config is global and shared across modules.
Do NOT use get_module_dbname() which returns the calling module's DB (e.g. crm_db),
which may have stale or incorrect config data.
"""
"""Get LLM client configuration from harnessed_agent_config table for current user."""
dbnames_to_try = ['default']
env = None
user_id = None
try:
env = ServerEnv()
user_id = getattr(env, 'user_id', None) or getattr(env, 'userid', None)
info(f"[llm_config] user_id={repr(user_id)}")
module_db = env.get_module_dbname('harnessed_agent')
if module_db and module_db not in dbnames_to_try:
dbnames_to_try.insert(0, module_db)
except Exception as e:
error(f"[llm_config] Exception: {type(e).__name__}: {e}")
info(f"[llm_config] dbnames_to_try={dbnames_to_try}")
for dbname in dbnames_to_try:
try:
async with DBPools().sqlorContext(dbname) as sor:
sql = "SELECT * FROM harnessed_agent_config ORDER BY updated_at DESC LIMIT 1"
rows = await sor.sqlExe(sql, {})
where = {}
if user_id:
where['user_id'] = user_id
rows = await sor.R('harnessed_agent_config', where if where else None, ns={'sort': 'updated_at desc'})
info(f"[llm_config] DB='{dbname}' sor.R returned {len(rows) if rows else 0} rows")
if rows:
info(f"Loaded LLM config from DB '{dbname}'")
return rows[0]
row = rows[0]
info(f"[llm_config] DB='{dbname}' config: llm_provider={repr(row.get('llm_provider'))}, default_model={repr(row.get('default_model'))}, llm_service_url={repr(row.get('llm_service_url'))}")
return row
else:
warning(f"No rows in harnessed_agent_config in DB '{dbname}'")
warning(f"No rows in harnessed_agent_config in DB '{dbname}' for user_id={repr(user_id)}")
except Exception as e:
error(f"Failed to fetch LLM config from DB '{dbname}': {e}")
error(f"Failed to fetch LLM config from DB '{dbname}': {type(e).__name__}: {e}")
error("LLM config not found in any database")
return {}

View File

@ -192,7 +192,7 @@ class HermesOrchestrator:
tasks = await sor.R('hermes_tasks', {
'workflow_id': workflow_id,
'user_id': user_id
}, orderby='order_index ASC')
}, ns={'sort': 'order_index asc'})
# Convert to TaskDefinition objects
task_definitions = []
@ -412,7 +412,7 @@ class HermesOrchestrator:
result = await self._execute_single_task(task, user_id, context)
# Record successful execution
await self._record_execution_end(execution_id, "completed", result, None, attempt)
await self._record_execution_end(execution_id, user_id, "completed", result, None, attempt)
return result
except Exception as e:
@ -421,7 +421,7 @@ class HermesOrchestrator:
continue
else:
# Record failed execution
await self._record_execution_end(execution_id, "failed", None, last_error, attempt)
await self._record_execution_end(execution_id, user_id, "failed", None, last_error, attempt)
return {"success": False, "error": last_error, "task_id": task.id, "attempts": attempt + 1}
# This should never be reached
@ -486,7 +486,7 @@ class HermesOrchestrator:
# Silently ignore recording errors
pass
async def _record_execution_end(self, execution_id: str, status: str,
async def _record_execution_end(self, execution_id: str, user_id: str, status: str,
result: Dict[str, Any], error: str, retry_count: int):
"""Record execution end in database"""
try:
@ -494,6 +494,7 @@ class HermesOrchestrator:
end_time = datetime.now()
data = {
'id': execution_id,
'user_id': user_id,
'execution_status': status,
'end_time': end_time,
'duration_seconds': None, # Will be calculated
@ -503,7 +504,7 @@ class HermesOrchestrator:
'updated_at': end_time
}
# Get start time to calculate duration
executions = await sor.R('hermes_executions', {'id': execution_id})
executions = await sor.R('hermes_executions', {'id': execution_id, 'user_id': user_id})
if executions and executions[0].get('start_time'):
start_time = executions[0]['start_time']
if isinstance(start_time, str):