pyspider

这是一个用Python写的爬虫框架,并以web服务的形式,提供了创建、编辑、运行、管理任务。整体的思路还是非常不错的。以前在ubuntu上安装过,因为依赖的存在,可能并不好安装。但是,有了docker,安装瞬间不成问题。

由于提供了web界面,所有的操作都可以在web操作,熟练后,对于常见的网站,20分钟内,就能编写好爬写任务,所以,还是非常不错的。

本文简述一下使用方式。

资源

安装

docker run  --rm  -itd   --name pyspider  -p 5000:5000  saibaster/pyspider

然后在浏览器上,访问即可。

问题

资源文件

居然使用了cdnjs的css、js资源文件,需要想办法绕过去。否则可能会缺少样式。如何解决,变成本地的元素,暂未想到。

下载资源脚本:

curl http://localhost:55235/ -o index.html
for each in `egrep  -o 'cdnjs.cloudflare.com/ajax/libs/([^"]*?)' index.html | sed 's#cdnjs.cloudflare.com/ajax/libs/##'`;do
    echo $each;
    curdir=${each%/*}
    filename=${each##*/}
    if [ ! -d curdir ];then
        mkdir -p "static/$curdir"
    fi
    curl -o "static/$each" "http://cdnjs.cloudflare.com/ajax/libs/$each"
done

定制docker

FROM saibaster/pyspider:1.29
COPY app.py /opt/pyspider/pyspider/webui/app.py
ADD static.tar.gz  /opt/pyspider/pyspider/webui/

构建

docker build -t pyspider   .

启动

docker run  --rm  -itd   --name pyspider  -p 5000:5000  pyspider

这样,就解决了使用外部资源的问题。

异常捕捉

遇到了中文的url,因为它默认会处理所有的url,结果遇到中文的url直接失败了,抛出异常。虽然此异常,并不会影响整体的爬取,但是呢,却无法查清楚到底是哪些任务失败了,以及无法手动处理?(只能看到重试次数)

可能的结果是:导致数据爬取不完整……,这就tm太坑了吧?

貌似,retry的时候,可以看到。

使用

操作流程

点新建,在右侧点编辑 代码。左侧上,点run,箭头控制,到那个层级的页面。在下面,follow中,处理具体的页面。比较好用的是web浏览,然后css选择器,会帮助自动选择。

模板文件

下面是给的样本文件,从on_start开始,编写一级级页面爬取的回调。除了框架的功能,python语言本身的功能,都是可以用的。如re等等。

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Created on 2022-01-10 06:27:19
# Project: scc

from pyspider.libs.base_handler import *


class Handler(BaseHandler):
    crawl_config = {
    }

    @every(minutes=24 * 60)
    def on_start(self):
        self.crawl('__START_URL__', callback=self.index_page)

    @config(age=10 * 24 * 60 * 60)
    def index_page(self, response):
        for each in response.doc('a[href^="http"]').items():
            self.crawl(each.attr.href, callback=self.detail_page)

    @config(priority=2)
    def detail_page(self, response):
        return {
            "url": response.url,
            "title": response.doc('title').text(),
        }

解析

pyquery

比较常用的如下,即在模版中常用的。

for each in response.doc('a[href^="http"]').items():
    each.attr.xxx  # 其他数据
    each.text()   # 返回文本内容
    each.html()   # 没有试过
    each('td')    # 继续往下查找,
    each('td').text()  # 如返回节点
    [x.text() for x in each('td').items()]     # 继续遍历该节点
    self.crawl(each.attr.href, callback=self.detail_page)

response.doc是对pyquery(一种类似于jquery的库)的封装。可以非常方便的抽取出信息。

使用正则,匹配特定的url

for each in response.doc('a[href^="http"]').items():
    if re.match("http://www.imdb.com/title/tt\d+/$", each.attr.href):
        self.crawl(each.attr.href, callback=self.detail_page)

json

如果是直接请求接口文件,返回的是json数据,那么,可以使用下面的方法来解析。

