前段时间看到这个grequests库,感觉还是蛮有意思的,所以今天来对这个库拆解拆解。这个库是崇拜的大神kennethreitz写的。Github地址:https://github.com/kennethreitz/grequests

首先看到文档上给的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import grequests
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://fakedomain/',
'http://kennethreitz.com'
]
# 创建没有发送的request集合
rs = (grequests.get(u) for u in urls)
# 发送
grequests.map(rs)
# 为了防止超时和异常发生,可以指定一个异常处理器
def exception_handler(request, exception):
print("Request failed")
reqs = [
grequests.get('http://httpbin.org/delay/1', timeout=0.001),
grequests.get('http://fakedomain/'),
grequests.get('http://httpbin.org/status/500')]
grequests.map(reqs, exception_handler=exception_handler)
另外,可以使用imap来提高性能

根据这个示例,我们来看看源代码

AsyncRequest

首先来看grequests.get

1
2
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')

它和其他诸多HTTP方法一样,只是一个快捷方式,其本质是调用了AsyncRequest,看到这个名字就应该知道是异步的Request,所以应该是对普通的Request做了封装和修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class AsyncRequest(object):
""" 异步的Request,接收和Session.request相同的参数,还有一些额外的参数
session: 发送请求的session
callback: 在返回对象上的回调函数,和传递hooks={'response': callback}一样
"""
def __init__(self, method, url, **kwargs):
#: Request method
self.method = method
#: URL to request
self.url = url
#: Associated ``Session``
self.session = kwargs.pop('session', None)
if self.session is None:
# requests里的Session对象
self.session = Session()
callback = kwargs.pop('callback', None)
if callback:
kwargs['hooks'] = {'response': callback}
#: The rest arguments for ``Session.request``
self.kwargs = kwargs
#: Resulting ``Response``
self.response = None

可以看到使用partial(AsyncRequest, 'GET')会使得method默认为GET,然后再rs = (grequests.get(u) for u in urls)会形成一个生成器,里面生成AsyncRequest对象。下面来看看这些request是如何发送的:

map

之后程序会调用grequests.map(rs),那么我们来看看map:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def map(requests, stream=False, size=None, exception_handler=None, gtimeout=None):
"""并发的将Requests列表转换成响应
requests: Request对象的集合
stream: 如果为True,那么响应内容不会立即下载
size: 指定同时发起的请求数目,如果为None,就不会有限制
exception_handler: 回调函数,当异常发生的时候调用,参数是Request和Exception
gtimeout: Gevent合并所有的超时时间,单位为秒(与每个request的超时时间无关)
"""
# 将生成器直接转换成list
requests = list(requests)
# gevent的Pool对象,是时候研究一波gevent了
pool = Pool(size) if size else None
# 调用send函数来发送请求
jobs = [send(r, pool, stream=stream) for r in requests]
# 等待所有的greenlet处理单元结束运行
gevent.joinall(jobs, timeout=gtimeout)
ret = []
# 处理所有的请求响应,并且处理异常
for request in requests:
if request.response is not None:
ret.append(request.response)
# 如果有异常处理器并且request有异常进行处理
elif exception_handler and hasattr(request, 'exception'):
ret.append(exception_handler(request, request.exception))
else:
# 否则结果置为None
ret.append(None)
return ret

可以看到map函数很简单,大体流程就是建立了greenlet处理器池,然后对每个request进行调用,然后等待结束,最后得到响应并且处理响应。

1
2
3
4
# 如果给定size则创建greenlet池,否则为None
pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream) for r in requests]
gevent.joinall(jobs, timeout=gtimeout)

所以问题的关键还是在于gevent那几行的调用,创建管理greenlet的池,用来限制并发,创建好了之后,调用这个pool然后去发送请求,最后等待所有的greenlet结束。

Gevent的并发

无论创建还是没有创建池,最终是要调用send方法的,来看看这个函数:

1
2
3
4
5
6
def send(r, pool=None, stream=False):
"""使用指定的pool发送request对象,如果pool没有指定,这个方法就会阻塞,Pools很有用,因为你可以指定并发限制"""
if pool is not None:
return pool.spawn(r.send, stream=stream)
return gevent.spawn(r.send, stream=stream)

gevent.spawn

gevent.spawn创建一个新的Greenlet对象,并且排定运行调用function(*args, **kwargs),这个可以使用gevent.spawn或者是Greenlet.spawn,其实gevent.spawn就是Greenlet.spawn,并且最后会调用Greenlet的类方法,首先实例化一个对象,然后调用start方法,所以也相当于调用Greenlet(*args, **kwargs)

这也是类方法的一个用法,另外的实例化对象的方法

1
2
3
4
5
@classmethod
def spawn(cls, *args, **kwargs):
g = cls(*args, **kwargs)
g.start()
return g

pool.spawn

这个方法使用给定的参数开始一个新的greenlet,通常是传递给Greenlet构造函数,并且将其加入这个pool管理的greenlets集合

Pool是Group的子类,提供了限制并发的方法,其spawn方法在greenlets数目达到上限的时候阻塞,直到有一个可用的greenlet。

这个方法也是使用pool实例为Greenlet创建一个实例,然后start it

1
2
3
4
def spawn(self, *args, **kwargs):
greenlet = self.greenlet_class(*args, **kwargs)
self.start(greenlet)
return greenlet

r.send

这个是AsyncRequest的send方法,比较简单,就是发送请求,等待响应。

1
2
3
4
5
6
7
8
9
10
def send(self, **kwargs):
merged_kwargs = {}
merged_kwargs.update(self.kwargs)
merged_kwargs.update(kwargs)
try:
self.response = self.session.request(self.method, self.url, **merged_kwargs)
except Exception as e:
self.exception = e
self.traceback = traceback.format_exc()
return self

imap

imap据说可以提高性能,快来看看吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def imap(requests, stream=False, size=2, exception_handler=None):
"""并发的将Request对象的生成器转换成响应的生成器。
requests: Request对象的生成器
stream: 如果为True,则不会立即自动下载
size: 同时发起的请求数,默认为2
exception_handler: 当发生异常时候回调
"""
pool = Pool(size)
def send(r):
return r.send(stream=stream)
for request in pool.imap_unordered(send, requests):
if request.response is not None:
yield request.response
elif exception_handler:
exception_handler(request, request.exception)
pool.join()

可以看到这个函数主要是使用了pool.imap_unordered,其实pool还有一个方法是imap

pool.imap

itertools.imap()是一致的,itertools.imap()可以用于迭代无穷序列,比如itertools.imap(lambda x, y: x * y, [10, 20, 30], itertools.count(1)),如果两个序列长短不一致,以短的为准,并且imap实现了惰性计算,类似生成器。

pool.imap可以并行运行,按顺序从迭代对象中取出元素迭代,应用在函数上,然后收集结果。

如果限制了可以同时进行的greenlets数量,那么最多只有这么多个任务同时进行。

pool.imap_unordered

和imap一样,返回的结果顺序是随意的,比起imap更加轻量级,如果顺序不重要的话,首先应该选用这个。

join

等待这个group的greenlets都运行完,如果这个group没有greenlet的话,立即返回

可以看到,如果不要求顺序的话,imap_unordered会比imap更加高效,同时imap版本肯定比map版本性能好,因为map版本必须全部运行完才能拿数据,但是imap版本只要有greenlet有结果就可以取出来。

小结

这个库就这么多内容,其实主要是使用gevent封装了一层requests,所以核心就是使用gevent,gevent怎么用,如何用,待我继续研究。

不过通过看这个库,也了解到了简单的gevent的用法。