优化Puppeteer+Node.js定时爬虫:解决缩短周期引发的内存泄漏问题
看起来你遇到的核心问题是每次定时任务触发都重复创建Puppeteer Cluster,加上任务可能重叠执行,导致EventEmitter监听堆积、资源耗尽,最终出现警告和任务排队的情况。结合你的代码,我整理了几个关键优化点,帮你实现1-10秒的执行周期:
1. 复用Puppeteer Cluster,避免重复初始化
你现在每次调用getMarketData都会创建一个新的Cluster,这不仅浪费资源,还会重复绑定大量事件监听,直接导致MaxListenersExceededWarning。我们把Cluster的初始化移到定时任务外面,只创建一次:
// 把Cluster初始化移到全局,只执行一次 let cluster; async function initCluster() { cluster = await Cluster.launch({ concurrency: Cluster.CONCURRENCY_PAGE, maxConcurrency: 2, reusePages: true, // 开启页面复用,减少页面创建开销 puppeteerOptions: { headless: "new", // 使用新版无头模式,性能更好 args: [ "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", // 解决内存不足问题 "--disable-gpu" ] } }); // 在这里一次性注册task逻辑,不用每次重复定义 await cluster.task(async ({ page, data: { url, market } }) => { // 提前设置请求拦截,确保在goto之前生效 await page.setRequestInterception(true); page.once('request', request => { // 使用once代替on,避免重复绑定 if (request.resourceType() === 'document') { request.continue(); } else { request.abort(); } }); console.log(`Loading ${market.name} data...`); await page.goto(url, { waitUntil: 'domcontentloaded', timeout: 10000 }); // 添加超时,防止任务卡住 const html = await page.content(); $("table[class='genTbl closedTbl crossRatesTbl elpTbl elp30'] > tbody > tr", html).each((i, elem) => { market.companies.push({ name: $("td[class='bold left noWrap elp plusIconTd'] > a", html).eq(i).html(), last: $("td", elem).eq(2).text(), high: $("td", elem).eq(3).text(), low: $("td", elem).eq(4).text(), change: $("td", elem).eq(5).text(), changePerCent: $("td", elem).eq(6).text(), volume: $("td", elem).eq(7).text(), time: $("td", elem).eq(8).text(), purchase: false, sale: false }); }); }); } // 初始化Cluster initCluster().catch(err => console.error("Cluster初始化失败:", err));
2. 控制任务并发,避免定时任务重叠
定时任务每10秒触发一次,但如果上一次爬取还没完成,新的任务又会启动,导致多个Cluster/页面同时运行,资源被耗尽。我们添加一个isRunning标志位,确保同一时间只有一个爬取任务在执行:
let isRunning = false; // 标记当前是否有任务在执行 getMarketData = async () => { if (isRunning) { console.log("上一次爬取任务还未完成,跳过本次触发"); return; } isRunning = true; try { console.log("开始获取市场数据..."); let markets = []; let marketSpain = { country: 'Spain', name: 'IBEX 35', companies: [] }; let marketGermany = { country: 'Germany', name: 'DAX', companies: [] }; // 把市场数据对象和URL一起传入任务 await Promise.all([ cluster.queue({ url: 'https://uk.investing.com/equities/spain', market: marketSpain }), cluster.queue({ url: 'https://uk.investing.com/equities/germany', market: marketGermany }) ]); await cluster.idle(); // 等待所有任务完成 // 存储数据,改用async/await替代回调 await MarketModel.create({ markets }); console.log("数据存储完成!"); markets = []; // 清空数组,避免下次复用旧数据 } catch (err) { console.error("爬取或存储失败:", err); } finally { isRunning = false; // 任务完成后重置标志位 } }
3. 优化定时任务逻辑,减少不必要的判断
原来的定时任务表达式是*/10 * 8-17 * * 1-5,已经限制了工作日的8-17点,里面的额外时间判断可以简化,避免重复逻辑:
// 直接用定时表达式控制时间范围,去掉内部的判断 var j = schedule.scheduleJob('*/10 * 8-17 * * 1-5', function () { const now = new Date(); // 只保留8:30前和17:35后的过滤 if ((now.getHours() === 8 && now.getMinutes() < 30) || (now.getHours() === 17 && now.getMinutes() > 35)) { console.log("不在交易时间范围内,跳过本次爬取"); return; } getMarketData(); });
4. 其他细节优化
- 使用
page.once('request')代替page.on('request'),确保每个页面只绑定一次请求拦截事件,避免重复添加监听导致内存泄漏。 - 给
page.goto添加timeout参数,防止页面加载超时卡住整个任务。 - 在
finally块中重置isRunning标志位,确保即使任务出错也能释放标志位,不影响下一次任务。
修改后的完整代码
const $ = require('cheerio'); const MarketModel = require('./models/marketModel'); const mongoose = require('mongoose'); const puppeteer = require('puppeteer'); var schedule = require('node-schedule'); const { Cluster } = require('puppeteer-cluster'); // 数据库连接 mongoose.connect('mongodb://localhost:27017/Tradheo', { useNewUrlParser: true }); mongoose.connection.on('error', error => console.log(error)); mongoose.Promise = global.Promise; let cluster; let isRunning = false; // 初始化Puppeteer Cluster,只执行一次 async function initCluster() { try { cluster = await Cluster.launch({ concurrency: Cluster.CONCURRENCY_PAGE, maxConcurrency: 2, reusePages: true, puppeteerOptions: { headless: "new", args: [ "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--fast-start" ] } }); // 注册任务处理逻辑 await cluster.task(async ({ page, data: { url, market } }) => { await page.setRequestInterception(true); // 使用once避免重复绑定事件 page.once('request', request => { if (request.resourceType() === 'document') { request.continue(); } else { request.abort(); } }); await page.goto(url, { waitUntil: 'domcontentloaded', timeout: 10000 }); const html = await page.content(); market.companies = []; // 清空旧数据,避免累积 $("table[class='genTbl closedTbl crossRatesTbl elpTbl elp30'] > tbody > tr", html).each((i, elem) => { market.companies.push({ name: $("td[class='bold left noWrap elp plusIconTd'] > a", html).eq(i).html(), last: $("td", elem).eq(2).text(), high: $("td", elem).eq(3).text(), low: $("td", elem).eq(4).text(), change: $("td", elem).eq(5).text(), changePerCent: $("td", elem).eq(6).text(), volume: $("td", elem).eq(7).text(), time: $("td", elem).eq(8).text(), purchase: false, sale: false }); }); }); console.log("Cluster初始化完成"); } catch (err) { console.error("Cluster初始化失败:", err); process.exit(1); // 初始化失败退出进程,避免无效运行 } } // 爬取并存储数据的核心函数 const getMarketData = async () => { if (isRunning) { console.log("上一次任务未完成,跳过本次触发"); return; } isRunning = true; try { console.log("开始执行爬取任务..."); const marketSpain = { country: 'Spain', name: 'IBEX 35', companies: [] }; const marketGermany = { country: 'Germany', name: 'DAX', companies: [] }; // 并行执行两个爬取任务 await Promise.all([ cluster.queue({ url: 'https://uk.investing.com/equities/spain', market: marketSpain }), cluster.queue({ url: 'https://uk.investing.com/equities/germany', market: marketGermany }) ]); await cluster.idle(); // 存储数据到MongoDB await MarketModel.create({ markets: [marketSpain, marketGermany] }); console.log("数据存储成功,任务完成"); } catch (err) { console.error("任务执行失败:", err); } finally { isRunning = false; } } // 初始化Cluster initCluster(); // 定时任务配置 var j = schedule.scheduleJob('*/10 * 8-17 * * 1-5', function () { const now = new Date(); // 过滤8:30前和17:35后的时间 if ((now.getHours() === 8 && now.getMinutes() < 30) || (now.getHours() === 17 && now.getMinutes() > 35)) { console.log("当前不在交易时间窗口内,跳过"); return; } getMarketData(); }); // 进程退出时关闭Cluster,释放资源 process.on('exit', async () => { if (cluster) { await cluster.close(); } });
为什么这些修改能解决你的问题?
- 复用Cluster:避免了每次任务都创建新的浏览器实例,减少了资源开销和事件监听的重复添加,从根源上解决
MaxListenersExceededWarning。 - 任务并发控制:
isRunning标志位确保同一时间只有一个爬取任务在执行,避免任务堆积和资源耗尽。 - 优化页面加载:提前设置请求拦截、使用新版无头模式、添加超时,大幅缩短页面加载时间,让任务能在10秒内完成,甚至更短。
- 异步流程优化:改用
async/await替代回调,让代码逻辑更清晰,避免异步操作导致的逻辑混乱。
内容的提问来源于stack exchange,提问作者Ray




