kboss/b/cntoai/llm_chat_completions.dspy
2026-05-23 14:22:30 +08:00

284 lines
9.2 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

def _escape(value):
    """Return ``value`` as a string with every single quote doubled.

    ``None`` is passed through unchanged. Any other value is converted with
    ``str()`` first, then each ``'`` becomes ``''``.
    """
    if value is not None:
        text = str(value)
        # Splitting on the quote and re-joining with a doubled quote is the
        # same transformation as replacing ' with ''.
        return "''".join(text.split("'"))
    return None
def _parse_bool(value, default=True):
    """Interpret ``value`` as a boolean flag.

    Real booleans are returned unchanged. ``None`` and the empty string yield
    ``default``. Anything else is converted with ``str()`` and lower-cased;
    the result is True only for '1', 'true', 'yes' or 'on'.
    """
    if isinstance(value, bool):
        return value
    if value is None or value == '':
        return default
    truthy_words = ('1', 'true', 'yes', 'on')
    return str(value).lower() in truthy_words
def _parse_messages(ns):
    """Return the conversation history stored under ``ns['messages']``.

    The value may be a list (returned as-is) or a JSON string that encodes a
    list. A missing or empty value, unparseable JSON, or JSON that decodes to
    anything other than a list yields ``[]``, so the result is always a list.
    """
    raw = ns.get('messages')
    if not raw:
        return []
    if isinstance(raw, list):
        return raw
    if isinstance(raw, str):
        import json
        try:
            parsed = json.loads(raw)
        except (ValueError, RecursionError):
            # Malformed history is ignored rather than failing the request.
            return []
        # Valid JSON is not necessarily a list ('{"a": 1}', '42', '"x"');
        # handing such a value back would break callers doing list(history).
        if isinstance(parsed, list):
            return parsed
    return []
def build_user_content(ns):
    """
    Build the ``content`` of a single user message from text, image and
    document inputs.

    Recognised keys of ``ns`` (may be combined):
        message / text   plain text
        document_text    document plain text, merged into the text part
        image_url        image URL
        image_base64     base64 image data; wrapped into a ``data:`` URL using
                         ``image_mime`` (default ``image/jpeg``) unless it
                         already starts with ``data:``
        document_url     document URL, emitted as a ``file`` part

    Returns ``''`` when nothing was supplied, a plain string when only text
    was supplied, otherwise a list of typed content parts.
    """
    texts = [
        str(ns.get(key))
        for key in ('message', 'text', 'document_text')
        if ns.get(key)
    ]
    merged_text = '\n'.join(filter(None, texts)).strip()

    content = []
    if merged_text:
        content.append({'type': 'text', 'text': merged_text})

    image_url = ns.get('image_url')
    if image_url:
        content.append({'type': 'image_url', 'image_url': {'url': image_url}})

    encoded = ns.get('image_base64')
    if encoded:
        if not str(encoded).startswith('data:'):
            mime = ns.get('image_mime') or 'image/jpeg'
            encoded = 'data:%s;base64,%s' % (mime, encoded)
        content.append({'type': 'image_url', 'image_url': {'url': encoded}})

    document_url = ns.get('document_url')
    if document_url:
        content.append({'type': 'file', 'file': {'file_url': document_url}})

    if not content:
        return ''
    only = content[0]
    # A lone text part is sent as a plain string instead of a parts list.
    if len(content) == 1 and only['type'] == 'text':
        return only['text']
    return content
