ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

update_db.py 31 KiB

2 months ago
2 months ago
1 month ago
2 months ago
2 months ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
2 months ago
2 months ago
2 months ago
2 months ago
1 month ago
2 months ago
1 month ago
2 months ago
1 month ago
2 months ago
1 month ago
2 months ago
2 months ago
1 month ago
1 month ago
2 months ago
1 month ago
2 months ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
2 months ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
2 months ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958
  1. """
  2. 日次で実行し,ぼざクリ DB を最新に更新する.
  3. """
  4. from __future__ import annotations
  5. import json
  6. import os
  7. import random
  8. import string
  9. import time
  10. from dataclasses import dataclass
  11. from datetime import datetime, timedelta
  12. from typing import Any, TypedDict, cast
  13. import mysql.connector
  14. import requests
# Sentinel used by the DTOs below as the default for nullable columns
# (deleted_at, untagged_at, vpos_ms).
# NOTE(review): presumably meant to end up as SQL NULL -- confirm that the
# database driver accepts a one-element tuple as a query parameter.
DbNull = (None,)
# Type of the sentinel above, for use in annotations.
DbNullType = tuple[None]
  17. class VideoSearchParam (TypedDict):
  18. q: str
  19. targets: str
  20. _sort: str
  21. fields: str
  22. _limit: int
  23. jsonFilter: str
  24. class VideoResult (TypedDict):
  25. contentId: str
  26. title: str
  27. tags: list[str]
  28. description: str
  29. viewCounter: int
  30. startTime: str
  31. class CommentResult (TypedDict):
  32. id: str
  33. no: int
  34. vposMs: int
  35. body: str
  36. commands: list[str]
  37. userId: str
  38. isPremium: bool
  39. score: int
  40. postedAt: str
  41. nicoruCount: int
  42. nicoruId: Any
  43. source: str
  44. isMyPost: bool
  45. def main (
  46. ) -> None:
  47. conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
  48. password = os.environ['MYSQL_PASS'],
  49. database = 'nizika_nico')
  50. now = datetime.now ()
  51. video_dao = VideoDao (conn)
  52. tag_dao = TagDao (conn)
  53. video_tag_dao = VideoTagDao (conn)
  54. video_history_dao = VideoHistoryDao (conn)
  55. comment_dao = CommentDao (conn)
  56. user_dao = UserDao (conn)
  57. api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
  58. update_tables (video_dao, tag_dao, video_tag_dao, video_history_dao, comment_dao, user_dao,
  59. api_data, now)
  60. # TODO: 書くこと
  61. def update_tables (
  62. video_dao: VideoDao,
  63. tag_dao: TagDao,
  64. video_tag_dao: VideoTagDao,
  65. video_history_dao: VideoHistoryDao,
  66. comment_dao: CommentDao,
  67. user_dao: UserDao,
  68. api_data: list[VideoResult],
  69. now: datetime,
  70. ) -> None:
  71. video_ids: list[int] = []
  72. for datum in api_data:
  73. video = VideoDto (code = datum['contentId'],
  74. title = datum['title'],
  75. description = datum['description'],
  76. uploaded_at = datetime.fromisoformat (datum['startTime']))
  77. video_dao.upsert (video, False)
  78. if video.id_ is not None:
  79. video_ids.append (video.id_)
  80. video_history = VideoHistoryDto (video_id = video.id_,
  81. fetched_at = now,
  82. views_count = datum['viewCounter'])
  83. video_history_dao.insert (video_history)
  84. tag_ids: list[int] = []
  85. video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False)
  86. for vt in video_tags:
  87. tag = tag_dao.find (vt.tag_id)
  88. if (tag is not None
  89. and (tag.name not in datum['tags'])
  90. and (tag.id_ is not None)):
  91. tag_ids.append (tag.id_)
  92. video_tag_dao.untag_all (video.id_, tag_ids, now)
  93. tags: list[TagDto] = []
  94. for tag_name in datum['tags']:
  95. tag = tag_dao.fetch_by_name (tag_name)
  96. if tag is None:
  97. tag = TagDto (name = tag_name)
  98. tag_dao.insert (tag)
  99. if video.id_ is not None and tag.id_ is not None:
  100. video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False)
  101. if video_tag is None:
  102. video_tag = VideoTagDto (video_id = video.id_,
  103. tag_id = tag.id_,
  104. tagged_at = now)
  105. video_tag_dao.insert (video_tag, False)
  106. for com in fetch_comments (video.code):
  107. user = user_dao.fetch_by_code (com['userId'])
  108. if user is None:
  109. user = UserDto (code = com['userId'])
  110. user_dao.insert (user)
  111. if video.id_ is not None and user.id_ is not None:
  112. comment = CommentDto (video_id = video.id_,
  113. comment_no = com['no'],
  114. user_id = user.id_,
  115. content = com['body'],
  116. posted_at = datetime.fromisoformat (com['postedAt']),
  117. nico_count = com['nicoruCount'],
  118. vpos_ms = com['vposMs'])
  119. comment_dao.upsert (comment, False)
  120. alive_video_ids = [d['contentId'] for d in api_data]
  121. lost_video_ids: list[int] = []
  122. videos = video_dao.fetch_alive ()
  123. for video in videos:
  124. if video.id_ is not None and video.id_ not in alive_video_ids:
  125. lost_video_ids.append (video.id_)
  126. video_dao.delete (lost_video_ids, now)
  127. def fetch_comments (
  128. video_code: str,
  129. ) -> list[CommentResult]:
  130. time.sleep (1.2)
  131. headers = { 'X-Frontend-Id': '6',
  132. 'X-Frontend-Version': '0' }
  133. action_track_id = (
  134. ''.join (random.choice (string.ascii_letters + string.digits)
  135. for _ in range (10))
  136. + '_'
  137. + str (random.randrange (10 ** 12, 10 ** 13)))
  138. url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
  139. + f"?actionTrackId={ action_track_id }")
  140. res = requests.post (url, headers = headers, timeout = 60).json ()
  141. try:
  142. nv_comment = res['data']['comment']['nvComment']
  143. except KeyError:
  144. return []
  145. if nv_comment is None:
  146. return []
  147. headers = { 'X-Frontend-Id': '6',
  148. 'X-Frontend-Version': '0',
  149. 'Content-Type': 'application/json' }
  150. params = { 'params': nv_comment['params'],
  151. 'additionals': { },
  152. 'threadKey': nv_comment['threadKey'] }
  153. url = nv_comment['server'] + '/v1/threads'
  154. res = (requests.post (url, json.dumps (params),
  155. headers = headers,
  156. timeout = 60)
  157. .json ())
  158. try:
  159. return res['data']['threads'][1]['comments']
  160. except (IndexError, KeyError):
  161. return []
  162. def search_nico_by_tag (
  163. tag: str,
  164. ) -> list[VideoResult]:
  165. return search_nico_by_tags ([tag])
  166. def search_nico_by_tags (
  167. tags: list[str],
  168. ) -> list[VideoResult]:
  169. today = datetime.now ()
  170. url = ('https://snapshot.search.nicovideo.jp'
  171. + '/api/v2/snapshot/video/contents/search')
  172. result_data: list[VideoResult] = []
  173. to = datetime (2022, 12, 3)
  174. while to <= today:
  175. time.sleep (1.2)
  176. until = to + timedelta (days = 14)
  177. query_filter = json.dumps ({ 'type': 'or',
  178. 'filters': [
  179. { 'type': 'range',
  180. 'field': 'startTime',
  181. 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day),
  182. 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day),
  183. 'include_lower': True }] })
  184. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  185. 'targets': 'tagsExact',
  186. '_sort': '-viewCounter',
  187. 'fields': ('contentId,'
  188. 'title,'
  189. 'tags,'
  190. 'description,'
  191. 'viewCounter,'
  192. 'startTime'),
  193. '_limit': 100,
  194. 'jsonFilter': query_filter }
  195. res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
  196. try:
  197. result_data += res['data']
  198. except KeyError:
  199. pass
  200. to = until + timedelta (days = 1)
  201. return result_data
  202. class VideoDao:
  203. def __init__ (
  204. self,
  205. conn
  206. ):
  207. self.conn = conn
  208. def find (
  209. self,
  210. video_id: int,
  211. with_relation_tables: bool,
  212. ) -> VideoDto | None:
  213. with self.conn.cursor () as c:
  214. c.execute ("""
  215. SELECT
  216. id,
  217. code,
  218. title,
  219. description,
  220. uploaded_at,
  221. deleted_at
  222. FROM
  223. videos
  224. WHERE
  225. id = %s
  226. ORDER BY
  227. id""", video_id)
  228. row = c.fetchone ()
  229. if row is None:
  230. return None
  231. return self._create_dto_from_row (row, with_relation_tables)
  232. def fetch_all (
  233. self,
  234. with_relation_tables: bool,
  235. ) -> list[VideoDto]:
  236. with self.conn.cursor () as c:
  237. c.execute ("""
  238. SELECT
  239. id,
  240. code,
  241. title,
  242. description,
  243. uploaded_at,
  244. deleted_at
  245. FROM
  246. videos
  247. ORDER BY
  248. id""")
  249. videos: list[VideoDto] = []
  250. for row in c.fetchall ():
  251. videos.append (self._create_dto_from_row (row, with_relation_tables))
  252. return videos
  253. def fetch_alive (
  254. self,
  255. ) -> list[VideoDto]:
  256. with self.conn.cursor () as c:
  257. c.execute ("""
  258. SELECT
  259. id,
  260. code,
  261. title,
  262. description,
  263. uploaded_at,
  264. deleted_at
  265. FROM
  266. videos
  267. WHERE
  268. deleted_at IS NULL""")
  269. videos: list[VideoDto] = []
  270. for row in c.fetchall ():
  271. videos.append (self._create_dto_from_row (row, False))
  272. return videos
  273. def upsert (
  274. self,
  275. video: VideoDto,
  276. with_relation_tables: bool,
  277. ) -> None:
  278. with self.conn.cursor () as c:
  279. c.execute ("""
  280. INSERT INTO
  281. videos(
  282. code,
  283. title,
  284. description,
  285. uploaded_at,
  286. deleted_at)
  287. VALUES
  288. (
  289. %s,
  290. %s,
  291. %s,
  292. %s,
  293. %s)
  294. ON DUPLICATE KEY UPDATE
  295. code = VALUES(code),
  296. title = VALUES(title),
  297. description = VALUES(description),
  298. uploaded_at = VALUES(uploaded_at),
  299. deleted_at = VALUES(deleted_at)""", (video.code,
  300. video.title,
  301. video.description,
  302. video.uploaded_at,
  303. video.deleted_at))
  304. video.id_ = c.lastrowid
  305. if with_relation_tables:
  306. if video.video_tags is not None:
  307. VideoTagDao (self.conn).upsert_all (video.video_tags, False)
  308. if video.comments is not None:
  309. CommentDao (self.conn).upsert_all (video.comments, False)
  310. if video.video_histories is not None:
  311. VideoHistoryDao (self.conn).upsert_all (video.video_histories)
  312. def upsert_all (
  313. self,
  314. videos: list[VideoDto],
  315. with_relation_tables: bool,
  316. ) -> None:
  317. for video in videos:
  318. self.upsert (video, with_relation_tables)
  319. def delete (
  320. self,
  321. video_ids: list[int],
  322. at: datetime,
  323. ) -> None:
  324. with self.conn.cursor () as c:
  325. c.execute ("""
  326. UPDATE
  327. videos
  328. SET
  329. deleted_at = %s
  330. WHERE
  331. id IN (%s)""", (at, (*video_ids,)))
  332. def _create_dto_from_row (
  333. self,
  334. row,
  335. with_relation_tables: bool,
  336. ) -> VideoDto:
  337. video = VideoDto (id_ = row['id'],
  338. code = row['code'],
  339. title = row['title'],
  340. description = row['description'],
  341. uploaded_at = row['uploaded_at'],
  342. deleted_at = row['deleted_at'])
  343. if with_relation_tables and video.id_ is not None:
  344. video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
  345. for i in range (len (video.video_tags)):
  346. video.video_tags[i].video = video
  347. video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
  348. for i in range (len (video.comments)):
  349. video.comments[i].video = video
  350. video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
  351. for i in range (len (video.video_histories)):
  352. video.video_histories[i].video = video
  353. return video
  354. @dataclass (slots = True)
  355. class VideoDto:
  356. code: str
  357. title: str
  358. description: str
  359. uploaded_at: datetime
  360. id_: int | None = None
  361. deleted_at: datetime | DbNullType = DbNull
  362. video_tags: list[VideoTagDto] | None = None
  363. comments: list[CommentDto] | None = None
  364. video_histories: list[VideoHistoryDto] | None = None
  365. class VideoTagDao:
  366. def __init__ (
  367. self,
  368. conn,
  369. ):
  370. self.conn = conn
  371. def fetch_by_video_id (
  372. self,
  373. video_id: int,
  374. with_relation_tables: bool,
  375. ) -> list[VideoTagDto]:
  376. with self.conn.cursor () as c:
  377. c.execute ("""
  378. SELECT
  379. id,
  380. video_id,
  381. tag_id,
  382. tagged_at,
  383. untagged_at
  384. FROM
  385. video_tags
  386. WHERE
  387. video_id = %s
  388. ORDER BY
  389. id""", video_id)
  390. video_tags: list[VideoTagDto] = []
  391. for row in c.fetchall ():
  392. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  393. return video_tags
  394. def fetch_alive_by_video_id (
  395. self,
  396. video_id: int,
  397. with_relation_tables: bool,
  398. ) -> list[VideoTagDto]:
  399. with self.conn.cursor () as c:
  400. c.execute ("""
  401. SELECT
  402. id,
  403. video_id,
  404. tag_id,
  405. tagged_at,
  406. untagged_at
  407. FROM
  408. video_tags
  409. WHERE
  410. video_id = %s
  411. AND (untagged_at IS NULL)
  412. ORDER BY
  413. id""", video_id)
  414. video_tags: list[VideoTagDto] = []
  415. for row in c.fetchall ():
  416. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  417. return video_tags
  418. def fetch_alive_by_ids (
  419. self,
  420. video_id: int,
  421. tag_id: int,
  422. with_relation_tables: bool,
  423. ) -> VideoTagDto | None:
  424. with self.conn.cursor () as c:
  425. c.execute ("""
  426. SELECT
  427. id,
  428. video_id,
  429. tag_id,
  430. tagged_at,
  431. untagged_at
  432. FROM
  433. video_tags
  434. WHERE
  435. video_id = %s
  436. AND tag_id = %s""", (video_id, tag_id))
  437. row = c.fetchone ()
  438. if row is None:
  439. return None
  440. return self._create_dto_from_row (row, with_relation_tables)
  441. def insert (
  442. self,
  443. video_tag: VideoTagDto,
  444. with_relation_tables: bool,
  445. ) -> None:
  446. with self.conn.cursor () as c:
  447. c.execute ("""
  448. INSERT INTO
  449. video_tags(
  450. video_id,
  451. tag_id,
  452. tagged_at,
  453. untagged_at)
  454. VALUES
  455. (
  456. %s,
  457. %s,
  458. %s,
  459. %s)""", (video_tag.video_id, video_tag.tag_id,
  460. video_tag.tagged_at, video_tag.untagged_at))
  461. video_tag.id_ = c.lastrowid
  462. if with_relation_tables:
  463. if video_tag.video is not None:
  464. VideoDao (self.conn).upsert (video_tag.video, True)
  465. if video_tag.tag is not None:
  466. TagDao (self.conn).upsert (video_tag.tag)
  467. def upsert (
  468. self,
  469. video_tag: VideoTagDto,
  470. with_relation_tables: bool,
  471. ) -> None:
  472. with self.conn.cursor () as c:
  473. c.execute ("""
  474. INSERT INTO
  475. video_tags(
  476. video_id,
  477. tag_id,
  478. tagged_at,
  479. untagged_at)
  480. VALUES
  481. (
  482. %s,
  483. %s,
  484. %s,
  485. %s)
  486. ON DUPLICATE KEY UPDATE
  487. video_id = VALUES(video_id),
  488. tag_id = VALUES(tag_id),
  489. tagged_at = VALUES(tagged_at),
  490. untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
  491. video_tag.tag_id,
  492. video_tag.tagged_at,
  493. video_tag.untagged_at))
  494. video_tag.id_ = c.lastrowid
  495. if with_relation_tables:
  496. if video_tag.video is not None:
  497. VideoDao (self.conn).upsert (video_tag.video, True)
  498. if video_tag.tag is not None:
  499. TagDao (self.conn).upsert (video_tag.tag)
  500. def upsert_all (
  501. self,
  502. video_tags: list[VideoTagDto],
  503. with_relation_tables: bool,
  504. ) -> None:
  505. for video_tag in video_tags:
  506. self.upsert (video_tag, with_relation_tables)
  507. def untag_all (
  508. self,
  509. video_id: int,
  510. tag_ids: list[int],
  511. now: datetime
  512. ) -> None:
  513. with self.conn.cursor () as c:
  514. c.execute ("""
  515. UPDATE
  516. video_tags
  517. SET
  518. untagged_at = %s
  519. WHERE
  520. video_id = %s
  521. AND tag_ids IN (%s)""", (now, video_id, (*tag_ids,)))
  522. def _create_dto_from_row (
  523. self,
  524. row,
  525. with_relation_tables: bool,
  526. ) -> VideoTagDto:
  527. video_tag = VideoTagDto (id_ = row['id'],
  528. video_id = row['video_id'],
  529. tag_id = row['tag_id'],
  530. tagged_at = row['tagged_at'],
  531. untagged_at = row['untagged_at'])
  532. if with_relation_tables:
  533. video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
  534. video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
  535. return video_tag
  536. @dataclass (slots = True)
  537. class VideoTagDto:
  538. video_id: int
  539. tag_id: int
  540. tagged_at: datetime
  541. id_: int | None = None
  542. untagged_at: datetime | DbNullType = DbNull
  543. video: VideoDto | None = None
  544. tag: TagDto | None = None
  545. class TagDao:
  546. def __init__ (
  547. self,
  548. conn,
  549. ):
  550. self.conn = conn
  551. def find (
  552. self,
  553. tag_id: int,
  554. ) -> TagDto | None:
  555. with self.conn.cursor () as c:
  556. c.execute ("""
  557. SELECT
  558. id,
  559. name)
  560. FROM
  561. tags
  562. WHERE
  563. id = %s""", tag_id)
  564. row = c.fetchone ()
  565. if row is None:
  566. return None
  567. return self._create_dto_from_row (row)
  568. def fetch_by_name (
  569. self,
  570. tag_name: str,
  571. ) -> TagDto | None:
  572. with self.conn.cursor () as c:
  573. c.execute ("""
  574. SELECT
  575. id,
  576. name
  577. FROM
  578. tags
  579. WHERE
  580. name = %s""", tag_name)
  581. row = c.fetchone ()
  582. if row is None:
  583. return None
  584. return self._create_dto_from_row (row)
  585. def insert (
  586. self,
  587. tag: TagDto,
  588. ) -> None:
  589. with self.conn.cursor () as c:
  590. c.execute ("""
  591. INSERT INTO
  592. tags(name)
  593. VALUES
  594. (%s)""", tag.name)
  595. tag.id_ = c.lastrowid
  596. def upsert (
  597. self,
  598. tag: TagDto,
  599. ) -> None:
  600. with self.conn.cursor () as c:
  601. c.execute ("""
  602. INSERT INTO
  603. tags(name)
  604. VALUES
  605. (%s)
  606. ON DUPLICATE KEY UPDATE
  607. name = VALUES(name)""", tag.name)
  608. tag.id_ = c.lastrowid
  609. def _create_dto_from_row (
  610. self,
  611. row,
  612. ) -> TagDto:
  613. return TagDto (id_ = row['id'],
  614. name = row['name'])
  615. @dataclass (slots = True)
  616. class TagDto:
  617. name: str
  618. id_: int | None = None
  619. class VideoHistoryDao:
  620. def __init__ (
  621. self,
  622. conn,
  623. ):
  624. self.conn = conn
  625. def fetch_by_video_id (
  626. self,
  627. video_id: int,
  628. with_relation_tables: bool,
  629. ) -> list[VideoHistoryDto]:
  630. with self.conn.cursor () as c:
  631. c.execute ("""
  632. SELECT
  633. id,
  634. video_id,
  635. fetched_at,
  636. views_count
  637. FROM
  638. video_histories
  639. WHERE
  640. video_id = %s""", video_id)
  641. video_histories: list[VideoHistoryDto] = []
  642. for row in c.fetchall ():
  643. video_histories.append (self._create_dto_from_row (row, with_relation_tables))
  644. return video_histories
  645. def insert (
  646. self,
  647. video_history: VideoHistoryDto,
  648. ) -> None:
  649. with self.conn.cursor () as c:
  650. c.execute ("""
  651. INSERT INTO
  652. video_histories(
  653. video_id,
  654. fetched_at,
  655. views_count)
  656. VALUES
  657. (
  658. %s,
  659. %s,
  660. %s)""", (video_history.video_id,
  661. video_history.fetched_at,
  662. video_history.views_count))
  663. def upsert (
  664. self,
  665. video_history: VideoHistoryDto,
  666. ) -> None:
  667. with self.conn.cursor () as c:
  668. c.execute ("""
  669. INSERT INTO
  670. video_histories(
  671. video_id,
  672. fetched_at,
  673. views_count)
  674. VALUES
  675. (
  676. %s,
  677. %s,
  678. %s)
  679. ON DUPLICATE KEY UPDATE
  680. video_id,
  681. fetched_at,
  682. views_count""", (video_history.video_id,
  683. video_history.fetched_at,
  684. video_history.views_count))
  685. def upsert_all (
  686. self,
  687. video_histories: list[VideoHistoryDto],
  688. ) -> None:
  689. for video_history in video_histories:
  690. self.upsert (video_history)
  691. def _create_dto_from_row (
  692. self,
  693. row,
  694. with_relation_tables: bool,
  695. ) -> VideoHistoryDto:
  696. video_history = VideoHistoryDto (id_ = row['id'],
  697. video_id = row['video_id'],
  698. fetched_at = row['fetched_at'],
  699. views_count = row['views_count'])
  700. if with_relation_tables:
  701. video_history.video = VideoDao (self.conn).find (video_history.video_id, True)
  702. return video_history
  703. @dataclass (slots = True)
  704. class VideoHistoryDto:
  705. video_id: int
  706. fetched_at: datetime
  707. views_count: int
  708. id_: int | None = None
  709. video: VideoDto | None = None
  710. class CommentDao:
  711. def __init__ (
  712. self,
  713. conn,
  714. ):
  715. self.conn = conn
  716. def fetch_by_video_id (
  717. self,
  718. video_id: int,
  719. with_relation_tables: bool,
  720. ) -> list[CommentDto]:
  721. with self.conn.cursor () as c:
  722. c.execute ("""
  723. SELECT
  724. id,
  725. video_id,
  726. comment_no,
  727. user_id,
  728. content,
  729. posted_at,
  730. nico_count,
  731. vpos_ms
  732. FROM
  733. comments
  734. WHERE
  735. video_id = %s""", video_id)
  736. comments: list[CommentDto] = []
  737. for row in c.fetchall ():
  738. comments.append (self._create_dto_from_row (row, with_relation_tables))
  739. return comments
  740. def upsert (
  741. self,
  742. comment: CommentDto,
  743. with_relation_tables: bool,
  744. ) -> None:
  745. with self.conn.cursor () as c:
  746. c.execute ("""
  747. INSERT INTO
  748. comments(
  749. video_id,
  750. comment_no,
  751. user_id,
  752. content,
  753. posted_at,
  754. nico_count,
  755. vpos_ms)
  756. VALUES
  757. (
  758. %s,
  759. %s,
  760. %s,
  761. %s,
  762. %s,
  763. %s,
  764. %s)
  765. ON DUPLICATE KEY UPDATE
  766. video_id = VALUES(video_id),
  767. comment_no = VALUES(comment_no),
  768. user_id = VALUES(user_id),
  769. content = VALUES(content),
  770. posted_at = VALUES(posted_at),
  771. nico_count = VALUES(nico_count),
  772. vpos_ms = VALUES(vpos_ms)""", (comment.video_id,
  773. comment.comment_no,
  774. comment.user_id,
  775. comment.content,
  776. comment.posted_at,
  777. comment.nico_count,
  778. comment.vpos_ms))
  779. def upsert_all (
  780. self,
  781. comments: list[CommentDto],
  782. with_relation_tables: bool,
  783. ) -> None:
  784. for comment in comments:
  785. self.upsert (comment, with_relation_tables)
  786. def _create_dto_from_row (
  787. self,
  788. row,
  789. with_relation_tables: bool,
  790. ) -> CommentDto:
  791. comment = CommentDto (id_ = row['id'],
  792. video_id = row['video_id'],
  793. comment_no = row['comment_no'],
  794. user_id = row['user_id'],
  795. content = row['content'],
  796. posted_at = row['posted_at'],
  797. nico_count = row['nico_count'],
  798. vpos_ms = row['vpos_ms'])
  799. if with_relation_tables:
  800. comment.video = VideoDao (self.conn).find (comment.video_id, True)
  801. return comment
  802. @dataclass (slots = True)
  803. class CommentDto:
  804. video_id: int
  805. comment_no: int
  806. user_id: int
  807. content: str
  808. posted_at: datetime
  809. id_: int | None = None
  810. nico_count: int = 0
  811. vpos_ms: int | DbNullType = DbNull
  812. video: VideoDto | None = None
  813. user: UserDto | None = None
  814. class UserDao:
  815. def __init__ (
  816. self,
  817. conn,
  818. ):
  819. self.conn = conn
  820. def fetch_by_code (
  821. self,
  822. user_code: str
  823. ) -> UserDto | None:
  824. with self.conn.cursor () as c:
  825. c.execute ("""
  826. SELECT
  827. id,
  828. code
  829. FROM
  830. users
  831. WHERE
  832. code = %s""", user_code)
  833. row = c.fetchone ()
  834. if row is None:
  835. return None
  836. return self._create_dto_from_row (row)
  837. def insert (
  838. self,
  839. user: UserDto,
  840. ) -> None:
  841. with self.conn.cursor () as c:
  842. c.execute ("""
  843. INSERT INTO
  844. users(code)
  845. VALUES
  846. (%s)""", user.code)
  847. user.id_ = c.lastrowid
  848. def _create_dto_from_row (
  849. self,
  850. row,
  851. ) -> UserDto:
  852. return UserDto (id_ = row['id'],
  853. code = row['code'])
  854. @dataclass (slots = True)
  855. class UserDto:
  856. code: str
  857. id_: int | None = None
# Run the update only when this file is executed as a script.
if __name__ == '__main__':
    main ()