diff --git a/db/my_eloquent.py b/db/my_eloquent.py
index afcf22a..1bdc1a2 100644
--- a/db/my_eloquent.py
+++ b/db/my_eloquent.py
@@ -19,13 +19,32 @@ class Model (eloquent.Model):
         self,
         *args: str,
     ) -> None:
-        q = self.query ()
-        for arg in args:
-            q = q.where (arg, getattr (self, arg))
-        row = q.first ()
+        row = self._find_upsert_row (*args)
         if row is not None:
             self.id = row.id
             # pylint: disable = invalid-name
             # pylint: disable = attribute-defined-outside-init
             self._Model__exists = True
-        self.save ()
+            self.save ()
+            return
+
+        try:
+            self.save ()
+        except Exception:
+            row = self._find_upsert_row (*args)
+            if row is None:
+                raise
+            self.id = row.id
+            # pylint: disable = invalid-name
+            # pylint: disable = attribute-defined-outside-init
+            self._Model__exists = True
+            self.save ()
+
+    def _find_upsert_row (
+        self,
+        *args: str,
+    ):
+        q = self.query ()
+        for arg in args:
+            q = q.where (arg, getattr (self, arg))
+        return q.first ()
diff --git a/get_comments_by_video_code.py b/get_comments_by_video_code.py
index 698e541..045d7af 100644
--- a/get_comments_by_video_code.py
+++ b/get_comments_by_video_code.py
@@ -8,12 +8,9 @@ from __future__ import annotations
 
 import json
-import os
 import sys
-from datetime import date, datetime
-from typing import TypedDict, cast
-
-from eloquent import DatabaseManager, Model
+from datetime import datetime
+from typing import TypedDict
 
 from db.config import DB
 from db.models import Video
diff --git a/get_kiriban_list.py b/get_kiriban_list.py
index da50b1b..81cb805 100644
--- a/get_kiriban_list.py
+++ b/get_kiriban_list.py
@@ -8,12 +8,9 @@ from __future__ import annotations
 
 import json
-import os
 import sys
 from datetime import date, datetime
-from typing import TypedDict, cast
-
-from eloquent import DatabaseManager, Model
+from typing import cast
 
 from db.config import DB
 from db.models import Video, VideoHistory
@@ -25,14 +22,15 @@ def main (
     views_counts: list[int],
     base_date: date,
 ) -> None:
