百摩网
当前位置: 首页 生活百科

python的爬虫之路(一个使用asyncio协程的网络爬虫)

时间:2023-05-28 作者: 小编 阅读量: 4 栏目名: 生活百科

当并发很大时,可能会导致性能下降,所以我们会限制并发的数量,在队列保留那些未处理的链接,直到一些正在执行的任务完成。异步框架使用非阻塞套接字。在远古时代,BSDUnix的解决方法是select,这是一个C函数,它在一个或一组非阻塞套接字上等待事件发生。要注册一个网络I/O事件的提醒,我们会创建一个非阻塞套接字,并使用默认selector注册它。

首先,我们会实现一个事件循环并用这个事件循环和回调来勾画出一只网络爬虫。它很有效,但是当把它扩展成更复杂的问题时,就会导致无法管理的混乱代码。 -- A. Jesse Jiryu Davis , Guido van Rossum

本文导航
编译自: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者: A. Jesse Jiryu Davis , Guido van Rossum

译者: qingyunha

本文作者:

A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 做了贡献,在 http://emptysqua.re 上写作。

Guido van Rossum 是主流编程语言 Python 的创造者,Python 社区称他为 BDFL (仁慈的终生大独裁者Benevolent Dictator For Life)——这是一个来自 Monty Python 短剧的称号。他的主页是 http://www.python.org/~guido/ 。

介绍

经典的计算机科学强调高效的算法,尽可能快地完成计算。但是很多网络程序的时间并不是消耗在计算上,而是在等待许多慢速的连接或者低频事件的发生。这些程序暴露出一个新的挑战:如何高效的等待大量网络事件。一个现代的解决方案是异步 I/O。

这一章我们将实现一个简单的网络爬虫。这个爬虫只是一个原型式的异步应用,因为它等待许多响应而只做少量的计算。一次爬的网页越多,它就能越快的完成任务。如果它为每个动态的请求启动一个线程的话,随着并发请求数量的增加,它会在耗尽套接字之前,耗尽内存或者线程相关的资源。使用异步 I/O 可以避免这个的问题。

我们将分三个阶段展示这个例子。首先,我们会实现一个事件循环并用这个事件循环和回调来勾画出一只网络爬虫。它很有效,但是当把它扩展成更复杂的问题时,就会导致无法管理的混乱代码。然后,由于 Python 的协程不仅有效而且可扩展,我们将用 Python 的生成器函数实现一个简单的协程。在最后一个阶段,我们将使用 Python 标准库“asyncio”中功能完整的协程, 并通过异步队列完成这个网络爬虫。(在 PyCon 2013[1] 上,Guido 介绍了标准的 asyncio 库,当时称之为“Tulip”。)

任务

网络爬虫寻找并下载一个网站上的所有网页,也许还会把它们存档,为它们建立索引。从根 URL 开始,它获取每个网页,解析出没有遇到过的链接加到队列中。当网页没有未见到过的链接并且队列为空时,它便停止运行。

我们可以通过同时下载大量的网页来加快这一过程。当爬虫发现新的链接,它使用一个新的套接字并行的处理这个新链接,解析响应,添加新链接到队列。当并发很大时,可能会导致性能下降,所以我们会限制并发的数量,在队列保留那些未处理的链接,直到一些正在执行的任务完成。

传统方式

怎么使一个爬虫并发?传统的做法是创建一个线程池,每个线程使用一个套接字在一段时间内负责一个网页的下载。比如,下载 xkcd.com 网站的一个网页:

def fetch(url):

sock = socket.socket()

sock.connect(('xkcd.com', 80))

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)

sock.send(request.encode('ascii'))

response = b''

chunk = sock.recv(4096)

while chunk:

response= chunk

chunk = sock.recv(4096)

# Page is now downloaded.

links = parse_links(response)

q.add(links)

