V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
eccentric579
V2EX  ›  Python

apscheduler 如何在完成有限个任务后退出

  •  
  •   eccentric579 · 2021-09-16 03:57:47 +08:00 · 1602 次点击
    这是一个创建于 1207 天前的主题,其中的信息可能已经有所发展或是发生改变。
    today = datetime.datetime.now().strf(''%Y-%m-%d'')
    scheduler = BlockingScheduler()
    scheduler.add_job(job, 'cron', hour = '9-11', minute = '*/5', end_date = day + " 11:35:00", timezone = tz)  
    scheduler.start()  
    scheduler.shutdown()    
    

    用 blockingscheduler 不会返回
    用 background, shutdown 又没有阻塞,直接就结束了

    2 条回复    2021-09-16 09:56:06 +08:00
    eccentric579
        1
    eccentric579  
    OP
       2021-09-16 08:56:41 +08:00
    睡了一觉,起来有点思路了。
    用 blockingscheduler,那么创建一个检查任务完成就触发异常的 job,用 try 捕捉然后 shutdown
    或者用 backgroundsheduler,让主进程检查任务完成,未完成就 sleep

    感觉有点像是土办法
    princelai
        2
    princelai  
       2021-09-16 09:56:06 +08:00
    我之前写过类似的,和你思路差不多,外部用一个类记录状态并判断

    ```python
    from apscheduler.executors.pool import ThreadPoolExecutor
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.schedulers.background import BlockingScheduler
    from loguru import logger


    class LimitJob:
    n = 0

    @classmethod
    def print_job(cls):
    cls.n += 1
    logger.info(f"run job {cls.n} times")
    if cls.n >= 5:
    jobs.shutdown(wait=False)


    def init_scheduler() -> BlockingScheduler:
    jobstores = {
    'default': MemoryJobStore()
    }
    executors = {
    'default': ThreadPoolExecutor(1)
    }
    job_defaults = {
    'coalesce': False,
    'max_instances': 1}

    scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
    scheduler.add_job(LimitJob.print_job, 'interval', seconds=3, id="print_job")
    logger.info("调度任务创建成功")
    scheduler.print_jobs()
    return scheduler


    if __name__ == '__main__':
    jobs = init_scheduler()
    jobs.start()

    ```

    https://pastebin.ubuntu.com/p/XQMtf4RXgp/
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2643 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 05:04 · PVG 13:04 · LAX 21:04 · JFK 00:04
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.