CatFeeder/src/modules/rtsp/rtsp_server.cpp

133 lines
4.3 KiB
C++
Executable File

/**
* @file rtsp_server.cpp
* @author 吴晨
* @brief 基于 live555 的 RTSP 服务器
*
* @copyright Copyright (c) 2022-2024 OurEDA
*/
#include <stdio.h>
#include <pthread.h>
#include "BasicUsageEnvironment.hh"
#include "GroupsockHelper.hh"
#include "liveMedia.hh"
#include "module_init.h"
#include "platform/log.h"
#include "msgbus/ipc.h"
#include "LiveServerMediaSubsession.hh"
// Name under which the media session is published; rtsp_start() also passes it as the
// session "info" string, and rtsp_stop() uses it to close the sessions of connected clients.
static const char *stream_name = "main";
// Human-readable description passed to ServerMediaSession::createNew() in rtsp_start().
static const char *description = "Session streamed by the main camera of rouring ROV";
// Watch variable handed to doEventLoop() in rtsp_loop(): cleared by rtsp_start() before the
// thread is spawned and set to 1 by rtsp_stop() to ask the loop to return.
static volatile char stop_running;
// Thread that runs rtsp_loop(); created in rtsp_start() and joined in rtsp_stop().
static pthread_t rtsp_thread;
// live555 objects created in rtsp_start() and released in rtsp_stop().
static TaskScheduler *scheduler;
static UsageEnvironment *env;
static RTSPServer *rtspServer;
// Subsessions created in rtsp_start() from the codecs reported by MOD_MPP; each stays
// nullptr when the corresponding stream is not created, and both are reset in rtsp_stop().
static LiveServerMediaSubsession *videoSubsession;
static LiveServerMediaSubsession *audioSubsession;
/**
 * @brief Print the RTSP URL(s) under which a media session can be played.
 *
 * Writes one line to the server's usage environment containing the IPv4 URL, the IPv6 URL,
 * or both separated by " or ", depending on which address families are reported available.
 *
 * @param rtspServer Server that publishes the session; nothing is printed when null.
 * @param sms        Media session to announce; nothing is printed when null.
 */
static void announceURL(RTSPServer *rtspServer, ServerMediaSession *sms) {
    // Sanity check: both the server and the session are required.
    if (!(rtspServer != nullptr && sms != nullptr)) {
        return;
    }

    UsageEnvironment &out = rtspServer->envir();
    out << "Play this stream using the URL ";

    if (weHaveAnIPv4Address(out)) {
        char *ipv4_url = rtspServer->ipv4rtspURL(sms);
        out << "\"";
        out << ipv4_url;
        out << "\"";
        delete[] ipv4_url;  // released here with delete[] once printed

        // Only emit the separator when a second URL is going to follow.
        if (weHaveAnIPv6Address(out)) {
            out << " or ";
        }
    }

    if (weHaveAnIPv6Address(out)) {
        char *ipv6_url = rtspServer->ipv6rtspURL(sms);
        out << "\"";
        out << ipv6_url;
        out << "\"";
        delete[] ipv6_url;
    }

    out << "\n";
}
static void *rtsp_loop(void *arg) {
UNUSED(arg);
env->taskScheduler().doEventLoop(&stop_running);
return nullptr;
}
/**
 * @brief IPC callback that forwards incoming video frames to the RTSP video subsession.
 *
 * Only IPC_MSG_ID_RTSP_STREAM_VIDEO requests are handled; any other request id is ignored
 * and is not replied to. A handled request is always answered with an empty reply.
 *
 * @param req Incoming IPC request; its message is interpreted as an ipc_msg_rtsp_stream_t.
 */