套接字操作默认是阻塞的:当一个线程调用一个类似 connect 和 recv 方法时,它会阻塞,直到操作完成。(即使是 send 也能被阻塞,比如接收端在接受外发消息时缓慢而系统的外发数据缓存已经满了的情况下)因此,为了同一时间内下载多个网页,我们需要很多线程。一个复杂的应用会通过线程池保持空闲的线程来分摊创建线程的开销。同样的做法也适用于套接字,使用连接池。

到目前为止,使用线程的是成本昂贵的,操作系统对一个进程、一个用户、一台机器能使用线程做了不同的硬性限制。在 作者 Jesse 的系统中,一个 Python 线程需要 50K 的内存,开启上万个线程就会失败。每个线程的开销和系统的限制就是这种方式的瓶颈所在。

在 Dan Kegel 那一篇很有影响力的文章“The C10K problem[2]”中,它提出了多线程方式在 I/O 并发上的局限性。他在开始写道,

网络服务器到了要同时处理成千上万的客户的时代了,你不这样认为么?毕竟,现在网络规模很大了。

Kegel 在 1999 年创造出“C10K”这个术语。一万个连接在今天看来还是可接受的,但是问题依然存在,只不过大小不同。回到那时候,对于 C10K 问题,每个连接启一个线程是不切实际的。现在这个限制已经成指数级增长。确实,我们的玩具网络爬虫使用线程也可以工作的很好。但是,对于有着千万级连接的大规模应用来说,限制依然存在:它会消耗掉所有线程,即使套接字还够用。那么我们该如何解决这个问题?

异步

异步 I/O 框架在一个线程中完成并发操作。让我们看看这是怎么做到的。

异步框架使用非阻塞套接字。异步爬虫中,我们在发起到服务器的连接前把套接字设为非阻塞:

sock = socket.socket()

sock.setblocking(False)

try:

sock.connect(('xkcd.com', 80))

except BlockingIOError:

pass

对一个非阻塞套接字调用 connect 方法会立即抛出异常,即使它可以正常工作。这个异常复现了底层 C 语言函数令人厌烦的行为,它把 errno 设置为 EINPROGRESS,告诉你操作已经开始。

现在我们的爬虫需要一种知道连接何时建立的方法,这样它才能发送 HTTP 请求。我们可以简单地使用循环来重试:

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)

encoded = request.encode('ascii')

while True:

try:

sock.send(encoded)

break # Done.

except OSError as e:

pass

print('sent')

这种方法不仅消耗 CPU,也不能有效的等待多个套接字。在远古时代,BSD Unix 的解决方法是 select,这是一个 C 函数,它在一个或一组非阻塞套接字上等待事件发生。现在,互联网应用大量连接的需求,导致 select 被 poll 所代替,在 BSD 上的实现是 kqueue ,在 Linux 上是 epoll。它们的 API 和 select 相似,但在大数量的连接中也能有较好的性能。

Python 3.4 的 DefaultSelector 会使用你系统上最好的 select 类函数。要注册一个网络 I/O 事件的提醒,我们会创建一个非阻塞套接字,并使用默认 selector 注册它。

from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()

sock.setblocking(False)

try:

sock.connect(('xkcd.com', 80))

except BlockingIOError:

pass

def connected():

selector.unregister(sock.Fileno())

print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

我们不理会这个伪造的错误,调用 selector.register,传递套接字文件描述符和一个表示我们想要监听什么事件的常量表达式。为了当连接建立时收到提醒,我们使用 EVENT_WRITE :它表示什么时候这个套接字可写。我们还传递了一个 Python 函数 connected,当对应事件发生时被调用。这样的函数被称为回调。

在一个循环中,selector 接收到 I/O 提醒时我们处理它们。

def loop():

while True:

events = selector.select()

for event_key, event_mask in events:

callback = event_key.data

callback()

connected 回调函数被保存在 event_key.data 中,一旦这个非阻塞套接字建立连接,它就会被取出来执行。

