You need to enable JavaScript to run this app.
导航

RTM 推流 SDK 使用说明

最近更新时间:2023.02.02 18:42:57

首次发布时间:2022.12.30 11:28:27

介绍

RTM 推流 SDK 是火山引擎慢直播产品提供的 RTM 推流套件,可以帮助用户快速进行低延迟推流,并且在视频流推流至服务端后进行低延迟直播。

关键能力

媒体支持

  • 音频能力支持 Opus。

  • 视频能力支持 H264。

芯片适配

  • arm64 和 x86 芯片均可运行。

安装

RTM 推流 SDK 以动态库的方式提供,将其引入项目编译即可。可参考下文中的完整示例。

SDK 下载

| 芯片版本 | SDK 下载 |
| --- | --- |
| x86 | 请联系您所在区域的销售或者通过火山引擎官网发起工单 |
| arm64 | 请联系您所在区域的销售或者通过火山引擎官网发起工单 |

创建 RTM 推流资源

  1. 登录火山引擎 慢直播控制台,创建 RTMP 接入类型的空间。

  2. 进入空间后,打开 视频流管理 页面,点击 添加视频流。

  3. 打开视频流详情页面,获得 RTM 推流地址,然后参考使用示例进行 RTM 推流。如下图所示:

接口文档

元数据定义

/**
 * Bit flags describing a frame. Every non-zero flag occupies its own bit.
 */
typedef enum {
    /* No flag bits are set */
    RTM_FRAME_FLAG_NONE = 0x00,

    /* Key frame (I or IDR) */
    RTM_FRAME_FLAG_KEY_FRAME = 0x01,

    /* Discardable frame: no other frame depends on it */
    RTM_FRAME_FLAG_DISCARDABLE_FRAME = 0x02,

    /* Frame is invisible for rendering */
    RTM_FRAME_FLAG_INVISIBLE_FRAME = 0x04,

    /* Explicit marker for the end of a fragment */
    RTM_FRAME_FLAG_END_OF_FRAGMENT = 0x08,
} RTM_FRAME_FLAGS;

/**
 * One encoded media frame as handed to RTMSendMediaData().
 * All timestamps and the duration are expressed in 100 ns units.
 */
typedef struct {
    // Id of the frame
    UINT32 index;

    // RTM_FRAME_FLAGS bits for this frame (e.g. RTM_FRAME_FLAG_KEY_FRAME
    // for an I/IDR frame)
    RTM_FRAME_FLAGS flags;

    // Decoding timestamp of the frame, 100 ns precision
    UINT64 decodingTs;

    // Presentation timestamp of the frame, 100 ns precision
    UINT64 presentationTs;

    // Duration of the frame, 100 ns precision. Can be 0.
    UINT64 duration;

    // Size of the buffer behind frameData, in bytes
    UINT32 size;

    // Pointer to the encoded frame bytes
    PBYTE frameData;

    // Id of the track this frame belongs to
    UINT64 trackId;
} RTMFrame, *PRTMFrame;

创建推流上下文

/**
 * @brief Create a new push context.
 * 
 * @param url Push URL on the cloud side
 * @return UINT64 Address of the context; pass it as `ctx` to the other RTM calls
 */
UINT64 RTMCtxNew(const char* url);

添加视频通道

// Video codec identifiers stored in RTMVideoConfig.Codec
#define RTM_VIDEO_CODEC_H264 1
#define RTM_VIDEO_CODEC_H265 2

// Video track settings passed to RTMAddVideoTrack()
typedef struct RTMVideoConfig {
  INT32 Codec;  // one of the RTM_VIDEO_CODEC_* values
} RTMVideoConfigT;

/**
 * @brief Add a video track to the push.
 * 
 * @param ctx Value returned by RTMCtxNew at initialisation
 * @param cfg Video track configuration (selects the codec)
 */
VOID RTMAddVideoTrack(UINT64 ctx, RTMVideoConfigT* cfg);

添加音频通道

// Audio codec identifier stored in RTMAudioConfig.Codec
#define RTM_AUDIO_CODEC_OPUS 1

// Audio track settings passed to RTMAddAudioTrack()
typedef struct RTMAudioConfig {
  INT32 Codec;  // presumably RTM_AUDIO_CODEC_OPUS, the only audio codec defined here
} RTMAudioConfigT;

/**
 * @brief Add an audio track to the push.
 * 
 * @param ctx Push context; presumably the value returned by RTMCtxNew, as
 *            for RTMAddVideoTrack
 * @param cfg Audio track configuration (selects the codec)
 */
