ニジカ投稿局 https://tv.nizika.tv
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

runner-job.ts 7.8 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. import {
  2. RunnerJob,
  3. RunnerJobAdmin,
  4. RunnerJobState,
  5. type RunnerJobPayload,
  6. type RunnerJobPrivatePayload,
  7. type RunnerJobStateType,
  8. type RunnerJobType
  9. } from '@peertube/peertube-models'
  10. import { isArray, isUUIDValid } from '@server/helpers/custom-validators/misc.js'
  11. import { CONSTRAINTS_FIELDS, RUNNER_JOB_STATES } from '@server/initializers/constants.js'
  12. import { MRunnerJob, MRunnerJobRunner, MRunnerJobRunnerParent } from '@server/types/models/runners/index.js'
  13. import { Op, Transaction } from 'sequelize'
  14. import {
  15. AllowNull,
  16. BelongsTo,
  17. Column,
  18. CreatedAt,
  19. DataType,
  20. Default,
  21. ForeignKey,
  22. IsUUID, Scopes,
  23. Table,
  24. UpdatedAt
  25. } from 'sequelize-typescript'
  26. import { SequelizeModel, getSort, searchAttribute } from '../shared/index.js'
  27. import { RunnerModel } from './runner.js'
  28. enum ScopeNames {
  29. WITH_RUNNER = 'WITH_RUNNER',
  30. WITH_PARENT = 'WITH_PARENT'
  31. }
  32. @Scopes(() => ({
  33. [ScopeNames.WITH_RUNNER]: {
  34. include: [
  35. {
  36. model: RunnerModel.unscoped(),
  37. required: false
  38. }
  39. ]
  40. },
  41. [ScopeNames.WITH_PARENT]: {
  42. include: [
  43. {
  44. model: RunnerJobModel.unscoped(),
  45. required: false
  46. }
  47. ]
  48. }
  49. }))
  50. @Table({
  51. tableName: 'runnerJob',
  52. indexes: [
  53. {
  54. fields: [ 'uuid' ],
  55. unique: true
  56. },
  57. {
  58. fields: [ 'processingJobToken' ],
  59. unique: true
  60. },
  61. {
  62. fields: [ 'runnerId' ]
  63. }
  64. ]
  65. })
  66. export class RunnerJobModel extends SequelizeModel<RunnerJobModel> {
  67. @AllowNull(false)
  68. @IsUUID(4)
  69. @Column(DataType.UUID)
  70. uuid: string
  71. @AllowNull(false)
  72. @Column
  73. type: RunnerJobType
  74. @AllowNull(false)
  75. @Column(DataType.JSONB)
  76. payload: RunnerJobPayload
  77. @AllowNull(false)
  78. @Column(DataType.JSONB)
  79. privatePayload: RunnerJobPrivatePayload
  80. @AllowNull(false)
  81. @Column
  82. state: RunnerJobStateType
  83. @AllowNull(false)
  84. @Default(0)
  85. @Column
  86. failures: number
  87. @AllowNull(true)
  88. @Column(DataType.STRING(CONSTRAINTS_FIELDS.RUNNER_JOBS.ERROR_MESSAGE.max))
  89. error: string
  90. // Less has priority
  91. @AllowNull(false)
  92. @Column
  93. priority: number
  94. // Used to fetch the appropriate job when the runner wants to post the result
  95. @AllowNull(true)
  96. @Column
  97. processingJobToken: string
  98. @AllowNull(true)
  99. @Column
  100. progress: number
  101. @AllowNull(true)
  102. @Column
  103. startedAt: Date
  104. @AllowNull(true)
  105. @Column
  106. finishedAt: Date
  107. @CreatedAt
  108. createdAt: Date
  109. @UpdatedAt
  110. updatedAt: Date
  111. @ForeignKey(() => RunnerJobModel)
  112. @Column
  113. dependsOnRunnerJobId: number
  114. @BelongsTo(() => RunnerJobModel, {
  115. foreignKey: {
  116. name: 'dependsOnRunnerJobId',
  117. allowNull: true
  118. },
  119. onDelete: 'cascade'
  120. })
  121. DependsOnRunnerJob: Awaited<RunnerJobModel>
  122. @ForeignKey(() => RunnerModel)
  123. @Column
  124. runnerId: number
  125. @BelongsTo(() => RunnerModel, {
  126. foreignKey: {
  127. name: 'runnerId',
  128. allowNull: true
  129. },
  130. onDelete: 'SET NULL'
  131. })
  132. Runner: Awaited<RunnerModel>
  133. // ---------------------------------------------------------------------------
  134. static loadWithRunner (uuid: string) {
  135. const query = {
  136. where: { uuid }
  137. }
  138. return RunnerJobModel.scope(ScopeNames.WITH_RUNNER).findOne<MRunnerJobRunner>(query)
  139. }
  140. static loadByRunnerAndJobTokensWithRunner (options: {
  141. uuid: string
  142. runnerToken: string
  143. jobToken: string
  144. }) {
  145. const { uuid, runnerToken, jobToken } = options
  146. const query = {
  147. where: {
  148. uuid,
  149. processingJobToken: jobToken
  150. },
  151. include: {
  152. model: RunnerModel.unscoped(),
  153. required: true,
  154. where: {
  155. runnerToken
  156. }
  157. }
  158. }
  159. return RunnerJobModel.findOne<MRunnerJobRunner>(query)
  160. }
  161. static listAvailableJobs () {
  162. const query = {
  163. limit: 10,
  164. order: getSort('priority'),
  165. where: {
  166. state: RunnerJobState.PENDING
  167. }
  168. }
  169. return RunnerJobModel.findAll<MRunnerJob>(query)
  170. }
  171. static listStalledJobs (options: {
  172. staleTimeMS: number
  173. types: RunnerJobType[]
  174. }) {
  175. const before = new Date(Date.now() - options.staleTimeMS)
  176. return RunnerJobModel.findAll<MRunnerJob>({
  177. where: {
  178. type: {
  179. [Op.in]: options.types
  180. },
  181. state: RunnerJobState.PROCESSING,
  182. updatedAt: {
  183. [Op.lt]: before
  184. }
  185. }
  186. })
  187. }
  188. static listChildrenOf (job: MRunnerJob, transaction?: Transaction) {
  189. const query = {
  190. where: {
  191. dependsOnRunnerJobId: job.id
  192. },
  193. transaction
  194. }
  195. return RunnerJobModel.findAll<MRunnerJob>(query)
  196. }
  197. static listForApi (options: {
  198. start: number
  199. count: number
  200. sort: string
  201. search?: string
  202. stateOneOf?: RunnerJobStateType[]
  203. }) {
  204. const { start, count, sort, search, stateOneOf } = options
  205. const query = {
  206. offset: start,
  207. limit: count,
  208. order: getSort(sort),
  209. where: []
  210. }
  211. if (search) {
  212. if (isUUIDValid(search)) {
  213. query.where.push({ uuid: search })
  214. } else {
  215. query.where.push({
  216. [Op.or]: [
  217. searchAttribute(search, 'type'),
  218. searchAttribute(search, '$Runner.name$')
  219. ]
  220. })
  221. }
  222. }
  223. if (isArray(stateOneOf) && stateOneOf.length !== 0) {
  224. query.where.push({
  225. state: {
  226. [Op.in]: stateOneOf
  227. }
  228. })
  229. }
  230. return Promise.all([
  231. RunnerJobModel.scope([ ScopeNames.WITH_RUNNER ]).count(query),
  232. RunnerJobModel.scope([ ScopeNames.WITH_RUNNER, ScopeNames.WITH_PARENT ]).findAll<MRunnerJobRunnerParent>(query)
  233. ]).then(([ total, data ]) => ({ total, data }))
  234. }
  235. static updateDependantJobsOf (runnerJob: MRunnerJob) {
  236. const where = {
  237. dependsOnRunnerJobId: runnerJob.id
  238. }
  239. return RunnerJobModel.update({ state: RunnerJobState.PENDING }, { where })
  240. }
  241. static cancelAllNonFinishedJobs (options: { type: RunnerJobType }) {
  242. const where = {
  243. type: options.type,
  244. state: {
  245. [Op.in]: [ RunnerJobState.COMPLETING, RunnerJobState.PENDING, RunnerJobState.PROCESSING, RunnerJobState.WAITING_FOR_PARENT_JOB ]
  246. }
  247. }
  248. return RunnerJobModel.update({ state: RunnerJobState.CANCELLED }, { where })
  249. }
  250. // ---------------------------------------------------------------------------
  251. resetToPending () {
  252. this.state = RunnerJobState.PENDING
  253. this.processingJobToken = null
  254. this.progress = null
  255. this.startedAt = null
  256. this.runnerId = null
  257. }
  258. setToErrorOrCancel (
  259. // eslint-disable-next-line max-len
  260. state: typeof RunnerJobState.PARENT_ERRORED | typeof RunnerJobState.ERRORED | typeof RunnerJobState.CANCELLED | typeof RunnerJobState.PARENT_CANCELLED
  261. ) {
  262. this.state = state
  263. this.processingJobToken = null
  264. this.finishedAt = new Date()
  265. }
  266. toFormattedJSON (this: MRunnerJobRunnerParent): RunnerJob {
  267. const runner = this.Runner
  268. ? {
  269. id: this.Runner.id,
  270. name: this.Runner.name,
  271. description: this.Runner.description
  272. }
  273. : null
  274. const parent = this.DependsOnRunnerJob
  275. ? {
  276. id: this.DependsOnRunnerJob.id,
  277. uuid: this.DependsOnRunnerJob.uuid,
  278. type: this.DependsOnRunnerJob.type,
  279. state: {
  280. id: this.DependsOnRunnerJob.state,
  281. label: RUNNER_JOB_STATES[this.DependsOnRunnerJob.state]
  282. }
  283. }
  284. : undefined
  285. return {
  286. uuid: this.uuid,
  287. type: this.type,
  288. state: {
  289. id: this.state,
  290. label: RUNNER_JOB_STATES[this.state]
  291. },
  292. progress: this.progress,
  293. priority: this.priority,
  294. failures: this.failures,
  295. error: this.error,
  296. payload: this.payload,
  297. startedAt: this.startedAt?.toISOString(),
  298. finishedAt: this.finishedAt?.toISOString(),
  299. createdAt: this.createdAt.toISOString(),
  300. updatedAt: this.updatedAt.toISOString(),
  301. parent,
  302. runner
  303. }
  304. }
  305. toFormattedAdminJSON (this: MRunnerJobRunnerParent): RunnerJobAdmin {
  306. return {
  307. ...this.toFormattedJSON(),
  308. privatePayload: this.privatePayload
  309. }
  310. }
  311. }