Merge branch 'main' of git.opencomputing.cn:yumoqing/kboss

This commit is contained in:
hrx 2026-05-23 14:52:18 +08:00
commit e6b7aa0c94
22 changed files with 2770 additions and 115 deletions

727
b/cntoai/chat.html Normal file
View File

@ -0,0 +1,727 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>模型对话测试 · cntoai</title>
<style>
:root {
--bg: #f0f4f8;
--panel: #fff;
--border: #e2e8f0;
--primary: #2563eb;
--primary-hover: #1d4ed8;
--text: #1e293b;
--muted: #64748b;
--user-bg: #2563eb;
--assistant-bg: #f1f5f9;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: "Segoe UI", system-ui, sans-serif;
background: var(--bg);
color: var(--text);
height: 100vh;
overflow: hidden;
}
.layout { display: flex; height: 100vh; }
.sidebar {
width: 300px;
background: var(--panel);
border-right: 1px solid var(--border);
display: flex;
flex-direction: column;
flex-shrink: 0;
}
.sidebar-header {
padding: 16px;
border-bottom: 1px solid var(--border);
}
.sidebar-header h1 { font-size: 15px; margin-bottom: 4px; }
.sidebar-header p { font-size: 11px; color: var(--muted); word-break: break-all; }
.btn {
display: inline-flex;
align-items: center;
justify-content: center;
padding: 8px 14px;
font-size: 13px;
border: none;
border-radius: 8px;
cursor: pointer;
transition: background 0.15s;
}
.btn-primary { background: var(--primary); color: #fff; width: 100%; }
.btn-primary:hover { background: var(--primary-hover); }
.btn-primary:disabled { opacity: 0.5; cursor: not-allowed; }
.btn-ghost {
background: transparent;
color: var(--muted);
border: 1px solid var(--border);
margin-top: 8px;
width: 100%;
font-size: 12px;
}
.btn-ghost:hover { background: #f8fafc; color: var(--text); }
.btn-danger { color: #dc2626; border-color: #fecaca; }
.history-list {
flex: 1;
overflow-y: auto;
padding: 8px;
}
.history-item {
padding: 10px 12px;
border-radius: 8px;
cursor: pointer;
font-size: 13px;
margin-bottom: 4px;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.history-item:hover { background: #f1f5f9; }
.history-item.active { background: #eff6ff; color: var(--primary); }
.history-empty { padding: 16px; font-size: 12px; color: var(--muted); text-align: center; }
.main { flex: 1; display: flex; flex-direction: column; min-width: 0; }
.toolbar {
padding: 12px 20px;
background: var(--panel);
border-bottom: 1px solid var(--border);
display: flex;
flex-wrap: wrap;
gap: 12px;
align-items: center;
}
.toolbar label { font-size: 12px; color: var(--muted); margin-right: 4px; }
.toolbar select, .toolbar input[type="text"] {
padding: 6px 10px;
border: 1px solid var(--border);
border-radius: 6px;
font-size: 13px;
min-width: 160px;
}
.toolbar .chk { display: flex; align-items: center; gap: 6px; font-size: 13px; }
.notice {
padding: 10px 20px;
background: #fffbeb;
border-bottom: 1px solid #fde68a;
font-size: 12px;
color: #92400e;
line-height: 1.5;
}
.notice strong { display: block; margin-bottom: 4px; }
.messages {
flex: 1;
overflow-y: auto;
padding: 20px;
display: flex;
flex-direction: column;
gap: 16px;
}
.msg {
max-width: 85%;
padding: 12px 16px;
border-radius: 12px;
font-size: 14px;
line-height: 1.6;
white-space: pre-wrap;
word-break: break-word;
}
.msg.user {
align-self: flex-end;
background: var(--user-bg);
color: #fff;
}
.msg.assistant {
align-self: flex-start;
background: var(--assistant-bg);
border: 1px solid var(--border);
}
.msg .role-tag {
font-size: 11px;
opacity: 0.75;
margin-bottom: 6px;
}
.msg img.preview { max-width: 200px; border-radius: 8px; margin-top: 8px; display: block; }
.composer {
padding: 16px 20px;
background: var(--panel);
border-top: 1px solid var(--border);
}
.composer textarea {
width: 100%;
min-height: 80px;
padding: 12px;
border: 1px solid var(--border);
border-radius: 10px;
font-size: 14px;
resize: vertical;
font-family: inherit;
}
.composer-actions {
display: flex;
align-items: center;
justify-content: space-between;
margin-top: 10px;
flex-wrap: wrap;
gap: 8px;
}
.composer-actions .left { display: flex; gap: 8px; align-items: center; }
.pending-img {
position: relative;
display: inline-block;
margin-bottom: 8px;
}
.pending-img img { max-height: 60px; border-radius: 6px; }
.pending-img button {
position: absolute;
top: -6px;
right: -6px;
width: 20px;
height: 20px;
border-radius: 50%;
border: none;
background: #64748b;
color: #fff;
cursor: pointer;
font-size: 12px;
line-height: 1;
}
.settings {
padding: 12px 16px;
border-top: 1px solid var(--border);
font-size: 12px;
}
.settings summary { cursor: pointer; color: var(--muted); }
.settings-grid {
display: grid;
gap: 8px;
margin-top: 10px;
}
.settings-grid input { width: 100%; padding: 6px 8px; border: 1px solid var(--border); border-radius: 6px; font-size: 12px; }
.auth-panel {
padding: 12px 16px;
border-bottom: 1px solid var(--border);
background: #f8fafc;
}
.auth-panel h2 { font-size: 13px; margin-bottom: 10px; color: var(--text); }
.auth-panel label {
display: block;
font-size: 11px;
color: var(--muted);
margin-bottom: 4px;
margin-top: 8px;
}
.auth-panel label:first-of-type { margin-top: 0; }
.auth-panel input {
width: 100%;
padding: 7px 9px;
border: 1px solid var(--border);
border-radius: 6px;
font-size: 12px;
}
.auth-panel .hint { font-size: 10px; color: var(--muted); margin-top: 10px; line-height: 1.4; }
.status-bar {
font-size: 11px;
color: var(--muted);
padding: 4px 20px 8px;
}
.loading { color: var(--primary); }
.error { color: #dc2626; }
</style>
</head>
<body>
<div class="layout">
<aside class="sidebar">
<div class="auth-panel">
<h2>接口凭证(手动测试)</h2>
<label for="cfgApiBase">Dspy 网关(本系统接口域名)</label>
<input type="text" id="cfgApiBase" value="https://dev.opencomputing.cn" placeholder="https://dev.opencomputing.cn">
<label for="cfgApiUrl">api_url模型 chat/completions 完整地址)</label>
<input type="text" id="cfgApiUrl" value="https://ai.atvoe.com/llmage/v1/chat/completions" placeholder="https://.../v1/chat/completions">
<label for="cfgApiKey">api_keyBearer 令牌,不含 Bearer 前缀)</label>
<input type="text" id="cfgApiKey" placeholder="xGvvta0hnXPDDHIp7knfB" autocomplete="off">
<label for="cfgUserid">userid会话归属用户持久化接口必填</label>
<input type="text" id="cfgUserid" placeholder="users 表 id">
<p class="hint">填写后所有请求会携带 api_url、api_key、userid可不登录 Cookie 测试。凭证仅保存在本机 localStorage。</p>
<button type="button" class="btn btn-ghost" id="btnSaveAuth" style="margin-top:10px">保存凭证到本地</button>
</div>
<div class="sidebar-header">
<h1>对话历史</h1>
<button type="button" class="btn btn-primary" id="btnNewChat">开启新对话</button>
<button type="button" class="btn btn-ghost btn-danger" id="btnDeleteSession" disabled>删除当前会话</button>
</div>
<div class="history-list" id="historyList">
<div class="history-empty">填写 userid 后刷新</div>
</div>
<details class="settings">
<summary>其它选项</summary>
<div class="settings-grid">
<label>model_id可选从文档表读 api_url<input type="text" id="cfgModelId" placeholder="model_management 表 id"></label>
<label class="chk"><input type="checkbox" id="cfgPersist" checked> 使用 chat_send持久化多轮</label>
</div>
</details>
</aside>
<main class="main">
<div class="notice" id="noticeBox">
<strong>测试说明</strong>
左侧可手动填写 <code>api_url</code><code>api_key</code><code>userid</code> 直接联调;未填 userid 时持久化接口会失败。Dspy 网关默认 <code>https://dev.opencomputing.cn</code>,路径为 <code>/cntoai/*.dspy</code>。须已执行 <code>chat_tables.sql</code>
</div>
<div class="toolbar">
<div>
<label for="modelSelect">模型</label>
<select id="modelSelect">
<option value="">加载模型列表…</option>
</select>
</div>
<div>
<label for="modelManual">或手动输入 model</label>
<input type="text" id="modelManual" placeholder="qwen3.6-plus" value="qwen3.6-plus">
</div>
<label class="chk"><input type="checkbox" id="chkStream" checked> 流式(后端汇总后返回全文)</label>
<button type="button" class="btn btn-ghost" id="btnRefreshHistory" style="width:auto;margin:0">刷新历史</button>
</div>
<div class="messages" id="messages"></div>
<div class="status-bar" id="statusBar"></div>
<div class="composer">
<div class="pending-img" id="pendingImgWrap" style="display:none">
<img id="pendingImg" alt="preview">
<button type="button" id="btnClearImg" title="移除图片">×</button>
</div>
<textarea id="inputText" placeholder="输入消息Ctrl+Enter 发送;可上传图片测试图文问答"></textarea>
<div class="composer-actions">
<div class="left">
<label class="btn btn-ghost" style="width:auto;margin:0;cursor:pointer">
图片<input type="file" id="fileImage" accept="image/*" hidden>
</label>
<span style="font-size:12px;color:var(--muted)">Ctrl+Enter 发送</span>
</div>
<button type="button" class="btn btn-primary" id="btnSend" style="width:auto;min-width:100px">发送</button>
</div>
</div>
</main>
</div>
<script>
(function () {
const $ = (id) => document.getElementById(id);
const state = {
apiBase: 'https://dev.opencomputing.cn',
sessionId: '',
sending: false,
imageBase64: '',
imageMime: 'image/jpeg',
models: [],
};
function getApiBase() {
return ($('cfgApiBase').value || 'https://dev.opencomputing.cn').replace(/\/$/, '');
}
function getModel() {
const sel = $('modelSelect').value;
const manual = $('modelManual').value.trim();
return manual || sel || '';
}
function getModelId() {
const opt = $('modelSelect').selectedOptions[0];
return $('cfgModelId').value.trim() || (opt && opt.dataset.modelId) || '';
}
/** 手动凭证,随每个 cntoai 接口请求传递 */
function getAuthExtras() {
const extras = {};
const apiUrl = $('cfgApiUrl').value.trim();
const apiKey = $('cfgApiKey').value.trim();
const userid = $('cfgUserid').value.trim();
if (apiUrl) extras.api_url = apiUrl;
if (apiKey) extras.api_key = apiKey;
if (userid) extras.userid = userid;
return extras;
}
const LS_AUTH = 'cntoai_chat_auth_v1';
function saveAuthLocal() {
localStorage.setItem(LS_AUTH, JSON.stringify({
apiBase: $('cfgApiBase').value,
apiUrl: $('cfgApiUrl').value,
apiKey: $('cfgApiKey').value,
userid: $('cfgUserid').value,
}));
setStatus('凭证已保存到浏览器本地');
}
function loadAuthLocal() {
try {
const raw = localStorage.getItem(LS_AUTH);
if (!raw) return;
const o = JSON.parse(raw);
if (o.apiBase) $('cfgApiBase').value = o.apiBase;
if (o.apiUrl) $('cfgApiUrl').value = o.apiUrl;
if (o.apiKey) $('cfgApiKey').value = o.apiKey;
if (o.userid) $('cfgUserid').value = o.userid;
} catch (e) { /* ignore */ }
}
function buildUrl(path) {
const base = getApiBase();
const p = path.startsWith('/') ? path : '/' + path;
return base + p;
}
/** 调用 dspy优先 POST JSON失败时尝试 GET */
async function callDspy(path, params, method) {
const url = buildUrl(path);
const body = { ...params, ...getAuthExtras() };
const usePost = method === 'POST' || (method !== 'GET' && JSON.stringify(body).length > 1800);
const opts = {
credentials: 'include',
headers: { 'Accept': 'application/json' },
};
let res;
if (usePost) {
opts.method = 'POST';
opts.headers['Content-Type'] = 'application/json';
opts.body = JSON.stringify(body);
res = await fetch(url, opts);
} else {
const q = new URLSearchParams();
Object.keys(body).forEach((k) => {
const v = body[k];
if (v !== undefined && v !== null && v !== '') {
q.set(k, typeof v === 'object' ? JSON.stringify(v) : String(v));
}
});
res = await fetch(url + (q.toString() ? '?' + q.toString() : ''), {
...opts,
method: 'GET',
});
}
const text = await res.text();
let data;
try {
data = JSON.parse(text);
} catch (e) {
throw new Error('非 JSON 响应 HTTP ' + res.status + ': ' + text.slice(0, 200));
}
return data;
}
function setStatus(msg, isError) {
const el = $('statusBar');
el.textContent = msg;
el.className = 'status-bar' + (isError ? ' error' : msg ? ' loading' : '');
}
function renderMessages(list) {
const box = $('messages');
if (!list.length) {
box.innerHTML = '<div class="msg assistant" style="align-self:center;max-width:100%"><div class="role-tag">提示</div>选择或新建对话后开始聊天</div>';
return;
}
box.innerHTML = list.map((m) => {
const role = m.role === 'user' ? 'user' : 'assistant';
const label = role === 'user' ? '我' : '助手';
const img = m.imagePreview ? '<img class="preview" src="' + m.imagePreview + '" alt="">' : '';
return '<div class="msg ' + role + '"><div class="role-tag">' + label + '</div>' + escapeHtml(m.content) + img + '</div>';
}).join('');
box.scrollTop = box.scrollHeight;
}
function escapeHtml(s) {
const d = document.createElement('div');
d.textContent = s;
return d.innerHTML;
}
async function loadModels() {
const sel = $('modelSelect');
try {
const res = await callDspy('/cntoai/model_management_customer_search.dspy', {
page_size: 200,
current_page: 1,
}, 'GET');
if (!res.status) {
sel.innerHTML = '<option value="">加载失败: ' + (res.msg || '') + '</option>';
return;
}
state.models = res.data.model_list || [];
if (!state.models.length) {
sel.innerHTML = '<option value="">无已上架模型,请手动输入</option>';
return;
}
sel.innerHTML = state.models.map((m) => {
const name = m.model_name || m.display_name || m.id;
return '<option value="' + escapeAttr(m.model_name || name) + '" data-model-id="' + escapeAttr(m.id) + '">' +
escapeHtml((m.display_name || m.model_name) + ' (' + (m.provider || '') + ')') + '</option>';
}).join('');
if (state.models[0]) {
$('modelManual').value = state.models[0].model_name || '';
}
} catch (e) {
sel.innerHTML = '<option value="">请求异常</option>';
setStatus('模型列表: ' + e.message, true);
}
}
function escapeAttr(s) {
return String(s).replace(/"/g, '&quot;');
}
async function loadHistory() {
const box = $('historyList');
try {
const res = await callDspy('/cntoai/chat_session_list.dspy', { page_size: 50 }, 'GET');
if (!res.status) {
box.innerHTML = '<div class="history-empty">' + escapeHtml(res.msg || '加载失败') + '</div>';
return;
}
const sessions = res.data.sessions || [];
if (!sessions.length) {
box.innerHTML = '<div class="history-empty">暂无历史(发送消息后会出现在此)</div>';
return;
}
box.innerHTML = sessions.map((s) => {
const active = s.id === state.sessionId ? ' active' : '';
const title = escapeHtml(s.title || '未命名');
return '<div class="history-item' + active + '" data-id="' + escapeAttr(s.id) + '" title="' + title + '">' + title + '</div>';
}).join('');
box.querySelectorAll('.history-item').forEach((el) => {
el.addEventListener('click', () => loadSession(el.dataset.id));
});
} catch (e) {
box.innerHTML = '<div class="history-empty">' + escapeHtml(e.message) + '</div>';
}
}
async function loadSession(sessionId) {
state.sessionId = sessionId;
$('btnDeleteSession').disabled = !sessionId;
setStatus('加载会话…');
try {
const res = await callDspy('/cntoai/chat_session_messages.dspy', { session_id: sessionId }, 'GET');
if (!res.status) {
setStatus(res.msg || '加载失败', true);
return;
}
const session = res.data.session || {};
if (session.model) {
$('modelManual').value = session.model;
const sel = $('modelSelect');
for (let i = 0; i < sel.options.length; i++) {
if (sel.options[i].value === session.model) {
sel.selectedIndex = i;
break;
}
}
}
const msgs = (res.data.messages || []).map((m) => ({
role: m.role,
content: m.content || '',
}));
syncUiMessages(msgs);
renderMessages(uiMessages);
await loadHistory();
setStatus('');
} catch (e) {
setStatus(e.message, true);
}
}
async function deleteSession() {
if (!state.sessionId || !confirm('确定删除当前会话?')) return;
try {
const res = await callDspy('/cntoai/chat_session_delete.dspy', { session_id: state.sessionId }, 'GET');
if (res.status) {
state.sessionId = '';
$('btnDeleteSession').disabled = true;
syncUiMessages([]);
renderMessages([]);
await loadHistory();
setStatus('已删除');
} else {
setStatus(res.msg || '删除失败', true);
}
} catch (e) {
setStatus(e.message, true);
}
}
function newChat() {
state.sessionId = '';
$('btnDeleteSession').disabled = true;
syncUiMessages([]);
renderMessages([]);
setStatus('新对话');
}
let uiMessages = [];
function syncUiMessages(list) {
uiMessages = list.map((m) => ({ role: m.role, content: m.content || '' }));
}
async function sendMessage() {
const text = $('inputText').value.trim();
const model = getModel();
if (!model) {
alert('请选择或输入模型名称');
return;
}
if (!text && !state.imageBase64) {
alert('请输入文本或上传图片');
return;
}
if (state.sending) return;
const auth = getAuthExtras();
const persist = $('cfgPersist').checked;
if (persist && !auth.userid) {
alert('使用 chat_send 持久化时,请在左侧填写 userid');
return;
}
if (!auth.api_key) {
if (!confirm('未填写 api_key将依赖服务端配置或登录用户 Key是否继续')) return;
}
const userContent = text || '[图片消息]';
uiMessages.push({
role: 'user',
content: userContent,
imagePreview: state.imageBase64 ? ('data:' + state.imageMime + ';base64,' + state.imageBase64) : '',
});
renderMessages(uiMessages);
$('inputText').value = '';
const imgB64 = state.imageBase64;
const imgMime = state.imageMime;
clearImage();
state.sending = true;
$('btnSend').disabled = true;
setStatus('请求中…');
const payload = {
model,
message: text,
stream: $('chkStream').checked,
model_id: getModelId() || undefined,
...getAuthExtras(),
};
if (state.sessionId) payload.session_id = state.sessionId;
if (imgB64) {
payload.image_base64 = imgB64;
payload.image_mime = imgMime;
}
const path = persist ? '/cntoai/chat_send.dspy' : '/cntoai/llm_chat_completions.dspy';
if (!persist) {
payload.messages = uiMessages.slice(0, -1).map((m) => ({ role: m.role, content: m.content }));
payload.message = text || '请描述这张图片';
if (imgB64) {
payload.image_base64 = imgB64;
payload.image_mime = imgMime;
}
delete payload.session_id;
}
try {
const res = await callDspy(path, payload, 'POST');
if (!res.status) {
setStatus(res.msg || '请求失败', true);
uiMessages.pop();
renderMessages(uiMessages);
return;
}
const reply = persist ? (res.data && res.data.reply) : (res.data && res.data.reply);
if (persist && res.data && res.data.session_id) {
state.sessionId = res.data.session_id;
$('btnDeleteSession').disabled = false;
}
uiMessages.push({ role: 'assistant', content: reply || '(空回复)' });
renderMessages(uiMessages);
await loadHistory();
setStatus('完成');
} catch (e) {
setStatus(e.message, true);
uiMessages.pop();
renderMessages(uiMessages);
} finally {
state.sending = false;
$('btnSend').disabled = false;
}
}
function clearImage() {
state.imageBase64 = '';
$('pendingImgWrap').style.display = 'none';
$('fileImage').value = '';
}
$('fileImage').addEventListener('change', (e) => {
const file = e.target.files[0];
if (!file || !file.type.startsWith('image/')) return;
state.imageMime = file.type || 'image/jpeg';
const reader = new FileReader();
reader.onload = () => {
const dataUrl = reader.result;
state.imageBase64 = String(dataUrl).split(',')[1] || '';
$('pendingImg').src = dataUrl;
$('pendingImgWrap').style.display = 'inline-block';
};
reader.readAsDataURL(file);
});
$('btnClearImg').addEventListener('click', clearImage);
$('btnSend').addEventListener('click', sendMessage);
$('btnNewChat').addEventListener('click', newChat);
$('btnDeleteSession').addEventListener('click', deleteSession);
$('btnRefreshHistory').addEventListener('click', () => {
if (!$('cfgUserid').value.trim() && $('cfgPersist').checked) {
alert('持久化接口需要填写 userid');
return;
}
loadHistory();
});
$('inputText').addEventListener('keydown', (e) => {
if (e.ctrlKey && e.key === 'Enter') sendMessage();
});
$('modelSelect').addEventListener('change', () => {
const v = $('modelSelect').value;
if (v) $('modelManual').value = v;
const opt = $('modelSelect').selectedOptions[0];
if (opt && opt.dataset.modelId) $('cfgModelId').value = opt.dataset.modelId;
});
$('btnSaveAuth').addEventListener('click', saveAuthLocal);
$('cfgUserid').addEventListener('change', loadHistory);
$('cfgApiBase').addEventListener('change', () => {
loadModels();
loadHistory();
});
function init() {
loadAuthLocal();
renderMessages([]);
loadModels();
if ($('cfgUserid').value.trim()) loadHistory();
else {
$('historyList').innerHTML = '<div class="history-empty">请填写 userid 后点「刷新历史」</div>';
}
}
init();
})();
</script>
</body>
</html>

177
b/cntoai/chat_send.dspy Normal file
View File

@ -0,0 +1,177 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
def _parse_bool(value, default=True):
if value is None or value == '':
return default
if isinstance(value, bool):
return value
return str(value).lower() in ('1', 'true', 'yes', 'on')
def _title_from_message(ns):
text = ns.get('message') or ns.get('text') or ''
text = str(text).strip().replace('\n', ' ')
if not text:
return '新对话'
return text[:30] + ('...' if len(text) > 30 else '')
def _build_user_content(ns):
text_parts = []
if ns.get('message'):
text_parts.append(str(ns.get('message')))
if ns.get('text'):
text_parts.append(str(ns.get('text')))
if ns.get('document_text'):
text_parts.append(str(ns.get('document_text')))
parts = []
merged_text = '\n'.join([p for p in text_parts if p]).strip()
if merged_text:
parts.append({'type': 'text', 'text': merged_text})
if ns.get('image_url'):
parts.append({'type': 'image_url', 'image_url': {'url': ns.get('image_url')}})
if ns.get('image_base64'):
mime = ns.get('image_mime') or 'image/jpeg'
b64 = ns.get('image_base64')
if not str(b64).startswith('data:'):
b64 = 'data:%s;base64,%s' % (mime, b64)
parts.append({'type': 'image_url', 'image_url': {'url': b64}})
if ns.get('document_url'):
parts.append({'type': 'file', 'file': {'file_url': ns.get('document_url')}})
if not parts:
return ''
if len(parts) == 1 and parts[0]['type'] == 'text':
return parts[0]['text']
return parts
async def _load_session_messages(sor, session_id):
sql = """
SELECT role, content, content_type
FROM chat_message
WHERE session_id = '%s'
ORDER BY created_at ASC;
""" % _escape(session_id)
rows = await sor.sqlExe(sql, {})
messages = []
for row in rows:
content = row.get('content') or ''
if row.get('content_type') == 'mixed':
import json
try:
content = json.loads(content)
except Exception:
pass
messages.append({'role': row['role'], 'content': content})
return messages
async def chat_send(ns={}):
"""
发送消息并保存多轮对话(需先执行 chat_tables.sql
参数model, message, stream(默认true), session_id,
image_url, image_base64, document_url, document_text,
with_chunks(true时返回上游 SSE 分片列表,便于确认流式)
说明本接口chat_send.dspy为 JSON 一次性返回。
需要浏览器端实时流式请调用 chat_send_stream.dspySSE
"""
import json
import traceback
# model = ns.get('model')
model = 'deepseek-v4-pro'
if not model:
return {'status': False, 'msg': 'model is required'}
userid = ns.get('userid') or await get_user()
if not userid:
return {'status': False, 'msg': '未找到用户'}
user_content = _build_user_content(ns)
if not user_content:
return {'status': False, 'msg': '请输入文本,或提供图片/文档参数'}
content_type = 'mixed' if isinstance(user_content, list) else 'text'
store_content = json.dumps(user_content, ensure_ascii=False) if content_type == 'mixed' else str(user_content)
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
session_id = ns.get('session_id')
if not session_id:
session_id = uuid()
await sor.C('chat_session', {
'id': session_id,
'userid': userid,
'model': model,
'title': _title_from_message(ns),
})
else:
sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid})
if not sessions:
return {'status': False, 'msg': '会话不存在'}
await sor.C('chat_message', {
'id': uuid(),
'session_id': session_id,
'role': 'user',
'content': store_content,
'content_type': content_type,
})
history = await _load_session_messages(sor, session_id)
stream_val = _parse_bool(ns.get('stream'), True)
chat_result = await path_call('llm_chat_completions.dspy', {
'model': model,
'messages': history,
'stream': stream_val,
'userid': userid,
'api_url': ns.get('api_url'),
'api_key': ns.get('api_key'),
'model_id': ns.get('model_id'),
'with_chunks': ns.get('with_chunks', True),
})
if not chat_result.get('status'):
return chat_result
reply = chat_result['data']['reply']
chunks = chat_result['data'].get('chunks') or []
chunk_count = chat_result['data'].get('chunk_count', 0)
await sor.C('chat_message', {
'id': uuid(),
'session_id': session_id,
'role': 'assistant',
'content': reply,
'content_type': 'text',
})
await sor.sqlExe(
"UPDATE chat_session SET updated_at = NOW() WHERE id = '%s';"
% _escape(session_id),
{},
)
return {
'status': True,
'msg': 'send success',
'data': {
'session_id': session_id,
'reply': reply,
'model': model,
'stream': stream_val,
'chunk_count': chunk_count,
'chunks': chunks if ns.get('with_chunks', True) else None,
},
}
except Exception:
return {'status': False, 'msg': 'send failed, %s' % traceback.format_exc()}
ret = await chat_send(params_kw)
return ret

View File

@ -0,0 +1,311 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
def _parse_bool(value, default=True):
if value is None or value == '':
return default
if isinstance(value, bool):
return value
return str(value).lower() in ('1', 'true', 'yes', 'on')
def _title_from_message(ns):
text = ns.get('message') or ns.get('text') or ''
text = str(text).strip().replace('\n', ' ')
if not text:
return '新对话'
return text[:30] + ('...' if len(text) > 30 else '')
def _build_user_content(ns):
text_parts = []
if ns.get('message'):
text_parts.append(str(ns.get('message')))
if ns.get('text'):
text_parts.append(str(ns.get('text')))
if ns.get('document_text'):
text_parts.append(str(ns.get('document_text')))
parts = []
merged_text = '\n'.join([p for p in text_parts if p]).strip()
if merged_text:
parts.append({'type': 'text', 'text': merged_text})
if ns.get('image_url'):
parts.append({'type': 'image_url', 'image_url': {'url': ns.get('image_url')}})
if ns.get('image_base64'):
mime = ns.get('image_mime') or 'image/jpeg'
b64 = ns.get('image_base64')
if not str(b64).startswith('data:'):
b64 = 'data:%s;base64,%s' % (mime, b64)
parts.append({'type': 'image_url', 'image_url': {'url': b64}})
if ns.get('document_url'):
parts.append({'type': 'file', 'file': {'file_url': ns.get('document_url')}})
if not parts:
return ''
if len(parts) == 1 and parts[0]['type'] == 'text':
return parts[0]['text']
return parts
async def _load_session_messages(sor, session_id):
sql = """
SELECT role, content, content_type
FROM chat_message
WHERE session_id = '%s'
ORDER BY created_at ASC;
""" % _escape(session_id)
rows = await sor.sqlExe(sql, {})
messages = []
for row in rows:
content = row.get('content') or ''
if row.get('content_type') == 'mixed':
import json
try:
content = json.loads(content)
except Exception:
pass
messages.append({'role': row['role'], 'content': content})
return messages
async def _resolve_chat_config(ns, sor):
# api_url = ns.get('api_url')
# api_key = ns.get('api_key')
api_url = 'https://api.deepseek.com/chat/completions'
api_key = 'sk-c22d6573e85a4d3fa8ab932386cf2909'
if not api_url and ns.get('model_id'):
doc_rows = await sor.sqlExe(
"SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;"
% _escape(ns.get('model_id')),
{},
)
if doc_rows and doc_rows[0].get('api_url'):
api_url = doc_rows[0]['api_url']
if not str(api_url).endswith('/chat/completions'):
api_url = str(api_url).rstrip('/') + '/chat/completions'
if not api_url:
param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'})
if param_rows:
api_url = param_rows[0]['pvalue']
else:
domain_rows = await sor.R('params', {'pname': 'cntoai_domain'})
if domain_rows:
api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions'
else:
api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions'
if not api_key:
userid = ns.get('userid') or await get_user()
if userid:
action = ns.get('apikey_action') or 'user_self_create'
keys = await sor.R('user_api_keys', {'userid': userid, 'action': action})
if not keys:
keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'})
if keys:
api_key = keys[0].get('opc_apikey')
if not api_key:
key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'})
if key_rows:
api_key = key_rows[0]['pvalue']
return api_url, api_key
def _extract_stream_piece(payload):
choice = (payload.get('choices') or [{}])[0]
delta = choice.get('delta') or {}
message = choice.get('message') or {}
piece = (
delta.get('content')
or delta.get('reasoning_content')
or message.get('content')
or choice.get('text')
or payload.get('content')
or ''
)
if piece is None:
return ''
return str(piece)
def _sse_event(obj):
import json
return 'data: %s\n\n' % json.dumps(obj, ensure_ascii=False)
async def _iter_upstream_stream(api_url, api_key, payload):
import aiohttp
import json
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer %s' % api_key,
}
payload = dict(payload)
payload['stream'] = True
buffer = ''
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=600)) as session:
async with session.post(api_url, headers=headers, json=payload) as response:
if response.status != 200:
err_text = await response.text()
yield {'type': 'error', 'msg': 'HTTP %s: %s' % (response.status, err_text[:500])}
return
async for raw in response.content:
buffer += raw.decode('utf-8', errors='ignore')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if not line or line.startswith(':') or not line.startswith('data:'):
continue
data = line[5:].strip()
if data == '[DONE]':
return
try:
payload_obj = json.loads(data)
piece = _extract_stream_piece(payload_obj)
if piece:
yield {'type': 'content', 'content': piece}
except Exception:
continue
tail = buffer.strip()
if tail:
try:
body = json.loads(tail)
choice = (body.get('choices') or [{}])[0]
msg = choice.get('message') or {}
piece = msg.get('content') or choice.get('text') or ''
if piece:
yield {'type': 'content', 'content': str(piece)}
except Exception:
pass
async def inference_generator(request, params_kw=None, **kw):
"""
流式 chat_send先存 user 消息SSE 推送 assistant 片段,结束后存库。
SSE 事件:
{"type":"meta","session_id":"...","model":"..."}
{"type":"content","content":"片段"}
{"type":"done","session_id":"...","reply":"完整文本","model":"..."}
{"type":"error","msg":"..."}
"""
import json
import traceback
ns = params_kw or {}
# model = ns.get('model')
model = 'deepseek-v4-pro'
if not model:
yield _sse_event({'type': 'error', 'msg': 'model is required'})
yield 'data: [DONE]\n\n'
return
userid = ns.get('userid') or await get_user()
if not userid:
yield _sse_event({'type': 'error', 'msg': '未找到用户'})
yield 'data: [DONE]\n\n'
return
user_content = _build_user_content(ns)
if not user_content:
yield _sse_event({'type': 'error', 'msg': '请输入文本,或提供图片/文档参数'})
yield 'data: [DONE]\n\n'
return
content_type = 'mixed' if isinstance(user_content, list) else 'text'
store_content = json.dumps(user_content, ensure_ascii=False) if content_type == 'mixed' else str(user_content)
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
session_id = ns.get('session_id')
if not session_id:
session_id = uuid()
await sor.C('chat_session', {
'id': session_id,
'userid': userid,
'model': model,
'title': _title_from_message(ns),
})
else:
sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid})
if not sessions:
yield _sse_event({'type': 'error', 'msg': '会话不存在'})
yield 'data: [DONE]\n\n'
return
await sor.C('chat_message', {
'id': uuid(),
'session_id': session_id,
'role': 'user',
'content': store_content,
'content_type': content_type,
})
history = await _load_session_messages(sor, session_id)
api_url, api_key = await _resolve_chat_config(ns, sor)
if not api_key:
yield _sse_event({'type': 'error', 'msg': '未找到 API Key'})
yield 'data: [DONE]\n\n'
return
yield _sse_event({
'type': 'meta',
'session_id': session_id,
'model': model,
'stream': True,
})
parts = []
async for evt in _iter_upstream_stream(api_url, api_key, {
'model': model,
'messages': history,
}):
if evt.get('type') == 'error':
yield _sse_event(evt)
yield 'data: [DONE]\n\n'
return
if evt.get('type') == 'content':
parts.append(evt['content'])
yield _sse_event(evt)
reply = ''.join(parts)
await sor.C('chat_message', {
'id': uuid(),
'session_id': session_id,
'role': 'assistant',
'content': reply,
'content_type': 'text',
})
await sor.sqlExe(
"UPDATE chat_session SET updated_at = NOW() WHERE id = '%s';"
% _escape(session_id),
{},
)
yield _sse_event({
'type': 'done',
'session_id': session_id,
'reply': reply,
'model': model,
})
yield 'data: [DONE]\n\n'
except Exception:
yield _sse_event({'type': 'error', 'msg': traceback.format_exc()})
yield 'data: [DONE]\n\n'
async def inference(request, *args, params_kw=None, **kw):
from functools import partial
env = request._run_ns.copy()
f = partial(inference_generator, request, params_kw=params_kw, **kw)
return await env.stream_response(request, f, content_type='text/event-stream')
ret = await inference(request, params_kw=params_kw)
return ret