VOID RTMAddAudioTrack(UINT64 ctx, RTMAudioConfigT* cfg);

开始建立推流连接

/**
 * @brief Start establishing the push connection; returns false on failure.
 * 
 * @param ctx Push context
 * @param msg On failure, receives the failure information so the caller can
 *            read the error message
 * @return true  presumably on success (only the failure case is spelled out)
 * @return false the connection could not be established
 */
BOOL RTMStartPush(UINT64 ctx, PCHAR *msg);

发送媒体数据

/**
 * @brief Send media data. The frame must carry a track id; track ids start
 *        at 0, video first, then audio.
 * 
 * @param ctx Push context
 * @param frame Frame to send, with trackId set as described above
 * @return true  presumably when the frame was accepted - TODO confirm
 * @return false presumably when sending failed - TODO confirm
 */
BOOL RTMSendMediaData(UINT64 ctx, PRTMFrame frame);

注册推流中断回调

// Callback signature: receives the push context. Per its name it reports a
// disconnected push; the exact trigger conditions are not shown here.
typedef void(RTMDisconnectedCB)(UINT64 ctx);
// Registers `cb` as the disconnect callback for the push context `ctx`.
void RegisterRTMDisconnectedCB(UINT64 ctx, RTMDisconnectedCB *cb);

释放推流资源

// Releases the push resources associated with `ctx`.
VOID RTMRelease(UINT64 ctx);

使用示例

在项目中引入以下资源即可使用,以最简单的 C 程序为例:

iot_demo.zip
177.66KB

下载压缩包后,在根目录编译即可:

# Headers are located through -I instead of being passed as inputs, and the
# source file precedes -lrtm_sdk so the linker resolves its symbols.
gcc main.c -I3rd/rtmsdk/include -L3rd/rtmsdk -lrtm_sdk -o iot_demo

如何使用 GStreamer 处理媒体

以下示例为使用 GStreamer 处理媒体。详细步骤包括:

  1. 初始化系统资源。
// Bring up GStreamer, then create the push context from the RTM push URL.
gst_init(NULL, NULL);
ctx = RTMCtxNew("<创建的 RTM 推流地址>");

// Describe a single H264 video track and attach it to the context.
RTMVideoConfigT v = {0};
v.Codec = RTM_VIDEO_CODEC_H264;
RTMAddVideoTrack(ctx, &v);
  2. 安装 v4l2loopback 模块,用于配置虚拟摄像头。
sudo apt install v4l2loopback-dkms
sudo modprobe v4l2loopback
  3. 准备测试媒体资源。
随意下载一个媒体资源,然后执行以下命令,将其写入虚拟摄像头 /dev/video0:

ffmpeg -re -i <下载的文件> -map 0:v -f v4l2 /dev/video0
  4. 编写处理媒体部分代码。
  • 接收 GStreamer 的媒体数据并处理。
/**
 * Receives one sample pushed by GStreamer and forwards it to the RTM SDK.
 *
 * Corrupted, decode-only, discontinuity-only and timestamp-less buffers are
 * skipped. Every other buffer is wrapped in an RTMFrame and handed to
 * RTMSendMediaData() on the global push context `ctx`.
 *
 * @param sink    appsink element the sample is pulled from
 * @param data    user data given at signal registration; only checked for
 *                NULL here
 * @param trackid RTM track id written into the outgoing frame
 * @return GST_FLOW_OK in every case; failures are logged, not propagated
 */