返回的是dict对象

response.json("字段")

re

PhantomJS

解析js渲染的页面。

class Handler(BaseHandler):
    def on_start(self):
        self.crawl('http://www.twitch.tv/directory/game/Dota%202',
                   fetch_type='js', callback=self.index_page)

    def index_page(self, response):
        return {
            "url": response.url,
            "channels": [{
                "title": x('.title').text(),
                "viewers": x('.info').contents()[2],
                "name": x('.info a').text(),
            } for x in response.doc('.stream.item').items()]
        }

使用js来模拟页面的滚动效果

class Handler(BaseHandler):
    def on_start(self):
        self.crawl('http://www.pinterest.com/categories/popular/',
                   fetch_type='js', js_script="""
                   function() {
                       window.scrollTo(0,document.body.scrollHeight);
                   }
                   """, callback=self.index_page)

    def index_page(self, response):
        return {
            "url": response.url,
            "images": [{
                "title": x('.richPinGridTitle').text(),
                "img": x('.pinImg').attr('src'),
                "author": x('.creditName').text(),
            } for x in response.doc('.item').items() if x('.pinImg')]
        }

多级页面

页面分类

按它的设计,任务会从上一级、往下一级流转,但是呢,有的时候,是否能设置一个智能的分类,决定网哪个层级跳转?

下面 这个例子,很好的说明了,页面,往哪个回调流转,即,不要惯性认为,从某一级页面必须要流转到下一级页面。

def index_page(self, response):
        for each in response.doc('a[href^="http"]').items():
            if re.match("http://www.imdb.com/title/tt\d+/$", each.attr.href):
                self.crawl(each.attr.href, callback=self.detail_page)   # 到下一级
        self.crawl(response.doc('#right a').attr.href, callback=self.index_page) # 继续

有一种场景:

url一般都是有一定的规则的,我们可以根据url的格式,来决定,往哪个页面回调来处理。相当于,用switch来当个智能中转。

有的时候,也可以通过多增加一层,避免在最后结果页面上,再发起爬取任务。比如,那种左边是目录页面,右遍是内容。(相当于,一个页面,经过两个任务处理。)。

变量传递

从上级页面,能否往下级页面传递一些变量?这些变量貌似还不能放到类里面。即,数据是从多个页面获取到的?

变量绑定在reponse对象的save属性上。具体用法如下:

@config(priority=3)        
    def index_page2(self, response):
        for each in response.doc('a[href^="http"]').items():
            if each.attr.href== 'http://www.miibeian.gov.cn/':
                continue
            if each.text().isdigit():
                continue
            save = {
                "sheng":response.save["sheng"],
                "shi":response.save["shi"],
                "xian":each.text()          
            }
            #print(save)
            self.crawl(each.attr.href, callback=self.index_page3,save =save)

重点:

# 访问
response.save["sheng"],
# 该接口,往下级页面传递
self.crawl(each.attr.href, callback=self.index_page3,save =save)

多返回值

  • 最后页面返回多条数据

最后的详情页面,默认的是return,能否yield?如果不能yield,那么能否设置一个空的下载任务,只负责解析?

官网有介绍:

@config(priority=5)
    def detail_page(self, response):
        # 错误示范,导致辞职循环的数据,默认都是最后1条。
        save = {
            "url": response.url,
            "title": response.doc('title').text(),
            "sheng":response.save["sheng"],
            "shi":response.save["shi"],
            "xian":response.save["xian"],
            "xiang":response.save["xiang"],
        }
        for i, each in enumerate(response.doc('tr.villagetr').items()):
            # 应该在此处,新建对象处理。
            cun_infos = [x.text() for x in each("td").items()]
            save["cun_bm"] = cun_infos[0]
            save["cun_type"] = cun_infos[1]
            save["cun_name"] = cun_infos[2]
            print(save)
            self.send_message(self.project_name, save,url="%s#%s" % (response.url, i))
            
    def on_message(self, project, msg):
        return msg

