公司有个项目,主要需求就是 能够执行用户上传的插件(就是 pyton2 代码),并且按照一定的流程执行,然后要支持集群模式。
先说问题:
有个 web 界面上传插件,然后在界面上面配置几个插件执行。这个任务会按照设置好的周期执行。
插件的流程是具体的 为:查询数据--处理数据--发送数据
importlib
将插件动态导入进内存中 以便后面调用处理数据
的进程收到 kafka 消息 根据消息内要执行的处理数据插件
、发送数据插件
来分别按照流程调用插件求助大佬们,有没有开源的框架能够胜任这种情况的?已经尽可能精简了,希望大佬们留下一点点意见。
1
musi 246 天前
打开一下思路
引入 faas ,想稳定就云厂商,想自建就找开源的 然后就变成了两个服务之间的交互 rpc/http 看喜好 |
3
Dongxiaohao 246 天前
没写过你这种流程引擎,但是有类似的户上传 Python 算法,给服务端去执行;
Python 那边起了一个 Flask 的 webserver ,提供 deploy 和 uninstall 和调用算法 predict 方法的接口。 大致流程就是 web 服务器部署算法之后,Java 把算法文件传给 Flask ,Flask 动态加载这个 Python 算法,存在内存中。 然后等待 Java 调用 predict 的请求就行了。 要和算法编写人员约定好算法文件的部分格式,比如主类名称,和方法名,不能乱写。 |
4
foolishcrab 246 天前 via iPhone
你看一下 perfect hq
|
5
SmiteChow 246 天前
核心问题:动态代码应该动态编译执行,而不是存为文件。以下是实战中的代码片段,供参考
``` code = compile(python_code, run_file_path, 'exec') space = globals() space['__builtins__'].update({ 'asql_runtime': self.runtime, 'asql_types': data_types, 'asql_stdlib': stdlib, }) func = FunctionType(code.co_consts[0], space) ``` 其他的问题: 1. 调试,可以提供命令行 sdk 2. 动态编译执行已解决 3. 具体问题具体分析,语法检查目前实现方式没问题 4. 单独开进程去跑才能控制开销,一下是实战中的代码片段,供参考 ``` recv_end, send_end = multiprocessing.Pipe(False) process = multiprocessing.Process(target=self.execute_python_method, args=(send_end, func_name, *args)) process.start() bag = {'process': process, 'timeout': False} timer = threading.Timer(self.configure.hook_method_max_execute_time, self.terminate_python_method, args=(bag,)) timer.start() process.join() # 超时结束 if bag['timeout']: raise HookMethodExecuteTimeOut(func_name) # 在规定时间内结束 timer.cancel() result, ok = recv_end.recv() ``` |
7
xhatt510 OP @foolishcrab 好的,我去看看
|
8
xhatt510 OP @Dongxiaohao 估计和我这个差不多的实现方式
|
9
imaple 246 天前
听着好像 xxljob 就能解决, 定时调度自定义 python 代码
|
11
ryulxy 246 天前
我做过的事情和这个有点像,是 C++开发的引擎里嵌入 Python 虚拟机载入用户编写的 Python 脚本执行,可以动态载入 Python 脚本,脚本修改后可以热重载不用重启,然后捕获 Python 脚本编译阶段的报错不运行,和游戏引擎脚本差不多
|