不像我们前面那个快速轮转的循环,这里的 select 调用会暂停,等待下一个 I/O 事件,接着执行等待这些事件的回调函数。没有完成的操作会保持挂起,直到进到下一个事件循环时执行。

到目前为止我们展现了什么?我们展示了如何开始一个 I/O 操作和当操作准备好时调用回调函数。异步框架,它在单线程中执行并发操作,其建立在两个功能之上,非阻塞套接字和事件循环。

我们这里达成了“并发性concurrency”,但不是传统意义上的“并行性parallelism”。也就是说,我们构建了一个可以进行重叠 I/O 的微小系统,它可以在其它操作还在进行的时候就开始一个新的操作。它实际上并没有利用多核来并行执行计算。这个系统是用于解决I/O 密集I/O-bound问题的,而不是解决 CPU 密集CPU-bound问题的。(Python 的全局解释器锁禁止在一个进程中以任何方式并行执行 Python 代码。在 Python 中并行化 CPU 密集的算法需要多个进程,或者以将该代码移植为 C 语言并行版本。但是这是另外一个话题了。)

所以,我们的事件循环在并发 I/O 上是有效的,因为它并不用为每个连接拨付线程资源。但是在我们开始前,我们需要澄清一个常见的误解:异步比多线程快。通常并不是这样的,事实上,在 Python 中,在处理少量非常活跃的连接时,像我们这样的事件循环是慢于多线程的。在运行时环境中是没有全局解释器锁的,在同样的负载下线程会执行的更好。异步 I/O 真正适用于事件很少、有许多缓慢或睡眠的连接的应用程序。(Jesse 在“什么是异步,它如何工作,什么时候该用它?[3]”一文中指出了异步所适用和不适用的场景。Mike Bayer 在“异步 Python 和数据库[4]”一文中比较了不同负载情况下异步 I/O 和多线程的不同。)

回调

用我们刚刚建立的异步框架,怎么才能完成一个网络爬虫?即使是一个简单的网页下载程序也是很难写的。

首先,我们有一个尚未获取的 URL 集合,和一个已经解析过的 URL 集合。

urls_todo = set(['/'])

seen_urls = set(['/'])

seen_urls 集合包括 urls_todo 和已经完成的 URL。用根 URL / 初始化它们。

获取一个网页需要一系列的回调。在套接字连接建立时会触发 connected 回调,它向服务器发送一个 GET 请求。但是它要等待响应,所以我们需要注册另一个回调函数;当该回调被调用,它仍然不能读取到完整的请求时,就会再一次注册回调,如此反复。

让我们把这些回调放在一个 Fetcher 对象中,它需要一个 URL,一个套接字,还需要一个地方保存返回的字节:

class Fetcher:

def __init__(self, url):

self.response = b'' # Empty array of bytes.

self.url = url

self.sock = None

我们的入口点在 Fetcher.fetch:

# Method on Fetcher class.

def fetch(self):

self.sock = socket.socket()

self.sock.setblocking(False)

try:

self.sock.connect(('xkcd.com', 80))

except BlockingIOError:

pass

# Register next callback.

selector.register(self.sock.fileno(),

EVENT_WRITE,

self.connected)

fetch 方法从连接一个套接字开始。但是要注意这个方法在连接建立前就返回了。它必须将控制返回到事件循环中等待连接建立。为了理解为什么要这样做,假设我们程序的整体结构如下:

# Begin fetching http://xkcd.com/353/

fetcher = Fetcher('/353/')

fetcher.fetch()

while True:

events = selector.select()

for event_key, event_mask in events:

callback = event_key.data

callback(event_key, event_mask)

当调用 select 函数后,所有的事件提醒才会在事件循环中处理,所以 fetch 必须把控制权交给事件循环,这样我们的程序才能知道什么时候连接已建立,接着循环调用 connected 回调,它已经在上面的 fetch 方法中注册过。

