V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
evemoo
V2EX  ›  Python

requests 线程池无法中止子线程问题

  •  
  •   evemoo · 2023-08-08 11:35:02 +08:00 · 1336 次点击
    这是一个创建于 458 天前的主题,其中的信息可能已经有所发展或是发生改变。

    后面会接触断点续传、aiohttp 、httpx (

    线程池提交完任务(哪怕只开了一个 worker ),tqdm 显示进度条后按 Ctrl + C 并不会中断正在下载的任务。

    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    import requests
    from tqdm import tqdm
    
    
    def download(url):
        try:
            filesize = int(requests.get(url, stream=True).headers['Content-length']) or 0
            filename = requests.get(url, stream=True).headers['content-disposition']
            with requests.get(url, stream=True) as responese:
                with tqdm(
                    total=filesize,
                    desc=f"Downloading: ",
                     unit="B",
                     unit_divisor=1024,
                     ascii=True,
                     unit_scale=True,
                    ) as bar:
                    with open(filename, mode="ab") as f:  # 追加模式
                        for chunk in responese.iter_content(1024):
                            f.write(chunk)
                            bar.update(len(chunk))
        except (SystemExit, KeyboardInterrupt):
            raise "KeyboardInterrupt"
                  
    
    urls = [...]
    with ThreadPoolExecutor(maxworkers=8) as pool:
    	tasks = pool.map(download, urls)
    
    for task in as_completed(tasks):
    	print(task.result())
    
    
    第 1 条附言  ·  2023-08-08 12:27:31 +08:00

    更准确的提问应该是:concurrent.futures 的 submit、map 方法提交后,无法捕获 Ctrl + C 信号终止子线程 简单测试 delay server 如下:

    import time
    
    from fastapi import FastAPI
    from fastapi.responses import FileResponse
    from uvicorn import run
    
    app = FastAPI()
    
    
    @app.get("/")
    def hello():
        return {"data": "hello world"}
    
    
    @app.get("/pic/{file}")
    def d(file: str):
        time.sleep(60)
        return FileResponse(f"pic/{file}")
    
    
    if __name__ == "__main__":
        run("download_server:app", host="localhost", port=8000, reload=True)
    
    
    6 条回复    2023-08-11 10:54:01 +08:00
    t133
        1
    t133  
       2023-08-08 11:55:08 +08:00 via iPhone   ❤️ 1
    指定 daemon thread 可解
    evemoo
        2
    evemoo  
    OP
       2023-08-08 13:13:59 +08:00
    @t133
    实际测试,join() 还要带 timeout 参数才能 raise "KeyboardInterrupt"

    ```python
    tasks = []
    for url in urls:
    task = threading.Thread(target=download, args=(url,))
    task.setDaemon(True)
    tasks.append(task)
    task.start()

    for task in tasks:
    task.join(2)
    ```
    evemoo
        3
    evemoo  
    OP
       2023-08-08 13:15:00 +08:00
    搜了下 concurrent.futrues 的 daemon ,没有找到相似解法。
    ruanimal
        4
    ruanimal  
       2023-08-08 14:08:29 +08:00
    你需要的是 graceful shutdown 吧,看看 signal 处理
    ClericPy
        5
    ClericPy  
       2023-08-08 18:40:02 +08:00
    和 Requests 无关, 多线程本来就很难终止, 不然我也不会 all-in 协程去了.
    chaoshui
        6
    chaoshui  
       2023-08-11 10:54:01 +08:00   ❤️ 1
    如果不关心子线程的任务情况的话,可以直接在主线程关闭线程池 pool. shutdown(wait=False) 。
    如果关心子线程的任务情况,比如正在下载的文件继续下载完毕或者怎样,可以通过一个 threading.Event 通知正在进行的任务退出,配合 Future 对象的 cancel 方法可以干净地处理所有的任务。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2384 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 15:45 · PVG 23:45 · LAX 07:45 · JFK 10:45
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.