async def _resolve_chat_config(ns, sor):
    """Resolve the chat-completions endpoint URL and Bearer token.

    URL precedence:
        1. ``ns['api_url']``
        2. ``model_api_doc.api_url`` for ``ns['model_id']``
           (``/chat/completions`` is appended when missing)
        3. ``params`` row ``cntoai_llm_chat_url``
        4. ``params`` row ``cntoai_domain`` + ``/llmage/v1/chat/completions``
        5. ``https://ai.atvoe.com/llmage/v1/chat/completions``

    Key precedence:
        1. ``ns['api_key']``
        2. ``user_api_keys.opc_apikey`` of the user (``ns['userid']`` or
           ``get_user()``) for action ``ns['apikey_action']`` (default
           ``user_self_create``), then for action ``sync``
        3. ``params`` row ``cntoai_llm_api_key``

    Returns:
        ``(api_url, api_key)``; ``api_key`` is empty/None when nothing is
        configured, which the caller must treat as an error.
    """
    # Credentials must never be embedded in this file. A hard-coded endpoint
    # and key also make every fallback below unreachable, so both values start
    # from the request and are filled in from configuration only when absent.
    api_url = ns.get('api_url')
    api_key = ns.get('api_key')

    model_id = ns.get('model_id')
    if not api_url and model_id:
        # Filter-dict lookup instead of string-built SQL: model_id comes from
        # the request and must not be spliced into a statement.
        doc_rows = await sor.R('model_api_doc', {'model_id': model_id})
        if doc_rows and doc_rows[0].get('api_url'):
            api_url = doc_rows[0]['api_url']
            if not str(api_url).endswith('/chat/completions'):
                api_url = str(api_url).rstrip('/') + '/chat/completions'

    if not api_url:
        param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'})
        if param_rows:
            api_url = param_rows[0]['pvalue']
        else:
            domain_rows = await sor.R('params', {'pname': 'cntoai_domain'})
            if domain_rows:
                api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions'
            else:
                api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions'

    if not api_key:
        userid = ns.get('userid') or await get_user()
        if userid:
            action = ns.get('apikey_action') or 'user_self_create'
            keys = await sor.R('user_api_keys', {'userid': userid, 'action': action})
            if not keys:
                keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'})
            if keys:
                api_key = keys[0].get('opc_apikey')

    if not api_key:
        key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'})
        if key_rows:
            api_key = key_rows[0]['pvalue']

    return api_url, api_key
def _extract_stream_piece(payload):
    """Pull the text fragment out of one decoded stream chunk.

    The first non-empty value among ``choices[0].delta.content``,
    ``choices[0].delta.reasoning_content``, ``choices[0].message.content``,
    ``choices[0].text`` and the top-level ``content`` is returned as a string.
    ``''`` is returned when none of them is present.
    """
    choices = payload.get('choices') or [{}]
    first = choices[0]
    delta = first.get('delta') or {}
    message = first.get('message') or {}

    # Streaming deltas take priority; the remaining fields are consulted only
    # when the delta carries no text.
    text = delta.get('content') or delta.get('reasoning_content')
    if not text:
        text = message.get('content') or first.get('text') or payload.get('content')
    return str(text) if text else ''
async def _read_stream_response(response):
    """Read a streamed (SSE) chat response and collect its text.

    Every ``data:`` line is JSON-decoded and passed to
    ``_extract_stream_piece``; the non-empty pieces are collected in order.
    Reading stops at ``data: [DONE]``. Blank lines and ``:`` comment lines are
    skipped, and malformed ``data:`` payloads are ignored.

    If no piece was produced, the non-SSE text that was received is parsed as
    one complete JSON body (the upstream may ignore ``stream=true``) and
    ``choices[0].message.content`` / ``choices[0].text`` is used.

    Args:
        response: object whose ``content`` attribute is an async iterable of
            ``bytes`` chunks.

    Returns:
        ``(reply, chunks)`` where ``reply`` is the joined text and ``chunks``
        the list of pieces; ``('', [])`` when nothing could be extracted.
    """
    import codecs
    import json

    # Incremental decoding keeps a multi-byte character intact even when the
    # transport splits it across two chunks.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
    chunks = []
    plain_lines = []  # non-SSE lines, kept for the whole-body JSON fallback
    buffer = ''

    def _handle_line(line):
        """Process one line; return True once the [DONE] sentinel is seen."""
        stripped = line.strip()
        if not stripped or stripped.startswith(':'):
            return False
        if not stripped.startswith('data:'):
            plain_lines.append(line)
            return False
        data = stripped[5:].strip()
        if data == '[DONE]':
            return True
        try:
            piece = _extract_stream_piece(json.loads(data))
        except (ValueError, AttributeError, TypeError, LookupError):
            # Best effort: one malformed event must not abort the stream.
            return False
        if piece:
            chunks.append(piece)
        return False

    async for raw in response.content:
        buffer += decoder.decode(raw)
        while '\n' in buffer:
            line, buffer = buffer.split('\n', 1)
            if _handle_line(line):
                return ''.join(chunks), chunks

    # The last line may arrive without a trailing newline.
    buffer += decoder.decode(b'', final=True)
    if buffer and _handle_line(buffer):
        return ''.join(chunks), chunks

    reply = ''.join(chunks)
    if reply:
        return reply, chunks

    # The upstream may have ignored stream=true and returned one JSON body,
    # possibly spread over several lines.
    tail = '\n'.join(plain_lines).strip()
    if tail:
        try:
            body = json.loads(tail)
            choice = (body.get('choices') or [{}])[0]
            msg = choice.get('message') or {}
            reply = msg.get('content') or choice.get('text') or ''
        except (ValueError, AttributeError, TypeError, LookupError):
            reply = ''
        if reply:
            return str(reply), [str(reply)]
    return reply, chunks
