ニジカ投稿局 https://tv.nizika.tv
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

videos-redundancy-scheduler.ts 13 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. import { move } from 'fs-extra/esm'
  2. import { join } from 'path'
  3. import { getServerActor } from '@server/models/application/application.js'
  4. import { VideoModel } from '@server/models/video/video.js'
  5. import {
  6. MStreamingPlaylistFiles,
  7. MVideoAccountLight,
  8. MVideoFile,
  9. MVideoFileVideo,
  10. MVideoRedundancyFileVideo,
  11. MVideoRedundancyStreamingPlaylistVideo,
  12. MVideoRedundancyVideo,
  13. MVideoWithAllFiles
  14. } from '@server/types/models/index.js'
  15. import { VideosRedundancyStrategy } from '@peertube/peertube-models'
  16. import { logger, loggerTagsFactory } from '../../helpers/logger.js'
  17. import { downloadWebTorrentVideo } from '../../helpers/webtorrent.js'
  18. import { CONFIG } from '../../initializers/config.js'
  19. import { DIRECTORIES, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers/constants.js'
  20. import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy.js'
  21. import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send/index.js'
  22. import { getLocalVideoCacheFileActivityPubUrl, getLocalVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url.js'
  23. import { getOrCreateAPVideo } from '../activitypub/videos/index.js'
  24. import { downloadPlaylistSegments } from '../hls.js'
  25. import { removeVideoRedundancy } from '../redundancy.js'
  26. import { generateHLSRedundancyUrl, generateWebVideoRedundancyUrl } from '../video-urls.js'
  27. import { AbstractScheduler } from './abstract-scheduler.js'
  28. const lTags = loggerTagsFactory('redundancy')
  29. type CandidateToDuplicate = {
  30. redundancy: VideosRedundancyStrategy
  31. video: MVideoWithAllFiles
  32. files: MVideoFile[]
  33. streamingPlaylists: MStreamingPlaylistFiles[]
  34. }
  35. function isMVideoRedundancyFileVideo (
  36. o: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo
  37. ): o is MVideoRedundancyFileVideo {
  38. return !!(o as MVideoRedundancyFileVideo).VideoFile
  39. }
  40. export class VideosRedundancyScheduler extends AbstractScheduler {
  41. private static instance: VideosRedundancyScheduler
  42. protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
  43. private constructor () {
  44. super()
  45. }
  46. async createManualRedundancy (videoId: number) {
  47. const videoToDuplicate = await VideoModel.loadWithFiles(videoId)
  48. if (!videoToDuplicate) {
  49. logger.warn('Video to manually duplicate %d does not exist anymore.', videoId, lTags())
  50. return
  51. }
  52. return this.createVideoRedundancies({
  53. video: videoToDuplicate,
  54. redundancy: null,
  55. files: videoToDuplicate.VideoFiles,
  56. streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
  57. })
  58. }
  59. protected async internalExecute () {
  60. for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
  61. logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy, lTags())
  62. try {
  63. const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig)
  64. if (!videoToDuplicate) continue
  65. const candidateToDuplicate = {
  66. video: videoToDuplicate,
  67. redundancy: redundancyConfig,
  68. files: videoToDuplicate.VideoFiles,
  69. streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
  70. }
  71. await this.purgeCacheIfNeeded(candidateToDuplicate)
  72. if (await this.isTooHeavy(candidateToDuplicate)) {
  73. logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url, lTags(videoToDuplicate.uuid))
  74. continue
  75. }
  76. logger.info(
  77. 'Will duplicate video %s in redundancy scheduler "%s".',
  78. videoToDuplicate.url, redundancyConfig.strategy, lTags(videoToDuplicate.uuid)
  79. )
  80. await this.createVideoRedundancies(candidateToDuplicate)
  81. } catch (err) {
  82. logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err, ...lTags() })
  83. }
  84. }
  85. await this.extendsLocalExpiration()
  86. await this.purgeRemoteExpired()
  87. }
  88. static get Instance () {
  89. return this.instance || (this.instance = new this())
  90. }
  91. private async extendsLocalExpiration () {
  92. const expired = await VideoRedundancyModel.listLocalExpired()
  93. for (const redundancyModel of expired) {
  94. try {
  95. const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
  96. // If the admin disabled the redundancy, remove this redundancy instead of extending it
  97. if (!redundancyConfig) {
  98. logger.info(
  99. 'Destroying redundancy %s because the redundancy %s does not exist anymore.',
  100. redundancyModel.url, redundancyModel.strategy
  101. )
  102. await removeVideoRedundancy(redundancyModel)
  103. continue
  104. }
  105. const { totalUsed } = await VideoRedundancyModel.getStats(redundancyConfig.strategy)
  106. // If the admin decreased the cache size, remove this redundancy instead of extending it
  107. if (totalUsed > redundancyConfig.size) {
  108. logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy)
  109. await removeVideoRedundancy(redundancyModel)
  110. continue
  111. }
  112. await this.extendsRedundancy(redundancyModel)
  113. } catch (err) {
  114. logger.error(
  115. 'Cannot extend or remove expiration of %s video from our redundancy system.',
  116. this.buildEntryLogId(redundancyModel), { err, ...lTags(redundancyModel.getVideoUUID()) }
  117. )
  118. }
  119. }
  120. }
  121. private async extendsRedundancy (redundancyModel: MVideoRedundancyVideo) {
  122. const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
  123. // Redundancy strategy disabled, remove our redundancy instead of extending expiration
  124. if (!redundancy) {
  125. await removeVideoRedundancy(redundancyModel)
  126. return
  127. }
  128. await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
  129. }
  130. private async purgeRemoteExpired () {
  131. const expired = await VideoRedundancyModel.listRemoteExpired()
  132. for (const redundancyModel of expired) {
  133. try {
  134. await removeVideoRedundancy(redundancyModel)
  135. } catch (err) {
  136. logger.error(
  137. 'Cannot remove redundancy %s from our redundancy system.',
  138. this.buildEntryLogId(redundancyModel), lTags(redundancyModel.getVideoUUID())
  139. )
  140. }
  141. }
  142. }
  143. private findVideoToDuplicate (cache: VideosRedundancyStrategy) {
  144. if (cache.strategy === 'most-views') {
  145. return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
  146. }
  147. if (cache.strategy === 'trending') {
  148. return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
  149. }
  150. if (cache.strategy === 'recently-added') {
  151. const minViews = cache.minViews
  152. return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
  153. }
  154. }
  155. private async createVideoRedundancies (data: CandidateToDuplicate) {
  156. const video = await this.loadAndRefreshVideo(data.video.url)
  157. if (!video) {
  158. logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url, lTags(data.video.uuid))
  159. return
  160. }
  161. for (const file of data.files) {
  162. const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id)
  163. if (existingRedundancy) {
  164. await this.extendsRedundancy(existingRedundancy)
  165. continue
  166. }
  167. await this.createVideoFileRedundancy(data.redundancy, video, file)
  168. }
  169. for (const streamingPlaylist of data.streamingPlaylists) {
  170. const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id)
  171. if (existingRedundancy) {
  172. await this.extendsRedundancy(existingRedundancy)
  173. continue
  174. }
  175. await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist)
  176. }
  177. }
  178. private async createVideoFileRedundancy (redundancy: VideosRedundancyStrategy | null, video: MVideoAccountLight, fileArg: MVideoFile) {
  179. let strategy = 'manual'
  180. let expiresOn: Date = null
  181. if (redundancy) {
  182. strategy = redundancy.strategy
  183. expiresOn = this.buildNewExpiration(redundancy.minLifetime)
  184. }
  185. const file = fileArg as MVideoFileVideo
  186. file.Video = video
  187. const serverActor = await getServerActor()
  188. logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy, lTags(video.uuid))
  189. const tmpPath = await downloadWebTorrentVideo({ uri: file.torrentUrl }, VIDEO_IMPORT_TIMEOUT)
  190. const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, file.filename)
  191. await move(tmpPath, destPath, { overwrite: true })
  192. const createdModel: MVideoRedundancyFileVideo = await VideoRedundancyModel.create({
  193. expiresOn,
  194. url: getLocalVideoCacheFileActivityPubUrl(file),
  195. fileUrl: generateWebVideoRedundancyUrl(file),
  196. strategy,
  197. videoFileId: file.id,
  198. actorId: serverActor.id
  199. })
  200. createdModel.VideoFile = file
  201. await sendCreateCacheFile(serverActor, video, createdModel)
  202. logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url, lTags(video.uuid))
  203. }
  204. private async createStreamingPlaylistRedundancy (
  205. redundancy: VideosRedundancyStrategy,
  206. video: MVideoAccountLight,
  207. playlistArg: MStreamingPlaylistFiles
  208. ) {
  209. let strategy = 'manual'
  210. let expiresOn: Date = null
  211. if (redundancy) {
  212. strategy = redundancy.strategy
  213. expiresOn = this.buildNewExpiration(redundancy.minLifetime)
  214. }
  215. const playlist = Object.assign(playlistArg, { Video: video })
  216. const serverActor = await getServerActor()
  217. logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy, lTags(video.uuid))
  218. const destDirectory = join(DIRECTORIES.HLS_REDUNDANCY, video.uuid)
  219. const masterPlaylistUrl = playlist.getMasterPlaylistUrl(video)
  220. const maxSizeKB = this.getTotalFileSizes([], [ playlist ]) / 1000
  221. const toleranceKB = maxSizeKB + ((5 * maxSizeKB) / 100) // 5% more tolerance
  222. await downloadPlaylistSegments(masterPlaylistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT, toleranceKB)
  223. const createdModel: MVideoRedundancyStreamingPlaylistVideo = await VideoRedundancyModel.create({
  224. expiresOn,
  225. url: getLocalVideoCacheStreamingPlaylistActivityPubUrl(video, playlist),
  226. fileUrl: generateHLSRedundancyUrl(video, playlistArg),
  227. strategy,
  228. videoStreamingPlaylistId: playlist.id,
  229. actorId: serverActor.id
  230. })
  231. createdModel.VideoStreamingPlaylist = playlist
  232. await sendCreateCacheFile(serverActor, video, createdModel)
  233. logger.info('Duplicated playlist %s -> %s.', masterPlaylistUrl, createdModel.url, lTags(video.uuid))
  234. }
  235. private async extendsExpirationOf (redundancy: MVideoRedundancyVideo, expiresAfterMs: number) {
  236. logger.info('Extending expiration of %s.', redundancy.url, lTags(redundancy.getVideoUUID()))
  237. const serverActor = await getServerActor()
  238. redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
  239. await redundancy.save()
  240. await sendUpdateCacheFile(serverActor, redundancy)
  241. }
  242. private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) {
  243. while (await this.isTooHeavy(candidateToDuplicate)) {
  244. const redundancy = candidateToDuplicate.redundancy
  245. const toDelete = await VideoRedundancyModel.loadOldestLocalExpired(redundancy.strategy, redundancy.minLifetime)
  246. if (!toDelete) return
  247. const videoId = toDelete.VideoFile
  248. ? toDelete.VideoFile.videoId
  249. : toDelete.VideoStreamingPlaylist.videoId
  250. const redundancies = await VideoRedundancyModel.listLocalByVideoId(videoId)
  251. for (const redundancy of redundancies) {
  252. await removeVideoRedundancy(redundancy)
  253. }
  254. }
  255. }
  256. private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) {
  257. const maxSize = candidateToDuplicate.redundancy.size
  258. const { totalUsed: alreadyUsed } = await VideoRedundancyModel.getStats(candidateToDuplicate.redundancy.strategy)
  259. const videoSize = this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists)
  260. const willUse = alreadyUsed + videoSize
  261. logger.debug('Checking candidate size.', { maxSize, alreadyUsed, videoSize, willUse, ...lTags(candidateToDuplicate.video.uuid) })
  262. return willUse > maxSize
  263. }
  264. private buildNewExpiration (expiresAfterMs: number) {
  265. return new Date(Date.now() + expiresAfterMs)
  266. }
  267. private buildEntryLogId (object: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo) {
  268. if (isMVideoRedundancyFileVideo(object)) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`
  269. return `${object.VideoStreamingPlaylist.getMasterPlaylistUrl(object.VideoStreamingPlaylist.Video)}`
  270. }
  271. private getTotalFileSizes (files: MVideoFile[], playlists: MStreamingPlaylistFiles[]): number {
  272. const fileReducer = (previous: number, current: MVideoFile) => previous + current.size
  273. let allFiles = files
  274. for (const p of playlists) {
  275. allFiles = allFiles.concat(p.VideoFiles)
  276. }
  277. return allFiles.reduce(fileReducer, 0)
  278. }
  279. private async loadAndRefreshVideo (videoUrl: string) {
  280. // We need more attributes and check if the video still exists
  281. const getVideoOptions = {
  282. videoObject: videoUrl,
  283. syncParam: { rates: false, shares: false, comments: false, refreshVideo: true },
  284. fetchType: 'all' as 'all'
  285. }
  286. const { video } = await getOrCreateAPVideo(getVideoOptions)
  287. return video
  288. }
  289. }