V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
seoguess
V2EX  ›  Python

Python 爬虫多线程问题咨询

  •  
  •   seoguess · 2019-04-25 15:22:06 +08:00 · 2887 次点击
    这是一个创建于 2044 天前的主题,其中的信息可能已经有所发展或是发生改变。

    刚开始研究多线程,看了很多视频跟文章,但还是概念混淆,所以过来论坛求助。

    1、

    对于爬虫而言,for 循环多线程抓取 N 个页面,然后获取内容到 MongoDB 数据库,错误链接写入 error.txt,是线程安全的吗?如果不安全,我通过在类里面的采集入库跟写入文件加锁,是不是可以解决?我之前没加锁,抓了 36W 条数据,感觉没发现有什么问题。

    入库部分如下:

    # 启动抓取函数
    def run(self):
        try:
            rst_json = self.claw_detail()
            if rst_json != None:
                gLock.acquire()
                result = table_lines.insert_one(rst_json)
                gLock.release()
            else:
                result = 'null'
        except:
            with open(file_path + "/error.txt", 'a+') as f1:
                f1.write(self.hotelid)
                f1.write('\n')
                f1.flush()
            result = 'error'
            time.sleep(5)
        finally:
            time.sleep(1)
    
        return result
    

    2、

    多层 for 循环嵌套的时候,这么用多线程可以吗? t.join()这么用没有问题吧? t.setDaemon(True)需要设置吗?

    main 函数

    def main(): for dateTuple in dateList:

        threads = []
        
        for i in id_lines.find():
            hotelId = i.get('hotelId')
            threads.append(hotelId)
    
    
        for hotelid in threads:
            t = ClawData(hotelid,headersCookie)
            #t.setDaemon(True)
            t.start()
    
        for hotelid in threads:
            t.join()
    
        time.sleep(3)
    

    非常感谢!

    14 条回复    2019-04-26 23:13:06 +08:00
    seoguess
        1
    seoguess  
    OP
       2019-04-25 16:18:42 +08:00
    如果 for 循环加上加锁不安全的话,那么 Queue 队列应该如何应用?

    非生产者消费者模式感觉太复杂了,抓取的类只需要传入 cookie 跟对应的 id 来生成 url。

    或者我把 url 跟 cookie 变成一个 tuple,然后判断 not Queue.empty(),然后通过 for 循环 + Queue.get()多线程去抓取内容可行?

    求解惑,谢谢!
    Juszoe
        2
    Juszoe  
       2019-04-25 19:02:17 +08:00   ❤️ 1
    python 有全局锁 GIL,不用考虑线程不安全的问题,IO 型任务多线程随便用,除非你要把一段代码做成原子操作
    scriptB0y
        3
    scriptB0y  
       2019-04-25 19:07:35 +08:00   ❤️ 1
    @Juszoe 错啦,Python 的 GIL 带来的作用是虚拟机的 opcode 是线程安全的,但是并不是每一行代码是线程安全的。请看这个例子:

    https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/06_Thread_synchronization_with_Lock_and_Rlock.html
    scriptB0y
        4
    scriptB0y  
       2019-04-25 19:11:31 +08:00
    对于楼主的问题,不是线程安全的,会有多个线程同时操作数据库,但是只要你的任务之间不相关,没有共享资源,即使线程不安全也是没有问题滴。

    但是有几点建议楼主可以参考:
    1. 写文件肯定是非线程安全的,你这种用法应该使用 logging 模块,第一他是 thread safe 的,第二它是专门写 log 的。
    2. 你不需要自己操作这么低级的 API 啦,可以看下 concurrent 模块,里面封装了线程池,用起来更方便! https://docs.python.org/3/library/concurrent.futures.html
    Juszoe
        5
    Juszoe  
       2019-04-25 19:16:51 +08:00
    @scriptB0y #3 学到了,可能我之前的线程都是任务不相关的,所以没有感觉到
    scriptB0y
        6
    scriptB0y  
       2019-04-25 19:19:42 +08:00
    @Juszoe 是的,即使是相关的,也是需要一定的几率才会发现线程不安全的现象。Python3 GIL 调度策略换了,出现的几率更低了。
    seoguess
        7
    seoguess  
    OP
       2019-04-25 19:43:34 +08:00
    @scriptB0y 谢谢!我研究一下。
    Leigg
        8
    Leigg  
       2019-04-25 19:45:03 +08:00 via iPhone   ❤️ 1
    没有同步或者资源竞争问题不讨论线程安全,setDaemon 需要设置 true,否则当程序异常停止时,会有僵尸线程。
    seoguess
        9
    seoguess  
    OP
       2019-04-25 21:24:25 +08:00
    @Leigg 谢谢!如果我的 main 函数大致结构是:

    for cookie in cookies: # 1、获取 N 个不同的生成的 cookie
    for id in id_list: #2、获取不同的 id 来生成 url
    t = threading.Thread(claw(cookie,id), args) # 3、多线程获取内容、入库、记录错误
    t.start()


    是不是逻辑上,我把第三步中的采集入库、记录错误上锁了,1、2 中对应的 cookie 跟 id 不会出现多线程引起的数据错乱?
    Leigg
        10
    Leigg  
       2019-04-25 21:35:21 +08:00 via iPhone
    记录错误用 logging,不能用 with open ;
    入库记得 db 连接用连接池,或者入库单独一个函数,总之不要每个线程创建一个 db 连接,没有其他的同步或者资源竞争问题吧?
    seoguess
        11
    seoguess  
    OP
       2019-04-26 10:16:22 +08:00
    @scriptB0y 我用 concurrent 模块重新修改了下代码,发现效率比我之前的代码差了好多....
    for 循环: #获取 cookie:
    threads = [ (i.get('hotelId'),headersCookie) for i in id_lines.find() ]
    pool = ThreadPoolExecutor()
    future_tasks = [ pool.submit(start_claw, t) for t in threads ]
    wait(future_tasks, return_when=ALL_COMPLETED)

    time.sleep(3)


    3K 左右的链接,用时 382 秒

    for 循环: #获取 cookie:
    threads = []

    for i in id_lines.find():
    hotelId = i.get('hotelId')
    threads.append(hotelId)


    for hotelid in threads:
    t = ClawData(hotelid,headersCookie)
    t.setDaemon(True) #防止程序异常退出时,有僵尸进程存在
    t.start()

    for hotelid in threads:
    t.join()

    time.sleep(3)

    用时:52 秒

    请问为啥效率可以差别这么大?
    zy342500
        12
    zy342500  
       2019-04-26 17:43:58 +08:00   ❤️ 1
    @seoguess pool = ThreadPoolExecutor(max_workers=20) 这样写 给他设置工作线程数
    seoguess
        13
    seoguess  
    OP
       2019-04-26 22:56:20 +08:00
    @zy342500 谢谢,我以为放空的话就是没有限制。

    max_workers=100,跑完用时 79 秒
    max_workers=1000,跑完用时 49 秒
    seoguess
        14
    seoguess  
    OP
       2019-04-26 23:13:06 +08:00
    原来 max_worker 为空的情况下,默认线程为 cpu 核数量*5,难怪花了 300+秒。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3459 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 33ms · UTC 10:58 · PVG 18:58 · LAX 02:58 · JFK 05:58
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.