You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Socket.IO直播服务客户端异常崩溃后僵尸流状态的可靠清理方案咨询

Socket.IO直播服务客户端异常崩溃后僵尸流状态的可靠清理方案咨询

这个僵尸流的问题我太熟了!之前做互动直播SaaS服务的时候,刚上线就遇到用户强杀APP后再也开不了播的情况,折腾了好几天才把方案打磨稳定,给你分享几个生产环境验证过的靠谱思路:

方案一:自定义心跳+内存定时器(单实例场景)

如果你的后端是单进程单实例部署,用内存存储活跃流状态+自定义心跳定时器是最直接的方案。核心思路是:给每个开播用户绑定一个超时定时器,客户端每隔一段时间主动发心跳,后端收到就重置定时器;如果超时没收到心跳,就判定为僵尸流并执行清理。

核心代码示例

// 用Map存用户的活跃直播状态和定时器
const activeStreams = new Map(); // key: userId, value: { streamId, timerId, socketId }

io.on('connection', (socket) => {
  // 处理用户开播请求
  socket.on('startStream', async (userId, streamId) => {
    // 先检查是否已有活跃流
    if (activeStreams.has(userId)) {
      socket.emit('error', 'You are already streaming');
      return;
    }

    // 启动60秒超时清理定时器
    const timerId = setTimeout(async () => {
      await cleanupStream(userId, streamId);
      activeStreams.delete(userId);
      console.log(`清理了用户${userId}的僵尸流`);
    }, 60000);

    // 记录活跃流状态
    activeStreams.set(userId, {
      streamId,
      timerId,
      socketId: socket.id
    });

    // 监听客户端自定义心跳,收到就重置定时器
    socket.on('streamHeartbeat', () => {
      clearTimeout(timerId);
      const newTimerId = setTimeout(async () => {
        await cleanupStream(userId, streamId);
        activeStreams.delete(userId);
      }, 60000);
      // 更新Map里的定时器ID
      const streamData = activeStreams.get(userId);
      streamData.timerId = newTimerId;
      activeStreams.set(userId, streamData);
    });

    // 正常断开连接时的清理
    socket.on('disconnect', async () => {
      const streamData = activeStreams.get(userId);
      if (streamData && streamData.socketId === socket.id) {
        clearTimeout(streamData.timerId);
        await cleanupStream(userId, streamId);
        activeStreams.delete(userId);
      }
    });
  });
});

// 通用的流清理逻辑(要保证幂等性)
async function cleanupStream(userId, streamId) {
  // 这里写你的业务操作:更新数据库流状态、通知观众流结束等
  await db.query('UPDATE streams SET status = "ended" WHERE user_id = ? AND id = ? AND status = "active"', 
    [userId, streamId]);
  io.emit('streamEnded', streamId);
}

注意点

  • 客户端的心跳间隔要比超时时间短,比如超时设60秒,心跳就设40秒,避免网络波动导致误判
  • 清理函数一定要做幂等性校验(比如数据库更新加status = "active"条件),防止重复执行造成异常

方案二:调整Socket.IO原生心跳参数+断开事件兜底

Socket.IO本身就有内置的ping/pong心跳机制,默认pingInterval是25秒,pingTimeout是60秒。你可以把这两个参数调得更激进一点,让后端更快检测到客户端断开,再结合disconnect事件做清理。

核心配置与代码

// 启动Socket.IO时调整心跳参数
const io = require('socket.io')(server, {
  pingInterval: 20000, // 每20秒发一次ping包
  pingTimeout: 30000,  // 30秒没收到pong就判定连接断开
});

// 活跃流存储还是用上面的Map
const activeStreams = new Map();

io.on('connection', (socket) => {
  socket.on('startStream', async (userId, streamId) => {
    if (activeStreams.has(userId)) {
      socket.emit('error', 'You are already streaming');
      return;
    }

    activeStreams.set(userId, {
      streamId,
      socketId: socket.id
    });
  });

  // 利用原生disconnect事件做清理
  socket.on('disconnect', async () => {
    // 遍历找到当前socket对应的用户(如果用户多的话,建议存反向映射:socketId -> userId)
    for (const [userId, data] of activeStreams.entries()) {
      if (data.socketId === socket.id) {
        await cleanupStream(userId, data.streamId);
        activeStreams.delete(userId);
        break;
      }
    }
  });
});

注意点

  • 这个方案的缺点是:如果客户端是瞬间崩溃,原生disconnect事件可能会延迟触发(取决于心跳超时时间),但胜在不用额外开发客户端心跳逻辑
  • 单用户多socket连接的场景下,要存socketId -> userId的反向映射,避免遍历Map影响性能

方案三:Redis过期键+分布式状态管理(多实例/集群场景)

如果你的后端是多进程或者多机器集群部署,内存Map就不适用了,这时候用Redis的键过期机制来做状态管理是最优解:用户开播时在Redis存一个带过期时间的键,客户端发心跳就刷新过期时间;键过期时,后端自动执行清理逻辑。

核心代码示例

const redis = require('redis');
const redisClient = redis.createClient();
const redisSubscriber = redis.createClient();

// 订阅Redis的键过期事件
redisSubscriber.configSet('notify-keyspace-events', 'Ex');
redisSubscriber.subscribe('__keyevent@0__:expired');

// 监听过期事件,触发清理
redisSubscriber.on('message', async (channel, key) => {
  if (key.startsWith('active_stream:')) {
    const userId = key.split(':')[1];
    // 先获取对应的流ID
    const streamId = await redisClient.get(`stream_id:${userId}`);
    if (streamId) {
      await cleanupStream(userId, streamId);
      await redisClient.del(`stream_id:${userId}`);
    }
  }
});

io.on('connection', (socket) => {
  socket.on('startStream', async (userId, streamId) => {
    // 检查Redis中是否已有活跃流
    const exists = await redisClient.exists(`active_stream:${userId}`);
    if (exists) {
      socket.emit('error', 'You are already streaming');
      return;
    }

    // 存入Redis,设置60秒过期
    await redisClient.setEx(`active_stream:${userId}`, 60, '1');
    await redisClient.set(`stream_id:${userId}`, streamId);

    // 处理客户端心跳,刷新过期时间
    socket.on('streamHeartbeat', async () => {
      await redisClient.expire(`active_stream:${userId}`, 60);
    });

    // 正常断开时清理Redis键
    socket.on('disconnect', async () => {
      await redisClient.del(`active_stream:${userId}`);
      await redisClient.del(`stream_id:${userId}`);
      const streamId = await redisClient.get(`stream_id:${userId}`);
      if (streamId) {
        await cleanupStream(userId, streamId);
      }
    });
  });
});

注意点

  • 要确保Redis开启了键过期事件通知(notify-keyspace-events Ex
  • 同样要保证清理逻辑的幂等性,避免Redis过期事件重复触发导致异常

额外的避坑小贴士

  1. 重连状态校验:客户端重新打开APP连接后,先主动请求后端查询自己的直播状态,如果检测到有僵尸流,自动清理后允许开播,不用用户手动操作
  2. 超时时间选择:建议设30-60秒,太短容易误杀正常流(比如用户网络短暂波动),太长用户等待时间太难受
  3. 日志监控:一定要给清理逻辑加日志,方便后续排查僵尸流没清理的问题

这些方案从单实例到集群都覆盖了,你可以根据自己的部署架构选最适合的,我自己用方案三支撑过日活10w+的直播服务,稳定性拉满~

火山引擎 最新活动