ニジカ投稿局 https://tv.nizika.tv
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

616 lines
20 KiB

  1. import { pick, wait } from '@peertube/peertube-core-utils'
  2. import {
  3. ffprobePromise,
  4. getVideoStreamBitrate,
  5. getVideoStreamDimensionsInfo,
  6. getVideoStreamFPS,
  7. hasAudioStream
  8. } from '@peertube/peertube-ffmpeg'
  9. import { LiveVideoError, LiveVideoErrorType, VideoState } from '@peertube/peertube-models'
  10. import { retryTransactionWrapper } from '@server/helpers/database-utils.js'
  11. import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
  12. import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config.js'
  13. import { VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants.js'
  14. import { sequelizeTypescript } from '@server/initializers/database.js'
  15. import { RunnerJobModel } from '@server/models/runner/runner-job.js'
  16. import { UserModel } from '@server/models/user/user.js'
  17. import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting.js'
  18. import { VideoLiveSessionModel } from '@server/models/video/video-live-session.js'
  19. import { VideoLiveModel } from '@server/models/video/video-live.js'
  20. import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist.js'
  21. import { VideoModel } from '@server/models/video/video.js'
  22. import { MUser, MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models/index.js'
  23. import { FfprobeData } from 'fluent-ffmpeg'
  24. import { readFile, readdir } from 'fs/promises'
  25. import { Server, createServer } from 'net'
  26. import context from 'node-media-server/src/node_core_ctx.js'
  27. import nodeMediaServerLogger from 'node-media-server/src/node_core_logger.js'
  28. import NodeRtmpSession from 'node-media-server/src/node_rtmp_session.js'
  29. import { join } from 'path'
  30. import { Server as ServerTLS, createServer as createServerTLS } from 'tls'
  31. import { federateVideoIfNeeded } from '../activitypub/videos/index.js'
  32. import { JobQueue } from '../job-queue/index.js'
  33. import { Notifier } from '../notifier/notifier.js'
  34. import { getLiveReplayBaseDirectory } from '../paths.js'
  35. import { PeerTubeSocket } from '../peertube-socket.js'
  36. import { Hooks } from '../plugins/hooks.js'
  37. import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions.js'
  38. import { LiveQuotaStore } from './live-quota-store.js'
  39. import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils.js'
  40. import { MuxingSession } from './shared/index.js'
  // Disable node media server logs
  nodeMediaServerLogger.setLogType(0)

  // Settings object passed to every NodeRtmpSession created by LiveManager.run()
  const config = {
    rtmp: {
      port: CONFIG.LIVE.RTMP.PORT,
      chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
      gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
      ping: VIDEO_LIVE.RTMP.PING,
      ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
    }
  }

  // Builds the logger tags used by this file, always created from the 'live' factory
  const lTags = loggerTagsFactory('live')
  53. class LiveManager {
  54. private static instance: LiveManager
  55. private readonly muxingSessions = new Map<string, MuxingSession>()
  56. private readonly videoSessions = new Map<string, string>()
  57. private rtmpServer: Server
  58. private rtmpsServer: ServerTLS
  59. private running = false
  // Private constructor: instances are created by the static `Instance` getter
  private constructor () {}
  // Subscribe to the node-media-server publish events, register the config change handler
  // and trigger the cleanup of lives left in a broken state.
  init () {
    const nodeEvents = this.getContext().nodeEvent

    nodeEvents.on('postPublish', (sessionId: string, streamPath: string) => {
      logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })

      // A valid path splits on '/' into exactly 3 parts: the second one must be the RTMP base path
      // and the third one is used as the stream key
      const pathParts = streamPath.split('/')
      const hasExpectedShape = pathParts.length === 3 && pathParts[1] === VIDEO_LIVE.RTMP.BASE_PATH

      if (!hasExpectedShape) {
        logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
        return this.abortSession(sessionId)
      }

      const rtmpSession = this.getContext().sessions.get(sessionId)
      const inputLocalUrl = rtmpSession.inputOriginLocalUrl + streamPath
      const inputPublicUrl = rtmpSession.inputOriginPublicUrl + streamPath
      const streamKey = pathParts[2]

      // Not awaited: failures are only logged
      this.handleSession({ sessionId, inputPublicUrl, inputLocalUrl, streamKey })
        .catch(err => logger.error('Cannot handle session', { err, ...lTags(sessionId) }))
    })

    nodeEvents.on('donePublish', (sessionId: string) => {
      logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })

      // Force session aborting, so we kill ffmpeg even if it still has data to process (slow CPU)
      setTimeout(() => this.abortSession(sessionId), 2000)
    })

    registerConfigChangedHandler(() => {
      // Already running: the only possible transition is a stop
      if (this.running) {
        if (CONFIG.LIVE.ENABLED === false) this.stop()

        return
      }

      // Not running yet: start the servers if live has just been enabled
      if (CONFIG.LIVE.ENABLED === true) {
        this.run().catch(err => logger.error('Cannot run live server.', { err }))
      }
    })

    // Cleanup broken lives, that were terminated by a server restart for example
    this.handleBrokenLives()
      .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
  }
  // Start the RTMP and/or RTMPS servers, depending on which ones are enabled in the configuration.
  // Rejects if the RTMPS key or certificate file cannot be read.
  async run () {
    this.running = true

    if (CONFIG.LIVE.RTMP.ENABLED) {
      logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags())

      // One NodeRtmpSession per incoming TCP connection
      const plainServer = createServer(socket => {
        const rtmpSession = new NodeRtmpSession(config, socket)

        rtmpSession.inputOriginLocalUrl = `rtmp://127.0.0.1:${CONFIG.LIVE.RTMP.PORT}`
        rtmpSession.inputOriginPublicUrl = WEBSERVER.RTMP_URL
        rtmpSession.run()
      })

      this.rtmpServer = plainServer

      plainServer.on('error', err => {
        logger.error('Cannot run RTMP server.', { err, ...lTags() })
      })

      plainServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME)
    }

    if (CONFIG.LIVE.RTMPS.ENABLED) {
      logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags())

      // Read the TLS key and certificate in parallel
      const [ key, cert ] = await Promise.all([
        readFile(CONFIG.LIVE.RTMPS.KEY_FILE),
        readFile(CONFIG.LIVE.RTMPS.CERT_FILE)
      ])

      // One NodeRtmpSession per incoming TLS connection
      const secureServer = createServerTLS({ key, cert }, socket => {
        const rtmpSession = new NodeRtmpSession(config, socket)

        rtmpSession.inputOriginLocalUrl = `rtmps://127.0.0.1:${CONFIG.LIVE.RTMPS.PORT}`
        rtmpSession.inputOriginPublicUrl = WEBSERVER.RTMPS_URL
        rtmpSession.run()
      })

      this.rtmpsServer = secureServer

      secureServer.on('error', err => {
        logger.error('Cannot run RTMPS server.', { err, ...lTags() })
      })

      secureServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME)
    }
  }
  // Close the RTMP/RTMPS servers that were started and stop every NodeRtmpSession
  // still registered in the node-media-server context.
  stop () {
    this.running = false

    const plainServer = this.rtmpServer
    if (plainServer) {
      logger.info('Stopping RTMP server.', lTags())

      plainServer.close()
      this.rtmpServer = undefined
    }

    const secureServer = this.rtmpsServer
    if (secureServer) {
      logger.info('Stopping RTMPS server.', lTags())

      secureServer.close()
      this.rtmpsServer = undefined
    }

    // Only entries that are NodeRtmpSession instances are stopped
    this.getContext().sessions.forEach((session: any) => {
      if (!(session instanceof NodeRtmpSession)) return

      session.stop()
    })
  }
  // Tell whether a live server has been created by run() and not yet released by stop().
  // Both the RTMP and the RTMPS servers are checked, so an instance that only enables RTMPS
  // is reported as running too.
  isRunning () {
    return !!this.rtmpServer || !!this.rtmpsServer
  }
  // Tell whether the node-media-server context still holds a session with this id
  hasSession (sessionId: string) {
    const { sessions } = this.getContext()

    return sessions.has(sessionId)
  }
  // Stop the live session currently tracked for a video and record the session end with the given error.
  //
  // Does nothing when no session is tracked for the video, or when `expectedSessionId` is provided
  // and differs from the tracked session id.
  stopSessionOfVideo (options: {
    videoUUID: string
    error: LiveVideoErrorType | null
    expectedSessionId?: string // Prevent stopping another session of permanent live
    errorOnReplay?: boolean
  }) {
    const { videoUUID, expectedSessionId, error } = options

    const trackedSessionId = this.videoSessions.get(videoUUID)
    const tags = lTags(trackedSessionId, videoUUID)

    if (!trackedSessionId) {
      logger.debug('No live session to stop for video %s', videoUUID, tags)
      return
    }

    const targetsAnotherSession = !!expectedSessionId && expectedSessionId !== trackedSessionId
    if (targetsAnotherSession) {
      logger.debug(
        `No live session ${expectedSessionId} to stop for video ${videoUUID} (current session: ${trackedSessionId})`,
        tags
      )
      return
    }

    logger.info('Stopping live session of video %s', videoUUID, { error, ...tags })

    // Not awaited: a failure to save the session end is only logged
    this.saveEndingSession(options)
      .catch(err => logger.error('Cannot save ending session.', { err, ...tags }))

    this.videoSessions.delete(videoUUID)
    this.abortSession(trackedSessionId)
  }
  // Single access point to the node-media-server context module imported by this file
  private getContext () {
    return context
  }
  // Stop and forget both the RTMP session and the muxing session registered under this id, if any
  private abortSession (sessionId: string) {
    const { sessions } = this.getContext()

    const rtmpSession = sessions.get(sessionId)
    if (rtmpSession) {
      rtmpSession.stop()
      sessions.delete(sessionId)
    }

    const muxer = this.muxingSessions.get(sessionId)
    if (!muxer) return

    // Muxing session will fire and event so we correctly cleanup the session
    muxer.abort()
    this.muxingSessions.delete(sessionId)
  }
  // Validate an incoming stream and, if it is accepted, probe it and start its muxing session.
  //
  // The stream is refused (RTMP session aborted) when the stream key is unknown, the video is
  // blacklisted, the live owner cannot be found or is blocked, or the video already has a live session.
  //
  // Any error thrown after these checks aborts the RTMP session and releases the video registration
  // before being rethrown. Without this, a failed probe would leave the video registered in
  // `videoSessions` and every later stream of this video would be refused.
  private async handleSession (options: {
    sessionId: string
    inputLocalUrl: string
    inputPublicUrl: string
    streamKey: string
  }) {
    const { inputLocalUrl, inputPublicUrl, sessionId, streamKey } = options

    const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
    if (!videoLive) {
      logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
      return this.abortSession(sessionId)
    }

    const video = videoLive.Video
    if (video.isBlacklisted()) {
      logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
      return this.abortSession(sessionId)
    }

    const user = await UserModel.loadByLiveId(videoLive.id)

    // Guard against a missing owner instead of crashing on `user.blocked`
    if (!user) {
      logger.warn('Cannot find the user of live video %s. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid))
      return this.abortSession(sessionId)
    }

    if (user.blocked) {
      logger.warn('User is blocked. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
      return this.abortSession(sessionId)
    }

    if (this.videoSessions.has(video.uuid)) {
      logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid))
      return this.abortSession(sessionId)
    }

    try {
      // Cleanup old potential live (could happen with a permanent live)
      const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
      if (oldStreamingPlaylist) {
        if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)

        await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
      }

      this.videoSessions.set(video.uuid, sessionId)

      const now = Date.now()

      // Probe once and reuse the result for every stream information getter
      const probe = await ffprobePromise(inputLocalUrl)

      const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([
        getVideoStreamDimensionsInfo(inputLocalUrl, probe),
        getVideoStreamFPS(inputLocalUrl, probe),
        getVideoStreamBitrate(inputLocalUrl, probe),
        hasAudioStream(inputLocalUrl, probe)
      ])

      logger.info(
        '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)',
        inputLocalUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid)
      )

      // Plugins can alter the list of resolutions through this filter hook
      const allResolutions = await Hooks.wrapObject(
        this.buildAllResolutionsToTranscode(resolution, hasAudio),
        'filter:transcoding.auto.resolutions-to-transcode.result',
        { video }
      )

      logger.info(
        'Handling live video of original resolution %d.', resolution,
        { allResolutions, ...lTags(sessionId, video.uuid) }
      )

      // `return await` so a rejection is handled by the catch block below
      return await this.runMuxingSession({
        sessionId,
        videoLive,
        user,
        inputLocalUrl,
        inputPublicUrl,
        fps,
        bitrate,
        ratio,
        allResolutions,
        hasAudio,
        probe
      })
    } catch (err) {
      // Only release the registration if it still belongs to this session
      if (this.videoSessions.get(video.uuid) === sessionId) {
        this.videoSessions.delete(video.uuid)
      }

      this.abortSession(sessionId)

      // Let the caller log the error, as before
      throw err
    }
  }
  // Save the start of the live session, create the MuxingSession, bind its events and launch the muxing.
  //
  // The muxing promise is not awaited: a muxing failure is handled by its own catch handler,
  // which stops the session of the video.
  private async runMuxingSession (options: {
    sessionId: string
    videoLive: MVideoLiveVideoWithSetting
    user: MUser
    inputLocalUrl: string
    inputPublicUrl: string
    fps: number
    bitrate: number
    ratio: number
    allResolutions: number[]
    hasAudio: boolean
    probe: FfprobeData
  }) {
    const { sessionId, videoLive, user, ratio } = options

    const liveVideoUUID = videoLive.Video.uuid
    const tags = lTags(sessionId, liveVideoUUID)

    const startedSession = await this.saveStartingSession(videoLive)

    LiveQuotaStore.Instance.addNewLive(user.id, sessionId)

    const muxer = new MuxingSession({
      context: this.getContext(),
      sessionId,
      videoLive,
      user,

      ...pick(options, [ 'inputLocalUrl', 'inputPublicUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio', 'probe' ])
    })

    muxer.on('live-ready', () => this.publishAndFederateLive({ live: videoLive, ratio, localLTags: tags }))

    // Events below stop the session and store the matching error on the live session
    muxer.on('bad-socket-health', ({ videoUUID }) => {
      logger.error(
        'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
        ' Stopping session of video %s.', videoUUID,
        tags
      )

      this.stopSessionOfVideo({ videoUUID, error: LiveVideoError.BAD_SOCKET_HEALTH })
    })

    muxer.on('duration-exceeded', ({ videoUUID }) => {
      logger.info('Stopping session of %s: max duration exceeded.', videoUUID, tags)

      this.stopSessionOfVideo({ videoUUID, error: LiveVideoError.DURATION_EXCEEDED })
    })

    muxer.on('quota-exceeded', ({ videoUUID }) => {
      logger.info('Stopping session of %s: user quota exceeded.', videoUUID, tags)

      this.stopSessionOfVideo({ videoUUID, error: LiveVideoError.QUOTA_EXCEEDED })
    })

    muxer.on('transcoding-error', ({ videoUUID }) => {
      this.stopSessionOfVideo({ videoUUID, error: LiveVideoError.FFMPEG_ERROR })
    })

    muxer.on('transcoding-end', ({ videoUUID }) => {
      this.onMuxingFFmpegEnd(videoUUID, sessionId)
    })

    // Last step: forget the muxing session, release the quota entry and move the video to its next state
    muxer.on('after-cleanup', ({ videoUUID }) => {
      this.muxingSessions.delete(sessionId)

      LiveQuotaStore.Instance.removeLive(user.id, sessionId)

      muxer.destroy()

      return this.onAfterMuxingCleanup({ videoUUID, liveSession: startedSession })
        .catch(err => logger.error('Error in end transmuxing.', { err, ...tags }))
    })

    this.muxingSessions.set(sessionId, muxer)

    muxer.runMuxing()
      .catch(err => {
        logger.error('Cannot run muxing.', { err, ...tags })

        this.muxingSessions.delete(sessionId)
        muxer.destroy()

        this.stopSessionOfVideo({
          videoUUID: liveVideoUUID,
          error: err.liveVideoErrorCode || LiveVideoError.UNKNOWN_ERROR,
          errorOnReplay: true // Replay cannot be processed as muxing session failed directly
        })
      })
  }
  // Mark the video of the live as published, then federate it and notify about its new state.
  // Errors are logged and never rethrown; a federation failure does not prevent the notifications.
  private async publishAndFederateLive (options: {
    live: MVideoLiveVideo
    ratio: number
    localLTags: { tags: (string | number)[] }
  }) {
    const { live, ratio, localLTags } = options
    const videoId = live.videoId

    try {
      const publishedVideo = await VideoModel.loadFull(videoId)

      logger.info('Will publish and federate live %s.', publishedVideo.url, localLTags)

      publishedVideo.state = VideoState.PUBLISHED
      publishedVideo.publishedAt = new Date()
      publishedVideo.aspectRatio = ratio
      await publishedVideo.save()

      live.Video = publishedVideo

      // Wait for a number of segment durations before federating and notifying
      const segmentDurationMs = getLiveSegmentTime(live.latencyMode) * 1000
      await wait(segmentDurationMs * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)

      try {
        await federateVideoIfNeeded(publishedVideo, false)
      } catch (err) {
        logger.error('Cannot federate live video %s.', publishedVideo.url, { err, ...localLTags })
      }

      Notifier.Instance.notifyOnNewVideoOrLiveIfNeeded(publishedVideo)
      PeerTubeSocket.Instance.sendVideoLiveNewState(publishedVideo)

      Hooks.runAction('action:live.video.state.updated', { video: publishedVideo })
    } catch (err) {
      logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
    }
  }
  // Called when the transcoding of a muxing session ends: forget the session of the video
  // and save the session end without error.
  private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) {
    // Map.delete() returns false when the key was absent: session already cleaned up
    const wasTracked = this.videoSessions.delete(videoUUID)
    if (!wasTracked) return

    this.saveEndingSession({ videoUUID, error: null })
      .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
  }
  // Move a live video to its next state once its muxing session has been cleaned up:
  // create the 'video-live-ending' job, update the video state, then notify and federate it.
  //
  // Returns early if the video cannot be loaded. Errors are logged and never rethrown.
  private async onAfterMuxingCleanup (options: {
    videoUUID: string
    liveSession?: MVideoLiveSession
    cleanupNow?: boolean // Default false
  }) {
    const { videoUUID, cleanupNow = false } = options

    logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID))

    try {
      const video = await VideoModel.loadFull(videoUUID)
      if (!video) return

      const live = await VideoLiveModel.loadByVideoId(video.id)

      // Use the latest stored session when the caller did not provide one
      const session = options.liveSession ?? await VideoLiveSessionModel.findLatestSessionOf(video.id)

      // On server restart during a live
      if (!session.endDate) {
        session.endDate = new Date()
        await session.save()
      }

      // The replay directory is only looked up when the live saves its replay
      let replayDirectory: string | undefined
      if (live.saveReplay) {
        replayDirectory = await this.findReplayDirectory(video)
      }

      const payload = {
        videoId: video.id,
        replayDirectory,
        liveSessionId: session.id,
        streamingPlaylistId: video.getHLSPlaylist()?.id,
        publishedAt: video.publishedAt.toISOString()
      }

      JobQueue.Instance.createJobAsync({
        type: 'video-live-ending',
        payload,
        delay: cleanupNow
          ? 0
          : VIDEO_LIVE.CLEANUP_DELAY
      })

      // A permanent live waits for its next stream, other lives are over
      if (live.permanentLive) {
        video.state = VideoState.WAITING_FOR_LIVE
      } else {
        video.state = VideoState.LIVE_ENDED
      }

      await video.save()

      PeerTubeSocket.Instance.sendVideoLiveNewState(video)

      await federateVideoIfNeeded(video, false)

      Hooks.runAction('action:live.video.state.updated', { video })
    } catch (err) {
      logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) })
    }
  }
  // Cancel the non finished 'live-rtmp-hls-transcoding' runner jobs, then run the after-muxing cleanup
  // (without delay) on every video listed as a published live.
  private async handleBrokenLives () {
    await RunnerJobModel.cancelAllNonFinishedJobs({ type: 'live-rtmp-hls-transcoding' })

    const publishedLiveUUIDs = await VideoModel.listPublishedLiveUUIDs()

    // Processed one after the other
    for (const videoUUID of publishedLiveUUIDs) {
      await this.onAfterMuxingCleanup({ videoUUID, cleanupNow: true })
    }
  }
  // Return the path of the entry that sorts last (default string order) in the live replay base directory
  // of the video, or undefined when this directory is empty.
  private async findReplayDirectory (video: MVideo) {
    const baseDirectory = getLiveReplayBaseDirectory(video)

    const entries = await readdir(baseDirectory)
    if (entries.length === 0) return undefined

    const sortedEntries = entries.sort()
    const lastEntry = sortedEntries[sortedEntries.length - 1]

    return join(baseDirectory, lastEntry)
  }
  // Build the list of resolutions handled for a live: the computed resolutions when live transcoding
  // is enabled and yields at least one resolution, otherwise only the original resolution.
  private buildAllResolutionsToTranscode (originResolution: number, hasAudio: boolean) {
    const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION

    if (!CONFIG.LIVE.TRANSCODING.ENABLED) return [ originResolution ]

    const computed = computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false, hasAudio })

    return computed.length === 0
      ? [ originResolution ]
      : computed
  }
  // Create the live session row of a starting live inside a retried transaction.
  // When the live saves its replay, a replay setting copying the live replay privacy is saved first
  // and linked to the session.
  private saveStartingSession (videoLive: MVideoLiveVideoWithSetting) {
    let replaySettings: VideoLiveReplaySettingModel | null = null

    if (videoLive.saveReplay) {
      replaySettings = new VideoLiveReplaySettingModel({
        privacy: videoLive.ReplaySetting.privacy
      })
    }

    const createSession = () => sequelizeTypescript.transaction(async t => {
      if (videoLive.saveReplay) {
        await replaySettings.save({ transaction: t })
      }

      const replaySettingId = videoLive.saveReplay
        ? replaySettings.id
        : null

      return VideoLiveSessionModel.create({
        startDate: new Date(),
        liveVideoId: videoLive.videoId,
        saveReplay: videoLive.saveReplay,
        replaySettingId,
        endingProcessed: false
      }, { transaction: t })
    })

    return retryTransactionWrapper(createSession)
  }
  // Store the end date and the error of the current live session of a video.
  // Does nothing when no current session is found. With `errorOnReplay`, the session ending
  // is also flagged as already processed.
  private async saveEndingSession (options: {
    videoUUID: string
    error: LiveVideoErrorType | null
    errorOnReplay?: boolean
  }) {
    const currentSession = await VideoLiveSessionModel.findCurrentSessionOf(options.videoUUID)
    if (!currentSession) return

    currentSession.endDate = new Date()
    currentSession.error = options.error

    const replayFailed = options.errorOnReplay === true
    if (replayFailed) currentSession.endingProcessed = true

    return currentSession.save()
  }
  // Singleton accessor: the instance is created on first access and reused afterwards
  static get Instance () {
    if (!this.instance) {
      this.instance = new this()
    }

    return this.instance
  }
  471. }
  472. // ---------------------------------------------------------------------------
  473. export {
  474. LiveManager
  475. }