ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

965 lines
32 KiB

  1. """
  2. 日次で実行し,ぼざクリ DB を最新に更新する.
  3. """
  4. from __future__ import annotations
  5. import json
  6. import os
  7. import random
  8. import string
  9. import time
  10. from dataclasses import dataclass
  11. from datetime import datetime, timedelta
  12. from typing import Any, TypedDict, cast
  13. import mysql.connector
  14. import requests
  15. # TODO: “何もしなぃ” を意味する None と区別可能にするため,NULL クラスを別途用意すること
  16. DbNull = None
  17. DbNullType = None
  18. class VideoSearchParam (TypedDict):
  19. q: str
  20. targets: str
  21. _sort: str
  22. fields: str
  23. _limit: int
  24. jsonFilter: str
  25. class VideoResult (TypedDict):
  26. contentId: str
  27. title: str
  28. tags: list[str]
  29. description: str
  30. viewCounter: int
  31. startTime: str
  32. class CommentResult (TypedDict):
  33. id: str
  34. no: int
  35. vposMs: int
  36. body: str
  37. commands: list[str]
  38. userId: str
  39. isPremium: bool
  40. score: int
  41. postedAt: str
  42. nicoruCount: int
  43. nicoruId: Any
  44. source: str
  45. isMyPost: bool
  46. def main (
  47. ) -> None:
  48. conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
  49. password = os.environ['MYSQL_PASS'],
  50. database = 'nizika_nico')
  51. now = datetime.now ()
  52. video_dao = VideoDao (conn)
  53. tag_dao = TagDao (conn)
  54. video_tag_dao = VideoTagDao (conn)
  55. video_history_dao = VideoHistoryDao (conn)
  56. comment_dao = CommentDao (conn)
  57. user_dao = UserDao (conn)
  58. api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
  59. update_tables (video_dao, tag_dao, video_tag_dao, video_history_dao, comment_dao, user_dao,
  60. api_data, now)
  61. conn.commit ()
  62. conn.close ()
  63. def update_tables (
  64. video_dao: VideoDao,
  65. tag_dao: TagDao,
  66. video_tag_dao: VideoTagDao,
  67. video_history_dao: VideoHistoryDao,
  68. comment_dao: CommentDao,
  69. user_dao: UserDao,
  70. api_data: list[VideoResult],
  71. now: datetime,
  72. ) -> None:
  73. video_ids: list[int] = []
  74. for datum in api_data:
  75. video = VideoDto (code = datum['contentId'],
  76. title = datum['title'],
  77. description = datum['description'],
  78. uploaded_at = datetime.fromisoformat (datum['startTime']))
  79. video_dao.upsert (video, False)
  80. if video.id_ is not None:
  81. video_ids.append (video.id_)
  82. video_history = VideoHistoryDto (video_id = video.id_,
  83. fetched_at = now,
  84. views_count = datum['viewCounter'])
  85. video_history_dao.insert (video_history)
  86. tag_ids: list[int] = []
  87. video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False)
  88. for vt in video_tags:
  89. tag = tag_dao.find (vt.tag_id)
  90. if (tag is not None
  91. and (tag.name not in datum['tags'])
  92. and (tag.id_ is not None)):
  93. tag_ids.append (tag.id_)
  94. video_tag_dao.untag_all (video.id_, tag_ids, now)
  95. tags: list[TagDto] = []
  96. for tag_name in datum['tags']:
  97. tag = tag_dao.fetch_by_name (tag_name)
  98. if tag is None:
  99. tag = TagDto (name = tag_name)
  100. tag_dao.insert (tag)
  101. if video.id_ is not None and tag.id_ is not None:
  102. video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False)
  103. if video_tag is None:
  104. video_tag = VideoTagDto (video_id = video.id_,
  105. tag_id = tag.id_,
  106. tagged_at = now)
  107. video_tag_dao.insert (video_tag, False)
  108. for com in fetch_comments (video.code):
  109. user = user_dao.fetch_by_code (com['userId'])
  110. if user is None:
  111. user = UserDto (code = com['userId'])
  112. user_dao.insert (user)
  113. if video.id_ is not None and user.id_ is not None:
  114. comment = CommentDto (video_id = video.id_,
  115. comment_no = com['no'],
  116. user_id = user.id_,
  117. content = com['body'],
  118. posted_at = datetime.fromisoformat (com['postedAt']),
  119. nico_count = com['nicoruCount'],
  120. vpos_ms = com['vposMs'])
  121. comment_dao.upsert (comment, False)
  122. alive_video_ids = [d['contentId'] for d in api_data]
  123. lost_video_ids: list[int] = []
  124. videos = video_dao.fetch_alive ()
  125. for video in videos:
  126. if video.id_ is not None and video.id_ not in alive_video_ids:
  127. lost_video_ids.append (video.id_)
  128. video_dao.delete (lost_video_ids, now)
  129. def fetch_comments (
  130. video_code: str,
  131. ) -> list[CommentResult]:
  132. time.sleep (1.2)
  133. headers = { 'X-Frontend-Id': '6',
  134. 'X-Frontend-Version': '0' }
  135. action_track_id = (
  136. ''.join (random.choice (string.ascii_letters + string.digits)
  137. for _ in range (10))
  138. + '_'
  139. + str (random.randrange (10 ** 12, 10 ** 13)))
  140. url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
  141. + f"?actionTrackId={ action_track_id }")
  142. res = requests.post (url, headers = headers, timeout = 60).json ()
  143. try:
  144. nv_comment = res['data']['comment']['nvComment']
  145. except KeyError:
  146. return []
  147. if nv_comment is None:
  148. return []
  149. headers = { 'X-Frontend-Id': '6',
  150. 'X-Frontend-Version': '0',
  151. 'Content-Type': 'application/json' }
  152. params = { 'params': nv_comment['params'],
  153. 'additionals': { },
  154. 'threadKey': nv_comment['threadKey'] }
  155. url = nv_comment['server'] + '/v1/threads'
  156. res = (requests.post (url, json.dumps (params),
  157. headers = headers,
  158. timeout = 60)
  159. .json ())
  160. try:
  161. return res['data']['threads'][1]['comments']
  162. except (IndexError, KeyError):
  163. return []
  164. def search_nico_by_tag (
  165. tag: str,
  166. ) -> list[VideoResult]:
  167. return search_nico_by_tags ([tag])
  168. def search_nico_by_tags (
  169. tags: list[str],
  170. ) -> list[VideoResult]:
  171. today = datetime.now ()
  172. url = ('https://snapshot.search.nicovideo.jp'
  173. + '/api/v2/snapshot/video/contents/search')
  174. result_data: list[VideoResult] = []
  175. to = datetime (2022, 12, 3)
  176. while to <= today:
  177. time.sleep (1.2)
  178. until = to + timedelta (days = 14)
  179. query_filter = json.dumps ({ 'type': 'or',
  180. 'filters': [
  181. { 'type': 'range',
  182. 'field': 'startTime',
  183. 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day),
  184. 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day),
  185. 'include_lower': True }] })
  186. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  187. 'targets': 'tagsExact',
  188. '_sort': '-viewCounter',
  189. 'fields': ('contentId,'
  190. 'title,'
  191. 'tags,'
  192. 'description,'
  193. 'viewCounter,'
  194. 'startTime'),
  195. '_limit': 100,
  196. 'jsonFilter': query_filter }
  197. res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
  198. try:
  199. result_data += res['data']
  200. except KeyError:
  201. pass
  202. to = until + timedelta (days = 1)
  203. return result_data
  204. class VideoDao:
  205. def __init__ (
  206. self,
  207. conn
  208. ):
  209. self.conn = conn
  210. def find (
  211. self,
  212. video_id: int,
  213. with_relation_tables: bool,
  214. ) -> VideoDto | None:
  215. with self.conn.cursor (dictionary = True) as c:
  216. c.execute ("""
  217. SELECT
  218. id,
  219. code,
  220. title,
  221. description,
  222. uploaded_at,
  223. deleted_at
  224. FROM
  225. videos
  226. WHERE
  227. id = %s
  228. ORDER BY
  229. id""", (video_id,))
  230. row = c.fetchone ()
  231. if row is None:
  232. return None
  233. return self._create_dto_from_row (row, with_relation_tables)
  234. def fetch_all (
  235. self,
  236. with_relation_tables: bool,
  237. ) -> list[VideoDto]:
  238. with self.conn.cursor (dictionary = True) as c:
  239. c.execute ("""
  240. SELECT
  241. id,
  242. code,
  243. title,
  244. description,
  245. uploaded_at,
  246. deleted_at
  247. FROM
  248. videos
  249. ORDER BY
  250. id""")
  251. videos: list[VideoDto] = []
  252. for row in c.fetchall ():
  253. videos.append (self._create_dto_from_row (row, with_relation_tables))
  254. return videos
  255. def fetch_alive (
  256. self,
  257. ) -> list[VideoDto]:
  258. with self.conn.cursor (dictionary = True) as c:
  259. c.execute ("""
  260. SELECT
  261. id,
  262. code,
  263. title,
  264. description,
  265. uploaded_at,
  266. deleted_at
  267. FROM
  268. videos
  269. WHERE
  270. deleted_at IS NULL""")
  271. videos: list[VideoDto] = []
  272. for row in c.fetchall ():
  273. videos.append (self._create_dto_from_row (row, False))
  274. return videos
  275. def upsert (
  276. self,
  277. video: VideoDto,
  278. with_relation_tables: bool,
  279. ) -> None:
  280. with self.conn.cursor (dictionary = True) as c:
  281. c.execute ("""
  282. INSERT INTO
  283. videos(
  284. code,
  285. title,
  286. description,
  287. uploaded_at,
  288. deleted_at)
  289. VALUES
  290. (
  291. %s,
  292. %s,
  293. %s,
  294. %s,
  295. %s)
  296. ON DUPLICATE KEY UPDATE
  297. code = VALUES(code),
  298. title = VALUES(title),
  299. description = VALUES(description),
  300. uploaded_at = VALUES(uploaded_at),
  301. deleted_at = VALUES(deleted_at)""", (video.code,
  302. video.title,
  303. video.description,
  304. video.uploaded_at,
  305. video.deleted_at))
  306. video.id_ = c.lastrowid
  307. if with_relation_tables:
  308. if video.video_tags is not None:
  309. VideoTagDao (self.conn).upsert_all (video.video_tags, False)
  310. if video.comments is not None:
  311. CommentDao (self.conn).upsert_all (video.comments, False)
  312. if video.video_histories is not None:
  313. VideoHistoryDao (self.conn).upsert_all (video.video_histories)
  314. def upsert_all (
  315. self,
  316. videos: list[VideoDto],
  317. with_relation_tables: bool,
  318. ) -> None:
  319. for video in videos:
  320. self.upsert (video, with_relation_tables)
  321. def delete (
  322. self,
  323. video_ids: list[int],
  324. at: datetime,
  325. ) -> None:
  326. if not video_ids:
  327. return
  328. with self.conn.cursor (dictionary = True) as c:
  329. c.execute ("""
  330. UPDATE
  331. videos
  332. SET
  333. deleted_at = %%s
  334. WHERE
  335. id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids))
  336. def _create_dto_from_row (
  337. self,
  338. row,
  339. with_relation_tables: bool,
  340. ) -> VideoDto:
  341. video = VideoDto (id_ = row['id'],
  342. code = row['code'],
  343. title = row['title'],
  344. description = row['description'],
  345. uploaded_at = row['uploaded_at'],
  346. deleted_at = row['deleted_at'])
  347. if with_relation_tables and video.id_ is not None:
  348. video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
  349. for i in range (len (video.video_tags)):
  350. video.video_tags[i].video = video
  351. video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
  352. for i in range (len (video.comments)):
  353. video.comments[i].video = video
  354. video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
  355. for i in range (len (video.video_histories)):
  356. video.video_histories[i].video = video
  357. return video
  358. @dataclass (slots = True)
  359. class VideoDto:
  360. code: str
  361. title: str
  362. description: str
  363. uploaded_at: datetime
  364. id_: int | None = None
  365. deleted_at: datetime | DbNullType = DbNull
  366. video_tags: list[VideoTagDto] | None = None
  367. comments: list[CommentDto] | None = None
  368. video_histories: list[VideoHistoryDto] | None = None
  369. class VideoTagDao:
  370. def __init__ (
  371. self,
  372. conn,
  373. ):
  374. self.conn = conn
  375. def fetch_by_video_id (
  376. self,
  377. video_id: int,
  378. with_relation_tables: bool,
  379. ) -> list[VideoTagDto]:
  380. with self.conn.cursor (dictionary = True) as c:
  381. c.execute ("""
  382. SELECT
  383. id,
  384. video_id,
  385. tag_id,
  386. tagged_at,
  387. untagged_at
  388. FROM
  389. video_tags
  390. WHERE
  391. video_id = %s
  392. ORDER BY
  393. id""", (video_id,))
  394. video_tags: list[VideoTagDto] = []
  395. for row in c.fetchall ():
  396. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  397. return video_tags
  398. def fetch_alive_by_video_id (
  399. self,
  400. video_id: int,
  401. with_relation_tables: bool,
  402. ) -> list[VideoTagDto]:
  403. with self.conn.cursor (dictionary = True) as c:
  404. c.execute ("""
  405. SELECT
  406. id,
  407. video_id,
  408. tag_id,
  409. tagged_at,
  410. untagged_at
  411. FROM
  412. video_tags
  413. WHERE
  414. video_id = %s
  415. AND (untagged_at IS NULL)
  416. ORDER BY
  417. id""", (video_id,))
  418. video_tags: list[VideoTagDto] = []
  419. for row in c.fetchall ():
  420. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  421. return video_tags
  422. def fetch_alive_by_ids (
  423. self,
  424. video_id: int,
  425. tag_id: int,
  426. with_relation_tables: bool,
  427. ) -> VideoTagDto | None:
  428. with self.conn.cursor (dictionary = True) as c:
  429. c.execute ("""
  430. SELECT
  431. id,
  432. video_id,
  433. tag_id,
  434. tagged_at,
  435. untagged_at
  436. FROM
  437. video_tags
  438. WHERE
  439. video_id = %s
  440. AND tag_id = %s""", (video_id, tag_id))
  441. row = c.fetchone ()
  442. if row is None:
  443. return None
  444. return self._create_dto_from_row (row, with_relation_tables)
  445. def insert (
  446. self,
  447. video_tag: VideoTagDto,
  448. with_relation_tables: bool,
  449. ) -> None:
  450. with self.conn.cursor (dictionary = True) as c:
  451. c.execute ("""
  452. INSERT INTO
  453. video_tags(
  454. video_id,
  455. tag_id,
  456. tagged_at,
  457. untagged_at)
  458. VALUES
  459. (
  460. %s,
  461. %s,
  462. %s,
  463. %s)""", (video_tag.video_id, video_tag.tag_id,
  464. video_tag.tagged_at, video_tag.untagged_at))
  465. video_tag.id_ = c.lastrowid
  466. if with_relation_tables:
  467. if video_tag.video is not None:
  468. VideoDao (self.conn).upsert (video_tag.video, True)
  469. if video_tag.tag is not None:
  470. TagDao (self.conn).upsert (video_tag.tag)
  471. def upsert (
  472. self,
  473. video_tag: VideoTagDto,
  474. with_relation_tables: bool,
  475. ) -> None:
  476. with self.conn.cursor (dictionary = True) as c:
  477. c.execute ("""
  478. INSERT INTO
  479. video_tags(
  480. video_id,
  481. tag_id,
  482. tagged_at,
  483. untagged_at)
  484. VALUES
  485. (
  486. %s,
  487. %s,
  488. %s,
  489. %s)
  490. ON DUPLICATE KEY UPDATE
  491. video_id = VALUES(video_id),
  492. tag_id = VALUES(tag_id),
  493. tagged_at = VALUES(tagged_at),
  494. untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
  495. video_tag.tag_id,
  496. video_tag.tagged_at,
  497. video_tag.untagged_at))
  498. video_tag.id_ = c.lastrowid
  499. if with_relation_tables:
  500. if video_tag.video is not None:
  501. VideoDao (self.conn).upsert (video_tag.video, True)
  502. if video_tag.tag is not None:
  503. TagDao (self.conn).upsert (video_tag.tag)
  504. def upsert_all (
  505. self,
  506. video_tags: list[VideoTagDto],
  507. with_relation_tables: bool,
  508. ) -> None:
  509. for video_tag in video_tags:
  510. self.upsert (video_tag, with_relation_tables)
  511. def untag_all (
  512. self,
  513. video_id: int,
  514. tag_ids: list[int],
  515. now: datetime
  516. ) -> None:
  517. if not tag_ids:
  518. return
  519. with self.conn.cursor (dictionary = True) as c:
  520. c.execute ("""
  521. UPDATE
  522. video_tags
  523. SET
  524. untagged_at = %%s
  525. WHERE
  526. video_id = %%s
  527. AND tag_ids IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids))
  528. def _create_dto_from_row (
  529. self,
  530. row,
  531. with_relation_tables: bool,
  532. ) -> VideoTagDto:
  533. video_tag = VideoTagDto (id_ = row['id'],
  534. video_id = row['video_id'],
  535. tag_id = row['tag_id'],
  536. tagged_at = row['tagged_at'],
  537. untagged_at = row['untagged_at'])
  538. if with_relation_tables:
  539. video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
  540. video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
  541. return video_tag
  542. @dataclass (slots = True)
  543. class VideoTagDto:
  544. video_id: int
  545. tag_id: int
  546. tagged_at: datetime
  547. id_: int | None = None
  548. untagged_at: datetime | DbNullType = DbNull
  549. video: VideoDto | None = None
  550. tag: TagDto | None = None
  551. class TagDao:
  552. def __init__ (
  553. self,
  554. conn,
  555. ):
  556. self.conn = conn
  557. def find (
  558. self,
  559. tag_id: int,
  560. ) -> TagDto | None:
  561. with self.conn.cursor (dictionary = True) as c:
  562. c.execute ("""
  563. SELECT
  564. id,
  565. name
  566. FROM
  567. tags
  568. WHERE
  569. id = %s""", (tag_id,))
  570. row = c.fetchone ()
  571. if row is None:
  572. return None
  573. return self._create_dto_from_row (row)
  574. def fetch_by_name (
  575. self,
  576. tag_name: str,
  577. ) -> TagDto | None:
  578. with self.conn.cursor (dictionary = True) as c:
  579. c.execute ("""
  580. SELECT
  581. id,
  582. name
  583. FROM
  584. tags
  585. WHERE
  586. name = %s""", (tag_name,))
  587. row = c.fetchone ()
  588. if row is None:
  589. return None
  590. return self._create_dto_from_row (row)
  591. def insert (
  592. self,
  593. tag: TagDto,
  594. ) -> None:
  595. with self.conn.cursor (dictionary = True) as c:
  596. c.execute ("""
  597. INSERT INTO
  598. tags(name)
  599. VALUES
  600. (%s)""", (tag.name,))
  601. tag.id_ = c.lastrowid
  602. def upsert (
  603. self,
  604. tag: TagDto,
  605. ) -> None:
  606. with self.conn.cursor (dictionary = True) as c:
  607. c.execute ("""
  608. INSERT INTO
  609. tags(name)
  610. VALUES
  611. (%s)
  612. ON DUPLICATE KEY UPDATE
  613. name = VALUES(name)""", (tag.name,))
  614. tag.id_ = c.lastrowid
  615. def _create_dto_from_row (
  616. self,
  617. row,
  618. ) -> TagDto:
  619. return TagDto (id_ = row['id'],
  620. name = row['name'])
  621. @dataclass (slots = True)
  622. class TagDto:
  623. name: str
  624. id_: int | None = None
  625. class VideoHistoryDao:
  626. def __init__ (
  627. self,
  628. conn,
  629. ):
  630. self.conn = conn
  631. def fetch_by_video_id (
  632. self,
  633. video_id: int,
  634. with_relation_tables: bool,
  635. ) -> list[VideoHistoryDto]:
  636. with self.conn.cursor (dictionary = True) as c:
  637. c.execute ("""
  638. SELECT
  639. id,
  640. video_id,
  641. fetched_at,
  642. views_count
  643. FROM
  644. video_histories
  645. WHERE
  646. video_id = %s""", (video_id,))
  647. video_histories: list[VideoHistoryDto] = []
  648. for row in c.fetchall ():
  649. video_histories.append (self._create_dto_from_row (row, with_relation_tables))
  650. return video_histories
  651. def insert (
  652. self,
  653. video_history: VideoHistoryDto,
  654. ) -> None:
  655. with self.conn.cursor (dictionary = True) as c:
  656. c.execute ("""
  657. INSERT INTO
  658. video_histories(
  659. video_id,
  660. fetched_at,
  661. views_count)
  662. VALUES
  663. (
  664. %s,
  665. %s,
  666. %s)""", (video_history.video_id,
  667. video_history.fetched_at,
  668. video_history.views_count))
  669. def upsert (
  670. self,
  671. video_history: VideoHistoryDto,
  672. ) -> None:
  673. with self.conn.cursor (dictionary = True) as c:
  674. c.execute ("""
  675. INSERT INTO
  676. video_histories(
  677. video_id,
  678. fetched_at,
  679. views_count)
  680. VALUES
  681. (
  682. %s,
  683. %s,
  684. %s)
  685. ON DUPLICATE KEY UPDATE
  686. video_id,
  687. fetched_at,
  688. views_count""", (video_history.video_id,
  689. video_history.fetched_at,
  690. video_history.views_count))
  691. def upsert_all (
  692. self,
  693. video_histories: list[VideoHistoryDto],
  694. ) -> None:
  695. for video_history in video_histories:
  696. self.upsert (video_history)
  697. def _create_dto_from_row (
  698. self,
  699. row,
  700. with_relation_tables: bool,
  701. ) -> VideoHistoryDto:
  702. video_history = VideoHistoryDto (id_ = row['id'],
  703. video_id = row['video_id'],
  704. fetched_at = row['fetched_at'],
  705. views_count = row['views_count'])
  706. if with_relation_tables:
  707. video_history.video = VideoDao (self.conn).find (video_history.video_id, True)
  708. return video_history
  709. @dataclass (slots = True)
  710. class VideoHistoryDto:
  711. video_id: int
  712. fetched_at: datetime
  713. views_count: int
  714. id_: int | None = None
  715. video: VideoDto | None = None
  716. class CommentDao:
  717. def __init__ (
  718. self,
  719. conn,
  720. ):
  721. self.conn = conn
  722. def fetch_by_video_id (
  723. self,
  724. video_id: int,
  725. with_relation_tables: bool,
  726. ) -> list[CommentDto]:
  727. with self.conn.cursor (dictionary = True) as c:
  728. c.execute ("""
  729. SELECT
  730. id,
  731. video_id,
  732. comment_no,
  733. user_id,
  734. content,
  735. posted_at,
  736. nico_count,
  737. vpos_ms
  738. FROM
  739. comments
  740. WHERE
  741. video_id = %s""", (video_id,))
  742. comments: list[CommentDto] = []
  743. for row in c.fetchall ():
  744. comments.append (self._create_dto_from_row (row, with_relation_tables))
  745. return comments
  746. def upsert (
  747. self,
  748. comment: CommentDto,
  749. with_relation_tables: bool,
  750. ) -> None:
  751. with self.conn.cursor (dictionary = True) as c:
  752. c.execute ("""
  753. INSERT INTO
  754. comments(
  755. video_id,
  756. comment_no,
  757. user_id,
  758. content,
  759. posted_at,
  760. nico_count,
  761. vpos_ms)
  762. VALUES
  763. (
  764. %s,
  765. %s,
  766. %s,
  767. %s,
  768. %s,
  769. %s,
  770. %s)
  771. ON DUPLICATE KEY UPDATE
  772. video_id = VALUES(video_id),
  773. comment_no = VALUES(comment_no),
  774. user_id = VALUES(user_id),
  775. content = VALUES(content),
  776. posted_at = VALUES(posted_at),
  777. nico_count = VALUES(nico_count),
  778. vpos_ms = VALUES(vpos_ms)""", (comment.video_id,
  779. comment.comment_no,
  780. comment.user_id,
  781. comment.content,
  782. comment.posted_at,
  783. comment.nico_count,
  784. comment.vpos_ms))
  785. def upsert_all (
  786. self,
  787. comments: list[CommentDto],
  788. with_relation_tables: bool,
  789. ) -> None:
  790. for comment in comments:
  791. self.upsert (comment, with_relation_tables)
  792. def _create_dto_from_row (
  793. self,
  794. row,
  795. with_relation_tables: bool,
  796. ) -> CommentDto:
  797. comment = CommentDto (id_ = row['id'],
  798. video_id = row['video_id'],
  799. comment_no = row['comment_no'],
  800. user_id = row['user_id'],
  801. content = row['content'],
  802. posted_at = row['posted_at'],
  803. nico_count = row['nico_count'],
  804. vpos_ms = row['vpos_ms'])
  805. if with_relation_tables:
  806. comment.video = VideoDao (self.conn).find (comment.video_id, True)
  807. return comment
  808. @dataclass (slots = True)
  809. class CommentDto:
  810. video_id: int
  811. comment_no: int
  812. user_id: int
  813. content: str
  814. posted_at: datetime
  815. id_: int | None = None
  816. nico_count: int = 0
  817. vpos_ms: int | DbNullType = DbNull
  818. video: VideoDto | None = None
  819. user: UserDto | None = None
  820. class UserDao:
  821. def __init__ (
  822. self,
  823. conn,
  824. ):
  825. self.conn = conn
  826. def fetch_by_code (
  827. self,
  828. user_code: str
  829. ) -> UserDto | None:
  830. with self.conn.cursor (dictionary = True) as c:
  831. c.execute ("""
  832. SELECT
  833. id,
  834. code
  835. FROM
  836. users
  837. WHERE
  838. code = %s""", (user_code,))
  839. row = c.fetchone ()
  840. if row is None:
  841. return None
  842. return self._create_dto_from_row (row)
  843. def insert (
  844. self,
  845. user: UserDto,
  846. ) -> None:
  847. with self.conn.cursor (dictionary = True) as c:
  848. c.execute ("""
  849. INSERT INTO
  850. users(code)
  851. VALUES
  852. (%s)""", (user.code,))
  853. user.id_ = c.lastrowid
  854. def _create_dto_from_row (
  855. self,
  856. row,
  857. ) -> UserDto:
  858. return UserDto (id_ = row['id'],
  859. code = row['code'])
  860. @dataclass (slots = True)
  861. class UserDto:
  862. code: str
  863. id_: int | None = None
  864. if __name__ == '__main__':
  865. main ()