View File

@ -0,0 +1,39 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
async def chat_session_delete(ns={}):
"""删除会话及其全部消息"""
session_id = ns.get('session_id')
if not session_id:
return {'status': False, 'msg': 'session_id is required'}
userid = ns.get('userid') or await get_user()
if not userid:
return {'status': False, 'msg': '未找到用户'}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid})
if not sessions:
return {'status': False, 'msg': '会话不存在'}
await sor.sqlExe(
"DELETE FROM chat_message WHERE session_id = '%s';" % _escape(session_id),
{},
)
await sor.sqlExe(
"DELETE FROM chat_session WHERE id = '%s' AND userid = '%s';"
% (_escape(session_id), _escape(userid)),
{},
)
return {'status': True, 'msg': 'delete success'}
except Exception as e:
return {'status': False, 'msg': 'delete failed, %s' % str(e)}
ret = await chat_session_delete(params_kw)
return ret

View File

@ -0,0 +1,50 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
async def chat_session_list(ns={}):
"""当前用户的对话会话列表(左侧栏历史)"""
userid = ns.get('userid') or await get_user()
if not userid:
return {'status': False, 'msg': '未找到用户'}
page_size = int(ns.get('page_size')) if ns.get('page_size') else 100
current_page = int(ns.get('current_page')) if ns.get('current_page') else 1
offset = (current_page - 1) * page_size
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
count_sql = """
SELECT COUNT(*) AS total_count FROM chat_session
WHERE userid = '%s';
""" % _escape(userid)
total = (await sor.sqlExe(count_sql, {}))[0]['total_count']
find_sql = """
SELECT id, model, title, created_at, updated_at
FROM chat_session
WHERE userid = '%s'
ORDER BY updated_at DESC
LIMIT %s OFFSET %s;
""" % (_escape(userid), page_size, offset)
sessions = await sor.sqlExe(find_sql, {})
return {
'status': True,
'msg': 'list success',
'data': {
'total_count': total,
'page_size': page_size,
'current_page': current_page,
'sessions': sessions,
},
}
except Exception as e:
return {'status': False, 'msg': 'list failed, %s' % str(e)}
ret = await chat_session_list(params_kw)
return ret

