feat: multi-process architecture with independent backend processes
- Extract backend_accounting from llmage cleanupctx into an independent process
- Add bin/backend_accounting.py for the standalone LLM billing loop
- Rewrite start.sh with a two-phase startup:
  1. Independent backend programs (each run once)
  2. Sage Web workers (SO_REUSEPORT on the same port)
- Rewrite stop.sh to handle both workers and backend processes
- Add .gitignore for build artifacts and runtime files

Architecture:
- CPU core detection for worker count
- All workers share port 9180 via SO_REUSEPORT
- Backend processes tracked in sage_backend.pid
- Workers tracked in sage.pid
This commit is contained in:
parent
53285aa17e
commit
3de5a1ce91
42
.gitignore
vendored
Normal file
42
.gitignore
vendored
Normal file
@ -0,0 +1,42 @@
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*.egg-info/
|
||||
dist/
|
||||
build/
|
||||
|
||||
# Virtual environment
|
||||
py3/
|
||||
|
||||
# Logs
|
||||
logs/
|
||||
|
||||
# PID files
|
||||
*.pid
|
||||
|
||||
# Private keys and certificates (secret material; never commit)
|
||||
*.pem
|
||||
*.key
|
||||
merchant_*.pem
|
||||
alipay_*.pem
|
||||
pay_*.pem
|
||||
|
||||
# Database
|
||||
models/mysql.ddl.sql
|
||||
|
||||
# pkgs (submodules should be in their own repos)
|
||||
pkgs/
|
||||
|
||||
# wwwroot (linked from module repos)
|
||||
wwwroot/
|
||||
|
||||
# Migration scripts (run once, not needed in repo)
|
||||
migrate_*.py
|
||||
reset_*.py
|
||||
check_*.py
|
||||
set_*.sh
|
||||
setup_*.sh
|
||||
|
||||
# Sage runtime
|
||||
sage.pid
|
||||
sage_backend.pid
|
||||
79
bin/backend_accounting.py
Normal file
79
bin/backend_accounting.py
Normal file
@ -0,0 +1,79 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
独立运行的LLM后台计费程序。
|
||||
从 sage.py 的 llmage 模块中提取,避免多进程模式下重复运行。
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import asyncio
|
||||
import signal
|
||||
|
||||
# 切换到 Sage 工作目录
|
||||
os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'py3', 'lib', 'python3.10', 'site-packages'))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'pkgs'))
|
||||
|
||||
from appPublic.folderUtils import ProgramPath
|
||||
from appPublic.jsonConfig import getConfig
|
||||
from sqlor.dbpools import DBPools
|
||||
from appPublic.log import MyLogger, debug, exception, info
|
||||
|
||||
# 初始化配置
|
||||
p = ProgramPath()
|
||||
config = getConfig(NS={'workdir': os.getcwd(), 'ProgramPath': p})
|
||||
DBPools(config.databases)
|
||||
|
||||
# 导入 llmage 的计费函数
|
||||
from llmage.accounting import (
|
||||
get_accounting_llmusages,
|
||||
llm_accounting,
|
||||
llm_accoung_failed
|
||||
)
|
||||
|
||||
async def backend_accounting():
    """Run the LLM usage billing loop until the task is cancelled.

    Each round:
      1. fetches the pending usage records with ``get_accounting_llmusages()``;
      2. bills every record with ``llm_accounting()``;
      3. reports a record whose billing raised via ``llm_accoung_failed()``;
      4. sleeps 10 seconds.

    Any ``Exception`` raised by these calls is logged and the loop keeps
    running. ``asyncio.CancelledError`` is a ``BaseException`` on Python 3.8+,
    so it is deliberately not swallowed: cancelling the task ends the loop.
    """
    info('backend accounting started ...')
    while True:
        try:
            lus = await get_accounting_llmusages()
        except Exception as e:
            # A failed fetch must not end the process; retry on the next round.
            exception(f'{e}')
            lus = []
        debug(f'{len(lus)=} need to accounting........')
        for lu in lus:
            try:
                debug(f'backend_accounting(): {lu.id=} handleing...')
                await llm_accounting(lu)
            except Exception as e:
                exception(f'{e}, {lu.id=}')
                # Marking the record as failed can itself raise (for example
                # when the database is unavailable). Guard it so one bad
                # record cannot terminate the whole billing loop.
                try:
                    await llm_accoung_failed(lu.id)
                except Exception as mark_err:
                    exception(f'{mark_err}, {lu.id=}')

        await asyncio.sleep(10)
|
||||
|
||||
def main():
    """Process entry point: set up logging and run the billing loop.

    The loop runs until SIGTERM or SIGINT is received. The signal callbacks
    are registered through ``loop.add_signal_handler`` so they execute inside
    the event loop: the loop is woken immediately (a handler installed with
    ``signal.signal`` would only take effect after the current
    ``asyncio.sleep`` timed out) and tasks can be cancelled safely.  The loop
    is not stopped explicitly; ``run_until_complete`` returns once the
    cancelled main task has finished, which avoids
    ``RuntimeError('Event loop stopped before Future completed.')``.
    """
    # The instance is kept bound for the lifetime of main().
    # NOTE(review): presumably MyLogger configures the module-level
    # info/debug/exception helpers as a side effect -- confirm in appPublic.log.
    logger = MyLogger('backend_accounting', levelname='info',
                      logfile=os.path.join(os.getcwd(), 'logs', 'backend_accounting.log'))
    info(f'Backend accounting process started (PID: {os.getpid()})')

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    main_task = loop.create_task(backend_accounting())

    def handle_signal(signum):
        # Runs inside the event loop, so touching tasks here is safe.
        info(f'Received signal {signum}, shutting down...')
        for task in asyncio.all_tasks(loop):
            task.cancel()

    for signum in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(signum, handle_signal, signum)
        except NotImplementedError:
            # Event loops without add_signal_handler support (e.g. Windows):
            # hand the work over to the loop thread-safely, which also wakes
            # the selector.
            signal.signal(
                signum,
                lambda s, _frame: loop.call_soon_threadsafe(handle_signal, s),
            )

    try:
        loop.run_until_complete(main_task)
    except asyncio.CancelledError:
        # Normal shutdown path: the main task was cancelled by handle_signal.
        pass
    finally:
        loop.close()
        info('Backend accounting process stopped.')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
