ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1079 lines
35 KiB

  1. """
  2. 日次で実行し,ぼざクリ DB を最新に更新する.
  3. """
  4. from __future__ import annotations
  5. import json
  6. import os
  7. import random
  8. import string
  9. import time
  10. import unicodedata
  11. from dataclasses import dataclass
  12. from datetime import date, datetime, timedelta
  13. from typing import Any, Type, TypedDict, cast
  14. import mysql.connector
  15. import requests
  16. from mysql.connector.connection import MySQLConnectionAbstract
  17. class DbNull:
  18. def __new__ (
  19. cls,
  20. ):
  21. delattr (cls, '__init__')
  22. DbNullType = Type[DbNull]
  23. class VideoSearchParam (TypedDict):
  24. q: str
  25. targets: str
  26. _sort: str
  27. fields: str
  28. _limit: int
  29. jsonFilter: str
  30. class VideoResult (TypedDict):
  31. contentId: str
  32. title: str
  33. tags: str
  34. description: str | None
  35. viewCounter: int
  36. startTime: str
  37. class CommentResult (TypedDict):
  38. id: str
  39. no: int
  40. vposMs: int
  41. body: str
  42. commands: list[str]
  43. userId: str
  44. isPremium: bool
  45. score: int
  46. postedAt: str
  47. nicoruCount: int
  48. nicoruId: Any
  49. source: str
  50. isMyPost: bool
  51. class CommentRow (TypedDict):
  52. id: int
  53. video_id: int
  54. comment_no: int
  55. user_id: int
  56. content: str
  57. posted_at: datetime
  58. nico_count: int
  59. vpos_ms: int | None
  60. class TagRow (TypedDict):
  61. id: int
  62. name: str
  63. class UserRow (TypedDict):
  64. id: int
  65. code: str
  66. class VideoRow (TypedDict):
  67. id: int
  68. code: str
  69. title: str
  70. description: str
  71. uploaded_at: datetime
  72. deleted_at: datetime | None
  73. class VideoHistoryRow (TypedDict):
  74. id: int
  75. video_id: int
  76. fetched_at: date
  77. views_count: int
  78. class VideoTagRow (TypedDict):
  79. id: int
  80. video_id: int
  81. tag_id: int
  82. tagged_at: date
  83. untagged_at: date | None
  84. def main (
  85. ) -> None:
  86. conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
  87. password = os.environ['MYSQL_PASS'],
  88. database = 'nizika_nico')
  89. if not isinstance (conn, MySQLConnectionAbstract):
  90. raise TypeError
  91. now = datetime.now ()
  92. video_dao = VideoDao (conn)
  93. tag_dao = TagDao (conn)
  94. video_tag_dao = VideoTagDao (conn)
  95. video_history_dao = VideoHistoryDao (conn)
  96. comment_dao = CommentDao (conn)
  97. user_dao = UserDao (conn)
  98. api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
  99. update_tables (video_dao, tag_dao, video_tag_dao, video_history_dao, comment_dao, user_dao,
  100. api_data, now)
  101. conn.commit ()
  102. conn.close ()
  103. def update_tables (
  104. video_dao: VideoDao,
  105. tag_dao: TagDao,
  106. video_tag_dao: VideoTagDao,
  107. video_history_dao: VideoHistoryDao,
  108. comment_dao: CommentDao,
  109. user_dao: UserDao,
  110. api_data: list[VideoResult],
  111. now: datetime,
  112. ) -> None:
  113. for datum in api_data:
  114. tag_names: list[str] = datum['tags'].split ()
  115. video = VideoDto (code = datum['contentId'],
  116. title = datum['title'],
  117. description = datum['description'] or '',
  118. uploaded_at = datetime.fromisoformat (datum['startTime']))
  119. video_dao.upsert (video, False)
  120. if video.id_ is not None:
  121. video_history = VideoHistoryDto (video_id = video.id_,
  122. fetched_at = now,
  123. views_count = datum['viewCounter'])
  124. video_history_dao.insert (video_history)
  125. tag_ids: list[int] = []
  126. video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False)
  127. for vt in video_tags:
  128. tag = tag_dao.find (vt.tag_id)
  129. if (tag is not None
  130. and (normalise (tag.name) not in map (normalise, tag_names))
  131. and (tag.id_ is not None)):
  132. tag_ids.append (tag.id_)
  133. video_tag_dao.untag_all (video.id_, tag_ids, now)
  134. tags: list[TagDto] = []
  135. for tag_name in tag_names:
  136. tag = tag_dao.fetch_by_name (tag_name)
  137. if tag is None:
  138. tag = TagDto (name = tag_name)
  139. tag_dao.insert (tag)
  140. if video.id_ is not None and tag.id_ is not None:
  141. video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False)
  142. if video_tag is None:
  143. video_tag = VideoTagDto (video_id = video.id_,
  144. tag_id = tag.id_,
  145. tagged_at = now)
  146. video_tag_dao.insert (video_tag, False)
  147. for com in fetch_comments (video.code):
  148. user = user_dao.fetch_by_code (com['userId'])
  149. if user is None:
  150. user = UserDto (code = com['userId'])
  151. user_dao.insert (user)
  152. if video.id_ is not None and user.id_ is not None:
  153. comment = CommentDto (video_id = video.id_,
  154. comment_no = com['no'],
  155. user_id = user.id_,
  156. content = com['body'],
  157. posted_at = datetime.fromisoformat (com['postedAt']),
  158. nico_count = com['nicoruCount'],
  159. vpos_ms = com['vposMs'])
  160. comment_dao.upsert (comment, False)
  161. alive_video_codes = [d['contentId'] for d in api_data]
  162. lost_video_ids: list[int] = []
  163. videos = video_dao.fetch_alive ()
  164. for video in videos:
  165. if video.id_ is not None and video.code not in alive_video_codes:
  166. lost_video_ids.append (video.id_)
  167. video_dao.delete (lost_video_ids, now)
  168. def fetch_comments (
  169. video_code: str,
  170. ) -> list[CommentResult]:
  171. time.sleep (1.2)
  172. headers = { 'X-Frontend-Id': '6',
  173. 'X-Frontend-Version': '0' }
  174. action_track_id = (
  175. ''.join (random.choice (string.ascii_letters + string.digits)
  176. for _ in range (10))
  177. + '_'
  178. + str (random.randrange (10 ** 12, 10 ** 13)))
  179. url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
  180. + f"?actionTrackId={ action_track_id }")
  181. res = requests.post (url, headers = headers, timeout = 60).json ()
  182. try:
  183. nv_comment = res['data']['comment']['nvComment']
  184. except KeyError:
  185. return []
  186. if nv_comment is None:
  187. return []
  188. headers = { 'X-Frontend-Id': '6',
  189. 'X-Frontend-Version': '0',
  190. 'Content-Type': 'application/json' }
  191. params = { 'params': nv_comment['params'],
  192. 'additionals': { },
  193. 'threadKey': nv_comment['threadKey'] }
  194. url = nv_comment['server'] + '/v1/threads'
  195. res = (requests.post (url, json.dumps (params),
  196. headers = headers,
  197. timeout = 60)
  198. .json ())
  199. try:
  200. return res['data']['threads'][1]['comments']
  201. except (IndexError, KeyError):
  202. return []
  203. def search_nico_by_tag (
  204. tag: str,
  205. ) -> list[VideoResult]:
  206. return search_nico_by_tags ([tag])
  207. def search_nico_by_tags (
  208. tags: list[str],
  209. ) -> list[VideoResult]:
  210. today = datetime.now ()
  211. url = ('https://snapshot.search.nicovideo.jp'
  212. + '/api/v2/snapshot/video/contents/search')
  213. result_data: list[VideoResult] = []
  214. to = datetime (2022, 12, 3)
  215. while to <= today:
  216. time.sleep (1.2)
  217. until = to + timedelta (days = 14)
  218. query_filter = json.dumps ({ 'type': 'or',
  219. 'filters': [
  220. { 'type': 'range',
  221. 'field': 'startTime',
  222. 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day),
  223. 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day),
  224. 'include_lower': True }] })
  225. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  226. 'targets': 'tagsExact',
  227. '_sort': '-viewCounter',
  228. 'fields': ('contentId,'
  229. 'title,'
  230. 'tags,'
  231. 'description,'
  232. 'viewCounter,'
  233. 'startTime'),
  234. '_limit': 100,
  235. 'jsonFilter': query_filter }
  236. res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
  237. try:
  238. result_data += res['data']
  239. except KeyError:
  240. pass
  241. to = until + timedelta (days = 1)
  242. return result_data
  243. class VideoDao:
  244. def __init__ (
  245. self,
  246. conn: MySQLConnectionAbstract,
  247. ):
  248. self.conn = conn
  249. def find (
  250. self,
  251. video_id: int,
  252. with_relation_tables: bool,
  253. ) -> VideoDto | None:
  254. with self.conn.cursor (dictionary = True) as c:
  255. c.execute ("""
  256. SELECT
  257. id,
  258. code,
  259. title,
  260. description,
  261. uploaded_at,
  262. deleted_at
  263. FROM
  264. videos
  265. WHERE
  266. id = %s
  267. ORDER BY
  268. id""", (video_id,))
  269. print (c._executed)
  270. row = cast (VideoRow | None, c.fetchone ())
  271. if row is None:
  272. return None
  273. return self._create_dto_from_row (row, with_relation_tables)
  274. def fetch_all (
  275. self,
  276. with_relation_tables: bool,
  277. ) -> list[VideoDto]:
  278. with self.conn.cursor (dictionary = True) as c:
  279. c.execute ("""
  280. SELECT
  281. id,
  282. code,
  283. title,
  284. description,
  285. uploaded_at,
  286. deleted_at
  287. FROM
  288. videos
  289. ORDER BY
  290. id""")
  291. print (c._executed)
  292. videos: list[VideoDto] = []
  293. for row in cast (list[VideoRow], c.fetchall ()):
  294. videos.append (self._create_dto_from_row (row, with_relation_tables))
  295. return videos
  296. def fetch_alive (
  297. self,
  298. ) -> list[VideoDto]:
  299. with self.conn.cursor (dictionary = True) as c:
  300. c.execute ("""
  301. SELECT
  302. id,
  303. code,
  304. title,
  305. description,
  306. uploaded_at,
  307. deleted_at
  308. FROM
  309. videos
  310. WHERE
  311. deleted_at IS NULL""")
  312. print (c._executed)
  313. videos: list[VideoDto] = []
  314. for row in cast (list[VideoRow], c.fetchall ()):
  315. videos.append (self._create_dto_from_row (row, False))
  316. return videos
  317. def upsert (
  318. self,
  319. video: VideoDto,
  320. with_relation_tables: bool,
  321. ) -> None:
  322. deleted_at: datetime | DbNullType | None = video.deleted_at
  323. if deleted_at is None:
  324. raise TypeError ('未実装')
  325. if deleted_at is DbNull:
  326. deleted_at = None
  327. deleted_at = cast (datetime | None, deleted_at)
  328. with self.conn.cursor (dictionary = True) as c:
  329. c.execute ("""
  330. INSERT INTO
  331. videos(
  332. code,
  333. title,
  334. description,
  335. uploaded_at,
  336. deleted_at)
  337. VALUES
  338. (
  339. %s,
  340. %s,
  341. %s,
  342. %s,
  343. %s)
  344. ON DUPLICATE KEY UPDATE
  345. id = LAST_INSERT_ID(id),
  346. code = VALUES(code),
  347. title = VALUES(title),
  348. description = VALUES(description),
  349. uploaded_at = VALUES(uploaded_at),
  350. deleted_at = VALUES(deleted_at)""", (video.code,
  351. video.title,
  352. video.description,
  353. video.uploaded_at,
  354. deleted_at))
  355. print (c._executed)
  356. video.id_ = c.lastrowid
  357. if with_relation_tables:
  358. if video.video_tags is not None:
  359. VideoTagDao (self.conn).upsert_all (video.video_tags, False)
  360. if video.comments is not None:
  361. CommentDao (self.conn).upsert_all (video.comments, False)
  362. if video.video_histories is not None:
  363. VideoHistoryDao (self.conn).upsert_all (video.video_histories)
  364. def upsert_all (
  365. self,
  366. videos: list[VideoDto],
  367. with_relation_tables: bool,
  368. ) -> None:
  369. for video in videos:
  370. self.upsert (video, with_relation_tables)
  371. def delete (
  372. self,
  373. video_ids: list[int],
  374. at: datetime,
  375. ) -> None:
  376. if not video_ids:
  377. return
  378. with self.conn.cursor (dictionary = True) as c:
  379. c.execute ("""
  380. UPDATE
  381. videos
  382. SET
  383. deleted_at = %%s
  384. WHERE
  385. id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids))
  386. print (c._executed)
  387. def _create_dto_from_row (
  388. self,
  389. row: VideoRow,
  390. with_relation_tables: bool,
  391. ) -> VideoDto:
  392. video = VideoDto (id_ = row['id'],
  393. code = row['code'],
  394. title = row['title'],
  395. description = row['description'],
  396. uploaded_at = row['uploaded_at'],
  397. deleted_at = row['deleted_at'] or DbNull)
  398. if with_relation_tables and video.id_ is not None:
  399. video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
  400. for i in range (len (video.video_tags)):
  401. video.video_tags[i].video = video
  402. video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
  403. for i in range (len (video.comments)):
  404. video.comments[i].video = video
  405. video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
  406. for i in range (len (video.video_histories)):
  407. video.video_histories[i].video = video
  408. return video
  409. @dataclass (slots = True)
  410. class VideoDto:
  411. code: str
  412. title: str
  413. description: str
  414. uploaded_at: datetime
  415. id_: int | None = None
  416. deleted_at: datetime | DbNullType = DbNull
  417. video_tags: list[VideoTagDto] | None = None
  418. comments: list[CommentDto] | None = None
  419. video_histories: list[VideoHistoryDto] | None = None
  420. class VideoTagDao:
  421. def __init__ (
  422. self,
  423. conn: MySQLConnectionAbstract,
  424. ):
  425. self.conn = conn
  426. def fetch_by_video_id (
  427. self,
  428. video_id: int,
  429. with_relation_tables: bool,
  430. ) -> list[VideoTagDto]:
  431. with self.conn.cursor (dictionary = True) as c:
  432. c.execute ("""
  433. SELECT
  434. id,
  435. video_id,
  436. tag_id,
  437. tagged_at,
  438. untagged_at
  439. FROM
  440. video_tags
  441. WHERE
  442. video_id = %s
  443. ORDER BY
  444. id""", (video_id,))
  445. print (c._executed)
  446. video_tags: list[VideoTagDto] = []
  447. for row in cast (list[VideoTagRow], c.fetchall ()):
  448. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  449. return video_tags
  450. def fetch_alive_by_video_id (
  451. self,
  452. video_id: int,
  453. with_relation_tables: bool,
  454. ) -> list[VideoTagDto]:
  455. with self.conn.cursor (dictionary = True) as c:
  456. c.execute ("""
  457. SELECT
  458. id,
  459. video_id,
  460. tag_id,
  461. tagged_at,
  462. untagged_at
  463. FROM
  464. video_tags
  465. WHERE
  466. video_id = %s
  467. AND untagged_at IS NULL
  468. ORDER BY
  469. id""", (video_id,))
  470. print (c._executed)
  471. video_tags: list[VideoTagDto] = []
  472. for row in cast (list[VideoTagRow], c.fetchall ()):
  473. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  474. return video_tags
  475. def fetch_alive_by_ids (
  476. self,
  477. video_id: int,
  478. tag_id: int,
  479. with_relation_tables: bool,
  480. ) -> VideoTagDto | None:
  481. with self.conn.cursor (dictionary = True) as c:
  482. c.execute ("""
  483. SELECT
  484. id,
  485. video_id,
  486. tag_id,
  487. tagged_at,
  488. untagged_at
  489. FROM
  490. video_tags
  491. WHERE
  492. video_id = %s
  493. AND tag_id = %s
  494. AND untagged_at IS NULL""", (video_id, tag_id))
  495. print (c._executed)
  496. row = cast (VideoTagRow, c.fetchone ())
  497. if row is None:
  498. return None
  499. return self._create_dto_from_row (row, with_relation_tables)
  500. def insert (
  501. self,
  502. video_tag: VideoTagDto,
  503. with_relation_tables: bool,
  504. ) -> None:
  505. untagged_at: date | DbNullType | None = video_tag.untagged_at
  506. if untagged_at is None:
  507. raise TypeError ('未実装')
  508. if untagged_at is DbNull:
  509. untagged_at = None
  510. untagged_at = cast (date | None, untagged_at)
  511. with self.conn.cursor (dictionary = True) as c:
  512. c.execute ("""
  513. INSERT INTO
  514. video_tags(
  515. video_id,
  516. tag_id,
  517. tagged_at,
  518. untagged_at)
  519. VALUES
  520. (
  521. %s,
  522. %s,
  523. %s,
  524. %s)""", (video_tag.video_id, video_tag.tag_id,
  525. video_tag.tagged_at, untagged_at))
  526. print (c._executed)
  527. video_tag.id_ = c.lastrowid
  528. if with_relation_tables:
  529. if video_tag.video is not None:
  530. VideoDao (self.conn).upsert (video_tag.video, True)
  531. if video_tag.tag is not None:
  532. TagDao (self.conn).upsert (video_tag.tag)
  533. def upsert (
  534. self,
  535. video_tag: VideoTagDto,
  536. with_relation_tables: bool,
  537. ) -> None:
  538. untagged_at: date | DbNullType | None = video_tag.untagged_at
  539. if untagged_at is None:
  540. raise TypeError ('未実装')
  541. if untagged_at is DbNull:
  542. untagged_at = None
  543. untagged_at = cast (date | None, untagged_at)
  544. with self.conn.cursor (dictionary = True) as c:
  545. c.execute ("""
  546. INSERT INTO
  547. video_tags(
  548. video_id,
  549. tag_id,
  550. tagged_at,
  551. untagged_at)
  552. VALUES
  553. (
  554. %s,
  555. %s,
  556. %s,
  557. %s)
  558. ON DUPLICATE KEY UPDATE
  559. id = LAST_INSERT_ID(id),
  560. video_id = VALUES(video_id),
  561. tag_id = VALUES(tag_id),
  562. tagged_at = VALUES(tagged_at),
  563. untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
  564. video_tag.tag_id,
  565. video_tag.tagged_at,
  566. untagged_at))
  567. print (c._executed)
  568. video_tag.id_ = c.lastrowid
  569. if with_relation_tables:
  570. if video_tag.video is not None:
  571. VideoDao (self.conn).upsert (video_tag.video, True)
  572. if video_tag.tag is not None:
  573. TagDao (self.conn).upsert (video_tag.tag)
  574. def upsert_all (
  575. self,
  576. video_tags: list[VideoTagDto],
  577. with_relation_tables: bool,
  578. ) -> None:
  579. for video_tag in video_tags:
  580. self.upsert (video_tag, with_relation_tables)
  581. def untag_all (
  582. self,
  583. video_id: int,
  584. tag_ids: list[int],
  585. now: datetime,
  586. ) -> None:
  587. if not tag_ids:
  588. return
  589. with self.conn.cursor (dictionary = True) as c:
  590. c.execute ("""
  591. UPDATE
  592. video_tags
  593. SET
  594. untagged_at = %%s
  595. WHERE
  596. video_id = %%s
  597. AND tag_id IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids))
  598. print (c._executed)
  599. def _create_dto_from_row (
  600. self,
  601. row: VideoTagRow,
  602. with_relation_tables: bool,
  603. ) -> VideoTagDto:
  604. video_tag = VideoTagDto (id_ = row['id'],
  605. video_id = row['video_id'],
  606. tag_id = row['tag_id'],
  607. tagged_at = row['tagged_at'],
  608. untagged_at = row['untagged_at'] or DbNull)
  609. if with_relation_tables:
  610. video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
  611. video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
  612. return video_tag
  613. @dataclass (slots = True)
  614. class VideoTagDto:
  615. video_id: int
  616. tag_id: int
  617. tagged_at: date
  618. id_: int | None = None
  619. untagged_at: date | DbNullType = DbNull
  620. video: VideoDto | None = None
  621. tag: TagDto | None = None
  622. class TagDao:
  623. def __init__ (
  624. self,
  625. conn: MySQLConnectionAbstract,
  626. ):
  627. self.conn = conn
  628. def find (
  629. self,
  630. tag_id: int,
  631. ) -> TagDto | None:
  632. with self.conn.cursor (dictionary = True) as c:
  633. c.execute ("""
  634. SELECT
  635. id,
  636. name
  637. FROM
  638. tags
  639. WHERE
  640. id = %s""", (tag_id,))
  641. print (c._executed)
  642. row = cast (TagRow | None, c.fetchone ())
  643. if row is None:
  644. return None
  645. return self._create_dto_from_row (row)
  646. def fetch_by_name (
  647. self,
  648. tag_name: str,
  649. ) -> TagDto | None:
  650. with self.conn.cursor (dictionary = True) as c:
  651. c.execute ("""
  652. SELECT
  653. id,
  654. name
  655. FROM
  656. tags
  657. WHERE
  658. name = %s""", (tag_name,))
  659. print (c._executed)
  660. row = cast (TagRow | None, c.fetchone ())
  661. if row is None:
  662. return None
  663. return self._create_dto_from_row (row)
  664. def insert (
  665. self,
  666. tag: TagDto,
  667. ) -> None:
  668. with self.conn.cursor (dictionary = True) as c:
  669. c.execute ("""
  670. INSERT INTO
  671. tags(name)
  672. VALUES
  673. (%s)""", (tag.name,))
  674. print (c._executed)
  675. tag.id_ = c.lastrowid
  676. def upsert (
  677. self,
  678. tag: TagDto,
  679. ) -> None:
  680. with self.conn.cursor (dictionary = True) as c:
  681. c.execute ("""
  682. INSERT INTO
  683. tags(name)
  684. VALUES
  685. (%s)
  686. ON DUPLICATE KEY UPDATE
  687. id = LAST_INSERT_ID(id),
  688. name = VALUES(name)""", (tag.name,))
  689. print (c._executed)
  690. tag.id_ = c.lastrowid
  691. def _create_dto_from_row (
  692. self,
  693. row: TagRow,
  694. ) -> TagDto:
  695. return TagDto (id_ = row['id'],
  696. name = row['name'])
  697. @dataclass (slots = True)
  698. class TagDto:
  699. name: str
  700. id_: int | None = None
  701. class VideoHistoryDao:
  702. def __init__ (
  703. self,
  704. conn: MySQLConnectionAbstract,
  705. ):
  706. self.conn = conn
  707. def fetch_by_video_id (
  708. self,
  709. video_id: int,
  710. with_relation_tables: bool,
  711. ) -> list[VideoHistoryDto]:
  712. with self.conn.cursor (dictionary = True) as c:
  713. c.execute ("""
  714. SELECT
  715. id,
  716. video_id,
  717. fetched_at,
  718. views_count
  719. FROM
  720. video_histories
  721. WHERE
  722. video_id = %s""", (video_id,))
  723. print (c._executed)
  724. video_histories: list[VideoHistoryDto] = []
  725. for row in cast (list[VideoHistoryRow], c.fetchall ()):
  726. video_histories.append (self._create_dto_from_row (row, with_relation_tables))
  727. return video_histories
  728. def insert (
  729. self,
  730. video_history: VideoHistoryDto,
  731. ) -> None:
  732. with self.conn.cursor (dictionary = True) as c:
  733. c.execute ("""
  734. INSERT INTO
  735. video_histories(
  736. video_id,
  737. fetched_at,
  738. views_count)
  739. VALUES
  740. (
  741. %s,
  742. %s,
  743. %s)""", (video_history.video_id,
  744. video_history.fetched_at,
  745. video_history.views_count))
  746. print (c._executed)
  747. def upsert (
  748. self,
  749. video_history: VideoHistoryDto,
  750. ) -> None:
  751. with self.conn.cursor (dictionary = True) as c:
  752. c.execute ("""
  753. INSERT INTO
  754. video_histories(
  755. video_id,
  756. fetched_at,
  757. views_count)
  758. VALUES
  759. (
  760. %s,
  761. %s,
  762. %s)
  763. ON DUPLICATE KEY UPDATE
  764. id = LAST_INSERT_ID(id),
  765. video_id = VALUES(video_id),
  766. fetched_at = VALUES(fetched_at),
  767. views_count = VALUES(views_count)""", (video_history.video_id,
  768. video_history.fetched_at,
  769. video_history.views_count))
  770. print (c._executed)
  771. def upsert_all (
  772. self,
  773. video_histories: list[VideoHistoryDto],
  774. ) -> None:
  775. for video_history in video_histories:
  776. self.upsert (video_history)
  777. def _create_dto_from_row (
  778. self,
  779. row: VideoHistoryRow,
  780. with_relation_tables: bool,
  781. ) -> VideoHistoryDto:
  782. video_history = VideoHistoryDto (id_ = row['id'],
  783. video_id = row['video_id'],
  784. fetched_at = row['fetched_at'],
  785. views_count = row['views_count'])
  786. if with_relation_tables:
  787. video_history.video = VideoDao (self.conn).find (video_history.video_id, True)
  788. return video_history
  789. @dataclass (slots = True)
  790. class VideoHistoryDto:
  791. video_id: int
  792. fetched_at: date
  793. views_count: int
  794. id_: int | None = None
  795. video: VideoDto | None = None
  796. class CommentDao:
  797. def __init__ (
  798. self,
  799. conn: MySQLConnectionAbstract,
  800. ):
  801. self.conn = conn
  802. def fetch_by_video_id (
  803. self,
  804. video_id: int,
  805. with_relation_tables: bool,
  806. ) -> list[CommentDto]:
  807. with self.conn.cursor (dictionary = True) as c:
  808. c.execute ("""
  809. SELECT
  810. id,
  811. video_id,
  812. comment_no,
  813. user_id,
  814. content,
  815. posted_at,
  816. nico_count,
  817. vpos_ms
  818. FROM
  819. comments
  820. WHERE
  821. video_id = %s""", (video_id,))
  822. print (c._executed)
  823. comments: list[CommentDto] = []
  824. for row in cast (list[CommentRow], c.fetchall ()):
  825. comments.append (self._create_dto_from_row (row, with_relation_tables))
  826. return comments
  827. def upsert (
  828. self,
  829. comment: CommentDto,
  830. with_relation_tables: bool,
  831. ) -> None:
  832. vpos_ms: int | DbNullType | None = comment.vpos_ms
  833. if vpos_ms is None:
  834. raise TypeError ('未実装')
  835. if vpos_ms is DbNull:
  836. vpos_ms = None
  837. vpos_ms = cast (int | None, vpos_ms)
  838. with self.conn.cursor (dictionary = True) as c:
  839. c.execute ("""
  840. INSERT INTO
  841. comments(
  842. video_id,
  843. comment_no,
  844. user_id,
  845. content,
  846. posted_at,
  847. nico_count,
  848. vpos_ms)
  849. VALUES
  850. (
  851. %s,
  852. %s,
  853. %s,
  854. %s,
  855. %s,
  856. %s,
  857. %s)
  858. ON DUPLICATE KEY UPDATE
  859. id = LAST_INSERT_ID(id),
  860. video_id = VALUES(video_id),
  861. comment_no = VALUES(comment_no),
  862. user_id = VALUES(user_id),
  863. content = VALUES(content),
  864. posted_at = VALUES(posted_at),
  865. nico_count = VALUES(nico_count),
  866. vpos_ms = VALUES(vpos_ms)""", (comment.video_id,
  867. comment.comment_no,
  868. comment.user_id,
  869. comment.content,
  870. comment.posted_at,
  871. comment.nico_count,
  872. vpos_ms))
  873. print (c._executed)
  874. def upsert_all (
  875. self,
  876. comments: list[CommentDto],
  877. with_relation_tables: bool,
  878. ) -> None:
  879. for comment in comments:
  880. self.upsert (comment, with_relation_tables)
  881. def _create_dto_from_row (
  882. self,
  883. row: CommentRow,
  884. with_relation_tables: bool,
  885. ) -> CommentDto:
  886. comment = CommentDto (id_ = row['id'],
  887. video_id = row['video_id'],
  888. comment_no = row['comment_no'],
  889. user_id = row['user_id'],
  890. content = row['content'],
  891. posted_at = row['posted_at'],
  892. nico_count = row['nico_count'],
  893. vpos_ms = row['vpos_ms'] or DbNull)
  894. if with_relation_tables:
  895. comment.video = VideoDao (self.conn).find (comment.video_id, True)
  896. return comment
  897. @dataclass (slots = True)
  898. class CommentDto:
  899. video_id: int
  900. comment_no: int
  901. user_id: int
  902. content: str
  903. posted_at: datetime
  904. id_: int | None = None
  905. nico_count: int = 0
  906. vpos_ms: int | DbNullType = DbNull
  907. video: VideoDto | None = None
  908. user: UserDto | None = None
  909. class UserDao:
  910. def __init__ (
  911. self,
  912. conn: MySQLConnectionAbstract,
  913. ):
  914. self.conn = conn
  915. def fetch_by_code (
  916. self,
  917. user_code: str
  918. ) -> UserDto | None:
  919. with self.conn.cursor (dictionary = True) as c:
  920. c.execute ("""
  921. SELECT
  922. id,
  923. code
  924. FROM
  925. users
  926. WHERE
  927. code = %s""", (user_code,))
  928. print (c._executed)
  929. row = cast (UserRow | None, c.fetchone ())
  930. if row is None:
  931. return None
  932. return self._create_dto_from_row (row)
  933. def insert (
  934. self,
  935. user: UserDto,
  936. ) -> None:
  937. with self.conn.cursor (dictionary = True) as c:
  938. c.execute ("""
  939. INSERT INTO
  940. users(code)
  941. VALUES
  942. (%s)""", (user.code,))
  943. print (c.execute)
  944. user.id_ = c.lastrowid
  945. def _create_dto_from_row (
  946. self,
  947. row: UserRow,
  948. ) -> UserDto:
  949. return UserDto (id_ = row['id'],
  950. code = row['code'])
  951. @dataclass (slots = True)
  952. class UserDto:
  953. code: str
  954. id_: int | None = None
  955. def normalise (
  956. s: str
  957. ) -> str:
  958. return unicodedata.normalize ('NFKC', s).lower ()
  959. if __name__ == '__main__':
  960. main ()