diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index 010bb85..c7969aa 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -101,6 +101,19 @@ class FlowEngine: # ------------------------------ # Definition / Instance # ------------------------------ + def __init__(self): + self.aio = [] + + async def bgtask(self, app): + env = ServerEnv() + self.aio = [] + async with (env, 'workflow') as sor: + fid = env.uuid() + self.aio = await sor.R('flow_instance', {'status': 'running'}) + while True: + for r in self.aio: + self.step(r.org_id, r.id) + await asyncio.sleep(0.5) async def create_definition(self, org_id, name, version, dsl_text): env = ServerEnv() @@ -135,7 +148,7 @@ class FlowEngine: async def step(self, org_id, instance_id): env = ServerEnv() - async with (env, 'workflow') as sor: + async with get_sor_context(env, 'workflow') as sor: insts = await sor.R('flow_instance', { 'id': instance_id, 'org_id': org_id diff --git a/dagflow/init.py b/dagflow/init.py new file mode 100644 index 0000000..e3b201f --- /dev/null +++ b/dagflow/init.py @@ -0,0 +1,10 @@ +from ahserver.serverenv import ServerEnv +from ahserver.configuredServer import add_startup +from .dagflow import FlowEngine + +def load_dagflow(): + engine = FlowEngine() + add_startup(engine.bgtask) + env = ServerEnv() + env.workflow_engine = engine + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..f04f63b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "dagflow" +version = "0.1.0" +description = "a module to build and run workflow" +authors = [ + { name="Your Name", email="you@example.com" } +] +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "pydantic>=1.10", + "skillkit", + "aiohttp" +] + +[tool.setuptools] +# 明确告诉它只包含 skillagent 目录 +packages = ["dagflow"]