- CLIP embedding (9086) + Milvus VDB (8886) + NetworkX graph (9092)
- BGE-Reranker (9090) for result reranking
- Hybrid retrieval: vector search + graph expansion + RRF fusion
- API: /api/ingest, /api/search, /api/pipelines, /api/plugins, /api/status
- Two pipelines: kg-rag-standard (full) and kg-rag-lite (vector only)
- Tested end-to-end (E2E): ingest + search with rerank_score=0.99
127 lines
4.4 KiB
Python
127 lines
4.4 KiB
Python
# -*- coding:utf-8 -*-
|
|
"""RAG Pipeline API endpoints."""
|
|
from traceback import format_exc
|
|
from ahserver.serverenv import ServerEnv
|
|
from appPublic.registerfunction import RegisterFunction
|
|
from appPublic.log import exception
|
|
import json
|
|
|
|
|
|
async def status_handler(request, params_kw, *args, **kwargs):
    """Report the service status: active plugin types, pipelines, endpoints.

    Args:
        request: the incoming request (not used by this handler).
        params_kw: request parameters (not used by this handler).

    Returns:
        A pretty-printed JSON string with the keys ``service``, ``port``,
        ``active_services`` (capability -> plugin type), ``pipelines``
        (pipeline names) and ``endpoints``.
    """
    import sys, os

    # Make the working directory importable so the local ``plugins`` package
    # resolves. Only insert it when it is not already first on sys.path;
    # inserting unconditionally grew sys.path by one entry per request.
    cwd = os.getcwd()
    if sys.path[:1] != [cwd]:
        sys.path.insert(0, cwd)

    from plugins.registry import list_pipelines, list_plugins

    pipelines = list_pipelines()
    plugins = list_plugins()

    # Map each capability to the type of a plugin whose status is
    # "available". If several are available, the last one listed wins.
    services = {
        cap: p.get("type", "unknown")
        for cap, plugin_list in plugins.items()
        for p in plugin_list
        if p.get("status") == "available"
    }

    return json.dumps({
        "service": "rag-pipeline",
        # NOTE(review): hard-coded; keep in sync with the server's real port.
        "port": 9093,
        "active_services": services,
        "pipelines": [p["name"] for p in pipelines],
        "endpoints": ["/api/status", "/api/ingest", "/api/search", "/api/pipelines", "/api/plugins"]
    }, indent=2, ensure_ascii=False)
