You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

使用urllib3/requests结合ThreadPoolExecutor遍历URL数组时,如何遵守各站点的不同crawl-delay?

使用urllib3/requests结合ThreadPoolExecutor遍历URL数组时,如何遵守各站点的不同crawl-delay?

嘿,这个场景我太熟悉了!用ThreadPoolExecutor做多线程爬虫时,要给不同站点遵守不同的爬取延迟,核心就是给每个站点单独管控请求间隔,不能让同一站点的请求挤在一起。下面我给你一步步拆解实现思路和代码:

核心思路

  • 首先把URL按站点(域名)分组,同一站点的URL共享同一个crawl-delay规则
  • 给每个站点维护两个关键信息:最后一次请求的时间戳,以及一个线程锁(保证同一站点的请求串行处理,避免并发突破延迟限制)
  • 每次发起请求前,先检查当前时间和该站点上一次请求的间隔,不够延迟时间就等够了再发请求

具体代码实现

首先导入需要的模块:

import time
import threading
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse
import requests  # 用urllib3的话逻辑完全一致,只是请求写法不同

然后定义一个专门管理站点延迟的类,这个类会帮我们处理每个站点的等待逻辑:

class SiteRateLimiter:
    def __init__(self):
        # 存储每个站点的最后请求时间,键是域名,值是时间戳
        self.last_request_times = {}
        # 存储每个站点的线程锁,确保同一站点的请求串行执行
        self.locks = {}
    
    def wait_for_delay(self, domain, crawl_delay):
        # 如果是第一次处理这个站点,初始化时间和锁
        if domain not in self.last_request_times:
            self.last_request_times[domain] = 0
            self.locks[domain] = threading.Lock()
        
        # 加锁,保证同一站点的请求不会同时进入等待逻辑
        with self.locks[domain]:
            current_time = time.time()
            time_since_last_request = current_time - self.last_request_times[domain]
            
            # 如果距离上一次请求的时间不够延迟要求,就等待剩余时间
            if time_since_last_request < crawl_delay:
                time.sleep(crawl_delay - time_since_last_request)
            
            # 更新该站点的最后请求时间为当前时间
            self.last_request_times[domain] = time.time()

接下来写处理单个URL的函数,这个函数会先调用限流器等待,再发起请求:

def fetch_url(url, rate_limiter, crawl_delay_map):
    # 解析URL拿到域名,用来匹配对应的延迟规则
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    # 获取当前站点的延迟,默认0(无延迟)
    crawl_delay = crawl_delay_map.get(domain, 0)
    
    # 先遵守延迟规则,等够时间再发请求
    rate_limiter.wait_for_delay(domain, crawl_delay)
    
    # 发起请求(换成urllib3的写法也很简单)
    try:
        response = requests.get(url, timeout=10)
        print(f"✅ 成功抓取 {url},状态码:{response.status_code}")
        # 这里可以添加响应内容的处理逻辑,比如解析HTML、存数据等
        return response.text
    except Exception as e:
        print(f"❌ 抓取 {url} 失败:{str(e)}")
        return None

最后是主程序,把所有部分串起来:

if __name__ == "__main__":
    # 你的URL列表
    urls = [
        "https://news.ycombinator.com/",
        "https://news.ycombinator.com/?p=2",
        "https://news.ycombinator.com/?p=3",
        "https://www.infoworld.com/",
        "https://www.theregister.com",
    ]
    # 定义每个站点的crawl-delay映射
    crawl_delay_map = {
        "news.ycombinator.com": 30,
        "www.theregister.com": 5,
        # infoworld没有延迟,会用默认的0
    }
    
    # 初始化站点限流器
    rate_limiter = SiteRateLimiter()
    
    # 启动线程池,线程数可以根据你的需求调整(别太大,避免被封)
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 把所有URL任务提交到线程池
        futures = [executor.submit(fetch_url, url, rate_limiter, crawl_delay_map) for url in urls]
        
        # 可选:等待所有任务完成并处理结果
        for future in futures:
            result = future.result()
            # 这里可以对每个请求的结果做后续处理

额外注意点

  • 如果要支持从robots.txt自动获取crawl-delay,可以用robotexclusionrulesparser这类库提前解析每个站点的robots规则,自动填充crawl_delay_map
  • 线程数不要设置得过高,不然即使有延迟,多站点并发请求也可能触发目标站点的反爬机制
  • 如果用urllib3代替requests,只需要把请求部分换成urllib3的写法:
    import urllib3
    http = urllib3.PoolManager()
    response = http.request('GET', url)
    

备注:内容来源于stack exchange,提问作者Extect

火山引擎 最新活动