diff --git a/update_db.py b/update_db.py index ea69a45..9fb4733 100644 --- a/update_db.py +++ b/update_db.py @@ -5,31 +5,72 @@ import string import time from dataclasses import dataclass from datetime import datetime +from typing import TypedDict, cast import mysql.connector import requests +DbNull = (None,) +DbNullType = tuple[None] + + +class VideoResult (TypedDict): + contentId: str + title: str + tags: list[str] + description: str + viewCounter: int + startTime: str + + +class VideoSearchParam (TypedDict): + q: str + targets: str + _sort: str + fields: str + _limit: int + jsonFilter: str + + def main ( ) -> None: conn = mysql.connector.connect (host = os.environ['MYSQL_HOST'], user = os.environ['MYSQL_USER'], password = os.environ['MYSQL_PASS']) + now = datetime.now () + video_dao = VideoDao (conn) + tag_dao = TagDao (conn) api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) - update_video_table (video_dao, api_data) + update_video_and_tag_table (video_dao, tag_dao, api_data, now) # TODO: 書くこと -def update_video_table ( +def update_video_and_tag_table ( video_dao: VideoDao, - api_data: list[dict], + tag_dao: TagDao, + api_data: list[VideoResult], + now: datetime, ) -> None: - # TODO: videos 取得書くこと + videos: list[VideoDto] = [] + for datum in api_data: + tags: list[TagDto] = [] + for tag_name in datum['tags']: + tags.append (TagDto (name = tag_name)) + tag_dao.upsert_all (tags) + # TODO: タグの対応づけすること + # TODO: コメント取得すること + video = VideoDto (code = datum['contentId'], + title = datum['title'], + description = datum['description'], + uploaded_at = datum['startTime'], + deleted_at = DbNull) + videos.append (video) video_dao.upsert_all (videos) @@ -85,13 +126,13 @@ def fetch_comments ( def search_nico_by_tag ( tag: str, -) -> list[dict]: +) -> list[VideoResult]: return search_nico_by_tags ([tag]) def search_nico_by_tags ( tags: list[str], -) -> list[dict]: +) -> list[VideoResult]: url = ('https://snapshot.search.nicovideo.jp' + '/api/v2/snapshot/video/contents/search') @@ -105,15 +146,15 @@ def search_nico_by_tags ( 'to': f"{year}-{end}T23:59:59+09:00", 'include_lower': True }] }) - params: dict[str, int | str] + params: VideoSearchParam params = { 'q': ' OR '.join (tags), 'targets': 'tagsExact', '_sort': '-viewCounter', - 'fields': 'contentId,title,tags,viewCounter,startTime', + 'fields': 'contentId,title,tags,description,viewCounter,startTime', '_limit': 100, 'jsonFilter': query_filter } - res = requests.get (url, params = params, timeout = 60).json () + res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json () return res['data'] @@ -172,6 +213,43 @@ class VideoDao: videos.append (self._create_dto_from_row (row, with_relation_tables)) return videos + def upsert ( + self, + video: VideoDto, + with_relation_tables: bool = True, + ) -> None: + with self.conn.cursor () as c: + c.execute (""" + INSERT INTO + videos( + code, + title, + description, + uploaded_at, + deleted_at) + VALUES + ( + %s, + %s, + %s, + %s, + %s) + ON DUPLICATE KEY UPDATE + code = VALUES(code), + title = VALUES(title), + description = VALUES(description), + uploaded_at = VALUES(uploaded_at), + deleted_at = VALUES(deleted_at)""", (video.code, + video.title, + video.description, + video.uploaded_at, + video.deleted_at)) + video.id_ = c.lastrowid + if with_relation_tables: + VideoTagDao (self.conn).upsert_all (video.video_tags, False) + CommentDao (self.conn).upsert_all (video.comments, False) + VideoHistoryDao (self.conn).upsert_all (video.video_histories, False) + def upsert_all ( self, videos: list[VideoDto], @@ -179,36 +257,7 @@ class VideoDao: ) -> None: with self.conn.cursor () as c: for video in videos: - c.execute (""" - INSERT INTO - videos( - code, - title, - description, - uploaded_at, - deleted_at) - VALUES - ( - %s, - %s, - %s, - %s, - %s) - ON DUPLICATE KEY UPDATE - code = VALUES(code), - title = VALUES(title), - description = VALUES(description), - uploaded_at = VALUES(uploaded_at), - deleted_at = VALUES(deleted_at)""", (video.code, - video.title, - video.description, - video.uploaded_at, - video.deleted_at)) - video.id_ = c.lastrowid - if with_relation_tables: - VideoTagDao (self.conn).upsert_all (video.video_tags, False) - CommentDao (self.conn).upsert_all (video.comments, False) - VideoHistoryDao (self.conn).upsert_all (video.video_histories, False) + self.upsert (video, with_relation_tables) def _create_dto_from_row ( self, @@ -222,7 +271,7 @@ class VideoDao: uploaded_at = row['uploaded_at'], deleted_at = row['deleted_at']) if with_relation_tables: - video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False) + video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (cast (int, video.id_), False) for i in range (len (video.video_tags)): video.video_tags[i].video = video video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False) @@ -236,12 +285,12 @@ class VideoDao: @dataclass (slots = True) class VideoDto: - id_: int | None = None code: str title: str description: str uploaded_at: datetime - deleted_at: datetime | None = None + id_: int | None = None + deleted_at: datetime | DbNullType = DbNull video_tags: list[VideoTagDto] | None = None comments: list[CommentDto] | None = None video_histories: list[VideoHistoryDto] | None = None @@ -296,13 +345,32 @@ class VideoTagDao: @dataclass (slots = True) class VideoTagDto: - id_: int | None = None - video_id: int - tag_id: int tagged_at: datetime - untagged_at: datetime - video: VideoDto | None = None - tag: TagDto | None = None + id_: int | None = None + video_id: int | None = None + tag_id: int | None = None + untagged_at: datetime | DbNullType = DbNull + video: VideoDto | None = None + tag: TagDto | None = None + + +@dataclass (slots = True) +class TagDto: + name: str + id_: int | None = None + + +@dataclass (slots = True) +class CommentDto: + video_id: int + comment_no: int + user_id: int + content: str + posted_at: datetime + id_: int | None = None + nico_count: int = 0 + vpos_ms: int | DbNullType = DbNull + if __name__ == '__main__':