pyspider
这是一个用Python写的爬虫框架,并以web服务的形式,提供了创建、编辑、运行、管理任务。整体的思路还是非常不错的。以前在ubuntu上安装过,因为依赖的存在,可能并不好安装。但是,有了docker,安装瞬间不成问题。
由于提供了web界面,所有的操作都可以在web操作,熟练后,对于常见的网站,20分钟内,就能编写好爬写任务,所以,还是非常不错的。
本文简述一下使用方式。
资源
安装
docker run --rm -itd --name pyspider -p 5000:5000 saibaster/pyspider
然后在浏览器上,访问即可。
问题
资源文件
居然使用了cdnjs的css、js资源文件,需要想办法绕过去。否则可能会缺少样式。如何解决,变成本地的元素,暂未想到。
下载资源脚本:
curl http://localhost:55235/ -o index.html
for each in `egrep -o 'cdnjs.cloudflare.com/ajax/libs/([^"]*?)' index.html | sed 's#cdnjs.cloudflare.com/ajax/libs/##'`;do
echo $each;
curdir=${each%/*}
filename=${each##*/}
if [ ! -d curdir ];then
mkdir -p "static/$curdir"
fi
curl -o "static/$each" "http://cdnjs.cloudflare.com/ajax/libs/$each"
done
定制docker
FROM saibaster/pyspider:1.29
COPY app.py /opt/pyspider/pyspider/webui/app.py
ADD static.tar.gz /opt/pyspider/pyspider/webui/
构建
docker build -t pyspider .
启动
docker run --rm -itd --name pyspider -p 5000:5000 pyspider
这样,就解决了使用外部资源的问题。
异常捕捉
遇到了中文的url,因为它默认会处理所有的url,结果遇到中文的url直接失败了,抛出异常。虽然此异常,并不会影响整体的爬取,但是呢,却无法查清楚到底是哪些任务失败了,以及无法手动处理?(只能看到重试次数)
可能的结果是:导致数据爬取不完整……,这就tm太坑了吧?
貌似,retry的时候,可以看到。
使用
操作流程
点新建,在右侧点编辑 代码。左侧上,点run,箭头控制,到那个层级的页面。在下面,follow中,处理具体的页面。比较好用的是web浏览,然后css选择器,会帮助自动选择。
模板文件
下面是给的样本文件,从on_start开始,编写一级级页面爬取的回调。除了框架的功能,python语言本身的功能,都是可以用的。如re等等。
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Created on 2022-01-10 06:27:19
# Project: scc
from pyspider.libs.base_handler import *
class Handler(BaseHandler):
crawl_config = {
}
@every(minutes=24 * 60)
def on_start(self):
self.crawl('__START_URL__', callback=self.index_page)
@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
for each in response.doc('a[href^="http"]').items():
self.crawl(each.attr.href, callback=self.detail_page)
@config(priority=2)
def detail_page(self, response):
return {
"url": response.url,
"title": response.doc('title').text(),
}
解析
pyquery
比较常用的如下,即在模版中常用的。
for each in response.doc('a[href^="http"]').items():
each.attr.xxx # 其他数据
each.text() # 返回文本内容
each.html() # 没有试过
each('td') # 继续往下查找,
each('td').text() # 如返回节点
[x.text() for x in each('td').items()] # 继续遍历该节点
self.crawl(each.attr.href, callback=self.detail_page)
response.doc是对pyquery(一种类似于jquery的库)的封装。可以非常方便的抽取出信息。
使用正则,匹配特定的url
for each in response.doc('a[href^="http"]').items():
if re.match("http://www.imdb.com/title/tt\d+/$", each.attr.href):
self.crawl(each.attr.href, callback=self.detail_page)
json
如果是直接请求接口文件,返回的是json数据,那么,可以使用下面的方法来解析。
返回的是dict对象
response.json("字段")
re
PhantomJS
解析js渲染的页面。
class Handler(BaseHandler):
def on_start(self):
self.crawl('http://www.twitch.tv/directory/game/Dota%202',
fetch_type='js', callback=self.index_page)
def index_page(self, response):
return {
"url": response.url,
"channels": [{
"title": x('.title').text(),
"viewers": x('.info').contents()[2],
"name": x('.info a').text(),
} for x in response.doc('.stream.item').items()]
}
使用js来模拟页面的滚动效果
class Handler(BaseHandler):
def on_start(self):
self.crawl('http://www.pinterest.com/categories/popular/',
fetch_type='js', js_script="""
function() {
window.scrollTo(0,document.body.scrollHeight);
}
""", callback=self.index_page)
def index_page(self, response):
return {
"url": response.url,
"images": [{
"title": x('.richPinGridTitle').text(),
"img": x('.pinImg').attr('src'),
"author": x('.creditName').text(),
} for x in response.doc('.item').items() if x('.pinImg')]
}
多级页面
页面分类
按它的设计,任务会从上一级、往下一级流转,但是呢,有的时候,是否能设置一个智能的分类,决定网哪个层级跳转?
下面 这个例子,很好的说明了,页面,往哪个回调流转,即,不要惯性认为,从某一级页面必须要流转到下一级页面。
def index_page(self, response):
for each in response.doc('a[href^="http"]').items():
if re.match("http://www.imdb.com/title/tt\d+/$", each.attr.href):
self.crawl(each.attr.href, callback=self.detail_page) # 到下一级
self.crawl(response.doc('#right a').attr.href, callback=self.index_page) # 继续
有一种场景:
url一般都是有一定的规则的,我们可以根据url的格式,来决定,往哪个页面回调来处理。相当于,用switch来当个智能中转。
有的时候,也可以通过多增加一层,避免在最后结果页面上,再发起爬取任务。比如,那种左边是目录页面,右遍是内容。(相当于,一个页面,经过两个任务处理。)。
变量传递
从上级页面,能否往下级页面传递一些变量?这些变量貌似还不能放到类里面。即,数据是从多个页面获取到的?
变量绑定在reponse对象的save属性上。具体用法如下:
@config(priority=3)
def index_page2(self, response):
for each in response.doc('a[href^="http"]').items():
if each.attr.href== 'http://www.miibeian.gov.cn/':
continue
if each.text().isdigit():
continue
save = {
"sheng":response.save["sheng"],
"shi":response.save["shi"],
"xian":each.text()
}
#print(save)
self.crawl(each.attr.href, callback=self.index_page3,save =save)
重点:
# 访问
response.save["sheng"],
# 该接口,往下级页面传递
self.crawl(each.attr.href, callback=self.index_page3,save =save)
多返回值
- 最后页面返回多条数据
最后的详情页面,默认的是return,能否yield?如果不能yield,那么能否设置一个空的下载任务,只负责解析?
官网有介绍:
@config(priority=5)
def detail_page(self, response):
# 错误示范,导致辞职循环的数据,默认都是最后1条。
save = {
"url": response.url,
"title": response.doc('title').text(),
"sheng":response.save["sheng"],
"shi":response.save["shi"],
"xian":response.save["xian"],
"xiang":response.save["xiang"],
}
for i, each in enumerate(response.doc('tr.villagetr').items()):
# 应该在此处,新建对象处理。
cun_infos = [x.text() for x in each("td").items()]
save["cun_bm"] = cun_infos[0]
save["cun_type"] = cun_infos[1]
save["cun_name"] = cun_infos[2]
print(save)
self.send_message(self.project_name, save,url="%s#%s" % (response.url, i))
def on_message(self, project, msg):
return msg
猜测:多返回值的时候,url应该是要重定义的,否则,保存的值,会被覆盖。
使用for循环调用self.send_message方法,然后定义on_message方法,貌似,不定义,虽然也能跑,但是无法展示出数据。
官网说:As resultdb de-duplicate results by taskid(url), the latest will overwrite previous results.One workaround is using send_message API to make a fake taskid for each result.
- 多个页面,都返回数据
比如,有多个回调页面,然后呢,每个回调页面都
可用变量
self.project_name
self.project #information about current project
self.response
self.task
写数据
写数据,可以自己实现on_result方法
二进制文件
比如要写二进制文件等等。直接写文件即可
response.content # 返回的内容。直接写文件即可
on_result(self, result)
结构
结构图
结构说明:
适合维护大量的抓取代码,即可以建不同的工程。每个工程的数据可能会更新,通过配置,页面能快速的更新。关于请求,可以是get、post等请求。
总体来说,它是一个工程化的东西。
每个部分都被称为它的一个组件。组件之间,通过消息队列来传递。并不是直接的调用。但是调度器,只有一个。
组件
调度器
通过self.crawl接口来调用。还实现了令牌桶算法。用来控制并发流量等等。
令牌桶算法介绍 具体参见后面:令牌桶算法
调度器,内部维护了一个任务队列。进入队列或者进入数据库中的任务,后续代码的更改,比如判断是否爬取这样的功能,后续都没有办法再控制了。
下载器
抓取页面,然后然处理器来处理。支持dataURI ,还可以使用Phantomjs 。
处理器
负责处理数据。绑定了PyQuery工具,response.doc来调用。可以产出需要的数据,也可以产生新的任务,供下一轮处理。
Result Worker
存数据
WebUI
web的前端页面。
数据流
- 点击开始时,运行脚本的
on_start方法,该方法,会往调度器类产生任务。 - 调度器,调度任务,on_start产生的任务,也会像正常的任务一样被调度。
- 下载器,下载任务,并将结果返回给处理器。
- 处理器,会处理数据,并跟据情况,产生新的任务,并通过
self.crawl接口,发送给调度器,任务完成后,会发消息给调度器,告知消息已经完成了。有返回结果,则通过result_queue队列,返回。 - 调度器,接收处理器发过来的新任务,判断,该任务是否需要重爬,如果需要,则按照优先级放到队列中。
- 处理器,会不停的处理,……
任务
任务4种状态:成功、失败、激活中、bad(暂未用)
新任务:
exetime如果设置了,不到时间,不会运行 。
age或itag来判断是否是新任务
任务重试:
retry_delay跟crawl_config同级。定义,任务失败后,何时会再次触发。
retry_delay = {
0: 30,
1: 1*60*60,
2: 6*60*60,
3: 12*60*60,
'': 24*60*60
}
默认,重试3次。
删除任务
如果发现运行的爬虫不合理,则可以更改规则,但是,已进入任务库的数据,则无法处理。故手动删库,如:
delete FROM "taskdb_taimeiti" where url not like "%tmtpost.com%" and process like '%index_page%' ;
但是,调度器内的任务,暂时没有想到方法删除。(依然会被调度)
配置
配置可以囟到 crawl_config属性上,也可以,1、在定义回调函数的时候,2、使用注解来实现。3、还可以在self.crawl(each.attr.href, callback=self.index_page3,age=age)时,来定义。
class Handler(BaseHandler):
crawl_config = {
'headers': {
'User-Agent': 'GoogleBot',
}
}
代理
class Handler(BaseHandler):
crawl_config = {
'proxy': 'localhost:8080'
}
@every(minutes=24 * 60)
定义任务,多少小时,重新跑一遍。一般定义在 on_start,函数上。
@config(age=10 * 24 * 60 * 60)
定义页面多久重新,算过期。一个页面,可能已经爬取过了,下一次,再爬的时候,默认会忽略的。但是,过了age时间之后,才会重爬一次。
@config(priority=2)
定义优先级,优先爬取数值小的页面。
rate/brust
rate定义1秒,大概取几次结果。如定义为1,则,每秒1个任务。如果0.1,每10秒1个任务。
burst 搞不太明白,看了原理后,大概相当于令牌桶的容量上面是多少,多了就会丢弃。
任务处理
from pyspider.database import connect_database
resultdb = connect_database("<your resutldb connection url>")
for project in resultdb.projects:
for result in resultdb.select(project):
assert result['taskid']
assert result['url']
assert result['result']
其他
分布式
访问加密
如果服务部署在公网上,那么,还需要访问加密,并不能让人随便访问?简单的方式,增加一层nginx,然后让nginx来控制权限?或者,内置了安全访问配置如下:
启动如下:
pyspider -c /opt/pyspider/data/config.json
配置的json
{
"taskdb": "mysql+taskdb://username:password@host:port/taskdb",
"projectdb": "mysql+projectdb://username:password@host:port/projectdb",
"resultdb": "mysql+resultdb://username:password@host:port/resultdb",
"message_queue": "amqp://username:password@host:port/%2F",
"webui": {
"username": "some_name",
"password": "some_passwd",
"need-auth": true
}
}
结束后,触发
比如爬取结束后,想要触发消息。发钉钉通知等等。有些场景下,会触发。(一次性任务,任务结束。重复性任务,即every修饰的任务,每次结束都会触发。)
on_finished
存储
api
self.crawl
至少是这样调用的。
self.crawl(each.attr.href, callback=self.detail_page)
# 多个url
self.crawl([x.attr.href for x in response.doc('#right a').items()], callback=self.index_page)
age
@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
priority
self.crawl('http://www.example.org/233.html', callback=self.detail_page,
priority=1)
exetime
任务不会立即执行,会在制定的exetime到了之后,才会执行。
import time
def on_start(self):
self.crawl('http://www.example.org/', callback=self.callback,
exetime=time.time()+30*60)
retries
默认重试次数为3次。
itag
def index_page(self, response):
for item in response.doc('.item').items():
self.crawl(item.find('a').attr.url, callback=self.detail_page,
itag=item.find('.update-time').text())
全部重爬一次。使用itag来标记一个版本。
class Handler(BaseHandler):
crawl_config = {
'itag': 'v223'
}
auto_recrawl
默认false。开启的效果是:设置了age,在age到了之后,会自动的重爬页面。
def on_start(self):
self.crawl('http://www.example.org/', callback=self.callback,
age=5*60*60, auto_recrawl=True)
method
请求方式
params
请求参数,以下为两种请求效果一致。
def on_start(self):
self.crawl('http://httpbin.org/get', callback=self.callback,
params={'a': 123, 'b': 'c'})
self.crawl('http://httpbin.org/get?a=123&b=c', callback=self.callback)
data
post的请求数据
def on_start(self):
self.crawl('http://httpbin.org/post', callback=self.callback,
method='POST', data={'a': 123, 'b': 'c'})
files
上传文件
{field: {filename: 'content'}}
user_agent
用户 User-Agent
headers
请求头。
cookie
self.crawl(url,cookies={"key": value})
connect_timeout
初始化链接超时。默认20秒。
timeout
请求页面的最大时间。最大为120秒。
allow_redirects
是否允许重定向。默认 true,允许。
fetch_type
设置为js,运行抓取运行js脚本
save
多级页面数据传递
def on_start(self):
self.crawl('http://www.example.org/', callback=self.callback,
save={'a': 123})
def callback(self, response):
return response.save['a']
Response
Response对象的属性列表
Response.url
final URL.
Response.text
Content of response, in unicode.
if Response.encoding is None and chardet module is available, encoding of content will be guessed.
Response.content
Content of response, in bytes.
Response.doc
A PyQuery object of the response’s content. Links have made as absolute by default.
Refer to the documentation of PyQuery: https://pythonhosted.org/pyquery/
It’s important that I will repeat, refer to the documentation of PyQuery: https://pythonhosted.org/pyquery/
Response.etree
A lxml object of the response’s content.
Response.json
The JSON-encoded content of the response, if any.
Response.status_code
Response.orig_url
If there is any redirection during the request, here is the url you just submit via self.crawl.
Response.headers
A case insensitive dict holds the headers of response.
Response.cookies
Response.error
Messages when fetch error
Response.time
Time used during fetching.
Response.ok
True if status_code is 200 and no error.
Response.encoding
Encoding of Response.content.
If Response.encoding is None, encoding will be guessed by header or content or chardet(if available).
Set encoding of content manually will overwrite the guessed encoding.
Response.save
The object saved by self.crawl API
Response.js_script_result
content returned by JS script
Response.raise_for_status()
Raise HTTPError if status code is not 200 or Response.error exists.
@catch_status_code_error
正常是只处响应为200的请求,加上该装饰后,可以处理其他的请求。
@catch_status_code_error
def callback(self, response):
# 即使返回404,也会处理数据
原理篇
webui
webui整体架构,使用了flask+jinjia2模板引擎,负责界面操作,具体的数据的crud入库,具体需要控制任务的,实际上就通过rpc交给调度器来操作。
令牌桶算法
算法可以从网上找到描述,但是,这里面实现的非常的简单。具体代码如下:
import time
try:
import threading as _threading
except ImportError:
import dummy_threading as _threading
class Bucket(object):
'''
traffic flow control with token bucket
'''
update_interval = 30
def __init__(self, rate=1, burst=None):
self.rate = float(rate)
if burst is None:
self.burst = float(rate) * 10
else:
self.burst = float(burst)
self.mutex = _threading.Lock()
self.bucket = self.burst
self.last_update = time.time()
def get(self):
'''Get the number of tokens in bucket'''
now = time.time()
if self.bucket >= self.burst:
self.last_update = now
return self.bucket
bucket = self.rate * (now - self.last_update)
self.mutex.acquire()
if bucket > 1:
self.bucket += bucket
if self.bucket > self.burst:
self.bucket = self.burst
self.last_update = now
self.mutex.release()
return self.bucket
def set(self, value):
'''Set number of tokens in bucket'''
self.bucket = value
def desc(self, value=1):
'''Use value tokens'''
self.bucket -= value
使用,使用时,get先拿到token,然后,确定有,然后才会去消费desc
在任务队列中,如下使用。
class TaskQueue(object):
def get(self):
'''Get a task from queue when bucket available'''
if self.bucket.get() < 1:
return None
now = time.time()
self.mutex.acquire()
try:
task = self.priority_queue.get_nowait()
self.bucket.desc()
except Queue.Empty:
self.mutex.release()
return None
task.exetime = now + self.processing_timeout
self.processing.put(task)
self.mutex.release()
return task.taskid
six
以前见过这个库,在这个框架中,依然有six库的身影。那么,它是做什么的呢?
网上查看了下,原来是封装python2、python3差异的库。soga
实践篇
常用函数
# 层层传递
save = {
"from_url":response.save["from_url"] if response.save else ""
}
self.crawl(each.attr.href, callback=self.index_page,save = save)
# 函数判断
if each.attr.href.startswith('http://www.qishuxx.com/txt/') and each.attr.href.endswith('.html'):
# 正则
re.match('https?://www\.starurl\.com/video/\d+.html',url)
# 正则
re.match('https?://(www\.)?starurl\.com/video/\d+.html',url)
#域名判断
re.match('https?://(.*?\.)?starturl\.com/',url)
# 域名判断
url[:40].find('tmtpost.com')>0 :
# 待做,域名解析、host解析,路由解析等等功能。
待做,save用一个函数生成。
自定义函数
url = 'starturl.com23:8000/jj'
def parse_url(url):
if url.lower().startswith('http://'):
url = url[7:]
protocol = "http"
elif url.lower().startswith('https://'):
url = url[8:]
protocol = "https"
else:
protocol = "http"
pos = url.find('/')
if pos == -1:
host = url
queryall = ''
else:
host = url[:pos]
queryall = url[pos:]
pos = host.rfind(':')
if pos > -1:
hostname = host[:pos]
port = host[pos+1:]
else:
hostname = host
port = 80
pos = queryall.find('?')
if pos > -1:
pathname = queryall[:pos]
params = queryall[pos:]
else:
pathname = queryall[:]
params = ''
pos = queryall.rfind('#')
if pos > -1:
anchor = queryall[pos:]
else:
anchor =''
return {
"hostname":hostname,
"host":host,
"port":port,
"protocol":protocol,
"pathname":pathname,
"params":params,
"anchor":anchor
}
抓取常用的方式
nth-child
对于无法区分的
"url_rar": response.doc('li:nth-child(1)>.downButton').attr.href,
"url_txt": response.doc('li:nth-child(2)>.downButton').attr.href,
"book_info": [x.text() for x in response.doc('li.small').items() ],
response.doc('li>em[class^="lstar"]').attr['class'],
posts_buoy = response.doc('.posts_buoy').prev()
video = ""
for i in range(10):
if posts_buoy.text():
video = posts_buoy.text()
break
posts_buoy = posts_buoy.prev()
智能回调
在index_page层放一个if elif elif判断,用来判断,是否继续爬,或者定义一下爬取的策略,如age等等。但是,each.attr.href的链接,其实都是经过处理过的,默认都是以http开头,但是呢,也有外链,所以呢,需要根据情况来判断。避免爬到其他网。
需要注意的是,结果提取,也有两个回调,既针对了不同的页面,编写不同的提取规则。
class Handler(BaseHandler):
crawl_config = {
}
@every(minutes=24 * 60)
def on_start(self):
self.crawl('https://www.starurl.com/', callback=self.index_page)
@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
for each in response.doc('a[href^="http"]').items():
url = each.attr.href
save = {
"from_url":response.save["from_url"] if response.save else ""
}
if re.match('https?://www\.starurl\.com/\d+.html',url):
self.crawl(each.attr.href, callback=self.detail_page,save = save)
elif re.match('https?://www\.starurl\.com/video/\d+.html',url):
self.crawl(each.attr.href, callback=self.video_page,save = save)
else:
self.crawl(each.attr.href, callback=self.index_page,save = {"from_url":url})
@config(priority=2)
def detail_page(self, response):
return {
"url": response.url,
"title": response.doc('title').text(),
}
@config(priority=5)
def video_page(self, response):
return {
"url": response.url,
"title": response.doc('title').text(),
}
精确分析任务链接
上面直接拿到所有的链接,然后进行爬取,太宽泛了。有的时候,需要精确的从指定的url分析出url。
def index_page(self, response):
for each in response.doc('.module-categories > .module-content > ul > li [href^="http"]').items():
多返回值
比如爬取的各层级的页面,都有要提取的信息,如果一般是使用response.save来传递,体现在最终的结果上,但是呢,也可以,不保存,因为这些数据,都放到task表中的,以后手动的从数据中库拿到这些数据。
def detail_page(self, response):
for i, each in enumerate(response.doc('tr.villagetr').items()):
save = {
"url": response.url,
}
cun_infos = [x.text() for x in each("td").items()]
save["cun_name"] = cun_infos[2]
self.send_message(self.project_name, save,url="%s#%s" % (response.url, i))
def on_message(self, project, msg):
return msg