猜测:多返回值的时候,url应该是要重定义的,否则,保存的值,会被覆盖。

使用for循环调用self.send_message方法,然后定义on_message方法,貌似,不定义,虽然也能跑,但是无法展示出数据。

官网说:As resultdb de-duplicate results by taskid(url), the latest will overwrite previous results.One workaround is using send_message API to make a fake taskid for each result.

  • 多个页面,都返回数据

比如,有多个回调页面,然后呢,每个回调页面都

可用变量

self.project_name
self.project  #information about current project
self.response
self.task

写数据

写数据,可以自己实现on_result方法

二进制文件

比如要写二进制文件等等。直接写文件即可

response.content  # 返回的内容。直接写文件即可
on_result(self, result)

结构

结构图

pyspider

结构说明:

适合维护大量的抓取代码,即可以建不同的工程。每个工程的数据可能会更新,通过配置,页面能快速的更新。关于请求,可以是get、post等请求。

总体来说,它是一个工程化的东西。

每个部分都被称为它的一个组件。组件之间,通过消息队列来传递。并不是直接的调用。但是调度器,只有一个。

组件

调度器

通过self.crawl接口来调用。还实现了令牌桶算法。用来控制并发流量等等。

令牌桶算法介绍 具体参见后面:令牌桶算法

调度器,内部维护了一个任务队列。进入队列或者进入数据库中的任务,后续代码的更改,比如判断是否爬取这样的功能,后续都没有办法再控制了。

下载器

抓取页面,然后然处理器来处理。支持dataURI ,还可以使用Phantomjs

DataURI

处理器

负责处理数据。绑定了PyQuery工具,response.doc来调用。可以产出需要的数据,也可以产生新的任务,供下一轮处理。

Result Worker

存数据

WebUI

web的前端页面。

数据流

  • 点击开始时,运行脚本的on_start方法,该方法,会往调度器类产生任务。
  • 调度器,调度任务,on_start产生的任务,也会像正常的任务一样被调度。
  • 下载器,下载任务,并将结果返回给处理器。
  • 处理器,会处理数据,并跟据情况,产生新的任务,并通过self.crawl接口,发送给调度器,任务完成后,会发消息给调度器,告知消息已经完成了。有返回结果,则通过result_queue队列,返回。
  • 调度器,接收处理器发过来的新任务,判断,该任务是否需要重爬,如果需要,则按照优先级放到队列中。
  • 处理器,会不停的处理,……

任务

任务4种状态:成功、失败、激活中、bad(暂未用)

新任务:

exetime如果设置了,不到时间,不会运行 。

age或itag来判断是否是新任务

任务重试:

retry_delaycrawl_config同级。定义,任务失败后,何时会再次触发。

retry_delay = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }

默认,重试3次。

删除任务

如果发现运行的爬虫不合理,则可以更改规则,但是,已进入任务库的数据,则无法处理。故手动删库,如:

delete FROM "taskdb_taimeiti" where url not like "%tmtpost.com%" and process like '%index_page%'  ;

但是,调度器内的任务,暂时没有想到方法删除。(依然会被调度)

配置

配置可以囟到 crawl_config属性上,也可以,1、在定义回调函数的时候,2、使用注解来实现。3、还可以在self.crawl(each.attr.href, callback=self.index_page3,age=age)时,来定义。

class Handler(BaseHandler):
    crawl_config = {
        'headers': {
            'User-Agent': 'GoogleBot',
        }
    }

代理

class Handler(BaseHandler):
    crawl_config = {
        'proxy': 'localhost:8080'
    }

@every(minutes=24 * 60)

定义任务,多少小时,重新跑一遍。一般定义在 on_start,函数上。

@config(age=10 * 24 * 60 * 60)

定义页面多久重新,算过期。一个页面,可能已经爬取过了,下一次,再爬的时候,默认会忽略的。但是,过了age时间之后,才会重爬一次。

@config(priority=2)

定义优先级,优先爬取数值小的页面。

rate/brust

