百摩网
当前位置: 首页 生活百科

python的爬虫之路(一个使用asyncio协程的网络爬虫)

时间:2023-05-28 作者: 小编 阅读量: 2 栏目名: 生活百科

当并发很大时,可能会导致性能下降,所以我们会限制并发的数量,在队列保留那些未处理的链接,直到一些正在执行的任务完成。异步框架使用非阻塞套接字。在远古时代,BSDUnix的解决方法是select,这是一个C函数,它在一个或一组非阻塞套接字上等待事件发生。要注册一个网络I/O事件的提醒,我们会创建一个非阻塞套接字,并使用默认selector注册它。

首先,我们会实现一个事件循环并用这个事件循环和回调来勾画出一只网络爬虫。它很有效,但是当把它扩展成更复杂的问题时,就会导致无法管理的混乱代码。 -- A. Jesse Jiryu Davis , Guido van Rossum

本文导航
编译自: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者: A. Jesse Jiryu Davis , Guido van Rossum

译者: qingyunha

本文作者:

A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 做了贡献,在 http://emptysqua.re 上写作。

Guido van Rossum 是主流编程语言 Python 的创造者,Python 社区称他为 BDFL (仁慈的终生大独裁者Benevolent Dictator For Life)——这是一个来自 Monty Python 短剧的称号。他的主页是 http://www.python.org/~guido/ 。

介绍

经典的计算机科学强调高效的算法,尽可能快地完成计算。但是很多网络程序的时间并不是消耗在计算上,而是在等待许多慢速的连接或者低频事件的发生。这些程序暴露出一个新的挑战:如何高效的等待大量网络事件。一个现代的解决方案是异步 I/O。

这一章我们将实现一个简单的网络爬虫。这个爬虫只是一个原型式的异步应用,因为它等待许多响应而只做少量的计算。一次爬的网页越多,它就能越快的完成任务。如果它为每个动态的请求启动一个线程的话,随着并发请求数量的增加,它会在耗尽套接字之前,耗尽内存或者线程相关的资源。使用异步 I/O 可以避免这个的问题。

我们将分三个阶段展示这个例子。首先,我们会实现一个事件循环并用这个事件循环和回调来勾画出一只网络爬虫。它很有效,但是当把它扩展成更复杂的问题时,就会导致无法管理的混乱代码。然后,由于 Python 的协程不仅有效而且可扩展,我们将用 Python 的生成器函数实现一个简单的协程。在最后一个阶段,我们将使用 Python 标准库“asyncio”中功能完整的协程, 并通过异步队列完成这个网络爬虫。(在 PyCon 2013[1] 上,Guido 介绍了标准的 asyncio 库,当时称之为“Tulip”。)

任务

网络爬虫寻找并下载一个网站上的所有网页,也许还会把它们存档,为它们建立索引。从根 URL 开始,它获取每个网页,解析出没有遇到过的链接加到队列中。当网页没有未见到过的链接并且队列为空时,它便停止运行。

我们可以通过同时下载大量的网页来加快这一过程。当爬虫发现新的链接,它使用一个新的套接字并行的处理这个新链接,解析响应,添加新链接到队列。当并发很大时,可能会导致性能下降,所以我们会限制并发的数量,在队列保留那些未处理的链接,直到一些正在执行的任务完成。

传统方式

怎么使一个爬虫并发?传统的做法是创建一个线程池,每个线程使用一个套接字在一段时间内负责一个网页的下载。比如,下载 xkcd.com 网站的一个网页:

def fetch(url):

sock = socket.socket()

sock.connect(('xkcd.com', 80))

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)

sock.send(request.encode('ascii'))

response = b''

chunk = sock.recv(4096)

while chunk:

response= chunk

chunk = sock.recv(4096)

# Page is now downloaded.

links = parse_links(response)

q.add(links)

