import json import os import random import string import time from dataclasses import dataclass from datetime import datetime from typing import TypedDict, cast import mysql.connector import requests DbNull = (None,) DbNullType = tuple[None] class VideoResult (TypedDict): contentId: str title: str tags: list[str] description: str viewCounter: int startTime: str class VideoSearchParam (TypedDict): q: str targets: str _sort: str fields: str _limit: int jsonFilter: str def main ( ) -> None: conn = mysql.connector.connect (host = os.environ['MYSQL_HOST'], user = os.environ['MYSQL_USER'], password = os.environ['MYSQL_PASS']) now = datetime.now () video_dao = VideoDao (conn) tag_dao = TagDao (conn) api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) update_video_and_tag_table (video_dao, tag_dao, api_data, now) # TODO: 書くこと def update_video_and_tag_table ( video_dao: VideoDao, tag_dao: TagDao, api_data: list[VideoResult], now: datetime, ) -> None: videos: list[VideoDto] = [] for datum in api_data: tags: list[TagDto] = [] for tag_name in datum['tags']: tags.append (TagDto (name = tag_name)) tag_dao.upsert_all (tags) # TODO: タグの対応づけすること # TODO: コメント取得すること video = VideoDto (code = datum['contentId'], title = datum['title'], description = datum['description'], uploaded_at = datum['startTime'], deleted_at = DbNull) videos.append (video) video_dao.upsert_all (videos) video_dao.delete_nonexistent_data (video_id_list) # TODO: 書くこと def fetch_comments ( video_id: str, ) -> list: headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0' } action_track_id = ( ''.join (random.choice (string.ascii_letters + string.digits) for _ in range (10)) + '_' + str (random.randrange (10 ** 12, 10 ** 13))) url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_id }" + f"?actionTrackId={ action_track_id }") res = requests.post (url, headers = headers, timeout = 60).json () try: nv_comment = res['data']['comment']['nvComment'] except KeyError: return [] if nv_comment is None: return [] headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0', 'Content-Type': 'application/json' } params = { 'params': nv_comment['params'], 'additionals': { }, 'threadKey': nv_comment['threadKey'] } url = nv_comment['server'] + '/v1/threads' res = (requests.post (url, json.dumps (params), headers = headers, timeout = 60) .json ()) try: return res['data']['threads'][1]['comments'] except (IndexError, KeyError): return [] def search_nico_by_tag ( tag: str, ) -> list[VideoResult]: return search_nico_by_tags ([tag]) def search_nico_by_tags ( tags: list[str], ) -> list[VideoResult]: url = ('https://snapshot.search.nicovideo.jp' + '/api/v2/snapshot/video/contents/search') # TODO: 年月日の設定ができてゐなぃのと,100 件までしか取得できなぃので何とかすること query_filter = json.dumps ({ 'type': 'or', 'filters': [ { 'type': 'range', 'field': 'startTime', 'from': f"{year}-{start}T00:00:00+09:00", 'to': f"{year}-{end}T23:59:59+09:00", 'include_lower': True }] }) params: VideoSearchParam params = { 'q': ' OR '.join (tags), 'targets': 'tagsExact', '_sort': '-viewCounter', 'fields': 'contentId,title,tags,description,viewCounter,startTime', '_limit': 100, 'jsonFilter': query_filter } res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json () return res['data'] class VideoDao: def __init__ ( self, conn ): self.conn = conn def fetch ( self, video_id: int, with_relation_tables: bool = True, ) -> VideoDto | None: with self.conn.cursor () as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos WHERE id = %s ORDER BY id""", video_id) row = c.fetchone () if row is None: return None return self._create_dto_from_row (row, with_relation_tables) def fetch_all ( self, with_relation_tables: bool = True, ) -> list[VideoDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos ORDER BY id""") videos: list[VideoDto] = [] for row in c.fetchall (): videos.append (self._create_dto_from_row (row, with_relation_tables)) return videos def upsert ( self, video: VideoDto, with_relation_tables: bool = True, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO videos( code, title, description, uploaded_at, deleted_at) VALUES ( %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE code = VALUES(code), title = VALUES(title), description = VALUES(description), uploaded_at = VALUES(uploaded_at), deleted_at = VALUES(deleted_at)""", (video.code, video.title, video.description, video.uploaded_at, video.deleted_at)) video.id_ = c.lastrowid if with_relation_tables: VideoTagDao (self.conn).upsert_all (video.video_tags, False) CommentDao (self.conn).upsert_all (video.comments, False) VideoHistoryDao (self.conn).upsert_all (video.video_histories, False) def upsert_all ( self, videos: list[VideoDto], with_relation_tables: bool = True, ) -> None: with self.conn.cursor () as c: for video in videos: self.upsert (video, with_relation_tables) def _create_dto_from_row ( self, row, with_relation_tables: bool, ) -> VideoDto: video = VideoDto (id_ = row['id'], code = row['code'], title = row['title'], description = row['description'], uploaded_at = row['uploaded_at'], deleted_at = row['deleted_at']) if with_relation_tables: video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (cast (int, video.id_), False) for i in range (len (video.video_tags)): video.video_tags[i].video = video video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.comments)): video.comments[i].video = video video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.video_histories)): video.video_histories[i].video = video return video @dataclass (slots = True) class VideoDto: code: str title: str description: str uploaded_at: datetime id_: int | None = None deleted_at: datetime | DbNullType = DbNull video_tags: list[VideoTagDto] | None = None comments: list[CommentDto] | None = None video_histories: list[VideoHistoryDto] | None = None class VideoTagDao: def __init__ ( self, conn, ): self.conn = conn def fetch_by_video_id ( self, video_id: int, with_relation_tables: bool = True, ) -> list[VideoTagDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, video_id, tag_id, tagged_at, untagged_at FROM video_tags WHERE video_id = %s ORDER BY id""", video_id) video_tags: list[VideoTagDto] = [] for row in c.fetchall (): video_tags.append (self._create_dto_from_row (row, with_relation_tables)) return video_tags def _create_dto_from_row ( self, row, with_relation_tables: bool, ) -> VideoTagDto: video_tag = VideoTagDto (id_ = row['id'], video_id = row['video_id'], tag_id = row['tag_id'], tagged_at = row['tagged_at'], untagged_at = row['untagged_at']) if with_relation_tables: video_tag.video = VideoDao (self.conn).fetch (video_tag.video_id, True) video_tag.tag = TagDao (self.conn).fetch (video_tag.tag_id, True) return video_tag @dataclass (slots = True) class VideoTagDto: tagged_at: datetime id_: int | None = None video_id: int | None = None tag_id: int | None = None untagged_at: datetime | DbNullType = DbNull video: VideoDto | None = None tag: TagDto | None = None @dataclass (slots = True) class TagDto: name: str id_: int | None = None @dataclass (slots = True) class CommentDto: video_id: int comment_no: int user_id: int content: str posted_at: datetime id_: int | None = None nico_count: int = 0 vpos_ms: int | DbNullType = DbNull if __name__ == '__main__': main ()