Compare commits
No commits in common. "main" and "master" have entirely different histories.
9
.gitignore
vendored
Normal file
9
.gitignore
vendored
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
ah.pid
|
||||||
|
nohup.out
|
||||||
|
__pycache__/
|
||||||
|
*.pyc
|
||||||
|
*__pycache__/
|
||||||
|
logs/
|
||||||
|
files/
|
||||||
|
data/
|
||||||
|
pipelines/*.json
|
||||||
127
README.md
127
README.md
@ -1,2 +1,127 @@
|
|||||||
# rag-pipeline
|
# RAG Pipeline Service
|
||||||
|
|
||||||
|
可插拔 RAG 编排服务,支持知识图谱增强的多模态检索。
|
||||||
|
|
||||||
|
## 架构
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────────┐
|
||||||
|
│ RAG Pipeline │
|
||||||
|
│ (9093 CPU) │
|
||||||
|
└────────┬────────┘
|
||||||
|
│
|
||||||
|
┌──────────────────┼──────────────────┐
|
||||||
|
│ │ │
|
||||||
|
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
|
||||||
|
│ CLIP │ │ VDB │ │ Graph │
|
||||||
|
│ 9086 │ │ 8886 │ │ 9092 │
|
||||||
|
│ GPU 2 │ │ CPU │ │ CPU │
|
||||||
|
└───────────┘ └───────────┘ └───────────┘
|
||||||
|
│ │ │
|
||||||
|
┌─────▼─────┐ ┌─────▼─────┐
|
||||||
|
│BGE Rerank │ │ LLM │
|
||||||
|
│ 9090 │ │harnessed │
|
||||||
|
│ GPU 2 │ │ │
|
||||||
|
└───────────┘ └───────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
## 插件槽位
|
||||||
|
|
||||||
|
| 槽位 | 当前实现 | 可替换为 |
|
||||||
|
|------|---------|---------|
|
||||||
|
| embedding | CLIP-ViT-H/14 (9086) | BGE-M3, OpenAI embedding |
|
||||||
|
| vdb | Milvus VDB (8886) | Qdrant, Weaviate |
|
||||||
|
| graph | NetworkX (9092) | FalkorDB, Neo4j |
|
||||||
|
| reranker | BGE-Reranker (9090) | Cohere, LLM rerank |
|
||||||
|
| llm | harnessed_agent | 任意 LLM |
|
||||||
|
| chunker | recursive/sentence | 自定义分块策略 |
|
||||||
|
| extractor | LLM-structured | spaCy, GraphRAG |
|
||||||
|
| retriever | hybrid/vector_only | 自定义检索策略 |
|
||||||
|
| face | InsightFace (待部署) | FaceNet, DeepFace |
|
||||||
|
|
||||||
|
## Pipeline 配置
|
||||||
|
|
||||||
|
### kg-rag-standard(标准版)
|
||||||
|
- CLIP embedding + Milvus + NetworkX 图 + BGE Rerank
|
||||||
|
- 向量召回 + 图扩展 + RRF 融合 + 精排
|
||||||
|
|
||||||
|
### kg-rag-lite(轻量版)
|
||||||
|
- CLIP embedding + Milvus + BGE Rerank
|
||||||
|
- 纯向量检索,无图谱
|
||||||
|
|
||||||
|
## API
|
||||||
|
|
||||||
|
### GET /api/status
|
||||||
|
服务状态和可用插件列表。
|
||||||
|
|
||||||
|
### POST /api/ingest
|
||||||
|
入库文档。
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"document": "文本内容...",
|
||||||
|
"pipeline": "kg-rag-standard",
|
||||||
|
"collection": "knowledge",
|
||||||
|
"graph_name": "knowledge"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
流程: 分块 → CLIP embedding → VDB 存储 → 实体抽取 → 图存储
|
||||||
|
|
||||||
|
### POST /api/search
|
||||||
|
混合检索。
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"query": "搜索问题",
|
||||||
|
"pipeline": "kg-rag-standard",
|
||||||
|
"collection": "knowledge",
|
||||||
|
"graph_name": "knowledge",
|
||||||
|
"top_k": 5
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
流程: CLIP embed → VDB 向量召回 → 图扩展 → RRF 融合 → BGE Rerank
|
||||||
|
|
||||||
|
### GET/POST /api/pipelines
|
||||||
|
管理 pipeline 配置。
|
||||||
|
|
||||||
|
```json
|
||||||
|
POST: 创建自定义 pipeline
|
||||||
|
GET: 列出所有 pipeline
|
||||||
|
GET ?name=xxx: 获取指定 pipeline
|
||||||
|
```
|
||||||
|
|
||||||
|
### GET /api/plugins
|
||||||
|
列出所有可用插件及状态。
|
||||||
|
|
||||||
|
## 部署
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd /data/ymq/rag-pipeline
|
||||||
|
bash build.sh deploy
|
||||||
|
bash build.sh stop
|
||||||
|
bash build.sh status
|
||||||
|
```
|
||||||
|
|
||||||
|
## 端到端测试
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 1. 入库
|
||||||
|
curl -X POST http://localhost:9093/api/ingest \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{"document":"张三在ABC公司担任技术总监,他和李四是同事关系。","pipeline":"kg-rag-standard"}'
|
||||||
|
|
||||||
|
# 2. 检索
|
||||||
|
curl -X POST http://localhost:9093/api/search \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{"query":"张三在哪家公司工作?","pipeline":"kg-rag-standard"}'
|
||||||
|
```
|
||||||
|
|
||||||
|
## 端口
|
||||||
|
|
||||||
|
9093 (CPU only)
|
||||||
|
|
||||||
|
## Git
|
||||||
|
|
||||||
|
git@git.opencomputing.cn:yumoqing/rag-pipeline.git
|
||||||
|
|||||||
6
ah.py
Normal file
6
ah.py
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
from ahserver.webapp import webapp
|
||||||
|
from init import load_rag_pipeline
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
webapp(load_rag_pipeline)
|
||||||
55
build.sh
Executable file
55
build.sh
Executable file
@ -0,0 +1,55 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# RAG Pipeline Service
|
||||||
|
set -e
|
||||||
|
cd "$(dirname "$0")"
|
||||||
|
|
||||||
|
SERVICE_NAME="rag-pipeline"
|
||||||
|
PORT=9093
|
||||||
|
PY=/data/ymq/wan22-service/py3/bin/python
|
||||||
|
action="${1:-status}"
|
||||||
|
|
||||||
|
case "$action" in
|
||||||
|
deploy|update)
|
||||||
|
echo "=== $SERVICE_NAME Deploy (CPU, port $PORT) ==="
|
||||||
|
if [ -f ah.pid ] && kill -0 $(cat ah.pid) 2>/dev/null; then
|
||||||
|
kill $(cat ah.pid) 2>/dev/null || true; sleep 2
|
||||||
|
fi
|
||||||
|
if [ -d .git ] && [ -f .git/HEAD ]; then
|
||||||
|
git pull origin master 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
mkdir -p logs files wwwroot pipelines data
|
||||||
|
export PYTHONPATH="$(pwd)"
|
||||||
|
nohup $PY ah.py > nohup.out 2>&1 &
|
||||||
|
echo $! > ah.pid
|
||||||
|
echo "Started PID $(cat ah.pid) on port $PORT"
|
||||||
|
sleep 3
|
||||||
|
if curl -s http://localhost:$PORT/api/status > /dev/null 2>&1; then
|
||||||
|
echo "Service healthy"
|
||||||
|
else
|
||||||
|
echo "WARNING: not responding, check nohup.out"
|
||||||
|
tail -30 nohup.out
|
||||||
|
fi
|
||||||
|
;;
|
||||||
|
stop)
|
||||||
|
if [ -f ah.pid ]; then
|
||||||
|
kill $(cat ah.pid) 2>/dev/null || true; rm -f ah.pid; echo "Stopped"
|
||||||
|
else echo "Not running"; fi
|
||||||
|
;;
|
||||||
|
start)
|
||||||
|
mkdir -p logs files wwwroot pipelines data
|
||||||
|
export PYTHONPATH="$(pwd)"
|
||||||
|
nohup $PY ah.py > nohup.out 2>&1 &
|
||||||
|
echo $! > ah.pid; echo "Started PID $(cat ah.pid)"
|
||||||
|
;;
|
||||||
|
status)
|
||||||
|
echo "=== $SERVICE_NAME Status ==="
|
||||||
|
if [ -f ah.pid ] && kill -0 $(cat ah.pid) 2>/dev/null; then
|
||||||
|
echo "Process: running (PID $(cat ah.pid))"
|
||||||
|
else echo "Process: not running"; fi
|
||||||
|
echo "Port: $PORT"
|
||||||
|
if curl -s --max-time 3 http://localhost:$PORT/api/status > /dev/null 2>&1; then
|
||||||
|
echo "HTTP: OK"
|
||||||
|
else echo "HTTP: not responding"; fi
|
||||||
|
;;
|
||||||
|
*) echo "Usage: $0 {deploy|update|stop|start|status}"; exit 1 ;;
|
||||||
|
esac
|
||||||
28
conf/config.json
Normal file
28
conf/config.json
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
{
|
||||||
|
"password_key": "RagPipeline2026Key",
|
||||||
|
"filesroot": "$[workdir]$/files",
|
||||||
|
"logger": {
|
||||||
|
"name": "rag-pipeline",
|
||||||
|
"levelname": "info",
|
||||||
|
"logfile": "$[workdir]$/logs/rag-pipeline.log"
|
||||||
|
},
|
||||||
|
"website": {
|
||||||
|
"paths": [["$[workdir]$/wwwroot", ""]],
|
||||||
|
"client_max_size": 52428800,
|
||||||
|
"host": "0.0.0.0",
|
||||||
|
"port": 9093,
|
||||||
|
"coding": "utf-8",
|
||||||
|
"indexes": ["index.html"],
|
||||||
|
"startswiths": [
|
||||||
|
{"leading": "/api/status", "registerfunction": "status"},
|
||||||
|
{"leading": "/api/ingest", "registerfunction": "ingest"},
|
||||||
|
{"leading": "/api/search", "registerfunction": "search"},
|
||||||
|
{"leading": "/api/pipelines", "registerfunction": "pipelines"},
|
||||||
|
{"leading": "/api/plugins", "registerfunction": "plugins"}
|
||||||
|
],
|
||||||
|
"processors": [
|
||||||
|
[".tmpl", "tmpl"], [".app", "app"], [".ui", "bui"],
|
||||||
|
[".dspy", "dspy"], [".md", "md"]
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
0
core/__init__.py
Normal file
0
core/__init__.py
Normal file
73
core/chunker.py
Normal file
73
core/chunker.py
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
"""Document chunking strategies."""
|
||||||
|
import re
|
||||||
|
from typing import List, Dict
|
||||||
|
|
||||||
|
|
||||||
|
def recursive_chunk(text: str, chunk_size: int = 512, overlap: int = 64) -> List[Dict]:
|
||||||
|
"""Split text into chunks with overlap."""
|
||||||
|
if not text or len(text) <= chunk_size:
|
||||||
|
return [{"id": "chunk_0", "text": text, "index": 0}]
|
||||||
|
|
||||||
|
chunks = []
|
||||||
|
start = 0
|
||||||
|
idx = 0
|
||||||
|
|
||||||
|
while start < len(text):
|
||||||
|
end = start + chunk_size
|
||||||
|
|
||||||
|
# Try to break at sentence boundary
|
||||||
|
if end < len(text):
|
||||||
|
# Look for sentence ending in the last 100 chars
|
||||||
|
search_start = max(start + chunk_size - 100, start)
|
||||||
|
search_region = text[search_start:end]
|
||||||
|
|
||||||
|
for sep in ['。', '!', '?', '. ', '! ', '? ', '\n\n', '\n', ';', ';']:
|
||||||
|
last_sep = search_region.rfind(sep)
|
||||||
|
if last_sep >= 0:
|
||||||
|
end = search_start + last_sep + len(sep)
|
||||||
|
break
|
||||||
|
|
||||||
|
chunk_text = text[start:end].strip()
|
||||||
|
if chunk_text:
|
||||||
|
chunks.append({
|
||||||
|
"id": f"chunk_{idx}",
|
||||||
|
"text": chunk_text,
|
||||||
|
"index": idx
|
||||||
|
})
|
||||||
|
idx += 1
|
||||||
|
|
||||||
|
# Move forward with overlap
|
||||||
|
start = end - overlap
|
||||||
|
if start >= len(text):
|
||||||
|
break
|
||||||
|
|
||||||
|
return chunks
|
||||||
|
|
||||||
|
|
||||||
|
def sentence_chunk(text: str) -> List[Dict]:
|
||||||
|
"""Split text by sentences."""
|
||||||
|
# Split on Chinese/English sentence endings
|
||||||
|
sentences = re.split(r'(?<=[。!?.!?])\s*', text)
|
||||||
|
|
||||||
|
chunks = []
|
||||||
|
for idx, sent in enumerate(sentences):
|
||||||
|
sent = sent.strip()
|
||||||
|
if sent:
|
||||||
|
chunks.append({
|
||||||
|
"id": f"chunk_{idx}",
|
||||||
|
"text": sent,
|
||||||
|
"index": idx
|
||||||
|
})
|
||||||
|
|
||||||
|
return chunks
|
||||||
|
|
||||||
|
|
||||||
|
def chunk_document(text: str, strategy: str = "recursive", **kwargs) -> List[Dict]:
|
||||||
|
"""Chunk a document using the specified strategy."""
|
||||||
|
if strategy == "sentence":
|
||||||
|
return sentence_chunk(text)
|
||||||
|
else:
|
||||||
|
return recursive_chunk(text,
|
||||||
|
chunk_size=kwargs.get("chunk_size", 512),
|
||||||
|
overlap=kwargs.get("overlap", 64))
|
||||||
79
core/extractor.py
Normal file
79
core/extractor.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
"""Entity and relation extraction via LLM."""
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
from typing import List, Dict
|
||||||
|
|
||||||
|
|
||||||
|
EXTRACTION_PROMPT = """你是一个知识图谱抽取专家。请从以下文本中抽取实体和关系。
|
||||||
|
|
||||||
|
文本:
|
||||||
|
{text}
|
||||||
|
|
||||||
|
请按以下JSON格式返回:
|
||||||
|
{{
|
||||||
|
"entities": [
|
||||||
|
{{"name": "实体名", "type": "实体类型(person/company/product/concept/event/location)", "description": "简要描述"}}
|
||||||
|
],
|
||||||
|
"relations": [
|
||||||
|
{{"source": "源实体名", "target": "目标实体名", "relation": "关系类型", "description": "关系描述"}}
|
||||||
|
]
|
||||||
|
}}
|
||||||
|
|
||||||
|
只返回JSON,不要其他内容。"""
|
||||||
|
|
||||||
|
|
||||||
|
def extract_entities_relations(text: str, llm_func=None) -> Dict:
|
||||||
|
"""Extract entities and relations from text using LLM.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text
|
||||||
|
llm_func: Function that takes a prompt and returns LLM response text.
|
||||||
|
If None, returns empty result.
|
||||||
|
"""
|
||||||
|
if not llm_func:
|
||||||
|
return {"entities": [], "relations": []}
|
||||||
|
|
||||||
|
prompt = EXTRACTION_PROMPT.format(text=text[:2000]) # Limit text length
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = llm_func(prompt)
|
||||||
|
|
||||||
|
# Try to extract JSON from response
|
||||||
|
# Remove markdown code blocks if present
|
||||||
|
response = response.strip()
|
||||||
|
if response.startswith('```'):
|
||||||
|
response = re.sub(r'^```(?:json)?\s*', '', response)
|
||||||
|
response = re.sub(r'\s*```$', '', response)
|
||||||
|
|
||||||
|
result = json.loads(response)
|
||||||
|
|
||||||
|
# Validate structure
|
||||||
|
entities = result.get("entities", [])
|
||||||
|
relations = result.get("relations", [])
|
||||||
|
|
||||||
|
# Basic validation
|
||||||
|
valid_entities = []
|
||||||
|
for e in entities:
|
||||||
|
if isinstance(e, dict) and "name" in e:
|
||||||
|
valid_entities.append({
|
||||||
|
"name": e["name"],
|
||||||
|
"type": e.get("type", "unknown"),
|
||||||
|
"description": e.get("description", "")
|
||||||
|
})
|
||||||
|
|
||||||
|
valid_relations = []
|
||||||
|
entity_names = {e["name"] for e in valid_entities}
|
||||||
|
for r in relations:
|
||||||
|
if isinstance(r, dict) and "source" in r and "target" in r:
|
||||||
|
valid_relations.append({
|
||||||
|
"source": r["source"],
|
||||||
|
"target": r["target"],
|
||||||
|
"relation": r.get("relation", "related_to"),
|
||||||
|
"description": r.get("description", "")
|
||||||
|
})
|
||||||
|
|
||||||
|
return {"entities": valid_entities, "relations": valid_relations}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return {"entities": [], "relations": [], "error": str(e)}
|
||||||
163
core/retriever.py
Normal file
163
core/retriever.py
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
"""Hybrid retrieval: vector search + graph expansion + RRF fusion."""
|
||||||
|
from typing import List, Dict
|
||||||
|
|
||||||
|
def rrf_fusion(results_list: List[List[Dict]], k: int = 60) -> List[Dict]:
|
||||||
|
"""Reciprocal Rank Fusion - merge multiple ranked lists."""
|
||||||
|
scores = {}
|
||||||
|
|
||||||
|
for results in results_list:
|
||||||
|
for rank, item in enumerate(results):
|
||||||
|
doc_id = item.get("id", item.get("node_id", str(rank)))
|
||||||
|
rrf_score = 1.0 / (k + rank + 1)
|
||||||
|
|
||||||
|
if doc_id not in scores:
|
||||||
|
scores[doc_id] = {"item": item, "score": 0}
|
||||||
|
scores[doc_id]["score"] += rrf_score
|
||||||
|
|
||||||
|
sorted_items = sorted(scores.values(), key=lambda x: -x["score"])
|
||||||
|
|
||||||
|
for item in sorted_items:
|
||||||
|
item["item"]["rrf_score"] = round(item["score"], 6)
|
||||||
|
|
||||||
|
return [item["item"] for item in sorted_items]
|
||||||
|
|
||||||
|
|
||||||
|
def vector_search(query_embedding: List[float], vdb_plugin: Dict,
|
||||||
|
collection: str = "knowledge", top_k: int = 20) -> List[Dict]:
|
||||||
|
"""Search VDB for similar vectors."""
|
||||||
|
from plugins.registry import call_plugin
|
||||||
|
|
||||||
|
endpoint = vdb_plugin.get("endpoint")
|
||||||
|
if not endpoint:
|
||||||
|
return []
|
||||||
|
|
||||||
|
result = call_plugin(endpoint, "/v1/query", {
|
||||||
|
"colname": collection,
|
||||||
|
"vector": query_embedding,
|
||||||
|
"pagerows": top_k,
|
||||||
|
"page": 1
|
||||||
|
})
|
||||||
|
|
||||||
|
if "error" in result or result.get("status") != "SUCCEEDED":
|
||||||
|
return []
|
||||||
|
|
||||||
|
# VDB returns {"status": "SUCCEEDED", "data": {"rows": [...]}}
|
||||||
|
data = result.get("data", {})
|
||||||
|
return data.get("rows", []) if isinstance(data, dict) else []
|
||||||
|
|
||||||
|
|
||||||
|
def graph_expand(entity_ids: List[str], graph_plugin: Dict,
|
||||||
|
graph_name: str = "knowledge", hops: int = 2) -> List[Dict]:
|
||||||
|
"""Expand entities via graph neighbors."""
|
||||||
|
from plugins.registry import call_plugin
|
||||||
|
|
||||||
|
endpoint = graph_plugin.get("endpoint")
|
||||||
|
if not endpoint or graph_plugin.get("type") == "none":
|
||||||
|
return []
|
||||||
|
|
||||||
|
all_neighbors = []
|
||||||
|
seen = set()
|
||||||
|
|
||||||
|
for entity_id in entity_ids:
|
||||||
|
# Graph uses entity_{name} format from ingest
|
||||||
|
node_id = f"entity_{entity_id}".replace(" ", "_")
|
||||||
|
result = call_plugin(endpoint, "/api/graph/neighbors", {
|
||||||
|
"graph": graph_name,
|
||||||
|
"node_id": node_id,
|
||||||
|
"depth": hops
|
||||||
|
})
|
||||||
|
|
||||||
|
if "error" not in result:
|
||||||
|
for neighbor in result.get("neighbors", []):
|
||||||
|
nid = neighbor.get("node_id", "")
|
||||||
|
if nid and nid not in seen:
|
||||||
|
seen.add(nid)
|
||||||
|
neighbor["source_entity"] = entity_id
|
||||||
|
neighbor["id"] = nid
|
||||||
|
all_neighbors.append(neighbor)
|
||||||
|
|
||||||
|
return all_neighbors
|
||||||
|
|
||||||
|
|
||||||
|
def hybrid_retrieve(query_embedding: List[float],
|
||||||
|
extracted_entities: List[str],
|
||||||
|
pipeline: Dict,
|
||||||
|
collection: str = "knowledge",
|
||||||
|
graph_name: str = "knowledge") -> List[Dict]:
|
||||||
|
"""Hybrid retrieval: vector + graph + RRF fusion."""
|
||||||
|
plugins = pipeline.get("plugins", {})
|
||||||
|
retriever_config = plugins.get("retriever", {})
|
||||||
|
|
||||||
|
vector_top_k = retriever_config.get("vector_top_k", 20)
|
||||||
|
graph_hops = retriever_config.get("graph_hops", 2)
|
||||||
|
retriever_type = retriever_config.get("type", "hybrid")
|
||||||
|
|
||||||
|
results_lists = []
|
||||||
|
|
||||||
|
# Vector search
|
||||||
|
vdb_plugin = plugins.get("vdb", {})
|
||||||
|
vector_results = vector_search(query_embedding, vdb_plugin, collection, vector_top_k)
|
||||||
|
if vector_results:
|
||||||
|
results_lists.append(vector_results)
|
||||||
|
|
||||||
|
# Graph expansion (only if hybrid mode and graph enabled)
|
||||||
|
if retriever_type == "hybrid" and extracted_entities:
|
||||||
|
graph_plugin = plugins.get("graph", {})
|
||||||
|
graph_results = graph_expand(extracted_entities, graph_plugin, graph_name, graph_hops)
|
||||||
|
if graph_results:
|
||||||
|
results_lists.append(graph_results)
|
||||||
|
|
||||||
|
if not results_lists:
|
||||||
|
return []
|
||||||
|
|
||||||
|
if len(results_lists) == 1:
|
||||||
|
return results_lists[0]
|
||||||
|
|
||||||
|
return rrf_fusion(results_lists)
|
||||||
|
|
||||||
|
|
||||||
|
def rerank_results(query: str, results: List[Dict], reranker_plugin: Dict,
|
||||||
|
top_k: int = 5) -> List[Dict]:
|
||||||
|
"""Rerank results using BGE-Reranker."""
|
||||||
|
from plugins.registry import call_plugin
|
||||||
|
|
||||||
|
if not results or reranker_plugin.get("type") == "none":
|
||||||
|
return results[:top_k]
|
||||||
|
|
||||||
|
endpoint = reranker_plugin.get("endpoint")
|
||||||
|
if not endpoint:
|
||||||
|
return results[:top_k]
|
||||||
|
|
||||||
|
# Prepare documents for reranking
|
||||||
|
documents = []
|
||||||
|
for r in results:
|
||||||
|
doc = r.get("text", "") or r.get("description", "") or r.get("transcript", "")
|
||||||
|
if not doc:
|
||||||
|
doc = str(r.get("name", "")) + " " + str(r.get("attrs", {}))
|
||||||
|
documents.append(doc)
|
||||||
|
|
||||||
|
if not documents:
|
||||||
|
return results[:top_k]
|
||||||
|
|
||||||
|
rerank_result = call_plugin(endpoint, "/api/rerank", {
|
||||||
|
"query": query,
|
||||||
|
"documents": documents,
|
||||||
|
"top_k": top_k
|
||||||
|
})
|
||||||
|
|
||||||
|
if "error" in rerank_result:
|
||||||
|
return results[:top_k]
|
||||||
|
|
||||||
|
# Map reranked docs back to original results
|
||||||
|
ranked = []
|
||||||
|
for rd in rerank_result.get("ranked_docs", []):
|
||||||
|
doc_text = rd.get("doc", "")
|
||||||
|
for orig in results:
|
||||||
|
orig_text = orig.get("text", "") or orig.get("description", "") or str(orig.get("name", ""))
|
||||||
|
if doc_text == orig_text or doc_text in orig_text or orig_text in doc_text:
|
||||||
|
orig["rerank_score"] = rd.get("score", 0)
|
||||||
|
ranked.append(orig)
|
||||||
|
break
|
||||||
|
|
||||||
|
return ranked if ranked else results[:top_k]
|
||||||
126
init.py
Normal file
126
init.py
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
"""RAG Pipeline API endpoints."""
|
||||||
|
from traceback import format_exc
|
||||||
|
from ahserver.serverenv import ServerEnv
|
||||||
|
from appPublic.registerfunction import RegisterFunction
|
||||||
|
from appPublic.log import exception
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
async def status_handler(request, params_kw, *args, **kwargs):
|
||||||
|
import sys, os
|
||||||
|
sys.path.insert(0, os.getcwd())
|
||||||
|
from plugins.registry import list_pipelines, list_plugins
|
||||||
|
|
||||||
|
pipelines = list_pipelines()
|
||||||
|
plugins = list_plugins()
|
||||||
|
|
||||||
|
# Check service availability
|
||||||
|
services = {}
|
||||||
|
for cap, plugin_list in plugins.items():
|
||||||
|
for p in plugin_list:
|
||||||
|
if p.get("status") == "available":
|
||||||
|
services[cap] = p.get("type", "unknown")
|
||||||
|
|
||||||
|
return json.dumps({
|
||||||
|
"service": "rag-pipeline",
|
||||||
|
"port": 9093,
|
||||||
|
"active_services": services,
|
||||||
|
"pipelines": [p["name"] for p in pipelines],
|
||||||
|
"endpoints": ["/api/status", "/api/ingest", "/api/search", "/api/pipelines", "/api/plugins"]
|
||||||
|
}, indent=2, ensure_ascii=False)
|
||||||
|
|
||||||
|
|
||||||
|
async def ingest_handler(request, params_kw, *args, **kwargs):
|
||||||
|
import sys, os
|
||||||
|
sys.path.insert(0, os.getcwd())
|
||||||
|
from pipeline import ingest
|
||||||
|
|
||||||
|
try:
|
||||||
|
document = params_kw.get("document", "")
|
||||||
|
if not document:
|
||||||
|
return json.dumps({"error": "document text required"})
|
||||||
|
|
||||||
|
pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
|
||||||
|
collection = params_kw.get("collection", "knowledge")
|
||||||
|
graph_name = params_kw.get("graph_name", "knowledge")
|
||||||
|
|
||||||
|
# Optional LLM for extraction (not used in basic mode)
|
||||||
|
# llm_func = params_kw.get("llm_func") # Can't pass functions via HTTP
|
||||||
|
|
||||||
|
result = ingest(document, pipeline_name, collection, graph_name, llm_func=None)
|
||||||
|
return json.dumps(result, ensure_ascii=False)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
exception(f"{e}, {format_exc()}")
|
||||||
|
return json.dumps({"error": str(e)})
|
||||||
|
|
||||||
|
|
||||||
|
async def search_handler(request, params_kw, *args, **kwargs):
|
||||||
|
import sys, os
|
||||||
|
sys.path.insert(0, os.getcwd())
|
||||||
|
from pipeline import search
|
||||||
|
|
||||||
|
try:
|
||||||
|
query = params_kw.get("query", "")
|
||||||
|
if not query:
|
||||||
|
return json.dumps({"error": "query text required"})
|
||||||
|
|
||||||
|
pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
|
||||||
|
collection = params_kw.get("collection", "knowledge")
|
||||||
|
graph_name = params_kw.get("graph_name", "knowledge")
|
||||||
|
top_k = int(params_kw.get("top_k", 5))
|
||||||
|
|
||||||
|
result = search(query, pipeline_name, collection, graph_name, top_k, llm_func=None)
|
||||||
|
return json.dumps(result, ensure_ascii=False)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
exception(f"{e}, {format_exc()}")
|
||||||
|
return json.dumps({"error": str(e)})
|
||||||
|
|
||||||
|
|
||||||
|
async def pipelines_handler(request, params_kw, *args, **kwargs):
|
||||||
|
import sys, os
|
||||||
|
sys.path.insert(0, os.getcwd())
|
||||||
|
from plugins.registry import list_pipelines, get_pipeline, save_pipeline
|
||||||
|
|
||||||
|
method = request.method
|
||||||
|
|
||||||
|
if method == "POST":
|
||||||
|
# Create/update pipeline
|
||||||
|
config = params_kw
|
||||||
|
if "name" not in config:
|
||||||
|
return json.dumps({"error": "pipeline config with 'name' required"})
|
||||||
|
result = save_pipeline(config)
|
||||||
|
return json.dumps({"status": "SUCCEEDED", **result}, ensure_ascii=False)
|
||||||
|
else:
|
||||||
|
# List pipelines
|
||||||
|
name = params_kw.get("name")
|
||||||
|
if name:
|
||||||
|
pipeline = get_pipeline(name)
|
||||||
|
if pipeline:
|
||||||
|
return json.dumps({"status": "SUCCEEDED", "pipeline": pipeline}, indent=2, ensure_ascii=False)
|
||||||
|
return json.dumps({"error": f"Pipeline '{name}' not found"})
|
||||||
|
|
||||||
|
pipelines = list_pipelines()
|
||||||
|
return json.dumps({"status": "SUCCEEDED", "pipelines": pipelines}, indent=2, ensure_ascii=False)
|
||||||
|
|
||||||
|
|
||||||
|
async def plugins_handler(request, params_kw, *args, **kwargs):
|
||||||
|
import sys, os
|
||||||
|
sys.path.insert(0, os.getcwd())
|
||||||
|
from plugins.registry import list_plugins
|
||||||
|
|
||||||
|
plugins = list_plugins()
|
||||||
|
return json.dumps({"status": "SUCCEEDED", "plugins": plugins}, indent=2, ensure_ascii=False)
|
||||||
|
|
||||||
|
|
||||||
|
def load_rag_pipeline():
|
||||||
|
"""Register API handlers"""
|
||||||
|
env = ServerEnv()
|
||||||
|
rf = RegisterFunction()
|
||||||
|
rf.register("status", status_handler)
|
||||||
|
rf.register("ingest", ingest_handler)
|
||||||
|
rf.register("search", search_handler)
|
||||||
|
rf.register("pipelines", pipelines_handler)
|
||||||
|
rf.register("plugins", plugins_handler)
|
||||||
270
pipeline.py
Normal file
270
pipeline.py
Normal file
@ -0,0 +1,270 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
"""RAG Pipeline orchestration - ingest and search workflows."""
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
from typing import Dict, List, Optional
|
||||||
|
from traceback import format_exc
|
||||||
|
|
||||||
|
from plugins.registry import get_pipeline, call_plugin
|
||||||
|
from core.chunker import chunk_document
|
||||||
|
from core.extractor import extract_entities_relations
|
||||||
|
from core.retriever import hybrid_retrieve, rerank_results
|
||||||
|
|
||||||
|
|
||||||
|
def _get_embedding(texts: List[str], embedding_plugin: Dict) -> List[List[float]]:
|
||||||
|
"""Get text embeddings from CLIP service."""
|
||||||
|
endpoint = embedding_plugin.get("endpoint")
|
||||||
|
if not endpoint:
|
||||||
|
return []
|
||||||
|
|
||||||
|
result = call_plugin(endpoint, "/api/text", {"texts": texts})
|
||||||
|
if "error" in result:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return result.get("embeddings", [])
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_collection(vdb_plugin: Dict, collection: str, dim: int = 1024) -> Dict:
|
||||||
|
"""Ensure VDB collection exists with correct schema."""
|
||||||
|
endpoint = vdb_plugin.get("endpoint")
|
||||||
|
if not endpoint:
|
||||||
|
return {"error": "VDB endpoint not configured"}
|
||||||
|
|
||||||
|
# Check if collection exists
|
||||||
|
list_result = call_plugin(endpoint, "/v1/listcollections", {})
|
||||||
|
existing = list_result.get("collections", [])
|
||||||
|
|
||||||
|
if collection in existing:
|
||||||
|
return {"status": "exists"}
|
||||||
|
|
||||||
|
# Create with proper field schema
|
||||||
|
fields = [
|
||||||
|
{"name": "id", "type": "str", "is_primary": True, "max_length": 256},
|
||||||
|
{"name": "text", "type": "str", "max_length": 65535},
|
||||||
|
{"name": "description", "type": "str", "max_length": 65535},
|
||||||
|
{"name": "type", "type": "str", "max_length": 128},
|
||||||
|
{"name": "embedding", "type": "fvector", "dim": dim}
|
||||||
|
]
|
||||||
|
|
||||||
|
result = call_plugin(endpoint, "/v1/createcollection", {
|
||||||
|
"colname": collection,
|
||||||
|
"fields": fields,
|
||||||
|
"description": "RAG knowledge collection",
|
||||||
|
"metric": "COSINE"
|
||||||
|
})
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _store_in_vdb(items: List[Dict], embeddings: List[List[float]],
|
||||||
|
vdb_plugin: Dict, collection: str = "knowledge") -> Dict:
|
||||||
|
"""Store items with embeddings in VDB."""
|
||||||
|
endpoint = vdb_plugin.get("endpoint")
|
||||||
|
if not endpoint:
|
||||||
|
return {"error": "VDB endpoint not configured"}
|
||||||
|
|
||||||
|
# Ensure collection exists
|
||||||
|
ensure_result = _ensure_collection(vdb_plugin, collection)
|
||||||
|
if "error" in ensure_result:
|
||||||
|
return ensure_result
|
||||||
|
|
||||||
|
# Prepare batch insert - data must match schema fields
|
||||||
|
rows = []
|
||||||
|
for item, emb in zip(items, embeddings):
|
||||||
|
row = {
|
||||||
|
"id": item.get("id", str(uuid.uuid4())[:16]),
|
||||||
|
"text": item.get("text", ""),
|
||||||
|
"description": item.get("description", ""),
|
||||||
|
"type": item.get("type", "chunk"),
|
||||||
|
"embedding": emb
|
||||||
|
}
|
||||||
|
rows.append(row)
|
||||||
|
|
||||||
|
result = call_plugin(endpoint, "/v1/batchinsert", {
|
||||||
|
"colname": collection,
|
||||||
|
"data": rows
|
||||||
|
})
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _store_in_graph(entities: List[Dict], relations: List[Dict],
|
||||||
|
graph_plugin: Dict, graph_name: str = "knowledge") -> Dict:
|
||||||
|
"""Store entities and relations in graph."""
|
||||||
|
endpoint = graph_plugin.get("endpoint")
|
||||||
|
if not endpoint or graph_plugin.get("type") == "none":
|
||||||
|
return {"status": "skipped", "reason": "graph disabled"}
|
||||||
|
|
||||||
|
added_nodes = 0
|
||||||
|
added_edges = 0
|
||||||
|
|
||||||
|
for entity in entities:
|
||||||
|
name = entity.get("name", "")
|
||||||
|
node_id = f"entity_{name}".replace(" ", "_")
|
||||||
|
result = call_plugin(endpoint, "/api/graph/add_node", {
|
||||||
|
"graph": graph_name,
|
||||||
|
"node_id": node_id,
|
||||||
|
"attrs": {
|
||||||
|
"name": name,
|
||||||
|
"type": entity.get("type", "unknown"),
|
||||||
|
"description": entity.get("description", "")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if "error" not in result:
|
||||||
|
added_nodes += 1
|
||||||
|
|
||||||
|
for rel in relations:
|
||||||
|
source = f"entity_{rel.get('source', '')}".replace(" ", "_")
|
||||||
|
target = f"entity_{rel.get('target', '')}".replace(" ", "_")
|
||||||
|
result = call_plugin(endpoint, "/api/graph/add_edge", {
|
||||||
|
"graph": graph_name,
|
||||||
|
"source": source,
|
||||||
|
"target": target,
|
||||||
|
"attrs": {
|
||||||
|
"relation": rel.get("relation", "related_to"),
|
||||||
|
"description": rel.get("description", "")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if "error" not in result:
|
||||||
|
added_edges += 1
|
||||||
|
|
||||||
|
# Save graph to disk
|
||||||
|
call_plugin(endpoint, "/api/graph/save", {"graph": graph_name})
|
||||||
|
|
||||||
|
return {"status": "ok", "nodes_added": added_nodes, "edges_added": added_edges}
|
||||||
|
|
||||||
|
|
||||||
|
def ingest(document: str, pipeline_name: str = "kg-rag-standard",
|
||||||
|
collection: str = "knowledge", graph_name: str = "knowledge",
|
||||||
|
llm_func=None) -> Dict:
|
||||||
|
"""Full ingest pipeline: chunk -> embed -> extract -> store."""
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
pipeline = get_pipeline(pipeline_name)
|
||||||
|
plugins = pipeline.get("plugins", {})
|
||||||
|
|
||||||
|
# Step 1: Chunk
|
||||||
|
chunker_config = plugins.get("chunker", {})
|
||||||
|
chunks = chunk_document(document,
|
||||||
|
strategy=chunker_config.get("type", "recursive"),
|
||||||
|
chunk_size=chunker_config.get("chunk_size", 512),
|
||||||
|
overlap=chunker_config.get("overlap", 64))
|
||||||
|
|
||||||
|
chunk_texts = [c["text"] for c in chunks]
|
||||||
|
|
||||||
|
# Step 2: Embed
|
||||||
|
embedding_plugin = plugins.get("embedding", {})
|
||||||
|
embeddings = _get_embedding(chunk_texts, embedding_plugin)
|
||||||
|
|
||||||
|
if not embeddings:
|
||||||
|
return {"error": "Failed to get embeddings", "chunks": len(chunks)}
|
||||||
|
|
||||||
|
# Step 3: Store chunks + embeddings in VDB
|
||||||
|
vdb_plugin = plugins.get("vdb", {})
|
||||||
|
vdb_result = _store_in_vdb(chunks, embeddings, vdb_plugin, collection)
|
||||||
|
|
||||||
|
# Step 4: Extract entities and relations
|
||||||
|
extractor_config = plugins.get("extractor", {})
|
||||||
|
extraction = {"entities": [], "relations": []}
|
||||||
|
|
||||||
|
if extractor_config.get("type") != "none" and llm_func:
|
||||||
|
extraction = extract_entities_relations(document[:3000], llm_func)
|
||||||
|
|
||||||
|
# Step 5: Store in graph
|
||||||
|
graph_plugin = plugins.get("graph", {})
|
||||||
|
graph_result = _store_in_graph(
|
||||||
|
extraction["entities"],
|
||||||
|
extraction["relations"],
|
||||||
|
graph_plugin,
|
||||||
|
graph_name
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
graph_result = {"status": "skipped"}
|
||||||
|
|
||||||
|
elapsed = round(time.time() - start_time, 3)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "SUCCEEDED",
|
||||||
|
"pipeline": pipeline_name,
|
||||||
|
"chunks": len(chunks),
|
||||||
|
"embeddings": len(embeddings),
|
||||||
|
"entities": len(extraction.get("entities", [])),
|
||||||
|
"relations": len(extraction.get("relations", [])),
|
||||||
|
"vdb_result": vdb_result,
|
||||||
|
"graph_result": graph_result,
|
||||||
|
"elapsed": elapsed
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def search(query: str, pipeline_name: str = "kg-rag-standard",
|
||||||
|
collection: str = "knowledge", graph_name: str = "knowledge",
|
||||||
|
top_k: int = 5, llm_func=None) -> Dict:
|
||||||
|
"""Full search pipeline: embed -> retrieve -> rerank -> generate."""
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
pipeline = get_pipeline(pipeline_name)
|
||||||
|
plugins = pipeline.get("plugins", {})
|
||||||
|
|
||||||
|
# Step 1: Embed query
|
||||||
|
embedding_plugin = plugins.get("embedding", {})
|
||||||
|
query_embeddings = _get_embedding([query], embedding_plugin)
|
||||||
|
|
||||||
|
if not query_embeddings:
|
||||||
|
return {"error": "Failed to embed query"}
|
||||||
|
|
||||||
|
query_embedding = query_embeddings[0]
|
||||||
|
|
||||||
|
# Step 2: Extract entities from query (for graph expansion)
|
||||||
|
extracted_entities = []
|
||||||
|
if llm_func:
|
||||||
|
quick_extraction = extract_entities_relations(query, llm_func)
|
||||||
|
extracted_entities = [e["name"] for e in quick_extraction.get("entities", [])]
|
||||||
|
|
||||||
|
# Step 3: Hybrid retrieval
|
||||||
|
results = hybrid_retrieve(
|
||||||
|
query_embedding,
|
||||||
|
extracted_entities,
|
||||||
|
pipeline,
|
||||||
|
collection,
|
||||||
|
graph_name
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 4: Rerank
|
||||||
|
reranker_plugin = plugins.get("reranker", {})
|
||||||
|
ranked_results = rerank_results(query, results, reranker_plugin, top_k)
|
||||||
|
|
||||||
|
# Step 5: Generate answer (optional)
|
||||||
|
answer = None
|
||||||
|
if llm_func and ranked_results:
|
||||||
|
context = "\n\n".join([
|
||||||
|
r.get("text", "") or r.get("description", "") or str(r)
|
||||||
|
for r in ranked_results[:top_k]
|
||||||
|
])
|
||||||
|
|
||||||
|
gen_prompt = f"""基于以下上下文回答问题。如果上下文中没有相关信息,请如实说明。
|
||||||
|
|
||||||
|
上下文:
|
||||||
|
{context[:2000]}
|
||||||
|
|
||||||
|
问题:{query}
|
||||||
|
|
||||||
|
请用中文回答:"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
answer = llm_func(gen_prompt)
|
||||||
|
except:
|
||||||
|
answer = None
|
||||||
|
|
||||||
|
elapsed = round(time.time() - start_time, 3)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "SUCCEEDED",
|
||||||
|
"pipeline": pipeline_name,
|
||||||
|
"query": query,
|
||||||
|
"results": ranked_results[:top_k],
|
||||||
|
"total_retrieved": len(results),
|
||||||
|
"answer": answer,
|
||||||
|
"extracted_entities": extracted_entities,
|
||||||
|
"elapsed": elapsed
|
||||||
|
}
|
||||||
0
plugins/__init__.py
Normal file
0
plugins/__init__.py
Normal file
129
plugins/registry.py
Normal file
129
plugins/registry.py
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
"""Plugin registry - manages available RAG plugins and pipeline configs."""
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import urllib.request
|
||||||
|
from typing import Dict, List, Optional
|
||||||
|
|
||||||
|
CONFIG_DIR = "/data/ymq/rag-pipeline/pipelines"
|
||||||
|
|
||||||
|
# Default pipeline config
|
||||||
|
DEFAULT_PIPELINE = {
|
||||||
|
"name": "kg-rag-standard",
|
||||||
|
"description": "标准知识图谱RAG",
|
||||||
|
"plugins": {
|
||||||
|
"embedding": {"type": "clip-vith14", "endpoint": "http://localhost:9086"},
|
||||||
|
"vdb": {"type": "vdb-milvus", "endpoint": "http://localhost:8886"},
|
||||||
|
"graph": {"type": "networkx", "endpoint": "http://localhost:9092"},
|
||||||
|
"llm": {"type": "harnessed", "endpoint": "internal"},
|
||||||
|
"reranker": {"type": "bge-reranker", "endpoint": "http://localhost:9090"},
|
||||||
|
"chunker": {"type": "recursive", "chunk_size": 512, "overlap": 64},
|
||||||
|
"extractor": {"type": "llm-structured"},
|
||||||
|
"retriever": {"type": "hybrid", "vector_top_k": 20, "graph_hops": 2}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LITE_PIPELINE = {
|
||||||
|
"name": "kg-rag-lite",
|
||||||
|
"description": "轻量版RAG(纯向量,无图谱)",
|
||||||
|
"plugins": {
|
||||||
|
"embedding": {"type": "clip-vith14", "endpoint": "http://localhost:9086"},
|
||||||
|
"vdb": {"type": "vdb-milvus", "endpoint": "http://localhost:8886"},
|
||||||
|
"graph": {"type": "none"},
|
||||||
|
"llm": {"type": "harnessed", "endpoint": "internal"},
|
||||||
|
"reranker": {"type": "bge-reranker", "endpoint": "http://localhost:9090"},
|
||||||
|
"chunker": {"type": "recursive", "chunk_size": 512, "overlap": 64},
|
||||||
|
"retriever": {"type": "vector_only", "top_k": 10}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_config_dir():
|
||||||
|
os.makedirs(CONFIG_DIR, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def list_pipelines() -> List[Dict]:
|
||||||
|
"""List all available pipeline configs."""
|
||||||
|
_ensure_config_dir()
|
||||||
|
pipelines = [DEFAULT_PIPELINE, LITE_PIPELINE]
|
||||||
|
|
||||||
|
# Load custom pipelines from disk
|
||||||
|
for f in os.listdir(CONFIG_DIR):
|
||||||
|
if f.endswith('.json'):
|
||||||
|
try:
|
||||||
|
with open(os.path.join(CONFIG_DIR, f), 'r') as fh:
|
||||||
|
pipelines.append(json.load(fh))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return pipelines
|
||||||
|
|
||||||
|
|
||||||
|
def get_pipeline(name: str) -> Optional[Dict]:
|
||||||
|
"""Get a pipeline config by name."""
|
||||||
|
for p in list_pipelines():
|
||||||
|
if p["name"] == name:
|
||||||
|
return p
|
||||||
|
return DEFAULT_PIPELINE
|
||||||
|
|
||||||
|
|
||||||
|
def save_pipeline(config: Dict) -> Dict:
|
||||||
|
"""Save a custom pipeline config."""
|
||||||
|
_ensure_config_dir()
|
||||||
|
filepath = os.path.join(CONFIG_DIR, f"{config['name']}.json")
|
||||||
|
with open(filepath, 'w') as f:
|
||||||
|
json.dump(config, f, ensure_ascii=False, indent=2)
|
||||||
|
return {"status": "ok", "filepath": filepath}
|
||||||
|
|
||||||
|
|
||||||
|
def list_plugins() -> Dict:
|
||||||
|
"""List all available plugins by capability."""
|
||||||
|
return {
|
||||||
|
"embedding": [
|
||||||
|
{"type": "clip-vith14", "model": "BAAI/CLIP-ViT-H-14", "dim": 1024, "endpoint": "http://localhost:9086", "status": "available"},
|
||||||
|
{"type": "bge-m3", "model": "BAAI/bge-m3", "dim": 1024, "status": "not_deployed"},
|
||||||
|
],
|
||||||
|
"vdb": [
|
||||||
|
{"type": "vdb-milvus", "backend": "Milvus", "endpoint": "http://localhost:8886", "status": "available"},
|
||||||
|
{"type": "qdrant", "backend": "Qdrant", "status": "not_deployed"},
|
||||||
|
],
|
||||||
|
"graph": [
|
||||||
|
{"type": "networkx", "backend": "NetworkX", "endpoint": "http://localhost:9092", "status": "available"},
|
||||||
|
{"type": "falkordb", "backend": "FalkorDB+Redis", "status": "blocked_redis_module"},
|
||||||
|
{"type": "none", "description": "Disable graph"},
|
||||||
|
],
|
||||||
|
"llm": [
|
||||||
|
{"type": "harnessed", "description": "harnessed_agent", "status": "available"},
|
||||||
|
],
|
||||||
|
"reranker": [
|
||||||
|
{"type": "bge-reranker", "model": "BAAI/bge-reranker-v2-m3", "endpoint": "http://localhost:9090", "status": "available"},
|
||||||
|
{"type": "none", "description": "Skip reranking"},
|
||||||
|
],
|
||||||
|
"chunker": [
|
||||||
|
{"type": "recursive", "description": "Recursive character splitter"},
|
||||||
|
{"type": "sentence", "description": "Sentence-based splitter"},
|
||||||
|
],
|
||||||
|
"extractor": [
|
||||||
|
{"type": "llm-structured", "description": "LLM-based entity/relation extraction"},
|
||||||
|
{"type": "none", "description": "Skip extraction"},
|
||||||
|
],
|
||||||
|
"retriever": [
|
||||||
|
{"type": "hybrid", "description": "Vector + Graph hybrid retrieval"},
|
||||||
|
{"type": "vector_only", "description": "Pure vector retrieval"},
|
||||||
|
],
|
||||||
|
"face": [
|
||||||
|
{"type": "insightface", "model": "buffalo_l", "dim": 512, "status": "available", "endpoint": "http://localhost:9091"},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def call_plugin(endpoint: str, path: str, data: Dict, timeout: int = 30) -> Dict:
|
||||||
|
"""Call a plugin endpoint via HTTP POST."""
|
||||||
|
url = f"{endpoint}{path}"
|
||||||
|
payload = json.dumps(data).encode('utf-8')
|
||||||
|
req = urllib.request.Request(url, data=payload, headers={'Content-Type': 'application/json'})
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||||
|
return json.loads(resp.read().decode('utf-8'))
|
||||||
|
except Exception as e:
|
||||||
|
return {"error": str(e)}
|
||||||
0
workers/__init__.py
Normal file
0
workers/__init__.py
Normal file
Loading…
x
Reference in New Issue
Block a user