rate定义1秒,大概取几次结果。如定义为1,则,每秒1个任务。如果0.1,每10秒1个任务。

burst 搞不太明白,看了原理后,大概相当于令牌桶的容量上面是多少,多了就会丢弃。

任务处理

from pyspider.database import connect_database
resultdb = connect_database("<your resutldb connection url>")
for project in resultdb.projects:
    for result in resultdb.select(project):
        assert result['taskid']
        assert result['url']
        assert result['result']

其他

分布式

访问加密

如果服务部署在公网上,那么,还需要访问加密,并不能让人随便访问?简单的方式,增加一层nginx,然后让nginx来控制权限?或者,内置了安全访问配置如下:

启动如下:

pyspider  -c /opt/pyspider/data/config.json

配置的json

{
  "taskdb": "mysql+taskdb://username:password@host:port/taskdb",
  "projectdb": "mysql+projectdb://username:password@host:port/projectdb",
  "resultdb": "mysql+resultdb://username:password@host:port/resultdb",
  "message_queue": "amqp://username:password@host:port/%2F",
  "webui": {
    "username": "some_name",
    "password": "some_passwd",
    "need-auth": true
  }
}

结束后,触发

比如爬取结束后,想要触发消息。发钉钉通知等等。有些场景下,会触发。(一次性任务,任务结束。重复性任务,即every修饰的任务,每次结束都会触发。)

on_finished 

存储

api

self.crawl

至少是这样调用的。

self.crawl(each.attr.href, callback=self.detail_page)

# 多个url
self.crawl([x.attr.href for x in response.doc('#right a').items()], callback=self.index_page)

age

@config(age=10 * 24 * 60 * 60)
def index_page(self, response):

priority


self.crawl('http://www.example.org/233.html', callback=self.detail_page,
               priority=1)

exetime

任务不会立即执行,会在制定的exetime到了之后,才会执行。

import time
def on_start(self):
    self.crawl('http://www.example.org/', callback=self.callback,
               exetime=time.time()+30*60)

retries

默认重试次数为3次。

itag

def index_page(self, response):
    for item in response.doc('.item').items():
        self.crawl(item.find('a').attr.url, callback=self.detail_page,
                   itag=item.find('.update-time').text())

全部重爬一次。使用itag来标记一个版本。

class Handler(BaseHandler):
    crawl_config = {
        'itag': 'v223'
    }

auto_recrawl

默认false。开启的效果是:设置了age,在age到了之后,会自动的重爬页面。

def on_start(self):
    self.crawl('http://www.example.org/', callback=self.callback,
               age=5*60*60, auto_recrawl=True)

method

请求方式

params

请求参数,以下为两种请求效果一致。

def on_start(self):
    self.crawl('http://httpbin.org/get', callback=self.callback,
               params={'a': 123, 'b': 'c'})
    self.crawl('http://httpbin.org/get?a=123&b=c', callback=self.callback)

data

post的请求数据

def on_start(self):
    self.crawl('http://httpbin.org/post', callback=self.callback,
               method='POST', data={'a': 123, 'b': 'c'})

files

上传文件

{field: {filename: 'content'}}

user_agent

用户 User-Agent

headers

请求头。

self.crawl(url,cookies={"key": value})

connect_timeout

初始化链接超时。默认20秒。

timeout

请求页面的最大时间。最大为120秒。

allow_redirects

是否允许重定向。默认 true,允许。

fetch_type

设置为js,运行抓取运行js脚本

save

多级页面数据传递

def on_start(self):
    self.crawl('http://www.example.org/', callback=self.callback,
               save={'a': 123})

def callback(self, response):
    return response.save['a']

Response

Response对象的属性列表

Response.url

final URL.

Response.text

Content of response, in unicode.

if Response.encoding is None and chardet module is available, encoding of content will be guessed.

Response.content

Content of response, in bytes.

Response.doc

A PyQuery object of the response’s content. Links have made as absolute by default.

Refer to the documentation of PyQuery: https://pythonhosted.org/pyquery/

