Compare commits
No commits in common. "master" and "main" have entirely different histories.
9
.gitignore
vendored
9
.gitignore
vendored
@ -1,9 +0,0 @@
|
|||||||
ah.pid
|
|
||||||
nohup.out
|
|
||||||
__pycache__/
|
|
||||||
*.pyc
|
|
||||||
*__pycache__/
|
|
||||||
logs/
|
|
||||||
files/
|
|
||||||
data/
|
|
||||||
pipelines/*.json
|
|
||||||
127
README.md
127
README.md
@ -1,127 +1,2 @@
|
|||||||
# RAG Pipeline Service
|
# rag-pipeline
|
||||||
|
|
||||||
可插拔 RAG 编排服务,支持知识图谱增强的多模态检索。
|
|
||||||
|
|
||||||
## 架构
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────────┐
|
|
||||||
│ RAG Pipeline │
|
|
||||||
│ (9093 CPU) │
|
|
||||||
└────────┬────────┘
|
|
||||||
│
|
|
||||||
┌──────────────────┼──────────────────┐
|
|
||||||
│ │ │
|
|
||||||
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
|
|
||||||
│ CLIP │ │ VDB │ │ Graph │
|
|
||||||
│ 9086 │ │ 8886 │ │ 9092 │
|
|
||||||
│ GPU 2 │ │ CPU │ │ CPU │
|
|
||||||
└───────────┘ └───────────┘ └───────────┘
|
|
||||||
│ │ │
|
|
||||||
┌─────▼─────┐ ┌─────▼─────┐
|
|
||||||
│BGE Rerank │ │ LLM │
|
|
||||||
│ 9090 │ │harnessed │
|
|
||||||
│ GPU 2 │ │ │
|
|
||||||
└───────────┘ └───────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
## 插件槽位
|
|
||||||
|
|
||||||
| 槽位 | 当前实现 | 可替换为 |
|
|
||||||
|------|---------|---------|
|
|
||||||
| embedding | CLIP-ViT-H/14 (9086) | BGE-M3, OpenAI embedding |
|
|
||||||
| vdb | Milvus VDB (8886) | Qdrant, Weaviate |
|
|
||||||
| graph | NetworkX (9092) | FalkorDB, Neo4j |
|
|
||||||
| reranker | BGE-Reranker (9090) | Cohere, LLM rerank |
|
|
||||||
| llm | harnessed_agent | 任意 LLM |
|
|
||||||
| chunker | recursive/sentence | 自定义分块策略 |
|
|
||||||
| extractor | LLM-structured | spaCy, GraphRAG |
|
|
||||||
| retriever | hybrid/vector_only | 自定义检索策略 |
|
|
||||||
| face | InsightFace (待部署) | FaceNet, DeepFace |
|
|
||||||
|
|
||||||
## Pipeline 配置
|
|
||||||
|
|
||||||
### kg-rag-standard(标准版)
|
|
||||||
- CLIP embedding + Milvus + NetworkX 图 + BGE Rerank
|
|
||||||
- 向量召回 + 图扩展 + RRF 融合 + 精排
|
|
||||||
|
|
||||||
### kg-rag-lite(轻量版)
|
|
||||||
- CLIP embedding + Milvus + BGE Rerank
|
|
||||||
- 纯向量检索,无图谱
|
|
||||||
|
|
||||||
## API
|
|
||||||
|
|
||||||
### GET /api/status
|
|
||||||
服务状态和可用插件列表。
|
|
||||||
|
|
||||||
### POST /api/ingest
|
|
||||||
入库文档。
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"document": "文本内容...",
|
|
||||||
"pipeline": "kg-rag-standard",
|
|
||||||
"collection": "knowledge",
|
|
||||||
"graph_name": "knowledge"
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
流程: 分块 → CLIP embedding → VDB 存储 → 实体抽取 → 图存储
|
|
||||||
|
|
||||||
### POST /api/search
|
|
||||||
混合检索。
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"query": "搜索问题",
|
|
||||||
"pipeline": "kg-rag-standard",
|
|
||||||
"collection": "knowledge",
|
|
||||||
"graph_name": "knowledge",
|
|
||||||
"top_k": 5
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
流程: CLIP embed → VDB 向量召回 → 图扩展 → RRF 融合 → BGE Rerank
|
|
||||||
|
|
||||||
### GET/POST /api/pipelines
|
|
||||||
管理 pipeline 配置。
|
|
||||||
|
|
||||||
```text
|
|
||||||
POST: 创建自定义 pipeline
|
|
||||||
GET: 列出所有 pipeline
|
|
||||||
GET ?name=xxx: 获取指定 pipeline
|
|
||||||
```
|
|
||||||
|
|
||||||
### GET /api/plugins
|
|
||||||
列出所有可用插件及状态。
|
|
||||||
|
|
||||||
## 部署
|
|
||||||
|
|
||||||
```bash
|
|
||||||
cd /data/ymq/rag-pipeline
|
|
||||||
bash build.sh deploy
|
|
||||||
bash build.sh stop
|
|
||||||
bash build.sh status
|
|
||||||
```
|
|
||||||
|
|
||||||
## 端到端测试
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# 1. 入库
|
|
||||||
curl -X POST http://localhost:9093/api/ingest \
|
|
||||||
-H "Content-Type: application/json" \
|
|
||||||
-d '{"document":"张三在ABC公司担任技术总监,他和李四是同事关系。","pipeline":"kg-rag-standard"}'
|
|
||||||
|
|
||||||
# 2. 检索
|
|
||||||
curl -X POST http://localhost:9093/api/search \
|
|
||||||
-H "Content-Type: application/json" \
|
|
||||||
-d '{"query":"张三在哪家公司工作?","pipeline":"kg-rag-standard"}'
|
|
||||||
```
|
|
||||||
|
|
||||||
## 端口
|
|
||||||
|
|
||||||
9093 (CPU only)
|
|
||||||
|
|
||||||
## Git
|
|
||||||
|
|
||||||
git@git.opencomputing.cn:yumoqing/rag-pipeline.git
|
|
||||||
|
|||||||
6
ah.py
6
ah.py
@ -1,6 +0,0 @@
|
|||||||
# -*- coding:utf-8 -*-
|
|
||||||
from ahserver.webapp import webapp
|
|
||||||
from init import load_rag_pipeline
|
|
||||||
|
|
||||||
# Script entry point: start the ahserver web application, passing it the
# callback that registers this service's API handlers.
if __name__ == "__main__":
    webapp(load_rag_pipeline)
|
|
||||||
55
build.sh
55
build.sh
@ -1,55 +0,0 @@
|
|||||||
#!/usr/bin/env bash
# RAG Pipeline Service control script.
# Usage: build.sh {deploy|update|stop|start|status}   (default action: status)
set -e
cd "$(dirname "$0")"

SERVICE_NAME="rag-pipeline"
PORT=9093
PY=/data/ymq/wan22-service/py3/bin/python
PID_FILE=ah.pid
action="${1:-status}"

# Succeeds when the PID recorded in $PID_FILE belongs to a live process.
is_running() {
    [ -f "$PID_FILE" ] && kill -0 "$(cat "$PID_FILE")" 2>/dev/null
}

# Creates the runtime directories, launches the service in the background
# and records its PID.
start_service() {
    mkdir -p logs files wwwroot pipelines data
    export PYTHONPATH="$(pwd)"
    nohup "$PY" ah.py > nohup.out 2>&1 &
    echo $! > "$PID_FILE"
}

# Succeeds when the status endpoint answers; bounded so a hung service
# cannot block the script.
http_ok() {
    curl -s --max-time 3 "http://localhost:$PORT/api/status" > /dev/null 2>&1
}

case "$action" in
    deploy|update)
        echo "=== $SERVICE_NAME Deploy (CPU, port $PORT) ==="
        if is_running; then
            kill "$(cat "$PID_FILE")" 2>/dev/null || true
            sleep 2
        fi
        if [ -d .git ] && [ -f .git/HEAD ]; then
            # Best effort: a failed pull must not block redeploying the
            # current checkout, but it should not go unnoticed either.
            git pull origin master 2>/dev/null \
                || echo "WARNING: git pull failed, deploying current checkout"
        fi
        start_service
        echo "Started PID $(cat "$PID_FILE") on port $PORT"
        sleep 3
        if http_ok; then
            echo "Service healthy"
        else
            echo "WARNING: not responding, check nohup.out"
            tail -30 nohup.out
        fi
        ;;
    stop)
        if [ -f "$PID_FILE" ]; then
            kill "$(cat "$PID_FILE")" 2>/dev/null || true
            rm -f "$PID_FILE"
            echo "Stopped"
        else
            echo "Not running"
        fi
        ;;
    start)
        # Refuse to launch a second instance: it would orphan the running
        # process and overwrite its PID file.
        if is_running; then
            echo "Already running (PID $(cat "$PID_FILE"))"
        else
            start_service
            echo "Started PID $(cat "$PID_FILE")"
        fi
        ;;
    status)
        echo "=== $SERVICE_NAME Status ==="
        if is_running; then
            echo "Process: running (PID $(cat "$PID_FILE"))"
        else
            echo "Process: not running"
        fi
        echo "Port: $PORT"
        if http_ok; then
            echo "HTTP: OK"
        else
            echo "HTTP: not responding"
        fi
        ;;
    *)
        echo "Usage: $0 {deploy|update|stop|start|status}"
        exit 1
        ;;
esac
|
|
||||||
@ -1,28 +0,0 @@
|
|||||||
{
|
|
||||||
"password_key": "RagPipeline2026Key",
|
|
||||||
"filesroot": "$[workdir]$/files",
|
|
||||||
"logger": {
|
|
||||||
"name": "rag-pipeline",
|
|
||||||
"levelname": "info",
|
|
||||||
"logfile": "$[workdir]$/logs/rag-pipeline.log"
|
|
||||||
},
|
|
||||||
"website": {
|
|
||||||
"paths": [["$[workdir]$/wwwroot", ""]],
|
|
||||||
"client_max_size": 52428800,
|
|
||||||
"host": "0.0.0.0",
|
|
||||||
"port": 9093,
|
|
||||||
"coding": "utf-8",
|
|
||||||
"indexes": ["index.html"],
|
|
||||||
"startswiths": [
|
|
||||||
{"leading": "/api/status", "registerfunction": "status"},
|
|
||||||
{"leading": "/api/ingest", "registerfunction": "ingest"},
|
|
||||||
{"leading": "/api/search", "registerfunction": "search"},
|
|
||||||
{"leading": "/api/pipelines", "registerfunction": "pipelines"},
|
|
||||||
{"leading": "/api/plugins", "registerfunction": "plugins"}
|
|
||||||
],
|
|
||||||
"processors": [
|
|
||||||
[".tmpl", "tmpl"], [".app", "app"], [".ui", "bui"],
|
|
||||||
[".dspy", "dspy"], [".md", "md"]
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,73 +0,0 @@
|
|||||||
# -*- coding:utf-8 -*-
|
|
||||||
"""Document chunking strategies."""
|
|
||||||
import re
|
|
||||||
from typing import List, Dict
|
|
||||||
|
|
||||||
|
|
||||||
def recursive_chunk(text: str, chunk_size: int = 512, overlap: int = 64) -> List[Dict]:
    """Split text into overlapping chunks, preferring sentence boundaries.

    Args:
        text: Document text to split.
        chunk_size: Maximum number of characters per chunk (before stripping).
        overlap: Number of characters shared by consecutive chunks.

    Returns:
        A list of ``{"id", "text", "index"}`` dicts. Text that is empty or no
        longer than ``chunk_size`` is returned as a single ``chunk_0`` entry.

    Raises:
        ValueError: If ``chunk_size`` is not positive and the text needs
            splitting (the window could never advance).
    """
    if not text or len(text) <= chunk_size:
        return [{"id": "chunk_0", "text": text, "index": 0}]
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    # Tried in priority order; the first separator found in the search
    # window decides where the chunk ends.
    separators = ['。', '!', '?', '. ', '! ', '? ', '\n\n', '\n', ';', ';']
    text_len = len(text)
    chunks = []
    start = 0
    idx = 0

    while start < text_len:
        end = start + chunk_size

        if end < text_len:
            # Look for a sentence ending in the last 100 chars of the window.
            search_start = max(end - 100, start)
            search_region = text[search_start:end]
            for sep in separators:
                last_sep = search_region.rfind(sep)
                if last_sep >= 0:
                    end = search_start + last_sep + len(sep)
                    break

        chunk_text = text[start:end].strip()
        if chunk_text:
            chunks.append({
                "id": f"chunk_{idx}",
                "text": chunk_text,
                "index": idx,
            })
            idx += 1

        # This chunk already reaches the end of the text; stepping back by
        # `overlap` would only emit a tail that is contained in it.
        if end >= text_len:
            break

        # Step back by `overlap`, but always make forward progress so an
        # overlap >= chunk size cannot loop forever.
        next_start = end - overlap
        if next_start <= start:
            next_start = end
        start = next_start

    return chunks
|
|
||||||
|
|
||||||
|
|
||||||
def sentence_chunk(text: str) -> List[Dict]:
    """Return one chunk per sentence of ``text``.

    The text is cut after every Chinese/English sentence-ending mark (any
    whitespace following the mark is dropped). Pieces that are empty after
    stripping are skipped; ``index`` is the piece's position in the raw split.
    """
    pieces = (piece.strip() for piece in re.split(r'(?<=[。!?.!?])\s*', text))
    return [
        {"id": f"chunk_{position}", "text": piece, "index": position}
        for position, piece in enumerate(pieces)
        if piece
    ]
|
|
||||||
|
|
||||||
|
|
||||||
def chunk_document(text: str, strategy: str = "recursive", **kwargs) -> List[Dict]:
    """Dispatch ``text`` to the chunker selected by ``strategy``.

    ``"sentence"`` selects sentence splitting; any other value selects the
    recursive chunker, which reads ``chunk_size`` (default 512) and
    ``overlap`` (default 64) from ``kwargs``.
    """
    if strategy != "sentence":
        size = kwargs.get("chunk_size", 512)
        shared = kwargs.get("overlap", 64)
        return recursive_chunk(text, chunk_size=size, overlap=shared)
    return sentence_chunk(text)
|
|
||||||
@ -1,79 +0,0 @@
|
|||||||
# -*- coding:utf-8 -*-
|
|
||||||
"""Entity and relation extraction via LLM."""
|
|
||||||
import json
|
|
||||||
import re
|
|
||||||
from typing import List, Dict
|
|
||||||
|
|
||||||
|
|
||||||
EXTRACTION_PROMPT = """你是一个知识图谱抽取专家。请从以下文本中抽取实体和关系。
|
|
||||||
|
|
||||||
文本:
|
|
||||||
{text}
|
|
||||||
|
|
||||||
请按以下JSON格式返回:
|
|
||||||
{{
|
|
||||||
"entities": [
|
|
||||||
{{"name": "实体名", "type": "实体类型(person/company/product/concept/event/location)", "description": "简要描述"}}
|
|
||||||
],
|
|
||||||
"relations": [
|
|
||||||
{{"source": "源实体名", "target": "目标实体名", "relation": "关系类型", "description": "关系描述"}}
|
|
||||||
]
|
|
||||||
}}
|
|
||||||
|
|
||||||
只返回JSON,不要其他内容。"""
|
|
||||||
|
|
||||||
|
|
||||||
def extract_entities_relations(text: str, llm_func=None) -> Dict:
    """Extract entities and relations from text using an LLM.

    Args:
        text: Input text; only the first 2000 characters are sent to the LLM.
        llm_func: Callable that takes a prompt string and returns the LLM
            response text. If falsy, an empty result is returned.

    Returns:
        ``{"entities": [...], "relations": [...]}``. Entities keep ``name``,
        ``type`` (default ``"unknown"``) and ``description``; relations keep
        ``source``, ``target``, ``relation`` (default ``"related_to"``) and
        ``description``. Malformed items are dropped. On any failure the
        result is empty and carries an extra ``"error"`` message.
    """
    if not llm_func:
        return {"entities": [], "relations": []}

    prompt = EXTRACTION_PROMPT.format(text=text[:2000])  # Limit text length

    # Best-effort boundary: the LLM callable and its output are untrusted,
    # so every failure is reported in the result instead of raised.
    try:
        response = llm_func(prompt).strip()

        # Remove a surrounding markdown code fence if present.
        if response.startswith('```'):
            response = re.sub(r'^```(?:json)?\s*', '', response)
            response = re.sub(r'\s*```$', '', response)

        result = json.loads(response)
        if not isinstance(result, dict):
            raise ValueError("LLM response is not a JSON object")

        valid_entities = [
            {
                "name": e["name"],
                "type": e.get("type", "unknown"),
                "description": e.get("description", ""),
            }
            for e in (result.get("entities") or [])
            if isinstance(e, dict) and "name" in e
        ]

        valid_relations = [
            {
                "source": r["source"],
                "target": r["target"],
                "relation": r.get("relation", "related_to"),
                "description": r.get("description", ""),
            }
            for r in (result.get("relations") or [])
            if isinstance(r, dict) and "source" in r and "target" in r
        ]

        return {"entities": valid_entities, "relations": valid_relations}

    except Exception as e:
        return {"entities": [], "relations": [], "error": str(e)}
|
|
||||||
@ -1,163 +0,0 @@
|
|||||||
# -*- coding:utf-8 -*-
|
|
||||||
"""Hybrid retrieval: vector search + graph expansion + RRF fusion."""
|
|
||||||
from typing import List, Dict
|
|
||||||
|
|
||||||
def rrf_fusion(results_list: List[List[Dict]], k: int = 60) -> List[Dict]:
    """Merge several ranked lists with Reciprocal Rank Fusion.

    Each item contributes ``1 / (k + rank + 1)`` (rank is 0-based) to the
    document identified by its ``id`` or, failing that, its ``node_id``.

    Args:
        results_list: Ranked result lists, best item first.
        k: RRF smoothing constant.

    Returns:
        The first-seen dict of every distinct document, sorted by descending
        fused score. Each returned dict is annotated in place with
        ``rrf_score`` (rounded to 6 decimals).
    """
    fused = {}

    for list_idx, results in enumerate(results_list):
        for rank, item in enumerate(results):
            doc_id = item.get("id", item.get("node_id"))
            if doc_id is None:
                # No identifier: give the item a key unique to its position
                # so unrelated id-less items at the same rank in different
                # lists are not fused into one document.
                doc_id = ("__no_id__", list_idx, rank)

            entry = fused.setdefault(doc_id, {"item": item, "score": 0.0})
            entry["score"] += 1.0 / (k + rank + 1)

    ranked = sorted(fused.values(), key=lambda entry: -entry["score"])

    for entry in ranked:
        entry["item"]["rrf_score"] = round(entry["score"], 6)

    return [entry["item"] for entry in ranked]
|
|
||||||
|
|
||||||
|
|
||||||
def vector_search(query_embedding: List[float], vdb_plugin: Dict,
                  collection: str = "knowledge", top_k: int = 20) -> List[Dict]:
    """Query the vector database plugin for the rows nearest to an embedding.

    Returns an empty list when the plugin has no endpoint, when the response
    carries an ``"error"`` key or a status other than ``"SUCCEEDED"``, or
    when the response ``data`` is not a dict.
    """
    from plugins.registry import call_plugin

    endpoint = vdb_plugin.get("endpoint")
    if not endpoint:
        return []

    payload = {
        "colname": collection,
        "vector": query_embedding,
        "pagerows": top_k,
        "page": 1,
    }
    response = call_plugin(endpoint, "/v1/query", payload)

    succeeded = "error" not in response and response.get("status") == "SUCCEEDED"
    if not succeeded:
        return []

    # Expected response: {"status": "SUCCEEDED", "data": {"rows": [...]}}
    data = response.get("data", {})
    if not isinstance(data, dict):
        return []
    return data.get("rows", [])
|
|
||||||
|
|
||||||
|
|
||||||
def graph_expand(entity_ids: List[str], graph_plugin: Dict,
                 graph_name: str = "knowledge", hops: int = 2) -> List[Dict]:
    """Collect the graph neighbours of the given entities, without duplicates.

    Every entity name is turned into the ``entity_{name}`` node id (spaces
    replaced by underscores) and looked up through the graph plugin. Each new
    neighbour dict is annotated in place with ``source_entity`` and ``id``.
    Returns an empty list when the graph plugin is disabled or has no
    endpoint; entities whose lookup returns an error are skipped.
    """
    from plugins.registry import call_plugin

    endpoint = graph_plugin.get("endpoint")
    if graph_plugin.get("type") == "none" or not endpoint:
        return []

    collected = []
    visited = set()

    for entity in entity_ids:
        payload = {
            "graph": graph_name,
            "node_id": f"entity_{entity}".replace(" ", "_"),
            "depth": hops,
        }
        response = call_plugin(endpoint, "/api/graph/neighbors", payload)
        if "error" in response:
            continue

        for candidate in response.get("neighbors", []):
            nid = candidate.get("node_id", "")
            if not nid or nid in visited:
                continue
            visited.add(nid)
            candidate["source_entity"] = entity
            candidate["id"] = nid
            collected.append(candidate)

    return collected
|
|
||||||
|
|
||||||
|
|
||||||
def hybrid_retrieve(query_embedding: List[float],
                    extracted_entities: List[str],
                    pipeline: Dict,
                    collection: str = "knowledge",
                    graph_name: str = "knowledge") -> List[Dict]:
    """Run vector search, optionally graph expansion, and fuse the results.

    Retriever settings (``vector_top_k``, ``graph_hops``, ``type``) are read
    from ``pipeline["plugins"]["retriever"]``. Graph expansion only runs when
    the retriever type is ``"hybrid"`` and entities were extracted. A single
    non-empty source is returned as is; two sources are merged with RRF.
    """
    plugins = pipeline.get("plugins", {})
    settings = plugins.get("retriever", {})
    top_k = settings.get("vector_top_k", 20)
    hops = settings.get("graph_hops", 2)
    mode = settings.get("type", "hybrid")

    sources = []

    # Vector search always runs.
    from_vectors = vector_search(query_embedding, plugins.get("vdb", {}), collection, top_k)
    if from_vectors:
        sources.append(from_vectors)

    # Graph expansion (only in hybrid mode and with entities to expand).
    if mode == "hybrid" and extracted_entities:
        from_graph = graph_expand(extracted_entities, plugins.get("graph", {}), graph_name, hops)
        if from_graph:
            sources.append(from_graph)

    if not sources:
        return []
    return sources[0] if len(sources) == 1 else rrf_fusion(sources)
|
|
||||||
|
|
||||||
|
|
||||||
def _rerank_document_text(result: Dict) -> str:
    """Return the text that represents ``result`` when sent to the reranker."""
    doc = result.get("text", "") or result.get("description", "") or result.get("transcript", "")
    if not doc:
        doc = str(result.get("name", "")) + " " + str(result.get("attrs", {}))
    return doc


def _find_rerank_match(doc_text, documents: List[str], used: set):
    """Return the index of the unused document matching ``doc_text``, or None."""
    # Prefer an exact match against the text that was actually sent.
    for i, candidate in enumerate(documents):
        if i not in used and candidate == doc_text:
            return i
    # Fall back to containment (e.g. the reranker truncated the text). Blank
    # strings are excluded because "" is a substring of every string.
    if not doc_text:
        return None
    for i, candidate in enumerate(documents):
        if i in used or not candidate.strip():
            continue
        if doc_text in candidate or candidate in doc_text:
            return i
    return None


def rerank_results(query: str, results: List[Dict], reranker_plugin: Dict,
                   top_k: int = 5) -> List[Dict]:
    """Rerank results through the reranker plugin.

    Args:
        query: The user query.
        results: Candidate results; matched ones are annotated in place with
            ``rerank_score``.
        reranker_plugin: Plugin config; ``type == "none"`` or a missing
            ``endpoint`` disables reranking.
        top_k: Number of results requested from the reranker.

    Returns:
        The results in reranker order, each original appearing at most once.
        Falls back to ``results[:top_k]`` when reranking is disabled, fails,
        or none of the reranked documents can be mapped back.
    """
    from plugins.registry import call_plugin

    if not results or reranker_plugin.get("type") == "none":
        return results[:top_k]

    endpoint = reranker_plugin.get("endpoint")
    if not endpoint:
        return results[:top_k]

    # One document string per result, in the same order as `results`.
    documents = [_rerank_document_text(r) for r in results]

    rerank_result = call_plugin(endpoint, "/api/rerank", {
        "query": query,
        "documents": documents,
        "top_k": top_k
    })

    if "error" in rerank_result:
        return results[:top_k]

    # Map reranked docs back to the originals using the exact strings sent,
    # consuming each original once so duplicates cannot appear.
    ranked = []
    used = set()
    for rd in rerank_result.get("ranked_docs", []):
        match = _find_rerank_match(rd.get("doc", ""), documents, used)
        if match is None:
            continue
        used.add(match)
        results[match]["rerank_score"] = rd.get("score", 0)
        ranked.append(results[match])

    return ranked if ranked else results[:top_k]
|
|
||||||
126
init.py
126
init.py
@ -1,126 +0,0 @@
|
|||||||
# -*- coding:utf-8 -*-
|
|
||||||
"""RAG Pipeline API endpoints."""
|
|
||||||
from traceback import format_exc
|
|
||||||
from ahserver.serverenv import ServerEnv
|
|
||||||
from appPublic.registerfunction import RegisterFunction
|
|
||||||
from appPublic.log import exception
|
|
||||||
import json
|
|
||||||
|
|
||||||
|
|
||||||
async def status_handler(request, params_kw, *args, **kwargs):
    """Report the service identity, active plugin types and pipeline names.

    Returns:
        A JSON string with the service name, port, one available plugin type
        per capability, the pipeline names and the list of API endpoints.
    """
    import os
    import sys

    # Make project modules importable from the working directory. Register
    # it only once: inserting on every request grows sys.path without bound.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from plugins.registry import list_pipelines, list_plugins

    pipelines = list_pipelines()
    plugins = list_plugins()

    # For each capability record the type of an available plugin; when
    # several are available the last one listed wins.
    services = {}
    for cap, plugin_list in plugins.items():
        for p in plugin_list:
            if p.get("status") == "available":
                services[cap] = p.get("type", "unknown")

    return json.dumps({
        "service": "rag-pipeline",
        "port": 9093,
        "active_services": services,
        "pipelines": [p["name"] for p in pipelines],
        "endpoints": ["/api/status", "/api/ingest", "/api/search", "/api/pipelines", "/api/plugins"]
    }, indent=2, ensure_ascii=False)
|
|
||||||
|
|
||||||
|
|
||||||
async def ingest_handler(request, params_kw, *args, **kwargs):
    """Ingest the document given in the request parameters.

    Reads ``document`` (required), ``pipeline``, ``collection`` and
    ``graph_name`` from ``params_kw`` and runs the ingest pipeline without an
    LLM. Returns the pipeline result as a JSON string, or
    ``{"error": ...}`` when the document is missing or ingestion raises.
    """
    import os
    import sys

    # Make project modules importable from the working directory. Register
    # it only once: inserting on every request grows sys.path without bound.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from pipeline import ingest

    try:
        document = params_kw.get("document", "")
        if not document:
            return json.dumps({"error": "document text required"})

        pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
        collection = params_kw.get("collection", "knowledge")
        graph_name = params_kw.get("graph_name", "knowledge")

        # An LLM callable cannot be passed over HTTP, so extraction-dependent
        # steps are not driven from this endpoint.
        result = ingest(document, pipeline_name, collection, graph_name, llm_func=None)
        return json.dumps(result, ensure_ascii=False)

    except Exception as e:
        # Request boundary: log the traceback and answer with a JSON error.
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)})
|
|
||||||
|
|
||||||
|
|
||||||
async def search_handler(request, params_kw, *args, **kwargs):
    """Run a search for the query given in the request parameters.

    Reads ``query`` (required), ``pipeline``, ``collection``, ``graph_name``
    and ``top_k`` (default 5) from ``params_kw`` and runs the search pipeline
    without an LLM. Returns the pipeline result as a JSON string, or
    ``{"error": ...}`` when the query is missing or the search raises
    (including a non-integer ``top_k``).
    """
    import os
    import sys

    # Make project modules importable from the working directory. Register
    # it only once: inserting on every request grows sys.path without bound.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from pipeline import search

    try:
        query = params_kw.get("query", "")
        if not query:
            return json.dumps({"error": "query text required"})

        pipeline_name = params_kw.get("pipeline", "kg-rag-standard")
        collection = params_kw.get("collection", "knowledge")
        graph_name = params_kw.get("graph_name", "knowledge")
        top_k = int(params_kw.get("top_k", 5))

        result = search(query, pipeline_name, collection, graph_name, top_k, llm_func=None)
        return json.dumps(result, ensure_ascii=False)

    except Exception as e:
        # Request boundary: log the traceback and answer with a JSON error.
        exception(f"{e}, {format_exc()}")
        return json.dumps({"error": str(e)})
|
|
||||||
|
|
||||||
|
|
||||||
async def pipelines_handler(request, params_kw, *args, **kwargs):
    """Create/update a pipeline (POST) or list/fetch pipelines (other methods).

    POST: ``params_kw`` is the pipeline config and must contain ``name``.
    Otherwise: with a ``name`` parameter the matching pipeline is returned
    (or an error when it is not found); without it all pipelines are listed.
    Always returns a JSON string.
    """
    import os
    import sys

    # Make project modules importable from the working directory. Register
    # it only once: inserting on every request grows sys.path without bound.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from plugins.registry import list_pipelines, get_pipeline, save_pipeline

    if request.method == "POST":
        # Create/update pipeline
        config = params_kw
        if "name" not in config:
            return json.dumps({"error": "pipeline config with 'name' required"})
        result = save_pipeline(config)
        return json.dumps({"status": "SUCCEEDED", **result}, ensure_ascii=False)

    # Fetch a single pipeline when a name is given.
    name = params_kw.get("name")
    if name:
        pipeline = get_pipeline(name)
        if pipeline:
            return json.dumps({"status": "SUCCEEDED", "pipeline": pipeline}, indent=2, ensure_ascii=False)
        return json.dumps({"error": f"Pipeline '{name}' not found"})

    # List pipelines
    pipelines = list_pipelines()
    return json.dumps({"status": "SUCCEEDED", "pipelines": pipelines}, indent=2, ensure_ascii=False)
|
|
||||||
|
|
||||||
|
|
||||||
async def plugins_handler(request, params_kw, *args, **kwargs):
    """Return the plugin listing from the plugin registry as a JSON string."""
    import os
    import sys

    # Make project modules importable from the working directory. Register
    # it only once: inserting on every request grows sys.path without bound.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.insert(0, cwd)
    from plugins.registry import list_plugins

    plugins = list_plugins()
    return json.dumps({"status": "SUCCEEDED", "plugins": plugins}, indent=2, ensure_ascii=False)
|
|
||||||
|
|
||||||
|
|
||||||
def load_rag_pipeline():
    """Register each API handler under its function name."""
    # The instance is not used here; the call is kept because constructing it
    # may initialise shared server state — TODO confirm.
    ServerEnv()

    registry = RegisterFunction()
    handlers = (
        ("status", status_handler),
        ("ingest", ingest_handler),
        ("search", search_handler),
        ("pipelines", pipelines_handler),
        ("plugins", plugins_handler),
    )
    for name, handler in handlers:
        registry.register(name, handler)
|
|
||||||
270
pipeline.py
270
pipeline.py
@ -1,270 +0,0 @@
|
|||||||
# -*- coding:utf-8 -*-
|
|
||||||
"""RAG Pipeline orchestration - ingest and search workflows."""
|
|
||||||
import json
|
|
||||||
import time
|
|
||||||
import uuid
|
|
||||||
from typing import Dict, List, Optional
|
|
||||||
from traceback import format_exc
|
|
||||||
|
|
||||||
from plugins.registry import get_pipeline, call_plugin
|
|
||||||
from core.chunker import chunk_document
|
|
||||||
from core.extractor import extract_entities_relations
|
|
||||||
from core.retriever import hybrid_retrieve, rerank_results
|
|
||||||
|
|
||||||
|
|
||||||
def _get_embedding(texts: List[str], embedding_plugin: Dict) -> List[List[float]]:
|
|
||||||
"""Get text embeddings from CLIP service."""
|
|
||||||
endpoint = embedding_plugin.get("endpoint")
|
|
||||||
if not endpoint:
|
|
||||||
return []
|
|
||||||
|
|
||||||
result = call_plugin(endpoint, "/api/text", {"texts": texts})
|
|
||||||
if "error" in result:
|
|
||||||
return []
|
|
||||||
|
|
||||||
return result.get("embeddings", [])
|
|
||||||
|
|
||||||
|
|
||||||
def _ensure_collection(vdb_plugin: Dict, collection: str, dim: int = 1024) -> Dict:
|
|
||||||
"""Ensure VDB collection exists with correct schema."""
|
|
||||||
endpoint = vdb_plugin.get("endpoint")
|
|
||||||
if not endpoint:
|
|
||||||
return {"error": "VDB endpoint not configured"}
|
|
||||||
|
|
||||||
# Check if collection exists
|
|
||||||
list_result = call_plugin(endpoint, "/v1/listcollections", {})
|
|
||||||
existing = list_result.get("collections", [])
|
|
||||||
|
|
||||||
if collection in existing:
|
|
||||||
return {"status": "exists"}
|
|
||||||
|
|
||||||
# Create with proper field schema
|
|
||||||
fields = [
|
|
||||||
{"name": "id", "type": "str", "is_primary": True, "max_length": 256},
|
|
||||||
{"name": "text", "type": "str", "max_length": 65535},
|
|
||||||
{"name": "description", "type": "str", "max_length": 65535},
|
|
||||||
{"name": "type", "type": "str", "max_length": 128},
|
|
||||||
{"name": "embedding", "type": "fvector", "dim": dim}
|
|
||||||
]
|
|
||||||
|
|
||||||
result = call_plugin(endpoint, "/v1/createcollection", {
|
|
||||||
"colname": collection,
|
|
||||||
"fields": fields,
|
|
||||||
"description": "RAG knowledge collection",
|
|
||||||
"metric": "COSINE"
|
|
||||||
})
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def _store_in_vdb(items: List[Dict], embeddings: List[List[float]],
|
|
||||||
vdb_plugin: Dict, collection: str = "knowledge") -> Dict:
|
|
||||||
"""Store items with embeddings in VDB."""
|
|
||||||
endpoint = vdb_plugin.get("endpoint")
|
|
||||||
if not endpoint:
|
|
||||||
return {"error": "VDB endpoint not configured"}
|
|
||||||
|
|
||||||
# Ensure collection exists
|
|
||||||
ensure_result = _ensure_collection(vdb_plugin, collection)
|
|
||||||
if "error" in ensure_result:
|
|
||||||
return ensure_result
|
|
||||||
|
|
||||||
# Prepare batch insert - data must match schema fields
|
|
||||||
rows = []
|
|
||||||
for item, emb in zip(items, embeddings):
|
|
||||||
row = {
|
|
||||||
"id": item.get("id", str(uuid.uuid4())[:16]),
|
|
||||||
"text": item.get("text", ""),
|
|
||||||
"description": item.get("description", ""),
|
|
||||||
"type": item.get("type", "chunk"),
|
|
||||||
"embedding": emb
|
|
||||||
}
|
|
||||||
rows.append(row)
|
|
||||||
|
|
||||||
result = call_plugin(endpoint, "/v1/batchinsert", {
|
|
||||||
"colname": collection,
|
|
||||||
"data": rows
|
|
||||||
})
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def _store_in_graph(entities: List[Dict], relations: List[Dict],
|
|
||||||
graph_plugin: Dict, graph_name: str = "knowledge") -> Dict:
|
|
||||||
"""Store entities and relations in graph."""
|
|
||||||
endpoint = graph_plugin.get("endpoint")
|
|
||||||
if not endpoint or graph_plugin.get("type") == "none":
|
|
||||||
return {"status": "skipped", "reason": "graph disabled"}
|
|
||||||
|
|
||||||
added_nodes = 0
|
|
||||||
added_edges = 0
|
|
||||||
|
|
||||||
for entity in entities:
|
|
||||||
name = entity.get("name", "")
|
|
||||||
node_id = f"entity_{name}".replace(" ", "_")
|
|
||||||
result = call_plugin(endpoint, "/api/graph/add_node", {
|
|
||||||
"graph": graph_name,
|
|
||||||
"node_id": node_id,
|
|
||||||
"attrs": {
|
|
||||||
"name": name,
|
|
||||||
"type": entity.get("type", "unknown"),
|
|
||||||
"description": entity.get("description", "")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if "error" not in result:
|
|
||||||
added_nodes += 1
|
|
||||||
|
|
||||||
for rel in relations:
|
|
||||||
source = f"entity_{rel.get('source', '')}".replace(" ", "_")
|
|
||||||
target = f"entity_{rel.get('target', '')}".replace(" ", "_")
|
|
||||||
result = call_plugin(endpoint, "/api/graph/add_edge", {
|
|
||||||
"graph": graph_name,
|
|
||||||
"source": source,
|
|
||||||
"target": target,
|
|
||||||
"attrs": {
|
|
||||||
"relation": rel.get("relation", "related_to"),
|
|
||||||
"description": rel.get("description", "")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if "error" not in result:
|
|
||||||
added_edges += 1
|
|
||||||
|
|
||||||
# Save graph to disk
|
|
||||||
call_plugin(endpoint, "/api/graph/save", {"graph": graph_name})
|
|
||||||
|
|
||||||
return {"status": "ok", "nodes_added": added_nodes, "edges_added": added_edges}
|
|
||||||
|
|
||||||
|
|
||||||
def ingest(document: str, pipeline_name: str = "kg-rag-standard",
           collection: str = "knowledge", graph_name: str = "knowledge",
           llm_func=None) -> Dict:
    """Full ingest pipeline: chunk -> embed -> store -> extract -> graph.

    Args:
        document: Text to ingest.
        pipeline_name: Name of the pipeline configuration to use.
        collection: VDB collection receiving the chunks.
        graph_name: Graph receiving the extracted entities and relations.
        llm_func: Optional callable (prompt -> response text). Without it the
            extraction and graph steps are skipped.

    Returns:
        A summary dict with ``status`` (``"FAILED"`` when the VDB store
        reported an error, ``"SUCCEEDED"`` otherwise), counts, the VDB and
        graph results and the elapsed seconds; or ``{"error": ...}`` when the
        pipeline is unknown or embeddings are missing or incomplete.
    """
    start_time = time.time()

    pipeline = get_pipeline(pipeline_name)
    if not pipeline:
        # Answer with a clear message instead of failing on `None.get`.
        return {"error": f"Pipeline '{pipeline_name}' not found"}
    plugins = pipeline.get("plugins", {})

    # Step 1: Chunk
    chunker_config = plugins.get("chunker", {})
    chunks = chunk_document(document,
                            strategy=chunker_config.get("type", "recursive"),
                            chunk_size=chunker_config.get("chunk_size", 512),
                            overlap=chunker_config.get("overlap", 64))
    chunk_texts = [c["text"] for c in chunks]

    # Step 2: Embed
    embedding_plugin = plugins.get("embedding", {})
    embeddings = _get_embedding(chunk_texts, embedding_plugin)

    if not embeddings:
        return {"error": "Failed to get embeddings", "chunks": len(chunks)}
    if len(embeddings) != len(chunks):
        # Pairing chunks with vectors by position would silently drop the
        # unmatched ones, so refuse to store a partial document.
        return {"error": "Embedding count does not match chunk count",
                "chunks": len(chunks), "embeddings": len(embeddings)}

    # Step 3: Store chunks + embeddings in VDB
    vdb_plugin = plugins.get("vdb", {})
    vdb_result = _store_in_vdb(chunks, embeddings, vdb_plugin, collection)
    vdb_failed = isinstance(vdb_result, dict) and "error" in vdb_result

    # Step 4: Extract entities and relations
    extractor_config = plugins.get("extractor", {})
    extraction = {"entities": [], "relations": []}

    if extractor_config.get("type") != "none" and llm_func:
        extraction = extract_entities_relations(document[:3000], llm_func)

        # Step 5: Store in graph
        graph_plugin = plugins.get("graph", {})
        graph_result = _store_in_graph(
            extraction["entities"],
            extraction["relations"],
            graph_plugin,
            graph_name
        )
    else:
        graph_result = {"status": "skipped"}

    elapsed = round(time.time() - start_time, 3)

    return {
        # Do not report success when the chunks never reached the VDB.
        "status": "FAILED" if vdb_failed else "SUCCEEDED",
        "pipeline": pipeline_name,
        "chunks": len(chunks),
        "embeddings": len(embeddings),
        "entities": len(extraction.get("entities", [])),
        "relations": len(extraction.get("relations", [])),
        "vdb_result": vdb_result,
        "graph_result": graph_result,
        "elapsed": elapsed
    }
|
|
||||||
|
|
||||||
|
|
||||||
def search(query: str, pipeline_name: str = "kg-rag-standard",
           collection: str = "knowledge", graph_name: str = "knowledge",
           top_k: int = 5, llm_func=None) -> Dict:
    """Full search pipeline: embed -> retrieve -> rerank -> generate.

    Args:
        query: The user's question / search text.
        pipeline_name: Name of the pipeline config to look up via
            ``get_pipeline``.
        collection: Collection name forwarded to ``hybrid_retrieve``.
        graph_name: Graph name forwarded to ``hybrid_retrieve``.
        top_k: Maximum number of reranked results to return and to feed
            into the answer prompt.
        llm_func: Optional callable taking a prompt string. When given it is
            used both for entity extraction on the query and for generating
            the final answer; when falsy both steps are skipped.

    Returns:
        ``{"error": "Failed to embed query"}`` if no query embedding could be
        obtained. Otherwise a dict with ``status``, ``pipeline``, ``query``,
        ``results``, ``total_retrieved``, ``answer``, ``extracted_entities``
        and ``elapsed`` (seconds, rounded to 3 decimals).
    """
    start_time = time.time()

    pipeline = get_pipeline(pipeline_name)
    plugins = pipeline.get("plugins", {})

    # Step 1: Embed query
    embedding_plugin = plugins.get("embedding", {})
    query_embeddings = _get_embedding([query], embedding_plugin)

    if not query_embeddings:
        return {"error": "Failed to embed query"}

    query_embedding = query_embeddings[0]

    # Step 2: Extract entities from query (for graph expansion)
    extracted_entities = []
    if llm_func:
        quick_extraction = extract_entities_relations(query, llm_func)
        extracted_entities = [e["name"] for e in quick_extraction.get("entities", [])]

    # Step 3: Hybrid retrieval
    results = hybrid_retrieve(
        query_embedding,
        extracted_entities,
        pipeline,
        collection,
        graph_name
    )

    # Step 4: Rerank
    reranker_plugin = plugins.get("reranker", {})
    ranked_results = rerank_results(query, results, reranker_plugin, top_k)

    # Step 5: Generate answer (optional)
    answer = None
    if llm_func and ranked_results:
        # Each hit contributes its text, else its description, else its repr.
        context = "\n\n".join([
            r.get("text", "") or r.get("description", "") or str(r)
            for r in ranked_results[:top_k]
        ])

        # The context is capped at 2000 characters before being embedded
        # in the prompt.
        gen_prompt = f"""基于以下上下文回答问题。如果上下文中没有相关信息,请如实说明。

上下文:
{context[:2000]}

问题:{query}

请用中文回答:"""

        try:
            answer = llm_func(gen_prompt)
        except Exception:
            # Answer generation is best-effort: retrieval results are still
            # returned. Unlike a bare ``except``, this lets
            # KeyboardInterrupt / SystemExit propagate.
            answer = None

    elapsed = round(time.time() - start_time, 3)

    return {
        "status": "SUCCEEDED",
        "pipeline": pipeline_name,
        "query": query,
        "results": ranked_results[:top_k],
        "total_retrieved": len(results),
        "answer": answer,
        "extracted_entities": extracted_entities,
        "elapsed": elapsed
    }
|
|
||||||
@ -1,129 +0,0 @@
|
|||||||
# -*- coding:utf-8 -*-
|
|
||||||
"""Plugin registry - manages available RAG plugins and pipeline configs."""
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import urllib.request
|
|
||||||
from typing import Dict, List, Optional
|
|
||||||
|
|
||||||
# Directory holding custom pipeline configs, one "<name>.json" file per pipeline.
CONFIG_DIR = "/data/ymq/rag-pipeline/pipelines"
|
|
||||||
|
|
||||||
# Built-in standard pipeline: every plugin slot is populated, including the
# graph store and the LLM-based extractor, and retrieval is hybrid.
DEFAULT_PIPELINE = {
    "name": "kg-rag-standard",
    "description": "标准知识图谱RAG",
    "plugins": {
        # Service-backed plugins (each reached through its HTTP endpoint).
        "embedding": {
            "type": "clip-vith14",
            "endpoint": "http://localhost:9086",
        },
        "vdb": {
            "type": "vdb-milvus",
            "endpoint": "http://localhost:8886",
        },
        "graph": {
            "type": "networkx",
            "endpoint": "http://localhost:9092",
        },
        "llm": {
            "type": "harnessed",
            "endpoint": "internal",
        },
        "reranker": {
            "type": "bge-reranker",
            "endpoint": "http://localhost:9090",
        },
        # Plugins configured by parameters only.
        "chunker": {
            "type": "recursive",
            "chunk_size": 512,
            "overlap": 64,
        },
        "extractor": {
            "type": "llm-structured",
        },
        "retriever": {
            "type": "hybrid",
            "vector_top_k": 20,
            "graph_hops": 2,
        },
    },
}
|
|
||||||
|
|
||||||
# Built-in lightweight pipeline: the graph slot is set to "none", there is no
# extractor entry, and the retriever is vector-only.
LITE_PIPELINE = {
    "name": "kg-rag-lite",
    "description": "轻量版RAG(纯向量,无图谱)",
    "plugins": {
        "embedding": {
            "type": "clip-vith14",
            "endpoint": "http://localhost:9086",
        },
        "vdb": {
            "type": "vdb-milvus",
            "endpoint": "http://localhost:8886",
        },
        "graph": {
            "type": "none",
        },
        "llm": {
            "type": "harnessed",
            "endpoint": "internal",
        },
        "reranker": {
            "type": "bge-reranker",
            "endpoint": "http://localhost:9090",
        },
        "chunker": {
            "type": "recursive",
            "chunk_size": 512,
            "overlap": 64,
        },
        "retriever": {
            "type": "vector_only",
            "top_k": 10,
        },
    },
}
|
|
||||||
|
|
||||||
|
|
||||||
def _ensure_config_dir():
    """Create CONFIG_DIR (including missing parents); no-op if it already exists."""
    os.makedirs(name=CONFIG_DIR, exist_ok=True)
|
|
||||||
|
|
||||||
|
|
||||||
def list_pipelines() -> List[Dict]:
    """List all available pipeline configs.

    Returns:
        The two built-in pipelines (DEFAULT_PIPELINE, LITE_PIPELINE) followed
        by every custom pipeline that could be loaded from the ``*.json``
        files in CONFIG_DIR. Custom entries are ordered by file name.
    """
    _ensure_config_dir()
    pipelines = [DEFAULT_PIPELINE, LITE_PIPELINE]

    # Load custom pipelines from disk. os.listdir() order is arbitrary, so
    # sort to make the result (and first-match lookups on it) deterministic.
    for filename in sorted(os.listdir(CONFIG_DIR)):
        if not filename.endswith('.json'):
            continue
        try:
            with open(os.path.join(CONFIG_DIR, filename), 'r') as fh:
                loaded = json.load(fh)
        except (OSError, ValueError):
            # Best-effort: an unreadable file or invalid JSON/encoding must
            # not hide the remaining pipelines. JSONDecodeError and
            # UnicodeDecodeError are both ValueError subclasses.
            continue
        # A pipeline config is a JSON object; skip lists, strings, etc. so
        # callers can rely on the declared List[Dict] return type.
        if isinstance(loaded, dict):
            pipelines.append(loaded)

    return pipelines
|
|
||||||
|
|
||||||
|
|
||||||
def get_pipeline(name: str) -> Optional[Dict]:
    """Get a pipeline config by name.

    Args:
        name: Value to match against each config's ``"name"`` field.

    Returns:
        The first config returned by ``list_pipelines()`` whose name matches,
        or DEFAULT_PIPELINE when none does.
    """
    for candidate in list_pipelines():
        # Custom configs come from user-written JSON files: tolerate entries
        # that are not objects or lack a "name" instead of failing every
        # lookup with KeyError/TypeError.
        if isinstance(candidate, dict) and candidate.get("name") == name:
            return candidate
    return DEFAULT_PIPELINE
|
|
||||||
|
|
||||||
|
|
||||||
def save_pipeline(config: Dict) -> Dict:
    """Save a custom pipeline config.

    The config is written as JSON to ``<CONFIG_DIR>/<name>.json``.

    Args:
        config: Pipeline config; must contain a ``"name"`` entry, which is
            used as the file name.

    Returns:
        ``{"status": "ok", "filepath": <path written>}``.

    Raises:
        KeyError: If ``config`` has no ``"name"``.
        ValueError: If the name contains a path separator, which would let
            the file be written outside CONFIG_DIR.
    """
    name = str(config['name'])

    # The name is interpolated into a file path, so reject anything that
    # could address another directory (e.g. "../x" or "/abs/x"). This check
    # runs before any filesystem access.
    separators = [sep for sep in (os.sep, os.altsep) if sep]
    if any(sep in name for sep in separators):
        raise ValueError(f"Invalid pipeline name: {name!r}")

    _ensure_config_dir()
    filepath = os.path.join(CONFIG_DIR, f"{name}.json")
    with open(filepath, 'w') as f:
        json.dump(config, f, ensure_ascii=False, indent=2)
    return {"status": "ok", "filepath": filepath}
|
|
||||||
|
|
||||||
|
|
||||||
def list_plugins() -> Dict:
    """Return the static plugin catalogue, grouped by capability slot.

    Each slot name maps to a list of plugin descriptors (dicts carrying at
    least a ``"type"``). A new structure is built on every call.
    """
    embedding_options = [
        {
            "type": "clip-vith14",
            "model": "BAAI/CLIP-ViT-H-14",
            "dim": 1024,
            "endpoint": "http://localhost:9086",
            "status": "available",
        },
        {
            "type": "bge-m3",
            "model": "BAAI/bge-m3",
            "dim": 1024,
            "status": "not_deployed",
        },
    ]

    vdb_options = [
        {
            "type": "vdb-milvus",
            "backend": "Milvus",
            "endpoint": "http://localhost:8886",
            "status": "available",
        },
        {
            "type": "qdrant",
            "backend": "Qdrant",
            "status": "not_deployed",
        },
    ]

    graph_options = [
        {
            "type": "networkx",
            "backend": "NetworkX",
            "endpoint": "http://localhost:9092",
            "status": "available",
        },
        {
            "type": "falkordb",
            "backend": "FalkorDB+Redis",
            "status": "blocked_redis_module",
        },
        {
            "type": "none",
            "description": "Disable graph",
        },
    ]

    llm_options = [
        {
            "type": "harnessed",
            "description": "harnessed_agent",
            "status": "available",
        },
    ]

    reranker_options = [
        {
            "type": "bge-reranker",
            "model": "BAAI/bge-reranker-v2-m3",
            "endpoint": "http://localhost:9090",
            "status": "available",
        },
        {
            "type": "none",
            "description": "Skip reranking",
        },
    ]

    chunker_options = [
        {"type": "recursive", "description": "Recursive character splitter"},
        {"type": "sentence", "description": "Sentence-based splitter"},
    ]

    extractor_options = [
        {"type": "llm-structured", "description": "LLM-based entity/relation extraction"},
        {"type": "none", "description": "Skip extraction"},
    ]

    retriever_options = [
        {"type": "hybrid", "description": "Vector + Graph hybrid retrieval"},
        {"type": "vector_only", "description": "Pure vector retrieval"},
    ]

    face_options = [
        {
            "type": "insightface",
            "model": "buffalo_l",
            "dim": 512,
            "status": "available",
            "endpoint": "http://localhost:9091",
        },
    ]

    # Slot order below is the order callers see when iterating the result.
    catalogue = {
        "embedding": embedding_options,
        "vdb": vdb_options,
        "graph": graph_options,
        "llm": llm_options,
        "reranker": reranker_options,
        "chunker": chunker_options,
        "extractor": extractor_options,
        "retriever": retriever_options,
        "face": face_options,
    }
    return catalogue
|
|
||||||
|
|
||||||
|
|
||||||
def call_plugin(endpoint: str, path: str, data: Dict, timeout: int = 30) -> Dict:
    """Call a plugin endpoint via HTTP POST.

    Args:
        endpoint: Base URL of the plugin service.
        path: Path appended verbatim to ``endpoint``.
        data: JSON-serialisable payload sent as the request body.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded JSON response, or ``{"error": <message>}`` if anything
        goes wrong while building, sending or decoding the request.
    """
    url = f"{endpoint}{path}"
    try:
        # Serialisation and Request construction are inside the try on
        # purpose: json.dumps raises TypeError for unserialisable data and
        # Request raises ValueError for a URL without a scheme (e.g. an
        # endpoint of "internal"). Both must yield the error dict rather
        # than escape to the caller.
        payload = json.dumps(data).encode('utf-8')
        req = urllib.request.Request(url, data=payload, headers={'Content-Type': 'application/json'})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode('utf-8'))
    except Exception as e:
        return {"error": str(e)}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user