GstFlowReturn on_new_sample(GstElement *sink, gpointer data, UINT64 trackid) {
  GstBuffer *buffer = NULL;
  BOOL isDroppable, delta;
  GstFlowReturn ret = GST_FLOW_OK;
  GstSample *sample = NULL;
  GstMapInfo info;
  GstSegment *segment;
  GstClockTime buf_pts;
  RTMFrame frame = {0}; // no field reaches the SDK with an indeterminate value
  PCHAR session_key = (PCHAR)data;

  // CleanUp inspects info.data, so it must be set before the first goto.
  info.data = NULL;

  if (session_key == NULL) {
    printf("[KVS GStreamer Master] on_new_sample(): operation returned status "
           "code: 0x%08x \n",
           STATUS_NULL_ARG);
    goto CleanUp;
  }

  sample = gst_app_sink_pull_sample(GST_APP_SINK(sink));
  if (sample == NULL) {
    // appsink returns NULL once it is stopped or at EOS: nothing to send.
    goto CleanUp;
  }

  buffer = gst_sample_get_buffer(sample);
  if (buffer == NULL) {
    goto CleanUp;
  }

  isDroppable = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_CORRUPTED) ||
                GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DECODE_ONLY) ||
                (GST_BUFFER_FLAGS(buffer) == GST_BUFFER_FLAG_DISCONT) ||
                (GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT) &&
                 GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)) ||
                // drop if buffer contains header only and has invalid timestamp
                !GST_BUFFER_PTS_IS_VALID(buffer);

  if (!isDroppable) {
    // A buffer that is not a delta unit is a key frame.
    delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
    frame.flags = delta ? RTM_FRAME_FLAG_NONE : RTM_FRAME_FLAG_KEY_FRAME;

    // Convert from segment timestamp to running time in live mode; a frame
    // whose PTS falls outside the segment is really dropped, as the log says.
    segment = gst_sample_get_segment(sample);
    buf_pts =
        gst_segment_to_running_time(segment, GST_FORMAT_TIME, buffer->pts);
    if (!GST_CLOCK_TIME_IS_VALID(buf_pts)) {
      printf("[KVS GStreamer Master] Frame contains invalid PTS dropping the "
             "frame. \n");
      goto CleanUp;
    }

    if (!(gst_buffer_map(buffer, &info, GST_MAP_READ))) {
      printf("[KVS GStreamer Master] on_new_sample(): Gst buffer mapping "
             "failed\n");
      goto CleanUp;
    }

    frame.duration = 0;
    frame.size = (UINT32)info.size;
    frame.frameData = (PBYTE)info.data;

    // GStreamer timestamps are in nanoseconds; RTMFrame uses 100 ns units.
    frame.presentationTs = buffer->pts / 100;
    frame.decodingTs = frame.presentationTs;
    frame.trackId = trackid;

    // `ts` is the nominal frame clock, printed only as a drift indicator.
    ts += SAMPLE_VIDEO_FRAME_DURATION;

    // RTMSendMediaData is declared as returning BOOL, so test truthiness
    // instead of comparing against STATUS_SUCCESS (0).
    if (RTMSendMediaData(ctx, &frame)) {
      printf("writeframe() %llu %llu delta: %lld\n",
             (unsigned long long)info.size, (unsigned long long)trackid,
             (long long)(ts - frame.presentationTs));
    } else {
      printf("writeFrame() failed\n");
    }
  }

CleanUp:

  if (info.data != NULL) {
    gst_buffer_unmap(buffer, &info);
  }

  if (sample != NULL) {
    gst_sample_unref(sample);
  }

  return ret;
}
  • 开启 GStreamer 任务。
/**
 * "new-sample" handler for the video appsink: delegates to on_new_sample()
 * with the video track id.
 */
GstFlowReturn on_new_sample_video(GstElement *sink, gpointer data) {
  const UINT64 videoTrack = DEFAULT_VIDEO_TRACK_ID;

  return on_new_sample(sink, data, videoTrack);
}

/**
 * Opens the media source with GStreamer and starts pushing the data stream.
 *
 * Builds a v4l2src -> x264enc -> appsink pipeline, hooks on_new_sample_video()
 * to the appsink's "new-sample" signal and then blocks until the bus reports
 * an error or end-of-stream.
 *
 * @param args passed through unchanged as user data to the sample handler
 * @return STATUS code cast to PVOID: STATUS_SUCCESS, or STATUS_INTERNAL_ERROR
 *         when the pipeline or the appsink could not be set up
 */
