# pylint: disable = missing-class-docstring # pylint: disable = missing-function-docstring """ 日次で実行し,ぼざクリ DB を最新に更新する. """ from __future__ import annotations import json import os import random import string import time import unicodedata from datetime import date, datetime, timedelta from typing import Any, TypedDict, cast import requests from eloquent import DatabaseManager, Model config: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql', 'host': 'localhost', 'database': 'nizika_nico', 'user': os.environ['MYSQL_USER'], 'password': os.environ['MYSQL_PASS'], 'prefix': '' } } db = DatabaseManager (config) Model.set_connection_resolver (db) def main ( ) -> None: now = datetime.now () api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) update_tables (api_data, now) def update_tables ( api_data: list[VideoResult], now: datetime, ) -> None: alive_video_codes: list[str] = [] for datum in api_data: tag_names: list[str] = datum['tags'].split () video = Video () video.code = datum['contentId'] video.title = datum['title'] video.description = datum['description'] or '' video.uploaded_at = datetime.fromisoformat (datum['startTime']) video.deleted_at = None video.upsert () alive_video_codes.append (video.code) video_history = VideoHistory () video_history.video_id = video.id video_history.fetched_at = now video_history.views_count = datum['viewCounter'] video_history.save () video_tags = video.video_tags.where_not_null ('untagged_at').get () tag: Tag | None video_tag: VideoTag | None for video_tag in video_tags: tag = video_tag.tag if (tag is not None and (normalise (tag.name) not in map (normalise, tag_names))): video_tag.untagged_at = now video_tag.save () for tag_name in tag_names: tag = Tag.where ('name', tag_name).first () if tag is None: tag = Tag () tag.name = tag_name tag.save () video_tag = (VideoTag.where ('video_id', video.id) .where ('tag_id', tag.id) .where_null ('untagged_at') .first ()) if video_tag is None: video_tag = VideoTag () video_tag.video_id = video.id video_tag.tag_id = tag.id video_tag.tagged_at = now video_tag.untagged_at = None video_tag.save () for com in fetch_comments (video.code): user = User.where ('code', com['userId']).first () if user is None: user = User () user.code = com['userId'] user.save () comment = Comment () comment.video_id = video.id comment.comment_no = com['no'] comment.user_id = user.id comment.content = com['body'] comment.posted_at = datetime.fromisoformat (com['postedAt']) comment.nico_count = com['nicoruCount'] comment.vpos_ms = com['vposMs'] comment.upsert () # 削除動画 videos = (Video.where_not_in ('code', alive_video_codes) .where_null ('deleted_at') .get ()) for video in videos: if video.code not in alive_video_codes: video.deleted_at = now video.save () def fetch_comments ( video_code: str, ) -> list[CommentResult]: time.sleep (1.2) headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0' } action_track_id = ( ''.join (random.choice (string.ascii_letters + string.digits) for _ in range (10)) + '_' + str (random.randrange (10 ** 12, 10 ** 13))) url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }" + f"?actionTrackId={ action_track_id }") res = requests.post (url, headers = headers, timeout = 60).json () try: nv_comment = res['data']['comment']['nvComment'] except KeyError: return [] if nv_comment is None: return [] headers = { 'X-Frontend-Id': '6', 'X-Frontend-Version': '0', 'Content-Type': 'application/json' } params = { 'params': nv_comment['params'], 'additionals': { }, 'threadKey': nv_comment['threadKey'] } url = nv_comment['server'] + '/v1/threads' res = (requests.post (url, json.dumps (params), headers = headers, timeout = 60) .json ()) try: return res['data']['threads'][1]['comments'] except (IndexError, KeyError): return [] def search_nico_by_tag ( tag: str, ) -> list[VideoResult]: return search_nico_by_tags ([tag]) def search_nico_by_tags ( tags: list[str], ) -> list[VideoResult]: today = datetime.now () url = ('https://snapshot.search.nicovideo.jp' + '/api/v2/snapshot/video/contents/search') result_data: list[VideoResult] = [] to = datetime (2022, 12, 3) while to <= today: time.sleep (1.2) until = to + timedelta (days = 14) # pylint: disable = consider-using-f-string query_filter = json.dumps ({ 'type': 'or', 'filters': [ { 'type': 'range', 'field': 'startTime', 'from': ('%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day)), 'to': ('%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day)), 'include_lower': True }] }) params: VideoSearchParam = { 'q': ' OR '.join (tags), 'targets': 'tagsExact', '_sort': '-viewCounter', 'fields': ('contentId,' 'title,' 'tags,' 'description,' 'viewCounter,' 'startTime'), '_limit': 100, 'jsonFilter': query_filter } res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json () try: result_data += res['data'] except KeyError: pass to = until + timedelta (days = 1) return result_data class Comment (Model): # pylint: disable = too-many-instance-attributes id: int video_id: int comment_no: int user_id: int content: str posted_at: datetime nico_count: int vpos_ms: int __timestamps__ = False @property def video ( self, ) -> Video: return self.belongs_to (Video) @property def user ( self, ) -> User: return self.belongs_to (User) def upsert ( self, ) -> None: row = (Comment.where ('video_id', self.video_id) .where ('comment_no', self.comment_no) .first ()) if row is not None: self.id = row.id self.__exists = True # pylint: disable = unused-private-member self.save () class Tag (Model): id: int name: str __timestamps__ = False @property def video_tags ( self, ) -> VideoTag: return self.has_many (VideoTag) class User (Model): id: int code: str __timestamps__ = False @property def comments ( self, ) -> Comment: return self.has_many (Comment) class Video (Model): id: int code: str title: str description: str uploaded_at: datetime deleted_at: datetime | None __timestamps__ = False @property def video_histories ( self, ) -> VideoHistory: return self.has_many (VideoHistory) @property def video_tags ( self, ) -> VideoTag: return self.has_many (VideoTag) @property def comments ( self, ) -> Comment: return self.has_many (Comment) def upsert ( self, ) -> None: row = Video.where ('code', self.code).first () if row is not None: self.id = row.id self.__exists = True # pylint: disable = unused-private-member self.save () class VideoHistory (Model): id: int video_id: int fetched_at: date views_count: int __timestamps__ = False @property def video ( self, ) -> Video: return self.belongs_to (Video) def upsert ( self, ) -> None: row = (VideoHistory.where ('video_id', self.video_id) .where ('fetched_at', self.fetched_at) .first ()) if row is not None: self.id = row.id self.__exists = True # pylint: disable = unused-private-member self.save () class VideoTag (Model): id: int video_id: int tag_id: int tagged_at: date untagged_at: date | None __timestamps__ = False @property def video ( self, ) -> Video: return self.belongs_to (Video) @property def tag ( self, ) -> Tag: return self.belongs_to (Tag) def upsert ( self, ) -> None: row = (VideoTag.where ('video_id', self.video_id) .where ('tag_id', self.tag_id) .first ()) if row is not None: self.id = row.id self.__exists = True # pylint: disable = unused-private-member self.save () class DbConfig (TypedDict): driver: str host: str database: str user: str password: str prefix: str class VideoSearchParam (TypedDict): q: str targets: str _sort: str fields: str _limit: int jsonFilter: str class VideoResult (TypedDict): contentId: str title: str tags: str description: str | None viewCounter: int startTime: str class CommentResult (TypedDict): id: str no: int vposMs: int body: str commands: list[str] userId: str isPremium: bool score: int postedAt: str nicoruCount: int nicoruId: Any source: str isMyPost: bool def normalise ( s: str, ) -> str: return unicodedata.normalize ('NFKC', s).lower () if __name__ == '__main__': main ()