static void on_msg_received(ipc_request_t *req) {
    if (req->id != IPC_MSG_ID_RTSP_STREAM_VIDEO) {
        return;
    }

    const ipc_msg_rtsp_stream_t *frame = (const ipc_msg_rtsp_stream_t *)req->msg;

    const bool has_consumer =
        videoSubsession != nullptr && videoSubsession->deviceSource() != nullptr;

    if (has_consumer) {
        // NOTE(review): the frame buffer is not freed on this path, so signalNewFrame()
        // presumably takes responsibility for frame->data — confirm in the device source.
        videoSubsession->deviceSource()->signalNewFrame(*frame);
    } else {
        // Nobody can consume the frame, so its payload is released here.
        delete[] frame->data;
    }

    ipc_reply(req, nullptr);
}
static bool rtsp_start() {
LOG_DEBUG("Starting RTSP server...\n");
ipc_register_callback(MOD_RTSP, on_msg_received);
scheduler = BasicTaskScheduler::createNew();
env = BasicUsageEnvironment::createNew(*scheduler);
rtspServer = RTSPServer::createNew(*env, 554, nullptr, 10U);
if (rtspServer == nullptr) {
*env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
return false;
}
ipc_msg_mpp_stream_info_req stream_info_req;
stream_info_req.num = 0;
ipc_request_t req = IPC_REQUEST_INIT(MOD_MPP);
req.id = IPC_MSG_ID_MPP_STREAM_INFO;
req.msg = (const ipc_msg_t *)&stream_info_req;
req.length = sizeof(ipc_msg_mpp_stream_info_req);
int result = ipc_send(&req);
if (result != IPC_OK) {
*env << "Failed to get live stream params\n";
return false;
}
ipc_msg_mpp_stream_info_res *stream_info_res = &req.res->msg.mpp_stream_info_res;
if (stream_info_res->video_codec < ROV_AUDIO) {
videoSubsession = LiveServerMediaSubsession::createNew(*env, stream_info_res->video_codec);
}
if (stream_info_res->audio_codec > ROV_AUDIO) {
audioSubsession = LiveServerMediaSubsession::createNew(*env, stream_info_res->audio_codec);
}
free(req.res);
ServerMediaSession *sms = ServerMediaSession::createNew(*env, stream_name, stream_name, description);
if (videoSubsession != nullptr) {
sms->addSubsession(videoSubsession);
}
if (audioSubsession != nullptr) {
sms->addSubsession(audioSubsession);
}
rtspServer->addServerMediaSession(sms);
announceURL(rtspServer, sms);
stop_running = 0;
result = pthread_create(&rtsp_thread, NULL, rtsp_loop, NULL);
if (result != 0) {
*env << "Create RTSP server thread failed, result: " << result << "\n";
return false;
}
return true;
}
/**
 * @brief Module stop hook: stop the event loop thread and release the RTSP server.
 *
 * Safe to call when rtsp_start() failed before the server was created: in that case the
 * thread join and the server teardown are skipped. All module-level pointers are reset
 * afterwards so nothing is left dangling.
 */
static void rtsp_stop() {
    stop_running = 1;

    // rtsp_start() spawns the event loop thread only after the server has been created,
    // so without a server there is no thread to join and nothing to close.
    const bool server_created = (rtspServer != nullptr);
    if (server_created) {
        pthread_join(rtsp_thread, nullptr);
    }

    // Drop the aliases so on_msg_received() stops forwarding frames to the subsessions.
    videoSubsession = audioSubsession = nullptr;

    if (server_created) {
        // If clients are still connected when the RTSP server is destroyed, their
        // connections would not be closed, so close them explicitly beforehand.
        rtspServer->closeAllClientSessionsForServerMediaSession(stream_name);
        Medium::close(rtspServer);
        rtspServer = nullptr;
    }

    if (env && !env->reclaim()) {
        fprintf(stderr, "!!! UsageEnvironment release failed !!!\n");
    }
    env = nullptr;

    delete scheduler;
    scheduler = nullptr;
}
// Hand rtsp_start()/rtsp_stop() to the module framework as this module's start and stop hooks.
// NOTE(review): MODULE_RUN is not defined in this file (presumably module_init.h); the third
// argument looks like a start-order token. As a plain integer literal `09` would be an invalid
// octal constant, so the macro presumably pastes or stringifies it — confirm.
MODULE_RUN(rtsp_start, rtsp_stop, 09);