はじまりの大地

このコミットが含まれているのは:
2024-07-15 09:14:04 +09:00
コミット 6632905f32
3501個のファイルの変更1439465行の追加0行の削除
+113
ファイルの表示
@@ -0,0 +1,113 @@
#!/usr/bin/env node
import { Command, InvalidArgumentError } from '@commander-js/extra-typings'
import { RunnerJobType } from '@peertube/peertube-models'
import { listRegistered, registerRunner, unregisterRunner } from './register/index.js'
import { RunnerServer } from './server/index.js'
import { getSupportedJobsList } from './server/shared/supported-job.js'
import { ConfigManager, logger } from './shared/index.js'
const program = new Command()
.version(process.env.PACKAGE_VERSION)
.option(
'--id <id>',
'Runner server id, so you can run multiple PeerTube server runners with different configurations on the same machine',
'default'
)
.option('--verbose', 'Run in verbose mode')
.hook('preAction', thisCommand => {
const options = thisCommand.opts()
ConfigManager.Instance.init(options.id)
if (options.verbose === true) {
logger.level = 'debug'
}
})
program.command('server')
.description('Run in server mode, to execute remote jobs of registered PeerTube instances')
.option(
'--enable-job <type>',
'Enable this job type (multiple --enable-job options can be specified). ' +
'By default all supported jobs are enabled). ' +
'Supported job types: ' + getSupportedJobsList().join(', '),
(value: RunnerJobType, previous: RunnerJobType[]) => [ ...previous, value ],
[]
)
.action(async options => {
try {
let enabledJobs: Set<RunnerJobType>
if (options.enableJob) {
for (const jobType of options.enableJob) {
if (getSupportedJobsList().includes(jobType) !== true) {
throw new InvalidArgumentError(`${jobType} is not a supported job`)
}
enabledJobs = new Set(options.enableJob)
}
}
await new RunnerServer(enabledJobs).run()
} catch (err) {
logger.error(err, 'Cannot run PeerTube runner as server mode')
process.exit(-1)
}
})
program.command('register')
.description('Register a new PeerTube instance to process runner jobs')
.requiredOption('--url <url>', 'PeerTube instance URL', parseUrl)
.requiredOption('--registration-token <token>', 'Runner registration token (can be found in PeerTube instance administration')
.requiredOption('--runner-name <name>', 'Runner name')
.option('--runner-description <description>', 'Runner description')
.action(async options => {
try {
await registerRunner(options)
} catch (err) {
console.error('Cannot register this PeerTube runner.')
console.error(err)
process.exit(-1)
}
})
program.command('unregister')
.description('Unregister the runner from PeerTube instance')
.requiredOption('--url <url>', 'PeerTube instance URL', parseUrl)
.requiredOption('--runner-name <name>', 'Runner name')
.action(async options => {
try {
await unregisterRunner(options)
} catch (err) {
console.error('Cannot unregister this PeerTube runner.')
console.error(err)
process.exit(-1)
}
})
program.command('list-registered')
.description('List registered PeerTube instances')
.action(async () => {
try {
await listRegistered()
} catch (err) {
console.error('Cannot list registered PeerTube instances.')
console.error(err)
process.exit(-1)
}
})
program.parse()
// ---------------------------------------------------------------------------
// Private
// ---------------------------------------------------------------------------
function parseUrl (url: string) {
if (url.startsWith('http://') !== true && url.startsWith('https://') !== true) {
throw new InvalidArgumentError('URL should start with a http:// or https://')
}
return url
}
+1
ファイルの表示
@@ -0,0 +1 @@
export * from './register.js'
+36
ファイルの表示
@@ -0,0 +1,36 @@
import { IPCClient } from '../shared/ipc/index.js'
export async function registerRunner (options: {
url: string
registrationToken: string
runnerName: string
runnerDescription?: string
}) {
const client = new IPCClient()
await client.run()
await client.askRegister(options)
client.stop()
}
export async function unregisterRunner (options: {
url: string
runnerName: string
}) {
const client = new IPCClient()
await client.run()
await client.askUnregister(options)
client.stop()
}
export async function listRegistered () {
const client = new IPCClient()
await client.run()
await client.askListRegistered()
client.stop()
}
+1
ファイルの表示
@@ -0,0 +1 @@
export * from './server.js'
+2
ファイルの表示
@@ -0,0 +1,2 @@
export * from './shared/index.js'
export * from './process.js'
+51
ファイルの表示
@@ -0,0 +1,51 @@
import {
RunnerJobLiveRTMPHLSTranscodingPayload,
RunnerJobStudioTranscodingPayload,
RunnerJobTranscriptionPayload,
RunnerJobVODAudioMergeTranscodingPayload,
RunnerJobVODHLSTranscodingPayload,
RunnerJobVODWebVideoTranscodingPayload
} from '@peertube/peertube-models'
import { logger } from '../../shared/index.js'
import { processAudioMergeTranscoding, processHLSTranscoding, ProcessOptions, processWebVideoTranscoding } from './shared/index.js'
import { ProcessLiveRTMPHLSTranscoding } from './shared/process-live.js'
import { processStudioTranscoding } from './shared/process-studio.js'
import { processVideoTranscription } from './shared/process-transcription.js'
export async function processJob (options: ProcessOptions) {
const { server, job } = options
logger.info(`[${server.url}] Processing job of type ${job.type}: ${job.uuid}`, { payload: job.payload })
switch (job.type) {
case 'vod-audio-merge-transcoding':
await processAudioMergeTranscoding(options as ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>)
break
case 'vod-web-video-transcoding':
await processWebVideoTranscoding(options as ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>)
break
case 'vod-hls-transcoding':
await processHLSTranscoding(options as ProcessOptions<RunnerJobVODHLSTranscodingPayload>)
break
case 'live-rtmp-hls-transcoding':
await new ProcessLiveRTMPHLSTranscoding(options as ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>).process()
break
case 'video-studio-transcoding':
await processStudioTranscoding(options as ProcessOptions<RunnerJobStudioTranscodingPayload>)
break
case 'video-transcription':
await processVideoTranscription(options as ProcessOptions<RunnerJobTranscriptionPayload>)
break
default:
logger.error(`Unknown job ${job.type} to process`)
return
}
logger.info(`[${server.url}] Finished processing job of type ${job.type}: ${job.uuid}`)
}
+106
ファイルの表示
@@ -0,0 +1,106 @@
import { remove } from 'fs-extra/esm'
import { join } from 'path'
import { FFmpegEdition, FFmpegLive, FFmpegVOD, getDefaultAvailableEncoders, getDefaultEncodersToTry } from '@peertube/peertube-ffmpeg'
import { RunnerJob, RunnerJobPayload } from '@peertube/peertube-models'
import { buildUUID } from '@peertube/peertube-node-utils'
import { PeerTubeServer } from '@peertube/peertube-server-commands'
import { ConfigManager, downloadFile, logger } from '../../../shared/index.js'
import { getWinstonLogger } from './winston-logger.js'
export type JobWithToken <T extends RunnerJobPayload = RunnerJobPayload> = RunnerJob<T> & { jobToken: string }
export type ProcessOptions <T extends RunnerJobPayload = RunnerJobPayload> = {
server: PeerTubeServer
job: JobWithToken<T>
runnerToken: string
}
export async function downloadInputFile (options: {
url: string
job: JobWithToken
runnerToken: string
}) {
const { url, job, runnerToken } = options
const destination = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
try {
await downloadFile({ url, jobToken: job.jobToken, runnerToken, destination })
} catch (err) {
remove(destination)
.catch(err => logger.error({ err }, `Cannot remove ${destination}`))
throw err
}
return destination
}
export function scheduleTranscodingProgress (options: {
server: PeerTubeServer
runnerToken: string
job: JobWithToken
progressGetter: () => number
}) {
const { job, server, progressGetter, runnerToken } = options
const updateInterval = ConfigManager.Instance.isTestInstance()
? 500
: 60000
const update = () => {
server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() })
.catch(err => logger.error({ err }, 'Cannot send job progress'))
}
const interval = setInterval(() => {
update()
}, updateInterval)
update()
return interval
}
// ---------------------------------------------------------------------------
export function buildFFmpegVOD (options: {
onJobProgress: (progress: number) => void
}) {
const { onJobProgress } = options
return new FFmpegVOD({
...getCommonFFmpegOptions(),
updateJobProgress: arg => {
const progress = arg < 0 || arg > 100
? undefined
: arg
onJobProgress(progress)
}
})
}
export function buildFFmpegLive () {
return new FFmpegLive(getCommonFFmpegOptions())
}
export function buildFFmpegEdition () {
return new FFmpegEdition(getCommonFFmpegOptions())
}
function getCommonFFmpegOptions () {
const config = ConfigManager.Instance.getConfig()
return {
niceness: config.ffmpeg.nice,
threads: config.ffmpeg.threads,
tmpDirectory: ConfigManager.Instance.getTranscodingDirectory(),
profile: 'default',
availableEncoders: {
available: getDefaultAvailableEncoders(),
encodersToTry: getDefaultEncodersToTry()
},
logger: getWinstonLogger()
}
}
+3
ファイルの表示
@@ -0,0 +1,3 @@
export * from './common.js'
export * from './process-vod.js'
export * from './winston-logger.js'
+342
ファイルの表示
@@ -0,0 +1,342 @@
import { FSWatcher, watch } from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { ensureDir, remove } from 'fs-extra/esm'
import { basename, join } from 'path'
import { wait } from '@peertube/peertube-core-utils'
import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@peertube/peertube-ffmpeg'
import {
LiveRTMPHLSTranscodingSuccess,
LiveRTMPHLSTranscodingUpdatePayload,
PeerTubeProblemDocument,
RunnerJobLiveRTMPHLSTranscodingPayload,
ServerErrorCode
} from '@peertube/peertube-models'
import { buildUUID } from '@peertube/peertube-node-utils'
import { ConfigManager } from '../../../shared/config-manager.js'
import { logger } from '../../../shared/index.js'
import { buildFFmpegLive, ProcessOptions } from './common.js'
export class ProcessLiveRTMPHLSTranscoding {
private readonly outputPath: string
private readonly fsWatchers: FSWatcher[] = []
// Playlist name -> chunks
private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
private readonly playlistsCreated = new Set<string>()
private allPlaylistsCreated = false
private ffmpegCommand: FfmpegCommand
private ended = false
private errored = false
constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
}
process () {
const job = this.options.job
const payload = job.payload
return new Promise<void>(async (res, rej) => {
try {
await ensureDir(this.outputPath)
logger.info(`Probing ${payload.input.rtmpUrl}`)
const probe = await ffprobePromise(payload.input.rtmpUrl)
logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
this.fsWatchers.push(m3u8Watcher)
const tsWatcher = watch(this.outputPath + '/*.ts')
this.fsWatchers.push(tsWatcher)
m3u8Watcher.on('change', p => {
logger.debug(`${p} m3u8 playlist changed`)
})
m3u8Watcher.on('add', p => {
this.playlistsCreated.add(p)
if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
this.allPlaylistsCreated = true
logger.info('All m3u8 playlists are created.')
}
})
tsWatcher.on('add', async p => {
try {
await this.sendPendingChunks()
} catch (err) {
this.onUpdateError({ err, rej, res })
}
const playlistName = this.getPlaylistIdFromTS(p)
const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
pendingChunks.push(p)
this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
})
tsWatcher.on('unlink', p => {
this.sendDeletedChunkUpdate(p)
.catch(err => this.onUpdateError({ err, rej, res }))
})
this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
inputUrl: payload.input.rtmpUrl,
outPath: this.outputPath,
masterPlaylistName: 'master.m3u8',
segmentListSize: payload.output.segmentListSize,
segmentDuration: payload.output.segmentDuration,
toTranscode: payload.output.toTranscode,
bitrate,
ratio,
hasAudio,
probe
})
logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
this.onFFmpegError({ err, stdout, stderr })
res()
})
this.ffmpegCommand.on('end', () => {
this.onFFmpegEnded()
.catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
res()
})
this.ffmpegCommand.run()
} catch (err) {
rej(err)
}
})
}
// ---------------------------------------------------------------------------
private onUpdateError (options: {
err: Error
res: () => void
rej: (reason?: any) => void
}) {
const { err, res, rej } = options
if (this.errored) return
if (this.ended) return
this.errored = true
this.ffmpegCommand.kill('SIGINT')
const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
logger.info('Stopping transcoding as the job is not in processing state anymore')
this.sendSuccess()
.catch(err => logger.error({ err }, 'Cannot send success'))
res()
} else {
logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
this.sendError(err)
.catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
rej(err)
}
this.cleanup()
}
// ---------------------------------------------------------------------------
private onFFmpegError (options: {
err: any
stdout: string
stderr: string
}) {
const { err, stdout, stderr } = options
// Don't care that we killed the ffmpeg process
if (err?.message?.includes('Exiting normally')) return
if (this.errored) return
if (this.ended) return
this.errored = true
logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
this.sendError(err)
.catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
this.cleanup()
}
private async sendError (err: Error) {
await this.options.server.runnerJobs.error({
jobToken: this.options.job.jobToken,
jobUUID: this.options.job.uuid,
runnerToken: this.options.runnerToken,
message: err.message
})
}
// ---------------------------------------------------------------------------
private async onFFmpegEnded () {
if (this.ended) return
this.ended = true
logger.info('FFmpeg ended, sending success to server')
// Wait last ffmpeg chunks generation
await wait(1500)
this.sendSuccess()
.catch(err => logger.error({ err }, 'Cannot send success'))
this.cleanup()
}
private async sendSuccess () {
const successBody: LiveRTMPHLSTranscodingSuccess = {}
await this.options.server.runnerJobs.success({
jobToken: this.options.job.jobToken,
jobUUID: this.options.job.uuid,
runnerToken: this.options.runnerToken,
payload: successBody
})
}
// ---------------------------------------------------------------------------
private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
if (this.ended) return Promise.resolve()
logger.debug(`Sending removed live chunk ${deletedChunk} update`)
const videoChunkFilename = basename(deletedChunk)
let payload: LiveRTMPHLSTranscodingUpdatePayload = {
type: 'remove-chunk',
videoChunkFilename
}
if (this.allPlaylistsCreated) {
const playlistName = this.getPlaylistName(videoChunkFilename)
payload = {
...payload,
masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
resolutionPlaylistFilename: playlistName,
resolutionPlaylistFile: join(this.outputPath, playlistName)
}
}
return this.updateWithRetry(payload)
}
private async sendPendingChunks (): Promise<any> {
if (this.ended) return Promise.resolve()
const promises: Promise<any>[] = []
for (const playlist of this.pendingChunksPerPlaylist.keys()) {
for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
logger.debug(`Sending added live chunk ${chunk} update`)
const videoChunkFilename = basename(chunk)
let payload: LiveRTMPHLSTranscodingUpdatePayload = {
type: 'add-chunk',
videoChunkFilename,
videoChunkFile: chunk
}
if (this.allPlaylistsCreated) {
const playlistName = this.getPlaylistName(videoChunkFilename)
payload = {
...payload,
masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
resolutionPlaylistFilename: playlistName,
resolutionPlaylistFile: join(this.outputPath, playlistName)
}
}
promises.push(this.updateWithRetry(payload))
}
this.pendingChunksPerPlaylist.set(playlist, [])
}
await Promise.all(promises)
}
private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
if (this.ended || this.errored) return
try {
await this.options.server.runnerJobs.update({
jobToken: this.options.job.jobToken,
jobUUID: this.options.job.uuid,
runnerToken: this.options.runnerToken,
payload
})
} catch (err) {
if (currentTry >= 3) throw err
if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
logger.warn({ err }, 'Will retry update after error')
await wait(250)
return this.updateWithRetry(payload, currentTry + 1)
}
}
private getPlaylistName (videoChunkFilename: string) {
return `${videoChunkFilename.split('-')[0]}.m3u8`
}
private getPlaylistIdFromTS (segmentPath: string) {
const playlistIdMatcher = /^([\d+])-/
return basename(segmentPath).match(playlistIdMatcher)[1]
}
// ---------------------------------------------------------------------------
private cleanup () {
logger.debug(`Cleaning up job ${this.options.job.uuid}`)
for (const fsWatcher of this.fsWatchers) {
fsWatcher.close()
.catch(err => logger.error({ err }, 'Cannot close watcher'))
}
remove(this.outputPath)
.catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
}
}
+165
ファイルの表示
@@ -0,0 +1,165 @@
import { remove } from 'fs-extra/esm'
import { join } from 'path'
import { pick } from '@peertube/peertube-core-utils'
import {
RunnerJobStudioTranscodingPayload,
VideoStudioTask,
VideoStudioTaskCutPayload,
VideoStudioTaskIntroPayload,
VideoStudioTaskOutroPayload,
VideoStudioTaskPayload,
VideoStudioTaskWatermarkPayload,
VideoStudioTranscodingSuccess
} from '@peertube/peertube-models'
import { buildUUID } from '@peertube/peertube-node-utils'
import { ConfigManager } from '../../../shared/config-manager.js'
import { logger } from '../../../shared/index.js'
import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common.js'
export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) {
const { server, job, runnerToken } = options
const payload = job.payload
let inputPath: string
let outputPath: string
let tmpInputFilePath: string
let tasksProgress = 0
const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => tasksProgress
})
try {
logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`)
inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
tmpInputFilePath = inputPath
logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`)
for (const task of payload.tasks) {
const outputFilename = 'output-edition-' + buildUUID() + '.mp4'
outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename)
await processTask({
inputPath: tmpInputFilePath,
outputPath,
task,
job,
runnerToken
})
if (tmpInputFilePath) await remove(tmpInputFilePath)
// For the next iteration
tmpInputFilePath = outputPath
tasksProgress += Math.floor(100 / payload.tasks.length)
}
const successBody: VideoStudioTranscodingSuccess = {
videoFile: outputPath
}
await server.runnerJobs.success({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
})
} finally {
if (tmpInputFilePath) await remove(tmpInputFilePath)
if (outputPath) await remove(outputPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
}
}
// ---------------------------------------------------------------------------
// Private
// ---------------------------------------------------------------------------
type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = {
inputPath: string
outputPath: string
task: T
runnerToken: string
job: JobWithToken
}
const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = {
'add-intro': processAddIntroOutro,
'add-outro': processAddIntroOutro,
'cut': processCut,
'add-watermark': processAddWatermark
}
async function processTask (options: TaskProcessorOptions) {
const { task } = options
const processor = taskProcessors[options.task.name]
if (!process) throw new Error('Unknown task ' + task.name)
return processor(options)
}
async function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) {
const { inputPath, task, runnerToken, job } = options
logger.debug('Adding intro/outro to ' + inputPath)
const introOutroPath = await downloadInputFile({ url: task.options.file, runnerToken, job })
try {
await buildFFmpegEdition().addIntroOutro({
...pick(options, [ 'inputPath', 'outputPath' ]),
introOutroPath,
type: task.name === 'add-intro'
? 'intro'
: 'outro'
})
} finally {
await remove(introOutroPath)
}
}
function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) {
const { inputPath, task } = options
logger.debug(`Cutting ${inputPath}`)
return buildFFmpegEdition().cutVideo({
...pick(options, [ 'inputPath', 'outputPath' ]),
start: task.options.start,
end: task.options.end
})
}
async function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) {
const { inputPath, task, runnerToken, job } = options
logger.debug('Adding watermark to ' + inputPath)
const watermarkPath = await downloadInputFile({ url: task.options.file, runnerToken, job })
try {
await buildFFmpegEdition().addWatermark({
...pick(options, [ 'inputPath', 'outputPath' ]),
watermarkPath,
videoFilters: {
watermarkSizeRatio: task.options.watermarkSizeRatio,
horitonzalMarginRatio: task.options.horitonzalMarginRatio,
verticalMarginRatio: task.options.verticalMarginRatio
}
})
} finally {
await remove(watermarkPath)
}
}
+79
ファイルの表示
@@ -0,0 +1,79 @@
import { hasAudioStream } from '@peertube/peertube-ffmpeg'
import { RunnerJobTranscriptionPayload, TranscriptionSuccess } from '@peertube/peertube-models'
import { buildSUUID } from '@peertube/peertube-node-utils'
import { TranscriptionModel, WhisperBuiltinModel, transcriberFactory } from '@peertube/peertube-transcription'
import { remove } from 'fs-extra/esm'
import { join } from 'path'
import { ConfigManager } from '../../../shared/config-manager.js'
import { logger } from '../../../shared/index.js'
import { ProcessOptions, downloadInputFile, scheduleTranscodingProgress } from './common.js'
import { getWinstonLogger } from './winston-logger.js'
export async function processVideoTranscription (options: ProcessOptions<RunnerJobTranscriptionPayload>) {
const { server, job, runnerToken } = options
const config = ConfigManager.Instance.getConfig().transcription
const payload = job.payload
let inputPath: string
const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => undefined
})
const outputPath = join(ConfigManager.Instance.getTranscriptionDirectory(), buildSUUID())
const transcriber = transcriberFactory.createFromEngineName({
engineName: config.engine,
enginePath: config.enginePath,
logger: getWinstonLogger()
})
try {
logger.info(`Downloading input file ${payload.input.videoFileUrl} for transcription job ${job.jobToken}`)
inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running transcription.`)
if (await hasAudioStream(inputPath) !== true) {
await server.runnerJobs.error({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
message: 'This input file does not contain audio'
})
return
}
const transcriptFile = await transcriber.transcribe({
mediaFilePath: inputPath,
model: config.modelPath
? await TranscriptionModel.fromPath(config.modelPath)
: new WhisperBuiltinModel(config.model),
format: 'vtt',
transcriptDirectory: outputPath
})
const successBody: TranscriptionSuccess = {
inputLanguage: transcriptFile.language,
vttFile: transcriptFile.path
}
await server.runnerJobs.success({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
})
} finally {
if (inputPath) await remove(inputPath)
if (outputPath) await remove(outputPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
}
}
+201
ファイルの表示
@@ -0,0 +1,201 @@
import { remove } from 'fs-extra/esm'
import { join } from 'path'
import {
RunnerJobVODAudioMergeTranscodingPayload,
RunnerJobVODHLSTranscodingPayload,
RunnerJobVODWebVideoTranscodingPayload,
VODAudioMergeTranscodingSuccess,
VODHLSTranscodingSuccess,
VODWebVideoTranscodingSuccess
} from '@peertube/peertube-models'
import { buildUUID } from '@peertube/peertube-node-utils'
import { ConfigManager } from '../../../shared/config-manager.js'
import { logger } from '../../../shared/index.js'
import { buildFFmpegVOD, downloadInputFile, ProcessOptions, scheduleTranscodingProgress } from './common.js'
export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) {
const { server, job, runnerToken } = options
const payload = job.payload
let ffmpegProgress: number
let inputPath: string
const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => ffmpegProgress
})
try {
logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`)
inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`)
const ffmpegVod = buildFFmpegVOD({
onJobProgress: progress => { ffmpegProgress = progress }
})
await ffmpegVod.transcode({
type: 'video',
inputPath,
outputPath,
inputFileMutexReleaser: () => {},
resolution: payload.output.resolution,
fps: payload.output.fps
})
const successBody: VODWebVideoTranscodingSuccess = {
videoFile: outputPath
}
await server.runnerJobs.success({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
})
} finally {
if (inputPath) await remove(inputPath)
if (outputPath) await remove(outputPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
}
}
export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVODHLSTranscodingPayload>) {
const { server, job, runnerToken } = options
const payload = job.payload
let ffmpegProgress: number
let inputPath: string
const uuid = buildUUID()
const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`)
const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4`
const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename))
const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => ffmpegProgress
})
try {
logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`)
inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`)
const ffmpegVod = buildFFmpegVOD({
onJobProgress: progress => { ffmpegProgress = progress }
})
await ffmpegVod.transcode({
type: 'hls',
copyCodecs: false,
inputPath,
hlsPlaylist: { videoFilename },
outputPath,
inputFileMutexReleaser: () => {},
resolution: payload.output.resolution,
fps: payload.output.fps
})
const successBody: VODHLSTranscodingSuccess = {
resolutionPlaylistFile: outputPath,
videoFile: videoPath
}
await server.runnerJobs.success({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
})
} finally {
if (inputPath) await remove(inputPath)
if (outputPath) await remove(outputPath)
if (videoPath) await remove(videoPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
}
}
export async function processAudioMergeTranscoding (options: ProcessOptions<RunnerJobVODAudioMergeTranscodingPayload>) {
const { server, job, runnerToken } = options
const payload = job.payload
let ffmpegProgress: number
let audioPath: string
let inputPath: string
const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => ffmpegProgress
})
try {
logger.info(
`Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
`for audio merge transcoding job ${job.jobToken}`
)
audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job })
inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job })
logger.info(
`Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
`for job ${job.jobToken}. Running audio merge transcoding.`
)
const ffmpegVod = buildFFmpegVOD({
onJobProgress: progress => { ffmpegProgress = progress }
})
await ffmpegVod.transcode({
type: 'merge-audio',
audioPath,
inputPath,
outputPath,
inputFileMutexReleaser: () => {},
resolution: payload.output.resolution,
fps: payload.output.fps
})
const successBody: VODAudioMergeTranscodingSuccess = {
videoFile: outputPath
}
await server.runnerJobs.success({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
})
} finally {
if (audioPath) await remove(audioPath)
if (inputPath) await remove(inputPath)
if (outputPath) await remove(outputPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
}
}
+19
ファイルの表示
@@ -0,0 +1,19 @@
import { LogFn } from 'pino'
import { logger } from '../../../shared/index.js'
export function getWinstonLogger () {
return {
info: buildLogLevelFn(logger.info.bind(logger)),
debug: buildLogLevelFn(logger.debug.bind(logger)),
warn: buildLogLevelFn(logger.warn.bind(logger)),
error: buildLogLevelFn(logger.error.bind(logger))
}
}
function buildLogLevelFn (log: LogFn) {
return (arg1: string, arg2?: object) => {
if (arg2) return log(arg2, arg1)
return log(arg1)
}
}
+322
ファイルの表示
@@ -0,0 +1,322 @@
import { ensureDir, remove } from 'fs-extra/esm'
import { readdir } from 'fs/promises'
import { join } from 'path'
import { io, Socket } from 'socket.io-client'
import { pick, shuffle, wait } from '@peertube/peertube-core-utils'
import { PeerTubeProblemDocument, RunnerJobType, ServerErrorCode } from '@peertube/peertube-models'
import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands'
import { ConfigManager } from '../shared/index.js'
import { IPCServer } from '../shared/ipc/index.js'
import { logger } from '../shared/logger.js'
import { JobWithToken, processJob } from './process/index.js'
import { getSupportedJobsList, isJobSupported } from './shared/index.js'
type PeerTubeServer = PeerTubeServerCommand & {
runnerToken: string
runnerName: string
runnerDescription?: string
}
export class RunnerServer {
private servers: PeerTubeServer[] = []
private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []
private checkingAvailableJobs = false
private cleaningUp = false
private initialized = false
private readonly sockets = new Map<PeerTubeServer, Socket>()
constructor (private readonly enabledJobs?: Set<RunnerJobType>) {}
async run () {
logger.info('Running PeerTube runner in server mode')
const enabledJobsArray = this.enabledJobs
? Array.from(this.enabledJobs)
: getSupportedJobsList()
logger.info('Supported and enabled job types: ' + enabledJobsArray.join(', '))
await ConfigManager.Instance.load()
for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
const serverCommand = new PeerTubeServerCommand({ url: registered.url })
this.loadServer(Object.assign(serverCommand, registered))
logger.info(`Loading registered instance ${registered.url}`)
}
// Run IPC
const ipcServer = new IPCServer()
try {
await ipcServer.run(this)
} catch (err) {
logger.error('Cannot start local socket for IPC communication', err)
process.exit(-1)
}
// Cleanup on exit
for (const code of [ 'SIGTERM', 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
process.on(code, async (err, origin) => {
if (code === 'uncaughtException') {
logger.error({ err, origin }, 'uncaughtException')
}
await this.onExit()
})
}
// Process jobs
await ensureDir(ConfigManager.Instance.getTranscodingDirectory())
await this.cleanupTMP()
logger.info(`Using ${ConfigManager.Instance.getTranscodingDirectory()} for transcoding directory`)
this.initialized = true
await this.checkAvailableJobs()
}
// ---------------------------------------------------------------------------
async registerRunner (options: {
url: string
registrationToken: string
runnerName: string
runnerDescription?: string
}) {
const { url, registrationToken, runnerName, runnerDescription } = options
logger.info(`Registering runner ${runnerName} on ${url}...`)
const serverCommand = new PeerTubeServerCommand({ url })
const { runnerToken } = await serverCommand.runners.register({ name: runnerName, description: runnerDescription, registrationToken })
const server: PeerTubeServer = Object.assign(serverCommand, {
runnerToken,
runnerName,
runnerDescription
})
this.loadServer(server)
await this.saveRegisteredInstancesInConf()
logger.info(`Registered runner ${runnerName} on ${url}`)
}
private loadServer (server: PeerTubeServer) {
this.servers.push(server)
const url = server.url + '/runners'
const socket = io(url, {
auth: {
runnerToken: server.runnerToken
},
transports: [ 'websocket' ]
})
socket.on('connect_error', err => logger.warn({ err }, `Cannot connect to ${url} socket`))
socket.on('available-jobs', () => this.safeAsyncCheckAvailableJobs())
socket.on('connect', () => {
logger.info(`Connected to ${url} socket`)
this.safeAsyncCheckAvailableJobs()
})
socket.on('disconnect', () => logger.warn(`Disconnected from ${url} socket`))
socket.io.on('ping', () => logger.debug(`Received a "ping" for ${url}`))
this.sockets.set(server, socket)
}
async unregisterRunner (options: {
url: string
runnerName: string
}) {
const { url, runnerName } = options
const server = this.servers.find(s => s.url === url && s.runnerName === runnerName)
if (!server) {
logger.error(`Unknown server ${url} - ${runnerName} to unregister`)
return
}
logger.info(`Unregistering runner ${runnerName} on ${url}...`)
try {
await server.runners.unregister({ runnerToken: server.runnerToken })
} catch (err) {
logger.error({ err }, `Cannot unregister runner ${runnerName} on ${url}`)
}
this.unloadServer(server)
await this.saveRegisteredInstancesInConf()
logger.info(`Unregistered runner ${runnerName} on ${url}`)
}
private unloadServer (server: PeerTubeServer) {
this.servers = this.servers.filter(s => s !== server)
const socket = this.sockets.get(server)
socket.disconnect()
this.sockets.delete(server)
}
listRegistered () {
return {
servers: this.servers.map(s => {
return {
url: s.url,
runnerName: s.runnerName,
runnerDescription: s.runnerDescription
}
})
}
}
// ---------------------------------------------------------------------------
private safeAsyncCheckAvailableJobs () {
this.checkAvailableJobs()
.catch(err => logger.error({ err }, `Cannot check available jobs`))
}
private async checkAvailableJobs () {
if (!this.initialized) return
if (this.checkingAvailableJobs) return
this.checkingAvailableJobs = true
let hadAvailableJob = false
for (const server of shuffle([ ...this.servers ])) {
try {
logger.info('Checking available jobs on ' + server.url)
const job = await this.requestJob(server)
if (!job) continue
hadAvailableJob = true
await this.tryToExecuteJobAsync(server, job)
} catch (err) {
hadAvailableJob = false
const code = (err.res?.body as PeerTubeProblemDocument)?.code
if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE) {
logger.debug({ err }, 'Runner job is not in pending state anymore, retry later')
continue
}
if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
await this.unregisterRunner({ url: server.url, runnerName: server.runnerName })
continue
}
logger.error({ err }, `Cannot request/accept job on ${server.url} for runner ${server.runnerName}`)
}
}
this.checkingAvailableJobs = false
if (hadAvailableJob && this.canProcessMoreJobs()) {
await wait(2500)
this.checkAvailableJobs()
.catch(err => logger.error({ err }, 'Cannot check more available jobs'))
}
}
private async requestJob (server: PeerTubeServer) {
logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)
const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
const filtered = availableJobs.filter(j => isJobSupported(j, this.enabledJobs))
if (filtered.length === 0) {
logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
return undefined
}
return filtered[0]
}
private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
if (!this.canProcessMoreJobs()) return
const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
const processingJob = { job, server }
this.processingJobs.push(processingJob)
processJob({ server, job, runnerToken: server.runnerToken })
.catch(err => {
logger.error({ err }, 'Cannot process job')
server.runnerJobs.error({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken: server.runnerToken, message: err.message })
.catch(err2 => logger.error({ err: err2 }, 'Cannot abort job after error'))
})
.finally(() => {
this.processingJobs = this.processingJobs.filter(p => p !== processingJob)
return this.checkAvailableJobs()
})
}
// ---------------------------------------------------------------------------
private saveRegisteredInstancesInConf () {
const data = this.servers.map(s => {
return pick(s, [ 'url', 'runnerToken', 'runnerName', 'runnerDescription' ])
})
return ConfigManager.Instance.setRegisteredInstances(data)
}
private canProcessMoreJobs () {
return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
}
// ---------------------------------------------------------------------------
private async cleanupTMP () {
const files = await readdir(ConfigManager.Instance.getTranscodingDirectory())
for (const file of files) {
await remove(join(ConfigManager.Instance.getTranscodingDirectory(), file))
}
}
private async onExit () {
if (this.cleaningUp) return
this.cleaningUp = true
logger.info('Cleaning up after program exit')
try {
for (const { server, job } of this.processingJobs) {
await server.runnerJobs.abort({
jobToken: job.jobToken,
jobUUID: job.uuid,
reason: 'Runner stopped',
runnerToken: server.runnerToken
})
}
await this.cleanupTMP()
} catch (err) {
logger.error(err)
process.exit(-1)
}
process.exit()
}
}
+1
ファイルの表示
@@ -0,0 +1 @@
export * from './supported-job.js'
+50
ファイルの表示
@@ -0,0 +1,50 @@
import {
RunnerJobLiveRTMPHLSTranscodingPayload,
RunnerJobPayload,
RunnerJobStudioTranscodingPayload,
RunnerJobTranscriptionPayload,
RunnerJobType,
RunnerJobVODAudioMergeTranscodingPayload,
RunnerJobVODHLSTranscodingPayload,
RunnerJobVODWebVideoTranscodingPayload,
VideoStudioTaskPayload
} from '@peertube/peertube-models'
const supportedMatrix: { [ id in RunnerJobType ]: (payload: RunnerJobPayload) => boolean } = {
'vod-web-video-transcoding': (_payload: RunnerJobVODWebVideoTranscodingPayload) => {
return true
},
'vod-hls-transcoding': (_payload: RunnerJobVODHLSTranscodingPayload) => {
return true
},
'vod-audio-merge-transcoding': (_payload: RunnerJobVODAudioMergeTranscodingPayload) => {
return true
},
'live-rtmp-hls-transcoding': (_payload: RunnerJobLiveRTMPHLSTranscodingPayload) => {
return true
},
'video-studio-transcoding': (payload: RunnerJobStudioTranscodingPayload) => {
const tasks = payload?.tasks
const supported = new Set<VideoStudioTaskPayload['name']>([ 'add-intro', 'add-outro', 'add-watermark', 'cut' ])
if (!Array.isArray(tasks)) return false
return tasks.every(t => t && supported.has(t.name))
},
'video-transcription': (_payload: RunnerJobTranscriptionPayload) => {
return true
}
}
export function isJobSupported (job: { type: RunnerJobType, payload: RunnerJobPayload }, enabledJobs?: Set<RunnerJobType>) {
if (enabledJobs && !enabledJobs.has(job.type)) return false
const fn = supportedMatrix[job.type]
if (!fn) return false
return fn(job.payload as any)
}
export function getSupportedJobsList () {
return Object.keys(supportedMatrix)
}
+158
ファイルの表示
@@ -0,0 +1,158 @@
import { parse, stringify } from '@iarna/toml'
import { TranscriptionEngineName, WhisperBuiltinModelName } from '@peertube/peertube-transcription'
import envPaths from 'env-paths'
import { ensureDir, pathExists, remove } from 'fs-extra/esm'
import { readFile, writeFile } from 'fs/promises'
import merge from 'lodash-es/merge.js'
import { dirname, join } from 'path'
import { logger } from '../shared/index.js'
const paths = envPaths('peertube-runner')
type Config = {
jobs: {
concurrency: number
}
ffmpeg: {
threads: number
nice: number
}
registeredInstances: {
url: string
runnerToken: string
runnerName: string
runnerDescription?: string
}[]
transcription: {
engine: TranscriptionEngineName
enginePath: string | null
model: WhisperBuiltinModelName
modelPath: string | null
}
}
export class ConfigManager {
private static instance: ConfigManager
private config: Config = {
jobs: {
concurrency: 2
},
ffmpeg: {
threads: 2,
nice: 20
},
transcription: {
engine: 'whisper-ctranslate2',
enginePath: null,
model: 'small',
modelPath: null
},
registeredInstances: []
}
private id: string
private configFilePath: string
private constructor () {}
init (id: string) {
this.id = id
this.configFilePath = join(this.getConfigDir(), 'config.toml')
}
async load () {
logger.info(`Using ${this.configFilePath} as configuration file`)
if (this.isTestInstance()) {
logger.info('Removing configuration file as we are using the "test" id')
await remove(this.configFilePath)
}
await ensureDir(dirname(this.configFilePath))
if (!await pathExists(this.configFilePath)) {
await this.save()
}
const file = await readFile(this.configFilePath, 'utf-8')
this.config = merge(this.config, parse(file))
}
save () {
return writeFile(this.configFilePath, stringify(this.config))
}
// ---------------------------------------------------------------------------
async setRegisteredInstances (registeredInstances: {
url: string
runnerToken: string
runnerName: string
runnerDescription?: string
}[]) {
this.config.registeredInstances = registeredInstances
await this.save()
}
// ---------------------------------------------------------------------------
getConfig () {
return this.deepFreeze(this.config)
}
// ---------------------------------------------------------------------------
getTranscodingDirectory () {
return join(paths.cache, this.id, 'transcoding')
}
getTranscriptionDirectory () {
return join(paths.cache, this.id, 'transcription')
}
getSocketDirectory () {
return join(paths.data, this.id)
}
getSocketPath () {
return join(this.getSocketDirectory(), 'peertube-runner.sock')
}
getConfigDir () {
return join(paths.config, this.id)
}
// ---------------------------------------------------------------------------
isTestInstance () {
return typeof this.id === 'string' && this.id.match(/^test-\d$/)
}
// ---------------------------------------------------------------------------
// Thanks: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze
private deepFreeze <T extends object> (object: T) {
const propNames = Reflect.ownKeys(object)
// Freeze properties before freezing self
for (const name of propNames) {
const value = object[name]
if ((value && typeof value === 'object') || typeof value === 'function') {
this.deepFreeze(value)
}
}
return Object.freeze({ ...object })
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}
+67
ファイルの表示
@@ -0,0 +1,67 @@
import { createWriteStream } from 'fs'
import { remove } from 'fs-extra/esm'
import { RequestOptions } from 'https'
import { http, https } from 'follow-redirects'
import { logger } from './logger.js'
export function downloadFile (options: {
url: string
destination: string
runnerToken: string
jobToken: string
}) {
const { url, destination, runnerToken, jobToken } = options
logger.debug(`Downloading file ${url}`)
return new Promise<void>((res, rej) => {
const parsed = new URL(url)
const body = JSON.stringify({
runnerToken,
jobToken
})
const getOptions: RequestOptions = {
method: 'POST',
hostname: parsed.hostname,
port: parsed.port,
path: parsed.pathname,
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body, 'utf-8')
}
}
const request = getRequest(url)(getOptions, response => {
const code = response.statusCode ?? 0
if (code >= 400) {
return rej(new Error(response.statusMessage))
}
const file = createWriteStream(destination)
file.on('finish', () => res())
response.pipe(file)
})
request.on('error', err => {
remove(destination)
.catch(err => logger.error(err))
return rej(err)
})
request.write(body)
request.end()
})
}
// ---------------------------------------------------------------------------
function getRequest (url: string) {
if (url.startsWith('https://')) return https.request.bind(https)
return http.request.bind(http)
}
+3
ファイルの表示
@@ -0,0 +1,3 @@
export * from './config-manager.js'
export * from './http.js'
export * from './logger.js'
+2
ファイルの表示
@@ -0,0 +1,2 @@
export * from './ipc-client.js'
export * from './ipc-server.js'
+88
ファイルの表示
@@ -0,0 +1,88 @@
import CliTable3 from 'cli-table3'
import { ensureDir } from 'fs-extra/esm'
import { Client as NetIPC } from '@peertube/net-ipc'
import { ConfigManager } from '../config-manager.js'
import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js'
export class IPCClient {
private netIPC: NetIPC
async run () {
await ensureDir(ConfigManager.Instance.getSocketDirectory())
const socketPath = ConfigManager.Instance.getSocketPath()
this.netIPC = new NetIPC({ path: socketPath })
try {
await this.netIPC.connect()
} catch (err) {
if (err.code === 'ECONNREFUSED') {
throw new Error(
'This runner is not currently running in server mode on this system. ' +
'Please run it using the `server` command first (in another terminal for example) and then retry your command.'
)
}
throw err
}
}
async askRegister (options: {
url: string
registrationToken: string
runnerName: string
runnerDescription?: string
}) {
const req: IPCRequest = {
type: 'register',
...options
}
const { success, error } = await this.netIPC.request(req) as IPCReponse
if (success) console.log('PeerTube instance registered')
else console.error('Could not register PeerTube instance on runner server side', error)
}
async askUnregister (options: {
url: string
runnerName: string
}) {
const req: IPCRequest = {
type: 'unregister',
...options
}
const { success, error } = await this.netIPC.request(req) as IPCReponse
if (success) console.log('PeerTube instance unregistered')
else console.error('Could not unregister PeerTube instance on runner server side', error)
}
async askListRegistered () {
const req: IPCRequest = {
type: 'list-registered'
}
const { success, error, data } = await this.netIPC.request(req) as IPCReponse<IPCReponseData>
if (!success) {
console.error('Could not list registered PeerTube instances', error)
return
}
const table = new CliTable3({
head: [ 'instance', 'runner name', 'runner description' ]
})
for (const server of data.servers) {
table.push([ server.url, server.runnerName, server.runnerDescription ])
}
console.log(table.toString())
}
stop () {
this.netIPC.destroy()
}
}
+61
ファイルの表示
@@ -0,0 +1,61 @@
import { ensureDir } from 'fs-extra/esm'
import { Server as NetIPC } from '@peertube/net-ipc'
import { pick } from '@peertube/peertube-core-utils'
import { RunnerServer } from '../../server/index.js'
import { ConfigManager } from '../config-manager.js'
import { logger } from '../logger.js'
import { IPCReponse, IPCReponseData, IPCRequest } from './shared/index.js'
export class IPCServer {
private netIPC: NetIPC
private runnerServer: RunnerServer
async run (runnerServer: RunnerServer) {
this.runnerServer = runnerServer
await ensureDir(ConfigManager.Instance.getSocketDirectory())
const socketPath = ConfigManager.Instance.getSocketPath()
this.netIPC = new NetIPC({ path: socketPath })
await this.netIPC.start()
logger.info(`IPC socket created on ${socketPath}`)
this.netIPC.on('request', async (req: IPCRequest, res) => {
try {
const data = await this.process(req)
this.sendReponse(res, { success: true, data })
} catch (err) {
logger.error('Cannot execute RPC call', err)
this.sendReponse(res, { success: false, error: err.message })
}
})
}
private async process (req: IPCRequest) {
switch (req.type) {
case 'register':
await this.runnerServer.registerRunner(pick(req, [ 'url', 'registrationToken', 'runnerName', 'runnerDescription' ]))
return undefined
case 'unregister':
await this.runnerServer.unregisterRunner(pick(req, [ 'url', 'runnerName' ]))
return undefined
case 'list-registered':
return Promise.resolve(this.runnerServer.listRegistered())
default:
throw new Error('Unknown RPC call ' + (req as any).type)
}
}
private sendReponse <T extends IPCReponseData> (
response: (data: any) => Promise<void>,
body: IPCReponse<T>
) {
response(body)
.catch(err => logger.error('Cannot send response after IPC request', err))
}
}
+2
ファイルの表示
@@ -0,0 +1,2 @@
export * from './ipc-request.model.js'
export * from './ipc-response.model.js'
+15
ファイルの表示
@@ -0,0 +1,15 @@
export type IPCRequest =
IPCRequestRegister |
IPCRequestUnregister |
IPCRequestListRegistered
export type IPCRequestRegister = {
type: 'register'
url: string
registrationToken: string
runnerName: string
runnerDescription?: string
}
export type IPCRequestUnregister = { type: 'unregister', url: string, runnerName: string }
export type IPCRequestListRegistered = { type: 'list-registered' }
+15
ファイルの表示
@@ -0,0 +1,15 @@
export type IPCReponse <T extends IPCReponseData = undefined> = {
success: boolean
error?: string
data?: T
}
export type IPCReponseData =
// list registered
{
servers: {
runnerName: string
runnerDescription: string
url: string
}[]
}
+12
ファイルの表示
@@ -0,0 +1,12 @@
import { pino } from 'pino'
import pretty from 'pino-pretty'
const logger = pino(pretty.default({
colorize: true
}))
logger.level = 'info'
export {
logger
}