Compare commits

..

No commits in common. "main" and "master" have entirely different histories.
main ... master

11 changed files with 617 additions and 1 deletions

23
.gitignore vendored Normal file
View File

@ -0,0 +1,23 @@
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
env/
venv/
.env
*.log
*.log.*
*.pid
*.seed
*.pid.lock
coverage/
.nyc_output/
.pytest_cache/
.mypy_cache/
.idea/
.vscode/
*.swp
*.swo
*~
.DS_Store

View File

@ -1,2 +1,84 @@
# verify_delivery # verify-delivery-service
KTV产线交付质检HTTP服务 — 4项QA检查
## 服务说明
将KTV产线的交付质检流程封装为HTTP服务提供统一的质检入口。
### 4项QA检查
1. **QA1 字幕时间精准性** — 时间单调递增、无重叠、无gap>2s
2. **QA2 字幕歌词正确性** — ASS中歌词与原始歌词逐行比对
3. **QA3 MTV使用原音频** — 单轨、duration匹配原曲
4. **QA4 KTV双轨音序** — Track1=伴奏(Accompaniment), Track2=原唱(Original)
## 部署
```bash
git clone git@git.opencomputing.cn:yumoqing/verify_delivery.git
cd verify_delivery
chmod +x start.sh stop.sh
./start.sh
```
服务运行在 `http://0.0.0.0:9085`
## API端点
### GET /api/status
服务状态检查
### POST /api/submit
提交质检任务
参数:
- `mtv_path`: MTV视频路径可选
- `ktv_path`: KTV视频路径可选
- `ass_path`: ASS字幕路径可选
- `lyrics_path`: 原始歌词路径(可选)
- `calibrated_path`: calibrated.json路径可选用于QA1
- `original_duration`: 原曲时长(秒,可选)
返回:
```json
{
"status": "submitted",
"task_id": "xxx",
"message": "质检任务已提交"
}
```
### GET /api/task?task_id=xxx
查询质检任务结果
返回:
```json
{
"task_id": "xxx",
"status": "completed",
"result": {
"status": "PASSED",
"qa_results": {
"QA1_timeline": {"passed": true, "errors": []},
"QA2_lyrics": {"passed": true, "errors": []},
"QA3_mtv": {"passed": true, "errors": []},
"QA4_ktv": {"passed": true, "errors": []}
},
"total_errors": 0,
"errors": []
}
}
```
## 技术栈
- ahserver (ahserver.webapp)
- longtasks (异步任务队列)
- ffprobe (视频/音频元数据提取)
- Python 3.10
## Git仓库
- 远端:`git@git.opencomputing.cn:yumoqing/verify_delivery.git`
- 本地:`~/test/verify-delivery-service/`

