""" AI ニジカ常時稼動バッチ """ from __future__ import annotations import asyncio import random from asyncio import Lock from datetime import date, datetime, time, timedelta from typing import cast import nicolib import queries_to_answers as q2a from db.models import Comment, Video, VideoHistory from nicolib import VideoInfo from nizika_ai.consts import Character, GPTModel, QueryType from nizika_ai.models import Query KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000), *range (10_000, 1_000_001, 10_000), 114_514, 1_940, 2_450, 5_100, 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919, 4_545, 194_245, 245_194, 510_245 }, reverse = True) kiriban_list: list[tuple[int, VideoInfo, datetime]] watched_videos: set[str] = set () lock = Lock () async def main ( ) -> None: """ メーン処理 """ await asyncio.gather ( queries_to_answers (), report_kiriban (), report_nico (), update_kiriban_list ()) async def queries_to_answers ( ) -> None: """ クエリ処理 """ while True: loop = asyncio.get_running_loop () await loop.run_in_executor (None, q2a.main) await asyncio.sleep (10) async def report_kiriban ( ) -> None: """ キリ番祝ひ """ while True: if not kiriban_list: await wait_until (time (15, 0)) continue # キリ番祝ひ async with lock: (views_count, video_info, uploaded_at) = ( kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) video_code = video_info['contentId'] comments = fetch_comments (video_code) popular_comments = sorted (comments, key = lambda c: c.nico_count, reverse = True)[:10] latest_comments = sorted (comments, key = lambda c: c.posted_at, reverse = True)[:10] prompt = (f"{ _format_elapsed (uploaded_at) }前にニコニコに投稿された" f"『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" f"コメント数は{ len (comments) }件です。\n") if video_info['tags']: prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" if comments: prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n" prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n" prompt += f""" 概要には次のように書かれています: ```html { video_info['description'] } ``` このことについて、何かお祝いメッセージを下さい。 ただし、そのメッセージ内には再生数の数値を添えてください。 また、つけられたタグ、コメントからどのような動画か想像し、説明してください。""" query = Query () query.user_id = None query.target_character = Character.DEERJIKA.value query.content = prompt query.query_type = QueryType.KIRIBAN.value query.model = GPTModel.GPT3_TURBO.value query.sent_at = datetime.now () query.answered = False query.transfer_data = { 'video_code': video_code } query.save () # 待ち時間計算 dt = datetime.now () d = dt.date () if dt.hour >= 15: d += timedelta (days = 1) remain = max (len (kiriban_list), 1) td = (datetime.combine (d, time (15, 0)) - dt) / remain # まれに時刻跨ぎでマイナスになるため if td.total_seconds () < 0: td = timedelta (seconds = 0) await asyncio.sleep (td.total_seconds ()) async def update_kiriban_list ( ) -> None: """ キリ番リストの更新 """ while True: await wait_until (time (15, 0)) new_list = fetch_kiriban_list (datetime.now ().date ()) if not new_list: continue async with lock: have = { k[1]['contentId'] for k in kiriban_list } for item in new_list: if item[1]['contentId'] not in have: kiriban_list.append (item) have.add (item[1]['contentId']) def fetch_kiriban_list ( base_date: date, ) -> list[tuple[int, VideoInfo, datetime]]: """ キリ番を迎へた動画のリストを取得する. Parameters ---------- base_date: date 基準日 Return ------ list[tuple[int, VideoInfo, datetime]] 動画リスト(キリ番基準再生数、対象動画情報、投稿日時のタプル) """ _kiriban_list: list[tuple[int, VideoInfo, datetime]] = [] latest_fetched_at = cast (date, (VideoHistory .where ('fetched_at', '<=', base_date) .max ('fetched_at'))) for kiriban_views_count in KIRIBAN_VIEWS_COUNTS: targets = { vh.video.code for vh in (VideoHistory .where ('fetched_at', latest_fetched_at) .where ('views_count', '>=', kiriban_views_count) .get ()) } for code in targets: if code in [kiriban[1]['contentId'] for kiriban in _kiriban_list]: continue previous_views_count: int | None = ( VideoHistory .where_has ('video', lambda q, code = code: q.where ('code', code)) .where ('fetched_at', '<', latest_fetched_at) .max ('views_count')) if previous_views_count is None: previous_views_count = 0 if previous_views_count >= kiriban_views_count: continue video_info = nicolib.fetch_video_info (code) if video_info is not None: _kiriban_list.append ((kiriban_views_count, video_info, (cast (Video, Video.where ('code', code).first ()) .uploaded_at))) return _kiriban_list def fetch_comments ( video_code: str, ) -> list[Comment]: """ 動画のコメント・リストを取得する. Parameters ---------- video_code: str ニコニコの動画コード Return ------ list[Comment] コメント・リスト """ video = Video.where ('code', video_code).first () if video is None: return [] return video.comments def fetch_latest_deerjika ( ) -> VideoInfo | None: """ 最新のぼざクリ動画を取得する. Return ------ VideoInfo | None 動画情報 """ return nicolib.fetch_latest_video (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ', 'ぼざろクリーチャーシリーズ外伝']) async def report_nico ( ) -> None: """ ニコニコから最新のぼざクリを取得し,まだ報知してゐなかったら報知する. """ while True: latest_deerjika = fetch_latest_deerjika () if latest_deerjika and latest_deerjika['contentId'] not in watched_videos: video = latest_deerjika watched_videos.add (video['contentId']) prompt = f"""ニコニコに『{ video['title'] }』という動画がアップされました。 つけられたタグは「{ '」、「'.join (video['tags']) }」です。 概要には次のように書かれています: ```html { video['description'] } ``` このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。""" query = Query () query.user_id = None query.target_character = Character.DEERJIKA.value query.content = prompt query.query_type = QueryType.NICO_REPORT.value query.model = GPTModel.GPT3_TURBO.value query.sent_at = datetime.now () query.answered = False query.transfer_data = { 'video_code': video['contentId'] } query.save () await asyncio.sleep (60) async def wait_until ( t: time, ) -> None: """ 指定した時刻まで待つ. Parameters ---------- t: time 次に実行を続行するまでの時刻 """ dt = datetime.now () d = dt.date () if dt.time () >= t: d += timedelta (days = 1) await asyncio.sleep ((datetime.combine (d, t) - dt).total_seconds ()) def _format_elapsed ( uploaded_at: datetime, ) -> str: """ 指定した時刻から現在までの時間を見やすぃ文字列に変換する. Parameters ---------- uploaded_at: datetime 基準日時 Return ------ str 変換後文字列 """ delta = datetime.now () - uploaded_at days = delta.days seconds = delta.seconds (hours, seconds) = divmod (seconds, 3600) (mins, seconds) = divmod (seconds, 60) return f"{ days }日{ hours }時間{ mins }分{ seconds }秒" kiriban_list = ( fetch_kiriban_list ((now := datetime.now ()).date () - timedelta (days = now.hour < 15))) if __name__ == '__main__': asyncio.run (main ())