ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

968 lines
32 KiB

  1. """
  2. 日次で実行し,ぼざクリ DB を最新に更新する.
  3. """
  4. from __future__ import annotations
  5. import json
  6. import os
  7. import random
  8. import string
  9. import time
  10. from dataclasses import dataclass
  11. from datetime import datetime, timedelta
  12. from typing import Any, TypedDict, cast
  13. import mysql.connector
  14. import requests
  15. # TODO: “何もしなぃ” を意味する None と区別可能にするため,NULL クラスを別途用意すること
  16. DbNull = None
  17. DbNullType = type (None)
  18. class VideoSearchParam (TypedDict):
  19. q: str
  20. targets: str
  21. _sort: str
  22. fields: str
  23. _limit: int
  24. jsonFilter: str
  25. class VideoResult (TypedDict):
  26. contentId: str
  27. title: str
  28. tags: str
  29. description: str | None
  30. viewCounter: int
  31. startTime: str
  32. class CommentResult (TypedDict):
  33. id: str
  34. no: int
  35. vposMs: int
  36. body: str
  37. commands: list[str]
  38. userId: str
  39. isPremium: bool
  40. score: int
  41. postedAt: str
  42. nicoruCount: int
  43. nicoruId: Any
  44. source: str
  45. isMyPost: bool
  46. def main (
  47. ) -> None:
  48. conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
  49. password = os.environ['MYSQL_PASS'],
  50. database = 'nizika_nico')
  51. now = datetime.now ()
  52. video_dao = VideoDao (conn)
  53. tag_dao = TagDao (conn)
  54. video_tag_dao = VideoTagDao (conn)
  55. video_history_dao = VideoHistoryDao (conn)
  56. comment_dao = CommentDao (conn)
  57. user_dao = UserDao (conn)
  58. api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
  59. update_tables (video_dao, tag_dao, video_tag_dao, video_history_dao, comment_dao, user_dao,
  60. api_data, now)
  61. conn.commit ()
  62. conn.close ()
  63. def update_tables (
  64. video_dao: VideoDao,
  65. tag_dao: TagDao,
  66. video_tag_dao: VideoTagDao,
  67. video_history_dao: VideoHistoryDao,
  68. comment_dao: CommentDao,
  69. user_dao: UserDao,
  70. api_data: list[VideoResult],
  71. now: datetime,
  72. ) -> None:
  73. video_ids: list[int] = []
  74. for datum in api_data:
  75. tag_names = datum['tags'].split ()
  76. video = VideoDto (code = datum['contentId'],
  77. title = datum['title'],
  78. description = datum['description'],
  79. uploaded_at = datetime.fromisoformat (datum['startTime']))
  80. if video.description is None:
  81. video.description = ''
  82. video_dao.upsert (video, False)
  83. if video.id_ is not None:
  84. video_ids.append (video.id_)
  85. video_history = VideoHistoryDto (video_id = video.id_,
  86. fetched_at = now,
  87. views_count = datum['viewCounter'])
  88. video_history_dao.insert (video_history)
  89. tag_ids: list[int] = []
  90. video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False)
  91. for vt in video_tags:
  92. tag = tag_dao.find (vt.tag_id)
  93. if (tag is not None
  94. and (tag.name not in tag_names)
  95. and (tag.id_ is not None)):
  96. tag_ids.append (tag.id_)
  97. video_tag_dao.untag_all (video.id_, tag_ids, now)
  98. tags: list[TagDto] = []
  99. for tag_name in tag_names:
  100. tag = tag_dao.fetch_by_name (tag_name)
  101. if tag is None:
  102. tag = TagDto (name = tag_name)
  103. tag_dao.insert (tag)
  104. if video.id_ is not None and tag.id_ is not None:
  105. video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False)
  106. if video_tag is None:
  107. video_tag = VideoTagDto (video_id = video.id_,
  108. tag_id = tag.id_,
  109. tagged_at = now)
  110. video_tag_dao.insert (video_tag, False)
  111. for com in fetch_comments (video.code):
  112. user = user_dao.fetch_by_code (com['userId'])
  113. if user is None:
  114. user = UserDto (code = com['userId'])
  115. user_dao.insert (user)
  116. if video.id_ is not None and user.id_ is not None:
  117. comment = CommentDto (video_id = video.id_,
  118. comment_no = com['no'],
  119. user_id = user.id_,
  120. content = com['body'],
  121. posted_at = datetime.fromisoformat (com['postedAt']),
  122. nico_count = com['nicoruCount'],
  123. vpos_ms = com['vposMs'])
  124. comment_dao.upsert (comment, False)
  125. alive_video_codes = [d['contentId'] for d in api_data]
  126. lost_video_ids: list[int] = []
  127. videos = video_dao.fetch_alive ()
  128. for video in videos:
  129. if video.id_ is not None and video.code not in alive_video_codes:
  130. lost_video_ids.append (video.id_)
  131. video_dao.delete (lost_video_ids, now)
  132. def fetch_comments (
  133. video_code: str,
  134. ) -> list[CommentResult]:
  135. time.sleep (1.2)
  136. headers = { 'X-Frontend-Id': '6',
  137. 'X-Frontend-Version': '0' }
  138. action_track_id = (
  139. ''.join (random.choice (string.ascii_letters + string.digits)
  140. for _ in range (10))
  141. + '_'
  142. + str (random.randrange (10 ** 12, 10 ** 13)))
  143. url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
  144. + f"?actionTrackId={ action_track_id }")
  145. res = requests.post (url, headers = headers, timeout = 60).json ()
  146. try:
  147. nv_comment = res['data']['comment']['nvComment']
  148. except KeyError:
  149. return []
  150. if nv_comment is None:
  151. return []
  152. headers = { 'X-Frontend-Id': '6',
  153. 'X-Frontend-Version': '0',
  154. 'Content-Type': 'application/json' }
  155. params = { 'params': nv_comment['params'],
  156. 'additionals': { },
  157. 'threadKey': nv_comment['threadKey'] }
  158. url = nv_comment['server'] + '/v1/threads'
  159. res = (requests.post (url, json.dumps (params),
  160. headers = headers,
  161. timeout = 60)
  162. .json ())
  163. try:
  164. return res['data']['threads'][1]['comments']
  165. except (IndexError, KeyError):
  166. return []
  167. def search_nico_by_tag (
  168. tag: str,
  169. ) -> list[VideoResult]:
  170. return search_nico_by_tags ([tag])
  171. def search_nico_by_tags (
  172. tags: list[str],
  173. ) -> list[VideoResult]:
  174. today = datetime.now ()
  175. url = ('https://snapshot.search.nicovideo.jp'
  176. + '/api/v2/snapshot/video/contents/search')
  177. result_data: list[VideoResult] = []
  178. to = datetime (2022, 12, 3)
  179. while to <= today:
  180. time.sleep (1.2)
  181. until = to + timedelta (days = 14)
  182. query_filter = json.dumps ({ 'type': 'or',
  183. 'filters': [
  184. { 'type': 'range',
  185. 'field': 'startTime',
  186. 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day),
  187. 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day),
  188. 'include_lower': True }] })
  189. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  190. 'targets': 'tagsExact',
  191. '_sort': '-viewCounter',
  192. 'fields': ('contentId,'
  193. 'title,'
  194. 'tags,'
  195. 'description,'
  196. 'viewCounter,'
  197. 'startTime'),
  198. '_limit': 100,
  199. 'jsonFilter': query_filter }
  200. res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
  201. try:
  202. result_data += res['data']
  203. except KeyError:
  204. pass
  205. to = until + timedelta (days = 1)
  206. return result_data
  207. class VideoDao:
  208. def __init__ (
  209. self,
  210. conn
  211. ):
  212. self.conn = conn
  213. def find (
  214. self,
  215. video_id: int,
  216. with_relation_tables: bool,
  217. ) -> VideoDto | None:
  218. with self.conn.cursor (dictionary = True) as c:
  219. c.execute ("""
  220. SELECT
  221. id,
  222. code,
  223. title,
  224. description,
  225. uploaded_at,
  226. deleted_at
  227. FROM
  228. videos
  229. WHERE
  230. id = %s
  231. ORDER BY
  232. id""", (video_id,))
  233. row = c.fetchone ()
  234. if row is None:
  235. return None
  236. return self._create_dto_from_row (row, with_relation_tables)
  237. def fetch_all (
  238. self,
  239. with_relation_tables: bool,
  240. ) -> list[VideoDto]:
  241. with self.conn.cursor (dictionary = True) as c:
  242. c.execute ("""
  243. SELECT
  244. id,
  245. code,
  246. title,
  247. description,
  248. uploaded_at,
  249. deleted_at
  250. FROM
  251. videos
  252. ORDER BY
  253. id""")
  254. videos: list[VideoDto] = []
  255. for row in c.fetchall ():
  256. videos.append (self._create_dto_from_row (row, with_relation_tables))
  257. return videos
  258. def fetch_alive (
  259. self,
  260. ) -> list[VideoDto]:
  261. with self.conn.cursor (dictionary = True) as c:
  262. c.execute ("""
  263. SELECT
  264. id,
  265. code,
  266. title,
  267. description,
  268. uploaded_at,
  269. deleted_at
  270. FROM
  271. videos
  272. WHERE
  273. deleted_at IS NULL""")
  274. videos: list[VideoDto] = []
  275. for row in c.fetchall ():
  276. videos.append (self._create_dto_from_row (row, False))
  277. return videos
  278. def upsert (
  279. self,
  280. video: VideoDto,
  281. with_relation_tables: bool,
  282. ) -> None:
  283. with self.conn.cursor (dictionary = True) as c:
  284. c.execute ("""
  285. INSERT INTO
  286. videos(
  287. code,
  288. title,
  289. description,
  290. uploaded_at,
  291. deleted_at)
  292. VALUES
  293. (
  294. %s,
  295. %s,
  296. %s,
  297. %s,
  298. %s)
  299. ON DUPLICATE KEY UPDATE
  300. code = VALUES(code),
  301. title = VALUES(title),
  302. description = VALUES(description),
  303. uploaded_at = VALUES(uploaded_at),
  304. deleted_at = VALUES(deleted_at)""", (video.code,
  305. video.title,
  306. video.description,
  307. video.uploaded_at,
  308. video.deleted_at))
  309. video.id_ = c.lastrowid
  310. if with_relation_tables:
  311. if video.video_tags is not None:
  312. VideoTagDao (self.conn).upsert_all (video.video_tags, False)
  313. if video.comments is not None:
  314. CommentDao (self.conn).upsert_all (video.comments, False)
  315. if video.video_histories is not None:
  316. VideoHistoryDao (self.conn).upsert_all (video.video_histories)
  317. def upsert_all (
  318. self,
  319. videos: list[VideoDto],
  320. with_relation_tables: bool,
  321. ) -> None:
  322. for video in videos:
  323. self.upsert (video, with_relation_tables)
  324. def delete (
  325. self,
  326. video_ids: list[int],
  327. at: datetime,
  328. ) -> None:
  329. if not video_ids:
  330. return
  331. with self.conn.cursor (dictionary = True) as c:
  332. c.execute ("""
  333. UPDATE
  334. videos
  335. SET
  336. deleted_at = %%s
  337. WHERE
  338. id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids))
  339. def _create_dto_from_row (
  340. self,
  341. row,
  342. with_relation_tables: bool,
  343. ) -> VideoDto:
  344. video = VideoDto (id_ = row['id'],
  345. code = row['code'],
  346. title = row['title'],
  347. description = row['description'],
  348. uploaded_at = row['uploaded_at'],
  349. deleted_at = row['deleted_at'])
  350. if with_relation_tables and video.id_ is not None:
  351. video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
  352. for i in range (len (video.video_tags)):
  353. video.video_tags[i].video = video
  354. video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
  355. for i in range (len (video.comments)):
  356. video.comments[i].video = video
  357. video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
  358. for i in range (len (video.video_histories)):
  359. video.video_histories[i].video = video
  360. return video
  361. @dataclass (slots = True)
  362. class VideoDto:
  363. code: str
  364. title: str
  365. description: str
  366. uploaded_at: datetime
  367. id_: int | None = None
  368. deleted_at: datetime | DbNullType = DbNull
  369. video_tags: list[VideoTagDto] | None = None
  370. comments: list[CommentDto] | None = None
  371. video_histories: list[VideoHistoryDto] | None = None
  372. class VideoTagDao:
  373. def __init__ (
  374. self,
  375. conn,
  376. ):
  377. self.conn = conn
  378. def fetch_by_video_id (
  379. self,
  380. video_id: int,
  381. with_relation_tables: bool,
  382. ) -> list[VideoTagDto]:
  383. with self.conn.cursor (dictionary = True) as c:
  384. c.execute ("""
  385. SELECT
  386. id,
  387. video_id,
  388. tag_id,
  389. tagged_at,
  390. untagged_at
  391. FROM
  392. video_tags
  393. WHERE
  394. video_id = %s
  395. ORDER BY
  396. id""", (video_id,))
  397. video_tags: list[VideoTagDto] = []
  398. for row in c.fetchall ():
  399. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  400. return video_tags
  401. def fetch_alive_by_video_id (
  402. self,
  403. video_id: int,
  404. with_relation_tables: bool,
  405. ) -> list[VideoTagDto]:
  406. with self.conn.cursor (dictionary = True) as c:
  407. c.execute ("""
  408. SELECT
  409. id,
  410. video_id,
  411. tag_id,
  412. tagged_at,
  413. untagged_at
  414. FROM
  415. video_tags
  416. WHERE
  417. video_id = %s
  418. AND (untagged_at IS NULL)
  419. ORDER BY
  420. id""", (video_id,))
  421. video_tags: list[VideoTagDto] = []
  422. for row in c.fetchall ():
  423. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  424. return video_tags
  425. def fetch_alive_by_ids (
  426. self,
  427. video_id: int,
  428. tag_id: int,
  429. with_relation_tables: bool,
  430. ) -> VideoTagDto | None:
  431. with self.conn.cursor (dictionary = True) as c:
  432. c.execute ("""
  433. SELECT
  434. id,
  435. video_id,
  436. tag_id,
  437. tagged_at,
  438. untagged_at
  439. FROM
  440. video_tags
  441. WHERE
  442. video_id = %s
  443. AND tag_id = %s""", (video_id, tag_id))
  444. row = c.fetchone ()
  445. if row is None:
  446. return None
  447. return self._create_dto_from_row (row, with_relation_tables)
  448. def insert (
  449. self,
  450. video_tag: VideoTagDto,
  451. with_relation_tables: bool,
  452. ) -> None:
  453. with self.conn.cursor (dictionary = True) as c:
  454. c.execute ("""
  455. INSERT INTO
  456. video_tags(
  457. video_id,
  458. tag_id,
  459. tagged_at,
  460. untagged_at)
  461. VALUES
  462. (
  463. %s,
  464. %s,
  465. %s,
  466. %s)""", (video_tag.video_id, video_tag.tag_id,
  467. video_tag.tagged_at, video_tag.untagged_at))
  468. video_tag.id_ = c.lastrowid
  469. if with_relation_tables:
  470. if video_tag.video is not None:
  471. VideoDao (self.conn).upsert (video_tag.video, True)
  472. if video_tag.tag is not None:
  473. TagDao (self.conn).upsert (video_tag.tag)
  474. def upsert (
  475. self,
  476. video_tag: VideoTagDto,
  477. with_relation_tables: bool,
  478. ) -> None:
  479. with self.conn.cursor (dictionary = True) as c:
  480. c.execute ("""
  481. INSERT INTO
  482. video_tags(
  483. video_id,
  484. tag_id,
  485. tagged_at,
  486. untagged_at)
  487. VALUES
  488. (
  489. %s,
  490. %s,
  491. %s,
  492. %s)
  493. ON DUPLICATE KEY UPDATE
  494. video_id = VALUES(video_id),
  495. tag_id = VALUES(tag_id),
  496. tagged_at = VALUES(tagged_at),
  497. untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
  498. video_tag.tag_id,
  499. video_tag.tagged_at,
  500. video_tag.untagged_at))
  501. video_tag.id_ = c.lastrowid
  502. if with_relation_tables:
  503. if video_tag.video is not None:
  504. VideoDao (self.conn).upsert (video_tag.video, True)
  505. if video_tag.tag is not None:
  506. TagDao (self.conn).upsert (video_tag.tag)
  507. def upsert_all (
  508. self,
  509. video_tags: list[VideoTagDto],
  510. with_relation_tables: bool,
  511. ) -> None:
  512. for video_tag in video_tags:
  513. self.upsert (video_tag, with_relation_tables)
  514. def untag_all (
  515. self,
  516. video_id: int,
  517. tag_ids: list[int],
  518. now: datetime,
  519. ) -> None:
  520. if not tag_ids:
  521. return
  522. with self.conn.cursor (dictionary = True) as c:
  523. c.execute ("""
  524. UPDATE
  525. video_tags
  526. SET
  527. untagged_at = %%s
  528. WHERE
  529. video_id = %%s
  530. AND tag_ids IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids))
  531. def _create_dto_from_row (
  532. self,
  533. row,
  534. with_relation_tables: bool,
  535. ) -> VideoTagDto:
  536. video_tag = VideoTagDto (id_ = row['id'],
  537. video_id = row['video_id'],
  538. tag_id = row['tag_id'],
  539. tagged_at = row['tagged_at'],
  540. untagged_at = row['untagged_at'])
  541. if with_relation_tables:
  542. video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
  543. video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
  544. return video_tag
  545. @dataclass (slots = True)
  546. class VideoTagDto:
  547. video_id: int
  548. tag_id: int
  549. tagged_at: datetime
  550. id_: int | None = None
  551. untagged_at: datetime | DbNullType = DbNull
  552. video: VideoDto | None = None
  553. tag: TagDto | None = None
  554. class TagDao:
  555. def __init__ (
  556. self,
  557. conn,
  558. ):
  559. self.conn = conn
  560. def find (
  561. self,
  562. tag_id: int,
  563. ) -> TagDto | None:
  564. with self.conn.cursor (dictionary = True) as c:
  565. c.execute ("""
  566. SELECT
  567. id,
  568. name
  569. FROM
  570. tags
  571. WHERE
  572. id = %s""", (tag_id,))
  573. row = c.fetchone ()
  574. if row is None:
  575. return None
  576. return self._create_dto_from_row (row)
  577. def fetch_by_name (
  578. self,
  579. tag_name: str,
  580. ) -> TagDto | None:
  581. with self.conn.cursor (dictionary = True) as c:
  582. c.execute ("""
  583. SELECT
  584. id,
  585. name
  586. FROM
  587. tags
  588. WHERE
  589. name = %s""", (tag_name,))
  590. row = c.fetchone ()
  591. if row is None:
  592. return None
  593. return self._create_dto_from_row (row)
  594. def insert (
  595. self,
  596. tag: TagDto,
  597. ) -> None:
  598. with self.conn.cursor (dictionary = True) as c:
  599. c.execute ("""
  600. INSERT INTO
  601. tags(name)
  602. VALUES
  603. (%s)""", (tag.name,))
  604. tag.id_ = c.lastrowid
  605. def upsert (
  606. self,
  607. tag: TagDto,
  608. ) -> None:
  609. with self.conn.cursor (dictionary = True) as c:
  610. c.execute ("""
  611. INSERT INTO
  612. tags(name)
  613. VALUES
  614. (%s)
  615. ON DUPLICATE KEY UPDATE
  616. name = VALUES(name)""", (tag.name,))
  617. tag.id_ = c.lastrowid
  618. def _create_dto_from_row (
  619. self,
  620. row,
  621. ) -> TagDto:
  622. return TagDto (id_ = row['id'],
  623. name = row['name'])
  624. @dataclass (slots = True)
  625. class TagDto:
  626. name: str
  627. id_: int | None = None
  628. class VideoHistoryDao:
  629. def __init__ (
  630. self,
  631. conn,
  632. ):
  633. self.conn = conn
  634. def fetch_by_video_id (
  635. self,
  636. video_id: int,
  637. with_relation_tables: bool,
  638. ) -> list[VideoHistoryDto]:
  639. with self.conn.cursor (dictionary = True) as c:
  640. c.execute ("""
  641. SELECT
  642. id,
  643. video_id,
  644. fetched_at,
  645. views_count
  646. FROM
  647. video_histories
  648. WHERE
  649. video_id = %s""", (video_id,))
  650. video_histories: list[VideoHistoryDto] = []
  651. for row in c.fetchall ():
  652. video_histories.append (self._create_dto_from_row (row, with_relation_tables))
  653. return video_histories
  654. def insert (
  655. self,
  656. video_history: VideoHistoryDto,
  657. ) -> None:
  658. with self.conn.cursor (dictionary = True) as c:
  659. c.execute ("""
  660. INSERT INTO
  661. video_histories(
  662. video_id,
  663. fetched_at,
  664. views_count)
  665. VALUES
  666. (
  667. %s,
  668. %s,
  669. %s)""", (video_history.video_id,
  670. video_history.fetched_at,
  671. video_history.views_count))
  672. def upsert (
  673. self,
  674. video_history: VideoHistoryDto,
  675. ) -> None:
  676. with self.conn.cursor (dictionary = True) as c:
  677. c.execute ("""
  678. INSERT INTO
  679. video_histories(
  680. video_id,
  681. fetched_at,
  682. views_count)
  683. VALUES
  684. (
  685. %s,
  686. %s,
  687. %s)
  688. ON DUPLICATE KEY UPDATE
  689. video_id,
  690. fetched_at,
  691. views_count""", (video_history.video_id,
  692. video_history.fetched_at,
  693. video_history.views_count))
  694. def upsert_all (
  695. self,
  696. video_histories: list[VideoHistoryDto],
  697. ) -> None:
  698. for video_history in video_histories:
  699. self.upsert (video_history)
  700. def _create_dto_from_row (
  701. self,
  702. row,
  703. with_relation_tables: bool,
  704. ) -> VideoHistoryDto:
  705. video_history = VideoHistoryDto (id_ = row['id'],
  706. video_id = row['video_id'],
  707. fetched_at = row['fetched_at'],
  708. views_count = row['views_count'])
  709. if with_relation_tables:
  710. video_history.video = VideoDao (self.conn).find (video_history.video_id, True)
  711. return video_history
  712. @dataclass (slots = True)
  713. class VideoHistoryDto:
  714. video_id: int
  715. fetched_at: datetime
  716. views_count: int
  717. id_: int | None = None
  718. video: VideoDto | None = None
  719. class CommentDao:
  720. def __init__ (
  721. self,
  722. conn,
  723. ):
  724. self.conn = conn
  725. def fetch_by_video_id (
  726. self,
  727. video_id: int,
  728. with_relation_tables: bool,
  729. ) -> list[CommentDto]:
  730. with self.conn.cursor (dictionary = True) as c:
  731. c.execute ("""
  732. SELECT
  733. id,
  734. video_id,
  735. comment_no,
  736. user_id,
  737. content,
  738. posted_at,
  739. nico_count,
  740. vpos_ms
  741. FROM
  742. comments
  743. WHERE
  744. video_id = %s""", (video_id,))
  745. comments: list[CommentDto] = []
  746. for row in c.fetchall ():
  747. comments.append (self._create_dto_from_row (row, with_relation_tables))
  748. return comments
  749. def upsert (
  750. self,
  751. comment: CommentDto,
  752. with_relation_tables: bool,
  753. ) -> None:
  754. with self.conn.cursor (dictionary = True) as c:
  755. c.execute ("""
  756. INSERT INTO
  757. comments(
  758. video_id,
  759. comment_no,
  760. user_id,
  761. content,
  762. posted_at,
  763. nico_count,
  764. vpos_ms)
  765. VALUES
  766. (
  767. %s,
  768. %s,
  769. %s,
  770. %s,
  771. %s,
  772. %s,
  773. %s)
  774. ON DUPLICATE KEY UPDATE
  775. video_id = VALUES(video_id),
  776. comment_no = VALUES(comment_no),
  777. user_id = VALUES(user_id),
  778. content = VALUES(content),
  779. posted_at = VALUES(posted_at),
  780. nico_count = VALUES(nico_count),
  781. vpos_ms = VALUES(vpos_ms)""", (comment.video_id,
  782. comment.comment_no,
  783. comment.user_id,
  784. comment.content,
  785. comment.posted_at,
  786. comment.nico_count,
  787. comment.vpos_ms))
  788. def upsert_all (
  789. self,
  790. comments: list[CommentDto],
  791. with_relation_tables: bool,
  792. ) -> None:
  793. for comment in comments:
  794. self.upsert (comment, with_relation_tables)
  795. def _create_dto_from_row (
  796. self,
  797. row,
  798. with_relation_tables: bool,
  799. ) -> CommentDto:
  800. comment = CommentDto (id_ = row['id'],
  801. video_id = row['video_id'],
  802. comment_no = row['comment_no'],
  803. user_id = row['user_id'],
  804. content = row['content'],
  805. posted_at = row['posted_at'],
  806. nico_count = row['nico_count'],
  807. vpos_ms = row['vpos_ms'])
  808. if with_relation_tables:
  809. comment.video = VideoDao (self.conn).find (comment.video_id, True)
  810. return comment
  811. @dataclass (slots = True)
  812. class CommentDto:
  813. video_id: int
  814. comment_no: int
  815. user_id: int
  816. content: str
  817. posted_at: datetime
  818. id_: int | None = None
  819. nico_count: int = 0
  820. vpos_ms: int | DbNullType = DbNull
  821. video: VideoDto | None = None
  822. user: UserDto | None = None
  823. class UserDao:
  824. def __init__ (
  825. self,
  826. conn,
  827. ):
  828. self.conn = conn
  829. def fetch_by_code (
  830. self,
  831. user_code: str
  832. ) -> UserDto | None:
  833. with self.conn.cursor (dictionary = True) as c:
  834. c.execute ("""
  835. SELECT
  836. id,
  837. code
  838. FROM
  839. users
  840. WHERE
  841. code = %s""", (user_code,))
  842. row = c.fetchone ()
  843. if row is None:
  844. return None
  845. return self._create_dto_from_row (row)
  846. def insert (
  847. self,
  848. user: UserDto,
  849. ) -> None:
  850. with self.conn.cursor (dictionary = True) as c:
  851. c.execute ("""
  852. INSERT INTO
  853. users(code)
  854. VALUES
  855. (%s)""", (user.code,))
  856. user.id_ = c.lastrowid
  857. def _create_dto_from_row (
  858. self,
  859. row,
  860. ) -> UserDto:
  861. return UserDto (id_ = row['id'],
  862. code = row['code'])
  863. @dataclass (slots = True)
  864. class UserDto:
  865. code: str
  866. id_: int | None = None
  867. if __name__ == '__main__':
  868. main ()