View File

@ -0,0 +1,66 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
async def chat_session_messages(ns={}):
"""获取某次会话的全部消息"""
session_id = ns.get('session_id')
if not session_id:
return {'status': False, 'msg': 'session_id is required'}
userid = ns.get('userid') or await get_user()
if not userid:
return {'status': False, 'msg': '未找到用户'}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid})
if not sessions:
return {'status': False, 'msg': '会话不存在'}
sql = """
SELECT id, role, content, content_type, created_at
FROM chat_message
WHERE session_id = '%s'
ORDER BY created_at ASC;
""" % _escape(session_id)
rows = await sor.sqlExe(sql, {})
messages = []
for row in rows:
content = row.get('content') or ''
if row.get('content_type') == 'mixed':
import json
try:
content = json.loads(content)
except Exception:
pass
if isinstance(content, list):
text_parts = [p.get('text', '') for p in content if p.get('type') == 'text']
display = '\n'.join([t for t in text_parts if t]) or '[多媒体消息]'
else:
display = content
messages.append({
'id': row['id'],
'role': row['role'],
'content': display,
'created_at': row.get('created_at'),
})
return {
'status': True,
'msg': 'get messages success',
'data': {
'session': sessions[0],
'messages': messages,
},
}
except Exception as e:
return {'status': False, 'msg': 'get messages failed, %s' % str(e)}
ret = await chat_session_messages(params_kw)
return ret

