编程语言
432
- 引言 在数据采集过程中,爬虫经常需要面对 重复数据 的问题。如果每次爬取都全量抓取,不仅浪费资源,还可能导致数据冗余。增量爬取(Incremental Crawling) 是一种高效策略,它仅抓取 新增或更新 的数据,而跳过已采集的旧数据。 本文将详细介绍 Python爬虫的增量爬取与历史数据比对 策略,涵盖以下内容:
- 增量爬取的核心思路
- 去重方案对比(数据库、文件、内存)
- 基于时间戳、哈希、数据库比对的实现方法
- 完整代码示例(Scrapy + MySQL 增量爬取)
- 增量爬取的核心思路 增量爬取的核心是 识别哪些数据是新的或已更新的,通常采用以下方式: ● 基于时间戳(Last-Modified / Update-Time) ● 基于内容哈希(MD5/SHA1) ● 基于数据库比对(MySQL/Redis/MongoDB)
- 1 基于时间戳的增量爬取 适用于数据源带有 发布时间(如新闻、博客) 的场景:
- 记录上次爬取的 最新时间戳
- 下次爬取时,只抓取 晚于该时间戳的数据 优点:简单高效,适用于结构化数据 缺点:依赖数据源的时间字段,不适用于无时间戳的网页
- 2 基于内容哈希的去重 适用于 内容可能更新但URL不变 的页面(如电商价格):
- 计算页面内容的 哈希值(如MD5)
- 比对哈希值,若变化则视为更新 优点:适用于动态内容 缺点:计算开销较大
- 3 基于数据库比对的增量爬取 适用于 大规模数据管理:
- 将已爬取的 URL 或关键字段 存入数据库(MySQL/Redis)
- 每次爬取前查询数据库,判断是否已存在 优点:支持分布式去重 缺点:需要额外存储
- 去重方案对比 方案 适用场景 优点 缺点 内存去重 单机小规模爬虫 速度快(set() ) 重启后数据丢失 文件存储 中小规模爬虫 简单(CSV/JSON) 性能较低 SQL数据库 结构化数据管理 支持复杂查询(MySQL) 需要数据库维护 NoSQL数据库 高并发分布式爬虫 高性能(Redis/MongoDB) 内存占用较高
- 增量爬取实现方法
- 1 基于时间戳的增量爬取(示例)
from datetime import datetime
class NewsSpider(scrapy.Spider): name = "news_spider" last_crawl_time = None # 上次爬取的最新时间
def start_requests(self): # 从文件/DB加载上次爬取时间 self.last_crawl_time = self.load_last_crawl_time() # 设置代理信息 proxy = "http://www.16yun.cn:5445" proxy_auth = "16QMSOML:280651" # 添加代理到请求中 yield scrapy.Request( url="https://news.example.com/latest", meta={ 'proxy': proxy, 'proxy_user_pass': proxy_auth } ) def parse(self, response): # 检查响应状态码,判断是否成功获取数据 if response.status != 200: self.logger.error(f"Failed to fetch data from {response.url}. Status code: {response.status}") self.logger.error("This might be due to network issues or an invalid URL. Please check the URL and try again.") return for article in response.css(".article"): pub_time = datetime.strptime( article.css(".time::text").get(), "%Y-%m-%d %H:%M:%S" ) if self.last_crawl_time and pub_time <= self.last_crawl_time: continue # 跳过旧文章 yield { "title": article.css("h2::text").get(), "time": pub_time, } # 更新最新爬取时间 self.save_last_crawl_time(datetime.now()) def load_last_crawl_time(self): try: with open("last_crawl.txt", "r") as f: return datetime.strptime(f.read(), "%Y-%m-%d %H:%M:%S") except FileNotFoundError: return None def save_last_crawl_time(self, time): with open("last_crawl.txt", "w") as f: f.write(time.strftime("%Y-%m-%d %H:%M:%S"))
4.2 基于内容哈希的去重(示例) ``` import hashlib class ContentHashSpider(scrapy.Spider): name = "hash_spider" seen_hashes = set() # 存储已爬取的哈希 def parse(self, response): content = response.css("body").get() content_hash = hashlib.md5(content.encode()).hexdigest() if content_hash in self.seen_hashes: return # 跳过重复内容 self.seen_hashes.add(content_hash) yield {"url": response.url, "content": content}
4.3 基于MySQL的增量爬取(完整示例) (1)MySQL 表结构
id INT AUTO_INCREMENT PRIMARY KEY, url VARCHAR(255) UNIQUE, content_hash CHAR(32), last_updated TIMESTAMP );
(2)Scrapy 爬虫代码
import hashlib from scrapy import Spider, Request class MySQLIncrementalSpider(Spider): name = "mysql_incremental" start_urls = ["https://example.com"] def __init__(self): self.conn = pymysql.connect( host="localhost", user="root", password="123456", db="crawler_db" ) self.cursor = self.conn.cursor() def parse(self, response): url = response.url content = response.text content_hash = hashlib.md5(content.encode()).hexdigest() # 检查是否已爬取 self.cursor.execute( "SELECT content_hash FROM crawled_data WHERE url=%s", (url,) ) result = self.cursor.fetchone() if result and result[0] == content_hash: return # 内容未更新 # 插入或更新数据库 self.cursor.execute( """INSERT INTO crawled_data (url, content_hash, last_updated) VALUES (%s, %s, NOW()) ON DUPLICATE KEY UPDATE content_hash=%s, last_updated=NOW()""", (url, content_hash, content_hash) ) self.conn.commit() yield {"url": url, "content": content} def close(self, reason): self.cursor.close() self.conn.close()
- 结论 策略 适用场景 推荐存储方案 时间戳比对 新闻、博客等带时间的数据 文件/MySQL 内容哈希 动态内容(如商品价格) Redis/内存 数据库去重 结构化数据管理 MySQL/MongoDB 最佳实践: ● 小型爬虫 → 内存去重(set()) ● 中型爬虫 → 文件存储(JSON/CSV) ● 大型分布式爬虫 → Redis + MySQL 通过合理选择增量爬取策略,可以显著提升爬虫效率,减少资源浪费。