63
start.sh
63
start.sh
@ -1,6 +1,10 @@
|
||||
#!/bin/bash
|
||||
# Sage Web Application Start Script
|
||||
# Multi-process support based on CPU cores
|
||||
# 多进程支持: 端口复用 + 独立后台程序
|
||||
#
|
||||
# 架构说明:
|
||||
# 1. 独立后台程序 (bin/*.py) - 每个只启动一次,避免重复
|
||||
# 2. Sage Web Workers - 根据CPU核心数启动多个worker,使用端口复用
|
||||
|
||||
set -e
|
||||
|
||||
@ -13,6 +17,7 @@ PYTHON="./py3/bin/python"
|
||||
APP_ENTRY="app/sage.py"
|
||||
PIDFILE="$WORKDIR/sage.pid"
|
||||
LOGDIR="$WORKDIR/logs"
|
||||
BINPIDFILE="$WORKDIR/sage_backend.pid"
|
||||
|
||||
# 确保logs目录存在
|
||||
mkdir -p "$LOGDIR"
|
||||
@ -52,14 +57,10 @@ echo "工作目录: $WORKDIR"
|
||||
echo "Python: $PYTHON"
|
||||
echo "========================================="
|
||||
|
||||
# 获取 CPU 核心数,决定启动多少 Worker
|
||||
WORKERS=$(nproc)
|
||||
echo "检测到 ${WORKERS} 个 CPU 核心,准备启动 ${WORKERS} 个工作进程..."
|
||||
|
||||
# 获取基础端口
|
||||
BASE_PORT=9180
|
||||
# 获取端口
|
||||
PORT=9180
|
||||
if command -v python3 &> /dev/null; then
|
||||
BASE_PORT=$($PYTHON -c "
|
||||
PORT=$($PYTHON -c "
|
||||
import json
|
||||
try:
|
||||
with open('$WORKDIR/conf/config.json') as f:
|
||||
@ -70,18 +71,52 @@ except Exception as e:
|
||||
" 2>/dev/null || echo 9180)
|
||||
fi
|
||||
|
||||
# 清空 PID 文件
|
||||
# =========================================
|
||||
# 步骤 1: 启动独立后台程序
|
||||
# =========================================
|
||||
echo ""
|
||||
echo "--- 启动独立后台程序 ---"
|
||||
|
||||
# 清空后台程序 PID 文件
|
||||
> "$BINPIDFILE"
|
||||
|
||||
# 1.1 启动 LLM 后台计费程序
|
||||
if [ -f "$WORKDIR/bin/backend_accounting.py" ]; then
|
||||
LOGFILE="$LOGDIR/backend_accounting.log"
|
||||
echo ">>> 启动 backend_accounting ..."
|
||||
nohup $PYTHON "$WORKDIR/bin/backend_accounting.py" > "$LOGFILE" 2>&1 &
|
||||
PID=$!
|
||||
echo "backend_accounting:$PID" >> "$BINPIDFILE"
|
||||
sleep 0.5
|
||||
if kill -0 $PID 2>/dev/null; then
|
||||
echo " -> backend_accounting PID: $PID (成功)"
|
||||
else
|
||||
echo " -> 警告: backend_accounting 启动失败,查看 $LOGFILE"
|
||||
fi
|
||||
fi
|
||||
|
||||
# =========================================
|
||||
# 步骤 2: 启动 Sage Web Workers (端口复用)
|
||||
# =========================================
|
||||
echo ""
|
||||
echo "--- 启动 Sage Web Workers (端口复用) ---"
|
||||
|
||||
# 获取 CPU 核心数,决定启动多少 Worker
|
||||
WORKERS=$(nproc)
|
||||
echo "检测到 ${WORKERS} 个 CPU 核心,准备启动 ${WORKERS} 个 worker 进程..."
|
||||
echo "所有 worker 共享端口 ${PORT} (SO_REUSEPORT)"
|
||||
|
||||
# 清空 Worker PID 文件
|
||||
> "$PIDFILE"
|
||||
|
||||
# 循环启动 Worker
|
||||
for (( i=0; i<WORKERS; i++ ))
|
||||
do
|
||||
PORT=$((BASE_PORT + i))
|
||||
LOGFILE="$LOGDIR/sage_worker_${i}.log"
|
||||
|
||||
echo ">>> 启动 Worker $((i+1))/${WORKERS} on port $PORT ..."
|
||||
|
||||
# 启动服务
|
||||
# 启动服务 - 所有worker使用相同端口,依赖ahserver的SO_REUSEPORT支持
|
||||
nohup $PYTHON $APP_ENTRY --workdir "$WORKDIR" --port $PORT > "$LOGFILE" 2>&1 &
|
||||
APP_PID=$!
|
||||
|
||||
@ -97,8 +132,10 @@ do
|
||||
fi
|
||||
done
|
||||
|
||||
echo ""
|
||||
echo "========================================="
|
||||
echo "所有服务已启动"
|
||||
echo "PID 文件: $PIDFILE"
|
||||
echo "访问地址: http://localhost:${BASE_PORT} (以及其他 ${WORKERS} 个端口)"
|
||||
echo "Worker PID 文件: $PIDFILE"
|
||||
echo "Backend PID 文件: $BINPIDFILE"
|
||||
echo "访问地址: http://localhost:${PORT}"
|
||||
echo "========================================="
|
||||
|
||||
120
stop.sh
120
stop.sh
@ -1,6 +1,6 @@
|
||||
#!/bin/bash
|
||||
# Sage Web Application Stop Script
|
||||
# Supports multi-process setup
|
||||
# 停止所有 Web Workers 和独立后台程序
|
||||
|
||||
set -e
|
||||
|
||||
@ -9,71 +9,107 @@ cd "$(dirname "$0")"
|
||||
|
||||
WORKDIR="$(pwd)"
|
||||
PIDFILE="$WORKDIR/sage.pid"
|
||||
BINPIDFILE="$WORKDIR/sage_backend.pid"
|
||||
|
||||
echo "========================================="
|
||||
echo "停止 Sage Web Application"
|
||||
echo "========================================="
|
||||
|
||||
STOPPED_PIDS=""
|
||||
# =========================================
|
||||
# 步骤 1: 停止 Web Workers
|
||||
# =========================================
|
||||
echo ""
|
||||
echo "--- 停止 Web Workers ---"
|
||||
|
||||
# 1. 尝试从 PID 文件停止
|
||||
if [ -f "$PIDFILE" ]; then
|
||||
echo "读取 PID 文件..."
|
||||
while read -r APP_PID; do
|
||||
# 跳过空行
|
||||
if [ -z "$APP_PID" ]; then continue; fi
|
||||
echo "读取 Worker PID 文件..."
|
||||
|
||||
while IFS= read -r pid || [ -n "$pid" ]; do
|
||||
pid=$(echo "$pid" | tr -d '[:space:]')
|
||||
[ -z "$pid" ] && continue
|
||||
|
||||
if kill -0 "$APP_PID" 2>/dev/null; then
|
||||
echo "正在停止 Worker (PID: $APP_PID) ..."
|
||||
kill "$APP_PID" 2>/dev/null || true
|
||||
STOPPED_PIDS="$STOPPED_PIDS $APP_PID"
|
||||
if kill -0 "$pid" 2>/dev/null; then
|
||||
echo "正在停止 Worker (PID: $pid) ..."
|
||||
kill "$pid" 2>/dev/null || true
|
||||
else
|
||||
echo "Worker (PID: $APP_PID) 已停止"
|
||||
echo "Worker (PID: $pid) 已不在运行"
|
||||
fi
|
||||
done < "$PIDFILE"
|
||||
|
||||
# 等待进程结束
|
||||
WAIT_COUNT=0
|
||||
while [ $WAIT_COUNT -lt 10 ]; do
|
||||
ALL_STOPPED=true
|
||||
for PID in $STOPPED_PIDS; do
|
||||
if kill -0 "$PID" 2>/dev/null; then
|
||||
ALL_STOPPED=false
|
||||
# 等待进程退出
|
||||
echo "等待服务关闭..."
|
||||
for i in $(seq 1 10); do
|
||||
all_stopped=true
|
||||
while IFS= read -r pid || [ -n "$pid" ]; do
|
||||
pid=$(echo "$pid" | tr -d '[:space:]')
|
||||
[ -z "$pid" ] && continue
|
||||
if kill -0 "$pid" 2>/dev/null; then
|
||||
all_stopped=false
|
||||
break
|
||||
fi
|
||||
done
|
||||
done < "$PIDFILE"
|
||||
|
||||
if $ALL_STOPPED; then
|
||||
if $all_stopped; then
|
||||
echo "所有 Worker 已停止 (用时 ${i}s)"
|
||||
break
|
||||
fi
|
||||
|
||||
sleep 1
|
||||
WAIT_COUNT=$((WAIT_COUNT + 1))
|
||||
echo "等待服务关闭... ($WAIT_COUNT/10)"
|
||||
done
|
||||
|
||||
# 强制杀死未退出的
|
||||
for PID in $STOPPED_PIDS; do
|
||||
if kill -0 "$PID" 2>/dev/null; then
|
||||
echo "强制停止进程: $PID"
|
||||
kill -9 "$PID" 2>/dev/null || true
|
||||
# 强制杀死仍在运行的进程
|
||||
while IFS= read -r pid || [ -n "$pid" ]; do
|
||||
pid=$(echo "$pid" | tr -d '[:space:]')
|
||||
[ -z "$pid" ] && continue
|
||||
if kill -0 "$pid" 2>/dev/null; then
|
||||
echo "强制停止 Worker (PID: $pid)"
|
||||
kill -9 "$pid" 2>/dev/null || true
|
||||
fi
|
||||
done
|
||||
done < "$PIDFILE"
|
||||
|
||||
# 清理 PID 文件
|
||||
rm -f "$PIDFILE"
|
||||
else
|
||||
echo "未找到 Worker PID 文件 ($PIDFILE)"
|
||||
fi
|
||||
|
||||
# 2. 兜底清理 (通过进程名查找,防止 PID 文件丢失)
|
||||
# 注意:这里匹配 app/sage.py
|
||||
PIDS=$(ps aux | grep "[a]pp/sage.py" | awk '{print $2}' || true)
|
||||
if [ -n "$PIDS" ]; then
|
||||
echo "发现残留进程,强制清理..."
|
||||
for PID in $PIDS; do
|
||||
kill -9 "$PID" 2>/dev/null || true
|
||||
done
|
||||
# =========================================
|
||||
# 步骤 2: 停止独立后台程序
|
||||
# =========================================
|
||||
echo ""
|
||||
echo "--- 停止独立后台程序 ---"
|
||||
|
||||
if [ -f "$BINPIDFILE" ]; then
|
||||
echo "读取后台程序 PID 文件..."
|
||||
|
||||
while IFS= read -r line || [ -n "$line" ]; do
|
||||
line=$(echo "$line" | tr -d '[:space:]')
|
||||
[ -z "$line" ] && continue
|
||||
|
||||
# 格式: name:pid
|
||||
name="${line%%:*}"
|
||||
pid="${line##*:}"
|
||||
|
||||
if kill -0 "$pid" 2>/dev/null; then
|
||||
echo "正在停止 $name (PID: $pid) ..."
|
||||
kill "$pid" 2>/dev/null || true
|
||||
sleep 1
|
||||
if kill -0 "$pid" 2>/dev/null; then
|
||||
echo "强制停止 $name (PID: $pid)"
|
||||
kill -9 "$pid" 2>/dev/null || true
|
||||
fi
|
||||
echo " -> $name 已停止"
|
||||
else
|
||||
echo "$name (PID: $pid) 已不在运行"
|
||||
fi
|
||||
done < "$BINPIDFILE"
|
||||
|
||||
# 清理 PID 文件
|
||||
rm -f "$BINPIDFILE"
|
||||
else
|
||||
echo "未找到后台程序 PID 文件 ($BINPIDFILE)"
|
||||
fi
|
||||
|
||||
# 清理 PID 文件
|
||||
rm -f "$PIDFILE"
|
||||
|
||||
echo ""
|
||||
echo "========================================="
|
||||
echo "服务已停止"
|
||||
echo "所有服务已停止"
|
||||
echo "========================================="
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user