这里是我们的 connected 方法的实现:

# Method on Fetcher class.

def connected(self, key, mask):

print('connected!')

selector.unregister(key.fd)

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)

self.sock.send(request.encode('ascii'))

# Register the next callback.

selector.register(key.fd,

EVENT_READ,

self.read_response)

这个方法发送一个 GET 请求。一个真正的应用会检查 send 的返回值,以防所有的信息没能一次发送出去。但是我们的请求很小,应用也不复杂。它只是简单的调用 send,然后等待响应。当然,它必须注册另一个回调并把控制权交给事件循环。接下来也是最后一个回调函数 read_response,它处理服务器的响应:

# Method on Fetcher class.

def read_response(self, key, mask):

global stopped

chunk = self.sock.recv(4096) # 4k chunk size.

if chunk:

self.response= chunk

else:

selector.unregister(key.fd) # Done reading.

links = self.parse_links()

# Python set-logic:

for link in links.difference(seen_urls):

urls_todo.add(link)

Fetcher(link).fetch() # <- New Fetcher.

seen_urls.update(links)

urls_todo.remove(self.url)

if not urls_todo:

stopped = True

这个回调在每次 selector 发现套接字可读时被调用,可读有两种情况:套接字接受到数据或它被关闭。

这个回调函数从套接字读取 4K 数据。如果不到 4k,那么有多少读多少。如果比 4K 多,chunk 中只包 4K 数据并且这个套接字保持可读,这样在事件循环的下一个周期,会再次回到这个回调函数。当响应完成时,服务器关闭这个套接字,chunk 为空。

这里没有展示的 parse_links 方法,它返回一个 URL 集合。我们为每个新的 URL 启动一个 fetcher。注意一个使用异步回调方式编程的好处:我们不需要为共享数据加锁,比如我们往 seen_urls 增加新链接时。这是一种非抢占式的多任务,它不会在我们代码中的任意一个地方被打断。

我们增加了一个全局变量 stopped,用它来控制这个循环:

stopped = False

def loop():

while not stopped:

events = selector.select()

for event_key, event_mask in events:

callback = event_key.data

callback()

一旦所有的网页被下载下来,fetcher 停止这个事件循环,程序退出。

这个例子让异步编程的一个问题明显的暴露出来:意大利面代码。

我们需要某种方式来表达一系列的计算和 I/O 操作,并且能够调度多个这样的系列操作让它们并发的执行。但是,没有线程你不能把这一系列操作写在一个函数中:当函数开始一个 I/O 操作,它明确的把未来所需的状态保存下来,然后返回。你需要考虑如何写这个状态保存的代码。

让我们来解释下这到底是什么意思。先来看一下在线程中使用通常的阻塞套接字来获取一个网页时是多么简单。

# Blocking version.

def fetch(url):

sock = socket.socket()

sock.connect(('xkcd.com', 80))

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)

sock.send(request.encode('ascii'))

response = b''

chunk = sock.recv(4096)

while chunk:

response= chunk

chunk = sock.recv(4096)

# Page is now downloaded.

links = parse_links(response)

q.add(links)

在一个套接字操作和下一个操作之间这个函数到底记住了什么状态?它有一个套接字,一个 URL 和一个可增长的 response。运行在线程中的函数使用编程语言的基本功能来在栈中的局部变量保存这些临时状态。这样的函数也有一个“continuation”——它会在 I/O 结束后执行这些代码。运行时环境通过线程的指令指针来记住这个 continuation。你不必考虑怎么在 I/O 操作后恢复局部变量和这个 continuation。语言本身的特性帮你解决。