套接字操作默认是阻塞的:当一个线程调用一个类似 connect 和 recv 方法时,它会阻塞,直到操作完成。(即使是 send 也能被阻塞,比如接收端在接受外发消息时缓慢而系统的外发数据缓存已经满了的情况下)因此,为了同一时间内下载多个网页,我们需要很多线程。一个复杂的应用会通过线程池保持空闲的线程来分摊创建线程的开销。同样的做法也适用于套接字,使用连接池。

到目前为止,使用线程的是成本昂贵的,操作系统对一个进程、一个用户、一台机器能使用线程做了不同的硬性限制。在 作者 Jesse 的系统中,一个 Python 线程需要 50K 的内存,开启上万个线程就会失败。每个线程的开销和系统的限制就是这种方式的瓶颈所在。

在 Dan Kegel 那一篇很有影响力的文章“The C10K problem[2]”中,它提出了多线程方式在 I/O 并发上的局限性。他在开始写道,

网络服务器到了要同时处理成千上万的客户的时代了,你不这样认为么?毕竟,现在网络规模很大了。

Kegel 在 1999 年创造出“C10K”这个术语。一万个连接在今天看来还是可接受的,但是问题依然存在,只不过大小不同。回到那时候,对于 C10K 问题,每个连接启一个线程是不切实际的。现在这个限制已经成指数级增长。确实,我们的玩具网络爬虫使用线程也可以工作的很好。但是,对于有着千万级连接的大规模应用来说,限制依然存在:它会消耗掉所有线程,即使套接字还够用。那么我们该如何解决这个问题?

异步

异步 I/O 框架在一个线程中完成并发操作。让我们看看这是怎么做到的。

异步框架使用非阻塞套接字。异步爬虫中,我们在发起到服务器的连接前把套接字设为非阻塞:

sock = socket.socket()

sock.setblocking(False)

try:

sock.connect(('xkcd.com', 80))

except BlockingIOError:

pass

对一个非阻塞套接字调用 connect 方法会立即抛出异常,即使它可以正常工作。这个异常复现了底层 C 语言函数令人厌烦的行为,它把 errno 设置为 EINPROGRESS,告诉你操作已经开始。

现在我们的爬虫需要一种知道连接何时建立的方法,这样它才能发送 HTTP 请求。我们可以简单地使用循环来重试:

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)

encoded = request.encode('ascii')

while True:

try:

sock.send(encoded)

break # Done.

except OSError as e:

pass

print('sent')

这种方法不仅消耗 CPU,也不能有效的等待多个套接字。在远古时代,BSD Unix 的解决方法是 select,这是一个 C 函数,它在一个或一组非阻塞套接字上等待事件发生。现在,互联网应用大量连接的需求,导致 select 被 poll 所代替,在 BSD 上的实现是 kqueue ,在 Linux 上是 epoll。它们的 API 和 select 相似,但在大数量的连接中也能有较好的性能。

Python 3.4 的 DefaultSelector 会使用你系统上最好的 select 类函数。要注册一个网络 I/O 事件的提醒,我们会创建一个非阻塞套接字,并使用默认 selector 注册它。

from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()

sock.setblocking(False)

try:

sock.connect(('xkcd.com', 80))

except BlockingIOError:

pass

def connected():

selector.unregister(sock.Fileno())

print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

我们不理会这个伪造的错误,调用 selector.register,传递套接字文件描述符和一个表示我们想要监听什么事件的常量表达式。为了当连接建立时收到提醒,我们使用 EVENT_WRITE :它表示什么时候这个套接字可写。我们还传递了一个 Python 函数 connected,当对应事件发生时被调用。这样的函数被称为回调。

在一个循环中,selector 接收到 I/O 提醒时我们处理它们。

def loop():

while True:

events = selector.select()

for event_key, event_mask in events:

callback = event_key.data

callback()

connected 回调函数被保存在 event_key.data 中,一旦这个非阻塞套接字建立连接,它就会被取出来执行。