PVOID sendGstreamerVideo(PVOID args) {
  STATUS retStatus = STATUS_SUCCESS;
  GstElement *appsinkVideo = NULL, *pipeline = NULL;
  GstBus *bus = NULL;
  GstMessage *msg = NULL;
  GError *error = NULL;
  PCHAR session_key = (PCHAR)args;

  printf("sendGstreamerVideo start\n");

  printf("Pipeline video generate start\n");
  pipeline = gst_parse_launch(
      "v4l2src device=/dev/video0 ! videoconvert ! "
      " x264enc aud=false ! "
      " video/x-h264,stream-format=byte-stream,alignment=au,profile=baseline ! "
      " appsink sync=true emit-signals=true name=appsink-video",
      &error);

  if (pipeline == NULL) {
    printf("[KVS GStreamer Master] sendGstreamerAudioVideo(): Failed to launch "
           "gstreamer, operation returned status code: 0x%08x \n",
           STATUS_INTERNAL_ERROR);
    retStatus = STATUS_INTERNAL_ERROR;
    goto CleanUp;
  }

  printf("Pipeline video generate done\n");
  appsinkVideo = gst_bin_get_by_name(GST_BIN(pipeline), "appsink-video");

  if (appsinkVideo == NULL) {
    printf("[KVS GStreamer Master] sendGstreamerAudioVideo(): cant find "
           "appsink, operation returned status code: 0x%08x \n",
           STATUS_INTERNAL_ERROR);
    retStatus = STATUS_INTERNAL_ERROR;
    goto CleanUp;
  }

  g_signal_connect(appsinkVideo, "new-sample",
                   G_CALLBACK(on_new_sample_video), (gpointer)session_key);

  gst_element_set_state(pipeline, GST_STATE_PLAYING);

  /* block until error or EOS */
  bus = gst_element_get_bus(pipeline);
  msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE,
                                   GST_MESSAGE_ERROR | GST_MESSAGE_EOS);

CleanUp:

  /* Single teardown path: every reference taken above is released here. */
  if (msg != NULL) {
    gst_message_unref(msg);
  }
  if (bus != NULL) {
    gst_object_unref(bus);
  }
  if (appsinkVideo != NULL) {
    /* gst_bin_get_by_name() hands back a reference the caller owns. */
    gst_object_unref(appsinkVideo);
  }
  if (pipeline != NULL) {
    gst_element_set_state(pipeline, GST_STATE_NULL);
    gst_object_unref(pipeline);
  }

  if (error != NULL) {
    printf("%s", error->message);
    g_clear_error(&error);
  }

  /* Widen through size_t so the integer-to-pointer conversion is explicit. */
  return (PVOID)(size_t)retStatus;
}
  5. 运行程序。编译后直接执行即可,可以在控制台观看直播。

  6. 完整示例:

#include "rtm.h"
#include "stdio.h"

#include "gst/gst.h"
#include "gst/app/gstappsink.h"

/* Nominal capture rate used to derive the per-frame duration. */
#define DEFAULT_FPS_VALUE 30
/* Length of one timestamp tick, in nanoseconds. */
#define DEFAULT_TIME_UNIT_IN_NANOS 100
/*
 * Track id of the video stream. Track ids start at 0 with video first, so the
 * single video track is 0. Guarded in case an SDK header already defines it.
 */
#ifndef DEFAULT_VIDEO_TRACK_ID
#define DEFAULT_VIDEO_TRACK_ID 0
#endif
/* Time conversions expressed in 100 ns ticks. */
#define HUNDREDS_OF_NANOS_IN_A_MICROSECOND 10LL
#define HUNDREDS_OF_NANOS_IN_A_MILLISECOND                                     \
  (HUNDREDS_OF_NANOS_IN_A_MICROSECOND * 1000LL)
#define HUNDREDS_OF_NANOS_IN_A_SECOND                                          \
  (HUNDREDS_OF_NANOS_IN_A_MILLISECOND * 1000LL)
#define HUNDREDS_OF_NANOS_IN_A_MINUTE (HUNDREDS_OF_NANOS_IN_A_SECOND * 60LL)
#define HUNDREDS_OF_NANOS_IN_AN_HOUR (HUNDREDS_OF_NANOS_IN_A_MINUTE * 60LL)
/* Duration of one frame at DEFAULT_FPS_VALUE, in 100 ns ticks. */
#define SAMPLE_VIDEO_FRAME_DURATION                                            \
  (HUNDREDS_OF_NANOS_IN_A_SECOND / DEFAULT_FPS_VALUE)

// Nominal frame clock: advanced by SAMPLE_VIDEO_FRAME_DURATION for every
// frame handed to RTMSendMediaData(); only used in the progress log.
UINT64 ts = 0;
// Push context returned by RTMCtxNew(); set in main() and read by the
// GStreamer sample callback.
UINT64 ctx = 0;

/**
 * Receives one sample pushed by GStreamer and forwards it to the RTM SDK.
 *
 * Corrupted, decode-only, discontinuity-only and timestamp-less buffers are
 * skipped. Every other buffer is wrapped in an RTMFrame and handed to
 * RTMSendMediaData() on the global push context `ctx`.
 *
 * @param sink    appsink element the sample is pulled from
 * @param data    user data given at signal registration; only checked for
 *                NULL here
 * @param trackid RTM track id written into the outgoing frame
 * @return GST_FLOW_OK in every case; failures are logged, not propagated
 */