46
ah.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""KTV产线交付质检服务 — verify_delivery as HTTP service"""
import sys, os
sys.path.insert(0, os.path.dirname(__file__))
from ahserver.webapp import webapp
from ahserver.serverenv import ServerEnv
from ahserver.configuredServer import add_startup
from longtasks.longtasks import LongTasks, schedule_once
from appPublic.log import debug
from workers.verify import VerifyWorker
class VerifyTasks(LongTasks):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def process_task(self, payload, workid=None):
import json
if isinstance(payload, str):
payload = json.loads(payload)
worker = VerifyWorker(self)
result = await worker.run_task(payload)
return result
async def on_app_built(app):
env = ServerEnv()
lt = env.longtasks
if lt:
schedule_once(0.1, lt.run)
debug('Verify delivery longtasks worker started')
def init():
env = ServerEnv()
env.longtasks = VerifyTasks(
'redis://127.0.0.1:6379', 'verify_delivery',
worker_cnt=1, stuck_seconds=600, max_age_hours=24
)
add_startup(on_app_built)
if __name__ == '__main__':
webapp(init)

View File

@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
import json
result = {
"service": "verify-delivery",
"status": "running",
"checks": ["QA1_timeline", "QA2_lyrics", "QA3_mtv", "QA4_ktv"],
"description": "KTV产线交付质检服务 — 4项QA检查"
}
return json.dumps(result, ensure_ascii=False, indent=2)

60
app/api/submit/index.dspy Normal file
View File

@ -0,0 +1,60 @@
# -*- coding:utf-8 -*-
# POST /api/submit - 提交KTV交付质检任务
import json
import uuid
from ahserver.serverenv import ServerEnv
method = request.method
if method == "POST":
mtv_path = params_kw.get("mtv_path", "")
ktv_path = params_kw.get("ktv_path", "")
ass_path = params_kw.get("ass_path", "")
lyrics_path = params_kw.get("lyrics_path", "")
calibrated_path = params_kw.get("calibrated_path", "")
original_duration = params_kw.get("original_duration", 0)
if not any([mtv_path, ktv_path]):
return json.dumps({"error": "至少需要提供mtv_path或ktv_path"}, ensure_ascii=False)
task_id = params_kw.get("task_id", str(uuid.uuid4()).replace("-", "")[:12])
payload = {
"task_type": "verify",
"task_id": task_id,
"mtv_path": mtv_path,
"ktv_path": ktv_path,
"ass_path": ass_path,
"lyrics_path": lyrics_path,
"calibrated_path": calibrated_path,
"original_duration": float(original_duration) if original_duration else 0
}
env = ServerEnv()
longtasks = env.longtasks
if longtasks is None:
return json.dumps({"error": "service not ready"}, ensure_ascii=False)
result = await longtasks.submit_task(payload)
real_task_id = result.get("task_id", str(result)) if isinstance(result, dict) else str(result)
return json.dumps({
"task_id": real_task_id,
"status": "queued",
"message": "质检任务已提交",
"check_url": f"/api/task?task_id={real_task_id}"
}, ensure_ascii=False)
else:
return json.dumps({
"usage": "POST with JSON body",
"params": {
"mtv_path": "string (optional, MTV视频路径)",
"ktv_path": "string (optional, KTV视频路径)",
"ass_path": "string (optional, ASS字幕路径)",
"lyrics_path": "string (optional, 原始歌词路径)",
"calibrated_path": "string (optional, calibrated.json路径)",
"original_duration": "float (optional, 原曲时长秒)",
"task_id": "string (optional, auto-generated)"
}
}, ensure_ascii=False)

17
app/api/task/index.dspy Normal file
View File

@ -0,0 +1,17 @@
# -*- coding:utf-8 -*-
# GET /api/task?task_id=xxx - 查询任务状态
import json
from ahserver.serverenv import ServerEnv
task_id = params_kw.get("task_id", "")
if not task_id:
return json.dumps({"error": "task_id is required"}, ensure_ascii=False)
env = ServerEnv()
longtasks = env.longtasks
if longtasks is None:
return json.dumps({"error": "service not ready"}, ensure_ascii=False)
status = await longtasks.get_status(task_id)
return json.dumps(status)

150
build.sh Executable file
View File

@ -0,0 +1,150 @@
#!/bin/bash
# 一键部署脚本模板
# 用法: ./build.sh [deploy|update|stop|status]
set -e
SERVICE_NAME="verify_delivery"
GIT_REPO="git@git.opencomputing.cn:yumoqing/verify_delivery.git"
SERVICE_PORT=9085
DEPLOY_DIR="/data/ymq/$SERVICE_NAME"
VENV_PATH="/data/ymq/wan22-service/py3"
GPU_ID="CPU"
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
log_info() { echo -e "${GREEN}[INFO]${NC} $1"; }
log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; }
log_error() { echo -e "${RED}[ERROR]${NC} $1"; }
check_deps() {
command -v git >/dev/null || { log_error "git not found"; exit 1; }
[ -f "$VENV_PATH/bin/python" ] || { log_error "Python venv not found: $VENV_PATH"; exit 1; }
}
deploy() {
log_info "Deploying $SERVICE_NAME..."
# 检查依赖
check_deps
# 克隆或更新代码
if [ -d "$DEPLOY_DIR/.git" ]; then
log_info "Updating existing deployment..."
cd "$DEPLOY_DIR"
git fetch origin
git reset --hard origin/master
else
log_info "Cloning repository..."
cd /data/ymq
git clone "$GIT_REPO" "$SERVICE_NAME"
cd "$DEPLOY_DIR"
fi
# 创建必要目录
mkdir -p "$DEPLOY_DIR/app/api/status"
mkdir -p "$DEPLOY_DIR/app/api/submit"
mkdir -p "$DEPLOY_DIR/app/api/task"
# 设置权限
chmod +x start.sh stop.sh 2>/dev/null || true
# 启动服务
start_service
}
start_service() {
log_info "Starting $SERVICE_NAME on port $SERVICE_PORT..."
# 停止旧进程
if [ -f stop.sh ]; then
bash stop.sh 2>/dev/null || true
sleep 2
fi
# 启动新进程
bash start.sh
# 等待启动
sleep 3
# 验证
if ss -tlnp | grep -q ":$SERVICE_PORT "; then
log_info "✓ Service started successfully"
verify_api
else
log_error "✗ Service failed to start"
log_error "Check logs: $DEPLOY_DIR/nohup.out"
exit 1
fi
}
verify_api() {
log_info "Verifying API endpoints..."
# 检查 status endpoint
if curl -s "http://127.0.0.1:$SERVICE_PORT/api/status" | grep -q "service"; then
log_info "✓ /api/status OK"
else
log_warn "✗ /api/status failed"
fi
}
stop_service() {
log_info "Stopping $SERVICE_NAME..."
if [ -f "$DEPLOY_DIR/stop.sh" ]; then
cd "$DEPLOY_DIR"
bash stop.sh
log_info "✓ Service stopped"
else
log_warn "stop.sh not found"
fi
}
show_status() {
echo "=== $SERVICE_NAME Status ==="
echo "Port: $SERVICE_PORT"
echo "Deploy Dir: $DEPLOY_DIR"
echo ""
# 检查进程
if ss -tlnp | grep -q ":$SERVICE_PORT "; then
echo -e "Status: ${GREEN}RUNNING${NC}"
PID=$(ss -tlnp | grep ":$SERVICE_PORT " | grep -oP 'pid=\K[0-9]+')
echo "PID: $PID"
else
echo -e "Status: ${RED}STOPPED${NC}"
fi
echo ""
# 检查 API
echo "API Endpoints:"
curl -s "http://127.0.0.1:$SERVICE_PORT/api/status" 2>/dev/null | python3 -m json.tool 2>/dev/null || echo " (not responding)"
}
# 主入口
case "${1:-deploy}" in
deploy|install)
deploy
;;
update|upgrade)
deploy
;;
stop)
stop_service
;;
start)
start_service
;;
status)
show_status
;;
*)
echo "Usage: $0 {deploy|update|stop|start|status}"
exit 1
;;
esac

16
conf/config.json Normal file
View File

@ -0,0 +1,16 @@
{
"password_key": "VerifyDeliveryService2026Key",
"databases": {},
"session_redis": {"host": "127.0.0.1", "port": 6379, "db": 1},
"website": {
"paths": [["$[workdir]$/app", ""]],
"host": "0.0.0.0",
"port": 9085,
"coding": "utf-8",
"indexes": ["index.html", "index.dspy"],
"processors": [[".dspy", "dspy"]],
"startswiths": [{"leading": "/idfile", "registerfunction": "idfile"}]
},
"hot_reload": false,
"filesroot": "/tmp/verify-delivery-outputs"
}

5
start.sh Executable file
View File

@ -0,0 +1,5 @@
#!/bin/bash
cd "$(dirname "$0")"
nohup /data/ymq/wan22-service/py3/bin/python ah.py > service.log 2>&1 &
echo $! > service.pid
echo "verify-delivery-service started, PID: $!"

14
stop.sh Executable file
View File

@ -0,0 +1,14 @@
#!/bin/bash
cd "$(dirname "$0")"
if [ -f service.pid ]; then
pid=$(cat service.pid)
if kill -0 $pid 2>/dev/null; then
kill $pid
echo "verify-delivery-service stopped, PID: $pid"
else
echo "Process $pid not running"
fi
rm -f service.pid
else
echo "service.pid not found"
fi

194
workers/verify.py Normal file
View File

@ -0,0 +1,194 @@
# -*- coding: utf-8 -*-
"""KTV产线交付质检Worker — 4项QA检查"""
import json, os, subprocess, re
def run_ffprobe(filepath, args):
cmd = ["ffprobe", "-v", "error"] + args + [filepath]
r = subprocess.run(cmd, capture_output=True, text=True)
return r.stdout.strip()
class VerifyWorker:
def __init__(self, longtasks):
self.lt = longtasks
async def run_task(self, payload):
mtv_path = payload.get("mtv_path", "")
ktv_path = payload.get("ktv_path", "")
ass_path = payload.get("ass_path", "")
lyrics_path = payload.get("lyrics_path", "")
original_duration = float(payload.get("original_duration", 0))
# 检查文件是否存在
missing = []
for label, p in [("MTV", mtv_path), ("KTV", ktv_path), ("ASS", ass_path), ("歌词", lyrics_path)]:
if p and not os.path.exists(p):
missing.append(f"{label}: {p}")
if missing:
return {"status": "FAILED", "error": "文件不存在", "missing": missing}
all_errors = []
qa_results = {}
# QA-1: 字幕时间精准性从calibrated数据检查
# 如果有calibrated_path检查时间轴
calibrated_path = payload.get("calibrated_path", "")
if calibrated_path and os.path.exists(calibrated_path):
qa1_errors = self._check_timeline(calibrated_path)
qa_results["QA1_timeline"] = {"passed": len(qa1_errors) == 0, "errors": qa1_errors}
all_errors.extend(qa1_errors)
else:
qa_results["QA1_timeline"] = {"passed": True, "errors": [], "skipped": "no calibrated_path"}
# QA-2: 字幕歌词正确性
qa2_errors = self._check_subtitles(ass_path, lyrics_path)
qa_results["QA2_lyrics"] = {"passed": len(qa2_errors) == 0, "errors": qa2_errors}
all_errors.extend(qa2_errors)
# QA-3: MTV使用原音频
if mtv_path and os.path.exists(mtv_path):
qa3_errors = self._check_mtv(mtv_path, original_duration)
qa_results["QA3_mtv"] = {"passed": len(qa3_errors) == 0, "errors": qa3_errors}
all_errors.extend(qa3_errors)
else:
qa_results["QA3_mtv"] = {"passed": True, "errors": [], "skipped": "no MTV file"}
# QA-4: KTV双轨音序
if ktv_path and os.path.exists(ktv_path):
qa4_errors = self._check_ktv(ktv_path, original_duration)
qa_results["QA4_ktv"] = {"passed": len(qa4_errors) == 0, "errors": qa4_errors}
all_errors.extend(qa4_errors)
else:
qa_results["QA4_ktv"] = {"passed": True, "errors": [], "skipped": "no KTV file"}
# 汇总
passed = len(all_errors) == 0
return {
"status": "PASSED" if passed else "FAILED",
"qa_results": qa_results,
"total_errors": len(all_errors),
"errors": all_errors
}
def _check_timeline(self, calibrated_path):
"""QA-1: 字幕时间精准性 — 时间单调递增、无重叠"""
errors = []
try:
with open(calibrated_path, 'r') as f:
data = json.load(f)
lyrics = data.get("lyrics", data.get("segments", data))
if not isinstance(lyrics, list):
return ["calibrated数据格式错误应为list"]
for i in range(len(lyrics)):
seg = lyrics[i]
start = seg.get("start", 0)
end = seg.get("end", 0)
if end <= start:
errors.append(f"{i+1}: end({end:.2f})<=start({start:.2f})")
if i > 0:
prev_end = lyrics[i-1].get("end", 0)
if start < prev_end - 0.1:
errors.append(f"{i+1}: start({start:.2f})与前一行end({prev_end:.2f})重叠")
except Exception as e:
errors.append(f"读取calibrated文件失败: {e}")
return errors
def _check_subtitles(self, ass_path, lyrics_path):
"""QA-2: 字幕歌词正确性 — ASS中歌词与原始歌词逐行比对"""
errors = []
if not ass_path or not lyrics_path:
return errors
if not os.path.exists(ass_path):
return [f"ASS文件不存在: {ass_path}"]
if not os.path.exists(lyrics_path):
return [f"原始歌词文件不存在: {lyrics_path}"]
with open(lyrics_path, 'r') as f:
original_lines = [l.strip() for l in f.readlines() if l.strip()]
with open(ass_path, 'r') as f:
ass_content = f.read()
karaoke_lines = []
for line in ass_content.split('\n'):
if 'Karaoke' in line and 'Dialogue' in line:
parts = line.split(',', 9)
if len(parts) >= 10:
text = parts[9]
clean = re.sub(r'\{\\kf\d+\}', '', text).strip()
if clean:
karaoke_lines.append(clean)
if len(karaoke_lines) != len(original_lines):
errors.append(f"歌词行数不匹配: ASS={len(karaoke_lines)}行, 原始={len(original_lines)}")
mismatch_count = 0
for i in range(min(len(karaoke_lines), len(original_lines))):
kl = karaoke_lines[i].replace(' ', '')
ol = original_lines[i].replace(' ', '')
if kl != ol:
mismatch_count += 1
if mismatch_count <= 5:
errors.append(f"{i+1}: ASS='{kl}' vs 原始='{ol}'")
if mismatch_count > 5:
errors.append(f"... 还有{mismatch_count-5}处不匹配")
return errors
def _check_mtv(self, path, original_duration):
"""QA-3: MTV使用原音频单轨duration匹配"""
errors = []
try:
streams = run_ffprobe(path, [
"-show_entries", "stream=index,codec_type,duration",
"-show_entries", "stream_tags=handler_name",
"-of", "json"
])
data = json.loads(streams)
audio_streams = [s for s in data.get("streams", []) if s.get("codec_type") == "audio"]
if len(audio_streams) != 1:
errors.append(f"MTV应有1条音轨实际{len(audio_streams)}")
if audio_streams and original_duration > 0:
dur = float(audio_streams[0].get("duration", 0))
if abs(dur - original_duration) > 2:
errors.append(f"音频时长{dur:.1f}s与原曲{original_duration:.1f}s差距>2s")
fmt = run_ffprobe(path, ["-show_entries", "format=duration", "-of", "csv=p=0"])
if fmt and original_duration > 0:
video_dur = float(fmt)
if abs(video_dur - original_duration) > 3:
errors.append(f"视频时长{video_dur:.1f}s与原曲{original_duration:.1f}s差距>3s")
except Exception as e:
errors.append(f"ffprobe失败: {e}")
return errors
def _check_ktv(self, path, original_duration):
"""QA-4: KTV双轨音序 — Track1=伴奏, Track2=原唱"""
errors = []
try:
streams = run_ffprobe(path, [
"-show_entries", "stream=index,codec_type,duration",
"-show_entries", "stream_tags=handler_name",
"-of", "json"
])
data = json.loads(streams)
audio_streams = [s for s in data.get("streams", []) if s.get("codec_type") == "audio"]
if len(audio_streams) != 2:
errors.append(f"KTV应有2条音轨实际{len(audio_streams)}")
return errors
t1 = audio_streams[0].get("tags", {}).get("handler_name", "")
if "伴奏" not in t1 and "Accompaniment" not in t1:
errors.append(f"Track1应为伴奏实际标签: '{t1}'")
t2 = audio_streams[1].get("tags", {}).get("handler_name", "")
if "原唱" not in t2 and "Original" not in t2:
errors.append(f"Track2应为原唱实际标签: '{t2}'")
if original_duration > 0:
for i, a in enumerate(audio_streams):
dur = float(a.get("duration", 0))
if abs(dur - original_duration) > 2:
errors.append(f"音轨{i+1}时长{dur:.1f}s与原曲{original_duration:.1f}s差距>2s")
except Exception as e:
errors.append(f"ffprobe失败: {e}")
return errors