| 
							- # pylint: disable = missing-class-docstring
 - # pylint: disable = missing-function-docstring
 - 
 - """
 - 日次で実行し,ぼざクリ DB を最新に更新する.
 - """
 - 
 - from __future__ import annotations
 - 
 - import json
 - import os
 - import random
 - import string
 - import time
 - import unicodedata
 - from datetime import datetime, timedelta
 - from typing import Any, TypedDict, cast
 - 
 - import jaconv
 - import requests
 - from eloquent import DatabaseManager, Model
 - 
 - from db.config import DB
 - from db.models import Comment, Tag, User, Video, VideoHistory, VideoTag
 - 
 - 
 - def main (
 - ) -> None:
 -     now = datetime.now ()
 - 
 -     api_data = search_nico_by_tags (['伊地知ニジカ',
 -                                      'ぼざろクリーチャーシリーズ',
 -                                      'ぼざろクリーチャーシリーズ外伝'])
 - 
 -     DB.begin_transaction ()
 -     try:
 -         update_tables (api_data, now)
 -         DB.commit ()
 -     except Exception:
 -         DB.rollback ()
 -         raise
 - 
 - 
 - def update_tables (
 -         api_data:   list[VideoResult],
 -         now:        datetime,
 - ) -> None:
 -     alive_video_codes: list[str] = []
 - 
 -     for datum in api_data:
 -         tag_names: list[str] = datum['tags'].split ()
 -         video = Video ()
 -         video.code = datum['contentId']
 -         video.title = datum['title']
 -         video.description = datum['description'] or ''
 -         video.uploaded_at = datetime.fromisoformat (datum['startTime'])
 -         video.deleted_at = None
 -         video.upsert ()
 -         alive_video_codes.append (video.code)
 -         video_history = VideoHistory ()
 -         video_history.video_id = video.id
 -         video_history.fetched_at = now
 -         video_history.views_count = datum['viewCounter']
 -         video_history.upsert ()
 -         video_tags = [video_tag for video_tag in video.video_tags
 -                                 if video_tag.untagged_at is None]
 -         tag: Tag | None
 -         video_tag: VideoTag | None
 -         for video_tag in video_tags:
 -             tag = video_tag.tag
 -             if (tag is not None
 -                     and (normalise (tag.name) not in map (normalise, tag_names))):
 -                 video_tag.untagged_at = now
 -                 video_tag.save ()
 -         for tag_name in tag_names:
 -             tag = Tag.where ('name', tag_name).first ()
 -             if tag is None:
 -                 tag = Tag ()
 -                 tag.name = tag_name
 -                 tag.save ()
 -             video_tag = (VideoTag.where ('video_id', video.id)
 -                                  .where ('tag_id', tag.id)
 -                                  .where_null ('untagged_at')
 -                                  .first ())
 -             if video_tag is None:
 -                 video_tag = VideoTag ()
 -                 video_tag.video_id = video.id
 -                 video_tag.tag_id = tag.id
 -                 video_tag.tagged_at = now
 -                 video_tag.untagged_at = None
 -                 video_tag.save ()
 -         for com in fetch_comments (video.code):
 -             user = User.where ('code', com['userId']).first ()
 -             if user is None:
 -                 user = User ()
 -                 user.code = com['userId']
 -                 user.save ()
 -             comment = Comment ()
 -             comment.video_id = video.id
 -             comment.comment_no = com['no']
 -             comment.user_id = user.id
 -             comment.content = com['body']
 -             comment.posted_at = datetime.fromisoformat (com['postedAt'])
 -             comment.nico_count = com['nicoruCount']
 -             comment.vpos_ms = com['vposMs']
 -             comment.upsert ()
 - 
 -     # 削除動画
 -     videos = (Video.where_not_in ('code', alive_video_codes)
 -                    .where_null ('deleted_at')
 -                    .get ())
 -     for video in videos:
 -         if video.code not in alive_video_codes:
 -             video.deleted_at = now
 -             video.save ()
 - 
 - 
 - def fetch_comments (
 -         video_code: str,
 - ) -> list[CommentResult]:
 -     time.sleep (1.2)
 - 
 -     headers = { 'X-Frontend-Id':      '6',
 -                 'X-Frontend-Version': '0' }
 - 
 -     action_track_id = (
 -             ''.join (random.choice (string.ascii_letters + string.digits)
 -                      for _ in range (10))
 -             + '_'
 -             + str (random.randrange (10 ** 12, 10 ** 13)))
 - 
 -     url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
 -             + f"?actionTrackId={ action_track_id }")
 - 
 -     res = requests.post (url, headers = headers, timeout = 60).json ()
 - 
 -     try:
 -         nv_comment = res['data']['comment']['nvComment']
 -     except KeyError:
 -         return []
 -     if nv_comment is None:
 -         return []
 - 
 -     headers = { 'X-Frontend-Id':        '6',
 -                 'X-Frontend-Version':   '0',
 -                 'Content-Type':         'application/json' }
 - 
 -     params = { 'params':        nv_comment['params'],
 -                'additionals':   { },
 -                'threadKey':     nv_comment['threadKey'] }
 - 
 -     url = nv_comment['server'] + '/v1/threads'
 - 
 -     res = (requests.post (url, json.dumps (params),
 -                           headers = headers,
 -                           timeout = 60)
 -             .json ())
 - 
 -     try:
 -         return res['data']['threads'][1]['comments']
 -     except (IndexError, KeyError):
 -         return []
 - 
 - 
 - def search_nico_by_tag (
 -         tag:    str,
 - ) -> list[VideoResult]:
 -     return search_nico_by_tags ([tag])
 - 
 - 
 - def search_nico_by_tags (
 -         tags:   list[str],
 - ) -> list[VideoResult]:
 -     today = datetime.now ()
 - 
 -     url = ('https://snapshot.search.nicovideo.jp'
 -             + '/api/v2/snapshot/video/contents/search')
 - 
 -     result_data: list[VideoResult] = []
 -     to = datetime (2022, 12, 3)
 -     while to <= today:
 -         time.sleep (1.2)
 -         until = to + timedelta (days = 14)
 -         # pylint: disable = consider-using-f-string
 -         query_filter = json.dumps ({ 'type':    'or',
 -                                      'filters': [
 -                 { 'type':          'range',
 -                   'field':         'startTime',
 -                   'from':          ('%04d-%02d-%02dT00:00:00+09:00'
 -                                     % (to.year, to.month, to.day)),
 -                   'to':            ('%04d-%02d-%02dT23:59:59+09:00'
 -                                     % (until.year, until.month, until.day)),
 -                   'include_lower': True }] })
 -         params: VideoSearchParam = { 'q':          ' OR '.join (tags),
 -                                      'targets':    'tagsExact',
 -                                      '_sort':      '-viewCounter',
 -                                      'fields':     ('contentId,'
 -                                                     'title,'
 -                                                     'tags,'
 -                                                     'description,'
 -                                                     'viewCounter,'
 -                                                     'startTime'),
 -                                      '_limit':     100,
 -                                      'jsonFilter': query_filter }
 -         res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
 -         try:
 -             result_data += res['data']
 -         except KeyError:
 -             pass
 -         to = until + timedelta (days = 1)
 - 
 -     return result_data
 - 
 - 
 - class VideoSearchParam (TypedDict):
 -     q:          str
 -     targets:    str
 -     _sort:      str
 -     fields:     str
 -     _limit:     int
 -     jsonFilter: str
 - 
 - 
 - class VideoResult (TypedDict):
 -     contentId:      str
 -     title:          str
 -     tags:           str
 -     description:    str | None
 -     viewCounter:    int
 -     startTime:      str
 - 
 - 
 - class CommentResult (TypedDict):
 -     id:             str
 -     no:             int
 -     vposMs:         int
 -     body:           str
 -     commands:       list[str]
 -     userId:         str
 -     isPremium:      bool
 -     score:          int
 -     postedAt:       str
 -     nicoruCount:    int
 -     nicoruId:       Any
 -     source:         str
 -     isMyPost:       bool
 - 
 - 
 - def normalise (
 -         s:  str,
 - ) -> str:
 -     return jaconv.hira2kata (unicodedata.normalize ('NFKC', s)).lower ()
 - 
 - 
 - if __name__ == '__main__':
 -     main ()
 
 
  |