GstFlowReturn on_new_sample(GstElement *sink, gpointer data, UINT64 trackid) {
  GstBuffer *buffer = NULL;
  BOOL isDroppable, delta;
  GstFlowReturn ret = GST_FLOW_OK;
  GstSample *sample = NULL;
  GstMapInfo info;
  GstSegment *segment;
  GstClockTime buf_pts;
  RTMFrame frame = {0}; // no field reaches the SDK with an indeterminate value
  PCHAR session_key = (PCHAR)data;

  // CleanUp inspects info.data, so it must be set before the first goto.
  info.data = NULL;

  if (session_key == NULL) {
    printf("[KVS GStreamer Master] on_new_sample(): operation returned status "
           "code: 0x%08x \n",
           STATUS_NULL_ARG);
    goto CleanUp;
  }

  sample = gst_app_sink_pull_sample(GST_APP_SINK(sink));
  if (sample == NULL) {
    // appsink returns NULL once it is stopped or at EOS: nothing to send.
    goto CleanUp;
  }

  buffer = gst_sample_get_buffer(sample);
  if (buffer == NULL) {
    goto CleanUp;
  }

  isDroppable = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_CORRUPTED) ||
                GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DECODE_ONLY) ||
                (GST_BUFFER_FLAGS(buffer) == GST_BUFFER_FLAG_DISCONT) ||
                (GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT) &&
                 GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)) ||
                // drop if buffer contains header only and has invalid timestamp
                !GST_BUFFER_PTS_IS_VALID(buffer);

  if (!isDroppable) {
    // A buffer that is not a delta unit is a key frame.
    delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
    frame.flags = delta ? RTM_FRAME_FLAG_NONE : RTM_FRAME_FLAG_KEY_FRAME;

    // Convert from segment timestamp to running time in live mode; a frame
    // whose PTS falls outside the segment is really dropped, as the log says.
    segment = gst_sample_get_segment(sample);
    buf_pts =
        gst_segment_to_running_time(segment, GST_FORMAT_TIME, buffer->pts);
    if (!GST_CLOCK_TIME_IS_VALID(buf_pts)) {
      printf("[KVS GStreamer Master] Frame contains invalid PTS dropping the "
             "frame. \n");
      goto CleanUp;
    }

    if (!(gst_buffer_map(buffer, &info, GST_MAP_READ))) {
      printf("[KVS GStreamer Master] on_new_sample(): Gst buffer mapping "
             "failed\n");
      goto CleanUp;
    }

    frame.duration = 0;
    frame.size = (UINT32)info.size;
    frame.frameData = (PBYTE)info.data;

    // GStreamer timestamps are in nanoseconds; RTMFrame uses 100 ns units.
    frame.presentationTs = buffer->pts / 100;
    frame.decodingTs = frame.presentationTs;
    frame.trackId = trackid;

    // `ts` is the nominal frame clock, printed only as a drift indicator.
    ts += SAMPLE_VIDEO_FRAME_DURATION;

    // RTMSendMediaData is declared as returning BOOL, so test truthiness
    // instead of comparing against STATUS_SUCCESS (0).
    if (RTMSendMediaData(ctx, &frame)) {
      printf("writeframe() %llu %llu delta: %lld\n",
             (unsigned long long)info.size, (unsigned long long)trackid,
             (long long)(ts - frame.presentationTs));
    } else {
      printf("writeFrame() failed\n");
    }
  }

CleanUp:

  if (info.data != NULL) {
    gst_buffer_unmap(buffer, &info);
  }

  if (sample != NULL) {
    gst_sample_unref(sample);
  }

  return ret;
}

/**
 * "new-sample" handler for the video appsink: delegates to on_new_sample()
 * with the video track id.
 */
GstFlowReturn on_new_sample_video(GstElement *sink, gpointer data) {
  const UINT64 videoTrack = DEFAULT_VIDEO_TRACK_ID;

  return on_new_sample(sink, data, videoTrack);
}

/**
 * Opens the media source with GStreamer and starts pushing the data stream.
 *
 * Builds a v4l2src -> x264enc -> appsink pipeline, hooks on_new_sample_video()
 * to the appsink's "new-sample" signal and then blocks until the bus reports
 * an error or end-of-stream.
 *
 * @param args passed through unchanged as user data to the sample handler
 * @return STATUS code cast to PVOID: STATUS_SUCCESS, or STATUS_INTERNAL_ERROR
 *         when the pipeline or the appsink could not be set up
 */
