ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1072 lines
35 KiB

  1. """
  2. 日次で実行し,ぼざクリ DB を最新に更新する.
  3. """
  4. from __future__ import annotations
  5. import json
  6. import os
  7. import random
  8. import string
  9. import time
  10. from dataclasses import dataclass
  11. from datetime import date, datetime, timedelta
  12. from typing import Any, Type, TypedDict, cast
  13. import mysql.connector
  14. import requests
  15. from mysql.connector.connection import MySQLConnectionAbstract
  16. class DbNull:
  17. def __new__ (
  18. cls,
  19. ):
  20. delattr (cls, '__init__')
  21. DbNullType = Type[DbNull]
  22. class VideoSearchParam (TypedDict):
  23. q: str
  24. targets: str
  25. _sort: str
  26. fields: str
  27. _limit: int
  28. jsonFilter: str
  29. class VideoResult (TypedDict):
  30. contentId: str
  31. title: str
  32. tags: str
  33. description: str | None
  34. viewCounter: int
  35. startTime: str
  36. class CommentResult (TypedDict):
  37. id: str
  38. no: int
  39. vposMs: int
  40. body: str
  41. commands: list[str]
  42. userId: str
  43. isPremium: bool
  44. score: int
  45. postedAt: str
  46. nicoruCount: int
  47. nicoruId: Any
  48. source: str
  49. isMyPost: bool
  50. class CommentRow (TypedDict):
  51. id: int
  52. video_id: int
  53. comment_no: int
  54. user_id: int
  55. content: str
  56. posted_at: datetime
  57. nico_count: int
  58. vpos_ms: int | None
  59. class TagRow (TypedDict):
  60. id: int
  61. name: str
  62. class UserRow (TypedDict):
  63. id: int
  64. code: str
  65. class VideoRow (TypedDict):
  66. id: int
  67. code: str
  68. title: str
  69. description: str
  70. uploaded_at: datetime
  71. deleted_at: datetime | None
  72. class VideoHistoryRow (TypedDict):
  73. id: int
  74. video_id: int
  75. fetched_at: date
  76. views_count: int
  77. class VideoTagRow (TypedDict):
  78. id: int
  79. video_id: int
  80. tag_id: int
  81. tagged_at: date
  82. untagged_at: date | None
  83. def main (
  84. ) -> None:
  85. conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
  86. password = os.environ['MYSQL_PASS'],
  87. database = 'nizika_nico')
  88. if not isinstance (conn, MySQLConnectionAbstract):
  89. raise TypeError
  90. now = datetime.now ()
  91. video_dao = VideoDao (conn)
  92. tag_dao = TagDao (conn)
  93. video_tag_dao = VideoTagDao (conn)
  94. video_history_dao = VideoHistoryDao (conn)
  95. comment_dao = CommentDao (conn)
  96. user_dao = UserDao (conn)
  97. api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
  98. update_tables (video_dao, tag_dao, video_tag_dao, video_history_dao, comment_dao, user_dao,
  99. api_data, now)
  100. conn.commit ()
  101. conn.close ()
  102. def update_tables (
  103. video_dao: VideoDao,
  104. tag_dao: TagDao,
  105. video_tag_dao: VideoTagDao,
  106. video_history_dao: VideoHistoryDao,
  107. comment_dao: CommentDao,
  108. user_dao: UserDao,
  109. api_data: list[VideoResult],
  110. now: datetime,
  111. ) -> None:
  112. for datum in api_data:
  113. tag_names: list[str] = datum['tags'].split ()
  114. video = VideoDto (code = datum['contentId'],
  115. title = datum['title'],
  116. description = datum['description'] or '',
  117. uploaded_at = datetime.fromisoformat (datum['startTime']))
  118. video_dao.upsert (video, False)
  119. if video.id_ is not None:
  120. video_history = VideoHistoryDto (video_id = video.id_,
  121. fetched_at = now,
  122. views_count = datum['viewCounter'])
  123. video_history_dao.insert (video_history)
  124. tag_ids: list[int] = []
  125. video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False)
  126. for vt in video_tags:
  127. tag = tag_dao.find (vt.tag_id)
  128. if (tag is not None
  129. and (tag.name.upper () not in map (str.upper, tag_names))
  130. and (tag.id_ is not None)):
  131. tag_ids.append (tag.id_)
  132. video_tag_dao.untag_all (video.id_, tag_ids, now)
  133. tags: list[TagDto] = []
  134. for tag_name in tag_names:
  135. tag = tag_dao.fetch_by_name (tag_name)
  136. if tag is None:
  137. tag = TagDto (name = tag_name)
  138. tag_dao.insert (tag)
  139. if video.id_ is not None and tag.id_ is not None:
  140. video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False)
  141. if video_tag is None:
  142. video_tag = VideoTagDto (video_id = video.id_,
  143. tag_id = tag.id_,
  144. tagged_at = now)
  145. video_tag_dao.insert (video_tag, False)
  146. for com in fetch_comments (video.code):
  147. user = user_dao.fetch_by_code (com['userId'])
  148. if user is None:
  149. user = UserDto (code = com['userId'])
  150. user_dao.insert (user)
  151. if video.id_ is not None and user.id_ is not None:
  152. comment = CommentDto (video_id = video.id_,
  153. comment_no = com['no'],
  154. user_id = user.id_,
  155. content = com['body'],
  156. posted_at = datetime.fromisoformat (com['postedAt']),
  157. nico_count = com['nicoruCount'],
  158. vpos_ms = com['vposMs'])
  159. comment_dao.upsert (comment, False)
  160. alive_video_codes = [d['contentId'] for d in api_data]
  161. lost_video_ids: list[int] = []
  162. videos = video_dao.fetch_alive ()
  163. for video in videos:
  164. if video.id_ is not None and video.code not in alive_video_codes:
  165. lost_video_ids.append (video.id_)
  166. video_dao.delete (lost_video_ids, now)
  167. def fetch_comments (
  168. video_code: str,
  169. ) -> list[CommentResult]:
  170. time.sleep (1.2)
  171. headers = { 'X-Frontend-Id': '6',
  172. 'X-Frontend-Version': '0' }
  173. action_track_id = (
  174. ''.join (random.choice (string.ascii_letters + string.digits)
  175. for _ in range (10))
  176. + '_'
  177. + str (random.randrange (10 ** 12, 10 ** 13)))
  178. url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
  179. + f"?actionTrackId={ action_track_id }")
  180. res = requests.post (url, headers = headers, timeout = 60).json ()
  181. try:
  182. nv_comment = res['data']['comment']['nvComment']
  183. except KeyError:
  184. return []
  185. if nv_comment is None:
  186. return []
  187. headers = { 'X-Frontend-Id': '6',
  188. 'X-Frontend-Version': '0',
  189. 'Content-Type': 'application/json' }
  190. params = { 'params': nv_comment['params'],
  191. 'additionals': { },
  192. 'threadKey': nv_comment['threadKey'] }
  193. url = nv_comment['server'] + '/v1/threads'
  194. res = (requests.post (url, json.dumps (params),
  195. headers = headers,
  196. timeout = 60)
  197. .json ())
  198. try:
  199. return res['data']['threads'][1]['comments']
  200. except (IndexError, KeyError):
  201. return []
  202. def search_nico_by_tag (
  203. tag: str,
  204. ) -> list[VideoResult]:
  205. return search_nico_by_tags ([tag])
  206. def search_nico_by_tags (
  207. tags: list[str],
  208. ) -> list[VideoResult]:
  209. today = datetime.now ()
  210. url = ('https://snapshot.search.nicovideo.jp'
  211. + '/api/v2/snapshot/video/contents/search')
  212. result_data: list[VideoResult] = []
  213. to = datetime (2022, 12, 3)
  214. while to <= today:
  215. time.sleep (1.2)
  216. until = to + timedelta (days = 14)
  217. query_filter = json.dumps ({ 'type': 'or',
  218. 'filters': [
  219. { 'type': 'range',
  220. 'field': 'startTime',
  221. 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day),
  222. 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day),
  223. 'include_lower': True }] })
  224. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  225. 'targets': 'tagsExact',
  226. '_sort': '-viewCounter',
  227. 'fields': ('contentId,'
  228. 'title,'
  229. 'tags,'
  230. 'description,'
  231. 'viewCounter,'
  232. 'startTime'),
  233. '_limit': 100,
  234. 'jsonFilter': query_filter }
  235. res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
  236. try:
  237. result_data += res['data']
  238. except KeyError:
  239. pass
  240. to = until + timedelta (days = 1)
  241. return result_data
  242. class VideoDao:
  243. def __init__ (
  244. self,
  245. conn: MySQLConnectionAbstract,
  246. ):
  247. self.conn = conn
  248. def find (
  249. self,
  250. video_id: int,
  251. with_relation_tables: bool,
  252. ) -> VideoDto | None:
  253. with self.conn.cursor (dictionary = True) as c:
  254. c.execute ("""
  255. SELECT
  256. id,
  257. code,
  258. title,
  259. description,
  260. uploaded_at,
  261. deleted_at
  262. FROM
  263. videos
  264. WHERE
  265. id = %s
  266. ORDER BY
  267. id""", (video_id,))
  268. print (c._executed)
  269. row = cast (VideoRow | None, c.fetchone ())
  270. if row is None:
  271. return None
  272. return self._create_dto_from_row (row, with_relation_tables)
  273. def fetch_all (
  274. self,
  275. with_relation_tables: bool,
  276. ) -> list[VideoDto]:
  277. with self.conn.cursor (dictionary = True) as c:
  278. c.execute ("""
  279. SELECT
  280. id,
  281. code,
  282. title,
  283. description,
  284. uploaded_at,
  285. deleted_at
  286. FROM
  287. videos
  288. ORDER BY
  289. id""")
  290. print (c._executed)
  291. videos: list[VideoDto] = []
  292. for row in cast (list[VideoRow], c.fetchall ()):
  293. videos.append (self._create_dto_from_row (row, with_relation_tables))
  294. return videos
  295. def fetch_alive (
  296. self,
  297. ) -> list[VideoDto]:
  298. with self.conn.cursor (dictionary = True) as c:
  299. c.execute ("""
  300. SELECT
  301. id,
  302. code,
  303. title,
  304. description,
  305. uploaded_at,
  306. deleted_at
  307. FROM
  308. videos
  309. WHERE
  310. deleted_at IS NULL""")
  311. print (c._executed)
  312. videos: list[VideoDto] = []
  313. for row in cast (list[VideoRow], c.fetchall ()):
  314. videos.append (self._create_dto_from_row (row, False))
  315. return videos
  316. def upsert (
  317. self,
  318. video: VideoDto,
  319. with_relation_tables: bool,
  320. ) -> None:
  321. deleted_at: datetime | DbNullType | None = video.deleted_at
  322. if deleted_at is None:
  323. raise TypeError ('未実装')
  324. if deleted_at is DbNull:
  325. deleted_at = None
  326. deleted_at = cast (datetime | None, deleted_at)
  327. with self.conn.cursor (dictionary = True) as c:
  328. c.execute ("""
  329. INSERT INTO
  330. videos(
  331. code,
  332. title,
  333. description,
  334. uploaded_at,
  335. deleted_at)
  336. VALUES
  337. (
  338. %s,
  339. %s,
  340. %s,
  341. %s,
  342. %s)
  343. ON DUPLICATE KEY UPDATE
  344. id = LAST_INSERT_ID(id),
  345. code = VALUES(code),
  346. title = VALUES(title),
  347. description = VALUES(description),
  348. uploaded_at = VALUES(uploaded_at),
  349. deleted_at = VALUES(deleted_at)""", (video.code,
  350. video.title,
  351. video.description,
  352. video.uploaded_at,
  353. deleted_at))
  354. print (c._executed)
  355. video.id_ = c.lastrowid
  356. if with_relation_tables:
  357. if video.video_tags is not None:
  358. VideoTagDao (self.conn).upsert_all (video.video_tags, False)
  359. if video.comments is not None:
  360. CommentDao (self.conn).upsert_all (video.comments, False)
  361. if video.video_histories is not None:
  362. VideoHistoryDao (self.conn).upsert_all (video.video_histories)
  363. def upsert_all (
  364. self,
  365. videos: list[VideoDto],
  366. with_relation_tables: bool,
  367. ) -> None:
  368. for video in videos:
  369. self.upsert (video, with_relation_tables)
  370. def delete (
  371. self,
  372. video_ids: list[int],
  373. at: datetime,
  374. ) -> None:
  375. if not video_ids:
  376. return
  377. with self.conn.cursor (dictionary = True) as c:
  378. c.execute ("""
  379. UPDATE
  380. videos
  381. SET
  382. deleted_at = %%s
  383. WHERE
  384. id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids))
  385. print (c._executed)
  386. def _create_dto_from_row (
  387. self,
  388. row: VideoRow,
  389. with_relation_tables: bool,
  390. ) -> VideoDto:
  391. video = VideoDto (id_ = row['id'],
  392. code = row['code'],
  393. title = row['title'],
  394. description = row['description'],
  395. uploaded_at = row['uploaded_at'],
  396. deleted_at = row['deleted_at'] or DbNull)
  397. if with_relation_tables and video.id_ is not None:
  398. video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
  399. for i in range (len (video.video_tags)):
  400. video.video_tags[i].video = video
  401. video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
  402. for i in range (len (video.comments)):
  403. video.comments[i].video = video
  404. video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
  405. for i in range (len (video.video_histories)):
  406. video.video_histories[i].video = video
  407. return video
  408. @dataclass (slots = True)
  409. class VideoDto:
  410. code: str
  411. title: str
  412. description: str
  413. uploaded_at: datetime
  414. id_: int | None = None
  415. deleted_at: datetime | DbNullType = DbNull
  416. video_tags: list[VideoTagDto] | None = None
  417. comments: list[CommentDto] | None = None
  418. video_histories: list[VideoHistoryDto] | None = None
  419. class VideoTagDao:
  420. def __init__ (
  421. self,
  422. conn: MySQLConnectionAbstract,
  423. ):
  424. self.conn = conn
  425. def fetch_by_video_id (
  426. self,
  427. video_id: int,
  428. with_relation_tables: bool,
  429. ) -> list[VideoTagDto]:
  430. with self.conn.cursor (dictionary = True) as c:
  431. c.execute ("""
  432. SELECT
  433. id,
  434. video_id,
  435. tag_id,
  436. tagged_at,
  437. untagged_at
  438. FROM
  439. video_tags
  440. WHERE
  441. video_id = %s
  442. ORDER BY
  443. id""", (video_id,))
  444. print (c._executed)
  445. video_tags: list[VideoTagDto] = []
  446. for row in cast (list[VideoTagRow], c.fetchall ()):
  447. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  448. return video_tags
  449. def fetch_alive_by_video_id (
  450. self,
  451. video_id: int,
  452. with_relation_tables: bool,
  453. ) -> list[VideoTagDto]:
  454. with self.conn.cursor (dictionary = True) as c:
  455. c.execute ("""
  456. SELECT
  457. id,
  458. video_id,
  459. tag_id,
  460. tagged_at,
  461. untagged_at
  462. FROM
  463. video_tags
  464. WHERE
  465. video_id = %s
  466. AND untagged_at IS NULL
  467. ORDER BY
  468. id""", (video_id,))
  469. print (c._executed)
  470. video_tags: list[VideoTagDto] = []
  471. for row in cast (list[VideoTagRow], c.fetchall ()):
  472. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  473. return video_tags
  474. def fetch_alive_by_ids (
  475. self,
  476. video_id: int,
  477. tag_id: int,
  478. with_relation_tables: bool,
  479. ) -> VideoTagDto | None:
  480. with self.conn.cursor (dictionary = True) as c:
  481. c.execute ("""
  482. SELECT
  483. id,
  484. video_id,
  485. tag_id,
  486. tagged_at,
  487. untagged_at
  488. FROM
  489. video_tags
  490. WHERE
  491. video_id = %s
  492. AND tag_id = %s
  493. AND untagged_at IS NULL""", (video_id, tag_id))
  494. print (c._executed)
  495. row = cast (VideoTagRow, c.fetchone ())
  496. if row is None:
  497. return None
  498. return self._create_dto_from_row (row, with_relation_tables)
  499. def insert (
  500. self,
  501. video_tag: VideoTagDto,
  502. with_relation_tables: bool,
  503. ) -> None:
  504. untagged_at: date | DbNullType | None = video_tag.untagged_at
  505. if untagged_at is None:
  506. raise TypeError ('未実装')
  507. if untagged_at is DbNull:
  508. untagged_at = None
  509. untagged_at = cast (date | None, untagged_at)
  510. with self.conn.cursor (dictionary = True) as c:
  511. c.execute ("""
  512. INSERT INTO
  513. video_tags(
  514. video_id,
  515. tag_id,
  516. tagged_at,
  517. untagged_at)
  518. VALUES
  519. (
  520. %s,
  521. %s,
  522. %s,
  523. %s)""", (video_tag.video_id, video_tag.tag_id,
  524. video_tag.tagged_at, untagged_at))
  525. print (c._executed)
  526. video_tag.id_ = c.lastrowid
  527. if with_relation_tables:
  528. if video_tag.video is not None:
  529. VideoDao (self.conn).upsert (video_tag.video, True)
  530. if video_tag.tag is not None:
  531. TagDao (self.conn).upsert (video_tag.tag)
  532. def upsert (
  533. self,
  534. video_tag: VideoTagDto,
  535. with_relation_tables: bool,
  536. ) -> None:
  537. untagged_at: date | DbNullType | None = video_tag.untagged_at
  538. if untagged_at is None:
  539. raise TypeError ('未実装')
  540. if untagged_at is DbNull:
  541. untagged_at = None
  542. untagged_at = cast (date | None, untagged_at)
  543. with self.conn.cursor (dictionary = True) as c:
  544. c.execute ("""
  545. INSERT INTO
  546. video_tags(
  547. video_id,
  548. tag_id,
  549. tagged_at,
  550. untagged_at)
  551. VALUES
  552. (
  553. %s,
  554. %s,
  555. %s,
  556. %s)
  557. ON DUPLICATE KEY UPDATE
  558. id = LAST_INSERT_ID(id),
  559. video_id = VALUES(video_id),
  560. tag_id = VALUES(tag_id),
  561. tagged_at = VALUES(tagged_at),
  562. untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
  563. video_tag.tag_id,
  564. video_tag.tagged_at,
  565. untagged_at))
  566. print (c._executed)
  567. video_tag.id_ = c.lastrowid
  568. if with_relation_tables:
  569. if video_tag.video is not None:
  570. VideoDao (self.conn).upsert (video_tag.video, True)
  571. if video_tag.tag is not None:
  572. TagDao (self.conn).upsert (video_tag.tag)
  573. def upsert_all (
  574. self,
  575. video_tags: list[VideoTagDto],
  576. with_relation_tables: bool,
  577. ) -> None:
  578. for video_tag in video_tags:
  579. self.upsert (video_tag, with_relation_tables)
  580. def untag_all (
  581. self,
  582. video_id: int,
  583. tag_ids: list[int],
  584. now: datetime,
  585. ) -> None:
  586. if not tag_ids:
  587. return
  588. with self.conn.cursor (dictionary = True) as c:
  589. c.execute ("""
  590. UPDATE
  591. video_tags
  592. SET
  593. untagged_at = %%s
  594. WHERE
  595. video_id = %%s
  596. AND tag_id IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids))
  597. print (c._executed)
  598. def _create_dto_from_row (
  599. self,
  600. row: VideoTagRow,
  601. with_relation_tables: bool,
  602. ) -> VideoTagDto:
  603. video_tag = VideoTagDto (id_ = row['id'],
  604. video_id = row['video_id'],
  605. tag_id = row['tag_id'],
  606. tagged_at = row['tagged_at'],
  607. untagged_at = row['untagged_at'] or DbNull)
  608. if with_relation_tables:
  609. video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
  610. video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
  611. return video_tag
  612. @dataclass (slots = True)
  613. class VideoTagDto:
  614. video_id: int
  615. tag_id: int
  616. tagged_at: date
  617. id_: int | None = None
  618. untagged_at: date | DbNullType = DbNull
  619. video: VideoDto | None = None
  620. tag: TagDto | None = None
  621. class TagDao:
  622. def __init__ (
  623. self,
  624. conn: MySQLConnectionAbstract,
  625. ):
  626. self.conn = conn
  627. def find (
  628. self,
  629. tag_id: int,
  630. ) -> TagDto | None:
  631. with self.conn.cursor (dictionary = True) as c:
  632. c.execute ("""
  633. SELECT
  634. id,
  635. name
  636. FROM
  637. tags
  638. WHERE
  639. id = %s""", (tag_id,))
  640. print (c._executed)
  641. row = cast (TagRow | None, c.fetchone ())
  642. if row is None:
  643. return None
  644. return self._create_dto_from_row (row)
  645. def fetch_by_name (
  646. self,
  647. tag_name: str,
  648. ) -> TagDto | None:
  649. with self.conn.cursor (dictionary = True) as c:
  650. c.execute ("""
  651. SELECT
  652. id,
  653. name
  654. FROM
  655. tags
  656. WHERE
  657. name = %s""", (tag_name,))
  658. print (c._executed)
  659. row = cast (TagRow | None, c.fetchone ())
  660. if row is None:
  661. return None
  662. return self._create_dto_from_row (row)
  663. def insert (
  664. self,
  665. tag: TagDto,
  666. ) -> None:
  667. with self.conn.cursor (dictionary = True) as c:
  668. c.execute ("""
  669. INSERT INTO
  670. tags(name)
  671. VALUES
  672. (%s)""", (tag.name,))
  673. print (c._executed)
  674. tag.id_ = c.lastrowid
  675. def upsert (
  676. self,
  677. tag: TagDto,
  678. ) -> None:
  679. with self.conn.cursor (dictionary = True) as c:
  680. c.execute ("""
  681. INSERT INTO
  682. tags(name)
  683. VALUES
  684. (%s)
  685. ON DUPLICATE KEY UPDATE
  686. id = LAST_INSERT_ID(id),
  687. name = VALUES(name)""", (tag.name,))
  688. print (c._executed)
  689. tag.id_ = c.lastrowid
  690. def _create_dto_from_row (
  691. self,
  692. row: TagRow,
  693. ) -> TagDto:
  694. return TagDto (id_ = row['id'],
  695. name = row['name'])
  696. @dataclass (slots = True)
  697. class TagDto:
  698. name: str
  699. id_: int | None = None
  700. class VideoHistoryDao:
  701. def __init__ (
  702. self,
  703. conn: MySQLConnectionAbstract,
  704. ):
  705. self.conn = conn
  706. def fetch_by_video_id (
  707. self,
  708. video_id: int,
  709. with_relation_tables: bool,
  710. ) -> list[VideoHistoryDto]:
  711. with self.conn.cursor (dictionary = True) as c:
  712. c.execute ("""
  713. SELECT
  714. id,
  715. video_id,
  716. fetched_at,
  717. views_count
  718. FROM
  719. video_histories
  720. WHERE
  721. video_id = %s""", (video_id,))
  722. print (c._executed)
  723. video_histories: list[VideoHistoryDto] = []
  724. for row in cast (list[VideoHistoryRow], c.fetchall ()):
  725. video_histories.append (self._create_dto_from_row (row, with_relation_tables))
  726. return video_histories
  727. def insert (
  728. self,
  729. video_history: VideoHistoryDto,
  730. ) -> None:
  731. with self.conn.cursor (dictionary = True) as c:
  732. c.execute ("""
  733. INSERT INTO
  734. video_histories(
  735. video_id,
  736. fetched_at,
  737. views_count)
  738. VALUES
  739. (
  740. %s,
  741. %s,
  742. %s)""", (video_history.video_id,
  743. video_history.fetched_at,
  744. video_history.views_count))
  745. print (c._executed)
  746. def upsert (
  747. self,
  748. video_history: VideoHistoryDto,
  749. ) -> None:
  750. with self.conn.cursor (dictionary = True) as c:
  751. c.execute ("""
  752. INSERT INTO
  753. video_histories(
  754. video_id,
  755. fetched_at,
  756. views_count)
  757. VALUES
  758. (
  759. %s,
  760. %s,
  761. %s)
  762. ON DUPLICATE KEY UPDATE
  763. id = LAST_INSERT_ID(id),
  764. video_id = VALUES(video_id),
  765. fetched_at = VALUES(fetched_at),
  766. views_count = VALUES(views_count)""", (video_history.video_id,
  767. video_history.fetched_at,
  768. video_history.views_count))
  769. print (c._executed)
  770. def upsert_all (
  771. self,
  772. video_histories: list[VideoHistoryDto],
  773. ) -> None:
  774. for video_history in video_histories:
  775. self.upsert (video_history)
  776. def _create_dto_from_row (
  777. self,
  778. row: VideoHistoryRow,
  779. with_relation_tables: bool,
  780. ) -> VideoHistoryDto:
  781. video_history = VideoHistoryDto (id_ = row['id'],
  782. video_id = row['video_id'],
  783. fetched_at = row['fetched_at'],
  784. views_count = row['views_count'])
  785. if with_relation_tables:
  786. video_history.video = VideoDao (self.conn).find (video_history.video_id, True)
  787. return video_history
  788. @dataclass (slots = True)
  789. class VideoHistoryDto:
  790. video_id: int
  791. fetched_at: date
  792. views_count: int
  793. id_: int | None = None
  794. video: VideoDto | None = None
  795. class CommentDao:
  796. def __init__ (
  797. self,
  798. conn: MySQLConnectionAbstract,
  799. ):
  800. self.conn = conn
  801. def fetch_by_video_id (
  802. self,
  803. video_id: int,
  804. with_relation_tables: bool,
  805. ) -> list[CommentDto]:
  806. with self.conn.cursor (dictionary = True) as c:
  807. c.execute ("""
  808. SELECT
  809. id,
  810. video_id,
  811. comment_no,
  812. user_id,
  813. content,
  814. posted_at,
  815. nico_count,
  816. vpos_ms
  817. FROM
  818. comments
  819. WHERE
  820. video_id = %s""", (video_id,))
  821. print (c._executed)
  822. comments: list[CommentDto] = []
  823. for row in cast (list[CommentRow], c.fetchall ()):
  824. comments.append (self._create_dto_from_row (row, with_relation_tables))
  825. return comments
  826. def upsert (
  827. self,
  828. comment: CommentDto,
  829. with_relation_tables: bool,
  830. ) -> None:
  831. vpos_ms: int | DbNullType | None = comment.vpos_ms
  832. if vpos_ms is None:
  833. raise TypeError ('未実装')
  834. if vpos_ms is DbNull:
  835. vpos_ms = None
  836. vpos_ms = cast (int | None, vpos_ms)
  837. with self.conn.cursor (dictionary = True) as c:
  838. c.execute ("""
  839. INSERT INTO
  840. comments(
  841. video_id,
  842. comment_no,
  843. user_id,
  844. content,
  845. posted_at,
  846. nico_count,
  847. vpos_ms)
  848. VALUES
  849. (
  850. %s,
  851. %s,
  852. %s,
  853. %s,
  854. %s,
  855. %s,
  856. %s)
  857. ON DUPLICATE KEY UPDATE
  858. id = LAST_INSERT_ID(id),
  859. video_id = VALUES(video_id),
  860. comment_no = VALUES(comment_no),
  861. user_id = VALUES(user_id),
  862. content = VALUES(content),
  863. posted_at = VALUES(posted_at),
  864. nico_count = VALUES(nico_count),
  865. vpos_ms = VALUES(vpos_ms)""", (comment.video_id,
  866. comment.comment_no,
  867. comment.user_id,
  868. comment.content,
  869. comment.posted_at,
  870. comment.nico_count,
  871. vpos_ms))
  872. print (c._executed)
  873. def upsert_all (
  874. self,
  875. comments: list[CommentDto],
  876. with_relation_tables: bool,
  877. ) -> None:
  878. for comment in comments:
  879. self.upsert (comment, with_relation_tables)
  880. def _create_dto_from_row (
  881. self,
  882. row: CommentRow,
  883. with_relation_tables: bool,
  884. ) -> CommentDto:
  885. comment = CommentDto (id_ = row['id'],
  886. video_id = row['video_id'],
  887. comment_no = row['comment_no'],
  888. user_id = row['user_id'],
  889. content = row['content'],
  890. posted_at = row['posted_at'],
  891. nico_count = row['nico_count'],
  892. vpos_ms = row['vpos_ms'] or DbNull)
  893. if with_relation_tables:
  894. comment.video = VideoDao (self.conn).find (comment.video_id, True)
  895. return comment
  896. @dataclass (slots = True)
  897. class CommentDto:
  898. video_id: int
  899. comment_no: int
  900. user_id: int
  901. content: str
  902. posted_at: datetime
  903. id_: int | None = None
  904. nico_count: int = 0
  905. vpos_ms: int | DbNullType = DbNull
  906. video: VideoDto | None = None
  907. user: UserDto | None = None
  908. class UserDao:
  909. def __init__ (
  910. self,
  911. conn: MySQLConnectionAbstract,
  912. ):
  913. self.conn = conn
  914. def fetch_by_code (
  915. self,
  916. user_code: str
  917. ) -> UserDto | None:
  918. with self.conn.cursor (dictionary = True) as c:
  919. c.execute ("""
  920. SELECT
  921. id,
  922. code
  923. FROM
  924. users
  925. WHERE
  926. code = %s""", (user_code,))
  927. print (c._executed)
  928. row = cast (UserRow | None, c.fetchone ())
  929. if row is None:
  930. return None
  931. return self._create_dto_from_row (row)
  932. def insert (
  933. self,
  934. user: UserDto,
  935. ) -> None:
  936. with self.conn.cursor (dictionary = True) as c:
  937. c.execute ("""
  938. INSERT INTO
  939. users(code)
  940. VALUES
  941. (%s)""", (user.code,))
  942. print (c.execute)
  943. user.id_ = c.lastrowid
  944. def _create_dto_from_row (
  945. self,
  946. row: UserRow,
  947. ) -> UserDto:
  948. return UserDto (id_ = row['id'],
  949. code = row['code'])
  950. @dataclass (slots = True)
  951. class UserDto:
  952. code: str
  953. id_: int | None = None
  954. if __name__ == '__main__':
  955. main ()