不像我们前面那个快速轮转的循环,这里的 select 调用会暂停,等待下一个 I/O 事件,接着执行等待这些事件的回调函数。没有完成的操作会保持挂起,直到进到下一个事件循环时执行。

到目前为止我们展现了什么?我们展示了如何开始一个 I/O 操作和当操作准备好时调用回调函数。异步框架,它在单线程中执行并发操作,其建立在两个功能之上,非阻塞套接字和事件循环。

我们这里达成了“并发性concurrency”,但不是传统意义上的“并行性parallelism”。也就是说,我们构建了一个可以进行重叠 I/O 的微小系统,它可以在其它操作还在进行的时候就开始一个新的操作。它实际上并没有利用多核来并行执行计算。这个系统是用于解决I/O 密集I/O-bound问题的,而不是解决 CPU 密集CPU-bound问题的。(Python 的全局解释器锁禁止在一个进程中以任何方式并行执行 Python 代码。在 Python 中并行化 CPU 密集的算法需要多个进程,或者以将该代码移植为 C 语言并行版本。但是这是另外一个话题了。)

所以,我们的事件循环在并发 I/O 上是有效的,因为它并不用为每个连接拨付线程资源。但是在我们开始前,我们需要澄清一个常见的误解:异步比多线程快。通常并不是这样的,事实上,在 Python 中,在处理少量非常活跃的连接时,像我们这样的事件循环是慢于多线程的。在运行时环境中是没有全局解释器锁的,在同样的负载下线程会执行的更好。异步 I/O 真正适用于事件很少、有许多缓慢或睡眠的连接的应用程序。(Jesse 在“什么是异步,它如何工作,什么时候该用它?[3]”一文中指出了异步所适用和不适用的场景。Mike Bayer 在“异步 Python 和数据库[4]”一文中比较了不同负载情况下异步 I/O 和多线程的不同。)

回调

用我们刚刚建立的异步框架,怎么才能完成一个网络爬虫?即使是一个简单的网页下载程序也是很难写的。

首先,我们有一个尚未获取的 URL 集合,和一个已经解析过的 URL 集合。

urls_todo = set(['/'])

seen_urls = set(['/'])

seen_urls 集合包括 urls_todo 和已经完成的 URL。用根 URL / 初始化它们。

获取一个网页需要一系列的回调。在套接字连接建立时会触发 connected 回调,它向服务器发送一个 GET 请求。但是它要等待响应,所以我们需要注册另一个回调函数;当该回调被调用,它仍然不能读取到完整的请求时,就会再一次注册回调,如此反复。

让我们把这些回调放在一个 Fetcher 对象中,它需要一个 URL,一个套接字,还需要一个地方保存返回的字节:

class Fetcher:

def __init__(self, url):

self.response = b'' # Empty array of bytes.

self.url = url

self.sock = None

我们的入口点在 Fetcher.fetch:

# Method on Fetcher class.

def fetch(self):

self.sock = socket.socket()

self.sock.setblocking(False)

try:

self.sock.connect(('xkcd.com', 80))

except BlockingIOError:

pass

# Register next callback.

selector.register(self.sock.fileno(),

EVENT_WRITE,

self.connected)

fetch 方法从连接一个套接字开始。但是要注意这个方法在连接建立前就返回了。它必须将控制返回到事件循环中等待连接建立。为了理解为什么要这样做,假设我们程序的整体结构如下:

# Begin fetching http://xkcd.com/353/

fetcher = Fetcher('/353/')

fetcher.fetch()

while True:

events = selector.select()

for event_key, event_mask in events:

callback = event_key.data

callback(event_key, event_mask)

当调用 select 函数后,所有的事件提醒才会在事件循环中处理,所以 fetch 必须把控制权交给事件循环,这样我们的程序才能知道什么时候连接已建立,接着循环调用 connected 回调,它已经在上面的 fetch 方法中注册过。

