# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
Run daily to bring the Bozakuri (ぼざクリ) DB up to date.

Fetches video metadata via the niconico Snapshot Search API, per-video
comments via the nvComment threads API, then upserts everything into the
project database and soft-deletes videos that no longer appear upstream.
"""

from __future__ import annotations

import json
import logging
import random
import string
import time
import unicodedata
from datetime import date, datetime, timedelta
from typing import Any, TypedDict, cast

import jaconv
import requests

from db.config import DB
from db.models import (Comment, Tag, TrackedVideo, User, Video, VideoHistory,
                       VideoTag)

logger = logging.getLogger (__name__)
logging.basicConfig (
    level = logging.INFO, format = '%(asctime)s %(levelname)s %(message)s')


class SearchNicoResult (TypedDict):
    # videos: every video found (search hits plus tracked-video fallbacks)
    videos: list['VideoResult']
    # is_complete: False when any fetch in the run failed; callers use this
    # to decide whether soft-deleting missing videos is safe
    is_complete: bool


class UpdateContext (TypedDict):
    # api_data: raw video results to upsert
    api_data: list['VideoResult']
    # comments_by_video_code: comments keyed by video code (contentId)
    comments_by_video_code: dict[str, list['CommentResult']]
    # deletable: mirrors SearchNicoResult['is_complete']
    deletable: bool


class VideoSearchParam (TypedDict):
    # Query parameters for the Snapshot Search API v2 (leading underscores
    # are part of the API's parameter names, e.g. '_sort', '_limit').
    q: str
    targets: str
    _sort: str
    fields: str
    _limit: int
    jsonFilter: str


class VideoResult (TypedDict):
    # One video record as returned by the Snapshot Search API 'fields' list.
    contentId: str
    userId: int | None
    title: str
    # tags: space-separated tag names (single string, not a list)
    tags: str
    description: str | None
    viewCounter: int
    # startTime: ISO-8601 timestamp with offset (e.g. +09:00)
    startTime: str


class CommentResult (TypedDict):
    # One comment as returned by the nvComment threads API.
    no: int
    userId: str
    body: str
    postedAt: str
    nicoruCount: int
    vposMs: int


def main (
) -> None:
    """Entry point: fetch everything, then update the DB in one transaction."""
    now = datetime.now ()
    today = now.date ()
    search_result = search_nico_by_tags (
        ['伊地知ニジカ', 'ぼざろクリーチャーシリーズ',
         'ぼざろクリーチャーシリーズ外伝'])
    comments_by_video_code = fetch_comments_by_video_code (
        search_result['videos'])
    context: UpdateContext = {
        'api_data': search_result['videos'],
        'comments_by_video_code': comments_by_video_code,
        'deletable': search_result['is_complete']
    }
    # All table updates happen inside a single transaction so a partial
    # failure leaves the DB untouched.
    connection = DB.connection ()
    connection.begin_transaction ()
    try:
        update_tables (context, now, today)
        connection.commit ()
    except Exception:
        connection.rollback ()
        raise


def update_tables (
    context: UpdateContext,
    now: datetime,
    today: date,
) -> None:
    """Upsert users/videos/histories/tags/comments, then soft-delete.

    Videos absent from this run are soft-deleted (deleted_at = now), but
    only when the fetch was complete ('deletable') and at least one live
    video was seen — both guards avoid mass-deleting on a bad run.
    """
    alive_video_codes: list[str] = []
    for datum in context['api_data']:
        # Tag names arrive as one space-separated string.
        tag_names = datum['tags'].split ()
        # Normalised forms are used only for the untag comparison below,
        # so cosmetic variants of the same tag don't cause churn.
        normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
        user: User | None = None
        if datum['userId'] is not None:
            # Find-or-create the uploader by their numeric user code.
            user = User.where ('code', str (datum['userId'])).first ()
            if user is None:
                user = User ()
                user.code = str (datum['userId'])
                user.save ()
        video = Video ()
        video.code = datum['contentId']
        video.user_id = user.id if user else None
        video.title = datum['title']
        video.description = datum['description'] or ''
        video.uploaded_at = datetime.fromisoformat (datum['startTime'])
        # Seen in this run, so clear any previous soft-delete.
        video.deleted_at = None
        video.upsert ()
        alive_video_codes.append (video.code)
        # One view-count snapshot per video per day.
        video_history = VideoHistory ()
        video_history.video_id = video.id
        video_history.fetched_at = today
        video_history.views_count = datum['viewCounter']
        video_history.upsert ()
        # Untag: close out currently-active tags that no longer appear
        # (compared in normalised form).
        video_tags = [video_tag for video_tag in video.video_tags
                      if video_tag.untagged_at is None]
        for video_tag in video_tags:
            tag = video_tag.tag
            if tag is None:
                continue
            if normalise (tag.name) in normalised_tag_names:
                continue
            video_tag.untagged_at = today
            video_tag.save ()
        # Tag: ensure every current tag name exists and is linked as active.
        for tag_name in tag_names:
            tag = Tag.where ('name', tag_name).first ()
            if tag is None:
                tag = Tag ()
                tag.name = tag_name
                tag.save ()
            video_tag = (VideoTag.where ('video_id', video.id)
                         .where ('tag_id', tag.id)
                         .first ())
            if video_tag is None:
                video_tag = VideoTag ()
                video_tag.video_id = video.id
                video_tag.tag_id = tag.id
            # Keep the original tagged_at for a re-activated link; only a
            # brand-new link gets today's date.
            video_tag.tagged_at = video_tag.tagged_at or today
            video_tag.untagged_at = None
            video_tag.upsert ()
        # Comments: find-or-create each commenter, then upsert the comment.
        for com in context['comments_by_video_code'].get (video.code, []):
            user = User.where ('code', com['userId']).first ()
            if user is None:
                user = User ()
                user.code = com['userId']
                user.save ()
            comment = Comment ()
            comment.video_id = video.id
            comment.comment_no = com['no']
            comment.user_id = user.id
            comment.content = com['body']
            comment.posted_at = datetime.fromisoformat (com['postedAt'])
            # .get with defaults: these keys are declared in CommentResult
            # but treated defensively here — presumably the API sometimes
            # omits them (TODO confirm against real responses).
            comment.nico_count = com.get ('nicoruCount', 0)
            comment.vpos_ms = com.get ('vposMs', 0)
            comment.upsert ()
    if not context['deletable']:
        logger.warning ('skip soft-delete because the latest fetch was incomplete')
        return
    if not alive_video_codes:
        logger.warning ('skip soft-delete because no alive videos were fetched')
        return
    # Soft-delete videos not seen in this run (and not already deleted).
    videos = (Video.where_not_in ('code', alive_video_codes)
              .where_null ('deleted_at')
              .get ())
    for video in videos:
        video.deleted_at = now
        video.save ()


def fetch_video_data (
    video_code: str,
) -> dict[str, Any]:
    """Fetch one video's watch-page data via the guest watch API.

    Sleeps 1.2s first as a crude rate limit. The actionTrackId format
    (10 alphanumerics + '_' + 13 digits) mimics what the web player
    generates — NOTE(review): required by the endpoint, presumably;
    confirm if the API ever changes.
    """
    time.sleep (1.2)
    headers = {
        'X-Frontend-Id': '6',
        'X-Frontend-Version': '0'
    }
    action_track_id = (
        ''.join (random.choice (string.ascii_letters + string.digits)
                 for _ in range (10))
        + '_' + str (random.randrange (10 ** 12, 10 ** 13)))
    url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
           + f'?actionTrackId={ action_track_id }')
    return requests.post (url, headers = headers, timeout = 60).json ()


def fetch_comments_by_video_code (
    videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
    """Fetch comments for every video; best-effort per video.

    A failed fetch is logged and yields an empty list for that video
    rather than aborting the whole run.
    """
    comments_by_video_code: dict[str, list[CommentResult]] = {}
    for video in videos:
        video_code = video['contentId']
        try:
            comments_by_video_code[video_code] = fetch_comments (video_code)
        except (KeyError, TypeError, ValueError,
                requests.RequestException) as exc:
            logger.warning ('failed to fetch comments: %s (%s)',
                            video_code, exc)
            comments_by_video_code[video_code] = []
    return comments_by_video_code


def fetch_comments (
    video_code: str,
) -> list[CommentResult]:
    """Fetch one video's comments via the nvComment threads API.

    First fetches the watch-page data to obtain the nvComment server,
    params and threadKey; returns [] when the video exposes no nvComment
    block (e.g. comments unavailable — TODO confirm the exact cases).
    """
    video_data = fetch_video_data (video_code)
    nv_comment = (video_data.get ('data', {})
                  .get ('comment', {})
                  .get ('nvComment'))
    if nv_comment is None:
        return []
    headers = {
        'X-Frontend-Id': '6',
        'X-Frontend-Version': '0',
        'Content-Type': 'application/json'
    }
    params = {
        'params': nv_comment['params'],
        'additionals': {},
        'threadKey': nv_comment['threadKey']
    }
    url = nv_comment['server'] + '/v1/threads'
    response = requests.post (url, json = params, headers = headers,
                              timeout = 60)
    response.raise_for_status ()
    res = response.json ()
    return select_comments_from_threads (res)


def select_comments_from_threads (
    response: dict[str, Any],
) -> list[CommentResult]:
    """Pick the main comment thread from a threads API response.

    Prefers the thread whose fork/label/id mentions 'main'; otherwise
    falls back to the thread with the most comments. Comments are
    de-duplicated by their 'no' and returned sorted by it.
    """
    threads = response.get ('data', {}).get ('threads', [])
    if not isinstance (threads, list):
        return []
    main_comments: list[CommentResult] = []
    fallback_comments: list[CommentResult] = []
    for thread in threads:
        comments = (thread.get ('comments')
                    if isinstance (thread, dict) else None)
        if not isinstance (comments, list):
            continue
        casted_comments = cast (list[CommentResult], comments)
        # Track the largest thread as a fallback if no 'main' is found.
        if len (casted_comments) > len (fallback_comments):
            fallback_comments = casted_comments
        fork = str (thread.get ('fork', '')).lower ()
        label = str (thread.get ('label', '')).lower ()
        thread_id = str (thread.get ('id', '')).lower ()
        if fork == 'main' or 'main' in label or 'main' in thread_id:
            main_comments = casted_comments
    selected_comments = main_comments or fallback_comments
    # De-duplicate by comment number; later entries win.
    deduped_comments: dict[int, CommentResult] = {}
    for comment in selected_comments:
        comment_no = comment.get ('no')
        if not isinstance (comment_no, int):
            continue
        deduped_comments[comment_no] = comment
    return [deduped_comments[comment_no]
            for comment_no in sorted (deduped_comments)]


def search_nico_by_tags (
    tags: list[str],
) -> SearchNicoResult:
    """Search niconico for videos bearing any of *tags*, since 2022-12-03.

    Walks the Snapshot Search API in consecutive 15-day windows (windows
    tile exactly: next 'to' is previous 'until' + 1 day) because a single
    query caps results; each window keeps its top 100 by view count.
    Tracked videos missing from the search results are fetched
    individually as a fallback. 'is_complete' turns False if any window
    or fallback fetch fails.
    """
    today = datetime.now ()
    url = ('https://snapshot.search.nicovideo.jp'
           + '/api/v2/snapshot/video/contents/search')
    result_by_video_code: dict[str, VideoResult] = {}
    is_complete = True
    to = datetime (2022, 12, 3)
    while to <= today:
        time.sleep (1.2)
        until = to + timedelta (days = 14)
        # pylint: disable = consider-using-f-string
        # JST (+09:00) day-bounds filter on startTime for this window.
        query_filter = json.dumps ({
            'type': 'or',
            'filters': [
                {
                    'type': 'range',
                    'field': 'startTime',
                    'from': ('%04d-%02d-%02dT00:00:00+09:00'
                             % (to.year, to.month, to.day)),
                    'to': ('%04d-%02d-%02dT23:59:59+09:00'
                           % (until.year, until.month, until.day)),
                    'include_lower': True
                }]
        })
        params: VideoSearchParam = {
            'q': ' OR '.join (tags),
            'targets': 'tagsExact',
            '_sort': '-viewCounter',
            'fields': ('contentId,'
                       'userId,'
                       'title,'
                       'tags,'
                       'description,'
                       'viewCounter,'
                       'startTime'),
            '_limit': 100,
            'jsonFilter': query_filter
        }
        try:
            response = requests.get (
                url, params = cast (dict[str, int | str], params),
                timeout = 60)
            response.raise_for_status ()
            res = response.json ()
            for datum in cast (list[VideoResult], res.get ('data', [])):
                result_by_video_code[datum['contentId']] = datum
        except (ValueError, requests.RequestException) as exc:
            logger.warning ('snapshot fetch failed: %s - %s (%s)',
                            to.date (), until.date (), exc)
            is_complete = False
        to = until + timedelta (days = 1)
    # Fallback: fetch tracked videos the search did not return, shaping
    # the watch-page payload into the same VideoResult structure.
    for video in TrackedVideo.get ():
        if video.code in result_by_video_code:
            continue
        try:
            video_data = fetch_video_data (video.code)['data']
            result_by_video_code[video.code] = {
                'contentId': video.code,
                'userId': video_data['video']['userId'],
                'title': video_data['video']['title'],
                'tags': ' '.join (map (lambda t: t['name'],
                                       video_data['tag']['items'])),
                'description': video_data['video']['description'],
                'viewCounter': video_data['video']['count']['view'],
                'startTime': video_data['video']['registeredAt']
            }
        except (KeyError, TypeError, ValueError,
                requests.RequestException) as exc:
            logger.warning ('tracked video fetch failed: %s (%s)',
                            video.code, exc)
            is_complete = False
    return {
        'videos': list (result_by_video_code.values ()),
        'is_complete': is_complete
    }


def normalise (
    text: str,
) -> str:
    """Normalise a tag name for comparison: NFKC, hiragana→katakana, lower."""
    return jaconv.hira2kata (
        unicodedata.normalize ('NFKC', text.strip ())).lower ()


if __name__ == '__main__':
    main ()