From 4aaeb4203578de5fed4cbcf5e5e21c9568007749 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Mon, 15 Jun 2026 20:42:33 +0800 Subject: [PATCH] feat: initial rag-pipeline service - pluggable RAG with KG support - CLIP embedding (9086) + Milvus VDB (8886) + NetworkX graph (9092) - BGE-Reranker (9090) for result reranking - Hybrid retrieval: vector search + graph expansion + RRF fusion - API: /api/ingest, /api/search, /api/pipelines, /api/plugins, /api/status - Two pipelines: kg-rag-standard (full) and kg-rag-lite (vector only) - Tested E2E: ingest + search with rerank_score=0.99 --- .gitignore | 9 ++ README.md | 127 +++++++++++++++++++++ ah.py | 6 + build.sh | 55 +++++++++ conf/config.json | 28 +++++ core/__init__.py | 0 core/chunker.py | 73 ++++++++++++ core/extractor.py | 79 +++++++++++++ core/retriever.py | 163 ++++++++++++++++++++++++++ init.py | 126 +++++++++++++++++++++ pipeline.py | 270 ++++++++++++++++++++++++++++++++++++++++++++ plugins/__init__.py | 0 plugins/registry.py | 129 +++++++++++++++++++++ workers/__init__.py | 0 14 files changed, 1065 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 ah.py create mode 100755 build.sh create mode 100644 conf/config.json create mode 100644 core/__init__.py create mode 100644 core/chunker.py create mode 100644 core/extractor.py create mode 100644 core/retriever.py create mode 100644 init.py create mode 100644 pipeline.py create mode 100644 plugins/__init__.py create mode 100644 plugins/registry.py create mode 100644 workers/__init__.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..38b930f --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +ah.pid +nohup.out +__pycache__/ +*.pyc +*__pycache__/ +logs/ +files/ +data/ +pipelines/*.json diff --git a/README.md b/README.md new file mode 100644 index 0000000..0c20f9a --- /dev/null +++ b/README.md @@ -0,0 +1,127 @@ +# RAG Pipeline Service + +可插拔 RAG 编排服务,支持知识图谱增强的多模态检索。 + +## 架构 + +``` + ┌─────────────────┐ + │ 
RAG Pipeline │ + │ (9093 CPU) │ + └────────┬────────┘ + │ + ┌──────────────────┼──────────────────┐ + │ │ │ + ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐ + │ CLIP │ │ VDB │ │ Graph │ + │ 9086 │ │ 8886 │ │ 9092 │ + │ GPU 2 │ │ CPU │ │ CPU │ + └───────────┘ └───────────┘ └───────────┘ + │ │ │ + ┌─────▼─────┐ ┌─────▼─────┐ + │BGE Rerank │ │ LLM │ + │ 9090 │ │harnessed │ + │ GPU 2 │ │ │ + └───────────┘ └───────────┘ +``` + +## 插件槽位 + +| 槽位 | 当前实现 | 可替换为 | +|------|---------|---------| +| embedding | CLIP-ViT-H/14 (9086) | BGE-M3, OpenAI embedding | +| vdb | Milvus VDB (8886) | Qdrant, Weaviate | +| graph | NetworkX (9092) | FalkorDB, Neo4j | +| reranker | BGE-Reranker (9090) | Cohere, LLM rerank | +| llm | harnessed_agent | 任意 LLM | +| chunker | recursive/sentence | 自定义分块策略 | +| extractor | LLM-structured | spaCy, GraphRAG | +| retriever | hybrid/vector_only | 自定义检索策略 | +| face | InsightFace (待部署) | FaceNet, DeepFace | + +## Pipeline 配置 + +### kg-rag-standard(标准版) +- CLIP embedding + Milvus + NetworkX 图 + BGE Rerank +- 向量召回 + 图扩展 + RRF 融合 + 精排 + +### kg-rag-lite(轻量版) +- CLIP embedding + Milvus + BGE Rerank +- 纯向量检索,无图谱 + +## API + +### GET /api/status +服务状态和可用插件列表。 + +### POST /api/ingest +入库文档。 + +```json +{ + "document": "文本内容...", + "pipeline": "kg-rag-standard", + "collection": "knowledge", + "graph_name": "knowledge" +} +``` + +流程: 分块 → CLIP embedding → VDB 存储 → 实体抽取 → 图存储 + +### POST /api/search +混合检索。 + +```json +{ + "query": "搜索问题", + "pipeline": "kg-rag-standard", + "collection": "knowledge", + "graph_name": "knowledge", + "top_k": 5 +} +``` + +流程: CLIP embed → VDB 向量召回 → 图扩展 → RRF 融合 → BGE Rerank + +### GET/POST /api/pipelines +管理 pipeline 配置。 + +```json +POST: 创建自定义 pipeline +GET: 列出所有 pipeline +GET ?name=xxx: 获取指定 pipeline +``` + +### GET /api/plugins +列出所有可用插件及状态。 + +## 部署 + +```bash +cd /data/ymq/rag-pipeline +bash build.sh deploy +bash build.sh stop +bash build.sh status +``` + +## 端到端测试 + +```bash +# 1. 
入库 +curl -X POST http://localhost:9093/api/ingest \ + -H "Content-Type: application/json" \ + -d '{"document":"张三在ABC公司担任技术总监,他和李四是同事关系。","pipeline":"kg-rag-standard"}' + +# 2. 检索 +curl -X POST http://localhost:9093/api/search \ + -H "Content-Type: application/json" \ + -d '{"query":"张三在哪家公司工作?","pipeline":"kg-rag-standard"}' +``` + +## 端口 + +9093 (CPU only) + +## Git + +git@git.opencomputing.cn:yumoqing/rag-pipeline.git diff --git a/ah.py b/ah.py new file mode 100644 index 0000000..f5ca022 --- /dev/null +++ b/ah.py @@ -0,0 +1,6 @@ +# -*- coding:utf-8 -*- +from ahserver.webapp import webapp +from init import load_rag_pipeline + +if __name__ == '__main__': + webapp(load_rag_pipeline) diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..149d3a1 --- /dev/null +++ b/build.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# RAG Pipeline Service +set -e +cd "$(dirname "$0")" + +SERVICE_NAME="rag-pipeline" +PORT=9093 +PY=/data/ymq/wan22-service/py3/bin/python +action="${1:-status}" + +case "$action" in + deploy|update) + echo "=== $SERVICE_NAME Deploy (CPU, port $PORT) ===" + if [ -f ah.pid ] && kill -0 $(cat ah.pid) 2>/dev/null; then + kill $(cat ah.pid) 2>/dev/null || true; sleep 2 + fi + if [ -d .git ] && [ -f .git/HEAD ]; then + git pull origin master 2>/dev/null || true + fi + mkdir -p logs files wwwroot pipelines data + export PYTHONPATH="$(pwd)" + nohup $PY ah.py > nohup.out 2>&1 & + echo $! > ah.pid + echo "Started PID $(cat ah.pid) on port $PORT" + sleep 3 + if curl -s http://localhost:$PORT/api/status > /dev/null 2>&1; then + echo "Service healthy" + else + echo "WARNING: not responding, check nohup.out" + tail -30 nohup.out + fi + ;; + stop) + if [ -f ah.pid ]; then + kill $(cat ah.pid) 2>/dev/null || true; rm -f ah.pid; echo "Stopped" + else echo "Not running"; fi + ;; + start) + mkdir -p logs files wwwroot pipelines data + export PYTHONPATH="$(pwd)" + nohup $PY ah.py > nohup.out 2>&1 & + echo $! 
# Separators tried, in priority order, when looking for a natural break point
# near the end of a chunk. Both the ASCII and the full-width (U+FF01, U+FF1F,
# U+FF1B) forms of '!', '?' and ';' are listed so Chinese punctuation is
# honoured; entries that could never match first have been dropped.
_CHUNK_SEPARATORS = (
    '。', '\uff01', '\uff1f', '!', '?', '. ',
    '\n\n', '\n', '\uff1b', ';',
)

# How many characters before the hard chunk limit are searched for a separator.
_BREAK_WINDOW = 100

# Matches the (possibly empty) whitespace run that follows a sentence
# terminator. NOTE: a bare '.' counts as a terminator, so "3.14" is split.
_SENTENCE_BOUNDARY = re.compile(r'(?<=[。\uff01\uff1f.!?])\s*')


def recursive_chunk(text: str, chunk_size: int = 512, overlap: int = 64) -> List[Dict]:
    """Split ``text`` into overlapping chunks, preferring sentence boundaries.

    Args:
        text: The document text.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters repeated between consecutive chunks.
            Values outside ``[0, chunk_size - 1]`` are clamped.

    Returns:
        A list of ``{"id": "chunk_N", "text": ..., "index": N}`` dicts. Text
        that is empty or no longer than ``chunk_size`` is returned unmodified
        as a single chunk.

    Raises:
        ValueError: If the text needs splitting and ``chunk_size`` is not
            positive.
    """
    if not text or len(text) <= chunk_size:
        return [{"id": "chunk_0", "text": text, "index": 0}]
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    # An overlap as large as the chunk itself would keep the cursor in place.
    overlap = min(max(overlap, 0), chunk_size - 1)
    text_len = len(text)
    chunks = []
    start = 0

    while start < text_len:
        end = min(start + chunk_size, text_len)

        # Only look for a nicer break point when this is not the final chunk.
        if end < text_len:
            window_start = max(end - _BREAK_WINDOW, start)
            window = text[window_start:end]
            for sep in _CHUNK_SEPARATORS:
                pos = window.rfind(sep)
                if pos >= 0:
                    end = window_start + pos + len(sep)
                    break

        piece = text[start:end].strip()
        if piece:
            idx = len(chunks)
            chunks.append({"id": f"chunk_{idx}", "text": piece, "index": idx})

        # The tail has been emitted; stepping back by ``overlap`` again would
        # only produce a redundant chunk contained in the previous one.
        if end >= text_len:
            break

        # Step back by ``overlap``, but always make forward progress: a break
        # point close to ``start`` must not move the cursor backwards.
        next_start = end - overlap
        start = next_start if next_start > start else end

    return chunks


def sentence_chunk(text: str) -> List[Dict]:
    """Split ``text`` into one chunk per sentence.

    Sentences end at Chinese or English terminators; whitespace following a
    terminator is discarded and blank fragments are skipped.

    Returns:
        A list of ``{"id": "chunk_N", "text": ..., "index": N}`` dicts.
    """
    chunks = []
    for fragment in _SENTENCE_BOUNDARY.split(text or ""):
        sentence = fragment.strip()
        if sentence:
            idx = len(chunks)
            chunks.append({"id": f"chunk_{idx}", "text": sentence, "index": idx})
    return chunks


def chunk_document(text: str, strategy: str = "recursive", **kwargs) -> List[Dict]:
    """Chunk a document using the named strategy.

    Args:
        text: The document text.
        strategy: ``"sentence"`` for :func:`sentence_chunk`; any other value
            selects :func:`recursive_chunk`.
        **kwargs: ``chunk_size`` and ``overlap`` for the recursive strategy
            (defaults 512 and 64); ignored by the sentence strategy.
    """
    if strategy == "sentence":
        return sentence_chunk(text)
    return recursive_chunk(
        text,
        chunk_size=kwargs.get("chunk_size", 512),
        overlap=kwargs.get("overlap", 64),
    )
"target": "目标实体名", "relation": "关系类型", "description": "关系描述"}} + ] +}} + +只返回JSON,不要其他内容。""" + + +def extract_entities_relations(text: str, llm_func=None) -> Dict: + """Extract entities and relations from text using LLM. + + Args: + text: Input text + llm_func: Function that takes a prompt and returns LLM response text. + If None, returns empty result. + """ + if not llm_func: + return {"entities": [], "relations": []} + + prompt = EXTRACTION_PROMPT.format(text=text[:2000]) # Limit text length + + try: + response = llm_func(prompt) + + # Try to extract JSON from response + # Remove markdown code blocks if present + response = response.strip() + if response.startswith('```'): + response = re.sub(r'^```(?:json)?\s*', '', response) + response = re.sub(r'\s*```$', '', response) + + result = json.loads(response) + + # Validate structure + entities = result.get("entities", []) + relations = result.get("relations", []) + + # Basic validation + valid_entities = [] + for e in entities: + if isinstance(e, dict) and "name" in e: + valid_entities.append({ + "name": e["name"], + "type": e.get("type", "unknown"), + "description": e.get("description", "") + }) + + valid_relations = [] + entity_names = {e["name"] for e in valid_entities} + for r in relations: + if isinstance(r, dict) and "source" in r and "target" in r: + valid_relations.append({ + "source": r["source"], + "target": r["target"], + "relation": r.get("relation", "related_to"), + "description": r.get("description", "") + }) + + return {"entities": valid_entities, "relations": valid_relations} + + except Exception as e: + return {"entities": [], "relations": [], "error": str(e)} diff --git a/core/retriever.py b/core/retriever.py new file mode 100644 index 0000000..515615d --- /dev/null +++ b/core/retriever.py @@ -0,0 +1,163 @@ +# -*- coding:utf-8 -*- +"""Hybrid retrieval: vector search + graph expansion + RRF fusion.""" +from typing import List, Dict + +def rrf_fusion(results_list: List[List[Dict]], k: int = 60) -> 
def rrf_fusion(results_list: List[List[Dict]], k: int = 60) -> List[Dict]:
    """Merge several ranked lists with Reciprocal Rank Fusion.

    Each item contributes ``1 / (k + rank + 1)`` (rank is 0-based) to the
    score of its key. The key is the item's ``id``, else its ``node_id``.
    Items with neither get a key unique to their list and position, so
    unrelated id-less items are never merged with each other.

    Args:
        results_list: Ranked result lists, best item first.
        k: RRF damping constant.

    Returns:
        Shallow copies of the first-seen item per key, sorted by descending
        fused score, each with an added ``rrf_score`` rounded to 6 decimals.
        The input dicts are not modified.
    """
    fused = {}

    for list_no, results in enumerate(results_list):
        for rank, item in enumerate(results):
            key = item.get("id")
            if key is None:
                key = item.get("node_id")
            if key is None:
                key = ("__unkeyed__", list_no, rank)

            entry = fused.get(key)
            if entry is None:
                entry = fused[key] = {"item": dict(item), "score": 0.0}
            entry["score"] += 1.0 / (k + rank + 1)

    # sorted() is stable, so ties keep first-seen order.
    ordered = sorted(fused.values(), key=lambda entry: entry["score"], reverse=True)

    merged = []
    for entry in ordered:
        entry["item"]["rrf_score"] = round(entry["score"], 6)
        merged.append(entry["item"])
    return merged


def vector_search(query_embedding: List[float], vdb_plugin: Dict,
                  collection: str = "knowledge", top_k: int = 20) -> List[Dict]:
    """Query the VDB plugin for the vectors nearest to ``query_embedding``.

    Args:
        query_embedding: The query vector.
        vdb_plugin: Plugin config; its ``endpoint`` is the service base URL.
        collection: Collection name sent as ``colname``.
        top_k: Number of rows requested (sent as ``pagerows``).

    Returns:
        The ``data.rows`` list of the response, or ``[]`` when the endpoint
        is missing, the call fails, or the status is not ``"SUCCEEDED"``.
    """
    endpoint = vdb_plugin.get("endpoint")
    if not endpoint:
        return []

    from plugins.registry import call_plugin

    result = call_plugin(endpoint, "/v1/query", {
        "colname": collection,
        "vector": query_embedding,
        "pagerows": top_k,
        "page": 1
    })

    if not isinstance(result, dict):
        return []
    if "error" in result or result.get("status") != "SUCCEEDED":
        return []

    # VDB returns {"status": "SUCCEEDED", "data": {"rows": [...]}}
    data = result.get("data", {})
    rows = data.get("rows", []) if isinstance(data, dict) else []
    return rows if isinstance(rows, list) else []
def graph_expand(entity_ids: List[str], graph_plugin: Dict,
                 graph_name: str = "knowledge", hops: int = 2) -> List[Dict]:
    """Collect graph neighbours of the given entities.

    Args:
        entity_ids: Entity names; each is mapped to the node id
            ``entity_<name>`` with spaces replaced by underscores, the format
            used when entities are ingested.
        graph_plugin: Plugin config (``endpoint``, ``type``).
        graph_name: Graph to query.
        hops: Traversal depth sent as ``depth``.

    Returns:
        De-duplicated neighbour dicts, each annotated with ``source_entity``
        and ``id``. ``[]`` when the graph is disabled or has no endpoint.
    """
    endpoint = graph_plugin.get("endpoint")
    if not endpoint or graph_plugin.get("type") == "none":
        return []

    from plugins.registry import call_plugin

    neighbors = []
    seen = set()

    for entity_id in entity_ids:
        node_id = f"entity_{entity_id}".replace(" ", "_")
        result = call_plugin(endpoint, "/api/graph/neighbors", {
            "graph": graph_name,
            "node_id": node_id,
            "depth": hops
        })

        if not isinstance(result, dict) or "error" in result:
            continue

        for neighbor in result.get("neighbors") or []:
            nid = neighbor.get("node_id", "")
            if not nid or nid in seen:
                continue
            seen.add(nid)
            neighbor["source_entity"] = entity_id
            neighbor["id"] = nid
            neighbors.append(neighbor)

    return neighbors


def hybrid_retrieve(query_embedding: List[float],
                    extracted_entities: List[str],
                    pipeline: Dict,
                    collection: str = "knowledge",
                    graph_name: str = "knowledge") -> List[Dict]:
    """Run vector search plus optional graph expansion and fuse the results.

    The retriever config is read from ``pipeline["plugins"]["retriever"]``:
    ``type`` (``"hybrid"`` enables graph expansion), ``vector_top_k`` (falls
    back to ``top_k``, then 20) and ``graph_hops`` (default 2).

    Returns:
        ``[]`` if nothing was retrieved, the single non-empty list as-is, or
        the RRF fusion when both sources returned results.
    """
    plugins = pipeline.get("plugins", {})
    retriever_config = plugins.get("retriever", {})

    # The lite pipeline spells this key "top_k"; accept both spellings.
    vector_top_k = retriever_config.get(
        "vector_top_k", retriever_config.get("top_k", 20))
    graph_hops = retriever_config.get("graph_hops", 2)
    retriever_type = retriever_config.get("type", "hybrid")

    results_lists = []

    vector_results = vector_search(
        query_embedding, plugins.get("vdb", {}), collection, vector_top_k)
    if vector_results:
        results_lists.append(vector_results)

    # Graph expansion only in hybrid mode and only with entities to expand.
    if retriever_type == "hybrid" and extracted_entities:
        graph_results = graph_expand(
            extracted_entities, plugins.get("graph", {}), graph_name, graph_hops)
        if graph_results:
            results_lists.append(graph_results)

    if not results_lists:
        return []
    if len(results_lists) == 1:
        return results_lists[0]
    return rrf_fusion(results_lists)


def _rerank_text(item: Dict) -> str:
    """Return the text that represents ``item`` in the rerank request."""
    doc = item.get("text", "") or item.get("description", "") or item.get("transcript", "")
    if not doc:
        doc = str(item.get("name", "")) + " " + str(item.get("attrs", {}))
    return str(doc)


def _match_document(doc_text: str, documents: List[str], used: set):
    """Find the index of the not-yet-used document that ``doc_text`` refers to.

    An exact match wins. Otherwise a containment match is accepted, which
    covers a reranker that trims or pads the text. Blank strings never match
    by containment, because the empty string is contained in everything.
    """
    for pos, candidate in enumerate(documents):
        if pos not in used and candidate == doc_text:
            return pos

    if doc_text.strip():
        for pos, candidate in enumerate(documents):
            if pos in used or not candidate.strip():
                continue
            if doc_text in candidate or candidate in doc_text:
                return pos
    return None


def rerank_results(query: str, results: List[Dict], reranker_plugin: Dict,
                   top_k: int = 5) -> List[Dict]:
    """Rerank ``results`` with the reranker plugin.

    Args:
        query: The user query.
        results: Retrieved items; matched items get a ``rerank_score`` key.
        reranker_plugin: Plugin config (``endpoint``, ``type``).
        top_k: Maximum number of items returned.

    Returns:
        Up to ``top_k`` items in reranker order. Falls back to
        ``results[:top_k]`` when reranking is disabled, unavailable, fails,
        or none of the returned documents can be mapped back to an item.
    """
    if not results or reranker_plugin.get("type") == "none":
        return results[:top_k]

    endpoint = reranker_plugin.get("endpoint")
    if not endpoint:
        return results[:top_k]

    from plugins.registry import call_plugin

    documents = [_rerank_text(item) for item in results]

    response = call_plugin(endpoint, "/api/rerank", {
        "query": query,
        "documents": documents,
        "top_k": top_k
    })

    if not isinstance(response, dict) or "error" in response:
        return results[:top_k]

    # Map reranked docs back to the originals using the same text that was
    # sent, consuming each original at most once.
    ranked = []
    used = set()
    for ranked_doc in response.get("ranked_docs") or []:
        if not isinstance(ranked_doc, dict):
            continue
        pos = _match_document(str(ranked_doc.get("doc", "")), documents, used)
        if pos is None:
            continue
        used.add(pos)
        hit = results[pos]
        hit["rerank_score"] = ranked_doc.get("score", 0)
        ranked.append(hit)

    return ranked[:top_k] if ranked else results[:top_k]
def _ensure_cwd_importable() -> None:
    """Put the working directory on ``sys.path`` once.

    The handlers import project modules (``pipeline``, ``plugins``) lazily;
    inserting the directory on every request would grow ``sys.path`` without
    bound.
    """
    import os
    import sys

    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)


def _is_safe_pipeline_name(name) -> bool:
    """Return True if ``name`` can be used as a file name component.

    The pipeline name becomes ``<name>.json`` on disk, so path separators,
    NUL bytes and dot-only names are rejected.
    """
    if not isinstance(name, str) or not name.strip():
        return False
    if name in (".", "..") or name.startswith("."):
        return False
    return not any(ch in name for ch in ("/", "\\", "\x00"))


async def ingest_handler(request, params_kw, *args, **kwargs):
    """POST /api/ingest - chunk, embed and store a document.

    Reads ``document`` (required), ``pipeline``, ``collection`` and
    ``graph_name`` from ``params_kw`` and returns the ingest result, or
    ``{"error": ...}``, as a JSON string.
    """
    _ensure_cwd_importable()

    try:
        document = params_kw.get("document", "")
        if not document:
            return json.dumps({"error": "document text required"})

        pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
        collection = params_kw.get("collection", "knowledge")
        graph_name = params_kw.get("graph_name", "knowledge")

        from pipeline import ingest

        # No LLM callable can be supplied over HTTP, so entity extraction is
        # skipped here.
        # NOTE(review): ingest() performs blocking HTTP calls inside this
        # coroutine; consider an executor if request concurrency matters.
        result = ingest(document, pipeline_name, collection, graph_name, llm_func=None)
        return json.dumps(result, ensure_ascii=False)

    except Exception as e:
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)})


async def search_handler(request, params_kw, *args, **kwargs):
    """POST /api/search - retrieve and rerank results for a query.

    Reads ``query`` (required), ``pipeline``, ``collection``, ``graph_name``
    and ``top_k`` (integer >= 1, default 5) from ``params_kw`` and returns
    the search result, or ``{"error": ...}``, as a JSON string.
    """
    _ensure_cwd_importable()

    try:
        query = params_kw.get("query", "")
        if not query:
            return json.dumps({"error": "query text required"})

        try:
            top_k = int(params_kw.get("top_k", 5))
        except (TypeError, ValueError):
            return json.dumps({"error": "top_k must be an integer"})
        if top_k < 1:
            return json.dumps({"error": "top_k must be >= 1"})

        pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
        collection = params_kw.get("collection", "knowledge")
        graph_name = params_kw.get("graph_name", "knowledge")

        from pipeline import search

        result = search(query, pipeline_name, collection, graph_name, top_k, llm_func=None)
        return json.dumps(result, ensure_ascii=False)

    except Exception as e:
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)})


async def pipelines_handler(request, params_kw, *args, **kwargs):
    """GET/POST /api/pipelines - list, fetch or save pipeline configs.

    POST saves ``params_kw`` as a pipeline config (``name`` required).
    GET with ``name`` returns that pipeline or a not-found error; GET
    without ``name`` lists all pipelines.
    """
    _ensure_cwd_importable()

    try:
        if request.method == "POST":
            config = params_kw
            if "name" not in config:
                return json.dumps({"error": "pipeline config with 'name' required"})
            if not _is_safe_pipeline_name(config["name"]):
                return json.dumps({"error": "invalid pipeline name"})

            from plugins.registry import save_pipeline

            result = save_pipeline(config)
            # "status" goes last so the registry's own "status": "ok" cannot
            # overwrite the API-level value.
            return json.dumps({**result, "status": "SUCCEEDED"}, ensure_ascii=False)

        from plugins.registry import list_pipelines, get_pipeline

        name = params_kw.get("name")
        if name:
            pipeline = get_pipeline(name)
            # get_pipeline() falls back to the default pipeline for unknown
            # names, so the returned name must be compared explicitly.
            if pipeline and pipeline.get("name") == name:
                return json.dumps({"status": "SUCCEEDED", "pipeline": pipeline},
                                  indent=2, ensure_ascii=False)
            return json.dumps({"error": f"Pipeline '{name}' not found"})

        pipelines = list_pipelines()
        return json.dumps({"status": "SUCCEEDED", "pipelines": pipelines},
                          indent=2, ensure_ascii=False)

    except Exception as e:
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)})


async def plugins_handler(request, params_kw, *args, **kwargs):
    """GET /api/plugins - list the known plugins grouped by capability."""
    _ensure_cwd_importable()
    from plugins.registry import list_plugins

    plugins = list_plugins()
    return json.dumps({"status": "SUCCEEDED", "plugins": plugins}, indent=2, ensure_ascii=False)


def load_rag_pipeline():
    """Register the API handlers under the names used in conf/config.json."""
    # NOTE(review): the instance is not used here; presumably constructing it
    # initialises the shared server environment - confirm before removing.
    ServerEnv()
    rf = RegisterFunction()
    for name, handler in (
        ("status", status_handler),
        ("ingest", ingest_handler),
        ("search", search_handler),
        ("pipelines", pipelines_handler),
        ("plugins", plugins_handler),
    ):
        rf.register(name, handler)
list_result = call_plugin(endpoint, "/v1/listcollections", {}) + existing = list_result.get("collections", []) + + if collection in existing: + return {"status": "exists"} + + # Create with proper field schema + fields = [ + {"name": "id", "type": "str", "is_primary": True, "max_length": 256}, + {"name": "text", "type": "str", "max_length": 65535}, + {"name": "description", "type": "str", "max_length": 65535}, + {"name": "type", "type": "str", "max_length": 128}, + {"name": "embedding", "type": "fvector", "dim": dim} + ] + + result = call_plugin(endpoint, "/v1/createcollection", { + "colname": collection, + "fields": fields, + "description": "RAG knowledge collection", + "metric": "COSINE" + }) + + return result + + +def _store_in_vdb(items: List[Dict], embeddings: List[List[float]], + vdb_plugin: Dict, collection: str = "knowledge") -> Dict: + """Store items with embeddings in VDB.""" + endpoint = vdb_plugin.get("endpoint") + if not endpoint: + return {"error": "VDB endpoint not configured"} + + # Ensure collection exists + ensure_result = _ensure_collection(vdb_plugin, collection) + if "error" in ensure_result: + return ensure_result + + # Prepare batch insert - data must match schema fields + rows = [] + for item, emb in zip(items, embeddings): + row = { + "id": item.get("id", str(uuid.uuid4())[:16]), + "text": item.get("text", ""), + "description": item.get("description", ""), + "type": item.get("type", "chunk"), + "embedding": emb + } + rows.append(row) + + result = call_plugin(endpoint, "/v1/batchinsert", { + "colname": collection, + "data": rows + }) + + return result + + +def _store_in_graph(entities: List[Dict], relations: List[Dict], + graph_plugin: Dict, graph_name: str = "knowledge") -> Dict: + """Store entities and relations in graph.""" + endpoint = graph_plugin.get("endpoint") + if not endpoint or graph_plugin.get("type") == "none": + return {"status": "skipped", "reason": "graph disabled"} + + added_nodes = 0 + added_edges = 0 + + for entity in 
entities: + name = entity.get("name", "") + node_id = f"entity_{name}".replace(" ", "_") + result = call_plugin(endpoint, "/api/graph/add_node", { + "graph": graph_name, + "node_id": node_id, + "attrs": { + "name": name, + "type": entity.get("type", "unknown"), + "description": entity.get("description", "") + } + }) + if "error" not in result: + added_nodes += 1 + + for rel in relations: + source = f"entity_{rel.get('source', '')}".replace(" ", "_") + target = f"entity_{rel.get('target', '')}".replace(" ", "_") + result = call_plugin(endpoint, "/api/graph/add_edge", { + "graph": graph_name, + "source": source, + "target": target, + "attrs": { + "relation": rel.get("relation", "related_to"), + "description": rel.get("description", "") + } + }) + if "error" not in result: + added_edges += 1 + + # Save graph to disk + call_plugin(endpoint, "/api/graph/save", {"graph": graph_name}) + + return {"status": "ok", "nodes_added": added_nodes, "edges_added": added_edges} + + +def ingest(document: str, pipeline_name: str = "kg-rag-standard", + collection: str = "knowledge", graph_name: str = "knowledge", + llm_func=None) -> Dict: + """Full ingest pipeline: chunk -> embed -> extract -> store.""" + start_time = time.time() + + pipeline = get_pipeline(pipeline_name) + plugins = pipeline.get("plugins", {}) + + # Step 1: Chunk + chunker_config = plugins.get("chunker", {}) + chunks = chunk_document(document, + strategy=chunker_config.get("type", "recursive"), + chunk_size=chunker_config.get("chunk_size", 512), + overlap=chunker_config.get("overlap", 64)) + + chunk_texts = [c["text"] for c in chunks] + + # Step 2: Embed + embedding_plugin = plugins.get("embedding", {}) + embeddings = _get_embedding(chunk_texts, embedding_plugin) + + if not embeddings: + return {"error": "Failed to get embeddings", "chunks": len(chunks)} + + # Step 3: Store chunks + embeddings in VDB + vdb_plugin = plugins.get("vdb", {}) + vdb_result = _store_in_vdb(chunks, embeddings, vdb_plugin, collection) + + # 
def ingest(document: str, pipeline_name: str = "kg-rag-standard",
           collection: str = "knowledge", graph_name: str = "knowledge",
           llm_func=None) -> Dict:
    """Full ingest pipeline: chunk -> embed -> store -> extract -> graph.

    Args:
        document: Document text.
        pipeline_name: Pipeline config name (resolved by ``get_pipeline``).
        collection: VDB collection to store chunks in.
        graph_name: Graph to store extracted entities/relations in.
        llm_func: Optional callable ``prompt -> response text`` used for
            entity extraction; without it the graph step is skipped.

    Returns:
        On success a dict with ``status == "SUCCEEDED"``, counts, the plugin
        results, ``document_id`` and ``elapsed`` seconds. On failure a dict
        with an ``"error"`` key (embedding or VDB storage failed).
    """
    start_time = time.time()

    pipeline = get_pipeline(pipeline_name) or {}
    plugins = pipeline.get("plugins", {})

    # Step 1: Chunk
    chunker_config = plugins.get("chunker", {})
    chunks = chunk_document(document,
                            strategy=chunker_config.get("type", "recursive"),
                            chunk_size=chunker_config.get("chunk_size", 512),
                            overlap=chunker_config.get("overlap", 64))

    # The chunker numbers chunks from 0 for every document, but "id" is the
    # VDB primary key and the fusion key, so scope the ids to this document.
    document_id = uuid.uuid4().hex[:12]
    chunks = [
        {**chunk, "id": f"{document_id}_{chunk.get('id', position)}"}
        for position, chunk in enumerate(chunks)
    ]
    chunk_texts = [chunk["text"] for chunk in chunks]

    # Step 2: Embed
    embeddings = _get_embedding(chunk_texts, plugins.get("embedding", {}))
    if not embeddings:
        return {"error": "Failed to get embeddings", "chunks": len(chunks)}
    if len(embeddings) != len(chunks):
        return {"error": "Embedding count does not match chunk count",
                "chunks": len(chunks), "embeddings": len(embeddings)}

    # Step 3: Store chunks + embeddings in VDB
    vdb_result = _store_in_vdb(chunks, embeddings, plugins.get("vdb", {}), collection)
    if isinstance(vdb_result, dict) and "error" in vdb_result:
        # Do not report success when nothing was stored.
        return {"error": f"VDB store failed: {vdb_result['error']}",
                "chunks": len(chunks), "vdb_result": vdb_result}

    # Step 4: Extract entities and relations
    extractor_config = plugins.get("extractor", {})
    extraction = {"entities": [], "relations": []}
    graph_result = {"status": "skipped"}

    if extractor_config.get("type") != "none" and llm_func:
        extraction = extract_entities_relations(document[:3000], llm_func)

        # Step 5: Store in graph
        graph_result = _store_in_graph(
            extraction["entities"],
            extraction["relations"],
            plugins.get("graph", {}),
            graph_name
        )

    elapsed = round(time.time() - start_time, 3)

    return {
        "status": "SUCCEEDED",
        # get_pipeline() substitutes the default for unknown names; report
        # the pipeline that actually ran.
        "pipeline": pipeline.get("name", pipeline_name),
        "document_id": document_id,
        "chunks": len(chunks),
        "embeddings": len(embeddings),
        "entities": len(extraction.get("entities", [])),
        "relations": len(extraction.get("relations", [])),
        "vdb_result": vdb_result,
        "graph_result": graph_result,
        "elapsed": elapsed
    }


def search(query: str, pipeline_name: str = "kg-rag-standard",
           collection: str = "knowledge", graph_name: str = "knowledge",
           top_k: int = 5, llm_func=None) -> Dict:
    """Full search pipeline: embed -> retrieve -> rerank -> generate.

    Args:
        query: The user question.
        pipeline_name: Pipeline config name (resolved by ``get_pipeline``).
        collection: VDB collection to search.
        graph_name: Graph used for entity expansion.
        top_k: Number of results returned; values below 1 are raised to 1.
        llm_func: Optional callable ``prompt -> response text``. When given
            it extracts query entities for graph expansion and generates an
            answer from the top results.

    Returns:
        A dict with ``status``, ``results``, ``total_retrieved``, ``answer``
        (``None`` without an LLM or when generation fails, in which case
        ``answer_error`` holds the message), ``extracted_entities`` and
        ``elapsed``. ``{"error": ...}`` if the query cannot be embedded.
    """
    start_time = time.time()
    top_k = max(int(top_k), 1)

    pipeline = get_pipeline(pipeline_name) or {}
    plugins = pipeline.get("plugins", {})

    # Step 1: Embed query
    query_embeddings = _get_embedding([query], plugins.get("embedding", {}))
    if not query_embeddings:
        return {"error": "Failed to embed query"}
    query_embedding = query_embeddings[0]

    # Step 2: Extract entities from query (for graph expansion)
    extracted_entities = []
    if llm_func:
        quick_extraction = extract_entities_relations(query, llm_func)
        extracted_entities = [e["name"] for e in quick_extraction.get("entities", [])]

    # Step 3: Hybrid retrieval
    results = hybrid_retrieve(
        query_embedding,
        extracted_entities,
        pipeline,
        collection,
        graph_name
    )

    # Step 4: Rerank
    ranked_results = rerank_results(query, results, plugins.get("reranker", {}), top_k)

    # Step 5: Generate answer (optional)
    answer = None
    answer_error = None
    if llm_func and ranked_results:
        context = "\n\n".join([
            r.get("text", "") or r.get("description", "") or str(r)
            for r in ranked_results[:top_k]
        ])

        gen_prompt = f"""基于以下上下文回答问题。如果上下文中没有相关信息,请如实说明。

上下文:
{context[:2000]}

问题:{query}

请用中文回答:"""

        try:
            answer = llm_func(gen_prompt)
        except Exception as e:
            # Generation is optional: keep the retrieval results and surface
            # the failure instead of hiding it.
            answer = None
            answer_error = str(e)

    elapsed = round(time.time() - start_time, 3)

    response = {
        "status": "SUCCEEDED",
        "pipeline": pipeline.get("name", pipeline_name),
        "query": query,
        "results": ranked_results[:top_k],
        "total_retrieved": len(results),
        "answer": answer,
        "extracted_entities": extracted_entities,
        "elapsed": elapsed
    }
    if answer_error is not None:
        response["answer_error"] = answer_error
    return response
"clip-vith14", "endpoint": "http://localhost:9086"}, + "vdb": {"type": "vdb-milvus", "endpoint": "http://localhost:8886"}, + "graph": {"type": "none"}, + "llm": {"type": "harnessed", "endpoint": "internal"}, + "reranker": {"type": "bge-reranker", "endpoint": "http://localhost:9090"}, + "chunker": {"type": "recursive", "chunk_size": 512, "overlap": 64}, + "retriever": {"type": "vector_only", "top_k": 10} + } +} + + +def _ensure_config_dir(): + os.makedirs(CONFIG_DIR, exist_ok=True) + + +def list_pipelines() -> List[Dict]: + """List all available pipeline configs.""" + _ensure_config_dir() + pipelines = [DEFAULT_PIPELINE, LITE_PIPELINE] + + # Load custom pipelines from disk + for f in os.listdir(CONFIG_DIR): + if f.endswith('.json'): + try: + with open(os.path.join(CONFIG_DIR, f), 'r') as fh: + pipelines.append(json.load(fh)) + except: + pass + + return pipelines + + +def get_pipeline(name: str) -> Optional[Dict]: + """Get a pipeline config by name.""" + for p in list_pipelines(): + if p["name"] == name: + return p + return DEFAULT_PIPELINE + + +def save_pipeline(config: Dict) -> Dict: + """Save a custom pipeline config.""" + _ensure_config_dir() + filepath = os.path.join(CONFIG_DIR, f"{config['name']}.json") + with open(filepath, 'w') as f: + json.dump(config, f, ensure_ascii=False, indent=2) + return {"status": "ok", "filepath": filepath} + + +def list_plugins() -> Dict: + """List all available plugins by capability.""" + return { + "embedding": [ + {"type": "clip-vith14", "model": "BAAI/CLIP-ViT-H-14", "dim": 1024, "endpoint": "http://localhost:9086", "status": "available"}, + {"type": "bge-m3", "model": "BAAI/bge-m3", "dim": 1024, "status": "not_deployed"}, + ], + "vdb": [ + {"type": "vdb-milvus", "backend": "Milvus", "endpoint": "http://localhost:8886", "status": "available"}, + {"type": "qdrant", "backend": "Qdrant", "status": "not_deployed"}, + ], + "graph": [ + {"type": "networkx", "backend": "NetworkX", "endpoint": "http://localhost:9092", "status": 
"available"}, + {"type": "falkordb", "backend": "FalkorDB+Redis", "status": "blocked_redis_module"}, + {"type": "none", "description": "Disable graph"}, + ], + "llm": [ + {"type": "harnessed", "description": "harnessed_agent", "status": "available"}, + ], + "reranker": [ + {"type": "bge-reranker", "model": "BAAI/bge-reranker-v2-m3", "endpoint": "http://localhost:9090", "status": "available"}, + {"type": "none", "description": "Skip reranking"}, + ], + "chunker": [ + {"type": "recursive", "description": "Recursive character splitter"}, + {"type": "sentence", "description": "Sentence-based splitter"}, + ], + "extractor": [ + {"type": "llm-structured", "description": "LLM-based entity/relation extraction"}, + {"type": "none", "description": "Skip extraction"}, + ], + "retriever": [ + {"type": "hybrid", "description": "Vector + Graph hybrid retrieval"}, + {"type": "vector_only", "description": "Pure vector retrieval"}, + ], + "face": [ + {"type": "insightface", "model": "buffalo_l", "dim": 512, "status": "blocked_no_model"}, + ] + } + + +def call_plugin(endpoint: str, path: str, data: Dict, timeout: int = 30) -> Dict: + """Call a plugin endpoint via HTTP POST.""" + url = f"{endpoint}{path}" + payload = json.dumps(data).encode('utf-8') + req = urllib.request.Request(url, data=payload, headers={'Content-Type': 'application/json'}) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode('utf-8')) + except Exception as e: + return {"error": str(e)} diff --git a/workers/__init__.py b/workers/__init__.py new file mode 100644 index 0000000..e69de29