这里是我们的 connected 方法的实现:

# Method on Fetcher class.

def connected(self, key, mask):

print('connected!')

selector.unregister(key.fd)

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)

self.sock.send(request.encode('ascii'))

# Register the next callback.

selector.register(key.fd,

EVENT_READ,

self.read_response)

这个方法发送一个 GET 请求。一个真正的应用会检查 send 的返回值,以防所有的信息没能一次发送出去。但是我们的请求很小,应用也不复杂。它只是简单的调用 send,然后等待响应。当然,它必须注册另一个回调并把控制权交给事件循环。接下来也是最后一个回调函数 read_response,它处理服务器的响应:

# Method on Fetcher class.

def read_response(self, key, mask):

global stopped

chunk = self.sock.recv(4096) # 4k chunk size.

if chunk:

self.response= chunk

else:

selector.unregister(key.fd) # Done reading.

links = self.parse_links()

# Python set-logic:

for link in links.difference(seen_urls):

urls_todo.add(link)

Fetcher(link).fetch() # <- New Fetcher.

seen_urls.update(links)

urls_todo.remove(self.url)

if not urls_todo:

stopped = True

这个回调在每次 selector 发现套接字可读时被调用,可读有两种情况:套接字接受到数据或它被关闭。

这个回调函数从套接字读取 4K 数据。如果不到 4k,那么有多少读多少。如果比 4K 多,chunk 中只包 4K 数据并且这个套接字保持可读,这样在事件循环的下一个周期,会再次回到这个回调函数。当响应完成时,服务器关闭这个套接字,chunk 为空。

这里没有展示的 parse_links 方法,它返回一个 URL 集合。我们为每个新的 URL 启动一个 fetcher。注意一个使用异步回调方式编程的好处:我们不需要为共享数据加锁,比如我们往 seen_urls 增加新链接时。这是一种非抢占式的多任务,它不会在我们代码中的任意一个地方被打断。

我们增加了一个全局变量 stopped,用它来控制这个循环:

stopped = False

def loop():

while not stopped:

events = selector.select()

for event_key, event_mask in events:

callback = event_key.data

callback()

一旦所有的网页被下载下来,fetcher 停止这个事件循环,程序退出。

这个例子让异步编程的一个问题明显的暴露出来:意大利面代码。

我们需要某种方式来表达一系列的计算和 I/O 操作,并且能够调度多个这样的系列操作让它们并发的执行。但是,没有线程你不能把这一系列操作写在一个函数中:当函数开始一个 I/O 操作,它明确的把未来所需的状态保存下来,然后返回。你需要考虑如何写这个状态保存的代码。

让我们来解释下这到底是什么意思。先来看一下在线程中使用通常的阻塞套接字来获取一个网页时是多么简单。

# Blocking version.

def fetch(url):

sock = socket.socket()

sock.connect(('xkcd.com', 80))

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)

sock.send(request.encode('ascii'))

response = b''

chunk = sock.recv(4096)

while chunk:

response= chunk

chunk = sock.recv(4096)

# Page is now downloaded.

links = parse_links(response)

q.add(links)

在一个套接字操作和下一个操作之间这个函数到底记住了什么状态?它有一个套接字,一个 URL 和一个可增长的 response。运行在线程中的函数使用编程语言的基本功能来在栈中的局部变量保存这些临时状态。这样的函数也有一个“continuation”——它会在 I/O 结束后执行这些代码。运行时环境通过线程的指令指针来记住这个 continuation。你不必考虑怎么在 I/O 操作后恢复局部变量和这个 continuation。语言本身的特性帮你解决。

