1
liuxu 2018-10-30 16:01:13 +08:00 2
那你让它快点写
|
2
gaius 2018-10-30 16:03:29 +08:00
磁盘不行?
|
3
jinue9900 2018-10-30 16:04:04 +08:00
用 redis 缓存一层再持久(滑稽)
|
4
a65420321a OP #!/usr/local/bin/python2.7
# -*- coding:utf-8 -*- import re, time, random, hashlib, urllib, requests, os, math, json, sys, base64, torndb, uuid, threading, sys, itertools, copy, traceback import requests;requests.packages.urllib3.disable_warnings() from common import * from config import * mkdir('jobs') class MyWorker(Worker): table_coms = 'zp_coms' table_jobs = 'zp_jobs' htmlfolder = 'jobs' def crawlTask(self): try: url = self.getAddr() # 获取 url htm = self.getHtml(url, None, 0)#获取 html #getJob 解析 html 获取职位数据 self.record.update(self.getJob(htm)) self.record['flag'] = 10 except: self.record['flag'] = 99 print traceback.format_exc() finally: #向每条职位数据中添加字段 self.record['job_link_href'] = url self.record['company_type']='' self.record['company_tel']='' self.record['company_email']='' self.record['job_type']='' self.record['job_type_code']='' self.record['company_fax']='' #本次任务处理完成后更新数据库 self.update(self.table_jobs, self.record) def getJob(self, job): self.record['job_title'] = grep(u'<h1>(.+?)</h1>', job) self.record['job_salary'] = grep(u'<span class="red">(.+?)</span>', job) self.record['job_date'] = grep(u'<span.*?>发布于(.+?)</span>', job) infos = grep(u'<div class="info-primary">.+?<p>(.+?)</p>', job, re.S).replace('<em class="vline"></em>', '|').split('|') for info in infos: if u'城市' in info: self.record['city_text'] = info[3:] self.record['job_city_code'] = info[3:] break cmp_infos = grep(u'h3 class="name".+?p>(.+?)</p>',job,re.S).replace('<em class="vline"></em>', '|').split('|') for info in cmp_infos: info = re.sub('<.+?>', '',info) if info in paramx['s'].values(): self.record['company_size'] = info continue if info in paramx['i'].values(): self.record['job_industry_code'] = info self.record['company_industry'] = info continue self.record['company_linkman'] = re.sub('<.*?>', '',grep(u'<h2 class="name">(.+?)</h2>',job,re.S)).strip() self.record['address'] = grep(u'div class="location-address">(.+?)</div>',job,re.S) self.record['company_link_url'] = 'https://www.zhipin.com'+grep(u'ka="job-detail-company" href="(.+?)"', job,re.S) self.record['company_name'] = grep(u'<h3 class="name".+?>(.+?)<.*?/h3>', job) self.record['cmp_company_id']=grep(u'"job-detail-company" href="/gongsi/(.+?).html"', job) def checkHtml(self, html):#用于检查页面是否有数据需要解析 if html.find(u'<title>BOSS 直聘验证码</title>' )>=0:return 0 if html.find(u'您暂时无法继续访问~' )>=0:return 0 return 1 def reqHtml(self, addr, data=None):#获取 response headers = { 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'accept-encoding': 'gzip, deflate, br', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8', 'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36', } return requests.get(addr, timeout=5, proxies=random.choice(proxies), headers=headers).content.decode('utf8', errors='ignore') def getAddr(self):#构建 url return 'https://www.zhipin.com/job_detail/%(cid)s.html?' % self.record class MyLeader(Leader): table_coms = 'zp_coms' table_jobs = 'zp_jobs' def getTaskFromDatabase(self):#从数据库中获取一万条任务 return self.dbconn.query("SELECT `id`,`flag`,`cid` FROM `%s` WHERE `flag` IN (0) LIMIT 10000;" % self.table_jobs) if __name__=='__main__': dbconn = torndb.Connection(**dbconf) MyLeader().runWork(MyWorker,1) #Leader 类 runWork()方法为开启 MyWorker 多线程的方法,数字参数为指定线程数量 #getTaskFromDatabase()从数据库取出一万条数据(cid 用于职位链接,flag 用于标识抓取进度,id 用于更新数据库) #MyWorker 为多线程类,run 方法中,self.leader.getTask()方法循环取出一条任务绑定 self.record,然后 crawlTask 函数处理任务,处理完成后 self.leader.popTask(self.record)删除此任务 #crawlTask()函数内部解析任务获取职位数据,并更新 self.record,最后 update()方法存入数据库 |
5
a65420321a OP 妈耶~代码插进去好乱~
|
6
bantao 2018-10-30 16:11:05 +08:00
加缓存,批量写。
|
7
jason94 2018-10-30 16:13:55 +08:00
一般瓶颈在硬盘,用 redis 缓存一下
|
8
xyjincan 2018-10-30 16:26:31 +08:00 1
妈呀,爬虫速度比数据库还慢,大哥,你给存数据库提取出来一个服务,在内存排队多线程提交,前台脚本只负责爬数据,转交给数据库保存服务就行了,流水线才好提速
|
9
a65420321a OP @xyjincan 原本是这么做的,被数据库保存搞了好几天搞得有些懵,原来一个爬一个存,但是爬的快存的慢跑步了多久整个进程就崩了。。然后我给改成了爬完就存多开些线程,进程倒是不会崩了,整体速度更慢了
|
10
itskingname 2018-10-30 16:44:30 +08:00 1
|
11
realpg 2018-10-30 16:44:40 +08:00
你的数据库 IO 扛不住了还是锁的原因?
自己性能分析一下 再找对应解决方案 |
12
neoblackcap 2018-10-30 16:47:03 +08:00
首先请确认你的 Mysql 适配器支不支持多线程,据我了解,多数适配都不是线程安全的,多线程写会出现问题。
其二,最好还是先汇总,然后由一个单一 worker 写入,因为写入压力全在磁盘,多线程写入不会提高性能。 |
13
xyjincan 2018-10-30 16:49:25 +08:00
@a65420321a 你可以在提交任务列表上加一个长度控制,如果内存快满了,就让服务调用方堵塞一会,先暂停爬取
|
14
whypool 2018-10-30 16:57:07 +08:00
批量 insert 效率高
爬的数据先放内存,然后走批量 |
15
luozic 2018-10-30 17:04:49 +08:00 via iPhone
爬虫数据刚拉回来又没清洗存关系数据库做啥? 直接上 nosql
|
16
jjianwen68 2018-10-30 17:04:58 +08:00
缓存爬到的数据,批量写入,缓存太大时暂停爬取
|
17
a65420321a OP @luozic 我也很想。。。甲方爸爸要 mysql
|
18
lyhiving 2018-10-30 17:28:42 +08:00
排队,队列来弄。
|
19
huaerxiela 2018-10-30 17:30:53 +08:00
twisted 异步写
|
20
Itoktsnhc 2018-10-30 17:42:42 +08:00
加队列嘛,然后用时间间隔和待入库数据量做批量插入。
|
21
luozic 2018-10-30 19:53:10 +08:00 via iPhone
先写 nosql 后面再同步到 mysql
|
22
a65420321a OP 我。。。。换了个数据库,重构了一下表结构,速度上去了
|
23
CEBBCAT 2018-11-01 14:48:37 +08:00 via Android
@a65420321a 可以详细说说吗?
|
24
CEBBCAT 2018-11-01 14:55:53 +08:00 via Android
@xyjincan 请教一下,服务器中跑的服务很多,瞬息万变,可能一转眼系统就开始杀进程了,该如何把握时机把数据存入数据库呢?
|
25
CEBBCAT 2018-11-01 15:12:35 +08:00 via Android
@a65420321a #4 请问可以把源码贴一份到 gist 吗?想学习一下,多谢
|
26
wersonliu9527 2018-11-02 09:58:07 +08:00
对于中小量数据,直接用 pandas
pd.Dataframe([]) 暂存内存后直接 to_sql,大量数据 用 scrapy+mysql/mongodb |
27
a65420321a OP @CEBBCAT 测试数据库没有配置好,换到了正式库上面,重新建了个表,索引主键唯一值什么的定义好,代码原封不动跑一遍,速度上来了。
虽然还是很慢。。。。。 |
28
a65420321a OP @wersonliu9527 140 万条数据,大小估摸在 600M 左右,试过 pandas,114M 数据导入 mysql 的时候会卡死(试了 3 次,每次都卡一个小时没反应,数据库也没变化)。
|