|
|
|
|
|
|
async def ingest_handler(request, params_kw, *args, **kwargs):
    """Ingest a document into a RAG pipeline.

    Request parameters (read from ``params_kw``):
        document: text to ingest; required and non-empty.
        pipeline: pipeline name, default ``"kg-rag-standard"``.
        collection: collection name, default ``"knowledge"``.
        graph_name: graph name, default ``"knowledge"``.

    Returns:
        A JSON string of whatever ``pipeline.ingest`` returns, or
        ``{"error": ...}`` when the document is missing or ingestion
        (including importing the pipeline module) fails.
    """
    import sys, os

    # Make the working directory importable for the local ``pipeline``
    # module without growing sys.path on every request.
    cwd = os.getcwd()
    if sys.path[:1] != [cwd]:
        sys.path.insert(0, cwd)

    try:
        document = params_kw.get("document", "")
        if not document:
            return json.dumps({"error": "document text required"}, ensure_ascii=False)

        pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
        collection = params_kw.get("collection", "knowledge")
        graph_name = params_kw.get("graph_name", "knowledge")

        # Imported lazily and inside the try so that an import failure is
        # reported as a JSON error like any other ingestion failure.
        from pipeline import ingest

        # A Python callable cannot be passed over HTTP, so no LLM function
        # is supplied for extraction.
        # NOTE(review): ingest() runs synchronously on the event-loop thread;
        # if it performs blocking I/O, consider offloading it to a thread
        # once its thread-safety is confirmed.
        result = ingest(document, pipeline_name, collection, graph_name, llm_func=None)
        return json.dumps(result, ensure_ascii=False)

    except Exception as e:
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)}, ensure_ascii=False)
|
|
|
|
|
|
async def search_handler(request, params_kw, *args, **kwargs):
|
|
import sys, os
|
|
sys.path.insert(0, os.getcwd())
|
|
from pipeline import search
|
|
|
|
try:
|
|
query = params_kw.get("query", "")
|
|
if not query:
|
|
return json.dumps({"error": "query text required"})
|
|
|
|
pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
|
|
collection = params_kw.get("collection", "knowledge")
|
|
graph_name = params_kw.get("graph_name", "knowledge")
|
|
top_k = int(params_kw.get("top_k", 5))
|
|
|
|
result = search(query, pipeline_name, collection, graph_name, top_k, llm_func=None)
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
except Exception as e:
|
|
exception(f"{e}, {format_exc()}")
|
|
return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def pipelines_handler(request, params_kw, *args, **kwargs):
    """Create/update a pipeline (POST) or fetch/list pipelines (any other method).

    POST: ``params_kw`` itself is the pipeline config and must contain a
    non-empty ``name``. It is passed to ``save_pipeline``, and the mapping
    that call returns is merged into the response.

    Other methods: if ``name`` is given, return that pipeline (or a
    not-found error); otherwise return the list of all pipelines.

    Returns:
        A JSON string containing ``"status": "SUCCEEDED"`` on success, or
        ``{"error": ...}`` on invalid input or registry failure.
    """
    import sys, os

    # Make the working directory importable for the local ``plugins``
    # package without growing sys.path on every request.
    cwd = os.getcwd()
    if sys.path[:1] != [cwd]:
        sys.path.insert(0, cwd)

    try:
        if request.method == "POST":
            # Create/update pipeline
            config = params_kw
            if not config.get("name"):
                return json.dumps({"error": "pipeline config with 'name' required"}, ensure_ascii=False)
            from plugins.registry import save_pipeline
            result = save_pipeline(config)
            return json.dumps({"status": "SUCCEEDED", **result}, ensure_ascii=False)

        # Fetch a single pipeline or list them all
        from plugins.registry import get_pipeline, list_pipelines
        name = params_kw.get("name")
        if name:
            pipeline = get_pipeline(name)
            if pipeline:
                return json.dumps({"status": "SUCCEEDED", "pipeline": pipeline}, indent=2, ensure_ascii=False)
            return json.dumps({"error": f"Pipeline '{name}' not found"}, ensure_ascii=False)

        pipelines = list_pipelines()
        return json.dumps({"status": "SUCCEEDED", "pipelines": pipelines}, indent=2, ensure_ascii=False)

    except Exception as e:
        # Match ingest/search: log the traceback and reply with a JSON error.
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)}, ensure_ascii=False)
|
|
|
|
|
|
async def plugins_handler(request, params_kw, *args, **kwargs):
    """Return every registered plugin as pretty-printed JSON.

    Args:
        request: the incoming request (not used by this handler).
        params_kw: request parameters (not used by this handler).

    Returns:
        A JSON string ``{"status": "SUCCEEDED", "plugins": ...}`` where
        ``plugins`` is whatever ``plugins.registry.list_plugins`` returns.
    """
    import sys, os

    # Make the working directory importable for the local ``plugins``
    # package without growing sys.path on every request.
    cwd = os.getcwd()
    if sys.path[:1] != [cwd]:
        sys.path.insert(0, cwd)

    from plugins.registry import list_plugins

    plugins = list_plugins()
    return json.dumps({"status": "SUCCEEDED", "plugins": plugins}, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def load_rag_pipeline():
    """Register this module's API handlers by name on a RegisterFunction.

    The handlers are registered in this order under these names: "status",
    "ingest", "search", "pipelines", "plugins".
    """
    # NOTE(review): the ServerEnv instance is never used; construction is
    # kept in case it has initialization side effects — confirm and drop.
    env = ServerEnv()
    # presumably RegisterFunction() returns a process-wide registry — confirm
    registry = RegisterFunction()

    handler_table = (
        ("status", status_handler),
        ("ingest", ingest_handler),
        ("search", search_handler),
        ("pipelines", pipelines_handler),
        ("plugins", plugins_handler),
    )
    for handler_name, handler in handler_table:
        registry.register(handler_name, handler)
|