import json import os import random import string import time from dataclasses import dataclass from datetime import datetime from typing import TypedDict, cast import mysql.connector import requests DbNull = (None,) DbNullType = tuple[None] class VideoResult (TypedDict): contentId: str title: str tags: list[str] description: str viewCounter: int startTime: str class VideoSearchParam (TypedDict): q: str targets: str _sort: str fields: str _limit: int jsonFilter: str class CommentResult (TypedDict): pass def main ( ) -> None: conn = mysql.connector.connect (host = os.environ['MYSQL_HOST'], user = os.environ['MYSQL_USER'], password = os.environ['MYSQL_PASS']) now = datetime.now () video_dao = VideoDao (conn) tag_dao = TagDao (conn) video_tag_dao = VideoTagDao (conn) video_history_dao = VideoHistoryDao (conn) api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) update_tables (video_dao, tag_dao, video_tag_dao, video_history_dao, api_data, now) # TODO: 書くこと def update_tables ( video_dao: VideoDao, tag_dao: TagDao, video_tag_dao: VideoTagDao, video_history_dao: VideoHistoryDao, api_data: list[VideoResult], now: datetime, ) -> None: for datum in api_data: video = VideoDto (code = datum['contentId'], title = datum['title'], description = datum['description'], uploaded_at = datum['startTime']) video_dao.upsert (video, False) if video.id_ is not None: video_history = VideoHistoryDto (video_id = video.id_, fetched_at = now, views_count = datum['viewCounter']) video_history_dao.insert (video_history) tag_ids: list[int] = [] video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False) for vt in video_tags: tag = tag_dao.find (vt.tag_id) if (tag is not None and (tag.name not in datum['tags']) and (tag.id_ is not None)): tag_ids.append (tag.id_) video_tag_dao.untag_all (video.id_, tag_ids, now) tags: list[TagDto] = [] for tag_name in datum['tags']: tag = tag_dao.fetch_by_name (tag_name) if tag is None: tag = TagDto (name = tag_name) tag_dao.insert (tag) if video.id_ is not None and tag.id_ is not None: video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False) if video_tag is None: video_tag = VideoTagDto (video_id = video.id_, tag_id = tag.id_, tagged_at = now) video_tag_dao.insert (video_tag, False) # TODO: コメント取得すること # TODO: 削除処理,存在しなぃ動画リストを取得した上で行ふこと # video_dao.delete (video_ids, now) # TODO: 書くこと def fetch_comments ( video_id: str, ) -> list: headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0' } action_track_id = ( ''.join (random.choice (string.ascii_letters + string.digits) for _ in range (10)) + '_' + str (random.randrange (10 ** 12, 10 ** 13))) url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_id }" + f"?actionTrackId={ action_track_id }") res = requests.post (url, headers = headers, timeout = 60).json () try: nv_comment = res['data']['comment']['nvComment'] except KeyError: return [] if nv_comment is None: return [] headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0', 'Content-Type': 'application/json' } params = { 'params': nv_comment['params'], 'additionals': { }, 'threadKey': nv_comment['threadKey'] } url = nv_comment['server'] + '/v1/threads' res = (requests.post (url, json.dumps (params), headers = headers, timeout = 60) .json ()) try: return res['data']['threads'][1]['comments'] except (IndexError, KeyError): return [] def search_nico_by_tag ( tag: str, ) -> list[VideoResult]: return search_nico_by_tags ([tag]) def search_nico_by_tags ( tags: list[str], ) -> list[VideoResult]: url = ('https://snapshot.search.nicovideo.jp' + '/api/v2/snapshot/video/contents/search') # TODO: 年月日の設定ができてゐなぃのと,100 件までしか取得できなぃので何とかすること query_filter = json.dumps ({ 'type': 'or', 'filters': [ { 'type': 'range', 'field': 'startTime', 'from': f"{year}-{start}T00:00:00+09:00", 'to': f"{year}-{end}T23:59:59+09:00", 'include_lower': True }] }) params: VideoSearchParam params = { 'q': ' OR '.join (tags), 'targets': 'tagsExact', '_sort': '-viewCounter', 'fields': 'contentId,title,tags,description,viewCounter,startTime', '_limit': 100, 'jsonFilter': query_filter } res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json () return res['data'] class VideoDao: def __init__ ( self, conn ): self.conn = conn def find ( self, video_id: int, with_relation_tables: bool, ) -> VideoDto | None: with self.conn.cursor () as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos WHERE id = %s ORDER BY id""", video_id) row = c.fetchone () if row is None: return None return self._create_dto_from_row (row, with_relation_tables) def fetch_all ( self, with_relation_tables: bool, ) -> list[VideoDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos ORDER BY id""") videos: list[VideoDto] = [] for row in c.fetchall (): videos.append (self._create_dto_from_row (row, with_relation_tables)) return videos def upsert ( self, video: VideoDto, with_relation_tables: bool, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO videos( code, title, description, uploaded_at, deleted_at) VALUES ( %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE code = VALUES(code), title = VALUES(title), description = VALUES(description), uploaded_at = VALUES(uploaded_at), deleted_at = VALUES(deleted_at)""", (video.code, video.title, video.description, video.uploaded_at, video.deleted_at)) video.id_ = c.lastrowid if with_relation_tables: if video.video_tags is not None: VideoTagDao (self.conn).upsert_all (video.video_tags, False) if video.comments is not None: CommentDao (self.conn).upsert_all (video.comments, False) if video.video_histories is not None: VideoHistoryDao (self.conn).upsert_all (video.video_histories) def upsert_all ( self, videos: list[VideoDto], with_relation_tables: bool, ) -> None: for video in videos: self.upsert (video, with_relation_tables) def delete ( self, video_ids: list[int], at: datetime, ) -> None: with self.conn.cursor () as c: c.execute (""" UPDATE videos SET deleted_at = %s WHERE id IN (%s)""", (at, (*video_ids,))) def _create_dto_from_row ( self, row, with_relation_tables: bool, ) -> VideoDto: video = VideoDto (id_ = row['id'], code = row['code'], title = row['title'], description = row['description'], uploaded_at = row['uploaded_at'], deleted_at = row['deleted_at']) if with_relation_tables and video.id_ is not None: video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.video_tags)): video.video_tags[i].video = video video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.comments)): video.comments[i].video = video video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.video_histories)): video.video_histories[i].video = video return video @dataclass (slots = True) class VideoDto: code: str title: str description: str uploaded_at: datetime id_: int | None = None deleted_at: datetime | DbNullType = DbNull video_tags: list[VideoTagDto] | None = None comments: list[CommentDto] | None = None video_histories: list[VideoHistoryDto] | None = None class VideoTagDao: def __init__ ( self, conn, ): self.conn = conn def fetch_by_video_id ( self, video_id: int, with_relation_tables: bool, ) -> list[VideoTagDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, video_id, tag_id, tagged_at, untagged_at FROM video_tags WHERE video_id = %s ORDER BY id""", video_id) video_tags: list[VideoTagDto] = [] for row in c.fetchall (): video_tags.append (self._create_dto_from_row (row, with_relation_tables)) return video_tags def fetch_alive_by_video_id ( self, video_id: int, with_relation_tables: bool, ) -> list[VideoTagDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, video_id, tag_id, tagged_at, untagged_at FROM video_tags WHERE video_id = %s AND (untagged_at IS NULL) ORDER BY id""", video_id) video_tags: list[VideoTagDto] = [] for row in c.fetchall (): video_tags.append (self._create_dto_from_row (row, with_relation_tables)) return video_tags def fetch_alive_by_ids ( self, video_id: int, tag_id: int, with_relation_tables: bool, ) -> VideoTagDto | None: with self.conn.cursor () as c: c.execute (""" SELECT id, video_id, tag_id, tagged_at, untagged_at FROM video_tags WHERE video_id = %s AND tag_id = %s""", (video_id, tag_id)) row = c.fetchone () if row is None: return None return self._create_dto_from_row (row, with_relation_tables) def insert ( self, video_tag: VideoTagDto, with_relation_tables: bool, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO video_tags( video_id, tag_id, tagged_at, untagged_at) VALUES ( %s, %s, %s, %s)""", (video_tag.video_id, video_tag.tag_id, video_tag.tagged_at, video_tag.untagged_at)) video_tag.id_ = c.lastrowid if with_relation_tables: if video_tag.video is not None: VideoDao (self.conn).upsert (video_tag.video, True) if video_tag.tag is not None: TagDao (self.conn).upsert (video_tag.tag) def upsert ( self, video_tag: VideoTagDto, with_relation_tables: bool, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO video_tags( video_id, tag_id, tagged_at, untagged_at) VALUES ( %s, %s, %s, %s) ON DUPLICATE KEY UPDATE video_id = VALUES(video_id), tag_id = VALUES(tag_id), tagged_at = VALUES(tagged_at), untagged_at = VALUES(untagged_at)""", (video_tag.video_id, video_tag.tag_id, video_tag.tagged_at, video_tag.untagged_at)) video_tag.id_ = c.lastrowid if with_relation_tables: if video_tag.video is not None: VideoDao (self.conn).upsert (video_tag.video, True) if video_tag.tag is not None: TagDao (self.conn).upsert (video_tag.tag) def upsert_all ( self, video_tags: list[VideoTagDto], with_relation_tables: bool, ) -> None: for video_tag in video_tags: self.upsert (video_tag, with_relation_tables) def untag_all ( self, video_id: int, tag_ids: list[int], now: datetime ) -> None: with self.conn.cursor () as c: c.execute (""" UPDATE video_tags SET untagged_at = %s WHERE video_id = %s AND tag_ids IN (%s)""", (now, video_id, (*tag_ids,))) def _create_dto_from_row ( self, row, with_relation_tables: bool, ) -> VideoTagDto: video_tag = VideoTagDto (id_ = row['id'], video_id = row['video_id'], tag_id = row['tag_id'], tagged_at = row['tagged_at'], untagged_at = row['untagged_at']) if with_relation_tables: video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True) video_tag.tag = TagDao (self.conn).find (video_tag.tag_id) return video_tag @dataclass (slots = True) class VideoTagDto: video_id: int tag_id: int tagged_at: datetime id_: int | None = None untagged_at: datetime | DbNullType = DbNull video: VideoDto | None = None tag: TagDto | None = None class TagDao: def __init__ ( self, conn, ): self.conn = conn def find ( self, tag_id: int, ) -> TagDto | None: with self.conn.cursor () as c: c.execute (""" SELECT id, name) FROM tags WHERE id = %s""", tag_id) row = c.fetchone () if row is None: return None return self._create_dto_from_row (row) def fetch_by_name ( self, tag_name: str, ) -> TagDto | None: with self.conn.cursor () as c: c.execute (""" SELECT id, name FROM tags WHERE name = %s""", tag_name) row = c.fetchone () if row is None: return None return self._create_dto_from_row (row) def insert ( self, tag: TagDto, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO tags(name) VALUES (%s)""", tag.name) tag.id_ = c.lastrowid def upsert ( self, tag: TagDto, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO tags(name) VALUES (%s) ON DUPLICATE KEY UPDATE name = VALUES(name)""", tag.name) tag.id_ = c.lastrowid def _create_dto_from_row ( self, row, ) -> TagDto: return TagDto (id_ = row['id'], name = row['name']) @dataclass (slots = True) class TagDto: name: str id_: int | None = None class VideoHistoryDao: def __init__ ( self, conn, ): self.conn = conn def fetch_by_video_id ( self, video_id: int, with_relation_tables: bool, ) -> list[VideoHistoryDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, video_id, fetched_at, views_count FROM video_histories WHERE video_id = %s""", video_id) video_histories: list[VideoHistoryDto] = [] for row in c.fetchall (): video_histories.append (self._create_dto_from_row (row, with_relation_tables)) return video_histories def insert ( self, video_history: VideoHistoryDto, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO video_histories( video_id, fetched_at, views_count) VALUES ( %s, %s, %s)""", (video_history.video_id, video_history.fetched_at, video_history.views_count)) def upsert ( self, video_history: VideoHistoryDto, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO video_histories( video_id, fetched_at, views_count) VALUES ( %s, %s, %s) ON DUPLICATE KEY UPDATE video_id, fetched_at, views_count""", (video_history.video_id, video_history.fetched_at, video_history.views_count)) def upsert_all ( self, video_histories: list[VideoHistoryDto], ) -> None: for video_history in video_histories: self.upsert (video_history) def _create_dto_from_row ( self, row, with_relation_tables: bool, ) -> VideoHistoryDto: video_history = VideoHistoryDto (id_ = row['id'], video_id = row['video_id'], fetched_at = row['fetched_at'], views_count = row['views_count']) if with_relation_tables: video_history.video = VideoDao (self.conn).find (video_history.video_id, True) return video_history @dataclass (slots = True) class VideoHistoryDto: video_id: int fetched_at: datetime views_count: int id_: int | None = None video: VideoDto | None = None class CommentDao: def __init__ ( self, conn, ): self.conn = conn def fetch_by_video_id ( self, video_id: int, with_relation_tables: bool, ) -> list[CommentDto]: with self.conn.cursor () as c: c.execute (""" SELECT id, video_id, comment_no, user_id, content, posted_at, nico_count, vpos_ms FROM comments WHERE video_id = %s""", video_id) comments: list[CommentDto] = [] for row in c.fetchall (): comments.append (self._create_dto_from_row (row, with_relation_tables)) return comments def upsert ( self, comment: CommentDto, with_relation_tables: bool, ) -> None: with self.conn.cursor () as c: c.execute (""" INSERT INTO comments( video_id, comment_no, user_id, content, posted_at, nico_count, vpos_ms) VALUES ( %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE video_id = VALUES(video_id), comment_no = VALUES(comment_no), user_id = VALUES(user_id), content = VALUES(content), posted_at = VALUES(posted_at), nico_count = VALUES(nico_count), vpos_ms = VALUES(vpos_ms)""", (comment.video_id, comment.comment_no, comment.user_id, comment.content, comment.posted_at, comment.nico_count, comment.vpos_ms)) def upsert_all ( self, comments: list[CommentDto], with_relation_tables: bool, ) -> None: for comment in comments: self.upsert (comment, with_relation_tables) def _create_dto_from_row ( self, row, with_relation_tables: bool, ) -> CommentDto: comment = CommentDto (id_ = row['id'], video_id = row['video_id'], comment_no = row['comment_no'], user_id = row['user_id'], content = row['content'], posted_at = row['posted_at'], nico_count = row['nico_count'], vpos_ms = row['vpos_ms']) if with_relation_tables: comment.video = VideoDao (self.conn).find (comment.video_id, True) return comment @dataclass (slots = True) class CommentDto: video_id: int comment_no: int user_id: int content: str posted_at: datetime id_: int | None = None nico_count: int = 0 vpos_ms: int | DbNullType = DbNull video: VideoDto | None = None if __name__ == '__main__': main ()