V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
seventhbible
V2EX  ›  Python

请问在 Python 的事件系统中,如何可以通过事件通知立刻终结一个正在运行的子线程?

  •  
  •   seventhbible · 2021-02-22 10:43:16 +08:00 · 2041 次点击
    这是一个创建于 1371 天前的主题,其中的信息可能已经有所发展或是发生改变。

    大家好,最近在完善手头上一个基于事件系统的 GUI 。

    现在遇到一个问题,就是当我在执行一个按钮点击事件的时候,实际会开一个子线程进行业务逻辑的处理,这个处理过程可能会比较长,并且中间可能会出现一些不符合预期的情况发生,当发生这种情况时,我希望会借由发送一个错误的事件通知 EventManager,然后调用 listener 的方法来立刻终结这个对应的错误线程(因为之前已经发生了错误了,后续逻辑代码继续执行没有意义了)

    代码如下,大佬们直接复制运行即可观察。如能解答,万分感谢!

    from queue import Queue, Empty
    from threading import *
    from tkinter import *
    import time
    from tkinter import ttk
    
    EVENT_TYPE_1 = "Count"
    EVENT_TYPE_2 = "Error"
    MAX_NUMBER = 10
    CUR_NUMBER = 0
    
    
    class event_manager:
        def __init__(self):
            self._eventQueue = Queue()
            self._thread = Thread(target=self.Run, daemon=True)
            self._handlers = {}
            self._active = False
    
        def Start(self):
            self._active = True
            self._thread.start()
    
        def Run(self):
            while self._active is True:
                try:
                    event = self._eventQueue.get(block=True, timeout=1)
                    self.Process(event)
                except Empty:
                    pass
    
        def Process(self, event):
            if event.type in self._handlers:
                for handler in self._handlers[event.type]:
                    handler()
            else:
                pass
    
        def Stop(self):
            self._active = False
            self._thread.join()
    
        def addEventListenter(self, type_, handler):
            try:
                handlerList = self._handlers[type_]
    
            except KeyError:
                handlerList = []
                self._handlers[type_] = handlerList
    
            if handler not in handlerList:
                handlerList.append(handler)
    
        def removeEventListenter(self, type_, handler):
            try:
                handlerList = self._handlers[type_]
                if handler in handlerList:
                    handlerList.remove(handler)
                if not handlerList:
                    del self._handlers[type_]
            except KeyError:
                pass
    
        def sendEvent(self, event):
            self._eventQueue.put(event)
    
    
    class Event:
        def __init__(self, event_event_name, cur_done_task, type_=None):
            self.type = type_
            self._event_name = event_event_name
            self._curDoneTask = cur_done_task
    
    
    class EventSource:
        def __init__(self, event_name, event_mgr, max_number, type):
            self._event_name = event_name
            self._event_manager = event_mgr
            self._type = type
            self._max_number = max_number
    
        def count(self):
            global CUR_NUMBER
            for i in range(self._max_number):
                CUR_NUMBER = i + 1
    
                if CUR_NUMBER == 4:  # 在业务逻辑线程中增加检测环节,如果发生错误就会发送错误事件,希望可以立刻终结当前的线程,不执行后续的代码
                    print("************ detect error occurred , this thread should be terminated immediately !")
                    errorEvent = Event("error", CUR_NUMBER, type_=EVENT_TYPE_2)
                    self._event_manager.sendEvent(errorEvent)
    
                print(
                    "************ main thread start:now start process {} - count : {}".format(self._event_name, CUR_NUMBER))
                event = Event("test", CUR_NUMBER, type_=self._type)
                self._event_manager.sendEvent(event)
                time.sleep(1)
    
    
    class GUIListener(Tk):
        def __init__(self):
            super(GUIListener, self).__init__()
    
            self.title("Progress GUI")
            self.geometry("1200x805+600+100")
            self.config(bg="#535353")
            self.resizable(True, True)
            self.taskThread = None
    
            self.progressBar = ttk.Progressbar(master=self, orient=HORIZONTAL, maximum=MAX_NUMBER, length=300)
            self.progressBar.pack()
            self.button = ttk.Button(self, text="Run", command=lambda: self.button_function(MAX_NUMBER))
            self.button.pack()
    
        def update_progress_value(self):
            print("************Sub thread start: detect progress bar value is now...{}".format(self.progressBar['value']))
            self.progressBar['value'] = CUR_NUMBER
            self.progressBar.update_idletasks()
            print("************Sub thread start: update progress bar value to...{}".format(CUR_NUMBER))
    
        def button_function(self, max_number):
            # 在正式开始执行逻辑子线程之前,确实可以提前做一些判断,来决定是否满足条件,开始接下来的逻辑子线程,但是这个不在本次讨论范围内
            es = EventSource("eventSource", eventMgr, max_number, EVENT_TYPE_1)
            self.taskThread = Thread(target=es.count, daemon=True).start()  # 这里开始执行实际的业务逻辑子线程
    
        def terminate_error_thread(self):  # 这个方法在 GUIListener 接受到事件源发出的错误逻辑时被唤起,用来立刻终结正在执行事件源的线程,不做后续无用的代码逻辑处理
            pass
            # TODO: but how to implement this method ?
    
    
    if __name__ == '__main__':
        gui = GUIListener()
    
        eventMgr = event_manager()
        eventMgr.addEventListenter(EVENT_TYPE_1, gui.update_progress_value)
        eventMgr.addEventListenter(EVENT_TYPE_2, gui.terminate_error_thread)
    
        eventMgr.Start()
    
        gui.mainloop()
    
    

    顺便一提,希望得到的结果是这样的

    ************ main thread start:now start process eventSource - count : 1
    ************Sub thread start: detect progress bar value is now...0.0
    ************Sub thread start: update progress bar value to...1
    ************ main thread start:now start process eventSource - count : 2
    ************Sub thread start: detect progress bar value is now...1
    ************Sub thread start: update progress bar value to...2
    ************ main thread start:now start process eventSource - count : 3
    ************Sub thread start: detect progress bar value is now...2
    ************Sub thread start: update progress bar value to...3
    ************ detect error occurred , this thread should be terminated immediately !
    

    到这里就应该自然停止。

    14 条回复    2021-02-25 15:24:09 +08:00
    ch2
        1
    ch2  
       2021-02-22 11:49:59 +08:00
    seventhbible
        2
    seventhbible  
    OP
       2021-02-22 13:37:21 +08:00
    emmmmm,看了一下楼上大佬的链接,大意是我需要重新定义一个继承了线程的类,然后重写这个类下的 stop 方法(用一个布尔值的开关来控制 thread 的 run 方法)。
    在我的代码示例中,就是每次发送出错误事件的时候,通知修改这个布尔值开关 [设为全局变量] 变为 False 然后自然使得接下来的线程自动跳出 run 方法?
    不好意思,我接触 python 时间还不够长久,有些地方理解力还不是很强,如果有说错请指正。
    todd7zhang
        3
    todd7zhang  
       2021-02-22 14:07:42 +08:00
    子线程处理逻辑,中间可能会有异常,然后子线程发事件给 manager,然后让 manager 来结束这个子线程?
    如果是这样的话,为啥不是子线程中间出错了,自己退出不就行了?
    seventhbible
        4
    seventhbible  
    OP
       2021-02-22 15:23:36 +08:00
    @todd7zhang 理想情况下我是希望可以借助发送 event 来终结当前的子线程,因为 event 可以带出来错误的各种信息。对后续处理会很有帮助。
    no1xsyzy
        5
    no1xsyzy  
       2021-02-22 15:28:08 +08:00
    你可以直接 eventMgr.Stop() 来停止
    不要在 GUIListener 里写 event_manager 的停止逻辑

    - eventMgr.addEventListenter(EVENT_TYPE_2, gui.terminate_error_thread)
    + eventMgr.addEventListenter(EVENT_TYPE_2, eventMgr.Stop)

    但会造成内存泄漏,请用 weakref 替换这一 call
    seventhbible
        6
    seventhbible  
    OP
       2021-02-22 15:34:56 +08:00
    @no1xsyzy 抱歉小弟我才疏学浅,这里的 weakref 是如何替换?
    imn1
        7
    imn1  
       2021-02-22 15:51:43 +08:00
    代码就不阅读了,没空去研究逻辑

    简单的做法,就是主线程立一个 flag,子线程读取这个 flag,变更就跳出循环(每次循环判断),跳出后重置 flag,结束子线程,要确保其他控件的事件可以更改这个 flag,GUI 做这个不太难

    粗略看,你的代码是在主线程 loop ?应该放到子线程,这样主线程才能接收其他事件
    no1xsyzy
        8
    no1xsyzy  
       2021-02-22 15:59:05 +08:00
    @imn1 他这不是主线程 loop,主线程在 gui

    @seventhbible 你这里 eventMgr 在 globals 里,不好调,也不想帮你大修,你随便地看一下官方的 weakref 实现吧(不是指 C 实现,而是有了 _weakref.ref 之后如何实现其他的工具)。
    当然,因为它在 globals 里面,估计不修也没事儿。我说的内存泄漏是 eventMgr 循环引用自身。

    any( hanlder.__self__ is eventMgr for handler in eventMgr._handlers[EVENT_TYPE_2] )
    seventhbible
        9
    seventhbible  
    OP
       2021-02-22 16:45:32 +08:00
    感谢大佬们的回复,可能是一下子知识出现断层了,我先补一下其他知识。如果不懂再问。。。
    ec0
        10
    ec0  
       2021-02-22 17:01:56 +08:00
    子线程自己退出,退出前发送 event

    比如在 count 函数中

    if CUR_NUMBER == 4:
    (缩进)errorEvent = Event("error", CUR_NUMBER, type_=EVENT_TYPE_2)
    (缩进)self._event_manager.sendEvent(errorEvent)
    (缩进)return

    也就是说 event 只是传递消息,线程的终结交给子线程自己
    seventhbible
        11
    seventhbible  
    OP
       2021-02-22 17:44:17 +08:00
    @ec0 对!这也是一种方法,感谢大佬回复。但是如果我需要将这个封装成一个传参的通用方法 check_error 的话,从结构上来说它应该属于哪里呢?
    seventhbible
        12
    seventhbible  
    OP
       2021-02-22 17:51:54 +08:00
    @ec0 而且这样的话线程并不会自己结束,会无限循环这个 for 循环,从 1 到 3
    seventhbible
        13
    seventhbible  
    OP
       2021-02-23 10:47:37 +08:00
    @ec0 确实这个方法可以,我刚刚说错了。。。但是请问有没有一种统一的方法由错误事件唤起一个通用的方法来退出特定的子线程?因为可能我处理不同子线程的逻辑业务都会很多,每个逻辑业务的判断错误条件五花八门,如果可以的话,我希望只要子线程出现异常,就统一发送错误信息,交由事件管理器唤起一个统一的方法来退出这个子线程。
    seventhbible
        14
    seventhbible  
    OP
       2021-02-25 15:24:09 +08:00
    最后报告一下,是我的思考方式错误了。应该使用 try except 来判断异常错误,最后让线程自动退出关闭的。主动杀掉正在运行的线程这种操作并不合理。。。。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1235 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 33ms · UTC 23:45 · PVG 07:45 · LAX 15:45 · JFK 18:45
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.