Compare commits

...

No commits in common. "main" and "master" have entirely different histories.
main ... master

14 changed files with 1064 additions and 1 deletions

9
.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
ah.pid
nohup.out
__pycache__/
*.pyc
*__pycache__/
logs/
files/
data/
pipelines/*.json

127
README.md
View File

@ -1,2 +1,127 @@
# RAG Pipeline Service(rag-pipeline)
可插拔 RAG 编排服务,支持知识图谱增强的多模态检索。
## 架构
```
┌─────────────────┐
│ RAG Pipeline │
│ (9093 CPU) │
└────────┬────────┘
┌──────────────────┼──────────────────┐
│ │ │
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│ CLIP │ │ VDB │ │ Graph │
│ 9086 │ │ 8886 │ │ 9092 │
│ GPU 2 │ │ CPU │ │ CPU │
└───────────┘ └───────────┘ └───────────┘
│ │ │
┌─────▼─────┐ ┌─────▼─────┐
│BGE Rerank │ │ LLM │
│ 9090 │ │harnessed │
│ GPU 2 │ │ │
└───────────┘ └───────────┘
```
## 插件槽位
| 槽位 | 当前实现 | 可替换为 |
|------|---------|---------|
| embedding | CLIP-ViT-H/14 (9086) | BGE-M3, OpenAI embedding |
| vdb | Milvus VDB (8886) | Qdrant, Weaviate |
| graph | NetworkX (9092) | FalkorDB, Neo4j |
| reranker | BGE-Reranker (9090) | Cohere, LLM rerank |
| llm | harnessed_agent | 任意 LLM |
| chunker | recursive/sentence | 自定义分块策略 |
| extractor | LLM-structured | spaCy, GraphRAG |
| retriever | hybrid/vector_only | 自定义检索策略 |
| face | InsightFace (待部署) | FaceNet, DeepFace |
## Pipeline 配置
### kg-rag-standard(标准版)
- CLIP embedding + Milvus + NetworkX 图 + BGE Rerank
- 向量召回 + 图扩展 + RRF 融合 + 精排
### kg-rag-lite(轻量版)
- CLIP embedding + Milvus + BGE Rerank
- 纯向量检索,无图谱
## API
### GET /api/status
服务状态和可用插件列表。
### POST /api/ingest
入库文档。
```json
{
"document": "文本内容...",
"pipeline": "kg-rag-standard",
"collection": "knowledge",
"graph_name": "knowledge"
}
```
流程: 分块 → CLIP embedding → VDB 存储 → 实体抽取 → 图存储
### POST /api/search
混合检索。
```json
{
"query": "搜索问题",
"pipeline": "kg-rag-standard",
"collection": "knowledge",
"graph_name": "knowledge",
"top_k": 5
}
```
流程: CLIP embed → VDB 向量召回 → 图扩展 → RRF 融合 → BGE Rerank
### GET/POST /api/pipelines
管理 pipeline 配置。
```text
POST: 创建自定义 pipeline
GET: 列出所有 pipeline
GET ?name=xxx: 获取指定 pipeline
```
### GET /api/plugins
列出所有可用插件及状态。
## 部署
```bash
cd /data/ymq/rag-pipeline
bash build.sh deploy
bash build.sh stop
bash build.sh status
```
## 端到端测试
```bash
# 1. 入库
curl -X POST http://localhost:9093/api/ingest \
-H "Content-Type: application/json" \
-d '{"document":"张三在ABC公司担任技术总监,他和李四是同事关系。","pipeline":"kg-rag-standard"}'
# 2. 检索
curl -X POST http://localhost:9093/api/search \
-H "Content-Type: application/json" \
-d '{"query":"张三在哪家公司工作?","pipeline":"kg-rag-standard"}'
```
## 端口
9093 (CPU only)
## Git
git@git.opencomputing.cn:yumoqing/rag-pipeline.git

6
ah.py Normal file
View File

@ -0,0 +1,6 @@
# -*- coding:utf-8 -*-
from ahserver.webapp import webapp
from init import load_rag_pipeline
# Script entry point: pass the handler-registration callback to ahserver's
# webapp launcher (only when this file is executed directly).
if __name__ == '__main__':
    webapp(load_rag_pipeline)

55
build.sh Executable file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env bash
# RAG Pipeline Service control script.
# Usage: build.sh {deploy|update|stop|start|status}   (default action: status)
set -e
cd "$(dirname "$0")"

SERVICE_NAME="rag-pipeline"
PORT=9093
PY=/data/ymq/wan22-service/py3/bin/python
PID_FILE=ah.pid

# Succeeds when the PID recorded in $PID_FILE belongs to a live process.
is_running() {
    [ -f "$PID_FILE" ] && kill -0 "$(cat "$PID_FILE")" 2>/dev/null
}

# Launch the server in the background and record its PID.
start_service() {
    mkdir -p logs files wwwroot pipelines data
    export PYTHONPATH="$(pwd)"
    nohup "$PY" ah.py > nohup.out 2>&1 &
    echo $! > "$PID_FILE"
}

# Succeeds when the status endpoint answers within 3 seconds.
# The timeout keeps a hung server from blocking this script.
http_ok() {
    curl -s --max-time 3 "http://localhost:$PORT/api/status" > /dev/null 2>&1
}

action="${1:-status}"
case "$action" in
    deploy|update)
        echo "=== $SERVICE_NAME Deploy (CPU, port $PORT) ==="
        if is_running; then
            kill "$(cat "$PID_FILE")" 2>/dev/null || true
            sleep 2
        fi
        if [ -d .git ] && [ -f .git/HEAD ]; then
            # Best effort: a failed pull must not abort the deploy, but say so.
            git pull origin master 2>/dev/null \
                || echo "WARNING: git pull failed, deploying the current checkout"
        fi
        start_service
        echo "Started PID $(cat "$PID_FILE") on port $PORT"
        sleep 3
        if http_ok; then
            echo "Service healthy"
        else
            echo "WARNING: not responding, check nohup.out"
            tail -30 nohup.out
        fi
        ;;
    stop)
        if [ -f "$PID_FILE" ]; then
            kill "$(cat "$PID_FILE")" 2>/dev/null || true
            rm -f "$PID_FILE"
            echo "Stopped"
        else
            echo "Not running"
        fi
        ;;
    start)
        # Refuse to spawn a second instance: it would fight over the port and
        # overwrite the PID file, leaving the first process untracked.
        if is_running; then
            echo "Already running (PID $(cat "$PID_FILE"))"
        else
            start_service
            echo "Started PID $(cat "$PID_FILE")"
        fi
        ;;
    status)
        echo "=== $SERVICE_NAME Status ==="
        if is_running; then
            echo "Process: running (PID $(cat "$PID_FILE"))"
        else
            echo "Process: not running"
        fi
        echo "Port: $PORT"
        if http_ok; then
            echo "HTTP: OK"
        else
            echo "HTTP: not responding"
        fi
        ;;
    *) echo "Usage: $0 {deploy|update|stop|start|status}"; exit 1 ;;
esac

28
conf/config.json Normal file
View File

@ -0,0 +1,28 @@
{
"password_key": "RagPipeline2026Key",
"filesroot": "$[workdir]$/files",
"logger": {
"name": "rag-pipeline",
"levelname": "info",
"logfile": "$[workdir]$/logs/rag-pipeline.log"
},
"website": {
"paths": [["$[workdir]$/wwwroot", ""]],
"client_max_size": 52428800,
"host": "0.0.0.0",
"port": 9093,
"coding": "utf-8",
"indexes": ["index.html"],
"startswiths": [
{"leading": "/api/status", "registerfunction": "status"},
{"leading": "/api/ingest", "registerfunction": "ingest"},
{"leading": "/api/search", "registerfunction": "search"},
{"leading": "/api/pipelines", "registerfunction": "pipelines"},
{"leading": "/api/plugins", "registerfunction": "plugins"}
],
"processors": [
[".tmpl", "tmpl"], [".app", "app"], [".ui", "bui"],
[".dspy", "dspy"], [".md", "md"]
]
}
}

0
core/__init__.py Normal file
View File

73
core/chunker.py Normal file
View File

@ -0,0 +1,73 @@
# -*- coding:utf-8 -*-
"""Document chunking strategies."""
import re
from typing import List, Dict
def recursive_chunk(text: str, chunk_size: int = 512, overlap: int = 64) -> List[Dict]:
    """Split text into overlapping chunks, preferring sentence boundaries.

    Args:
        text: Document text to split.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of trailing characters of a chunk repeated at the
            start of the next one. Negative values are treated as 0.

    Returns:
        A list of ``{"id": "chunk_<n>", "text": ..., "index": n}`` dicts.
        Text that is empty or fits into one chunk yields a single ``chunk_0``.

    Raises:
        ValueError: If the text needs splitting and ``chunk_size`` is not
            positive.
    """
    if not text or len(text) <= chunk_size:
        return [{"id": "chunk_0", "text": text, "index": 0}]
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    # Boundary markers in priority order. They must be non-empty: rfind('')
    # always matches at the end of the region, which would disable the
    # boundary search entirely.
    separators = ('。', '!', '?', '. ', '! ', '? ', '\n\n', '\n', ';', ';')
    overlap = max(overlap, 0)
    text_len = len(text)

    chunks = []
    start = 0
    idx = 0
    while start < text_len:
        end = min(start + chunk_size, text_len)
        if end < text_len:
            # Look for a sentence ending in the last 100 chars of the window.
            search_start = max(end - 100, start)
            search_region = text[search_start:end]
            for sep in separators:
                last_sep = search_region.rfind(sep)
                if last_sep >= 0:
                    end = search_start + last_sep + len(sep)
                    break

        chunk_text = text[start:end].strip()
        if chunk_text:
            chunks.append({
                "id": f"chunk_{idx}",
                "text": chunk_text,
                "index": idx
            })
            idx += 1

        # The window already reached the end of the text; continuing would
        # only emit a tail that is fully contained in this chunk.
        if end >= text_len:
            break

        # Step back by the overlap, but always make forward progress so a
        # large overlap or an early boundary cannot loop forever.
        next_start = end - overlap
        if next_start <= start:
            next_start = end
        start = next_start
    return chunks
def sentence_chunk(text: str) -> List[Dict]:
    """Split text into one chunk per sentence.

    Sentences end at Chinese (``。!?``) or ASCII (``.!?``) terminators; the
    terminator stays attached to its sentence and whitespace following it is
    dropped.

    Args:
        text: Document text to split. Empty or ``None`` yields no chunks.

    Returns:
        A list of ``{"id": "chunk_<n>", "text": ..., "index": n}`` dicts, one
        per non-blank sentence, numbered from 0.
    """
    if not text:
        return []
    # The lookbehind keeps the terminator with the sentence it closes.
    sentences = re.split(r'(?<=[。!?.!?])\s*', text)
    chunks = []
    for sent in sentences:
        sent = sent.strip()
        if sent:
            idx = len(chunks)
            chunks.append({
                "id": f"chunk_{idx}",
                "text": sent,
                "index": idx
            })
    return chunks
def chunk_document(text: str, strategy: str = "recursive", **kwargs) -> List[Dict]:
    """Dispatch to a chunking strategy.

    ``strategy == "sentence"`` uses ``sentence_chunk``; any other value uses
    ``recursive_chunk`` with ``chunk_size`` (default 512) and ``overlap``
    (default 64) taken from ``kwargs``.
    """
    if strategy != "sentence":
        size = kwargs.get("chunk_size", 512)
        lap = kwargs.get("overlap", 64)
        return recursive_chunk(text, chunk_size=size, overlap=lap)
    return sentence_chunk(text)

79
core/extractor.py Normal file
View File

@ -0,0 +1,79 @@
# -*- coding:utf-8 -*-
"""Entity and relation extraction via LLM."""
import json
import re
from typing import List, Dict
# Prompt template sent to the LLM; ``{text}`` is filled in by
# extract_entities_relations, doubled braces are literal JSON braces.
EXTRACTION_PROMPT = """你是一个知识图谱抽取专家。请从以下文本中抽取实体和关系。
文本
{text}
请按以下JSON格式返回
{{
"entities": [
{{"name": "实体名", "type": "实体类型(person/company/product/concept/event/location)", "description": "简要描述"}}
],
"relations": [
{{"source": "源实体名", "target": "目标实体名", "relation": "关系类型", "description": "关系描述"}}
]
}}
只返回JSON不要其他内容"""


def extract_entities_relations(text: str, llm_func=None) -> Dict:
    """Extract entities and relations from text using an LLM.

    Args:
        text: Input text; only the first 2000 characters are sent to the LLM.
        llm_func: Callable taking a prompt string and returning the LLM
            response text. If falsy, an empty result is returned.

    Returns:
        ``{"entities": [...], "relations": [...]}``. Entities carry ``name``,
        ``type`` (default ``"unknown"``) and ``description``; relations carry
        ``source``, ``target``, ``relation`` (default ``"related_to"``) and
        ``description``. On any failure both lists are empty and an
        ``"error"`` key holds the exception message.
    """
    if not llm_func:
        return {"entities": [], "relations": []}

    prompt = EXTRACTION_PROMPT.format(text=text[:2000])  # Limit text length
    try:
        response = llm_func(prompt).strip()
        # Remove a surrounding markdown code fence if present.
        if response.startswith('```'):
            response = re.sub(r'^```(?:json)?\s*', '', response)
            response = re.sub(r'\s*```$', '', response)

        try:
            result = json.loads(response)
        except json.JSONDecodeError:
            # Models often wrap the JSON in prose; retry on the outermost
            # brace-delimited span before giving up.
            first, last = response.find('{'), response.rfind('}')
            if first < 0 or last <= first:
                raise
            result = json.loads(response[first:last + 1])

        if not isinstance(result, dict):
            raise ValueError("LLM response is not a JSON object")

        # ``or []`` tolerates explicit nulls for either list.
        entities = result.get("entities") or []
        relations = result.get("relations") or []

        valid_entities = [
            {
                "name": e["name"],
                "type": e.get("type", "unknown"),
                "description": e.get("description", "")
            }
            for e in entities
            if isinstance(e, dict) and "name" in e
        ]
        valid_relations = [
            {
                "source": r["source"],
                "target": r["target"],
                "relation": r.get("relation", "related_to"),
                "description": r.get("description", "")
            }
            for r in relations
            if isinstance(r, dict) and "source" in r and "target" in r
        ]
        return {"entities": valid_entities, "relations": valid_relations}
    except Exception as e:
        # Extraction is best-effort: report the failure instead of raising.
        return {"entities": [], "relations": [], "error": str(e)}

163
core/retriever.py Normal file
View File

@ -0,0 +1,163 @@
# -*- coding:utf-8 -*-
"""Hybrid retrieval: vector search + graph expansion + RRF fusion."""
from typing import List, Dict
def rrf_fusion(results_list: List[List[Dict]], k: int = 60) -> List[Dict]:
    """Merge several ranked lists with Reciprocal Rank Fusion.

    Each item contributes ``1 / (k + rank + 1)`` (rank is 0-based) for every
    list it appears in. Items are identified by their ``"id"``, falling back
    to ``"node_id"``.

    Args:
        results_list: Ranked result lists; each item is a dict.
        k: RRF smoothing constant.

    Returns:
        The first-seen dict for each distinct item, sorted by descending
        fused score. Each returned dict is annotated in place with
        ``"rrf_score"`` (rounded to 6 decimals).
    """
    fused = {}
    for list_idx, results in enumerate(results_list):
        for rank, item in enumerate(results):
            doc_id = item.get("id", item.get("node_id"))
            if doc_id is None:
                # No identifier: give the item a key unique to its position so
                # unrelated id-less items in different lists are not merged.
                doc_id = ("__no_id__", list_idx, rank)
            entry = fused.setdefault(doc_id, {"item": item, "score": 0})
            entry["score"] += 1.0 / (k + rank + 1)

    # sorted() is stable, so ties keep first-seen order.
    ordered = sorted(fused.values(), key=lambda entry: -entry["score"])
    for entry in ordered:
        entry["item"]["rrf_score"] = round(entry["score"], 6)
    return [entry["item"] for entry in ordered]
def vector_search(query_embedding: List[float], vdb_plugin: Dict,
                  collection: str = "knowledge", top_k: int = 20) -> List[Dict]:
    """Query the VDB plugin for the vectors nearest to ``query_embedding``.

    Posts to ``/v1/query`` on the plugin's ``endpoint`` and returns
    ``data["rows"]`` from the response. Returns an empty list when no endpoint
    is configured, the response has an ``"error"`` key, its ``"status"`` is
    not ``"SUCCEEDED"``, or ``"data"`` is not a dict.
    """
    from plugins.registry import call_plugin

    endpoint = vdb_plugin.get("endpoint")
    if not endpoint:
        return []

    payload = {
        "colname": collection,
        "vector": query_embedding,
        "pagerows": top_k,
        "page": 1
    }
    response = call_plugin(endpoint, "/v1/query", payload)

    succeeded = "error" not in response and response.get("status") == "SUCCEEDED"
    if not succeeded:
        return []

    # Expected shape: {"status": "SUCCEEDED", "data": {"rows": [...]}}
    data = response.get("data", {})
    if not isinstance(data, dict):
        return []
    return data.get("rows", [])
def graph_expand(entity_ids: List[str], graph_plugin: Dict,
                 graph_name: str = "knowledge", hops: int = 2) -> List[Dict]:
    """Collect graph neighbours of the given entities.

    For each entity, posts to ``/api/graph/neighbors`` with node id
    ``entity_<name>`` (spaces replaced by underscores) and depth ``hops``.
    Neighbours are de-duplicated by ``node_id`` across all entities; each kept
    neighbour dict is annotated in place with ``"source_entity"`` and ``"id"``.
    Returns an empty list when the plugin has no endpoint or its type is
    ``"none"``.
    """
    from plugins.registry import call_plugin

    endpoint = graph_plugin.get("endpoint")
    if not endpoint or graph_plugin.get("type") == "none":
        return []

    collected = []
    visited = set()
    for entity_id in entity_ids:
        # Same node-id scheme as the one used when entities are ingested.
        node_id = f"entity_{entity_id}".replace(" ", "_")
        response = call_plugin(endpoint, "/api/graph/neighbors", {
            "graph": graph_name,
            "node_id": node_id,
            "depth": hops
        })
        if "error" in response:
            continue
        for neighbor in response.get("neighbors", []):
            nid = neighbor.get("node_id", "")
            if not nid or nid in visited:
                continue
            visited.add(nid)
            neighbor["source_entity"] = entity_id
            neighbor["id"] = nid
            collected.append(neighbor)
    return collected
def hybrid_retrieve(query_embedding: List[float],
                    extracted_entities: List[str],
                    pipeline: Dict,
                    collection: str = "knowledge",
                    graph_name: str = "knowledge") -> List[Dict]:
    """Hybrid retrieval: vector search, optional graph expansion, RRF fusion.

    Args:
        query_embedding: Embedding of the query.
        extracted_entities: Entity names found in the query; used for graph
            expansion when the retriever type is ``"hybrid"``.
        pipeline: Pipeline config; its ``"plugins"`` entry supplies the
            ``retriever``, ``vdb`` and ``graph`` settings.
        collection: VDB collection to search.
        graph_name: Graph to expand in.

    Returns:
        Vector results alone, graph results alone, or their RRF fusion when
        both produced results; an empty list when neither did.
    """
    plugins = pipeline.get("plugins", {})
    retriever_config = plugins.get("retriever", {})
    # Accept the shorter ``top_k`` spelling as a fallback so retriever configs
    # that use it are honoured instead of silently getting the default.
    vector_top_k = retriever_config.get(
        "vector_top_k", retriever_config.get("top_k", 20))
    graph_hops = retriever_config.get("graph_hops", 2)
    retriever_type = retriever_config.get("type", "hybrid")

    results_lists = []

    # Vector search
    vdb_plugin = plugins.get("vdb", {})
    vector_results = vector_search(query_embedding, vdb_plugin, collection, vector_top_k)
    if vector_results:
        results_lists.append(vector_results)

    # Graph expansion (only in hybrid mode and when entities were extracted)
    if retriever_type == "hybrid" and extracted_entities:
        graph_plugin = plugins.get("graph", {})
        graph_results = graph_expand(extracted_entities, graph_plugin, graph_name, graph_hops)
        if graph_results:
            results_lists.append(graph_results)

    if not results_lists:
        return []
    if len(results_lists) == 1:
        return results_lists[0]
    return rrf_fusion(results_lists)
def rerank_results(query: str, results: List[Dict], reranker_plugin: Dict,
top_k: int = 5) -> List[Dict]:
"""Rerank results using BGE-Reranker."""
from plugins.registry import call_plugin
if not results or reranker_plugin.get("type") == "none":
return results[:top_k]
endpoint = reranker_plugin.get("endpoint")
if not endpoint:
return results[:top_k]
# Prepare documents for reranking
documents = []
for r in results:
doc = r.get("text", "") or r.get("description", "") or r.get("transcript", "")
if not doc:
doc = str(r.get("name", "")) + " " + str(r.get("attrs", {}))
documents.append(doc)
if not documents:
return results[:top_k]
rerank_result = call_plugin(endpoint, "/api/rerank", {
"query": query,
"documents": documents,
"top_k": top_k
})
if "error" in rerank_result:
return results[:top_k]
# Map reranked docs back to original results
ranked = []
for rd in rerank_result.get("ranked_docs", []):
doc_text = rd.get("doc", "")
for orig in results:
orig_text = orig.get("text", "") or orig.get("description", "") or str(orig.get("name", ""))
if doc_text == orig_text or doc_text in orig_text or orig_text in doc_text:
orig["rerank_score"] = rd.get("score", 0)
ranked.append(orig)
break
return ranked if ranked else results[:top_k]

126
init.py Normal file
View File

@ -0,0 +1,126 @@
# -*- coding:utf-8 -*-
"""RAG Pipeline API endpoints."""
from traceback import format_exc
from ahserver.serverenv import ServerEnv
from appPublic.registerfunction import RegisterFunction
from appPublic.log import exception
import json
async def status_handler(request, params_kw, *args, **kwargs):
    """Return a JSON string describing the service.

    The payload lists, per capability, the type of a plugin whose status is
    ``"available"`` (the last such plugin wins), the names of all pipelines,
    and the API endpoints.
    """
    import os
    import sys
    # Make the working directory importable once; inserting unconditionally
    # would grow sys.path on every request.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from plugins.registry import list_pipelines, list_plugins

    pipelines = list_pipelines()
    plugins = list_plugins()

    # Check service availability
    services = {}
    for cap, plugin_list in plugins.items():
        for p in plugin_list:
            if p.get("status") == "available":
                services[cap] = p.get("type", "unknown")

    return json.dumps({
        "service": "rag-pipeline",
        "port": 9093,
        "active_services": services,
        # Skip malformed entries rather than failing the status check.
        "pipelines": [p["name"] for p in pipelines
                      if isinstance(p, dict) and "name" in p],
        "endpoints": ["/api/status", "/api/ingest", "/api/search", "/api/pipelines", "/api/plugins"]
    }, indent=2, ensure_ascii=False)
async def ingest_handler(request, params_kw, *args, **kwargs):
    """Ingest a document and return the result as a JSON string.

    Reads ``document`` (required), ``pipeline``, ``collection`` and
    ``graph_name`` from ``params_kw``. Any exception is logged and reported
    as ``{"error": ...}``.
    """
    import asyncio
    import functools
    import os
    import sys
    # Make the working directory importable once; inserting unconditionally
    # would grow sys.path on every request.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from pipeline import ingest

    try:
        document = params_kw.get("document", "")
        if not document:
            return json.dumps({"error": "document text required"})

        pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
        collection = params_kw.get("collection", "knowledge")
        graph_name = params_kw.get("graph_name", "knowledge")

        # No LLM is wired in here (functions cannot be passed via HTTP), so
        # entity extraction is skipped.
        # ingest() is synchronous and performs blocking HTTP calls; run it in
        # the default executor so other requests are not stalled.
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            None,
            functools.partial(ingest, document, pipeline_name, collection,
                              graph_name, llm_func=None))
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)})
async def search_handler(request, params_kw, *args, **kwargs):
    """Run a search and return the result as a JSON string.

    Reads ``query`` (required), ``pipeline``, ``collection``, ``graph_name``
    and ``top_k`` (default 5) from ``params_kw``. Any exception, including a
    non-integer ``top_k``, is logged and reported as ``{"error": ...}``.
    """
    import asyncio
    import functools
    import os
    import sys
    # Make the working directory importable once; inserting unconditionally
    # would grow sys.path on every request.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from pipeline import search

    try:
        query = params_kw.get("query", "")
        if not query:
            return json.dumps({"error": "query text required"})

        pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
        collection = params_kw.get("collection", "knowledge")
        graph_name = params_kw.get("graph_name", "knowledge")
        top_k = int(params_kw.get("top_k", 5))

        # search() is synchronous and performs blocking HTTP calls; run it in
        # the default executor so other requests are not stalled.
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            None,
            functools.partial(search, query, pipeline_name, collection,
                              graph_name, top_k, llm_func=None))
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)})
async def pipelines_handler(request, params_kw, *args, **kwargs):
    """Create, fetch or list pipeline configs; returns a JSON string.

    * ``POST``: save ``params_kw`` as a pipeline config (needs ``"name"``).
    * otherwise with ``name``: return that pipeline, or an error if no
      pipeline has exactly that name.
    * otherwise: list all pipelines.

    Any exception is logged and reported as ``{"error": ...}``.
    """
    import os
    import sys
    # Make the working directory importable once; inserting unconditionally
    # would grow sys.path on every request.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from plugins.registry import list_pipelines, get_pipeline, save_pipeline

    try:
        if request.method == "POST":
            # Create/update pipeline
            config = params_kw
            if "name" not in config:
                return json.dumps({"error": "pipeline config with 'name' required"})
            result = save_pipeline(config)
            # "status" goes last so the save result's own "status" value
            # cannot override the API-level one.
            return json.dumps({**result, "status": "SUCCEEDED"}, ensure_ascii=False)

        name = params_kw.get("name")
        if name:
            pipeline = get_pipeline(name)
            # get_pipeline falls back to the default pipeline for unknown
            # names, so compare the name to detect a real match.
            if pipeline and pipeline.get("name") == name:
                return json.dumps({"status": "SUCCEEDED", "pipeline": pipeline}, indent=2, ensure_ascii=False)
            return json.dumps({"error": f"Pipeline '{name}' not found"})

        pipelines = list_pipelines()
        return json.dumps({"status": "SUCCEEDED", "pipelines": pipelines}, indent=2, ensure_ascii=False)
    except Exception as e:
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)})
async def plugins_handler(request, params_kw, *args, **kwargs):
    """Return the plugin catalogue from ``list_plugins`` as a JSON string."""
    import os
    import sys
    # Make the working directory importable once; inserting unconditionally
    # would grow sys.path on every request.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from plugins.registry import list_plugins

    plugins = list_plugins()
    return json.dumps({"status": "SUCCEEDED", "plugins": plugins}, indent=2, ensure_ascii=False)
def load_rag_pipeline():
    """Register the API handlers under the names used by the site config."""
    # Instantiated as before; the instance itself is not used in this function.
    _server_env = ServerEnv()
    registry = RegisterFunction()
    handlers = (
        ("status", status_handler),
        ("ingest", ingest_handler),
        ("search", search_handler),
        ("pipelines", pipelines_handler),
        ("plugins", plugins_handler),
    )
    for route_name, handler in handlers:
        registry.register(route_name, handler)

270
pipeline.py Normal file
View File

@ -0,0 +1,270 @@
# -*- coding:utf-8 -*-
"""RAG Pipeline orchestration - ingest and search workflows."""
import json
import time
import uuid
from typing import Dict, List, Optional
from traceback import format_exc
from plugins.registry import get_pipeline, call_plugin
from core.chunker import chunk_document
from core.extractor import extract_entities_relations
from core.retriever import hybrid_retrieve, rerank_results
def _get_embedding(texts: List[str], embedding_plugin: Dict) -> List[List[float]]:
"""Get text embeddings from CLIP service."""
endpoint = embedding_plugin.get("endpoint")
if not endpoint:
return []
result = call_plugin(endpoint, "/api/text", {"texts": texts})
if "error" in result:
return []
return result.get("embeddings", [])
def _ensure_collection(vdb_plugin: Dict, collection: str, dim: int = 1024) -> Dict:
"""Ensure VDB collection exists with correct schema."""
endpoint = vdb_plugin.get("endpoint")
if not endpoint:
return {"error": "VDB endpoint not configured"}
# Check if collection exists
list_result = call_plugin(endpoint, "/v1/listcollections", {})
existing = list_result.get("collections", [])
if collection in existing:
return {"status": "exists"}
# Create with proper field schema
fields = [
{"name": "id", "type": "str", "is_primary": True, "max_length": 256},
{"name": "text", "type": "str", "max_length": 65535},
{"name": "description", "type": "str", "max_length": 65535},
{"name": "type", "type": "str", "max_length": 128},
{"name": "embedding", "type": "fvector", "dim": dim}
]
result = call_plugin(endpoint, "/v1/createcollection", {
"colname": collection,
"fields": fields,
"description": "RAG knowledge collection",
"metric": "COSINE"
})
return result
def _store_in_vdb(items: List[Dict], embeddings: List[List[float]],
vdb_plugin: Dict, collection: str = "knowledge") -> Dict:
"""Store items with embeddings in VDB."""
endpoint = vdb_plugin.get("endpoint")
if not endpoint:
return {"error": "VDB endpoint not configured"}
# Ensure collection exists
ensure_result = _ensure_collection(vdb_plugin, collection)
if "error" in ensure_result:
return ensure_result
# Prepare batch insert - data must match schema fields
rows = []
for item, emb in zip(items, embeddings):
row = {
"id": item.get("id", str(uuid.uuid4())[:16]),
"text": item.get("text", ""),
"description": item.get("description", ""),
"type": item.get("type", "chunk"),
"embedding": emb
}
rows.append(row)
result = call_plugin(endpoint, "/v1/batchinsert", {
"colname": collection,
"data": rows
})
return result
def _store_in_graph(entities: List[Dict], relations: List[Dict],
graph_plugin: Dict, graph_name: str = "knowledge") -> Dict:
"""Store entities and relations in graph."""
endpoint = graph_plugin.get("endpoint")
if not endpoint or graph_plugin.get("type") == "none":
return {"status": "skipped", "reason": "graph disabled"}
added_nodes = 0
added_edges = 0
for entity in entities:
name = entity.get("name", "")
node_id = f"entity_{name}".replace(" ", "_")
result = call_plugin(endpoint, "/api/graph/add_node", {
"graph": graph_name,
"node_id": node_id,
"attrs": {
"name": name,
"type": entity.get("type", "unknown"),
"description": entity.get("description", "")
}
})
if "error" not in result:
added_nodes += 1
for rel in relations:
source = f"entity_{rel.get('source', '')}".replace(" ", "_")
target = f"entity_{rel.get('target', '')}".replace(" ", "_")
result = call_plugin(endpoint, "/api/graph/add_edge", {
"graph": graph_name,
"source": source,
"target": target,
"attrs": {
"relation": rel.get("relation", "related_to"),
"description": rel.get("description", "")
}
})
if "error" not in result:
added_edges += 1
# Save graph to disk
call_plugin(endpoint, "/api/graph/save", {"graph": graph_name})
return {"status": "ok", "nodes_added": added_nodes, "edges_added": added_edges}
def ingest(document: str, pipeline_name: str = "kg-rag-standard",
           collection: str = "knowledge", graph_name: str = "knowledge",
           llm_func=None) -> Dict:
    """Full ingest pipeline: chunk -> embed -> store -> extract -> graph.

    Args:
        document: Document text.
        pipeline_name: Name of the pipeline config to use.
        collection: VDB collection receiving the chunks.
        graph_name: Graph receiving extracted entities and relations.
        llm_func: Optional callable (prompt -> response text); without it
            extraction and graph storage are skipped.

    Returns:
        A summary dict with ``"status": "SUCCEEDED"`` and per-step counts and
        results, or a dict with an ``"error"`` key when embedding or VDB
        storage failed.
    """
    start_time = time.time()
    pipeline = get_pipeline(pipeline_name)
    plugins = pipeline.get("plugins", {})

    # Step 1: Chunk
    chunker_config = plugins.get("chunker", {})
    chunks = chunk_document(document,
                            strategy=chunker_config.get("type", "recursive"),
                            chunk_size=chunker_config.get("chunk_size", 512),
                            overlap=chunker_config.get("overlap", 64))
    # Chunkers number chunks from 0 for every document. Prefix the ids with a
    # per-ingest token so chunks of different documents do not share a
    # primary key in the same collection.
    doc_id = uuid.uuid4().hex[:12]
    chunks = [{**c, "id": f"{doc_id}_{c['id']}"} for c in chunks]
    chunk_texts = [c["text"] for c in chunks]

    # Step 2: Embed
    embedding_plugin = plugins.get("embedding", {})
    embeddings = _get_embedding(chunk_texts, embedding_plugin)
    if not embeddings:
        return {"error": "Failed to get embeddings", "chunks": len(chunks)}

    # Step 3: Store chunks + embeddings in VDB
    vdb_plugin = plugins.get("vdb", {})
    vdb_result = _store_in_vdb(chunks, embeddings, vdb_plugin, collection)
    if isinstance(vdb_result, dict) and "error" in vdb_result:
        # Do not report success when nothing was stored.
        return {
            "error": f"Failed to store chunks in VDB: {vdb_result['error']}",
            "chunks": len(chunks),
            "vdb_result": vdb_result
        }

    # Step 4: Extract entities and relations
    extractor_config = plugins.get("extractor", {})
    extraction = {"entities": [], "relations": []}

    if extractor_config.get("type") != "none" and llm_func:
        extraction = extract_entities_relations(document[:3000], llm_func)

        # Step 5: Store in graph
        graph_plugin = plugins.get("graph", {})
        graph_result = _store_in_graph(
            extraction["entities"],
            extraction["relations"],
            graph_plugin,
            graph_name
        )
    else:
        graph_result = {"status": "skipped"}

    elapsed = round(time.time() - start_time, 3)

    return {
        "status": "SUCCEEDED",
        "pipeline": pipeline_name,
        "doc_id": doc_id,
        "chunks": len(chunks),
        "embeddings": len(embeddings),
        "entities": len(extraction.get("entities", [])),
        "relations": len(extraction.get("relations", [])),
        "vdb_result": vdb_result,
        "graph_result": graph_result,
        "elapsed": elapsed
    }
def search(query: str, pipeline_name: str = "kg-rag-standard",
           collection: str = "knowledge", graph_name: str = "knowledge",
           top_k: int = 5, llm_func=None) -> Dict:
    """Full search pipeline: embed -> retrieve -> rerank -> generate.

    Args:
        query: User query text.
        pipeline_name: Name of the pipeline config to use.
        collection: VDB collection to search.
        graph_name: Graph used for expansion.
        top_k: Number of results to return.
        llm_func: Optional callable (prompt -> response text). When given it
            is used to extract query entities and to generate an answer.

    Returns:
        A dict with ``"status": "SUCCEEDED"``, the top results, the total
        number retrieved, the optional answer and timing; or
        ``{"error": ...}`` when the query could not be embedded.
    """
    start_time = time.time()
    pipeline = get_pipeline(pipeline_name)
    plugins = pipeline.get("plugins", {})

    # Step 1: Embed query
    embedding_plugin = plugins.get("embedding", {})
    query_embeddings = _get_embedding([query], embedding_plugin)
    if not query_embeddings:
        return {"error": "Failed to embed query"}
    query_embedding = query_embeddings[0]

    # Step 2: Extract entities from query (for graph expansion)
    extracted_entities = []
    if llm_func:
        quick_extraction = extract_entities_relations(query, llm_func)
        extracted_entities = [e["name"] for e in quick_extraction.get("entities", [])]

    # Step 3: Hybrid retrieval
    results = hybrid_retrieve(
        query_embedding,
        extracted_entities,
        pipeline,
        collection,
        graph_name
    )

    # Step 4: Rerank
    reranker_plugin = plugins.get("reranker", {})
    ranked_results = rerank_results(query, results, reranker_plugin, top_k)

    # Step 5: Generate answer (optional)
    answer = None
    if llm_func and ranked_results:
        context = "\n\n".join([
            r.get("text", "") or r.get("description", "") or str(r)
            for r in ranked_results[:top_k]
        ])
        gen_prompt = f"""基于以下上下文回答问题。如果上下文中没有相关信息,请如实说明。
上下文
{context[:2000]}
问题{query}
请用中文回答"""
        try:
            answer = llm_func(gen_prompt)
        except Exception:
            # Answer generation is best-effort; retrieval results are still
            # returned. Unlike a bare ``except``, this lets KeyboardInterrupt
            # and SystemExit propagate.
            answer = None

    elapsed = round(time.time() - start_time, 3)

    return {
        "status": "SUCCEEDED",
        "pipeline": pipeline_name,
        "query": query,
        "results": ranked_results[:top_k],
        "total_retrieved": len(results),
        "answer": answer,
        "extracted_entities": extracted_entities,
        "elapsed": elapsed
    }

0
plugins/__init__.py Normal file
View File

129
plugins/registry.py Normal file
View File

@ -0,0 +1,129 @@
# -*- coding:utf-8 -*-
"""Plugin registry - manages available RAG plugins and pipeline configs."""
import json
import os
import urllib.request
from typing import Dict, List, Optional
# Directory scanned for custom pipeline configs (*.json) and written to when
# a pipeline is saved.
# NOTE(review): absolute, host-specific path — confirm it matches the
# deployment directory.
CONFIG_DIR = "/data/ymq/rag-pipeline/pipelines"

# Default pipeline config; also the fallback returned for unknown pipeline
# names.
DEFAULT_PIPELINE = {
    "name": "kg-rag-standard",
    "description": "标准知识图谱RAG",
    "plugins": {
        "embedding": {"type": "clip-vith14", "endpoint": "http://localhost:9086"},
        "vdb": {"type": "vdb-milvus", "endpoint": "http://localhost:8886"},
        "graph": {"type": "networkx", "endpoint": "http://localhost:9092"},
        "llm": {"type": "harnessed", "endpoint": "internal"},
        "reranker": {"type": "bge-reranker", "endpoint": "http://localhost:9090"},
        "chunker": {"type": "recursive", "chunk_size": 512, "overlap": 64},
        "extractor": {"type": "llm-structured"},
        "retriever": {"type": "hybrid", "vector_top_k": 20, "graph_hops": 2}
    }
}

# Vector-only pipeline: graph type "none" and no extractor entry.
# NOTE(review): the retriever entry here uses the key `top_k`, whereas the
# standard pipeline above uses `vector_top_k`.
LITE_PIPELINE = {
    "name": "kg-rag-lite",
    "description": "轻量版RAG纯向量无图谱",
    "plugins": {
        "embedding": {"type": "clip-vith14", "endpoint": "http://localhost:9086"},
        "vdb": {"type": "vdb-milvus", "endpoint": "http://localhost:8886"},
        "graph": {"type": "none"},
        "llm": {"type": "harnessed", "endpoint": "internal"},
        "reranker": {"type": "bge-reranker", "endpoint": "http://localhost:9090"},
        "chunker": {"type": "recursive", "chunk_size": 512, "overlap": 64},
        "retriever": {"type": "vector_only", "top_k": 10}
    }
}
def _ensure_config_dir():
    """Create CONFIG_DIR (including missing parents) if it does not exist."""
    os.makedirs(CONFIG_DIR, exist_ok=True)
def list_pipelines() -> List[Dict]:
    """List all available pipeline configs.

    Returns:
        The two built-in pipelines followed by every custom config found as a
        ``*.json`` file in ``CONFIG_DIR`` (in file-name order). Files that
        cannot be read or parsed, or that do not hold a JSON object with a
        ``"name"`` key, are skipped.
    """
    _ensure_config_dir()
    pipelines = [DEFAULT_PIPELINE, LITE_PIPELINE]

    # Load custom pipelines from disk; sorted for a deterministic order.
    for fname in sorted(os.listdir(CONFIG_DIR)):
        if not fname.endswith('.json'):
            continue
        try:
            with open(os.path.join(CONFIG_DIR, fname), 'r', encoding='utf-8') as fh:
                config = json.load(fh)
        except (OSError, ValueError):
            # Unreadable or invalid JSON (JSONDecodeError and
            # UnicodeDecodeError are ValueErrors): ignore this file only.
            continue
        # Callers index configs by "name"; drop anything that would break them.
        if isinstance(config, dict) and "name" in config:
            pipelines.append(config)
    return pipelines
def get_pipeline(name: str) -> Optional[Dict]:
    """Get a pipeline config by name.

    Returns the first pipeline from ``list_pipelines()`` whose ``"name"``
    equals ``name``; built-in pipelines are listed first and therefore win
    over custom ones with the same name. Falls back to ``DEFAULT_PIPELINE``
    when no pipeline matches.
    """
    for p in list_pipelines():
        # Tolerate malformed entries instead of raising on a missing key.
        if isinstance(p, dict) and p.get("name") == name:
            return p
    return DEFAULT_PIPELINE
def save_pipeline(config: Dict) -> Dict:
    """Save a custom pipeline config as ``<CONFIG_DIR>/<name>.json``.

    Args:
        config: Pipeline config; ``config["name"]`` becomes the file name.

    Returns:
        ``{"status": "ok", "filepath": <path written>}``.

    Raises:
        KeyError: If ``config`` has no ``"name"``.
        ValueError: If the name is not a plain file name (empty, not a
            string, ``.``/``..``, or containing a path separator or NUL).
    """
    name = config["name"]
    # The name may come from an HTTP request: reject anything that could make
    # the path escape CONFIG_DIR.
    if (not isinstance(name, str) or not name or name in (".", "..")
            or any(ch in name for ch in ("/", "\\", "\0"))):
        raise ValueError(f"invalid pipeline name: {name!r}")

    _ensure_config_dir()
    filepath = os.path.join(CONFIG_DIR, f"{name}.json")
    # ensure_ascii=False emits non-ASCII text, so pin the file encoding.
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(config, f, ensure_ascii=False, indent=2)
    return {"status": "ok", "filepath": filepath}
def list_plugins() -> Dict:
    """Return a freshly built, static catalogue of plugins keyed by capability."""
    catalog = {}

    catalog["embedding"] = [
        {"type": "clip-vith14", "model": "BAAI/CLIP-ViT-H-14", "dim": 1024,
         "endpoint": "http://localhost:9086", "status": "available"},
        {"type": "bge-m3", "model": "BAAI/bge-m3", "dim": 1024, "status": "not_deployed"},
    ]
    catalog["vdb"] = [
        {"type": "vdb-milvus", "backend": "Milvus",
         "endpoint": "http://localhost:8886", "status": "available"},
        {"type": "qdrant", "backend": "Qdrant", "status": "not_deployed"},
    ]
    catalog["graph"] = [
        {"type": "networkx", "backend": "NetworkX",
         "endpoint": "http://localhost:9092", "status": "available"},
        {"type": "falkordb", "backend": "FalkorDB+Redis", "status": "blocked_redis_module"},
        {"type": "none", "description": "Disable graph"},
    ]
    catalog["llm"] = [
        {"type": "harnessed", "description": "harnessed_agent", "status": "available"},
    ]
    catalog["reranker"] = [
        {"type": "bge-reranker", "model": "BAAI/bge-reranker-v2-m3",
         "endpoint": "http://localhost:9090", "status": "available"},
        {"type": "none", "description": "Skip reranking"},
    ]
    catalog["chunker"] = [
        {"type": "recursive", "description": "Recursive character splitter"},
        {"type": "sentence", "description": "Sentence-based splitter"},
    ]
    catalog["extractor"] = [
        {"type": "llm-structured", "description": "LLM-based entity/relation extraction"},
        {"type": "none", "description": "Skip extraction"},
    ]
    catalog["retriever"] = [
        {"type": "hybrid", "description": "Vector + Graph hybrid retrieval"},
        {"type": "vector_only", "description": "Pure vector retrieval"},
    ]
    catalog["face"] = [
        {"type": "insightface", "model": "buffalo_l", "dim": 512,
         "status": "available", "endpoint": "http://localhost:9091"},
    ]
    return catalog
def call_plugin(endpoint: str, path: str, data: Dict, timeout: int = 30) -> Dict:
    """Call a plugin endpoint via HTTP POST with a JSON body.

    Args:
        endpoint: Base URL of the plugin service.
        path: Path appended to ``endpoint``.
        data: JSON-serialisable request payload.
        timeout: Socket timeout in seconds.

    Returns:
        The decoded JSON object of the response. Any failure — unserialisable
        payload, malformed URL, network or HTTP error, invalid or non-object
        JSON in the response — is reported as ``{"error": <message>}``
        instead of raising.
    """
    url = f"{endpoint}{path}"
    try:
        # Serialisation and request construction can raise too (e.g. a
        # non-URL endpoint), so they belong inside the guarded section.
        payload = json.dumps(data).encode('utf-8')
        req = urllib.request.Request(url, data=payload, headers={'Content-Type': 'application/json'})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result = json.loads(resp.read().decode('utf-8'))
    except Exception as e:
        return {"error": str(e)}
    # Callers treat the result as a dict; never hand back a list or scalar.
    if not isinstance(result, dict):
        return {"error": f"unexpected non-object JSON response from {url}"}
    return result

0
workers/__init__.py Normal file
View File