ニジカ投稿局 https://tv.nizika.tv
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

job-queue.ts 19 KiB


  1. import { pick, timeoutPromise } from '@peertube/peertube-core-utils'
  2. import {
  3. ActivitypubFollowPayload,
  4. ActivitypubHttpBroadcastPayload,
  5. ActivitypubHttpFetcherPayload,
  6. ActivitypubHttpUnicastPayload,
  7. ActorKeysPayload,
  8. AfterVideoChannelImportPayload,
  9. CreateUserExportPayload,
  10. DeleteResumableUploadMetaFilePayload,
  11. EmailPayload,
  12. FederateVideoPayload,
  13. GenerateStoryboardPayload,
  14. ImportUserArchivePayload,
  15. JobState,
  16. JobType,
  17. ManageVideoTorrentPayload,
  18. MoveStoragePayload,
  19. NotifyPayload,
  20. RefreshPayload,
  21. TranscodingJobBuilderPayload,
  22. VideoChannelImportPayload,
  23. VideoFileImportPayload,
  24. VideoImportPayload,
  25. VideoLiveEndingPayload,
  26. VideoRedundancyPayload,
  27. VideoStudioEditionPayload,
  28. VideoTranscodingPayload,
  29. VideoTranscriptionPayload
  30. } from '@peertube/peertube-models'
  31. import { parseDurationToMs } from '@server/helpers/core-utils.js'
  32. import { jobStates } from '@server/helpers/custom-validators/jobs.js'
  33. import { CONFIG } from '@server/initializers/config.js'
  34. import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy.js'
  35. import {
  36. FlowJob,
  37. FlowProducer,
  38. Job,
  39. JobsOptions,
  40. Queue,
  41. QueueEvents,
  42. QueueEventsOptions,
  43. QueueOptions,
  44. Worker,
  45. WorkerOptions
  46. } from 'bullmq'
  47. import { logger } from '../../helpers/logger.js'
  48. import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants.js'
  49. import { Hooks } from '../plugins/hooks.js'
  50. import { Redis } from '../redis.js'
  51. import { processActivityPubCleaner } from './handlers/activitypub-cleaner.js'
  52. import { processActivityPubFollow } from './handlers/activitypub-follow.js'
  53. import {
  54. processActivityPubHttpSequentialBroadcast,
  55. processActivityPubParallelHttpBroadcast
  56. } from './handlers/activitypub-http-broadcast.js'
  57. import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher.js'
  58. import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast.js'
  59. import { refreshAPObject } from './handlers/activitypub-refresher.js'
  60. import { processActorKeys } from './handlers/actor-keys.js'
  61. import { processAfterVideoChannelImport } from './handlers/after-video-channel-import.js'
  62. import { processCreateUserExport } from './handlers/create-user-export.js'
  63. import { processEmail } from './handlers/email.js'
  64. import { processFederateVideo } from './handlers/federate-video.js'
  65. import { processGenerateStoryboard } from './handlers/generate-storyboard.js'
  66. import { processImportUserArchive } from './handlers/import-user-archive.js'
  67. import { processManageVideoTorrent } from './handlers/manage-video-torrent.js'
  68. import { onMoveToFileSystemFailure, processMoveToFileSystem } from './handlers/move-to-file-system.js'
  69. import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage.js'
  70. import { processNotify } from './handlers/notify.js'
  71. import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder.js'
  72. import { processVideoChannelImport } from './handlers/video-channel-import.js'
  73. import { processVideoFileImport } from './handlers/video-file-import.js'
  74. import { processVideoImport } from './handlers/video-import.js'
  75. import { processVideoLiveEnding } from './handlers/video-live-ending.js'
  76. import { processVideoStudioEdition } from './handlers/video-studio-edition.js'
  77. import { processVideoTranscoding } from './handlers/video-transcoding.js'
  78. import { processVideoTranscription } from './handlers/video-transcription.js'
  79. import { processVideosViewsStats } from './handlers/video-views-stats.js'
  /**
   * Discriminated union describing a job to enqueue: `type` names the target
   * queue and `payload` carries the data handed to that queue's processor.
   */
  export type CreateJobArgument =
    | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload }
    | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload }
    | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload }
    | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload }
    | { type: 'activitypub-cleaner', payload: {} }
    | { type: 'activitypub-follow', payload: ActivitypubFollowPayload }
    | { type: 'video-file-import', payload: VideoFileImportPayload }
    | { type: 'video-transcoding', payload: VideoTranscodingPayload }
    | { type: 'email', payload: EmailPayload }
    | { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload }
    | { type: 'video-import', payload: VideoImportPayload }
    | { type: 'activitypub-refresher', payload: RefreshPayload }
    | { type: 'videos-views-stats', payload: {} }
    | { type: 'video-live-ending', payload: VideoLiveEndingPayload }
    | { type: 'actor-keys', payload: ActorKeysPayload }
    | { type: 'video-redundancy', payload: VideoRedundancyPayload }
    | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload }
    | { type: 'video-studio-edition', payload: VideoStudioEditionPayload }
    | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload }
    | { type: 'move-to-object-storage', payload: MoveStoragePayload }
    | { type: 'move-to-file-system', payload: MoveStoragePayload }
    | { type: 'video-channel-import', payload: VideoChannelImportPayload }
    | { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload }
    | { type: 'notify', payload: NotifyPayload }
    | { type: 'federate-video', payload: FederateVideoPayload }
    | { type: 'create-user-export', payload: CreateUserExportPayload }
    | { type: 'generate-video-storyboard', payload: GenerateStoryboardPayload }
    | { type: 'import-user-archive', payload: ImportUserArchivePayload }
    | { type: 'video-transcription', payload: VideoTranscriptionPayload }
  /**
   * Optional scheduling settings that can accompany a `CreateJobArgument`.
   */
  export type CreateJobOptions = {
    /** Forwarded unchanged as the BullMQ `delay` job option. */
    delay?: number

    /** Forwarded unchanged as the BullMQ `priority` job option. */
    priority?: number

    /** Only relevant to jobs that are created as part of a flow. */
    failParentOnFailure?: boolean
  }
  /**
   * Processor function for every job type.
   * The keys of this table are what `JobQueue.init()` iterates over to build
   * one worker/queue/queue-events triple per job type.
   */
  const handlers: Record<JobType, (job: Job) => Promise<any>> = {
    'activitypub-cleaner': processActivityPubCleaner,
    'activitypub-follow': processActivityPubFollow,
    'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
    'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
    'activitypub-http-fetcher': processActivityPubHttpFetcher,
    'activitypub-http-unicast': processActivityPubHttpUnicast,
    'activitypub-refresher': refreshAPObject,
    'actor-keys': processActorKeys,
    'after-video-channel-import': processAfterVideoChannelImport,
    'email': processEmail,
    'federate-video': processFederateVideo,
    'transcoding-job-builder': processTranscodingJobBuilder,
    'manage-video-torrent': processManageVideoTorrent,
    'move-to-object-storage': processMoveToObjectStorage,
    'move-to-file-system': processMoveToFileSystem,
    'notify': processNotify,
    'video-channel-import': processVideoChannelImport,
    'video-file-import': processVideoFileImport,
    'video-import': processVideoImport,
    'video-live-ending': processVideoLiveEnding,
    'video-redundancy': processVideoRedundancy,
    'video-studio-edition': processVideoStudioEdition,
    'video-transcoding': processVideoTranscoding,
    'videos-views-stats': processVideosViewsStats,
    'generate-video-storyboard': processGenerateStoryboard,
    'create-user-export': processCreateUserExport,
    'import-user-archive': processImportUserArchive,
    'video-transcription': processVideoTranscription
  }
  /**
   * Optional failure callbacks, keyed by job type.
   * Only the two storage-move job types register one.
   */
  const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
    'move-to-object-storage': onMoveToObjectStorageFailure,
    'move-to-file-system': onMoveToFileSystemFailure
  }
  /**
   * Every job type known to the queue, in the order used when listing jobs
   * and when gathering per-queue statistics.
   */
  const jobTypes: Array<JobType> = [
    'activitypub-cleaner',
    'activitypub-follow',
    'activitypub-http-broadcast-parallel',
    'activitypub-http-broadcast',
    'activitypub-http-fetcher',
    'activitypub-http-unicast',
    'activitypub-refresher',
    'actor-keys',
    'after-video-channel-import',
    'email',
    'federate-video',
    'generate-video-storyboard',
    'manage-video-torrent',
    'move-to-object-storage',
    'move-to-file-system',
    'notify',
    'transcoding-job-builder',
    'video-channel-import',
    'video-file-import',
    'video-import',
    'video-live-ending',
    'video-redundancy',
    'video-studio-edition',
    'video-transcription',
    'videos-views-stats',
    'create-user-export',
    'import-user-archive',
    'video-transcoding'
  ]
  // Job types whose failures are logged at 'debug' level instead of 'error'
  const silentFailure: Set<JobType> = new Set([ 'activitypub-http-unicast' ])
  /**
   * Singleton wrapper around the BullMQ primitives used by the server.
   *
   * For every key of `handlers` it owns one `Worker`, one `Queue` and one
   * `QueueEvents` instance, plus a single `FlowProducer` used to enqueue
   * parent/children job trees. Obtain the instance through `JobQueue.Instance`,
   * call `init()` once, then `start()` to begin processing.
   */
  class JobQueue {
    private static instance: JobQueue

    // Populated by init(): one entry per job type
    private workers: { [id in JobType]?: Worker } = {}
    private queues: { [id in JobType]?: Queue } = {}
    private queueEvents: { [id in JobType]?: QueueEvents } = {}

    private flowProducer: FlowProducer

    private initialized = false
    private jobRedisPrefix: string

    private constructor () {
    }

    /**
     * Creates the workers, queues, queue-event listeners and the flow producer,
     * then registers the repeatable jobs. Calls after the first one are no-ops.
     * Workers and queue events are built with `autorun: false`; see `start()`.
     */
    init () {
      // Already initialized
      if (this.initialized === true) return
      this.initialized = true

      this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

      // Object.keys() widens to string[]; the keys of `handlers` are exactly the job types
      for (const handlerName of Object.keys(handlers) as JobType[]) {
        this.buildWorker(handlerName)
        this.buildQueue(handlerName)
        this.buildQueueEvent(handlerName)
      }

      this.flowProducer = new FlowProducer({
        connection: Redis.getRedisClientOptions('FlowProducer'),
        prefix: this.jobRedisPrefix
      })
      this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })

      this.addRepeatableJobs()
    }

    /**
     * Builds (without starting) the worker that processes jobs of `handlerName`.
     *
     * The processor passes the job through the 'filter:job-queue.process.params'
     * hook, runs the matching entry of `handlers` (wrapped in `timeoutPromise`
     * when `JOB_TTL` defines a value for this type) and passes the outcome through
     * the 'filter:job-queue.process.result' hook.
     */
    private buildWorker (handlerName: JobType) {
      const workerOptions: WorkerOptions = {
        autorun: false,
        concurrency: this.getJobConcurrency(handlerName),
        prefix: this.jobRedisPrefix,
        connection: Redis.getRedisClientOptions('Worker'),
        maxStalledCount: 10
      }

      const handler = (job: Job) => {
        const timeout = JOB_TTL[handlerName]
        const p = handlers[handlerName](job)

        if (!timeout) return p

        return timeoutPromise(p, timeout)
      }

      const processor = async (jobArg: Job<any>) => {
        const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

        return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
      }

      const worker = new Worker(handlerName, processor, workerOptions)

      worker.on('failed', (job, err) => {
        const logLevel = silentFailure.has(handlerName)
          ? 'debug'
          : 'error'

        // BullMQ types the job of this event as possibly undefined
        logger.log(logLevel, 'Cannot execute job %s in queue %s.', job?.id, handlerName, { payload: job?.data, err })

        // Every job in this file is enqueued under the generic name 'job', so `job.name` can never
        // match a key of `errorHandlers`: the handler has to be looked up by the queue's job type
        const errorHandler = errorHandlers[handlerName]
        if (!job || !errorHandler) return

        errorHandler(job, err)
          .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
      })

      worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

      this.workers[handlerName] = worker
    }

    /**
     * Builds the queue used to enqueue and inspect jobs of `handlerName`, and
     * asks BullMQ to remove its deprecated priority key (failure is only logged).
     */
    private buildQueue (handlerName: JobType) {
      const queueOptions: QueueOptions = {
        connection: Redis.getRedisClientOptions('Queue'),
        prefix: this.jobRedisPrefix
      }

      const queue = new Queue(handlerName, queueOptions)
      queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })

      this.queues[handlerName] = queue

      queue.removeDeprecatedPriorityKey()
        .catch(err => logger.error('Cannot remove bullmq deprecated priority keys of ' + handlerName, { err }))
    }

    /** Builds (without starting) the queue-events listener of `handlerName`. */
    private buildQueueEvent (handlerName: JobType) {
      const queueEventsOptions: QueueEventsOptions = {
        autorun: false,
        connection: Redis.getRedisClientOptions('QueueEvent'),
        prefix: this.jobRedisPrefix
      }

      const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
      queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })

      this.queueEvents[handlerName] = queueEvents
    }

    // ---------------------------------------------------------------------------

    /**
     * Closes every worker, queue and queue-events listener, then the flow producer.
     *
     * @returns the per-job-type results of the worker/queue/queue-events `close()` calls
     */
    async terminate () {
      const promises = (Object.keys(this.workers) as JobType[])
        .map(handlerName => {
          const worker: Worker = this.workers[handlerName]
          const queue: Queue = this.queues[handlerName]
          const queueEvent: QueueEvents = this.queueEvents[handlerName]

          return Promise.all([
            worker.close(false),
            queue.close(),
            queueEvent.close()
          ])
        })

      const results = await Promise.all(promises)

      // The flow producer is created with its own Redis client options in init(),
      // so it has to be closed too; it does not exist if init() was never called
      if (this.flowProducer) await this.flowProducer.close()

      return results
    }

    /** Starts every worker and queue-events listener (they are built with `autorun: false`). */
    start () {
      const promises = (Object.keys(this.workers) as JobType[])
        .map(handlerName => {
          const worker: Worker = this.workers[handlerName]
          const queueEvent: QueueEvents = this.queueEvents[handlerName]

          return Promise.all([
            worker.run(),
            queueEvent.run()
          ])
        })

      return Promise.all(promises)
    }

    /** Pauses the workers one after the other. */
    async pause () {
      for (const handlerName of Object.keys(this.workers) as JobType[]) {
        const worker: Worker = this.workers[handlerName]

        await worker.pause()
      }
    }

    /** Resumes every worker. */
    resume () {
      for (const handlerName of Object.keys(this.workers) as JobType[]) {
        const worker: Worker = this.workers[handlerName]

        worker.resume()
      }
    }

    // ---------------------------------------------------------------------------

    /**
     * Fire-and-forget variant of `createJob()`: a rejection is logged instead of
     * being propagated to the caller.
     */
    createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
      const promise = this.createJob(options)

      // createJob() returns undefined (after logging) when no queue matches the job type:
      // chaining .catch() on it would throw a TypeError
      if (!promise) return

      promise.catch(err => logger.error('Cannot create job.', { err, options }))
    }

    /**
     * Adds a job to the queue named by `options.type`.
     *
     * @returns the promise of `queue.add()`, or `undefined` when `options` is
     * falsy or when no queue exists for this type (an error is logged in that case)
     */
    createJob (options: CreateJobArgument & CreateJobOptions | undefined) {
      if (!options) return

      const queue: Queue = this.queues[options.type]
      if (queue === undefined) {
        logger.error('Unknown queue %s: cannot create job.', options.type)
        return
      }

      const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

      return queue.add('job', options.payload, jobOptions)
    }

    /**
     * Chains the given jobs in a flow: each job is made the parent of the job
     * listed before it, so the first defined job is the innermost child and the
     * last one the root. Undefined entries are skipped.
     *
     * @returns the promise of `flowProducer.add()`, or `undefined` when no job
     * was provided (there is then nothing to enqueue)
     */
    createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
      let lastJob: FlowJob | undefined

      for (const job of jobs) {
        if (!job) continue

        lastJob = {
          ...this.buildJobFlowOption(job),

          children: lastJob
            ? [ lastJob ]
            : []
        }
      }

      // Every argument was undefined: do not hand an undefined flow to the producer
      if (!lastJob) return undefined

      return this.flowProducer.add(lastJob)
    }

    /** Enqueues `parent` as a flow whose direct children are `children`. */
    createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
      return this.flowProducer.add({
        ...this.buildJobFlowOption(parent),

        children: children.map(c => this.buildJobFlowOption(c))
      })
    }

    /**
     * Converts a job description into a BullMQ `FlowJob` node.
     * `failParentOnFailure` defaults to `true` and can be overridden per job.
     */
    private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
      return {
        name: 'job',
        data: job.payload,
        queueName: job.type,
        opts: {
          ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),

          // buildJobOptions() does not forward this option, so it is applied here
          failParentOnFailure: job.failParentOnFailure ?? true
        }
      }
    }

    /**
     * Builds the BullMQ options shared by every job of `type`: exponential
     * backoff starting at 60 seconds, the attempts configured in `JOB_ATTEMPTS`,
     * the caller's priority/delay and the removal policy of this queue.
     */
    private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
      return {
        backoff: { delay: 60 * 1000, type: 'exponential' },
        attempts: JOB_ATTEMPTS[type],
        priority: options.priority,
        delay: options.delay,

        ...this.buildJobRemovalOptions(type)
      }
    }

    // ---------------------------------------------------------------------------

    /**
     * Lists jobs across the selected queues.
     *
     * Up to `start + count` jobs are fetched from each queue, merged, sorted by
     * `timestamp` (reversed when `asc === false`) and finally sliced to the
     * requested `[start, start + count)` window.
     */
    async listForApi (options: {
      state?: JobState
      start: number
      count: number
      asc?: boolean
      jobType: JobType
    }): Promise<Job[]> {
      const { state, start, count, asc, jobType } = options

      const states = this.buildStateFilter(state)
      const filteredJobTypes = this.buildTypeFilter(jobType)

      let results: Job[] = []

      for (const type of filteredJobTypes) {
        const queue: Queue = this.queues[type]

        if (queue === undefined) {
          logger.error('Unknown queue %s to list jobs.', type)
          continue
        }

        let jobs = await queue.getJobs(states, 0, start + count, asc)

        // FIXME: we have sometimes undefined values https://github.com/taskforcesh/bullmq/issues/248
        jobs = jobs.filter(j => !!j)

        results = results.concat(jobs)
      }

      results.sort((j1, j2) => j1.timestamp - j2.timestamp)

      if (asc === false) results.reverse()

      return results.slice(start, start + count)
    }

    /**
     * Sums the job counters of the selected queues for the selected states.
     * Unknown queues are logged and skipped.
     */
    async count (state: JobState, jobType?: JobType): Promise<number> {
      const states = this.buildStateFilter(state)
      const filteredJobTypes = this.buildTypeFilter(jobType)

      let total = 0

      for (const type of filteredJobTypes) {
        const queue = this.queues[type]

        if (queue === undefined) {
          logger.error('Unknown queue %s to count jobs.', type)
          continue
        }

        const counts = await queue.getJobCounts()

        for (const s of states) {
          // A counter missing from the result must not turn the total into NaN
          total += counts[s] || 0
        }
      }

      return total
    }

    /**
     * Returns the states to query: all of `jobStates` when no state is given,
     * otherwise the state itself, extended with 'waiting-children' and
     * 'prioritized' when filtering on 'waiting'.
     */
    private buildStateFilter (state?: JobState) {
      if (!state) return Array.from(jobStates)

      const states = [ state ]

      // Include parent and prioritized if filtering on waiting
      if (state === 'waiting') {
        states.push('waiting-children')
        states.push('prioritized')
      }

      return states
    }

    /** Returns all job types, or only `jobType` when it is provided and known. */
    private buildTypeFilter (jobType?: JobType) {
      if (!jobType) return jobTypes

      return jobTypes.filter(t => t === jobType)
    }

    /** Returns the BullMQ job counters of every queue, in `jobTypes` order. */
    async getStats () {
      const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

      return Promise.all(promises)
    }

    // ---------------------------------------------------------------------------

    /**
     * Cleans each queue of 'completed' then 'failed' jobs, using a 7 days grace
     * period and a limit of 1000 jobs per `clean()` call.
     */
    async removeOldJobs () {
      const grace = parseDurationToMs('7 days')

      for (const key of Object.keys(this.queues) as JobType[]) {
        const queue: Queue = this.queues[key]

        await queue.clean(grace, 1000, 'completed')
        await queue.clean(grace, 1000, 'failed')
      }
    }

    /**
     * Registers the repeatable 'videos-views-stats' job and, when
     * CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS is enabled, the
     * 'activitypub-cleaner' one. Failures are only logged.
     */
    private addRepeatableJobs () {
      this.queues['videos-views-stats'].add('job', {}, {
        repeat: REPEAT_JOBS['videos-views-stats'],

        ...this.buildJobRemovalOptions('videos-views-stats')
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))

      if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
        this.queues['activitypub-cleaner'].add('job', {}, {
          repeat: REPEAT_JOBS['activitypub-cleaner'],

          ...this.buildJobRemovalOptions('activitypub-cleaner')
        }).catch(err => logger.error('Cannot add repeatable job.', { err }))
      }
    }

    /**
     * Returns the worker concurrency of a job type: taken from the instance
     * configuration for transcoding and video imports, from `JOB_CONCURRENCY`
     * for everything else.
     */
    private getJobConcurrency (jobType: JobType) {
      if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
      if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

      return JOB_CONCURRENCY[jobType]
    }

    /**
     * Builds the `removeOnComplete`/`removeOnFail` options of a queue, falling
     * back to the DEFAULT age when `JOB_REMOVAL_OPTIONS` has no entry for it.
     */
    private buildJobRemovalOptions (queueName: string) {
      return {
        removeOnComplete: {
          // Wants seconds
          age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,

          count: JOB_REMOVAL_OPTIONS.COUNT
        },
        removeOnFail: {
          // Wants seconds
          age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,

          // NOTE(review): a count is not a duration, so this "/ 1000" looks like a copy-paste of the
          // seconds conversion above. Left untouched because it changes how many failed jobs are kept;
          // confirm the intent against JOB_REMOVAL_OPTIONS.COUNT
          count: JOB_REMOVAL_OPTIONS.COUNT / 1000
        }
      }
    }

    /** Lazily created singleton instance. */
    static get Instance () {
      return this.instance || (this.instance = new this())
    }
  }
  462. // ---------------------------------------------------------------------------
  // Public surface of this module: the queue singleton class and the list of known job types
  export { JobQueue, jobTypes }