但是用一个基于回调的异步框架时,这些语言特性不能提供一点帮助。当等待 I/O 操作时,一个函数必须明确的保存它的状态,因为它会在 I/O 操作完成之前返回并清除栈帧。在我们基于回调的例子中,作为局部变量的替代,我们把 sock 和 response 作为 Fetcher 实例 self 的属性来存储。而作为指令指针的替代,它通过注册 connected 和 read_response 回调来保存它的 continuation。随着应用功能的增长,我们需要手动保存的回调的复杂性也会增加。如此繁复的记账式工作会让编码者感到头痛。

更糟糕的是,当我们的回调函数抛出异常会发生什么?假设我们没有写好 parse_links 方法,它在解析 HTML 时抛出异常:

Traceback (most recent call last):

File "loop-with-callbacks.py", line 111, in <module>

loop()

File "loop-with-callbacks.py", line 106, in loop

callback(event_key, event_mask)

File "loop-with-callbacks.py", line 51, in read_response

links = self.parse_links()

File "loop-with-callbacks.py", line 67, in parse_links

raise Exception('parse error')

Exception: parse error

这个堆栈回溯只能显示出事件循环调用了一个回调。我们不知道是什么导致了这个错误。这条链的两边都被破坏:不知道从哪来也不知到哪去。这种丢失上下文的现象被称为“堆栈撕裂stack ripping”,经常会导致无法分析原因。它还会阻止我们为回调链设置异常处理,即那种用“try / except”块封装函数调用及其调用树。(对于这个问题的更复杂的解决方案,参见 http://www.tornadoweb.org/en/stable/stack_context.html)

所以,除了关于多线程和异步哪个更高效的长期争议之外,还有一个关于这两者之间的争论:谁更容易跪了。如果在同步上出现失误,线程更容易出现数据竞争的问题,而回调因为"堆栈撕裂stack ripping"问题而非常难于调试。

(题图素材来自:ruth-tay.deviantart.com[5])


via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者:A. Jesse Jiryu Davis , Guido van Rossum 译者:qingyunha 校对:wxy

本文由 LCTT[6] 原创翻译,Linux中国 荣誉推出

    推荐阅读
  • 斜杠青年是什么意思(斜杠青年的含义)

    以下内容大家不妨参考一二希望能帮到您!斜杠青年是什么意思斜杠青年指的是一群不再满足“专一职业”的生活方式,而选择拥有多重职业和身份的多元生活的人群。来源于英文Slash,出自《纽约时报》专栏作家麦瑞克·阿尔伯撰写的书籍《双重职业》。这些人在自我介绍中会用斜杠来区分,例如:张三,记者/演员/摄影师。“斜杠”便成了他们的代名词。斜杠青年越来越流行,已成为年轻人热衷的生活方式。

  • 男生变白技巧(这样做可以让男生变白)

    男生变白技巧蜂蜜面膜,蜂蜜面膜可以让皮肤快速变白,而且,制作方法也非常的简单。只需要将水,还有蜂蜜按照比例调配好之后,再加入一些珍珠粉,搅拌均匀,然后涂抹在脸上。多吃西红柿,西红柿会让皮肤变得白皙起来,还会起到淡化斑点的效果。大量补水,当皮肤黑了,就表示你肌肤的水分已经流失了很多。这个时候需要大量的补水,要知道补水也是美白的关键。敷美白面膜,可以适当的敷一些美白面膜,日常的基本护理也要做到位。

  • 贵阳2022年房价会上升还是下降(大众信心不足房价)

    当下,贵阳各大售楼部优惠满天飞。销量、房价双双持续下降,贵阳楼市陷入低谷已经成为事实。分区域来看,8月份贵阳核心六城区商品住宅成交量均环比下降,乌当区商品住宅市场进一步恶化,贵阳小透明实至名归,销售量仅为1.1万方,遥遥落后其他区域,而观山湖持续在销量和房价上领跑贵阳。从区域来看,价格分化逐渐形成,梯度化明显,核心区域领跑贵阳楼市。

  • 苏教版小学数学2年级下册(苏教版小学数学2年级下册学期电子版教材课本分享)

    祝各位学习顺利需要小学初中高中各版本各科电子教材课本教科书课件习题试卷请关注我们,私信回复电子资料免责声明本文仅供学习参考和教研使用,如有冒犯您的权益,请您及时联系我们,我们会及时进行下架删除处理,特此声明。

  • 脐橙炖冰糖有什么功效(冰糖脐橙的作用)

    脐橙炖冰糖有什么功效和胃降逆,冰糖橙子有和胃降逆的功效作用,有助于肠胃的健康。因为冰糖橙子味酸,是属性寒凉的水果,入肝、胃经,有和中开胃、降逆止呕之功,善用于饮食停滞而引起的呕吐、胃中浮风恶气、肝胃郁热等疾病。因为冰糖橙子味酸芳香,酸能杀菌,有除醒脾,和胃降逆,对于因饮酒过量,或做鱼蟹之菜肴,均有较好的调味和解毒醒酒作用。使菜肴更加可口适中,味香鲜美。

  • 玉米粒怎么剥(怎么把玉米粒掰下来)

    我们一起去了解并探讨一下这个问题吧!玉米粒怎么剥先将玉米从中间掰成两份部分,让它形成一个比较大的断层。用一个叉子贴着玉米粒的根部开始插入,再一点点的推动叉子,让玉米粒被剥离下来。剥离掉一部分玉米粒之后就可以用传统的方法剥离玉米粒了,之后再用同样的方法剥另一半玉米就可以了。

  • 巴西蜂胶的功效与作用及食用方法(巴西蜂胶的作用及怎么食用)

    巴西蜂胶的功效与作用及食用方法巴西蜂胶的功效有:免疫增强,抗氧化性,抗微生物菌种,抗肿瘤等功效。巴西蜂胶还可以水溶后食用,但是溶解它的水分不宜温度过高,最多不能超过五十度,在把蜂胶液放入杯子中以后冲入温水调匀饮用既可,另外大家在喝牛奶或者喝果汁的时候,也可以把蜂胶液加入进行,与它们一起饮用,会让更利于蜂胶功效的发挥。

  • 冰粉的制作方法(冰粉的制作方法是什么)

    冰粉的制作方法材料准备蜂蜜适量,水1.5升,凉粉20克,蓝莓适量,猕猴桃一个,红心火龙果半个,黄桃一个。准备所需要的白凉冻、红心火龙果、、黄桃、猕猴桃、蓝莓。水煮沸后倒入一干净锅中加入20克冰粉,顺时针方向搅拌均匀至完全溶解。放置冷却后成冰冻,入冰箱冷藏室3小时后用刀切小块。把水果切小块和冰粉放在一起备用,三汤匙凉白开、蜂蜜调搅拌均匀制成糖水,倒入调好的蜂蜜汁搅拌均匀。

  • 祁门红茶出祁门哪个镇(因境内东北有祁山)

    祁门县隶属于安徽省黄山市,总面积2257平方千米,总人口19万。村中的“会源堂古戏台”和“敦典堂古戏台”为第六批国保单位。地处祁门县箬坑乡,古名为“南源”亦称“南溪”,系千年古村,为汪姓聚居地。在祁门县箬坑乡境内,有棋盘石、白云庵遗址、上马石、下马石以及当年红军藏粮洞等景点。

  • 上班一年多裁员怎么赔偿(以经营困难为由)

    案例:孙某是某养殖行业上市公司繁殖经理,在公司有5年工龄,2021年5月,上级领导面谈说,公司经营困难,繁殖部门将要与兽医技术部门合并,让孙某办理离职手续。孙某本以为公司裁员会有赔偿,可是2个月后孙某并没有收到赔偿,孙某咨询律师,律师了解当时的行业情况,基本上所有养殖行业面临亏损,说企业经营困难,裁员是可以不用赔偿的。孙某想了想,5月份行情虽然一直下滑,但还是盈利,应当给自己赔偿!