23
b/cntoai/chat_tables.sql Normal file
View File

@ -0,0 +1,23 @@
-- 多轮对话:请先执行本脚本创建表后再使用 chat_send / chat_session_* 接口
CREATE TABLE IF NOT EXISTS `chat_session` (
`id` varchar(64) NOT NULL COMMENT '会话ID',
`userid` varchar(64) NOT NULL COMMENT '用户ID',
`model` varchar(128) NOT NULL COMMENT '模型名称',
`title` varchar(255) DEFAULT NULL COMMENT '会话标题(首条问题摘要)',
`created_at` datetime DEFAULT current_timestamp() COMMENT '创建时间',
`updated_at` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx_userid_updated` (`userid`, `updated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='模型对话会话';
CREATE TABLE IF NOT EXISTS `chat_message` (
`id` varchar(64) NOT NULL COMMENT '消息ID',
`session_id` varchar(64) NOT NULL COMMENT '会话ID',
`role` varchar(32) NOT NULL COMMENT '角色: user / assistant / system',
`content` mediumtext COMMENT '消息内容纯文本或JSON',
`content_type` varchar(32) DEFAULT 'text' COMMENT 'text / mixed',
`created_at` datetime DEFAULT current_timestamp() COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_session_id` (`session_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='模型对话消息';

View File

@ -60,6 +60,7 @@ async def create_model_apikey(ns={}):
async with session.post(url, headers=headers, data=json.dumps(payload)) as response:
# 打印响应状态码
debug(f"create_model_apikey状态码: {response.status}")
debug(f"create_model_apikey响应: {await response.text()}")
result_sysnc = await response.json()
if not result_sysnc.get('status') == 'ok':
@ -76,12 +77,13 @@ async def create_model_apikey(ns={}):
remote_table_id = result_sysnc['data'].get('id')
name = result_sysnc['data'].get('name')
secretkey = result_sysnc['data'].get('secretkey')
apikey = result_sysnc['data'].get('apikey')
await sor.C('user_api_keys', {
'userid': ns['userid'],
'remote_table_id': remote_table_id,
'name': name,
'opc_apikey': 1,
'opc_apikey': apikey,
'secretkey': secretkey,
'action': 'user_self_create',
})

View File

@ -0,0 +1,53 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
async def get_model_api_doc(ns={}):
"""
根据 model_id 查询模型 API 文档。
参数:
model_id (str) 模型ID必填
返回 data 字段:
id, model_id, curl_code, python_code, created_at, updated_at
"""
model_id = ns.get('id')
if not model_id:
return {'status': False, 'msg': 'model id is required'}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
# 通过model_id从model_management表中查询model_name
model_name_sql = """
SELECT model_name FROM model_management WHERE id = '%s' LIMIT 1;
""" % _escape(model_id)
model_name = await sor.sqlExe(model_name_sql, {})
if not model_name:
return {'status': False, 'msg': 'model not found'}
model_name = model_name[0]['model_name']
find_sql = """
SELECT id, api_url, model_id, curl_code, python_code, created_at, updated_at
FROM model_api_doc
WHERE model_id = '%s'
LIMIT 1;
""" % _escape(model_id)
result = await sor.sqlExe(find_sql, {})
if not result:
return {'status': False, 'msg': 'api doc not found'}
result[0]['model_name'] = model_name
return {
'status': True,
'msg': 'get model api doc success',
'data': result[0],
}
except Exception as e:
return {'status': False, 'msg': 'get model api doc failed, %s' % str(e)}
ret = await get_model_api_doc(params_kw)
return ret

View File

@ -10,90 +10,99 @@ async def get_model_apikey(ns={}):
'msg': '未找到用户'
}
action = ns.get('action')
if not action:
action = 'user_self_create'
# 通过userid从user_api_keys表中查询opc_apikey
db = DBPools()
async with db.sqlorContext('kboss') as sor:
records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': 'sync'})
records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': action})
if not records:
return {
'status': False,
'msg': '未找到用户opc_apikey'
'msg': 'apikey不存在'
}
already_sync_user_key = records[0]['opc_apikey']
already_sync_user_appid = records[0]['appid']
# domain 从数据库params表中获取到pname=cntoai_domain的pvalue值
db = DBPools()
async with db.sqlorContext('kboss') as sor:
domain = await sor.R('params', {'pname': 'cntoai_domain'})
if domain:
domain = domain[0]['pvalue']
else:
debug(f"get_model_apikey未找到域名")
return {
'status': False,
'msg': '未找到域名'
}
# 目标URL
url = f"{domain}/dapi/downapps.dspy"
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer %s" % already_sync_user_key
return {
'status': True,
'msg': '获取模型apikey成功',
'data': records
}
try:
# 创建一个异步会话
result_sysnc = None
async with aiohttp.ClientSession() as session:
# 发送GET请求
async with session.get(url, headers=headers) as response:
# 打印响应状态码
debug(f"get_model_apikey状态码: {response.status}")
result_sysnc = await response.json()
if not result_sysnc.get('status') == 'ok':
debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}")
return {
'status': False,
'msg': f"获取模型apikey失败: {result_sysnc}"
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
# user_api_keys表格 userid/opc_apikey
# 首先判断apikey是否存在
apikeys = result_sysnc['data']['apikeys']
# 遍历apikeys如果apikey不存在则创建 如果存在则做更新 根据userid和remote_table_id判断
for apikey_item in apikeys:
remote_table_id = apikey_item.get('id')
name = '' if not apikey_item.get('name') else apikey_item.get('name')
apikeyid = apikey_item.get('apikeyid')
exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id})
if exist_record:
update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'"
await sor.sqlExe(update_sql, {})
else:
await sor.C('user_api_keys', {
'userid': ns['userid'],
'remote_table_id': remote_table_id,
'name': name,
'opc_apikey': apikeyid,
'action': 'user_self_create',
})
result_sysnc['status'] = True
return result_sysnc
# already_sync_user_key = records[0]['opc_apikey']
# already_sync_user_appid = records[0]['appid']
except Exception as e:
debug(f"get_model_apikey获取模型apikey失败: {e}")
return {
'status': False,
'msg': f"get_model_apikey获取模型apikey失败: {e}"
}
# # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值
# db = DBPools()
# async with db.sqlorContext('kboss') as sor:
# domain = await sor.R('params', {'pname': 'cntoai_domain'})
# if domain:
# domain = domain[0]['pvalue']
# else:
# debug(f"get_model_apikey未找到域名")
# return {
# 'status': False,
# 'msg': '未找到域名'
# }
# # 目标URL
# url = f"{domain}/dapi/downapps.dspy"
# # 请求头
# headers = {
# "Content-Type": "application/json",
# "Authorization": "Bearer %s" % already_sync_user_key
# }
# try:
# # 创建一个异步会话
# result_sysnc = None
# async with aiohttp.ClientSession() as session:
# # 发送GET请求
# async with session.get(url, headers=headers) as response:
# # 打印响应状态码
# debug(f"get_model_apikey状态码: {response.status}")
# result_sysnc = await response.json()
# if not result_sysnc.get('status') == 'ok':
# debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}")
# return {
# 'status': False,
# 'msg': f"获取模型apikey失败: {result_sysnc}"
# }
# db = DBPools()
# async with db.sqlorContext('kboss') as sor:
# # user_api_keys表格 userid/opc_apikey
# # 首先判断apikey是否存在
# apikeys = result_sysnc['data']['apikeys']
# # 遍历apikeys如果apikey不存在则创建 如果存在则做更新 根据userid和remote_table_id判断
# for apikey_item in apikeys:
# remote_table_id = apikey_item.get('id')
# name = '' if not apikey_item.get('name') else apikey_item.get('name')
# apikeyid = apikey_item.get('apikeyid')
# exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id})
# if exist_record:
# update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'"
# await sor.sqlExe(update_sql, {})
# else:
# await sor.C('user_api_keys', {
# 'userid': ns['userid'],
# 'remote_table_id': remote_table_id,
# 'name': name,
# 'opc_apikey': apikeyid,
# 'action': 'user_self_create',
# })
# result_sysnc['status'] = True
# return result_sysnc
# except Exception as e:
# debug(f"get_model_apikey获取模型apikey失败: {e}")
# return {
# 'status': False,
# 'msg': f"get_model_apikey获取模型apikey失败: {e}"
# }
ret = await get_model_apikey(params_kw)