async def llm_chat_completions(ns=None):
    """
    Call an OpenAI-compatible ``chat/completions`` endpoint through aiohttp.

    Parameters (keys of ``ns``):
        model (str)                   model name, required
        message / text                current user text
        messages                      history as a list or JSON array string
                                      (multi-turn conversation)
        stream (bool)                 request a streamed reply, default True
        image_url / image_base64      image input
        document_url / document_text  document input
        with_chunks                   when truthy, include the individual
                                      stream pieces in the result
        api_url / api_key / model_id / userid
                                      forwarded to ``_resolve_chat_config``,
                                      which decides the endpoint and key

    Returns:
        ``{'status': False, 'msg': ...}`` on any failure, otherwise
        ``{'status': True, 'msg': 'chat success', 'data': {...}}`` where
        ``data`` holds ``model``, ``reply``, ``messages`` (the sent messages
        plus the assistant reply), ``usage`` (always ``{}`` when streaming),
        ``stream``, ``chunk_count`` and ``chunks``.
    """
    import aiohttp
    import traceback

    # A mutable default argument would be shared between calls.
    if ns is None:
        ns = {}

    model = ns.get('model')
    if not model:
        return {'status': False, 'msg': 'model is required'}

    stream = _parse_bool(ns.get('stream'), True)
    history = _parse_messages(ns)
    if not isinstance(history, list):
        # A JSON string can decode to a dict/number/string; list() on those
        # would raise or produce garbage messages.
        history = []
    user_content = build_user_content(ns)
    if not user_content and not history:
        return {'status': False, 'msg': 'message is required'}

    messages = list(history)
    if user_content:
        messages.append({'role': 'user', 'content': user_content})
    payload = {
        'model': model,
        'stream': stream,
        'messages': messages,
    }

    # The database context is only needed to resolve the configuration; leave
    # it before the HTTP call so it is not kept open for the whole request
    # (which may last up to the 600 s client timeout).
    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        try:
            api_url, api_key = await _resolve_chat_config(ns, sor)
        except Exception:
            return {'status': False, 'msg': 'chat failed, %s' % traceback.format_exc()}

    if not api_key:
        return {'status': False, 'msg': '未找到 API Key请先创建或配置 cntoai_llm_api_key'}

    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer %s' % api_key,
    }
    try:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=600),
        ) as session:
            async with session.post(api_url, headers=headers, json=payload) as response:
                if response.status != 200:
                    err_text = await response.text()
                    return {
                        'status': False,
                        'msg': '模型请求失败 HTTP %s: %s' % (response.status, err_text[:500]),
                    }
                stream_chunks = []
                if stream:
                    reply, stream_chunks = await _read_stream_response(response)
                    usage = {}
                else:
                    # content_type=None: accept a JSON body even when the
                    # upstream labels it with a non-JSON Content-Type.
                    body = await response.json(content_type=None)
                    choice = (body.get('choices') or [{}])[0]
                    msg = choice.get('message') or {}
                    reply = msg.get('content') or ''
                    usage = body.get('usage') or {}
                return {
                    'status': True,
                    'msg': 'chat success',
                    'data': {
                        'model': model,
                        'reply': reply,
                        'messages': messages + [{'role': 'assistant', 'content': reply}],
                        'usage': usage,
                        'stream': stream,
                        'chunk_count': len(stream_chunks),
                        'chunks': stream_chunks if ns.get('with_chunks') else None,
                    },
                }
    except Exception:
        # Boundary of the script: report the failure instead of raising.
        return {'status': False, 'msg': 'chat failed, %s' % traceback.format_exc()}
# Script entry point: run the chat completion with the request parameters and
# hand its result dict back to the caller.
# NOTE(review): `params_kw` is not defined in this file, and a bare top-level
# `await`/`return` only works if the host wraps this script body in an async
# function — presumably the .dspy runtime does both; confirm.
ret = await llm_chat_completions(params_kw)
return ret