Scrapy-Redis 库已经为我们提供了 Scrapy 分布式的队列、调度器、去重等功能,其 GitHub 地址为:https://github.com/rmax/scrapy-redis。
本节我们深入了解一下,利用 Redis 如何实现 Scrapy 分布式。
可以把源码克隆下来,执行如下命令:
git clone https://github.com/rmax/scrapy-redis.git
核心源码在 scrapy-redis/src/scrapy_redis 目录下。
从爬取队列入手,看看它的具体实现。源码文件为 queue.py,它有三个队列的实现,首先它实现了一个父类 Base,提供一些基本方法和属性,如下所示:
class Base(object):"""Per-spider base queue class"""def __init__(self, server, spider, key, serializer=None):if serializer is None:serializer = picklecompatif not hasattr(serializer, 'loads'):raise TypeError("serializer does not implement 'loads' function: % r"% serializer)if not hasattr(serializer, 'dumps'):raise TypeError("serializer '% s' does not implement 'dumps' function: % r"% serializer)self.server = serverself.spider = spiderself.key = key % {'spider': spider.name}self.serializer = serializerdef _encode_request(self, request):obj = request_to_dict(request, self.spider)return self.serializer.dumps(obj)def _decode_request(self, encoded_request):obj = self.serializer.loads(encoded_request)return request_from_dict(obj, self.spider)def __len__(self):"""Return the length of the queue"""raise NotImplementedErrordef push(self, request):"""Push a request"""raise NotImplementedErrordef pop(self, timeout=0):"""Pop a request"""raise NotImplementedErrordef clear(self):"""Clear queue/stack"""self.server.delete(self.key)
首先看一下 _encode_request() 和 _decode_request() 方法,因为我们需要把一 个 Request 对象存储到数据库中,但数据库无法直接存储对象,所以需要将 Request 序列化转成字符串再存储,而这两个方法就分别是序列化和反序列化的操作,利用 pickle 库来实现,一般在调用 push() 将 Request 存入数据库时会调用 _encode_request() 方法进行序列化,在调用 pop() 取出 Request 的时候会调用 _decode_request() 进行反序列化。
在父类中 __len__()、push() 和 pop() 方法都是未实现的,会直接抛出 NotImplementedError,因此这个类是不能直接被使用的,所以必须要实现一个子类来重写这三个方法,而不同的子类就会有不同的实现,也就有着不同的功能。
那么接下来就需要定义一些子类来继承 Base 类,并重写这几个方法,那在源码中就有三个子类的实现,它们分别是 FifoQueue、PriorityQueue、LifoQueue,我们分别来看下它们的实现原理。
首先是 FifoQueue:
class FifoQueue(Base):"""Per-spider FIFO queue"""def __len__(self):"""Return the length of the queue"""return self.server.llen(self.key)def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.brpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.rpop(self.key)if data:return self._decode_request(data)
可以看到这个类继承了 Base 类,并重写了 __len__()、push()、pop() 这三个方法,在这三个方法中都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作,可以看到这里的操作方法有 llen()、lpush()、rpop() 等,那这就代表此爬取队列是使用的 Redis 的列表,序列化后的 Request 会被存入列表中,就是列表的其中一个元素,__len__() 方法是获取列表的长度,push() 方法中调用了 lpush() 操作,这代表从列表左侧存入数据,pop() 方法中调用了 rpop() 操作,这代表从列表右侧取出数据。
所以 Request 在列表中的存取顺序是左侧进、右侧出,所以这是有序的进出,即先进先出,英文叫做 First Input First Output,也被简称作 Fifo,而此类的名称就叫做 FifoQueue。
另外还有一个与之相反的实现类,叫做 LifoQueue,实现如下:
class LifoQueue(Base):"""Per-spider LIFO queue."""def __len__(self):"""Return the length of the stack"""return self.server.llen(self.key)def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.blpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.lpop(self.key)if data:return self._decode_request(data)
与 FifoQueue 不同的就是它的 pop() 方法,在这里使用的是 lpop() 操作,也就是从左侧出,而 push() 方法依然是使用的 lpush() 操作,是从左侧入。那么这样达到的效果就是先进后出、后进先出,英文叫做 Last In First Out,简称为 Lifo,而此类名称就叫做 LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。
另外在源码中还有一个子类实现,叫做 PriorityQueue,顾名思义,它叫做优先级队列,实现如下:
class PriorityQueue(Base):"""Per-spider priority queue abstraction using redis' sorted set"""def __len__(self):"""Return the length of the queue"""return self.server.zcard(self.key)def push(self, request):"""Push a request"""data = self._encode_request(request)score = -request.priorityself.server.execute_command('ZADD', self.key, score, data)def pop(self, timeout=0):"""Pop a requesttimeout not support in this queue class"""pipe = self.server.pipeline()pipe.multi()pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)results, count = pipe.execute()if results:return self._decode_request(results[0])
在这里我们可以看到 __len__()、push()、pop() 方法中使用了 server 对象的 zcard()、zadd()、zrange() 操作,可以知道这里使用的存储结果是有序集合 Sorted Set,在这个集合中每个元素都可以设置一个分数,那么这个分数就代表优先级。
在 __len__() 方法里调用了 zcard() 操作,返回的就是有序集合的大小,也就是爬取队列的长度,在 push() 方法中调用了 zadd() 操作,就是向集合中添加元素,这里的分数指定成 Request 的优先级的相反数,因为分数低的会排在集合的前面,所以这里高优先级的 Request 就会存在集合的最前面。pop() 方法是首先调用了 zrange() 操作取出了集合的第一个元素,因为最高优先级的 Request 会存在集合最前面,所以第一个元素就是最高优先级的 Request,然后再调用 zremrangebyrank() 操作将这个元素删除,这样就完成了取出并删除的操作。
此队列是默认使用的队列,也就是爬取队列默认是使用有序集合来存储的。
前面说过 Scrapy 的去重是利用集合来实现的,而在 Scrapy 分布式中的去重就需要利用共享的集合,那么这里使用的就是 Redis 中的集合数据结构。我们来看看去重类是怎样实现的,源码文件是 dupefilter.py,其内实现了一个 RFPDupeFilter 类,如下所示:
class RFPDupeFilter(BaseDupeFilter):"""Redis-based request duplicates filter.This class can also be used with default Scrapy's scheduler."""logger = loggerdef __init__(self, server, key, debug=False):"""Initialize the duplicates filter.Parameters----------server : redis.StrictRedisThe redis server instance.key : strRedis key Where to store fingerprints.debug : bool, optionalWhether to log filtered requests."""self.server = serverself.key = keyself.debug = debugself.logdupes = True@classmethoddef from_settings(cls, settings):"""Returns an instance from given settings.This uses by default the key ``dupefilter:<timestamp>``. When using the``scrapy_redis.scheduler.Scheduler`` class, this method is not used asit needs to pass the spider name in the key.Parameters----------settings : scrapy.settings.SettingsReturns-------RFPDupeFilterA RFPDupeFilter instance."""server = get_redis_from_settings(settings)key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}debug = settings.getbool('DUPEFILTER_DEBUG')return cls(server, key=key, debug=debug)@classmethoddef from_crawler(cls, crawler):"""Returns instance from crawler.Parameters----------crawler : scrapy.crawler.CrawlerReturns-------RFPDupeFilterInstance of RFPDupeFilter."""return cls.from_settings(crawler.settings)def request_seen(self, request):"""Returns True if request was already seen.Parameters----------request : scrapy.http.RequestReturns-------bool"""fp = self.request_fingerprint(request)added = self.server.sadd(self.key, fp)return added == 0def request_fingerprint(self, request):"""Returns a fingerprint for a given request.Parameters----------request : scrapy.http.RequestReturns-------str"""return request_fingerprint(request)def close(self, reason=''):"""Delete data on close. Called by Scrapy's scheduler.Parameters----------reason : str, optional"""self.clear()def clear(self):"""Clears fingerprints data."""self.server.delete(self.key)def log(self, request, spider):"""Logs given request.Parameters----------request : scrapy.http.Requestspider : scrapy.spiders.Spider"""if self.debug:msg = "Filtered duplicate request: %(request) s"self.logger.debug(msg, {'request': request}, extra={'spider': spider})elif self.logdupes:msg = ("Filtered duplicate request %(request) s""- no more duplicates will be shown""(see DUPEFILTER_DEBUG to show all duplicates)")self.logger.debug(msg, {'request': request}, extra={'spider': spider})self.logdupes = False
这里同样实现了一个 request_seen() 方法,和 Scrapy 中的 request_seen() 方法实现极其类似。不过这里集合使用的是 server 对象的 sadd() 操作,也就是集合不再是一个简单数据结构了,而是直接换成了数据库的存储方式。
鉴别重复的方式还是使用指纹,指纹同样是依靠 request_fingerprint() 方法来获取的。获取指纹之后就直接向集合添加指纹,如果添加成功,说明这个指纹原本不存在于集合中,返回值 1。代码中最后的返回结果是判定添加结果是否为 0,如果刚才的返回值为 1,那这个判定结果就是 False,也就是不重复,否则判定为重复。
这样我们就成功利用 Redis 的集合完成了指纹的记录和重复的验证。
Scrapy-Redis 还帮我们实现了配合 Queue、DupeFilter 使用的调度器 Scheduler,源文件名称是 scheduler.py。我们可以指定一些配置,如 SCHEDULER_FLUSH_ON_START 即是否在爬取开始的时候清空爬取队列,SCHEDULER_PERSIST 即是否在爬取结束后保持爬取队列不清除。我们可以在 settings.py 里自由配置,而此调度器很好地实现了对接。
接下来我们看看两个核心的存取方法,实现如下所示:
def enqueue_request(self, request):if not request.dont_filter and self.df.request_seen(request):self.df.log(request, self.spider)return Falseif self.stats:self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)self.queue.push(request)return Truedef next_request(self):block_pop_timeout = self.idle_before_closerequest = self.queue.pop(block_pop_timeout)if request and self.stats:self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)return request
enqueue_request() 可以向队列中添加 Request,核心操作就是调用 Queue 的 push() 操作,还有一些统计和日志操作。next_request() 就是从队列中取 Request,核心操作就是调用 Queue 的 pop() 操作,此时如果队列中还有 Request,则 Request 会直接取出来,爬取继续,否则如果队列为空,爬取则会重新开始。
那么到现在为止我们就把之前所说的三个分布式的问题解决了,总结如下:
爬取队列的实现,在这里提供了三种队列,使用了 Redis 的列表或有序集合来维护。
去重的实现,使用了 Redis 的集合来保存 Request 的指纹来提供重复过滤。
中断后重新爬取的实现,中断后 Redis 的队列没有清空,再次启动时调度器的 next_request() 会从队列中取到下一个 Request,继续爬取。
以上内容便是 Scrapy-Redis 的核心源码解析。Scrapy-Redis 中还提供了 Spider、Item Pipeline 的实现,不过它们并不是必须使用。
在下一节,我们会将 Scrapy-Redis 集成到之前所实现的 Scrapy 新浪微博项目中,实现多台主机协同爬取。