-    if not base_date:
-        base_date = datetime.now ().date ()
-    kiriban_list: list[tuple[int, str, str]] = []
-    latest_fetched_at = cast (date, (VideoHistory
-                                     .where ('fetched_at', '<=', base_date)
-                                     .max ('fetched_at')))
+    latest_fetched_at = cast (date | None,
+                              (VideoHistory
+                               .where ('fetched_at', '<=', base_date)
+                               .max ('fetched_at')))
+    if latest_fetched_at is None:
+        print ('[]')
+        return
 
     for views_count in views_counts:
         targets = { vh.video.code
                     for vh in (
@@ -63,5 +61,5 @@
 
 
 if __name__ == '__main__':
-    main (map (int, sys.argv[2:]),
+    main (list (map (int, sys.argv[2:])),
           datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
diff --git a/get_videos.py b/get_videos.py
index f4dc67a..271c52d 100644
--- a/get_videos.py
+++ b/get_videos.py
@@ -8,12 +8,9 @@ from __future__ import annotations
 
 import json
-import os
 from datetime import date, datetime
 from typing import TypedDict
-
-from eloquent import DatabaseManager, Model
 
 from db.config import DB
 from db.models import Video
@@ -36,20 +33,10 @@ def main (
         for video_tag in row.video_tags:
             if video_tag.untagged_at is None:
                 video['tags'].append (video_tag.tag.name)
-        videos.append(video)
+        videos.append (video)
 
     print (json.dumps (videos, default = str))
 
-
-class DbConfig (TypedDict):
-    driver: str
-    host: str
-    database: str
-    user: str
-    password: str
-    prefix: str
-
-
 class VideoDict (TypedDict):
     id: int
     code: str
diff --git a/update_db.py b/update_db.py
index d4bd1b8..f73bd81 100644
--- a/update_db.py
+++ b/update_db.py
@@ -8,17 +8,16 @@ from __future__ import annotations
 
 import json
-import os
+import logging
 import random
 import string
 import time
 import unicodedata
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta
 from typing import Any, TypedDict, cast
 
 import jaconv
 import requests
-from eloquent import DatabaseManager, Model
 
 from db.config import DB
 from db.models import (Comment,
@@ -29,39 +28,94 @@ from db.models import (Comment,
                        VideoHistory,
                        VideoTag)
 
+logger = logging.getLogger (__name__)
+logging.basicConfig (
+    level = logging.INFO,
+    format = '%(asctime)s %(levelname)s %(message)s')
+
+
+class SearchNicoResult (TypedDict):
+    videos: list['VideoResult']
+    is_complete: bool
+
+
+class UpdateContext (TypedDict):
+    api_data: list['VideoResult']
+    comments_by_video_code: dict[str, list['CommentResult']]
+    deletable: bool
+
+
+class VideoSearchParam (TypedDict):
+    q: str
+    targets: str
+    _sort: str
+    fields: str
+    _limit: int
+    jsonFilter: str
+
+
+class VideoResult (TypedDict):
+    contentId: str
+    userId: int | None
+    title: str
+    tags: str
+    description: str | None
+    viewCounter: int
+    startTime: str
+
+
+class CommentResult (TypedDict):
+    no: int
+    userId: str
+    body: str
+    postedAt: str
+    nicoruCount: int
+    vposMs: int
+
 
 def main (
 ) -> None:
     now = datetime.now ()
+    today = now.date ()
 
-    api_data = search_nico_by_tags (['伊地知ニジカ',
-                                     'ぼざろクリーチャーシリーズ',
-                                     'ぼざろクリーチャーシリーズ外伝'])
+    search_result = search_nico_by_tags (['伊地知ニジカ',
+                                          'ぼざろクリーチャーシリーズ',
+                                          'ぼざろクリーチャーシリーズ外伝'])
+    comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
 
-    DB.begin_transaction ()
+    context: UpdateContext = { 'api_data': search_result['videos'],
+                               'comments_by_video_code': comments_by_video_code,
+                               'deletable': search_result['is_complete'] }
+
+    connection = DB.connection ()
+    connection.begin_transaction ()
     try:
-        update_tables (api_data, now)
-        DB.commit ()
+        update_tables (context, now, today)
+        connection.commit ()
     except Exception:
-        DB.rollback ()
+        connection.rollback ()
         raise
 
 
 def update_tables (
-    api_data: list[VideoResult],
-    now: datetime,
+    context: UpdateContext,
+    now: datetime,
+    today: date,
 ) -> None:
     alive_video_codes: list[str] = []
-    for datum in api_data:
-        tag_names: list[str] = datum['tags'].split ()
+    for datum in context['api_data']:
+        tag_names = datum['tags'].split ()
+        normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
+
         user: User | None = None
-        if datum['userId']:
-            user = User.where('code', str (datum['userId'])).first ()
+        if datum['userId'] is not None:
+            user = User.where ('code', str (datum['userId'])).first ()
             if user is None:
                 user = User ()
                 user.code = str (datum['userId'])
                 user.save ()
+
         video = Video ()
         video.code = datum['contentId']
         video.user_id = user.id if user else None
@@ -71,62 +125,74 @@ def update_tables (
         video.deleted_at = None
         video.upsert ()
         alive_video_codes.append (video.code)
+
         video_history = VideoHistory ()
         video_history.video_id = video.id
-        video_history.fetched_at = now
+        video_history.fetched_at = today
         video_history.views_count = datum['viewCounter']
         video_history.upsert ()
+
         video_tags = [video_tag
                       for video_tag in video.video_tags
                       if video_tag.untagged_at is None]
-        tag: Tag | None
-        video_tag: VideoTag | None
         for video_tag in video_tags:
             tag = video_tag.tag
-            if (tag is not None
-                and (normalise (tag.name) not in map (normalise, tag_names))):
-                video_tag.untagged_at = now
-                video_tag.save ()
+            if tag is None:
+                continue
+            if normalise (tag.name) in normalised_tag_names:
+                continue
+            video_tag.untagged_at = today
+            video_tag.save ()
+
         for tag_name in tag_names:
             tag = Tag.where ('name', tag_name).first ()
             if tag is None:
                 tag = Tag ()
                 tag.name = tag_name
                 tag.save ()
+
             video_tag = (VideoTag.where ('video_id', video.id)
                          .where ('tag_id', tag.id)
-                         .where_null ('untagged_at')
                          .first ())
             if video_tag is None:
                 video_tag = VideoTag ()
                 video_tag.video_id = video.id
                 video_tag.tag_id = tag.id
-                video_tag.tagged_at = now
-                video_tag.untagged_at = None
-                video_tag.save ()
-        for com in fetch_comments (video.code):
+
+            video_tag.tagged_at = video_tag.tagged_at or today
+            video_tag.untagged_at = None
+            video_tag.upsert ()
+
+        for com in context['comments_by_video_code'].get (video.code, []):
             user = User.where ('code', com['userId']).first ()
             if user is None:
                 user = User ()
                 user.code = com['userId']
                 user.save ()
+
             comment = Comment ()
             comment.video_id = video.id
             comment.comment_no = com['no']
             comment.user_id = user.id
             comment.content = com['body']
             comment.posted_at = datetime.fromisoformat (com['postedAt'])
-            comment.nico_count = com['nicoruCount']
-            comment.vpos_ms = com['vposMs']
+            comment.nico_count = com.get ('nicoruCount', 0)
+            comment.vpos_ms = com.get ('vposMs', 0)
             comment.upsert ()
 
-    # 削除動画
+    if not context['deletable']:
+        logger.warning ('skip soft-delete because the latest fetch was incomplete')
+        return
+
+    if not alive_video_codes:
+        logger.warning ('skip soft-delete because no alive videos were fetched')
+        return
+
     videos = (Video.where_not_in ('code', alive_video_codes)
               .where_null ('deleted_at')
              .get ())
     for video in videos:
-        if video.code not in alive_video_codes:
-            video.deleted_at = now
-            video.save ()
+        video.deleted_at = now
+        video.save ()
 
 
 def fetch_video_data (
@@ -143,52 +209,108 @@
                        + '_'
                        + str (random.randrange (10 ** 12, 10 ** 13)))
 
-    url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
-           + f"?actionTrackId={ action_track_id }")
+    url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+           + f'?actionTrackId={ action_track_id }')
     return requests.post (url, headers = headers, timeout = 60).json ()
 
 
+def fetch_comments_by_video_code (
+    videos: list[VideoResult],
+) -> dict[str, list[CommentResult]]:
+    comments_by_video_code: dict[str, list[CommentResult]] = {}
+
+    for video in videos:
+        video_code = video['contentId']
+        try:
+            comments_by_video_code[video_code] = fetch_comments (video_code)
+        except (KeyError,
+                TypeError,
+                ValueError,
+                requests.RequestException) as exc:
+            logger.warning ('failed to fetch comments: %s (%s)', video_code, exc)
+            comments_by_video_code[video_code] = []
+
+    return comments_by_video_code
+
+
 def fetch_comments (
     video_code: str,
 ) -> list[CommentResult]:
-    try:
-        nv_comment = fetch_video_data (video_code)['data']['comment']['nvComment']
-    except KeyError:
-        return []
+    video_data = fetch_video_data (video_code)
+    nv_comment = (video_data.get ('data', {})
+                  .get ('comment', {})
+                  .get ('nvComment'))
     if nv_comment is None:
         return []
 
-    headers = { 'X-Frontend-Id': '6',
-                'X-Frontend-Version': '0',
-                'Content-Type': 'application/json' }
+    headers = { 'X-Frontend-Id': '6',
+                'X-Frontend-Version': '0',
+                'Content-Type': 'application/json' }
 
-    params = { 'params': nv_comment['params'],
-               'additionals': { },
-               'threadKey': nv_comment['threadKey'] }
+    params = { 'params': nv_comment['params'],
+               'additionals': {},
+               'threadKey': nv_comment['threadKey'] }
 
     url = nv_comment['server'] + '/v1/threads'
-    res = (requests.post (url, json.dumps (params),
-                          headers = headers,
-                          timeout = 60)
-           .json ())
+    response = requests.post (url,
+                              json = params,
+                              headers = headers,
+                              timeout = 60)
+    response.raise_for_status ()
+    res = response.json ()
 
-    try:
-        return res['data']['threads'][1]['comments']
-    except (IndexError, KeyError):
+    return select_comments_from_threads (res)
+
+
+def select_comments_from_threads (
+    response: dict[str, Any],
+) -> list[CommentResult]:
+    threads = response.get ('data', {}).get ('threads', [])
+    if not isinstance (threads, list):
         return []
 
+    main_comments: list[CommentResult] = []
+    fallback_comments: list[CommentResult] = []
+
+    for thread in threads:
+        comments = thread.get ('comments') if isinstance (thread, dict) else None
+        if not isinstance (comments, list):
+            continue
+
+        casted_comments = cast (list[CommentResult], comments)
+        if len (casted_comments) > len (fallback_comments):
+            fallback_comments = casted_comments
+
+        fork = str (thread.get ('fork', '')).lower ()
+        label = str (thread.get ('label', '')).lower ()
+        thread_id = str (thread.get ('id', '')).lower ()
+        if fork == 'main' or 'main' in label or 'main' in thread_id:
+            main_comments = casted_comments
+
+    selected_comments = main_comments or fallback_comments
+    deduped_comments: dict[int, CommentResult] = {}
+    for comment in selected_comments:
+        comment_no = comment.get ('no')
+        if not isinstance (comment_no, int):
+            continue
+        deduped_comments[comment_no] = comment
+
+    return [deduped_comments[comment_no]
+            for comment_no in sorted (deduped_comments)]
+
 
 def search_nico_by_tags (
     tags: list[str],
-) -> list[VideoResult]:
+) -> SearchNicoResult:
     today = datetime.now ()
     url = ('https://snapshot.search.nicovideo.jp'
-           + '/api/v2/snapshot/video/contents/search')
+           + '/api/v2/snapshot/video/contents/search')
 
-    result_data: list[VideoResult] = []
+    result_by_video_code: dict[str, VideoResult] = {}
+    is_complete = True
     to = datetime (2022, 12, 3)
     while to <= today:
         time.sleep (1.2)
@@ -215,19 +337,29 @@
                                         'startTime'),
                    '_limit': 100,
                    'jsonFilter': query_filter }
-        res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
         try:
-            result_data += res['data']
-        except KeyError:
-            pass
+            response = requests.get (
+                url,
+                params = cast (dict[str, int | str], params),
+                timeout = 60)
+            response.raise_for_status ()
+            res = response.json ()
+            for datum in cast (list[VideoResult], res.get ('data', [])):
+                result_by_video_code[datum['contentId']] = datum
+        except (ValueError, requests.RequestException) as exc:
+            logger.warning ('snapshot fetch failed: %s - %s (%s)',
+                            to.date (),
+                            until.date (),
+                            exc)
+            is_complete = False
         to = until + timedelta (days = 1)
 
     for video in TrackedVideo.get ():
-        if video.code in map (lambda v: v['contentId'], result_data):
+        if video.code in result_by_video_code:
            continue
         try:
             video_data = fetch_video_data (video.code)['data']
-            result_data.append ({
+            result_by_video_code[video.code] = {
                 'contentId': video.code,
                 'userId': video_data['video']['userId'],
                 'title': video_data['video']['title'],
@@ -235,52 +367,23 @@
                                      video_data['tag']['items'])),
                 'description': video_data['video']['description'],
                 'viewCounter': video_data['video']['count']['view'],
-                'startTime': video_data['video']['registeredAt'] })
-        except Exception:
-            pass
-
-    return result_data
-
-
-class VideoSearchParam (TypedDict):
-    q: str
-    targets: str
-    _sort: str
-    fields: str
-    _limit: int
-    jsonFilter: str
-
+                'startTime': video_data['video']['registeredAt'] }
+        except (KeyError,
+                TypeError,
+                ValueError,
+                requests.RequestException) as exc:
+            logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
+            is_complete = False
 
-class VideoResult (TypedDict):
-    contentId: str
-    userId: int | None
-    title: str
-    tags: str
-    description: str | None
-    viewCounter: int
-    startTime: str
-
-
-class CommentResult (TypedDict):
-    id: str
-    no: int
-    vposMs: int
-    body: str
-    commands: list[str]
-    userId: str
-    isPremium: bool
-    score: int
-    postedAt: str
-    nicoruCount: int
-    nicoruId: Any
-    source: str
-    isMyPost: bool
+    return { 'videos': list (result_by_video_code.values ()),
+             'is_complete': is_complete }
 
 
 def normalise (
-    s: str,
+    text: str,
 ) -> str:
-    return jaconv.hira2kata (unicodedata.normalize ('NFKC', s)).lower ()
+    return jaconv.hira2kata (
+        unicodedata.normalize ('NFKC', text.strip ())).lower ()
 
 
 if __name__ == '__main__':