PVOID sendGstreamerVideo(PVOID args) {
  STATUS retStatus = STATUS_SUCCESS;
  GstElement *appsinkVideo = NULL, *pipeline = NULL;
  GstBus *bus = NULL;
  GstMessage *msg = NULL;
  GError *error = NULL;
  PCHAR session_key = (PCHAR)args;

  printf("sendGstreamerVideo start\n");

  printf("Pipeline video generate start\n");
  pipeline = gst_parse_launch(
      "v4l2src device=/dev/video0 ! videoconvert ! "
      " x264enc aud=false ! "
      " video/x-h264,stream-format=byte-stream,alignment=au,profile=baseline ! "
      " appsink sync=true emit-signals=true name=appsink-video",
      &error);

  if (pipeline == NULL) {
    printf("[KVS GStreamer Master] sendGstreamerAudioVideo(): Failed to launch "
           "gstreamer, operation returned status code: 0x%08x \n",
           STATUS_INTERNAL_ERROR);
    retStatus = STATUS_INTERNAL_ERROR;
    goto CleanUp;
  }

  printf("Pipeline video generate done\n");
  appsinkVideo = gst_bin_get_by_name(GST_BIN(pipeline), "appsink-video");

  if (appsinkVideo == NULL) {
    printf("[KVS GStreamer Master] sendGstreamerAudioVideo(): cant find "
           "appsink, operation returned status code: 0x%08x \n",
           STATUS_INTERNAL_ERROR);
    retStatus = STATUS_INTERNAL_ERROR;
    goto CleanUp;
  }

  g_signal_connect(appsinkVideo, "new-sample",
                   G_CALLBACK(on_new_sample_video), (gpointer)session_key);

  gst_element_set_state(pipeline, GST_STATE_PLAYING);

  /* block until error or EOS */
  bus = gst_element_get_bus(pipeline);
  msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE,
                                   GST_MESSAGE_ERROR | GST_MESSAGE_EOS);

CleanUp:

  /* Single teardown path: every reference taken above is released here. */
  if (msg != NULL) {
    gst_message_unref(msg);
  }
  if (bus != NULL) {
    gst_object_unref(bus);
  }
  if (appsinkVideo != NULL) {
    /* gst_bin_get_by_name() hands back a reference the caller owns. */
    gst_object_unref(appsinkVideo);
  }
  if (pipeline != NULL) {
    gst_element_set_state(pipeline, GST_STATE_NULL);
    gst_object_unref(pipeline);
  }

  if (error != NULL) {
    printf("%s", error->message);
    g_clear_error(&error);
  }

  /* Widen through size_t so the integer-to-pointer conversion is explicit. */
  return (PVOID)(size_t)retStatus;
}

/*
ffmpeg -re -i /home/<tmp>/videos/summer.mp4 -map 0:v -f v4l2 /dev/video0
*/

/**
 * Demo entry point: creates the push context, adds one H264 video track,
 * starts the push connection and streams the GStreamer pipeline until it ends.
 *
 * @return 0 when the push session ran, 1 when the connection could not be
 *         started
 */
int main() {
  RTMVideoConfigT v = {0};
  char *errMsg = NULL; /* RTMStartPush takes PCHAR *: it hands back a message pointer */
  int rc = 0;

  printf("Hello World.\n");
  gst_init(NULL, NULL);
  ctx = RTMCtxNew("<创建的 rtm 推流地址>");

  v.Codec = RTM_VIDEO_CODEC_H264;
  RTMAddVideoTrack(ctx, &v);

  /*
   * Follows the documented contract: RTMStartPush returns false on failure
   * and stores the failure information through `msg`.
   * NOTE(review): the earlier sample treated 0 as success - confirm against
   * the shipped rtm.h if this branch looks inverted in practice.
   */
  if (RTMStartPush(ctx, &errMsg)) {
    /* Blocks until the pipeline reports an error or end-of-stream. */
    sendGstreamerVideo("test_a");
  } else {
    printf("start push failed: %s\n", errMsg != NULL ? errMsg : "");
    rc = 1;
  }

  /*
   * NOTE(review): the context is not released before exit; RTMRelease(ctx)
   * is the documented teardown call for longer-lived programs.
   */
  return rc;
}