diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..1d0cdba --- /dev/null +++ b/build.sh @@ -0,0 +1,46 @@ +i#!/usr/bin/env bash +# clone from git@git.opencomputing.cn/yumoqing/vdb +# git clone https://git.opencomputing.cn/yumoqing/vdb +cdir=$(pwd) +uname=$(id -un) +gname=$(id -gn) +sudo apt install redis-server +python3 -m venv py3 +source py3/bin/activate +pip install . +pip install apppublic sqlor ahserver bricks-for-python +mkdir $cdir/logs +cd $cdir +cat > $cdir/vdb.service < $cdir/start.sh < $cdir/stop.sh <=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "vdb" +version = "0.1.0" +description = "向量数据库服务" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "apppublic", + "sqlor", + "ahserver", + "pymilrus" +] + +[tool.setuptools.packages.find] +where = ["."] # 声明在哪个目录下查找包,默认是当前目录 +include = ["vdb"] # 包含哪些包 diff --git a/vdb/__init__.py b/vdb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vdb/init.py b/vdb/init.py new file mode 100644 index 0000000..04218aa --- /dev/null +++ b/vdb/init.py @@ -0,0 +1,125 @@ +from traceback import format_exc +from functools import partial +from ahserver.serverenv import ServerEnv +from appPublic.worker import awaitify +from appPublic.registerfunction import RegisterFunction +from appPublic.log import debug, exception +from appPublic.jsonConfig import getConfig +from .milvus import MilvusManager + +def ownerparting(data): + if data.get('ownerid'): + return data.get('ownerid') + return '_default' + +async def create_collection(request, params_kw, *args, **kwargs): + colname = params_kw.colname + fields = params_kw.fields + description = params_kw.description or "" + env = request._run_ns + f = awaitify(env.vdb.create_collection) + try: + r = await f(colname, fields, description) + return { + "status":"SUCCEEDED", + } + except Exception as e: + exception(f'{e}, {format_exc()}') + return { + "status":"FAILED", + "error": f"{e}" + } + +async def drop_collection(request, params_kw, *args, **kwargs): + colname = params_kw.colname + env = request._run_ns + f = awaitify(env.vdb.drop_collection) + try: + r = awiat f(colname) + return { + "status": "SUCCEEDED" + } + except Exception as e: + exception(f"{e}, {format_exc()}") + return { + "status": "FAILED", + "error": f"{e}" + } + +async def upsert(request, params_kw, *args, **kwargs): + colname = params_kw.colname + data = params_kw.data + if not isinstance(data, list): + data = [data] + + env = request._run_ns + f = awaitify(env.vdb.upsert) + try: + r = awiat f(colname, data) + return { + "status": "SUCCEEDED" + } + except Exception as e: + exception(f"{e}, {format_exc()}") + return { + "status": "FAILED", + "error": f"{e}" + } + +async def delete(request, params_kw, *args, **kwargs): + colname = params_kw.colname + pks = params_kw.pks + + env = request._run_ns + f = awaitify(env.vdb.delete) + try: + r = awiat f(colname, pks) + return { + "status": "SUCCEEDED" + } + except Exception as e: + exception(f"{e}, {format_exc()}") + return { + "status": "FAILED", + "error": f"{e}" + } + +async def query(request, params_kw, *args, **kwargs): + colname = params_kw.colname + vector=None, expr=None, pagerows=80, page=1, output_fields + vector = params_kw.vector + expr = params_kw.expr + pagerows = params_kw.pagerows or 80 + page = params_kw.page or 1 + output_fields = params_kw.output_fields + + env = request._run_ns + f1 = awaitify(env.vdb.drop_collection) + try: + f = partial(f1, colname, vector=vector, expr=expr, pagerows=pagerows, page=page, output_fields=output_fields) + r = awiat f() + return { + "status": "SUCCEEDED" + "data": r + } + except Exception as e: + exception(f"{e}, {format_exc()}") + return { + "status": "FAILED", + "error": f"{e}" + } + +def load_vdb(): + config = getConfig() + vdb = None + vdb_type = config.vdb_type + if vdb_type == 'milvus': + vdb = MilvusManager(partitionize=ownerparting) + env = ServerEnv() + env.vdb = vdb + rf = RegisterFunction() + rf.register('create_collection', create_collection) + rf.register('drop_collection', drop_collection) + rf.register('upsert', upsert) + rf.register('delete', delete) + rf.register('query', query)