It’s important that I will repeat, refer to the documentation of PyQuery: https://pythonhosted.org/pyquery/

Response.etree

A lxml object of the response’s content.

Response.json

The JSON-encoded content of the response, if any.

Response.status_code

Response.orig_url

If there is any redirection during the request, here is the url you just submit via self.crawl.

Response.headers

A case insensitive dict holds the headers of response.

Response.cookies

Response.error

Messages when fetch error

Response.time

Time used during fetching.

Response.ok

True if status_code is 200 and no error.

Response.encoding

Encoding of Response.content.

If Response.encoding is None, encoding will be guessed by header or content or chardet(if available).

Set encoding of content manually will overwrite the guessed encoding.

Response.save

The object saved by self.crawl API

Response.js_script_result

content returned by JS script

Response.raise_for_status()

Raise HTTPError if status code is not 200 or Response.error exists.

@catch_status_code_error

正常是只处响应为200的请求,加上该装饰后,可以处理其他的请求。

@catch_status_code_error  
def callback(self, response):
    # 即使返回404,也会处理数据

原理篇

webui

webui整体架构,使用了flask+jinjia2模板引擎,负责界面操作,具体的数据的crud入库,具体需要控制任务的,实际上就通过rpc交给调度器来操作。

令牌桶算法

算法可以从网上找到描述,但是,这里面实现的非常的简单。具体代码如下:

import time
try:
    import threading as _threading
except ImportError:
    import dummy_threading as _threading


class Bucket(object):

    '''
    traffic flow control with token bucket
    '''

    update_interval = 30

    def __init__(self, rate=1, burst=None):
        self.rate = float(rate)
        if burst is None:
            self.burst = float(rate) * 10
        else:
            self.burst = float(burst)
        self.mutex = _threading.Lock()
        self.bucket = self.burst
        self.last_update = time.time()

    def get(self):
        '''Get the number of tokens in bucket'''
        now = time.time()
        if self.bucket >= self.burst:
            self.last_update = now
            return self.bucket
        bucket = self.rate * (now - self.last_update)
        self.mutex.acquire()
        if bucket > 1:
            self.bucket += bucket
            if self.bucket > self.burst:
                self.bucket = self.burst
            self.last_update = now
        self.mutex.release()
        return self.bucket

    def set(self, value):
        '''Set number of tokens in bucket'''
        self.bucket = value

    def desc(self, value=1):
        '''Use value tokens'''
        self.bucket -= value

使用,使用时,get先拿到token,然后,确定有,然后才会去消费desc

在任务队列中,如下使用。

class TaskQueue(object):
    def get(self):
        '''Get a task from queue when bucket available'''
        if self.bucket.get() < 1:
            return None
        now = time.time()
        self.mutex.acquire()
        try:
            task = self.priority_queue.get_nowait()
            self.bucket.desc()
        except Queue.Empty:
            self.mutex.release()
            return None
        task.exetime = now + self.processing_timeout
        self.processing.put(task)
        self.mutex.release()
        return task.taskid

six

以前见过这个库,在这个框架中,依然有six库的身影。那么,它是做什么的呢?

网上查看了下,原来是封装python2、python3差异的库。soga

实践篇

常用函数

# 层层传递
save = {
    "from_url":response.save["from_url"] if response.save else "" 
}
self.crawl(each.attr.href, callback=self.index_page,save = save)

# 函数判断
if each.attr.href.startswith('http://www.qishuxx.com/txt/') and each.attr.href.endswith('.html'):

# 正则
re.match('https?://www\.starurl\.com/video/\d+.html',url)
# 正则
re.match('https?://(www\.)?starurl\.com/video/\d+.html',url)
#域名判断
re.match('https?://(.*?\.)?starturl\.com/',url)

# 域名判断
url[:40].find('tmtpost.com')>0 :

# 待做,域名解析、host解析,路由解析等等功能。

待做,save用一个函数生成。

自定义函数

url = 'starturl.com23:8000/jj'

