# 14.2-Scrapy-Redis源码解析

Scrapy-Redis 库已经为我们提供了 Scrapy 分布式的队列、调度器、去重等功能，其 GitHub 地址为：[https://github.com/rmax/scrapy-redis。](https://github.com/rmax/scrapy-redis%E3%80%82)

本节我们深入了解一下，利用 Redis 如何实现 Scrapy 分布式。

## 1. 获取源码

可以把源码克隆下来，执行如下命令：

```
git clone https://github.com/rmax/scrapy-redis.git
```

核心源码在 scrapy-redis/src/scrapy\_redis 目录下。

## 2. 爬取队列

从爬取队列入手，看看它的具体实现。源码文件为 queue.py，它有三个队列的实现，首先它实现了一个父类 Base，提供一些基本方法和属性，如下所示：

```python
class Base(object):
    """Per-spider base queue class"""
    def __init__(self, server, spider, key, serializer=None):
        if serializer is None:
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: % r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '% s' does not implement 'dumps' function: % r"
                            % serializer)
        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)
```

首先看一下 \_encode\_request() 和 \_decode\_request() 方法，因为我们需要把一 个 Request 对象存储到数据库中，但数据库无法直接存储对象，所以需要将 Request 序列化转成字符串再存储，而这两个方法就分别是序列化和反序列化的操作，利用 pickle 库来实现，一般在调用 push() 将 Request 存入数据库时会调用 \_encode\_request() 方法进行序列化，在调用 pop() 取出 Request 的时候会调用 \_decode\_request() 进行反序列化。

在父类中 \_\_len\_\_()、push() 和 pop() 方法都是未实现的，会直接抛出 NotImplementedError，因此这个类是不能直接被使用的，所以必须要实现一个子类来重写这三个方法，而不同的子类就会有不同的实现，也就有着不同的功能。

那么接下来就需要定义一些子类来继承 Base 类，并重写这几个方法，那在源码中就有三个子类的实现，它们分别是 FifoQueue、PriorityQueue、LifoQueue，我们分别来看下它们的实现原理。

首先是 FifoQueue：

```python
class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)
```

可以看到这个类继承了 Base 类，并重写了 \_\_len\_\_()、push()、pop() 这三个方法，在这三个方法中都是对 server 对象的操作，而 server 对象就是一个 Redis 连接对象，我们可以直接调用其操作 Redis 的方法对数据库进行操作，可以看到这里的操作方法有 llen()、lpush()、rpop() 等，那这就代表此爬取队列是使用的 Redis 的列表，序列化后的 Request 会被存入列表中，就是列表的其中一个元素，\_\_len\_\_() 方法是获取列表的长度，push() 方法中调用了 lpush() 操作，这代表从列表左侧存入数据，pop() 方法中调用了 rpop() 操作，这代表从列表右侧取出数据。

所以 Request 在列表中的存取顺序是左侧进、右侧出，所以这是有序的进出，即先进先出，英文叫做 First Input First Output，也被简称作 Fifo，而此类的名称就叫做 FifoQueue。

另外还有一个与之相反的实现类，叫做 LifoQueue，实现如下：

```python
class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)
```

与 FifoQueue 不同的就是它的 pop() 方法，在这里使用的是 lpop() 操作，也就是从左侧出，而 push() 方法依然是使用的 lpush() 操作，是从左侧入。那么这样达到的效果就是先进后出、后进先出，英文叫做 Last In First Out，简称为 Lifo，而此类名称就叫做 LifoQueue。同时这个存取方式类似栈的操作，所以其实也可以称作 StackQueue。

另外在源码中还有一个子类实现，叫做 PriorityQueue，顾名思义，它叫做优先级队列，实现如下：

```python
class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])
```

在这里我们可以看到 \_\_len\_\_()、push()、pop() 方法中使用了 server 对象的 zcard()、zadd()、zrange() 操作，可以知道这里使用的存储结果是有序集合 Sorted Set，在这个集合中每个元素都可以设置一个分数，那么这个分数就代表优先级。

在 \_\_len\_\_() 方法里调用了 zcard() 操作，返回的就是有序集合的大小，也就是爬取队列的长度，在 push() 方法中调用了 zadd() 操作，就是向集合中添加元素，这里的分数指定成 Request 的优先级的相反数，因为分数低的会排在集合的前面，所以这里高优先级的 Request 就会存在集合的最前面。pop() 方法是首先调用了 zrange() 操作取出了集合的第一个元素，因为最高优先级的 Request 会存在集合最前面，所以第一个元素就是最高优先级的 Request，然后再调用 zremrangebyrank() 操作将这个元素删除，这样就完成了取出并删除的操作。

此队列是默认使用的队列，也就是爬取队列默认是使用有序集合来存储的。

## 3. 去重过滤

前面说过 Scrapy 的去重是利用集合来实现的，而在 Scrapy 分布式中的去重就需要利用共享的集合，那么这里使用的就是 Redis 中的集合数据结构。我们来看看去重类是怎样实现的，源码文件是 dupefilter.py，其内实现了一个 RFPDupeFilter 类，如下所示：

```python
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.
    This class can also be used with default Scrapy's scheduler.
    """
    logger = logger
    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.
        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.
        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.
        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.
        Parameters
        ----------
        settings : scrapy.settings.Settings
        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.
        """
        server = get_redis_from_settings(settings)
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.
        Parameters
        ----------
        crawler : scrapy.crawler.Crawler
        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.
        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.
        Parameters
        ----------
        request : scrapy.http.Request
        Returns
        -------
        bool
        """
        fp = self.request_fingerprint(request)
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.
        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.
        Parameters
        ----------
        reason : str, optional
        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.
        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider
        """
        if self.debug:
            msg = "Filtered duplicate request: %(request) s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request) s"
                   "- no more duplicates will be shown"
                   "(see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False
```

这里同样实现了一个 request\_seen() 方法，和 Scrapy 中的 request\_seen() 方法实现极其类似。不过这里集合使用的是 server 对象的 sadd() 操作，也就是集合不再是一个简单数据结构了，而是直接换成了数据库的存储方式。

鉴别重复的方式还是使用指纹，指纹同样是依靠 request\_fingerprint() 方法来获取的。获取指纹之后就直接向集合添加指纹，如果添加成功，说明这个指纹原本不存在于集合中，返回值 1。代码中最后的返回结果是判定添加结果是否为 0，如果刚才的返回值为 1，那这个判定结果就是 False，也就是不重复，否则判定为重复。

这样我们就成功利用 Redis 的集合完成了指纹的记录和重复的验证。

## 4. 调度器

Scrapy-Redis 还帮我们实现了配合 Queue、DupeFilter 使用的调度器 Scheduler，源文件名称是 scheduler.py。我们可以指定一些配置，如 SCHEDULER\_FLUSH\_ON\_START 即是否在爬取开始的时候清空爬取队列，SCHEDULER\_PERSIST 即是否在爬取结束后保持爬取队列不清除。我们可以在 settings.py 里自由配置，而此调度器很好地实现了对接。

接下来我们看看两个核心的存取方法，实现如下所示：

```python
def enqueue_request(self, request):
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    if self.stats:
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
    self.queue.push(request)
    return True

def next_request(self):
    block_pop_timeout = self.idle_before_close
    request = self.queue.pop(block_pop_timeout)
    if request and self.stats:
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
    return request
```

enqueue\_request() 可以向队列中添加 Request，核心操作就是调用 Queue 的 push() 操作，还有一些统计和日志操作。next\_request() 就是从队列中取 Request，核心操作就是调用 Queue 的 pop() 操作，此时如果队列中还有 Request，则 Request 会直接取出来，爬取继续，否则如果队列为空，爬取则会重新开始。

## 5. 总结

那么到现在为止我们就把之前所说的三个分布式的问题解决了，总结如下：

* 爬取队列的实现，在这里提供了三种队列，使用了 Redis 的列表或有序集合来维护。
* 去重的实现，使用了 Redis 的集合来保存 Request 的指纹来提供重复过滤。
* 中断后重新爬取的实现，中断后 Redis 的队列没有清空，再次启动时调度器的 next\_request() 会从队列中取到下一个 Request，继续爬取。

## 6. 结语

以上内容便是 Scrapy-Redis 的核心源码解析。Scrapy-Redis 中还提供了 Spider、Item Pipeline 的实现，不过它们并不是必须使用。

在下一节，我们会将 Scrapy-Redis 集成到之前所实现的 Scrapy 新浪微博项目中，实现多台主机协同爬取。
