You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

优化Puppeteer+Node.js定时爬虫:解决缩短周期引发的内存泄漏问题

看起来你遇到的核心问题是每次定时任务触发都重复创建Puppeteer Cluster,加上任务可能重叠执行,导致EventEmitter监听堆积、资源耗尽,最终出现警告和任务排队的情况。结合你的代码,我整理了几个关键优化点,帮你实现1-10秒的执行周期:

1. 复用Puppeteer Cluster,避免重复初始化

你现在每次调用getMarketData都会创建一个新的Cluster,这不仅浪费资源,还会重复绑定大量事件监听,直接导致MaxListenersExceededWarning。我们把Cluster的初始化移到定时任务外面,只创建一次:

// 把Cluster初始化移到全局,只执行一次
let cluster;

async function initCluster() {
  cluster = await Cluster.launch({
    concurrency: Cluster.CONCURRENCY_PAGE,
    maxConcurrency: 2,
    reusePages: true, // 开启页面复用,减少页面创建开销
    puppeteerOptions: {
      headless: "new", // 使用新版无头模式,性能更好
      args: [
        "--no-sandbox",
        "--disable-setuid-sandbox",
        "--disable-dev-shm-usage", // 解决内存不足问题
        "--disable-gpu"
      ]
    }
  });

  // 在这里一次性注册task逻辑,不用每次重复定义
  await cluster.task(async ({ page, data: { url, market } }) => {
    // 提前设置请求拦截,确保在goto之前生效
    await page.setRequestInterception(true);
    page.once('request', request => { // 使用once代替on,避免重复绑定
      if (request.resourceType() === 'document') {
        request.continue();
      } else {
        request.abort();
      }
    });

    console.log(`Loading ${market.name} data...`);
    await page.goto(url, { waitUntil: 'domcontentloaded', timeout: 10000 }); // 添加超时,防止任务卡住

    const html = await page.content();
    $("table[class='genTbl closedTbl crossRatesTbl elpTbl elp30'] > tbody > tr", html).each((i, elem) => {
      market.companies.push({
        name: $("td[class='bold left noWrap elp plusIconTd'] > a", html).eq(i).html(),
        last: $("td", elem).eq(2).text(),
        high: $("td", elem).eq(3).text(),
        low: $("td", elem).eq(4).text(),
        change: $("td", elem).eq(5).text(),
        changePerCent: $("td", elem).eq(6).text(),
        volume: $("td", elem).eq(7).text(),
        time: $("td", elem).eq(8).text(),
        purchase: false,
        sale: false
      });
    });
  });
}

// 初始化Cluster
initCluster().catch(err => console.error("Cluster初始化失败:", err));

2. 控制任务并发,避免定时任务重叠

定时任务每10秒触发一次,但如果上一次爬取还没完成,新的任务又会启动,导致多个Cluster/页面同时运行,资源被耗尽。我们添加一个isRunning标志位,确保同一时间只有一个爬取任务在执行:

let isRunning = false; // 标记当前是否有任务在执行

getMarketData = async () => {
  if (isRunning) {
    console.log("上一次爬取任务还未完成,跳过本次触发");
    return;
  }
  isRunning = true;

  try {
    console.log("开始获取市场数据...");
    let markets = [];
    let marketSpain = { country: 'Spain', name: 'IBEX 35', companies: [] };
    let marketGermany = { country: 'Germany', name: 'DAX', companies: [] };

    // 把市场数据对象和URL一起传入任务
    await Promise.all([
      cluster.queue({ url: 'https://uk.investing.com/equities/spain', market: marketSpain }),
      cluster.queue({ url: 'https://uk.investing.com/equities/germany', market: marketGermany })
    ]);
    await cluster.idle(); // 等待所有任务完成

    // 存储数据,改用async/await替代回调
    await MarketModel.create({ markets });
    console.log("数据存储完成!");
    markets = []; // 清空数组,避免下次复用旧数据
  } catch (err) {
    console.error("爬取或存储失败:", err);
  } finally {
    isRunning = false; // 任务完成后重置标志位
  }
}

3. 优化定时任务逻辑,减少不必要的判断

原来的定时任务表达式是*/10 * 8-17 * * 1-5,已经限制了工作日的8-17点,里面的额外时间判断可以简化,避免重复逻辑:

// 直接用定时表达式控制时间范围,去掉内部的判断
var j = schedule.scheduleJob('*/10 * 8-17 * * 1-5', function () {
  const now = new Date();
  // 只保留8:30前和17:35后的过滤
  if ((now.getHours() === 8 && now.getMinutes() < 30) || (now.getHours() === 17 && now.getMinutes() > 35)) {
    console.log("不在交易时间范围内,跳过本次爬取");
    return;
  }
  getMarketData();
});

