import json import os import random import string import time from dataclasses import dataclass from datetime import datetime import mysql.connector import requests def main ( ) -> None: conn = mysql.connector.connect (host = os.environ['MYSQL_HOST'], user = os.environ['MYSQL_USER'], password = os.environ['MYSQL_PASS']) video_dao = VideoDao (conn) api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) update_video_table (video_dao, api_data) # TODO: 書くこと def update_video_table ( video_dao, api_data: list[dict], ) -> None: # TODO: 書くこと video_dao.upsert_all (videos) video_dao.delete_nonexistent_data (video_id_list) # TODO: 書くこと def fetch_comments ( video_id: str, ) -> list: headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0' } action_track_id = ( ''.join (random.choice (string.ascii_letters + string.digits) for _ in range (10)) + '_' + str (random.randrange (10 ** 12, 10 ** 13))) url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_id }" + f"?actionTrackId={ action_track_id }") res = requests.post (url, headers = headers, timeout = 60).json () try: nv_comment = res['data']['comment']['nvComment'] except KeyError: return [] if nv_comment is None: return [] headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0', 'Content-Type': 'application/json' } params = { 'params': nv_comment['params'], 'additionals': { }, 'threadKey': nv_comment['threadKey'] } url = nv_comment['server'] + '/v1/threads' res = (requests.post (url, json.dumps (params), headers = headers, timeout = 60) .json ()) try: return res['data']['threads'][1]['comments'] except (IndexError, KeyError): return [] def search_nico_by_tag ( tag: str, ) -> list[dict]: return search_nico_by_tags ([tag]) def search_nico_by_tags ( tags: list[str], ) -> list[dict]: url = ('https://snapshot.search.nicovideo.jp' + '/api/v2/snapshot/video/contents/search') # TODO: 年月日の設定ができてゐなぃのと,100 件までしか取得できなぃので何とかすること query_filter = json.dumps ({ 'type': 'or', 'filters': [ { 'type': 'range', 'field': 'startTime', 'from': f"{year}-{start}T00:00:00+09:00", 'to': f"{year}-{end}T23:59:59+09:00", 'include_lower': True }] }) params: dict[str, int | str] params = { 'q': ' OR '.join (tags), 'targets': 'tagsExact', '_sort': '-viewCounter', 'fields': 'contentId,title,tags,viewCounter,startTime', '_limit': 100, 'jsonFilter': query_filter } res = requests.get (url, params = params, timeout = 60).json () return res['data'] class VideoDao: def __init__ ( self, conn ): self.conn = conn def fetch ( self, video_id: int, with_relation_tables: bool = True, ) -> VideoDto | None: with self.conn.cursor () as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos WHERE id = %s ORDER BY id""", video_id) row = c.fetchone () if row is None: return None return self._create_dto_from_row (row, with_relation_tables) def fetch_all ( self, with_relation_tables: bool = True, ) -> list[VideoDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos ORDER BY id""") videos: list[VideoDto] = [] for row in c.fetchall (): videos.append (self._create_dto_from_row (row, with_relation_tables)) return videos def upsert_all ( self, videos: list[VideoDto], with_relation_tables: bool = True, ) -> int: with self.conn.cursor () as c: for video in videos: c.execute (""" INSERT INTO videos( code, title, description, uploaded_at, deleted_at) VALUES ( %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE code = VALUES(code), title = VALUES(title), description = VALUES(description), uploaded_at = VALUES(uploaded_at), deleted_at = VALUES(deleted_at)""", ( video.code, video.title, video.description, video.uploaded_at, video.deleted_at)) video.id_ = c.lastrowid # TODO: with_relation_tables が True の場合,子テーブルも UPSERT すること def _create_dto_from_row ( self, row, with_relation_tables: bool, ) -> VideoDto: video = VideoDto (id_ = row['id'], code = row['code'], title = row['title'], description = row['description'], uploaded_at = row['uploaded_at'], deleted_at = row['deleted_at'], video_tags = None, comments = None, video_histories = None) if with_relation_tables: video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.video_tags)): video.video_tags[i].video = video video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.comments)): video.comments[i].video = video video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.video_histories)): video.video_histories[i].video = video return video @dataclass (slots = True) class VideoDto: id_: int code: str title: str description: str uploaded_at: datetime deleted_at: datetime | None video_tags: list[VideoTagDto] | None comments: list[CommentDto] | None video_histories: list[VideoHistoryDto] | None class VideoTagDao: def __init__ ( self, conn, ): self.conn = conn def fetch_by_video_id ( self, video_id: int, with_relation_tables: bool = True, ) -> list[VideoTagDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, video_id, tag_id, tagged_at, untagged_at FROM video_tags WHERE video_id = %s ORDER BY id""", video_id) video_tags: list[VideoTagDto] = [] for row in c.fetchall (): video_tags.append (self._create_dto_from_row (row, with_relation_tables)) return video_tags def _create_dto_from_row ( self, row, with_relation_tables: bool, ) -> VideoTagDto: video_tag = VideoTagDto (id_ = row['id'], video_id = row['video_id'], tag_id = row['tag_id'], tagged_at = row['tagged_at'], untagged_at = row['untagged_at'], video = None, tag = None) if with_relation_tables: video_tag.video = VideoDao (self.conn).fetch (video_tag.video_id, True) video_tag.tag = TagDao (self.conn).fetch (video_tag.tag_id, True) return video_tag @dataclass (slots = True) class VideoTagDto: id_: int video_id: int tag_id: int tagged_at: datetime untagged_at: datetime video: VideoDto tag: TagDto if __name__ == '__main__': main ()