View File

@ -6,22 +6,27 @@ async def get_user_balance(ns={}):
:return: 账户余额(与 getCustomerBalance 返回值一致)
"""
debug(ns)
apikey = ns.get('apikey')
# apikey = ns.get('apikey')
userid = ns.get('userid')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
if not apikey:
return {
'status': 'error',
'msg': 'apikey is required'
}
userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey})
if not userid_li:
return {
'status': 'error',
'msg': 'apikey无效请联系管理员'
}
# if not apikey:
# return {
# 'status': 'error',
# 'msg': 'apikey is required'
# }
# userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey})
# if not userid_li:
# return {
# 'status': 'error',
# 'msg': 'apikey无效请联系管理员'
# }
# userid = userid_li[0]['userid']
if not userid:
return {
'status': 'error',
'msg': 'userid is required'
}
user = await sor.R('users', {'id': userid})
if not user:
return {

View File

@ -0,0 +1,283 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
def _parse_bool(value, default=True):
if value is None or value == '':
return default
if isinstance(value, bool):
return value
return str(value).lower() in ('1', 'true', 'yes', 'on')
def _parse_messages(ns):
"""解析历史消息:支持 list 或 JSON 字符串"""
raw = ns.get('messages')
if not raw:
return []
if isinstance(raw, list):
return raw
if isinstance(raw, str):
import json
try:
return json.loads(raw)
except Exception:
return []
return []
def build_user_content(ns):
"""
构建单条 user 消息的 content支持文本 / 图片 / 文档链接。
参数(可组合):
message / text 文本
image_url 图片 URL
image_base64 图片 base64不含 data: 前缀)
document_url 文档 URL以 file 类型传给兼容接口)
document_text 文档纯文本(拼入 text
"""
text_parts = []
if ns.get('message'):
text_parts.append(str(ns.get('message')))
if ns.get('text'):
text_parts.append(str(ns.get('text')))
if ns.get('document_text'):
text_parts.append(str(ns.get('document_text')))
parts = []
merged_text = '\n'.join([p for p in text_parts if p]).strip()
if merged_text:
parts.append({'type': 'text', 'text': merged_text})
if ns.get('image_url'):
parts.append({
'type': 'image_url',
'image_url': {'url': ns.get('image_url')},
})
if ns.get('image_base64'):
mime = ns.get('image_mime') or 'image/jpeg'
b64 = ns.get('image_base64')
if not str(b64).startswith('data:'):
b64 = 'data:%s;base64,%s' % (mime, b64)
parts.append({
'type': 'image_url',
'image_url': {'url': b64},
})
if ns.get('document_url'):
parts.append({
'type': 'file',
'file': {'file_url': ns.get('document_url')},
})
if not parts:
return ''
if len(parts) == 1 and parts[0]['type'] == 'text':
return parts[0]['text']
return parts
async def _resolve_chat_config(ns, sor):
"""解析 API 地址与 Bearer Token"""
api_url = 'https://api.deepseek.com/chat/completions'
api_key = 'sk-c22d6573e85a4d3fa8ab932386cf2909'
# api_url = ns.get('api_url')
# api_key = ns.get('api_key')
if not api_url and ns.get('model_id'):
doc_rows = await sor.sqlExe(
"SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;"
% _escape(ns.get('model_id')),
{},
)
if doc_rows and doc_rows[0].get('api_url'):
api_url = doc_rows[0]['api_url']
if not str(api_url).endswith('/chat/completions'):
api_url = str(api_url).rstrip('/') + '/chat/completions'
if not api_url:
param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'})
if param_rows:
api_url = param_rows[0]['pvalue']
else:
domain_rows = await sor.R('params', {'pname': 'cntoai_domain'})
if domain_rows:
api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions'
else:
api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions'
if not api_key:
userid = ns.get('userid') or await get_user()
if userid:
action = ns.get('apikey_action') or 'user_self_create'
keys = await sor.R('user_api_keys', {'userid': userid, 'action': action})
if not keys:
keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'})
if keys:
api_key = keys[0].get('opc_apikey')
if not api_key:
key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'})
if key_rows:
api_key = key_rows[0]['pvalue']
return api_url, api_key
def _extract_stream_piece(payload):
"""从 SSE chunk 中提取文本(兼容 OpenAI / Qwen 等格式)"""
choice = (payload.get('choices') or [{}])[0]
delta = choice.get('delta') or {}
message = choice.get('message') or {}
piece = (
delta.get('content')
or delta.get('reasoning_content')
or message.get('content')
or choice.get('text')
or payload.get('content')
or ''
)
if piece is None:
return ''
return str(piece)
async def _read_stream_response(response):
"""解析 SSE 流式响应;若上游未按 SSE 返回则回退解析整段 JSON"""
import json
chunks = []
buffer = ''
async for raw in response.content:
buffer += raw.decode('utf-8', errors='ignore')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if not line or line.startswith(':'):
continue
if not line.startswith('data:'):
continue
data = line[5:].strip()
if data == '[DONE]':
return ''.join(chunks), chunks
try:
payload = json.loads(data)
piece = _extract_stream_piece(payload)
if piece:
chunks.append(piece)
except Exception:
continue
reply = ''.join(chunks)
if reply:
return reply, chunks
# 上游可能忽略 stream=true直接返回完整 JSON
tail = buffer.strip()
if tail:
try:
body = json.loads(tail)
choice = (body.get('choices') or [{}])[0]
msg = choice.get('message') or {}
reply = msg.get('content') or choice.get('text') or ''
if reply:
return str(reply), [str(reply)]
except Exception:
pass
return reply, chunks
async def llm_chat_completions(ns={}):
"""
OpenAI 兼容 chat/completionsaiohttp
参数:
model (str) 模型名,必填
message / text 当前用户文本
messages 历史消息 JSON 数组或 list多轮对话
stream (bool) 是否流式,默认 True
image_url / image_base64 图片
document_url / document_text 文档
api_url / api_key 可覆盖默认配置
model_id 从 model_api_doc 读取 api_url
userid 用于查 user_api_keys
"""
import aiohttp
import json
import traceback
model = ns.get('model')
if not model:
return {'status': False, 'msg': 'model is required'}
stream = _parse_bool(ns.get('stream'), True)
history = _parse_messages(ns)
user_content = build_user_content(ns)
if not user_content and not history:
return {'status': False, 'msg': 'message is required'}
messages = list(history)
if user_content:
messages.append({'role': 'user', 'content': user_content})
payload = {
'model': model,
'stream': stream,
'messages': messages,
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
api_url, api_key = await _resolve_chat_config(ns, sor)
if not api_key:
return {'status': False, 'msg': '未找到 API Key请先创建或配置 cntoai_llm_api_key'}
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer %s' % api_key,
}
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=600),
) as session:
async with session.post(api_url, headers=headers, json=payload) as response:
if response.status != 200:
err_text = await response.text()
return {
'status': False,
'msg': '模型请求失败 HTTP %s: %s' % (response.status, err_text[:500]),
}
stream_chunks = []
if stream:
reply, stream_chunks = await _read_stream_response(response)
usage = {}
else:
body = await response.json()
choice = (body.get('choices') or [{}])[0]
msg = choice.get('message') or {}
reply = msg.get('content') or ''
usage = body.get('usage') or {}
return {
'status': True,
'msg': 'chat success',
'data': {
'model': model,
'reply': reply,
'messages': messages + [{'role': 'assistant', 'content': reply}],
'usage': usage,
'stream': stream,
'chunk_count': len(stream_chunks),
'chunks': stream_chunks if ns.get('with_chunks') else None,
},
}
except Exception:
return {'status': False, 'msg': 'chat failed, %s' % traceback.format_exc()}
ret = await llm_chat_completions(params_kw)
return ret

View File

@ -0,0 +1,241 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
def _parse_bool(value, default=True):
if value is None or value == '':
return default
if isinstance(value, bool):
return value
return str(value).lower() in ('1', 'true', 'yes', 'on')
def _parse_messages(ns):
raw = ns.get('messages')
if not raw:
return []
if isinstance(raw, list):
return raw
if isinstance(raw, str):
import json
try:
return json.loads(raw)
except Exception:
return []
return []
def build_user_content(ns):
text_parts = []
if ns.get('message'):
text_parts.append(str(ns.get('message')))
if ns.get('text'):
text_parts.append(str(ns.get('text')))
if ns.get('document_text'):
text_parts.append(str(ns.get('document_text')))
parts = []
merged_text = '\n'.join([p for p in text_parts if p]).strip()
if merged_text:
parts.append({'type': 'text', 'text': merged_text})
if ns.get('image_url'):
parts.append({'type': 'image_url', 'image_url': {'url': ns.get('image_url')}})
if ns.get('image_base64'):
mime = ns.get('image_mime') or 'image/jpeg'
b64 = ns.get('image_base64')
if not str(b64).startswith('data:'):
b64 = 'data:%s;base64,%s' % (mime, b64)
parts.append({'type': 'image_url', 'image_url': {'url': b64}})
if ns.get('document_url'):
parts.append({'type': 'file', 'file': {'file_url': ns.get('document_url')}})
if not parts:
return ''
if len(parts) == 1 and parts[0]['type'] == 'text':
return parts[0]['text']
return parts
async def _resolve_chat_config(ns, sor):
api_url = ns.get('api_url')
api_key = ns.get('api_key')
if not api_url and ns.get('model_id'):
doc_rows = await sor.sqlExe(
"SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;"
% _escape(ns.get('model_id')),
{},
)
if doc_rows and doc_rows[0].get('api_url'):
api_url = doc_rows[0]['api_url']
if not str(api_url).endswith('/chat/completions'):
api_url = str(api_url).rstrip('/') + '/chat/completions'
if not api_url:
param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'})
if param_rows:
api_url = param_rows[0]['pvalue']
else:
domain_rows = await sor.R('params', {'pname': 'cntoai_domain'})
if domain_rows:
api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions'
else:
api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions'
if not api_key:
userid = ns.get('userid') or await get_user()
if userid:
action = ns.get('apikey_action') or 'user_self_create'
keys = await sor.R('user_api_keys', {'userid': userid, 'action': action})
if not keys:
keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'})
if keys:
api_key = keys[0].get('opc_apikey')
if not api_key:
key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'})
if key_rows:
api_key = key_rows[0]['pvalue']
return api_url, api_key
def _extract_stream_piece(payload):
choice = (payload.get('choices') or [{}])[0]
delta = choice.get('delta') or {}
message = choice.get('message') or {}
piece = (
delta.get('content')
or delta.get('reasoning_content')
or message.get('content')
or choice.get('text')
or payload.get('content')
or ''
)
if piece is None:
return ''
return str(piece)
def _sse_event(obj):
import json
return 'data: %s\n\n' % json.dumps(obj, ensure_ascii=False)
async def _iter_upstream_stream(api_url, api_key, payload):
"""向上游发起流式请求,逐片 yield 文本"""
import aiohttp
import json
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer %s' % api_key,
}
payload = dict(payload)
payload['stream'] = True
buffer = ''
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=600)) as session:
async with session.post(api_url, headers=headers, json=payload) as response:
if response.status != 200:
err_text = await response.text()
yield {'type': 'error', 'msg': 'HTTP %s: %s' % (response.status, err_text[:500])}
return
async for raw in response.content:
buffer += raw.decode('utf-8', errors='ignore')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if not line or line.startswith(':') or not line.startswith('data:'):
continue
data = line[5:].strip()
if data == '[DONE]':
return
try:
payload_obj = json.loads(data)
piece = _extract_stream_piece(payload_obj)
if piece:
yield {'type': 'content', 'content': piece}
except Exception:
continue
tail = buffer.strip()
if tail:
try:
body = json.loads(tail)
choice = (body.get('choices') or [{}])[0]
msg = choice.get('message') or {}
piece = msg.get('content') or choice.get('text') or ''
if piece:
yield {'type': 'content', 'content': str(piece)}
except Exception:
pass
async def inference_generator(request, params_kw=None, **kw):
"""
SSE 流式输出,事件格式:
{"type":"meta","model":"..."}
{"type":"content","content":"片段"}
{"type":"done","reply":"完整文本"}
{"type":"error","msg":"..."}
结束data: [DONE]
"""
import traceback
ns = params_kw or {}
model = ns.get('model')
if not model:
yield _sse_event({'type': 'error', 'msg': 'model is required'})
yield 'data: [DONE]\n\n'
return
history = _parse_messages(ns)
user_content = build_user_content(ns)
if not user_content and not history:
yield _sse_event({'type': 'error', 'msg': 'message is required'})
yield 'data: [DONE]\n\n'
return
messages = list(history)
if user_content:
messages.append({'role': 'user', 'content': user_content})
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
api_url, api_key = await _resolve_chat_config(ns, sor)
if not api_key:
yield _sse_event({'type': 'error', 'msg': '未找到 API Key'})
yield 'data: [DONE]\n\n'
return
yield _sse_event({'type': 'meta', 'model': model, 'stream': True})
parts = []
async for evt in _iter_upstream_stream(api_url, api_key, {
'model': model,
'messages': messages,
}):
if evt.get('type') == 'error':
yield _sse_event(evt)
yield 'data: [DONE]\n\n'
return
if evt.get('type') == 'content':
parts.append(evt['content'])
yield _sse_event(evt)
reply = ''.join(parts)
yield _sse_event({'type': 'done', 'reply': reply, 'model': model})
yield 'data: [DONE]\n\n'
except Exception:
yield _sse_event({'type': 'error', 'msg': traceback.format_exc()})
yield 'data: [DONE]\n\n'
async def inference(request, *args, params_kw=None, **kw):
from functools import partial
env = request._run_ns.copy()
f = partial(inference_generator, request, params_kw=params_kw, **kw)
return await env.stream_response(request, f, content_type='text/event-stream')
ret = await inference(request, params_kw=params_kw)
return ret

View File

@ -26,7 +26,7 @@ def _build_model_dict(ns, include_listing_status=False):
async def model_management_search(ns={}):
"""
分页查询模型列表,支持按 model_name / model_type / provider 筛选。
返回模型总数、待上架总数、已上架总数。
返回模型总数、待上架总数、已上架总数,以及厂商列表、模型类型列表
"""
import traceback
@ -35,10 +35,10 @@ async def model_management_search(ns={}):
offset = (current_page - 1) * page_size
conditions = ['1=1']
if ns.get('model_name'):
model_name = ns.get('model_name')
if ns.get('display_name'):
display_name = ns.get('display_name')
# 模糊查询
conditions.append(f"model_name LIKE '%%%%{model_name}%%%%'")
conditions.append(f"display_name LIKE '%%%%{display_name}%%%%'")
if ns.get('model_type'):
conditions.append("model_type = '%s'" % _escape(ns.get('model_type')))
if ns.get('provider'):
@ -55,10 +55,23 @@ async def model_management_search(ns={}):
stats_li = await sor.sqlExe(stats_sql, {})
stats = stats_li[0] if stats_li else {}
provider_sql = """
SELECT DISTINCT provider FROM model_management
WHERE provider IS NOT NULL AND provider != ''
ORDER BY provider;
"""
model_type_sql = """
SELECT DISTINCT model_type FROM model_management
WHERE model_type IS NOT NULL AND model_type != ''
ORDER BY model_type;
"""
count_sql = """SELECT COUNT(*) AS total_count FROM model_management WHERE %s;""" % where_clause
filter_total = (await sor.sqlExe(count_sql, {}))[0]['total_count']
find_sql = """SELECT * FROM model_management WHERE %s ORDER BY sort_order ASC LIMIT %s OFFSET %s;""" % (where_clause, page_size, offset)
provider_rows = await sor.sqlExe(provider_sql, {})
model_type_rows = await sor.sqlExe(model_type_sql, {})
model_list = await sor.sqlExe(find_sql, {})
return {
@ -68,6 +81,8 @@ async def model_management_search(ns={}):
'total_count': stats.get('total_count', 0),
'pending_count': int(stats.get('pending_count') or 0),
'listed_count': int(stats.get('listed_count') or 0),
'provider_list': [r['provider'] for r in provider_rows],
'model_type_list': [r['model_type'] for r in model_type_rows],
'filter_total': filter_total,
'page_size': page_size,
'current_page': current_page,

View File

@ -26,6 +26,7 @@ async def _charge_order(sor, orderid, order_type='NEW'):
"""
order_rows = await sor.R('bz_order', {'id': orderid})
if not order_rows:
debug(f"订单不存在")
return {'status': 'error', 'msg': '订单不存在'}
order_row = order_rows[0]
@ -38,10 +39,11 @@ async def _charge_order(sor, orderid, order_type='NEW'):
count = 0
if count - float(order_row['amount']) < 0:
pricedifference = count - round(order_row['amount'], 2)
debug(f"账户余额不足,订单金额: {order_row['amount']}, 账户余额: {count}, 差额: {pricedifference}")
return {
'status': 'error',
'msg': '账户余额不足',
'pricedifference': round(pricedifference, 2),
'pricedifference': round(pricedifference, 10),
}
await order2bill(orderid, sor)
@ -121,8 +123,10 @@ async def _charge_order(sor, orderid, order_type='NEW'):
# )
# await sor.C('customer_goods', nss)
debug(f"支付成功")
return {'status': True, 'msg': '支付成功'}
except Exception as error:
debug(f"支付失败: {error}")
return {'status': 'error', 'msg': str(error)}
async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantity=1):
@ -142,11 +146,14 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit
supply_price = abs(float(supply_price))
quantity = int(quantity)
except (TypeError, ValueError):
debug(f"calc_price_by_saleprotocol supply_price / quantity 必须为有效数字")
return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'}
if supply_price <= 0:
debug(f"calc_price_by_saleprotocol supply_price 必须大于 0")
return {'status': 'error', 'msg': 'supply_price 必须大于 0'}
if quantity <= 0:
debug(f"calc_price_by_saleprotocol quantity 必须大于 0")
return {'status': 'error', 'msg': 'quantity 必须大于 0'}
saleprotocol_to_person = await sor.R(
@ -193,12 +200,13 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit
)
if not product_salemode:
debug(f"calc_price_by_saleprotocol 还未上线这个产品的协议配置")
return {'status': 'error', 'msg': '还未上线这个产品的协议配置'}
discount = product_salemode[0]['discount']
list_price = supply_price
price = abs(round(list_price * discount, 2))
amount = abs(round(price * quantity, 2))
price = abs(round(list_price * discount, 12))
amount = abs(round(price * quantity, 12))
return {
'status': True,
@ -246,17 +254,20 @@ async def process_user_billing(ns={}):
llmid = ns.get('llmid')
if not llmid:
debug(f"{userid} process_user_billing llmid必传")
return {
'status': 'error',
'msg': 'llmid必传'
}
try:
amount = round(float(amount), 2)
amount = round(float(amount), 12)
except (TypeError, ValueError):
debug(f"{userid} process_user_billing amount 必须为有效数字")
return {'status': 'error', 'msg': 'amount 必须为有效数字'}
if amount <= 0:
debug(f"{userid} process_user_billing amount 必须大于 0")
return {'status': 'error', 'msg': 'amount 必须大于 0'}
db = DBPools()
@ -264,6 +275,7 @@ async def process_user_billing(ns={}):
try:
product_li = await sor.R('product', {'providerpid': llmid, 'del_flg': '0'})
if not product_li:
debug(f"{userid} process_user_billing 未找到对应产品,请确认")
return {
'status': 'error',
'msg': '未找到对应产品,请确认'
@ -273,26 +285,30 @@ async def process_user_billing(ns={}):
providerid = product['providerid']
providername_list = await sor.R('organization', {'id': providerid})
if not providername_list:
debug(f"{userid} process_user_billing 厂商不存在 %s" % providername)
return {
'status': 'error',
'msg': '厂商不存在 %s' % providername
}
providername = providername_list[0]['orgname']
userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey})
if not userid_li:
return {
'status': 'error',
'msg': 'apikey无效请联系管理员'
}
# userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey})
# if not userid_li:
# debug(f"{userid} process_user_billing apikey无效请联系管理员")
# return {
# 'status': 'error',
# 'msg': 'apikey无效请联系管理员'
# }
# userid = userid_li[0]['userid']
user_list = await sor.R('users', {'id': userid})
if not user_list:
debug(f"{userid} process_user_billing 用户不存在 %s" % userid)
return {'status': 'error', 'msg': '用户不存在 %s' % userid}
org_list = await sor.R('organization', {'id': user_list[0]['orgid']})
if not org_list:
debug(f"{userid} process_user_billing 用户所属机构不存在")
return {'status': 'error', 'msg': '用户所属机构不存在'}
customerid = org_list[0]['id']
@ -329,7 +345,7 @@ async def process_user_billing(ns={}):
return {
'status': 'error',
'msg': '账户余额不足',
'pricedifference': round(balance - amount, 2),
'pricedifference': round(balance - amount, 12),
}
order_id = uuid()
@ -343,7 +359,7 @@ async def process_user_billing(ns={}):
'order_date': now_str,
'source': providername,
'amount': amount,
'originalprice': round(originalprice, 2),
'originalprice': round(originalprice, 12),
'ordertype': 'prepay',
'servicename': productname,
}

