|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570 |
- import { pick, timeoutPromise } from '@peertube/peertube-core-utils'
- import {
- ActivitypubFollowPayload,
- ActivitypubHttpBroadcastPayload,
- ActivitypubHttpFetcherPayload,
- ActivitypubHttpUnicastPayload,
- ActorKeysPayload,
- AfterVideoChannelImportPayload,
- CreateUserExportPayload,
- DeleteResumableUploadMetaFilePayload,
- EmailPayload,
- FederateVideoPayload,
- GenerateStoryboardPayload,
- ImportUserArchivePayload,
- JobState,
- JobType,
- ManageVideoTorrentPayload,
- MoveStoragePayload,
- NotifyPayload,
- RefreshPayload,
- TranscodingJobBuilderPayload,
- VideoChannelImportPayload,
- VideoFileImportPayload,
- VideoImportPayload,
- VideoLiveEndingPayload,
- VideoRedundancyPayload,
- VideoStudioEditionPayload,
- VideoTranscodingPayload,
- VideoTranscriptionPayload
- } from '@peertube/peertube-models'
- import { parseDurationToMs } from '@server/helpers/core-utils.js'
- import { jobStates } from '@server/helpers/custom-validators/jobs.js'
- import { CONFIG } from '@server/initializers/config.js'
- import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy.js'
- import {
- FlowJob,
- FlowProducer,
- Job,
- JobsOptions,
- Queue,
- QueueEvents,
- QueueEventsOptions,
- QueueOptions,
- Worker,
- WorkerOptions
- } from 'bullmq'
- import { logger } from '../../helpers/logger.js'
- import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants.js'
- import { Hooks } from '../plugins/hooks.js'
- import { Redis } from '../redis.js'
- import { processActivityPubCleaner } from './handlers/activitypub-cleaner.js'
- import { processActivityPubFollow } from './handlers/activitypub-follow.js'
- import {
- processActivityPubHttpSequentialBroadcast,
- processActivityPubParallelHttpBroadcast
- } from './handlers/activitypub-http-broadcast.js'
- import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher.js'
- import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast.js'
- import { refreshAPObject } from './handlers/activitypub-refresher.js'
- import { processActorKeys } from './handlers/actor-keys.js'
- import { processAfterVideoChannelImport } from './handlers/after-video-channel-import.js'
- import { processCreateUserExport } from './handlers/create-user-export.js'
- import { processEmail } from './handlers/email.js'
- import { processFederateVideo } from './handlers/federate-video.js'
- import { processGenerateStoryboard } from './handlers/generate-storyboard.js'
- import { processImportUserArchive } from './handlers/import-user-archive.js'
- import { processManageVideoTorrent } from './handlers/manage-video-torrent.js'
- import { onMoveToFileSystemFailure, processMoveToFileSystem } from './handlers/move-to-file-system.js'
- import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage.js'
- import { processNotify } from './handlers/notify.js'
- import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder.js'
- import { processVideoChannelImport } from './handlers/video-channel-import.js'
- import { processVideoFileImport } from './handlers/video-file-import.js'
- import { processVideoImport } from './handlers/video-import.js'
- import { processVideoLiveEnding } from './handlers/video-live-ending.js'
- import { processVideoStudioEdition } from './handlers/video-studio-edition.js'
- import { processVideoTranscoding } from './handlers/video-transcoding.js'
- import { processVideoTranscription } from './handlers/video-transcription.js'
- import { processVideosViewsStats } from './handlers/video-views-stats.js'
-
- export type CreateJobArgument =
- { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
- { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
- { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
- { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
- { type: 'activitypub-cleaner', payload: {} } |
- { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
- { type: 'video-file-import', payload: VideoFileImportPayload } |
- { type: 'video-transcoding', payload: VideoTranscodingPayload } |
- { type: 'email', payload: EmailPayload } |
- { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } |
- { type: 'video-import', payload: VideoImportPayload } |
- { type: 'activitypub-refresher', payload: RefreshPayload } |
- { type: 'videos-views-stats', payload: {} } |
- { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
- { type: 'actor-keys', payload: ActorKeysPayload } |
- { type: 'video-redundancy', payload: VideoRedundancyPayload } |
- { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
- { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
- { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
- { type: 'move-to-object-storage', payload: MoveStoragePayload } |
- { type: 'move-to-file-system', payload: MoveStoragePayload } |
- { type: 'video-channel-import', payload: VideoChannelImportPayload } |
- { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
- { type: 'notify', payload: NotifyPayload } |
- { type: 'federate-video', payload: FederateVideoPayload } |
- { type: 'create-user-export', payload: CreateUserExportPayload } |
- { type: 'generate-video-storyboard', payload: GenerateStoryboardPayload } |
- { type: 'import-user-archive', payload: ImportUserArchivePayload } |
- { type: 'video-transcription', payload: VideoTranscriptionPayload }
-
- export type CreateJobOptions = {
- delay?: number
- priority?: number
- failParentOnFailure?: boolean
- }
-
- const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
- 'activitypub-cleaner': processActivityPubCleaner,
- 'activitypub-follow': processActivityPubFollow,
- 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
- 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
- 'activitypub-http-fetcher': processActivityPubHttpFetcher,
- 'activitypub-http-unicast': processActivityPubHttpUnicast,
- 'activitypub-refresher': refreshAPObject,
- 'actor-keys': processActorKeys,
- 'after-video-channel-import': processAfterVideoChannelImport,
- 'email': processEmail,
- 'federate-video': processFederateVideo,
- 'transcoding-job-builder': processTranscodingJobBuilder,
- 'manage-video-torrent': processManageVideoTorrent,
- 'move-to-object-storage': processMoveToObjectStorage,
- 'move-to-file-system': processMoveToFileSystem,
- 'notify': processNotify,
- 'video-channel-import': processVideoChannelImport,
- 'video-file-import': processVideoFileImport,
- 'video-import': processVideoImport,
- 'video-live-ending': processVideoLiveEnding,
- 'video-redundancy': processVideoRedundancy,
- 'video-studio-edition': processVideoStudioEdition,
- 'video-transcoding': processVideoTranscoding,
- 'videos-views-stats': processVideosViewsStats,
- 'generate-video-storyboard': processGenerateStoryboard,
- 'create-user-export': processCreateUserExport,
- 'import-user-archive': processImportUserArchive,
- 'video-transcription': processVideoTranscription
- }
-
- const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
- 'move-to-object-storage': onMoveToObjectStorageFailure,
- 'move-to-file-system': onMoveToFileSystemFailure
- }
-
- const jobTypes: JobType[] = [
- 'activitypub-cleaner',
- 'activitypub-follow',
- 'activitypub-http-broadcast-parallel',
- 'activitypub-http-broadcast',
- 'activitypub-http-fetcher',
- 'activitypub-http-unicast',
- 'activitypub-refresher',
- 'actor-keys',
- 'after-video-channel-import',
- 'email',
- 'federate-video',
- 'generate-video-storyboard',
- 'manage-video-torrent',
- 'move-to-object-storage',
- 'move-to-file-system',
- 'notify',
- 'transcoding-job-builder',
- 'video-channel-import',
- 'video-file-import',
- 'video-import',
- 'video-live-ending',
- 'video-redundancy',
- 'video-studio-edition',
- 'video-transcription',
- 'videos-views-stats',
- 'create-user-export',
- 'import-user-archive',
- 'video-transcoding'
- ]
-
- const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
-
- class JobQueue {
-
- private static instance: JobQueue
-
- private workers: { [id in JobType]?: Worker } = {}
- private queues: { [id in JobType]?: Queue } = {}
- private queueEvents: { [id in JobType]?: QueueEvents } = {}
-
- private flowProducer: FlowProducer
-
- private initialized = false
- private jobRedisPrefix: string
-
- private constructor () {
- }
-
- init () {
- // Already initialized
- if (this.initialized === true) return
- this.initialized = true
-
- this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
-
- for (const handlerName of Object.keys(handlers)) {
- this.buildWorker(handlerName)
- this.buildQueue(handlerName)
- this.buildQueueEvent(handlerName)
- }
-
- this.flowProducer = new FlowProducer({
- connection: Redis.getRedisClientOptions('FlowProducer'),
- prefix: this.jobRedisPrefix
- })
- this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
-
- this.addRepeatableJobs()
- }
-
- private buildWorker (handlerName: JobType) {
- const workerOptions: WorkerOptions = {
- autorun: false,
- concurrency: this.getJobConcurrency(handlerName),
- prefix: this.jobRedisPrefix,
- connection: Redis.getRedisClientOptions('Worker'),
- maxStalledCount: 10
- }
-
- const handler = function (job: Job) {
- const timeout = JOB_TTL[handlerName]
- const p = handlers[handlerName](job)
-
- if (!timeout) return p
-
- return timeoutPromise(p, timeout)
- }
-
- const processor = async (jobArg: Job<any>) => {
- const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
-
- return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
- }
-
- const worker = new Worker(handlerName, processor, workerOptions)
-
- worker.on('failed', (job, err) => {
- const logLevel = silentFailure.has(handlerName)
- ? 'debug'
- : 'error'
-
- logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })
-
- if (errorHandlers[job.name]) {
- errorHandlers[job.name](job, err)
- .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
- }
- })
-
- worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })
-
- this.workers[handlerName] = worker
- }
-
- private buildQueue (handlerName: JobType) {
- const queueOptions: QueueOptions = {
- connection: Redis.getRedisClientOptions('Queue'),
- prefix: this.jobRedisPrefix
- }
-
- const queue = new Queue(handlerName, queueOptions)
- queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
-
- this.queues[handlerName] = queue
-
- queue.removeDeprecatedPriorityKey()
- .catch(err => logger.error('Cannot remove bullmq deprecated priority keys of ' + handlerName, { err }))
- }
-
- private buildQueueEvent (handlerName: JobType) {
- const queueEventsOptions: QueueEventsOptions = {
- autorun: false,
- connection: Redis.getRedisClientOptions('QueueEvent'),
- prefix: this.jobRedisPrefix
- }
-
- const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
- queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
-
- this.queueEvents[handlerName] = queueEvents
- }
-
- // ---------------------------------------------------------------------------
-
- async terminate () {
- const promises = Object.keys(this.workers)
- .map(handlerName => {
- const worker: Worker = this.workers[handlerName]
- const queue: Queue = this.queues[handlerName]
- const queueEvent: QueueEvents = this.queueEvents[handlerName]
-
- return Promise.all([
- worker.close(false),
- queue.close(),
- queueEvent.close()
- ])
- })
-
- return Promise.all(promises)
- }
-
- start () {
- const promises = Object.keys(this.workers)
- .map(handlerName => {
- const worker: Worker = this.workers[handlerName]
- const queueEvent: QueueEvents = this.queueEvents[handlerName]
-
- return Promise.all([
- worker.run(),
- queueEvent.run()
- ])
- })
-
- return Promise.all(promises)
- }
-
- async pause () {
- for (const handlerName of Object.keys(this.workers)) {
- const worker: Worker = this.workers[handlerName]
-
- await worker.pause()
- }
- }
-
- resume () {
- for (const handlerName of Object.keys(this.workers)) {
- const worker: Worker = this.workers[handlerName]
-
- worker.resume()
- }
- }
-
- // ---------------------------------------------------------------------------
-
- createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
- this.createJob(options)
- .catch(err => logger.error('Cannot create job.', { err, options }))
- }
-
- createJob (options: CreateJobArgument & CreateJobOptions | undefined) {
- if (!options) return
-
- const queue: Queue = this.queues[options.type]
- if (queue === undefined) {
- logger.error('Unknown queue %s: cannot create job.', options.type)
- return
- }
-
- const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
-
- return queue.add('job', options.payload, jobOptions)
- }
-
- createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
- let lastJob: FlowJob
-
- for (const job of jobs) {
- if (!job) continue
-
- lastJob = {
- ...this.buildJobFlowOption(job),
-
- children: lastJob
- ? [ lastJob ]
- : []
- }
- }
-
- return this.flowProducer.add(lastJob)
- }
-
- createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
- return this.flowProducer.add({
- ...this.buildJobFlowOption(parent),
-
- children: children.map(c => this.buildJobFlowOption(c))
- })
- }
-
- private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
- return {
- name: 'job',
- data: job.payload,
- queueName: job.type,
- opts: {
- failParentOnFailure: true,
-
- ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ]))
- }
- }
- }
-
- private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
- return {
- backoff: { delay: 60 * 1000, type: 'exponential' },
- attempts: JOB_ATTEMPTS[type],
- priority: options.priority,
- delay: options.delay,
-
- ...this.buildJobRemovalOptions(type)
- }
- }
-
- // ---------------------------------------------------------------------------
-
- async listForApi (options: {
- state?: JobState
- start: number
- count: number
- asc?: boolean
- jobType: JobType
- }): Promise<Job[]> {
- const { state, start, count, asc, jobType } = options
-
- const states = this.buildStateFilter(state)
- const filteredJobTypes = this.buildTypeFilter(jobType)
-
- let results: Job[] = []
-
- for (const jobType of filteredJobTypes) {
- const queue: Queue = this.queues[jobType]
-
- if (queue === undefined) {
- logger.error('Unknown queue %s to list jobs.', jobType)
- continue
- }
-
- let jobs = await queue.getJobs(states, 0, start + count, asc)
-
- // FIXME: we have sometimes undefined values https://github.com/taskforcesh/bullmq/issues/248
- jobs = jobs.filter(j => !!j)
-
- results = results.concat(jobs)
- }
-
- results.sort((j1: any, j2: any) => {
- if (j1.timestamp < j2.timestamp) return -1
- else if (j1.timestamp === j2.timestamp) return 0
-
- return 1
- })
-
- if (asc === false) results.reverse()
-
- return results.slice(start, start + count)
- }
-
- async count (state: JobState, jobType?: JobType): Promise<number> {
- const states = this.buildStateFilter(state)
- const filteredJobTypes = this.buildTypeFilter(jobType)
-
- let total = 0
-
- for (const type of filteredJobTypes) {
- const queue = this.queues[type]
- if (queue === undefined) {
- logger.error('Unknown queue %s to count jobs.', type)
- continue
- }
-
- const counts = await queue.getJobCounts()
-
- for (const s of states) {
- total += counts[s]
- }
- }
-
- return total
- }
-
- private buildStateFilter (state?: JobState) {
- if (!state) return Array.from(jobStates)
-
- const states = [ state ]
-
- // Include parent and prioritized if filtering on waiting
- if (state === 'waiting') {
- states.push('waiting-children')
- states.push('prioritized')
- }
-
- return states
- }
-
- private buildTypeFilter (jobType?: JobType) {
- if (!jobType) return jobTypes
-
- return jobTypes.filter(t => t === jobType)
- }
-
- async getStats () {
- const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
-
- return Promise.all(promises)
- }
-
- // ---------------------------------------------------------------------------
-
- async removeOldJobs () {
- for (const key of Object.keys(this.queues)) {
- const queue: Queue = this.queues[key]
- await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
- await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
- }
- }
-
- private addRepeatableJobs () {
- this.queues['videos-views-stats'].add('job', {}, {
- repeat: REPEAT_JOBS['videos-views-stats'],
-
- ...this.buildJobRemovalOptions('videos-views-stats')
- }).catch(err => logger.error('Cannot add repeatable job.', { err }))
-
- if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
- this.queues['activitypub-cleaner'].add('job', {}, {
- repeat: REPEAT_JOBS['activitypub-cleaner'],
-
- ...this.buildJobRemovalOptions('activitypub-cleaner')
- }).catch(err => logger.error('Cannot add repeatable job.', { err }))
- }
- }
-
- private getJobConcurrency (jobType: JobType) {
- if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
- if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
-
- return JOB_CONCURRENCY[jobType]
- }
-
- private buildJobRemovalOptions (queueName: string) {
- return {
- removeOnComplete: {
- // Wants seconds
- age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,
-
- count: JOB_REMOVAL_OPTIONS.COUNT
- },
- removeOnFail: {
- // Wants seconds
- age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,
-
- count: JOB_REMOVAL_OPTIONS.COUNT / 1000
- }
- }
- }
-
- static get Instance () {
- return this.instance || (this.instance = new this())
- }
- }
-
- // ---------------------------------------------------------------------------
-
- export {
- JobQueue, jobTypes
- }
|