ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1093 lines
35 KiB

  1. """
  2. 日次で実行し,ぼざクリ DB を最新に更新する.
  3. """
  4. from __future__ import annotations
  5. import json
  6. import os
  7. import random
  8. import string
  9. import time
  10. from dataclasses import dataclass
  11. from datetime import date, datetime, timedelta
  12. from typing import Any, Type, TypedDict, cast
  13. import requests
  14. from eloquent import DatabaseManager, Model
  15. from eloquent.orm.relations.dynamic_property import DynamicProperty
  16. config: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
  17. 'host': 'localhost',
  18. 'database': 'nizika_nico',
  19. 'user': os.environ['MYSQL_USER'],
  20. 'password': os.environ['MYSQL_PASS'],
  21. 'prefix': '' } }
  22. db = DatabaseManager (config)
  23. Model.set_connection_resolver (db)
  24. class VideoSearchParam (TypedDict):
  25. q: str
  26. targets: str
  27. _sort: str
  28. fields: str
  29. _limit: int
  30. jsonFilter: str
  31. class VideoResult (TypedDict):
  32. contentId: str
  33. title: str
  34. tags: str
  35. description: str | None
  36. viewCounter: int
  37. startTime: str
  38. class CommentResult (TypedDict):
  39. id: str
  40. no: int
  41. vposMs: int
  42. body: str
  43. commands: list[str]
  44. userId: str
  45. isPremium: bool
  46. score: int
  47. postedAt: str
  48. nicoruCount: int
  49. nicoruId: Any
  50. source: str
  51. isMyPost: bool
  52. class CommentRow (TypedDict):
  53. id: int
  54. video_id: int
  55. comment_no: int
  56. user_id: int
  57. content: str
  58. posted_at: datetime
  59. nico_count: int
  60. vpos_ms: int | None
  61. class TagRow (TypedDict):
  62. id: int
  63. name: str
  64. class UserRow (TypedDict):
  65. id: int
  66. code: str
  67. class VideoRow (TypedDict):
  68. id: int
  69. code: str
  70. title: str
  71. description: str
  72. uploaded_at: datetime
  73. deleted_at: datetime | None
  74. class VideoHistoryRow (TypedDict):
  75. id: int
  76. video_id: int
  77. fetched_at: date
  78. views_count: int
  79. class VideoTagRow (TypedDict):
  80. id: int
  81. video_id: int
  82. tag_id: int
  83. tagged_at: date
  84. untagged_at: date | None
  85. class DbConfig (TypedDict):
  86. driver: str
  87. host: str
  88. database: str
  89. user: str
  90. password: str
  91. prefix: str
  92. def main (
  93. ) -> None:
  94. api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
  95. update_tables (api_data, now)
  96. conn.commit ()
  97. conn.close ()
  98. class Comment (Model):
  99. __timestamps__ = False
  100. @property
  101. def video (
  102. self,
  103. ) -> DynamicProperty:
  104. return self.belongs_to (Video)
  105. @property
  106. def user (
  107. self,
  108. ) -> DynamicProperty:
  109. return self.belongs_to (User)
  110. class Tag (Model):
  111. __timestamps__ = False
  112. @property
  113. def video_tags (
  114. self,
  115. ) -> DynamicProperty:
  116. return self.has_many (VideoTag)
  117. @property
  118. def videos (
  119. self,
  120. ) -> DynamicProperty:
  121. return self.belongs_to_many (Video)
  122. class User (Model):
  123. __timestamps__ = False
  124. @property
  125. del comments (
  126. self,
  127. ) -> DynamicProperty:
  128. return self.has_many (Comment)
  129. class Video (Model):
  130. __timestamps__ = False
  131. @property
  132. def video_histories (
  133. self,
  134. ) -> DynamicProperty:
  135. return self.has_many (VideoHistory)
  136. @property
  137. def video_tags (
  138. self,
  139. ) -> DynamicProperty:
  140. return self.has_many (VideoTag)
  141. @property
  142. def tags (
  143. self,
  144. ) -> DynamicProperty:
  145. return self.belongs_to_many (Tag)
  146. @property
  147. def comments (
  148. self,
  149. ) -> DynamicProperty:
  150. return self.has_many (Comment)
  151. def upsert (
  152. self,
  153. ) -> None:
  154. row = Video.where ('code', self.code).first ()
  155. if row is not None:
  156. self.id = row.id
  157. self.save ()
  158. class VideoHistory (Model):
  159. __timestamps__ = False
  160. @property
  161. def video (
  162. self,
  163. ) -> DynamicProperty:
  164. return self.belongs_to (Video)
  165. def upsert (
  166. self,
  167. ) -> None:
  168. row = (Video
  169. .where ('video_id', self.video_id)
  170. .where ('fetched_at', self.fetched_at)
  171. .first ())
  172. if row is not None:
  173. self.id = row.id
  174. self.save ()
  175. class VideoTag (Model):
  176. __timestamps__ = False
  177. @property
  178. def video (
  179. self,
  180. ) -> DynamicProperty:
  181. return self.belongs_to (Video)
  182. @property
  183. def tag (
  184. self,
  185. ) -> DynamicProperty:
  186. return self.belongs_to (Tag)
  187. def upsert (
  188. self,
  189. ) -> None:
  190. row = (Video
  191. .where ('video_id', self.video_id)
  192. .where ('tag_id', self.tag_id)
  193. .first ())
  194. if row is not None:
  195. self.id = row.id
  196. self.save ()
  197. def update_tables (
  198. api_data: list[VideoResult],
  199. now: datetime,
  200. ) -> None:
  201. video_ids: list[int] = []
  202. for datum in api_data:
  203. tag_names: list[str] = datum['tags'].split ()
  204. video = Video ()
  205. video.code = datum['contentId'],
  206. video.title = datum['title'],
  207. video.description = datum['description'] or '',
  208. video.uploaded_at = datetime.fromisoformat (datum['startTime'])
  209. video.upsert ()
  210. if video.id is not None:
  211. video_ids.append (video.id_)
  212. video_history = VideoHistoryDto (video_id = video.id_,
  213. fetched_at = now,
  214. views_count = datum['viewCounter'])
  215. video_history_dao.insert (video_history)
  216. tag_ids: list[int] = []
  217. video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False)
  218. for vt in video_tags:
  219. tag = tag_dao.find (vt.tag_id)
  220. if (tag is not None
  221. and (tag.name not in tag_names)
  222. and (tag.id_ is not None)):
  223. tag_ids.append (tag.id_)
  224. video_tag_dao.untag_all (video.id_, tag_ids, now)
  225. tags: list[TagDto] = []
  226. for tag_name in tag_names:
  227. tag = tag_dao.fetch_by_name (tag_name)
  228. if tag is None:
  229. tag = TagDto (name = tag_name)
  230. tag_dao.insert (tag)
  231. if video.id_ is not None and tag.id_ is not None:
  232. video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False)
  233. if video_tag is None:
  234. video_tag = VideoTagDto (video_id = video.id_,
  235. tag_id = tag.id_,
  236. tagged_at = now)
  237. video_tag_dao.insert (video_tag, False)
  238. for com in fetch_comments (video.code):
  239. user = user_dao.fetch_by_code (com['userId'])
  240. if user is None:
  241. user = UserDto (code = com['userId'])
  242. user_dao.insert (user)
  243. if video.id_ is not None and user.id_ is not None:
  244. comment = CommentDto (video_id = video.id_,
  245. comment_no = com['no'],
  246. user_id = user.id_,
  247. content = com['body'],
  248. posted_at = datetime.fromisoformat (com['postedAt']),
  249. nico_count = com['nicoruCount'],
  250. vpos_ms = com['vposMs'])
  251. comment_dao.upsert (comment, False)
  252. alive_video_codes = [d['contentId'] for d in api_data]
  253. lost_video_ids: list[int] = []
  254. videos = video_dao.fetch_alive ()
  255. for video in videos:
  256. if video.id_ is not None and video.code not in alive_video_codes:
  257. lost_video_ids.append (video.id_)
  258. video_dao.delete (lost_video_ids, now)
  259. def fetch_comments (
  260. video_code: str,
  261. ) -> list[CommentResult]:
  262. time.sleep (1.2)
  263. headers = { 'X-Frontend-Id': '6',
  264. 'X-Frontend-Version': '0' }
  265. action_track_id = (
  266. ''.join (random.choice (string.ascii_letters + string.digits)
  267. for _ in range (10))
  268. + '_'
  269. + str (random.randrange (10 ** 12, 10 ** 13)))
  270. url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
  271. + f"?actionTrackId={ action_track_id }")
  272. res = requests.post (url, headers = headers, timeout = 60).json ()
  273. try:
  274. nv_comment = res['data']['comment']['nvComment']
  275. except KeyError:
  276. return []
  277. if nv_comment is None:
  278. return []
  279. headers = { 'X-Frontend-Id': '6',
  280. 'X-Frontend-Version': '0',
  281. 'Content-Type': 'application/json' }
  282. params = { 'params': nv_comment['params'],
  283. 'additionals': { },
  284. 'threadKey': nv_comment['threadKey'] }
  285. url = nv_comment['server'] + '/v1/threads'
  286. res = (requests.post (url, json.dumps (params),
  287. headers = headers,
  288. timeout = 60)
  289. .json ())
  290. try:
  291. return res['data']['threads'][1]['comments']
  292. except (IndexError, KeyError):
  293. return []
  294. def search_nico_by_tag (
  295. tag: str,
  296. ) -> list[VideoResult]:
  297. return search_nico_by_tags ([tag])
  298. def search_nico_by_tags (
  299. tags: list[str],
  300. ) -> list[VideoResult]:
  301. today = datetime.now ()
  302. url = ('https://snapshot.search.nicovideo.jp'
  303. + '/api/v2/snapshot/video/contents/search')
  304. result_data: list[VideoResult] = []
  305. to = datetime (2022, 12, 3)
  306. while to <= today:
  307. time.sleep (1.2)
  308. until = to + timedelta (days = 14)
  309. query_filter = json.dumps ({ 'type': 'or',
  310. 'filters': [
  311. { 'type': 'range',
  312. 'field': 'startTime',
  313. 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day),
  314. 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day),
  315. 'include_lower': True }] })
  316. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  317. 'targets': 'tagsExact',
  318. '_sort': '-viewCounter',
  319. 'fields': ('contentId,'
  320. 'title,'
  321. 'tags,'
  322. 'description,'
  323. 'viewCounter,'
  324. 'startTime'),
  325. '_limit': 100,
  326. 'jsonFilter': query_filter }
  327. res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
  328. try:
  329. result_data += res['data']
  330. except KeyError:
  331. pass
  332. to = until + timedelta (days = 1)
  333. return result_data
  334. class VideoDao:
  335. def find (
  336. self,
  337. video_id: int,
  338. with_relation_tables: bool,
  339. ) -> VideoDto | None:
  340. with self.conn.cursor (dictionary = True) as c:
  341. c.execute ("""
  342. SELECT
  343. id,
  344. code,
  345. title,
  346. description,
  347. uploaded_at,
  348. deleted_at
  349. FROM
  350. videos
  351. WHERE
  352. id = %s
  353. ORDER BY
  354. id""", (video_id,))
  355. print (c._executed)
  356. row = cast (VideoRow | None, c.fetchone ())
  357. if row is None:
  358. return None
  359. return self._create_dto_from_row (row, with_relation_tables)
  360. def fetch_all (
  361. self,
  362. with_relation_tables: bool,
  363. ) -> list[VideoDto]:
  364. with self.conn.cursor (dictionary = True) as c:
  365. c.execute ("""
  366. SELECT
  367. id,
  368. code,
  369. title,
  370. description,
  371. uploaded_at,
  372. deleted_at
  373. FROM
  374. videos
  375. ORDER BY
  376. id""")
  377. print (c._executed)
  378. videos: list[VideoDto] = []
  379. for row in cast (list[VideoRow], c.fetchall ()):
  380. videos.append (self._create_dto_from_row (row, with_relation_tables))
  381. return videos
  382. def fetch_alive (
  383. self,
  384. ) -> list[VideoDto]:
  385. with self.conn.cursor (dictionary = True) as c:
  386. c.execute ("""
  387. SELECT
  388. id,
  389. code,
  390. title,
  391. description,
  392. uploaded_at,
  393. deleted_at
  394. FROM
  395. videos
  396. WHERE
  397. deleted_at IS NULL""")
  398. print (c._executed)
  399. videos: list[VideoDto] = []
  400. for row in cast (list[VideoRow], c.fetchall ()):
  401. videos.append (self._create_dto_from_row (row, False))
  402. return videos
  403. def upsert (
  404. self,
  405. video: VideoDto,
  406. with_relation_tables: bool,
  407. ) -> None:
  408. deleted_at: datetime | DbNullType | None = video.deleted_at
  409. if deleted_at is None:
  410. raise TypeError ('未実装')
  411. if deleted_at is DbNull:
  412. deleted_at = None
  413. deleted_at = cast (datetime | None, deleted_at)
  414. with self.conn.cursor (dictionary = True) as c:
  415. c.execute ("""
  416. INSERT INTO
  417. videos(
  418. code,
  419. title,
  420. description,
  421. uploaded_at,
  422. deleted_at)
  423. VALUES
  424. (
  425. %s,
  426. %s,
  427. %s,
  428. %s,
  429. %s)
  430. ON DUPLICATE KEY UPDATE
  431. id = LAST_INSERT_ID(id),
  432. code = VALUES(code),
  433. title = VALUES(title),
  434. description = VALUES(description),
  435. uploaded_at = VALUES(uploaded_at),
  436. deleted_at = VALUES(deleted_at)""", (video.code,
  437. video.title,
  438. video.description,
  439. video.uploaded_at,
  440. deleted_at))
  441. print (c._executed)
  442. video.id_ = c.lastrowid
  443. if with_relation_tables:
  444. if video.video_tags is not None:
  445. VideoTagDao (self.conn).upsert_all (video.video_tags, False)
  446. if video.comments is not None:
  447. CommentDao (self.conn).upsert_all (video.comments, False)
  448. if video.video_histories is not None:
  449. VideoHistoryDao (self.conn).upsert_all (video.video_histories)
  450. def upsert_all (
  451. self,
  452. videos: list[VideoDto],
  453. with_relation_tables: bool,
  454. ) -> None:
  455. for video in videos:
  456. self.upsert (video, with_relation_tables)
  457. def delete (
  458. self,
  459. video_ids: list[int],
  460. at: datetime,
  461. ) -> None:
  462. if not video_ids:
  463. return
  464. with self.conn.cursor (dictionary = True) as c:
  465. c.execute ("""
  466. UPDATE
  467. videos
  468. SET
  469. deleted_at = %%s
  470. WHERE
  471. id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids))
  472. print (c._executed)
  473. def _create_dto_from_row (
  474. self,
  475. row: VideoRow,
  476. with_relation_tables: bool,
  477. ) -> VideoDto:
  478. video = VideoDto (id_ = row['id'],
  479. code = row['code'],
  480. title = row['title'],
  481. description = row['description'],
  482. uploaded_at = row['uploaded_at'],
  483. deleted_at = row['deleted_at'] or DbNull)
  484. if with_relation_tables and video.id_ is not None:
  485. video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
  486. for i in range (len (video.video_tags)):
  487. video.video_tags[i].video = video
  488. video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
  489. for i in range (len (video.comments)):
  490. video.comments[i].video = video
  491. video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
  492. for i in range (len (video.video_histories)):
  493. video.video_histories[i].video = video
  494. return video
  495. class VideoTagDao:
  496. def fetch_by_video_id (
  497. self,
  498. video_id: int,
  499. with_relation_tables: bool,
  500. ) -> list[VideoTagDto]:
  501. with self.conn.cursor (dictionary = True) as c:
  502. c.execute ("""
  503. SELECT
  504. id,
  505. video_id,
  506. tag_id,
  507. tagged_at,
  508. untagged_at
  509. FROM
  510. video_tags
  511. WHERE
  512. video_id = %s
  513. ORDER BY
  514. id""", (video_id,))
  515. print (c._executed)
  516. video_tags: list[VideoTagDto] = []
  517. for row in cast (list[VideoTagRow], c.fetchall ()):
  518. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  519. return video_tags
  520. def fetch_alive_by_video_id (
  521. self,
  522. video_id: int,
  523. with_relation_tables: bool,
  524. ) -> list[VideoTagDto]:
  525. with self.conn.cursor (dictionary = True) as c:
  526. c.execute ("""
  527. SELECT
  528. id,
  529. video_id,
  530. tag_id,
  531. tagged_at,
  532. untagged_at
  533. FROM
  534. video_tags
  535. WHERE
  536. video_id = %s
  537. AND (untagged_at IS NULL)
  538. ORDER BY
  539. id""", (video_id,))
  540. print (c._executed)
  541. video_tags: list[VideoTagDto] = []
  542. for row in cast (list[VideoTagRow], c.fetchall ()):
  543. video_tags.append (self._create_dto_from_row (row, with_relation_tables))
  544. return video_tags
  545. def fetch_alive_by_ids (
  546. self,
  547. video_id: int,
  548. tag_id: int,
  549. with_relation_tables: bool,
  550. ) -> VideoTagDto | None:
  551. with self.conn.cursor (dictionary = True) as c:
  552. c.execute ("""
  553. SELECT
  554. id,
  555. video_id,
  556. tag_id,
  557. tagged_at,
  558. untagged_at
  559. FROM
  560. video_tags
  561. WHERE
  562. video_id = %s
  563. AND tag_id = %s""", (video_id, tag_id))
  564. print (c._executed)
  565. row = cast (VideoTagRow, c.fetchone ())
  566. if row is None:
  567. return None
  568. return self._create_dto_from_row (row, with_relation_tables)
  569. def insert (
  570. self,
  571. video_tag: VideoTagDto,
  572. with_relation_tables: bool,
  573. ) -> None:
  574. untagged_at: date | DbNullType | None = video_tag.untagged_at
  575. if untagged_at is None:
  576. raise TypeError ('未実装')
  577. if untagged_at is DbNull:
  578. untagged_at = None
  579. untagged_at = cast (date | None, untagged_at)
  580. with self.conn.cursor (dictionary = True) as c:
  581. c.execute ("""
  582. INSERT INTO
  583. video_tags(
  584. video_id,
  585. tag_id,
  586. tagged_at,
  587. untagged_at)
  588. VALUES
  589. (
  590. %s,
  591. %s,
  592. %s,
  593. %s)""", (video_tag.video_id, video_tag.tag_id,
  594. video_tag.tagged_at, untagged_at))
  595. print (c._executed)
  596. video_tag.id_ = c.lastrowid
  597. if with_relation_tables:
  598. if video_tag.video is not None:
  599. VideoDao (self.conn).upsert (video_tag.video, True)
  600. if video_tag.tag is not None:
  601. TagDao (self.conn).upsert (video_tag.tag)
  602. def upsert (
  603. self,
  604. video_tag: VideoTagDto,
  605. with_relation_tables: bool,
  606. ) -> None:
  607. untagged_at: date | DbNullType | None = video_tag.untagged_at
  608. if untagged_at is None:
  609. raise TypeError ('未実装')
  610. if untagged_at is DbNull:
  611. untagged_at = None
  612. untagged_at = cast (date | None, untagged_at)
  613. with self.conn.cursor (dictionary = True) as c:
  614. c.execute ("""
  615. INSERT INTO
  616. video_tags(
  617. video_id,
  618. tag_id,
  619. tagged_at,
  620. untagged_at)
  621. VALUES
  622. (
  623. %s,
  624. %s,
  625. %s,
  626. %s)
  627. ON DUPLICATE KEY UPDATE
  628. id = LAST_INSERT_ID(id),
  629. video_id = VALUES(video_id),
  630. tag_id = VALUES(tag_id),
  631. tagged_at = VALUES(tagged_at),
  632. untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
  633. video_tag.tag_id,
  634. video_tag.tagged_at,
  635. untagged_at))
  636. print (c._executed)
  637. video_tag.id_ = c.lastrowid
  638. if with_relation_tables:
  639. if video_tag.video is not None:
  640. VideoDao (self.conn).upsert (video_tag.video, True)
  641. if video_tag.tag is not None:
  642. TagDao (self.conn).upsert (video_tag.tag)
  643. def upsert_all (
  644. self,
  645. video_tags: list[VideoTagDto],
  646. with_relation_tables: bool,
  647. ) -> None:
  648. for video_tag in video_tags:
  649. self.upsert (video_tag, with_relation_tables)
  650. def untag_all (
  651. self,
  652. video_id: int,
  653. tag_ids: list[int],
  654. now: datetime,
  655. ) -> None:
  656. if not tag_ids:
  657. return
  658. with self.conn.cursor (dictionary = True) as c:
  659. c.execute ("""
  660. UPDATE
  661. video_tags
  662. SET
  663. untagged_at = %%s
  664. WHERE
  665. video_id = %%s
  666. AND tag_id IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids))
  667. print (c._executed)
  668. def _create_dto_from_row (
  669. self,
  670. row: VideoTagRow,
  671. with_relation_tables: bool,
  672. ) -> VideoTagDto:
  673. video_tag = VideoTagDto (id_ = row['id'],
  674. video_id = row['video_id'],
  675. tag_id = row['tag_id'],
  676. tagged_at = row['tagged_at'],
  677. untagged_at = row['untagged_at'] or DbNull)
  678. if with_relation_tables:
  679. video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
  680. video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
  681. return video_tag
  682. class TagDao:
  683. def find (
  684. self,
  685. tag_id: int,
  686. ) -> TagDto | None:
  687. with self.conn.cursor (dictionary = True) as c:
  688. c.execute ("""
  689. SELECT
  690. id,
  691. name
  692. FROM
  693. tags
  694. WHERE
  695. id = %s""", (tag_id,))
  696. print (c._executed)
  697. row = cast (TagRow | None, c.fetchone ())
  698. if row is None:
  699. return None
  700. return self._create_dto_from_row (row)
  701. def fetch_by_name (
  702. self,
  703. tag_name: str,
  704. ) -> TagDto | None:
  705. with self.conn.cursor (dictionary = True) as c:
  706. c.execute ("""
  707. SELECT
  708. id,
  709. name
  710. FROM
  711. tags
  712. WHERE
  713. name = %s""", (tag_name,))
  714. print (c._executed)
  715. row = cast (TagRow | None, c.fetchone ())
  716. if row is None:
  717. return None
  718. return self._create_dto_from_row (row)
  719. def insert (
  720. self,
  721. tag: TagDto,
  722. ) -> None:
  723. with self.conn.cursor (dictionary = True) as c:
  724. c.execute ("""
  725. INSERT INTO
  726. tags(name)
  727. VALUES
  728. (%s)""", (tag.name,))
  729. print (c._executed)
  730. tag.id_ = c.lastrowid
  731. def upsert (
  732. self,
  733. tag: TagDto,
  734. ) -> None:
  735. with self.conn.cursor (dictionary = True) as c:
  736. c.execute ("""
  737. INSERT INTO
  738. tags(name)
  739. VALUES
  740. (%s)
  741. ON DUPLICATE KEY UPDATE
  742. id = LAST_INSERT_ID(id),
  743. name = VALUES(name)""", (tag.name,))
  744. print (c._executed)
  745. tag.id_ = c.lastrowid
  746. def _create_dto_from_row (
  747. self,
  748. row: TagRow,
  749. ) -> TagDto:
  750. return TagDto (id_ = row['id'],
  751. name = row['name'])
  752. class VideoHistoryDao:
  753. def fetch_by_video_id (
  754. self,
  755. video_id: int,
  756. with_relation_tables: bool,
  757. ) -> list[VideoHistoryDto]:
  758. with self.conn.cursor (dictionary = True) as c:
  759. c.execute ("""
  760. SELECT
  761. id,
  762. video_id,
  763. fetched_at,
  764. views_count
  765. FROM
  766. video_histories
  767. WHERE
  768. video_id = %s""", (video_id,))
  769. print (c._executed)
  770. video_histories: list[VideoHistoryDto] = []
  771. for row in cast (list[VideoHistoryRow], c.fetchall ()):
  772. video_histories.append (self._create_dto_from_row (row, with_relation_tables))
  773. return video_histories
  774. def insert (
  775. self,
  776. video_history: VideoHistoryDto,
  777. ) -> None:
  778. with self.conn.cursor (dictionary = True) as c:
  779. c.execute ("""
  780. INSERT INTO
  781. video_histories(
  782. video_id,
  783. fetched_at,
  784. views_count)
  785. VALUES
  786. (
  787. %s,
  788. %s,
  789. %s)""", (video_history.video_id,
  790. video_history.fetched_at,
  791. video_history.views_count))
  792. print (c._executed)
  793. def upsert (
  794. self,
  795. video_history: VideoHistoryDto,
  796. ) -> None:
  797. with self.conn.cursor (dictionary = True) as c:
  798. c.execute ("""
  799. INSERT INTO
  800. video_histories(
  801. video_id,
  802. fetched_at,
  803. views_count)
  804. VALUES
  805. (
  806. %s,
  807. %s,
  808. %s)
  809. ON DUPLICATE KEY UPDATE
  810. id = LAST_INSERT_ID(id),
  811. video_id = VALUES(video_id),
  812. fetched_at = VALUES(fetched_at),
  813. views_count = VALUES(views_count)""", (video_history.video_id,
  814. video_history.fetched_at,
  815. video_history.views_count))
  816. print (c._executed)
  817. def upsert_all (
  818. self,
  819. video_histories: list[VideoHistoryDto],
  820. ) -> None:
  821. for video_history in video_histories:
  822. self.upsert (video_history)
  823. def _create_dto_from_row (
  824. self,
  825. row: VideoHistoryRow,
  826. with_relation_tables: bool,
  827. ) -> VideoHistoryDto:
  828. video_history = VideoHistoryDto (id_ = row['id'],
  829. video_id = row['video_id'],
  830. fetched_at = row['fetched_at'],
  831. views_count = row['views_count'])
  832. if with_relation_tables:
  833. video_history.video = VideoDao (self.conn).find (video_history.video_id, True)
  834. return video_history
  835. class CommentDao:
  836. def fetch_by_video_id (
  837. self,
  838. video_id: int,
  839. with_relation_tables: bool,
  840. ) -> list[CommentDto]:
  841. with self.conn.cursor (dictionary = True) as c:
  842. c.execute ("""
  843. SELECT
  844. id,
  845. video_id,
  846. comment_no,
  847. user_id,
  848. content,
  849. posted_at,
  850. nico_count,
  851. vpos_ms
  852. FROM
  853. comments
  854. WHERE
  855. video_id = %s""", (video_id,))
  856. print (c._executed)
  857. comments: list[CommentDto] = []
  858. for row in cast (list[CommentRow], c.fetchall ()):
  859. comments.append (self._create_dto_from_row (row, with_relation_tables))
  860. return comments
  861. def upsert (
  862. self,
  863. comment: CommentDto,
  864. with_relation_tables: bool,
  865. ) -> None:
  866. vpos_ms: int | DbNullType | None = comment.vpos_ms
  867. if vpos_ms is None:
  868. raise TypeError ('未実装')
  869. if vpos_ms is DbNull:
  870. vpos_ms = None
  871. vpos_ms = cast (int | None, vpos_ms)
  872. with self.conn.cursor (dictionary = True) as c:
  873. c.execute ("""
  874. INSERT INTO
  875. comments(
  876. video_id,
  877. comment_no,
  878. user_id,
  879. content,
  880. posted_at,
  881. nico_count,
  882. vpos_ms)
  883. VALUES
  884. (
  885. %s,
  886. %s,
  887. %s,
  888. %s,
  889. %s,
  890. %s,
  891. %s)
  892. ON DUPLICATE KEY UPDATE
  893. id = LAST_INSERT_ID(id),
  894. video_id = VALUES(video_id),
  895. comment_no = VALUES(comment_no),
  896. user_id = VALUES(user_id),
  897. content = VALUES(content),
  898. posted_at = VALUES(posted_at),
  899. nico_count = VALUES(nico_count),
  900. vpos_ms = VALUES(vpos_ms)""", (comment.video_id,
  901. comment.comment_no,
  902. comment.user_id,
  903. comment.content,
  904. comment.posted_at,
  905. comment.nico_count,
  906. vpos_ms))
  907. print (c._executed)
  908. def upsert_all (
  909. self,
  910. comments: list[CommentDto],
  911. with_relation_tables: bool,
  912. ) -> None:
  913. for comment in comments:
  914. self.upsert (comment, with_relation_tables)
  915. def _create_dto_from_row (
  916. self,
  917. row: CommentRow,
  918. with_relation_tables: bool,
  919. ) -> CommentDto:
  920. comment = CommentDto (id_ = row['id'],
  921. video_id = row['video_id'],
  922. comment_no = row['comment_no'],
  923. user_id = row['user_id'],
  924. content = row['content'],
  925. posted_at = row['posted_at'],
  926. nico_count = row['nico_count'],
  927. vpos_ms = row['vpos_ms'] or DbNull)
  928. if with_relation_tables:
  929. comment.video = VideoDao (self.conn).find (comment.video_id, True)
  930. return comment
  931. class UserDao:
  932. def fetch_by_code (
  933. self,
  934. user_code: str
  935. ) -> UserDto | None:
  936. with self.conn.cursor (dictionary = True) as c:
  937. c.execute ("""
  938. SELECT
  939. id,
  940. code
  941. FROM
  942. users
  943. WHERE
  944. code = %s""", (user_code,))
  945. print (c._executed)
  946. row = cast (UserRow | None, c.fetchone ())
  947. if row is None:
  948. return None
  949. return self._create_dto_from_row (row)
  950. def insert (
  951. self,
  952. user: UserDto,
  953. ) -> None:
  954. with self.conn.cursor (dictionary = True) as c:
  955. c.execute ("""
  956. INSERT INTO
  957. users(code)
  958. VALUES
  959. (%s)""", (user.code,))
  960. print (c.execute)
  961. user.id_ = c.lastrowid
  962. def _create_dto_from_row (
  963. self,
  964. row: UserRow,
  965. ) -> UserDto:
  966. return UserDto (id_ = row['id'],
  967. code = row['code'])
  968. if __name__ == '__main__':
  969. main ()