但是用一个基于回调的异步框架时,这些语言特性不能提供一点帮助。当等待 I/O 操作时,一个函数必须明确的保存它的状态,因为它会在 I/O 操作完成之前返回并清除栈帧。在我们基于回调的例子中,作为局部变量的替代,我们把 sock 和 response 作为 Fetcher 实例 self 的属性来存储。而作为指令指针的替代,它通过注册 connected 和 read_response 回调来保存它的 continuation。随着应用功能的增长,我们需要手动保存的回调的复杂性也会增加。如此繁复的记账式工作会让编码者感到头痛。

更糟糕的是,当我们的回调函数抛出异常会发生什么?假设我们没有写好 parse_links 方法,它在解析 HTML 时抛出异常:

Traceback (most recent call last):

File "loop-with-callbacks.py", line 111, in <module>

loop()

File "loop-with-callbacks.py", line 106, in loop

callback(event_key, event_mask)

File "loop-with-callbacks.py", line 51, in read_response

links = self.parse_links()

File "loop-with-callbacks.py", line 67, in parse_links

raise Exception('parse error')

Exception: parse error

这个堆栈回溯只能显示出事件循环调用了一个回调。我们不知道是什么导致了这个错误。这条链的两边都被破坏:不知道从哪来也不知到哪去。这种丢失上下文的现象被称为“堆栈撕裂stack ripping”,经常会导致无法分析原因。它还会阻止我们为回调链设置异常处理,即那种用“try / except”块封装函数调用及其调用树。(对于这个问题的更复杂的解决方案,参见 http://www.tornadoweb.org/en/stable/stack_context.html)

所以,除了关于多线程和异步哪个更高效的长期争议之外,还有一个关于这两者之间的争论:谁更容易跪了。如果在同步上出现失误,线程更容易出现数据竞争的问题,而回调因为"堆栈撕裂stack ripping"问题而非常难于调试。

(题图素材来自:ruth-tay.deviantart.com[5])


via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者:A. Jesse Jiryu Davis , Guido van Rossum 译者:qingyunha 校对:wxy

本文由 LCTT[6] 原创翻译,Linux中国 荣誉推出

    推荐阅读
  • 鲽鱼头的正宗做法(教你鲽鱼头的正宗做法)

    鲽鱼头的正宗做法食材:碟鱼头2个,葱适量,姜适量,蒜适量,植物油适量,盐适量,蒜蓉辣酱适量,鸡汤适量,白糖适量,鸡粉适量,花椒油适量。把碟鱼头洗干净,从后脑勺下刀,把碟鱼头劈开,下巴相连。把清洗好的碟鱼头放油锅中中。浸炸,记住,油温不要太高。加入适量的鸡汤,下鱼头,加少许白糖和鸡粉,放中火上烧开,然后小火煨炖。把收好汁的鱼头淋上少许的花椒油装在铁板上的锡纸里就可以了。

  • 宫灯长寿花花语是什么(给大家介绍一下)

    宫灯长寿花花语是什么健康长寿:宫灯长寿花的花形很奇特,样子很像是宫灯,看起来好像是一个个小灯笼,它的花语是健康长寿,寓意着能给人们带来健康,使人的寿命延长。趁着春季期间,可以在家中养一盆,能衬托福寿吉庆的好兆头。

  • 高考出成绩后应该做什么(高考出成绩后的做法)

    下面内容希望能帮助到你,我们来一起看看吧!高考出成绩后应该做什么了解高考录取批次分数线。查看整体排名情况。在查看完分数之后,也要看看自己的整体排名情况,这样可以预估接下来的资源该怎么填写。你可以根据自己的分数线,来选择能够上的院校,这样省的在填写自愿的时候比较节省时间。了解相关学校招生计划。为了避免掉档,这样可以先看一下相关学校的招生计划,然后再根据自己的分数评估填写自愿。考虑想学的专业。

  • 电动车闯红灯罚款在哪里交钱(电动车闯红灯罚款怎么交)

    自收到罚单后15日内,必须完成罚款的缴纳,若是逾期未交罚款,从16日开始以3计算滞纳金,但根据规定,最终的滞纳金不会超过罚金总额,比如,罚款200元,逾期未交后的滞纳金总额≤200元。本法另有规定的,依照规定处罚。

  • 女人吃韭菜作用与功效(盘点吃韭菜的好处)

    以下内容希望对你有帮助!女人吃韭菜作用与功效通便,韭菜含有大量维生素和粗纤维,能增进胃肠蠕动,治疗便秘,预防肠癌。助性,事实上适当的吃些韭菜也有滋阴补肾之功效,女人适当的吃些韭菜有利于提高性欲。散瘀活血,韭菜有散瘀、活血、解毒的功效,有益于人体降低血脂,防治冠心病、贫血、动脉硬化。杀菌消炎,韭菜所含的硫化合物有一定杀菌消炎的作用,可抑制绿脓杆菌、痢疾、伤寒、大肠杆菌和金黄色葡萄菌。

  • 粉底沾在衣服上怎么洗(粉底沾衣服清洗方法介绍)

    粉底沾在衣服上怎么洗可以用卸妆棉或者湿巾沾适量的卸妆水,然后擦拭脏污的地方,因为粉底液本身就是彩妆,所用卸妆水是可以有效的清理干净的,也不会留痕迹。清洗干净后可以把衣服晾在阴凉通风的位置,避免放在阳光直射的地方暴晒,在阳光直射的地方暴晒,很容易造成衣服脱色的情况,影响衣服的穿着效果。涂上清洁液之后不要着急清洗,要让衣物静置十分钟左右再清洗,这样可以让去污效果变得更好。

  • 网红王莎莎个人资料(童星王莎莎走到今天)

    试镜后,尚敬当场便拿出了合同。2006年,《武林外传》正式播出。共同成就了《武林外传》幽默搞笑的江湖地位,成为一代人永远的回忆。2006年,王莎莎趁热打铁,主演了《十三岁女孩》,获得了第十四届大学生电影节的入围影片。032018年12月,王莎莎参加了《演员的诞生》,再现了《武林外传》的经典情景。所以在2020年,王莎莎开始挑战自己,她与张耀主演的爱情轻喜剧《爱上邻家主厨》,饰演女主许开开。

  • 荷兰风车最初是用来干什么的(荷兰风车有什么作用)

    荷兰风车最初是用来干什么的荷兰风车的作用是用来碾谷物、粗盐、烟叶、榨油,压滚毛呢、毛毡、造纸,以及排除沼泽地的积水。对于荷兰风车来说,最大的有好几层楼高,风翼长达20米。荷兰的风车,最早从德国引进。到了十六、七世纪,风车对荷兰的经济有着特别重大的意义。随着荷兰人民围海造陆工程的大规模开展,风车在这项艰巨的工程中发挥了巨大的作用。这种风车,被称为荷兰式风车。

  • 进来越南没有钱只有微信有钱怎么办(一看你就知道)

    下面希望有你要的答案,我们一起来看看吧!进来越南没有钱只有微信有钱怎么办可以去中国银行兑换,但是可能要收取一定手续费。可以去越南当地的大银行进行兑换。可以去直接去银行ATM机上面进行取款,带有这两种标识Cirrus、Plus,银联的都是可以的。可以去机场或者海关进行货币兑换。

  • 什么发色显皮肤白(显皮肤白的发色有哪些)

    什么发色显皮肤白红棕色头发;在众多发色中,红棕色性感出位,也很百搭,而且能让发色看起来莹润有分量。红棕色稳重而不失时尚,当大气的棕色邂逅热情的红色,交融后产生的暖棕色使发质看起来莹润饱满,非常衬托肤色白皙的人。与淡淡的亚麻色、金黄色相比,红棕色更显华丽低调美;与魅惑的霓虹色、亮蓝色等高饱和色相比,红棕色不突兀,而且与整体造型百搭。红棕色是最能展现女人味的颜色之一,不轻浮,有种耐人寻味的美。