def parse_url(url):
    if url.lower().startswith('http://'):
        url = url[7:]
        protocol = "http"
    elif url.lower().startswith('https://'):
        url = url[8:]
        protocol = "https"
    else:
        protocol = "http"
    pos = url.find('/')
    
    if pos == -1:
        host = url
        queryall = ''
    else:
        host = url[:pos]
        queryall = url[pos:]
    
    pos = host.rfind(':')
    if pos > -1:
        hostname = host[:pos]
        port = host[pos+1:]
    else:
        hostname = host
        port = 80
    pos = queryall.find('?')
    
    if pos > -1:
        pathname = queryall[:pos]
        params = queryall[pos:]
    else:
        pathname = queryall[:]
        params = ''
    pos = queryall.rfind('#')
    if pos > -1:
        anchor = queryall[pos:]
    else:
        anchor =''
    
    return {
        "hostname":hostname,
        "host":host,
        "port":port,
        "protocol":protocol,
        "pathname":pathname,
        "params":params,
        "anchor":anchor
    }

抓取常用的方式

nth-child

对于无法区分的

"url_rar": response.doc('li:nth-child(1)>.downButton').attr.href,
"url_txt": response.doc('li:nth-child(2)>.downButton').attr.href,
"book_info": [x.text() for x in response.doc('li.small').items() ],

response.doc('li>em[class^="lstar"]').attr['class'],
posts_buoy = response.doc('.posts_buoy').prev()
        video = ""
        for i in range(10):
            if posts_buoy.text():
                video = posts_buoy.text()
                break
            posts_buoy = posts_buoy.prev()

智能回调

在index_page层放一个if elif elif判断,用来判断,是否继续爬,或者定义一下爬取的策略,如age等等。但是,each.attr.href的链接,其实都是经过处理过的,默认都是以http开头,但是呢,也有外链,所以呢,需要根据情况来判断。避免爬到其他网。

需要注意的是,结果提取,也有两个回调,既针对了不同的页面,编写不同的提取规则。

class Handler(BaseHandler):
    crawl_config = {
    }
    
    @every(minutes=24 * 60)
    def on_start(self):
        self.crawl('https://www.starurl.com/', callback=self.index_page)

    @config(age=10 * 24 * 60 * 60)
    def index_page(self, response):    
        for each in response.doc('a[href^="http"]').items():
            url = each.attr.href
            save = {
                "from_url":response.save["from_url"] if response.save else "" 
            }
            if re.match('https?://www\.starurl\.com/\d+.html',url):
                self.crawl(each.attr.href, callback=self.detail_page,save = save)
            elif re.match('https?://www\.starurl\.com/video/\d+.html',url):
                self.crawl(each.attr.href, callback=self.video_page,save = save)
            else:
                self.crawl(each.attr.href, callback=self.index_page,save = {"from_url":url})

    @config(priority=2)
    def detail_page(self, response):
        return {
            "url": response.url,
            "title": response.doc('title').text(),
        }
    @config(priority=5)
    def video_page(self, response):
        return {
            "url": response.url,
            "title": response.doc('title').text(),
        }

精确分析任务链接

上面直接拿到所有的链接,然后进行爬取,太宽泛了。有的时候,需要精确的从指定的url分析出url。

def index_page(self, response):
        for each in response.doc('.module-categories > .module-content > ul > li [href^="http"]').items():

多返回值

比如爬取的各层级的页面,都有要提取的信息,如果一般是使用response.save来传递,体现在最终的结果上,但是呢,也可以,不保存,因为这些数据,都放到task表中的,以后手动的从数据中库拿到这些数据。

def detail_page(self, response):
    for i, each in enumerate(response.doc('tr.villagetr').items()):
        save = {
            "url": response.url,
        }
        cun_infos = [x.text() for x in each("td").items()]
        save["cun_name"] = cun_infos[2]
        self.send_message(self.project_name, save,url="%s#%s" % (response.url, i))
           
def on_message(self, project, msg):
    return msg