Python爬虫去重策略:增量爬取与历史数据比对 2周前

  1. 引言 在数据采集过程中,爬虫经常需要面对 重复数据 的问题。如果每次爬取都全量抓取,不仅浪费资源,还可能导致数据冗余。增量爬取(Incremental Crawling) 是一种高效策略,它仅抓取 新增或更新 的数据,而跳过已采集的旧数据。 本文将详细介绍 Python爬虫的增量爬取与历史数据比对 策略,涵盖以下内容:
  2. 增量爬取的核心思路
  3. 去重方案对比(数据库、文件、内存)
  4. 基于时间戳、哈希、数据库比对的实现方法
  5. 完整代码示例(Scrapy + MySQL 增量爬取)
  6. 增量爬取的核心思路 增量爬取的核心是 识别哪些数据是新的或已更新的,通常采用以下方式: ● 基于时间戳(Last-Modified / Update-Time) ● 基于内容哈希(MD5/SHA1) ● 基于数据库比对(MySQL/Redis/MongoDB)
  7. 1 基于时间戳的增量爬取 适用于数据源带有 发布时间(如新闻、博客) 的场景:
  8. 记录上次爬取的 最新时间戳
  9. 下次爬取时,只抓取 晚于该时间戳的数据 优点:简单高效,适用于结构化数据 缺点:依赖数据源的时间字段,不适用于无时间戳的网页
  10. 2 基于内容哈希的去重 适用于 内容可能更新但URL不变 的页面(如电商价格):
  11. 计算页面内容的 哈希值(如MD5)
  12. 比对哈希值,若变化则视为更新 优点:适用于动态内容 缺点:计算开销较大
  13. 3 基于数据库比对的增量爬取 适用于 大规模数据管理:
  14. 将已爬取的 URL 或关键字段 存入数据库(MySQL/Redis)
  15. 每次爬取前查询数据库,判断是否已存在 优点:支持分布式去重 缺点:需要额外存储
  16. 去重方案对比 方案 适用场景 优点 缺点 内存去重 单机小规模爬虫 速度快(set() ) 重启后数据丢失 文件存储 中小规模爬虫 简单(CSV/JSON) 性能较低 SQL数据库 结构化数据管理 支持复杂查询(MySQL) 需要数据库维护 NoSQL数据库 高并发分布式爬虫 高性能(Redis/MongoDB) 内存占用较高
  17. 增量爬取实现方法
  18. 1 基于时间戳的增量爬取(示例)
    from datetime import datetime
    

class NewsSpider(scrapy.Spider): name = "news_spider" last_crawl_time = None # 上次爬取的最新时间

def start_requests(self):
    # 从文件/DB加载上次爬取时间
    self.last_crawl_time = self.load_last_crawl_time()

    # 设置代理信息
    proxy = "http://www.16yun.cn:5445"
    proxy_auth = "16QMSOML:280651"

    # 添加代理到请求中
    yield scrapy.Request(
        url="https://news.example.com/latest",
        meta={
            'proxy': proxy,
            'proxy_user_pass': proxy_auth
        }
    )

def parse(self, response):
    # 检查响应状态码,判断是否成功获取数据
    if response.status != 200:
        self.logger.error(f"Failed to fetch data from {response.url}. Status code: {response.status}")
        self.logger.error("This might be due to network issues or an invalid URL. Please check the URL and try again.")
        return

    for article in response.css(".article"):
        pub_time = datetime.strptime(
            article.css(".time::text").get(), 
            "%Y-%m-%d %H:%M:%S"
        )
        if self.last_crawl_time and pub_time <= self.last_crawl_time:
            continue  # 跳过旧文章

        yield {
            "title": article.css("h2::text").get(),
            "time": pub_time,
        }

    # 更新最新爬取时间
    self.save_last_crawl_time(datetime.now())

def load_last_crawl_time(self):
    try:
        with open("last_crawl.txt", "r") as f:
            return datetime.strptime(f.read(), "%Y-%m-%d %H:%M:%S")
    except FileNotFoundError:
        return None

def save_last_crawl_time(self, time):
    with open("last_crawl.txt", "w") as f:
        f.write(time.strftime("%Y-%m-%d %H:%M:%S"))
4.2 基于内容哈希的去重(示例)
``` import hashlib

class ContentHashSpider(scrapy.Spider):
    name = "hash_spider"
    seen_hashes = set()  # 存储已爬取的哈希

    def parse(self, response):
        content = response.css("body").get()
        content_hash = hashlib.md5(content.encode()).hexdigest()

        if content_hash in self.seen_hashes:
            return  # 跳过重复内容

        self.seen_hashes.add(content_hash)
        yield {"url": response.url, "content": content}

4.3 基于MySQL的增量爬取(完整示例) (1)MySQL 表结构

id INT AUTO_INCREMENT PRIMARY KEY,
    url VARCHAR(255) UNIQUE,
    content_hash CHAR(32),
    last_updated TIMESTAMP
);

(2)Scrapy 爬虫代码

import hashlib
from scrapy import Spider, Request

class MySQLIncrementalSpider(Spider):
    name = "mysql_incremental"
    start_urls = ["https://example.com"]

    def __init__(self):
        self.conn = pymysql.connect(
            host="localhost",
            user="root",
            password="123456",
            db="crawler_db"
        )
        self.cursor = self.conn.cursor()

    def parse(self, response):
        url = response.url
        content = response.text
        content_hash = hashlib.md5(content.encode()).hexdigest()

        # 检查是否已爬取
        self.cursor.execute(
            "SELECT content_hash FROM crawled_data WHERE url=%s",
            (url,)
        )
        result = self.cursor.fetchone()

        if result and result[0] == content_hash:
            return  # 内容未更新

        # 插入或更新数据库
        self.cursor.execute(
            """INSERT INTO crawled_data (url, content_hash, last_updated)
               VALUES (%s, %s, NOW())
               ON DUPLICATE KEY UPDATE 
               content_hash=%s, last_updated=NOW()""",
            (url, content_hash, content_hash)
        )
        self.conn.commit()

        yield {"url": url, "content": content}

    def close(self, reason):
        self.cursor.close()
        self.conn.close()
  1. 结论 策略 适用场景 推荐存储方案 时间戳比对 新闻、博客等带时间的数据 文件/MySQL 内容哈希 动态内容(如商品价格) Redis/内存 数据库去重 结构化数据管理 MySQL/MongoDB 最佳实践: ● 小型爬虫 → 内存去重(set()) ● 中型爬虫 → 文件存储(JSON/CSV) ● 大型分布式爬虫 → Redis + MySQL 通过合理选择增量爬取策略,可以显著提升爬虫效率,减少资源浪费。
金馆长的哈哈哈
我之所以想变强,是为了活得轻松写意。
1
发布数
1
关注者
431
累计阅读

热门教程文档

Swift
54小节
Golang
23小节
MyBatis
19小节
Next
43小节
QT
33小节