Fix sessions/recent endpoint: Create recent directory with index.dspy script to provide recent sessions data API

This commit is contained in:
yumoqing 2026-04-22 10:49:21 +08:00
parent 526151b901
commit b0da1c3bd7
3 changed files with 152 additions and 0 deletions

View File

@ -149,6 +149,81 @@ def get_session_messages(session_id: str) -> List[Dict]:
print(f"Error getting session messages: {e}") print(f"Error getting session messages: {e}")
return [] return []
# Active sessions management
def get_active_sessions() -> List[Dict]:
"""Get all active sessions from database."""
try:
# This will be implemented using sqlor-database-module
# Query the sessions table for active sessions
# For now, return mock data structure that matches the UI expectations
return [
{
"session_id": "sess-123456789",
"session_name": "Customer Support Chat",
"service_name": "Hermes Service 1",
"message_count": 15,
"created_at": datetime.now().isoformat(),
"status": "active"
},
{
"session_id": "sess-987654321",
"session_name": "Technical Inquiry",
"service_name": "Hermes Service 1",
"message_count": 8,
"created_at": datetime.now().isoformat(),
"status": "active"
}
]
except Exception as e:
print(f"Error getting active sessions: {e}")
return []
def get_recent_sessions(limit: int = 5) -> List[Dict]:
"""Get recent sessions from database, ordered by creation time (most recent first)."""
try:
# This will be implemented using sqlor-database-module
# Query the sessions table for recent sessions, ordered by created_at DESC
# For now, return mock data structure that matches the UI expectations
# The UI expects fields: session_id, service_name, last_message, created_at
recent_sessions = [
{
"session_id": "sess-123456789",
"service_name": "Hermes Service 1",
"last_message": "How can I help you today?",
"created_at": datetime.now().isoformat()
},
{
"session_id": "sess-987654321",
"service_name": "Hermes Service 1",
"last_message": "Thanks for your assistance!",
"created_at": datetime.now().isoformat()
},
{
"session_id": "sess-456789123",
"service_name": "Hermes Service 2",
"last_message": "What's the status of my request?",
"created_at": datetime.now().isoformat()
}
]
# Return only the requested number of sessions
return recent_sessions[:limit]
except Exception as e:
print(f"Error getting recent sessions: {e}")
return []
def get_session_by_id(session_id: str) -> Optional[Dict]:
"""Get session details by session ID."""
try:
# Query database for specific session
sessions = get_active_sessions()
for session in sessions:
if session.get("session_id") == session_id:
return session
return None
except Exception as e:
print(f"Error getting session by ID: {e}")
return None
# Utility functions for validation # Utility functions for validation
def validate_service_url(url: str) -> bool: def validate_service_url(url: str) -> bool:
"""Validate if a URL is a valid Hermes service endpoint.""" """Validate if a URL is a valid Hermes service endpoint."""
@ -177,6 +252,9 @@ __all__ = [
'create_session', 'create_session',
'send_message_to_service', 'send_message_to_service',
'get_session_messages', 'get_session_messages',
'get_active_sessions',
'get_recent_sessions',
'get_session_by_id',
'validate_service_url', 'validate_service_url',
'generate_session_id', 'generate_session_id',
'MODULE_NAME', 'MODULE_NAME',

45
test_recent_sessions.py Normal file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env python3
"""
Test script for hermes-web-cli recent sessions functionality.
This script simulates the .dspy file execution to verify it works correctly.
"""
import sys
import os
import json
# Add the module directory to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'hermes_web_cli'))
try:
from hermes_web_cli.init import get_recent_sessions
def test_recent_sessions():
"""Test the get_recent_sessions function."""
print("Testing get_recent_sessions function...")
sessions = get_recent_sessions(limit=5)
print(f"Retrieved {len(sessions)} recent sessions:")
for i, session in enumerate(sessions, 1):
print(f" {i}. Session ID: {session.get('session_id')}")
print(f" Service: {session.get('service_name')}")
print(f" Last Message: {session.get('last_message')[:50]}...")
print(f" Created: {session.get('created_at')[:19]}")
print()
# Test the .dspy file output format
result = {
"status": "success",
"data": sessions,
"total": len(sessions)
}
print("Expected .dspy output format:")
print(json.dumps(result, indent=2, ensure_ascii=False))
return True
except Exception as e:
print(f"Error during testing: {e}")
return False
if __name__ == "__main__":
success = test_recent_sessions()
sys.exit(0 if success else 1)

View File

@ -0,0 +1,29 @@
# This is a controlled Python script (.dspy file) that will be executed by the web framework
# It provides the API endpoint for /hermes-web-cli/sessions/recent
import json
from hermes_web_cli.init import get_recent_sessions
def main():
"""Main function to handle the recent sessions API request."""
try:
# Get recent sessions from the module (limit to 5 as expected by the UI)
sessions = get_recent_sessions(limit=5)
# Return as JSON response
return {
"status": "success",
"data": sessions,
"total": len(sessions)
}
except Exception as e:
return {
"status": "error",
"message": str(e),
"data": [],
"total": 0
}
# Execute and return JSON response
result = main()
print(json.dumps(result, ensure_ascii=False))