那么接下来就需要定义一些子类来继承 Base 类,并重写这几个方法,那在源码中就有三个子类的实现,它们分别是 FifoQueue、PriorityQueue、LifoQueue,我们分别来看下它们的实现原理。
首先是 FifoQueue:
classFifoQueue(Base):"""Per-spider FIFO queue"""def__len__(self):"""Return the length of the queue"""return self.server.llen(self.key)defpush(self,request):"""Push a request""" self.server.lpush(self.key, self._encode_request(request))defpop(self,timeout=0):"""Pop a request"""if timeout >0: data = self.server.brpop(self.key, timeout)ifisinstance(data, tuple): data = data[1]else: data = self.server.rpop(self.key)if data:return self._decode_request(data)
可以看到这个类继承了 Base 类,并重写了 __len__()、push()、pop() 这三个方法,在这三个方法中都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作,可以看到这里的操作方法有 llen()、lpush()、rpop() 等,那这就代表此爬取队列是使用的 Redis 的列表,序列化后的 Request 会被存入列表中,就是列表的其中一个元素,__len__() 方法是获取列表的长度,push() 方法中调用了 lpush() 操作,这代表从列表左侧存入数据,pop() 方法中调用了 rpop() 操作,这代表从列表右侧取出数据。
所以 Request 在列表中的存取顺序是左侧进、右侧出,所以这是有序的进出,即先进先出,英文叫做 First Input First Output,也被简称作 Fifo,而此类的名称就叫做 FifoQueue。
另外还有一个与之相反的实现类,叫做 LifoQueue,实现如下:
classLifoQueue(Base):"""Per-spider LIFO queue."""def__len__(self):"""Return the length of the stack"""return self.server.llen(self.key)defpush(self,request):"""Push a request""" self.server.lpush(self.key, self._encode_request(request))defpop(self,timeout=0):"""Pop a request"""if timeout >0: data = self.server.blpop(self.key, timeout)ifisinstance(data, tuple): data = data[1]else: data = self.server.lpop(self.key)if data:return self._decode_request(data)
与 FifoQueue 不同的就是它的 pop() 方法,在这里使用的是 lpop() 操作,也就是从左侧出,而 push() 方法依然是使用的 lpush() 操作,是从左侧入。那么这样达到的效果就是先进后出、后进先出,英文叫做 Last In First Out,简称为 Lifo,而此类名称就叫做 LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。
classPriorityQueue(Base):"""Per-spider priority queue abstraction using redis' sorted set"""def__len__(self):"""Return the length of the queue"""return self.server.zcard(self.key)defpush(self,request):"""Push a request""" data = self._encode_request(request) score =-request.priority self.server.execute_command('ZADD', self.key, score, data)defpop(self,timeout=0):""" Pop a request timeout not support in this queue class """ pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute()if results:return self._decode_request(results[0])
在这里我们可以看到 __len__()、push()、pop() 方法中使用了 server 对象的 zcard()、zadd()、zrange() 操作,可以知道这里使用的存储结果是有序集合 Sorted Set,在这个集合中每个元素都可以设置一个分数,那么这个分数就代表优先级。
classRFPDupeFilter(BaseDupeFilter):"""Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = loggerdef__init__(self,server,key,debug=False):"""Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes =True@classmethoddeffrom_settings(cls,settings):"""Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server =get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY %{'timestamp':int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG')returncls(server, key=key, debug=debug)@classmethoddeffrom_crawler(cls,crawler):"""Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """return cls.from_settings(crawler.settings)defrequest_seen(self,request):"""Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool """ fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp)return added ==0defrequest_fingerprint(self,request):"""Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """returnrequest_fingerprint(request)defclose(self,reason=''):"""Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear()defclear(self):"""Clears fingerprints data.""" self.server.delete(self.key)deflog(self,request,spider):"""Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """if self.debug: msg ="Filtered duplicate request: %(request) s" self.logger.debug(msg, {'request': request}, extra={'spider': spider})elif self.logdupes: msg = ("Filtered duplicate request %(request) s""- no more duplicates will be shown""(see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes =False