""" 日次で実行し,ぼざクリ DB を最新に更新する. """ from __future__ import annotations import json import os import random import string import time from dataclasses import dataclass from datetime import date, datetime, timedelta from typing import Any, Type, TypedDict, cast import mysql.connector import requests from mysql.connector.connection import MySQLConnectionAbstract class DbNull: def __new__ ( cls, ): delattr (cls, '__init__') DbNullType = Type[DbNull] class VideoSearchParam (TypedDict): q: str targets: str _sort: str fields: str _limit: int jsonFilter: str class VideoResult (TypedDict): contentId: str title: str tags: str description: str | None viewCounter: int startTime: str class CommentResult (TypedDict): id: str no: int vposMs: int body: str commands: list[str] userId: str isPremium: bool score: int postedAt: str nicoruCount: int nicoruId: Any source: str isMyPost: bool class CommentRow (TypedDict): id: int video_id: int comment_no: int user_id: int content: str posted_at: datetime nico_count: int vpos_ms: int | None class TagRow (TypedDict): id: int name: str class UserRow (TypedDict): id: int code: str class VideoRow (TypedDict): id: int code: str title: str description: str uploaded_at: datetime deleted_at: datetime | None class VideoHistoryRow (TypedDict): id: int video_id: int fetched_at: date views_count: int class VideoTagRow (TypedDict): id: int video_id: int tag_id: int tagged_at: date untagged_at: date | None def main ( ) -> None: conn = mysql.connector.connect (user = os.environ['MYSQL_USER'], password = os.environ['MYSQL_PASS'], database = 'nizika_nico') if not isinstance (conn, MySQLConnectionAbstract): raise TypeError now = datetime.now () video_dao = VideoDao (conn) tag_dao = TagDao (conn) video_tag_dao = VideoTagDao (conn) video_history_dao = VideoHistoryDao (conn) comment_dao = CommentDao (conn) user_dao = UserDao (conn) api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) update_tables (video_dao, tag_dao, video_tag_dao, video_history_dao, comment_dao, user_dao, api_data, now) conn.commit () conn.close () def update_tables ( video_dao: VideoDao, tag_dao: TagDao, video_tag_dao: VideoTagDao, video_history_dao: VideoHistoryDao, comment_dao: CommentDao, user_dao: UserDao, api_data: list[VideoResult], now: datetime, ) -> None: video_ids: list[int] = [] for datum in api_data: tag_names: list[str] = datum['tags'].split () video = VideoDto (code = datum['contentId'], title = datum['title'], description = datum['description'] or '', uploaded_at = datetime.fromisoformat (datum['startTime'])) video_dao.upsert (video, False) if video.id_ is not None: video_ids.append (video.id_) video_history = VideoHistoryDto (video_id = video.id_, fetched_at = now, views_count = datum['viewCounter']) video_history_dao.insert (video_history) tag_ids: list[int] = [] video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False) for vt in video_tags: tag = tag_dao.find (vt.tag_id) if (tag is not None and (tag.name not in tag_names) and (tag.id_ is not None)): tag_ids.append (tag.id_) video_tag_dao.untag_all (video.id_, tag_ids, now) tags: list[TagDto] = [] for tag_name in tag_names: tag = tag_dao.fetch_by_name (tag_name) if tag is None: tag = TagDto (name = tag_name) tag_dao.insert (tag) if video.id_ is not None and tag.id_ is not None: video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False) if video_tag is None: video_tag = VideoTagDto (video_id = video.id_, tag_id = tag.id_, tagged_at = now) video_tag_dao.insert (video_tag, False) for com in fetch_comments (video.code): user = user_dao.fetch_by_code (com['userId']) if user is None: user = UserDto (code = com['userId']) user_dao.insert (user) if video.id_ is not None and user.id_ is not None: comment = CommentDto (video_id = video.id_, comment_no = com['no'], user_id = user.id_, content = com['body'], posted_at = datetime.fromisoformat (com['postedAt']), nico_count = com['nicoruCount'], vpos_ms = com['vposMs']) comment_dao.upsert (comment, False) alive_video_codes = [d['contentId'] for d in api_data] lost_video_ids: list[int] = [] videos = video_dao.fetch_alive () for video in videos: if video.id_ is not None and video.code not in alive_video_codes: lost_video_ids.append (video.id_) video_dao.delete (lost_video_ids, now) def fetch_comments ( video_code: str, ) -> list[CommentResult]: time.sleep (1.2) headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0' } action_track_id = ( ''.join (random.choice (string.ascii_letters + string.digits) for _ in range (10)) + '_' + str (random.randrange (10 ** 12, 10 ** 13))) url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }" + f"?actionTrackId={ action_track_id }") res = requests.post (url, headers = headers, timeout = 60).json () try: nv_comment = res['data']['comment']['nvComment'] except KeyError: return [] if nv_comment is None: return [] headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0', 'Content-Type': 'application/json' } params = { 'params': nv_comment['params'], 'additionals': { }, 'threadKey': nv_comment['threadKey'] } url = nv_comment['server'] + '/v1/threads' res = (requests.post (url, json.dumps (params), headers = headers, timeout = 60) .json ()) try: return res['data']['threads'][1]['comments'] except (IndexError, KeyError): return [] def search_nico_by_tag ( tag: str, ) -> list[VideoResult]: return search_nico_by_tags ([tag]) def search_nico_by_tags ( tags: list[str], ) -> list[VideoResult]: today = datetime.now () url = ('https://snapshot.search.nicovideo.jp' + '/api/v2/snapshot/video/contents/search') result_data: list[VideoResult] = [] to = datetime (2022, 12, 3) while to <= today: time.sleep (1.2) until = to + timedelta (days = 14) query_filter = json.dumps ({ 'type': 'or', 'filters': [ { 'type': 'range', 'field': 'startTime', 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day), 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day), 'include_lower': True }] }) params: VideoSearchParam = { 'q': ' OR '.join (tags), 'targets': 'tagsExact', '_sort': '-viewCounter', 'fields': ('contentId,' 'title,' 'tags,' 'description,' 'viewCounter,' 'startTime'), '_limit': 100, 'jsonFilter': query_filter } res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json () try: result_data += res['data'] except KeyError: pass to = until + timedelta (days = 1) return result_data class VideoDao: def __init__ ( self, conn: MySQLConnectionAbstract, ): self.conn = conn def find ( self, video_id: int, with_relation_tables: bool, ) -> VideoDto | None: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos WHERE id = %s ORDER BY id""", (video_id,)) print (c._executed) row = cast (VideoRow | None, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row, with_relation_tables) def fetch_all ( self, with_relation_tables: bool, ) -> list[VideoDto]: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos ORDER BY id""") print (c._executed) videos: list[VideoDto] = [] for row in cast (list[VideoRow], c.fetchall ()): videos.append (self._create_dto_from_row (row, with_relation_tables)) return videos def fetch_alive ( self, ) -> list[VideoDto]: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, code, title, description, uploaded_at, deleted_at FROM videos WHERE deleted_at IS NULL""") print (c._executed) videos: list[VideoDto] = [] for row in cast (list[VideoRow], c.fetchall ()): videos.append (self._create_dto_from_row (row, False)) return videos def upsert ( self, video: VideoDto, with_relation_tables: bool, ) -> None: deleted_at: datetime | DbNullType | None = video.deleted_at if deleted_at is None: raise TypeError ('未実装') if deleted_at is DbNull: deleted_at = None deleted_at = cast (datetime | None, deleted_at) with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO videos( code, title, description, uploaded_at, deleted_at) VALUES ( %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE id = LAST_INSERT_ID(id), code = VALUES(code), title = VALUES(title), description = VALUES(description), uploaded_at = VALUES(uploaded_at), deleted_at = VALUES(deleted_at)""", (video.code, video.title, video.description, video.uploaded_at, deleted_at)) print (c._executed) video.id_ = c.lastrowid if with_relation_tables: if video.video_tags is not None: VideoTagDao (self.conn).upsert_all (video.video_tags, False) if video.comments is not None: CommentDao (self.conn).upsert_all (video.comments, False) if video.video_histories is not None: VideoHistoryDao (self.conn).upsert_all (video.video_histories) def upsert_all ( self, videos: list[VideoDto], with_relation_tables: bool, ) -> None: for video in videos: self.upsert (video, with_relation_tables) def delete ( self, video_ids: list[int], at: datetime, ) -> None: if not video_ids: return with self.conn.cursor (dictionary = True) as c: c.execute (""" UPDATE videos SET deleted_at = %%s WHERE id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids)) print (c._executed) def _create_dto_from_row ( self, row: VideoRow, with_relation_tables: bool, ) -> VideoDto: video = VideoDto (id_ = row['id'], code = row['code'], title = row['title'], description = row['description'], uploaded_at = row['uploaded_at'], deleted_at = row['deleted_at'] or DbNull) if with_relation_tables and video.id_ is not None: video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.video_tags)): video.video_tags[i].video = video video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.comments)): video.comments[i].video = video video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.video_histories)): video.video_histories[i].video = video return video @dataclass (slots = True) class VideoDto: code: str title: str description: str uploaded_at: datetime id_: int | None = None deleted_at: datetime | DbNullType = DbNull video_tags: list[VideoTagDto] | None = None comments: list[CommentDto] | None = None video_histories: list[VideoHistoryDto] | None = None class VideoTagDao: def __init__ ( self, conn: MySQLConnectionAbstract, ): self.conn = conn def fetch_by_video_id ( self, video_id: int, with_relation_tables: bool, ) -> list[VideoTagDto]: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, video_id, tag_id, tagged_at, untagged_at FROM video_tags WHERE video_id = %s ORDER BY id""", (video_id,)) print (c._executed) video_tags: list[VideoTagDto] = [] for row in cast (list[VideoTagRow], c.fetchall ()): video_tags.append (self._create_dto_from_row (row, with_relation_tables)) return video_tags def fetch_alive_by_video_id ( self, video_id: int, with_relation_tables: bool, ) -> list[VideoTagDto]: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, video_id, tag_id, tagged_at, untagged_at FROM video_tags WHERE video_id = %s AND (untagged_at IS NULL) ORDER BY id""", (video_id,)) print (c._executed) video_tags: list[VideoTagDto] = [] for row in cast (list[VideoTagRow], c.fetchall ()): video_tags.append (self._create_dto_from_row (row, with_relation_tables)) return video_tags def fetch_alive_by_ids ( self, video_id: int, tag_id: int, with_relation_tables: bool, ) -> VideoTagDto | None: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, video_id, tag_id, tagged_at, untagged_at FROM video_tags WHERE video_id = %s AND tag_id = %s""", (video_id, tag_id)) print (c._executed) row = cast (VideoTagRow, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row, with_relation_tables) def insert ( self, video_tag: VideoTagDto, with_relation_tables: bool, ) -> None: untagged_at: date | DbNullType | None = video_tag.untagged_at if untagged_at is None: raise TypeError ('未実装') if untagged_at is DbNull: untagged_at = None untagged_at = cast (date | None, untagged_at) with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO video_tags( video_id, tag_id, tagged_at, untagged_at) VALUES ( %s, %s, %s, %s)""", (video_tag.video_id, video_tag.tag_id, video_tag.tagged_at, untagged_at)) print (c._executed) video_tag.id_ = c.lastrowid if with_relation_tables: if video_tag.video is not None: VideoDao (self.conn).upsert (video_tag.video, True) if video_tag.tag is not None: TagDao (self.conn).upsert (video_tag.tag) def upsert ( self, video_tag: VideoTagDto, with_relation_tables: bool, ) -> None: untagged_at: date | DbNullType | None = video_tag.untagged_at if untagged_at is None: raise TypeError ('未実装') if untagged_at is DbNull: untagged_at = None untagged_at = cast (date | None, untagged_at) with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO video_tags( video_id, tag_id, tagged_at, untagged_at) VALUES ( %s, %s, %s, %s) ON DUPLICATE KEY UPDATE id = LAST_INSERT_ID(id), video_id = VALUES(video_id), tag_id = VALUES(tag_id), tagged_at = VALUES(tagged_at), untagged_at = VALUES(untagged_at)""", (video_tag.video_id, video_tag.tag_id, video_tag.tagged_at, untagged_at)) print (c._executed) video_tag.id_ = c.lastrowid if with_relation_tables: if video_tag.video is not None: VideoDao (self.conn).upsert (video_tag.video, True) if video_tag.tag is not None: TagDao (self.conn).upsert (video_tag.tag) def upsert_all ( self, video_tags: list[VideoTagDto], with_relation_tables: bool, ) -> None: for video_tag in video_tags: self.upsert (video_tag, with_relation_tables) def untag_all ( self, video_id: int, tag_ids: list[int], now: datetime, ) -> None: if not tag_ids: return with self.conn.cursor (dictionary = True) as c: c.execute (""" UPDATE video_tags SET untagged_at = %%s WHERE video_id = %%s AND tag_id IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids)) print (c._executed) def _create_dto_from_row ( self, row: VideoTagRow, with_relation_tables: bool, ) -> VideoTagDto: video_tag = VideoTagDto (id_ = row['id'], video_id = row['video_id'], tag_id = row['tag_id'], tagged_at = row['tagged_at'], untagged_at = row['untagged_at'] or DbNull) if with_relation_tables: video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True) video_tag.tag = TagDao (self.conn).find (video_tag.tag_id) return video_tag @dataclass (slots = True) class VideoTagDto: video_id: int tag_id: int tagged_at: date id_: int | None = None untagged_at: date | DbNullType = DbNull video: VideoDto | None = None tag: TagDto | None = None class TagDao: def __init__ ( self, conn: MySQLConnectionAbstract, ): self.conn = conn def find ( self, tag_id: int, ) -> TagDto | None: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, name FROM tags WHERE id = %s""", (tag_id,)) print (c._executed) row = cast (TagRow | None, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row) def fetch_by_name ( self, tag_name: str, ) -> TagDto | None: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, name FROM tags WHERE name = %s""", (tag_name,)) print (c._executed) row = cast (TagRow | None, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row) def insert ( self, tag: TagDto, ) -> None: with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO tags(name) VALUES (%s)""", (tag.name,)) print (c._executed) tag.id_ = c.lastrowid def upsert ( self, tag: TagDto, ) -> None: with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO tags(name) VALUES (%s) ON DUPLICATE KEY UPDATE id = LAST_INSERT_ID(id), name = VALUES(name)""", (tag.name,)) print (c._executed) tag.id_ = c.lastrowid def _create_dto_from_row ( self, row: TagRow, ) -> TagDto: return TagDto (id_ = row['id'], name = row['name']) @dataclass (slots = True) class TagDto: name: str id_: int | None = None class VideoHistoryDao: def __init__ ( self, conn: MySQLConnectionAbstract, ): self.conn = conn def fetch_by_video_id ( self, video_id: int, with_relation_tables: bool, ) -> list[VideoHistoryDto]: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, video_id, fetched_at, views_count FROM video_histories WHERE video_id = %s""", (video_id,)) print (c._executed) video_histories: list[VideoHistoryDto] = [] for row in cast (list[VideoHistoryRow], c.fetchall ()): video_histories.append (self._create_dto_from_row (row, with_relation_tables)) return video_histories def insert ( self, video_history: VideoHistoryDto, ) -> None: with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO video_histories( video_id, fetched_at, views_count) VALUES ( %s, %s, %s)""", (video_history.video_id, video_history.fetched_at, video_history.views_count)) print (c._executed) def upsert ( self, video_history: VideoHistoryDto, ) -> None: with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO video_histories( video_id, fetched_at, views_count) VALUES ( %s, %s, %s) ON DUPLICATE KEY UPDATE id = LAST_INSERT_ID(id), video_id = VALUES(video_id), fetched_at = VALUES(fetched_at), views_count = VALUES(views_count)""", (video_history.video_id, video_history.fetched_at, video_history.views_count)) print (c._executed) def upsert_all ( self, video_histories: list[VideoHistoryDto], ) -> None: for video_history in video_histories: self.upsert (video_history) def _create_dto_from_row ( self, row: VideoHistoryRow, with_relation_tables: bool, ) -> VideoHistoryDto: video_history = VideoHistoryDto (id_ = row['id'], video_id = row['video_id'], fetched_at = row['fetched_at'], views_count = row['views_count']) if with_relation_tables: video_history.video = VideoDao (self.conn).find (video_history.video_id, True) return video_history @dataclass (slots = True) class VideoHistoryDto: video_id: int fetched_at: date views_count: int id_: int | None = None video: VideoDto | None = None class CommentDao: def __init__ ( self, conn: MySQLConnectionAbstract, ): self.conn = conn def fetch_by_video_id ( self, video_id: int, with_relation_tables: bool, ) -> list[CommentDto]: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, video_id, comment_no, user_id, content, posted_at, nico_count, vpos_ms FROM comments WHERE video_id = %s""", (video_id,)) print (c._executed) comments: list[CommentDto] = [] for row in cast (list[CommentRow], c.fetchall ()): comments.append (self._create_dto_from_row (row, with_relation_tables)) return comments def upsert ( self, comment: CommentDto, with_relation_tables: bool, ) -> None: vpos_ms: int | DbNullType | None = comment.vpos_ms if vpos_ms is None: raise TypeError ('未実装') if vpos_ms is DbNull: vpos_ms = None vpos_ms = cast (int | None, vpos_ms) with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO comments( video_id, comment_no, user_id, content, posted_at, nico_count, vpos_ms) VALUES ( %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE id = LAST_INSERT_ID(id), video_id = VALUES(video_id), comment_no = VALUES(comment_no), user_id = VALUES(user_id), content = VALUES(content), posted_at = VALUES(posted_at), nico_count = VALUES(nico_count), vpos_ms = VALUES(vpos_ms)""", (comment.video_id, comment.comment_no, comment.user_id, comment.content, comment.posted_at, comment.nico_count, vpos_ms)) print (c._executed) def upsert_all ( self, comments: list[CommentDto], with_relation_tables: bool, ) -> None: for comment in comments: self.upsert (comment, with_relation_tables) def _create_dto_from_row ( self, row: CommentRow, with_relation_tables: bool, ) -> CommentDto: comment = CommentDto (id_ = row['id'], video_id = row['video_id'], comment_no = row['comment_no'], user_id = row['user_id'], content = row['content'], posted_at = row['posted_at'], nico_count = row['nico_count'], vpos_ms = row['vpos_ms'] or DbNull) if with_relation_tables: comment.video = VideoDao (self.conn).find (comment.video_id, True) return comment @dataclass (slots = True) class CommentDto: video_id: int comment_no: int user_id: int content: str posted_at: datetime id_: int | None = None nico_count: int = 0 vpos_ms: int | DbNullType = DbNull video: VideoDto | None = None user: UserDto | None = None class UserDao: def __init__ ( self, conn: MySQLConnectionAbstract, ): self.conn = conn def fetch_by_code ( self, user_code: str ) -> UserDto | None: with self.conn.cursor (dictionary = True) as c: c.execute (""" SELECT id, code FROM users WHERE code = %s""", (user_code,)) print (c._executed) row = cast (UserRow | None, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row) def insert ( self, user: UserDto, ) -> None: with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO users(code) VALUES (%s)""", (user.code,)) print (c.execute) user.id_ = c.lastrowid def _create_dto_from_row ( self, row: UserRow, ) -> UserDto: return UserDto (id_ = row['id'], code = row['code']) @dataclass (slots = True) class UserDto: code: str id_: int | None = None if __name__ == '__main__': main ()