4. 其他细节优化

  • 使用page.once('request')代替page.on('request'),确保每个页面只绑定一次请求拦截事件,避免重复添加监听导致内存泄漏。
  • page.goto添加timeout参数,防止页面加载超时卡住整个任务。
  • finally块中重置isRunning标志位,确保即使任务出错也能释放标志位,不影响下一次任务。

修改后的完整代码

const $ = require('cheerio');
const MarketModel = require('./models/marketModel');
const mongoose = require('mongoose');
const puppeteer = require('puppeteer');
var schedule = require('node-schedule');
const { Cluster } = require('puppeteer-cluster');

// 数据库连接
mongoose.connect('mongodb://localhost:27017/Tradheo', { useNewUrlParser: true });
mongoose.connection.on('error', error => console.log(error));
mongoose.Promise = global.Promise;

let cluster;
let isRunning = false;

// 初始化Puppeteer Cluster,只执行一次
async function initCluster() {
  try {
    cluster = await Cluster.launch({
      concurrency: Cluster.CONCURRENCY_PAGE,
      maxConcurrency: 2,
      reusePages: true,
      puppeteerOptions: {
        headless: "new",
        args: [
          "--no-sandbox",
          "--disable-setuid-sandbox",
          "--disable-dev-shm-usage",
          "--disable-gpu",
          "--fast-start"
        ]
      }
    });

    // 注册任务处理逻辑
    await cluster.task(async ({ page, data: { url, market } }) => {
      await page.setRequestInterception(true);
      // 使用once避免重复绑定事件
      page.once('request', request => {
        if (request.resourceType() === 'document') {
          request.continue();
        } else {
          request.abort();
        }
      });

      await page.goto(url, { waitUntil: 'domcontentloaded', timeout: 10000 });
      const html = await page.content();

      market.companies = []; // 清空旧数据,避免累积
      $("table[class='genTbl closedTbl crossRatesTbl elpTbl elp30'] > tbody > tr", html).each((i, elem) => {
        market.companies.push({
          name: $("td[class='bold left noWrap elp plusIconTd'] > a", html).eq(i).html(),
          last: $("td", elem).eq(2).text(),
          high: $("td", elem).eq(3).text(),
          low: $("td", elem).eq(4).text(),
          change: $("td", elem).eq(5).text(),
          changePerCent: $("td", elem).eq(6).text(),
          volume: $("td", elem).eq(7).text(),
          time: $("td", elem).eq(8).text(),
          purchase: false,
          sale: false
        });
      });
    });

    console.log("Cluster初始化完成");
  } catch (err) {
    console.error("Cluster初始化失败:", err);
    process.exit(1); // 初始化失败退出进程,避免无效运行
  }
}

// 爬取并存储数据的核心函数
const getMarketData = async () => {
  if (isRunning) {
    console.log("上一次任务未完成,跳过本次触发");
    return;
  }
  isRunning = true;

  try {
    console.log("开始执行爬取任务...");
    const marketSpain = { country: 'Spain', name: 'IBEX 35', companies: [] };
    const marketGermany = { country: 'Germany', name: 'DAX', companies: [] };

    // 并行执行两个爬取任务
    await Promise.all([
      cluster.queue({ url: 'https://uk.investing.com/equities/spain', market: marketSpain }),
      cluster.queue({ url: 'https://uk.investing.com/equities/germany', market: marketGermany })
    ]);
    await cluster.idle();

    // 存储数据到MongoDB
    await MarketModel.create({ markets: [marketSpain, marketGermany] });
    console.log("数据存储成功,任务完成");
  } catch (err) {
    console.error("任务执行失败:", err);
  } finally {
    isRunning = false;
  }
}

// 初始化Cluster
initCluster();

// 定时任务配置
var j = schedule.scheduleJob('*/10 * 8-17 * * 1-5', function () {
  const now = new Date();
  // 过滤8:30前和17:35后的时间
  if ((now.getHours() === 8 && now.getMinutes() < 30) || (now.getHours() === 17 && now.getMinutes() > 35)) {
    console.log("当前不在交易时间窗口内,跳过");
    return;
  }
  getMarketData();
});

// 进程退出时关闭Cluster,释放资源
process.on('exit', async () => {
  if (cluster) {
    await cluster.close();
  }
});

为什么这些修改能解决你的问题?

  1. 复用Cluster:避免了每次任务都创建新的浏览器实例,减少了资源开销和事件监听的重复添加,从根源上解决MaxListenersExceededWarning
  2. 任务并发控制isRunning标志位确保同一时间只有一个爬取任务在执行,避免任务堆积和资源耗尽。
  3. 优化页面加载:提前设置请求拦截、使用新版无头模式、添加超时,大幅缩短页面加载时间,让任务能在10秒内完成,甚至更短。
  4. 异步流程优化:改用async/await替代回调,让代码逻辑更清晰,避免异步操作导致的逻辑混乱。

内容的提问来源于stack exchange,提问作者Ray

火山引擎 最新活动