View File

@ -19,9 +19,6 @@ async def sync_cn_ai_user(ns={}):
email = user_info[0]['email']
debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}")
already_sync_user_key = '2i68AZ81di_q5f8AySDrJ'
already_sync_user_dappid = 'cndemo'
# 目标URL
# domain 从数据库params表中获取到pname=cntoai_domain的pvalue值
domain = None
@ -36,6 +33,24 @@ async def sync_cn_ai_user(ns={}):
'status': False,
'msg': '未找到域名'
}
already_sync_user_key = await sor.R('params', {'pname': 'cntoai_already_sync_user_key'})
if already_sync_user_key:
already_sync_user_key = already_sync_user_key[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到已同步用户key")
return {
'status': False,
'msg': '未找到已同步用户key'
}
already_sync_user_dappid = await sor.R('params', {'pname': 'cntoai_already_sync_user_dappid'})
if already_sync_user_dappid:
already_sync_user_dappid = already_sync_user_dappid[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到已同步用户dappid")
return {
'status': False,
'msg': '未找到已同步用户dappid'
}
url = f"{domain}/rbac/usersync"

262
b/cntoai/test_chat.py Normal file
View File

@ -0,0 +1,262 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
cntoai 对话相关接口联调测试dev.opencomputing.cn
用法
pip install requests
set CNTOAI_USERID=你的用户id
set CNTOAI_API_KEY=你的api_key
python test_chat.py
可选环境变量
CNTOAI_BASE_URL 默认 https://dev.opencomputing.cn
CNTOAI_MODEL 默认 qwen3.6-plus
CNTOAI_LLM_API_URL 默认 https://ai.atvoe.com/llmage/v1/chat/completions
CNTOAI_COOKIE 浏览器 Cookie未传 userid 时用于鉴权
单测
python test_chat.py --only models
python test_chat.py --only completions
python test_chat.py --only send
python test_chat.py --only session
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from typing import Any, Dict, Optional, Tuple
try:
import requests
except ImportError:
print("请先安装依赖: pip install requests")
sys.exit(1)
BASE_URL = "https://dev.opencomputing.cn"
USERID = "hSqZuekZ1yKmhKmCN9UAK"
API_KEY = "sk-c22d6573e85a4d3fa8ab932386cf2909"
# API_URL = "https://ai.atvoe.com/llmage/v1/chat/completions"
API_URL = "https://api.deepseek.com/chat/completions"
# MODEL = "qwen3.6-plus"
MODEL = "deepseek-v4-pro"
COOKIE = "".strip()
TIMEOUT = int(120)
class ChatApiClient:
def __init__(self, base_url: str = BASE_URL):
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({"Accept": "application/json"})
if COOKIE:
self.session.headers["Cookie"] = COOKIE
def _url(self, path: str) -> str:
path = path if path.startswith("/") else f"/{path}"
return f"{self.base_url}{path}"
def _auth(self, extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
params: Dict[str, Any] = {}
if USERID:
params["userid"] = USERID
if API_KEY:
params["api_key"] = API_KEY
if API_URL:
params["api_url"] = API_URL
if extra:
params.update(extra)
return params
def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
resp = self.session.get(self._url(path), params=self._auth(params), timeout=TIMEOUT)
return self._parse(resp)
def post(self, path: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
resp = self.session.post(
self._url(path),
json=self._auth(data),
headers={"Content-Type": "application/json"},
timeout=TIMEOUT,
)
return self._parse(resp)
@staticmethod
def _parse(resp: requests.Response) -> Dict[str, Any]:
print(f" HTTP {resp.status_code} {resp.url[:120]}...")
try:
return resp.json()
except Exception:
return {"status": False, "msg": f"非 JSON 响应: {resp.text[:300]}"}
def ok(name: str, data: Dict[str, Any]) -> bool:
passed = data.get("status") is True
tag = "PASS" if passed else "FAIL"
print(f"\n[{tag}] {name}")
print(json.dumps(data, ensure_ascii=False, indent=2)[:2000])
if not passed:
print(f" -> {data.get('msg', 'unknown error')}")
return passed
def test_model_list(client: ChatApiClient) -> bool:
print("\n=== GET /cntoai/model_management_customer_search.dspy ===")
data = client.get("/cntoai/model_management_customer_search.dspy", {
"page_size": 20,
"current_page": 1,
})
if ok("模型列表", data) and data.get("data"):
models = data["data"].get("model_list") or []
print(f"{len(models)} 个模型")
if models:
m0 = models[0]
print(f" 首个: {m0.get('model_name')} / {m0.get('display_name')}")
return data.get("status") is True
def test_llm_chat_completions(client: ChatApiClient) -> bool:
print("\n=== POST /cntoai/llm_chat_completions.dspy ===")
data = client.post("/cntoai/llm_chat_completions.dspy", {
"model": MODEL,
"message": "用一句话介绍你自己",
"stream": True,
})
if ok("直连模型", data) and data.get("data"):
print(f" 回复摘要: {(data['data'].get('reply') or '')[:200]}")
return data.get("status") is True
def test_chat_send(
client: ChatApiClient,
session_id: Optional[str] = None,
) -> Tuple[bool, Optional[str]]:
print("\n=== POST /cntoai/chat_send.dspy ===")
payload: Dict[str, Any] = {
"model": MODEL,
"message": (
"你好,这是 test_chat.py 自动化测试"
if not session_id
else "继续,用一句话回复我"
),
"stream": True,
}
if session_id:
payload["session_id"] = session_id
data = client.post("/cntoai/chat_send.dspy", payload)
if ok("发送消息", data) and data.get("data"):
sid = data["data"].get("session_id")
print(f" session_id: {sid}")
print(f" 回复摘要: {(data['data'].get('reply') or '')[:200]}")
return True, sid
return False, session_id
def test_chat_session_list(client: ChatApiClient) -> bool:
print("\n=== GET /cntoai/chat_session_list.dspy ===")
data = client.get("/cntoai/chat_session_list.dspy", {"page_size": 10})
if ok("会话列表", data) and data.get("data"):
sessions = data["data"].get("sessions") or []
print(f"{data['data'].get('total_count', len(sessions))} 条会话")
for s in sessions[:3]:
print(f" - {s.get('id')} | {s.get('title')}")
return data.get("status") is True
def test_chat_session_messages(client: ChatApiClient, session_id: str) -> bool:
print("\n=== GET /cntoai/chat_session_messages.dspy ===")
data = client.get("/cntoai/chat_session_messages.dspy", {"session_id": session_id})
if ok("会话消息", data) and data.get("data"):
msgs = data["data"].get("messages") or []
print(f" 消息数: {len(msgs)}")
for m in msgs:
print(f" [{m.get('role')}] {str(m.get('content') or '')[:80]}")
return data.get("status") is True
def test_chat_session_delete(client: ChatApiClient, session_id: str) -> bool:
print("\n=== GET /cntoai/chat_session_delete.dspy ===")
data = client.get("/cntoai/chat_session_delete.dspy", {"session_id": session_id})
return ok("删除会话", data)
def check_config(require_userid: bool = True) -> bool:
print("配置:")
print(f" BASE_URL = {BASE_URL}")
print(f" MODEL = {MODEL}")
print(f" API_URL = {API_URL or '(走服务端配置)'}")
print(f" USERID = {USERID or '(未设置)'}")
print(f" API_KEY = {'已设置' if API_KEY else '(未设置)'}")
print(f" COOKIE = {'已设置' if COOKIE else '(未设置)'}")
if require_userid and not USERID and not COOKIE:
print("\n错误: 持久化接口需要 CNTOAI_USERID 或 CNTOAI_COOKIE")
return False
if not API_KEY:
print("\n警告: 未设置 CNTOAI_API_KEY将依赖服务端 Key")
return True
def main() -> int:
parser = argparse.ArgumentParser(description="cntoai chat API 联调测试")
parser.add_argument(
"--only",
choices=["models", "completions", "send", "session", "delete", "all"],
default="all",
)
parser.add_argument("--keep-session", action="store_true", help="不删除测试会话")
parser.add_argument("--base-url", default=BASE_URL)
args = parser.parse_args()
client = ChatApiClient(base_url=args.base_url)
results = []
session_id: Optional[str] = None
if args.only in ("all", "models"):
results.append(("models", test_model_list(client)))
if args.only in ("all", "completions"):
if check_config(require_userid=False):
results.append(("completions", test_llm_chat_completions(client)))
else:
results.append(("completions", False))
if args.only in ("all", "send", "session", "delete"):
if not check_config(require_userid=True):
return 1
if args.only in ("all", "send"):
passed, session_id = test_chat_send(client)
results.append(("send_1", passed))
if passed and session_id:
time.sleep(1)
passed2, session_id = test_chat_send(client, session_id=session_id)
results.append(("send_2_multiturn", passed2))
if args.only in ("all", "session") and session_id:
results.append(("session_list", test_chat_session_list(client)))
results.append(("session_messages", test_chat_session_messages(client, session_id)))
elif args.only == "session":
results.append(("session_list", test_chat_session_list(client)))
if args.only in ("all", "delete") and session_id and not args.keep_session:
results.append(("delete", test_chat_session_delete(client, session_id)))
elif session_id and args.keep_session:
print(f"\n保留测试会话: {session_id}")
print("\n" + "=" * 50)
print("汇总:")
failed = sum(1 for _, p in results if not p)
for name, passed in results:
print(f" {'OK' if passed else 'FAIL'} {name}")
print("=" * 50)
return 1 if failed else 0
if __name__ == "__main__":
sys.exit(main())

247
b/cntoai/test_demo.py Normal file
View File

@ -0,0 +1,247 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
chat_send_stream.dspy SSE 流式接口测试
用法
pip install requests
python test_demo.py
环境变量可选覆盖下方默认值
CNTOAI_BASE_URL / CNTOAI_USERID / CNTOAI_API_KEY
CNTOAI_MODEL / CNTOAI_LLM_API_URL / CNTOAI_MESSAGE
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from typing import Any, Dict, Generator, List, Optional
try:
import requests
except ImportError:
print("请先安装: pip install requests")
sys.exit(1)
BASE_URL = os.environ.get("CNTOAI_BASE_URL", "https://dev.opencomputing.cn").rstrip("/")
USERID = os.environ.get("CNTOAI_USERID", "hSqZuekZ1yKmhKmCN9UAK").strip()
API_KEY = os.environ.get("CNTOAI_API_KEY", "sk-c22d6573e85a4d3fa8ab932386cf2909").strip()
API_URL = os.environ.get("CNTOAI_LLM_API_URL", "https://api.deepseek.com/v1/chat/completions").strip()
MODEL = os.environ.get("CNTOAI_MODEL", "deepseek-chat").strip()
MESSAGE = os.environ.get("CNTOAI_MESSAGE", "你好,请用三句话介绍你自己").strip()
TIMEOUT = int(os.environ.get("CNTOAI_TIMEOUT", "300"))
STREAM_PATH = "/cntoai/chat_send_stream.dspy"
def build_payload(session_id: Optional[str] = None, message: Optional[str] = None) -> Dict[str, Any]:
payload: Dict[str, Any] = {
"model": MODEL,
"message": message or MESSAGE,
"userid": USERID,
"api_key": API_KEY,
"api_url": API_URL,
}
if session_id:
payload["session_id"] = session_id
return payload
def parse_sse_text(text: str) -> List[Dict[str, Any]]:
events: List[Dict[str, Any]] = []
for line in text.splitlines():
line = line.strip()
if not line.startswith("data:"):
continue
data = line[5:].strip()
if data == "[DONE]":
break
try:
events.append(json.loads(data))
except json.JSONDecodeError:
print(f"[warn] 无法解析: {line[:200]}")
return events
def parse_sse_stream(response: requests.Response) -> Generator[Dict[str, Any], None, None]:
"""
按字节缓冲解析 SSE
勿用 iter_lines(decode_unicode=True)TCP 分块可能截断 UTF-8 多字节字符导致乱码和 JSON 解析失败
"""
buffer = b""
for chunk in response.iter_content(chunk_size=4096):
if not chunk:
continue
buffer += chunk
while b"\n" in buffer:
line_bytes, buffer = buffer.split(b"\n", 1)
if not line_bytes.strip():
continue
line = line_bytes.decode("utf-8").strip()
if not line.startswith("data:"):
continue
data = line[5:].strip()
if data == "[DONE]":
return
try:
yield json.loads(data)
except json.JSONDecodeError:
print(f"\n[warn] JSON 解析失败: {line[:120]}...")
tail = buffer.strip()
if tail:
line = tail.decode("utf-8", errors="replace").strip()
if line.startswith("data:"):
data = line[5:].strip()
if data and data != "[DONE]":
try:
yield json.loads(data)
except json.JSONDecodeError:
pass
def diagnose_empty_response(resp: requests.Response) -> None:
ctype = resp.headers.get("Content-Type", "")
body = resp.content or b""
print("\n[诊断] 响应体为空或无可解析 SSE")
print(f" Content-Type : {ctype}")
print(f" body 长度 : {len(body)}")
if body:
print(f" body 前 500B : {body[:500]!r}")
if "text/html" in ctype and len(body) == 0:
print("\n 可能原因: chat_send_stream.dspy 未执行 inference 入口。")
print(" 请确认文件末尾包含:")
print(" ret = await inference(request, params_kw=params_kw)")
print(" return ret")
print(" 并重新部署到 dev 后再测。")
def test_chat_send_stream(session_id: Optional[str] = None, message: Optional[str] = None) -> Optional[str]:
url = BASE_URL + STREAM_PATH
payload = build_payload(session_id=session_id, message=message)
print("=" * 60)
print("chat_send_stream.dspy 流式测试")
print(f" URL : {url}")
print(f" MODEL : {MODEL}")
print(f" USERID : {USERID}")
print(f" API_URL : {API_URL}")
print(f" message : {payload.get('message')}")
if session_id:
print(f" session : {session_id}")
print("=" * 60)
if not USERID:
print("错误: 请设置 CNTOAI_USERID")
return None
resp = requests.post(
url,
json=payload,
headers={
"Accept": "text/event-stream",
"Content-Type": "application/json",
},
stream=True,
timeout=TIMEOUT,
)
ctype = resp.headers.get("Content-Type", "")
print(f"\nHTTP {resp.status_code} Content-Type: {ctype}\n")
if resp.status_code != 200:
print(resp.text[:500])
return None
if "text/event-stream" not in ctype:
raw = resp.content
diagnose_empty_response(resp)
if raw:
for evt in parse_sse_text(raw.decode("utf-8", errors="ignore")):
print("[parsed]", evt)
return None
session_out: Optional[str] = session_id
full_reply: List[str] = []
has_content = False
event_count = 0
print("--- 流式输出 ---")
for evt in parse_sse_stream(resp):
event_count += 1
etype = evt.get("type")
if etype == "meta":
session_out = evt.get("session_id") or session_out
print(f"[meta] session_id={session_out} model={evt.get('model')}")
continue
if etype == "content":
piece = evt.get("content") or ""
has_content = True
full_reply.append(piece)
print(piece, end="", flush=True)
continue
if etype == "done":
session_out = evt.get("session_id") or session_out
reply = evt.get("reply") or ""
print(f"\n\n[done] session_id={session_out}")
print(f"[done] reply 长度={len(reply)}")
if reply and not has_content:
print(reply)
continue
if etype == "error":
print(f"\n[error] {evt.get('msg')}")
return session_out
print(f"\n[unknown] {evt}")
print("\n--- 结束 ---")
if event_count == 0:
diagnose_empty_response(resp)
elif full_reply:
joined = "".join(full_reply)
print(f"拼接回复({len(joined)}字): {joined[:300]}...")
return session_out
def main() -> int:
if sys.platform == "win32":
try:
sys.stdout.reconfigure(encoding="utf-8")
except Exception:
pass
parser = argparse.ArgumentParser(description="chat_send_stream.dspy SSE 测试")
parser.add_argument("--session-id", help="续聊会话 ID")
parser.add_argument("--message", "-m", help="覆盖默认 message")
parser.add_argument("--twice", action="store_true", help="同一会话连发两条")
args = parser.parse_args()
sid = test_chat_send_stream(session_id=args.session_id, message=args.message)
if sid is None:
return 1
if args.twice and sid:
print("\n" + "=" * 60)
print("第二轮(多轮续聊)")
sid2 = test_chat_send_stream(
session_id=sid,
message=args.message or "继续,用一句话总结上面内容",
)
if sid2 is None:
return 1
if sid:
print(f"\n提示: 续聊 python test_demo.py --session-id {sid}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,70 @@
async def forgotPassword(ns):
"""
忘记密码:校验短信验证码后重置密码。
参数:
id (str) 用户ID找回验证码接口返回的 userid
password (str) 新密码
codeid (str) 验证码ID
vcode (str) 验证码
也可传 mobile 或 username 定位用户(未传 id 时)。
"""
import re
import traceback
if not ns.get('password'):
return {'status': False, 'msg': '新密码不能为空'}
if len(ns.get('password')) < 8 or not re.search(r'[a-zA-Z]', ns.get('password')) or not re.search(r'[0-9]', ns.get('password')):
return {'status': False, 'msg': '密码至少8位包含大小写字母、特殊字符、数字'}
if not ns.get('codeid'):
return {'status': False, 'msg': '验证码ID不能为空'}
if not ns.get('vcode'):
return {'status': False, 'msg': '验证码不能为空'}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
code = await sor.R('validatecode', {'id': ns.get('codeid'), 'vcode': ns.get('vcode')})
if code:
create_at = code[0]['create_at']
now = datetime.datetime.now()
create_at_dt = datetime.datetime.strptime(create_at, "%Y-%m-%d %H:%M:%S")
if (now - create_at_dt).seconds > 500:
return {'status': False, 'msg': '验证码过期'}
else:
return {'status': False, 'msg': '验证码不正确'}
user = None
if ns.get('id'):
users = await sor.R('users', {'id': ns.get('id'), 'del_flg': '0'})
if users:
user = users[0]
elif ns.get('mobile'):
users = await sor.R('users', {'mobile': ns.get('mobile'), 'del_flg': '0'})
if users:
user = users[0]
elif ns.get('username'):
users = await sor.R('users', {'username': ns.get('username'), 'del_flg': '0'})
if not users:
users = await sor.R('users', {'mobile': ns.get('username'), 'del_flg': '0'})
if users:
user = users[0]
else:
return {'status': False, 'msg': '用户标识不能为空'}
if not user:
return {'status': False, 'msg': '用户不存在'}
new_password = password_encode(ns['password'])
update_sql = """UPDATE users SET password = '%s' WHERE id = '%s';""" % (new_password, user['id'])
await sor.sqlExe(update_sql, {})
return {'status': True, 'msg': '密码重置成功'}
except Exception as error:
debug(f"forgotPassword 错误: {error}")
debug(f"forgotPassword 错误堆栈: {traceback.format_exc()}")
return {'status': False, 'msg': '密码重置失败, %s' % str(error)}
ret = await forgotPassword(params_kw)
return ret

View File

@ -1,14 +1,40 @@
async def sync_cn_ai_user(userid=None, orgid=None, username=None, name=None, email=None):
import aiohttp
debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}")
already_sync_user_key = '2i68AZ81di_q5f8AySDrJ'
already_sync_user_dappid = 'cndemo'
# 目标URL
url = "https://ai.atvoe.com/rbac/usersync"
# url = 'https://ai.atvoe.com/tmp/env.dspy'
# domain 从数据库params表中获取到pname=cntoai_domain的pvalue值
domain = None
db = DBPools()
async with db.sqlorContext('kboss') as sor:
domain = await sor.R('params', {'pname': 'cntoai_domain'})
if domain:
domain = domain[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到域名")
return {
'status': False,
'msg': '未找到域名'
}
already_sync_user_key = await sor.R('params', {'pname': 'cntoai_already_sync_user_key'})
if already_sync_user_key:
already_sync_user_key = already_sync_user_key[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到已同步用户key")
return {
'status': False,
'msg': '未找到已同步用户key'
}
already_sync_user_dappid = await sor.R('params', {'pname': 'cntoai_already_sync_user_dappid'})
if already_sync_user_dappid:
already_sync_user_dappid = already_sync_user_dappid[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到已同步用户dappid")
return {
'status': False,
'msg': '未找到已同步用户dappid'
}
url = f"{domain}/rbac/usersync"
# 请求头
headers = {
@ -82,7 +108,6 @@ async def sync_cn_ai_user(userid=None, orgid=None, username=None, name=None, ema
'msg': f"sync_cn_ai_user{userid}同步用户失败: {e}"
}
async def registerUser(ns):
"""
用户注册

View File

@ -539,6 +539,12 @@ jiajie_ali_products = [
]
async def get_firstpage_product_tree(ns={}):
token_market = await path_call('../cntoai/model_management_customer_search.dspy', ns)
# if token_market.get('status'):
# token_market = token_market.get('data')
# else:
# token_market = None
data = {
"product_service": [
{
@ -590,7 +596,8 @@ async def get_firstpage_product_tree(ns={}):
# },
# ],
},
]
],
'token_market': token_market
},
{
'id': "2", 'firTitle': "元境", 'secMenu': [

View File

@ -112,6 +112,18 @@ async def mobilecode(ns):
return {'status': False, 'msg': '发送失败'}
else:
return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'}
# 忘记密码逻辑:检查手机号是否存在
elif action_type == 'forgotpassword':
if len(userreacs) >= 1:
code = await generate_vcode()
nss = await send_vcode(userreacs[0]['mobile'], '用户注册登录验证', {'SMSvCode': code.get('vcode')})
if nss['status']:
return {'status': True, 'msg': '验证码发送成功', 'codeid': code.get('id')}
else:
return {'status': False, 'msg': '发送失败'}
else:
return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'}
# 原有逻辑如果没有指定action_type保持原有逻辑
else: