""" AI ニジカ常時稼動バッチ """ from __future__ import annotations import asyncio import json import os import random import subprocess from asyncio import Lock from datetime import date, datetime, time, timedelta from typing import Any, cast import nicolib import queries_to_answers as q2a from nicolib import VideoInfo from nizika_ai.consts import Character, GPTModel, QueryType from nizika_ai.models import Query KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000), *range (10_000, 1_000_001, 10_000), 114_514, 1_940, 2_450, 5_100, 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919, 4_545, 194_245, 245_194, 510_245 }, reverse = True) kiriban_list: list[tuple[int, VideoInfo, datetime]] = [] watched_videos: set[str] = set () lock = Lock () async def main ( ) -> None: """ メーン処理 """ await asyncio.gather ( queries_to_answers (), report_kiriban (), report_nico (), update_kiriban_list (), report_snack_time (), report_hot_spring_time ()) async def queries_to_answers ( ) -> None: """ クエリ処理 """ while True: loop = asyncio.get_running_loop () await loop.run_in_executor (None, q2a.main) await asyncio.sleep (10) async def report_kiriban ( ) -> None: """ キリ番祝ひ """ while True: if not kiriban_list: await wait_until (time (15, 0)) continue # キリ番祝ひ async with lock: (views_count, video_info, uploaded_at) = ( kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) video_code = video_info['contentId'] comments = fetch_comments (video_code) popular_comments = sorted (comments, key = lambda c: c['nico_count'], reverse = True)[:10] latest_comments = sorted (comments, key = lambda c: c['posted_at'], reverse = True)[:10] prompt = (f"{ _format_elapsed (uploaded_at) }前にニコニコに投稿された" f"『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" f"コメント数は{ len (comments) }件です。\n") if video_info['tags']: prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" if comments: prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c['content'] for c in popular_comments) }」\n" if latest_comments != popular_comments: prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n" prompt += f""" 概要には次のように書かれています: ```html { video_info['description'] } ``` このことについて、何かお祝いメッセージを下さい。 ただし、そのメッセージ内には再生数の数値を添えてください。 また、つけられたタグ、コメントからどのような動画か想像し、説明してください。""" _add_query (prompt, QueryType.KIRIBAN, { 'video_code': video_code }) # 待ち時間計算 dt = datetime.now () d = dt.date () if dt.hour >= 15: d += timedelta (days = 1) remain = max (len (kiriban_list), 1) td = (datetime.combine (d, time (15, 0)) - dt) / remain # まれに時刻跨ぎでマイナスになるため if td.total_seconds () < 0: td = timedelta (seconds = 0) await asyncio.sleep (td.total_seconds ()) async def update_kiriban_list ( ) -> None: """ キリ番リストの更新 """ while True: await wait_until (time (15, 0)) new_list = fetch_kiriban_list (datetime.now ().date ()) if not new_list: continue async with lock: have = { k[1]['contentId'] for k in kiriban_list } for item in new_list: if item[1]['contentId'] not in have: kiriban_list.append (item) have.add (item[1]['contentId']) def fetch_kiriban_list ( base_date: date, ) -> list[tuple[int, VideoInfo, datetime]]: """ キリ番を迎へた動画のリストを取得する. Parameters ---------- base_date: date 基準日 Return ------ list[tuple[int, VideoInfo, datetime]] 動画リスト(キリ番基準再生数、対象動画情報、投稿日時のタプル) """ result = subprocess.run ( ['python3', str (base_date), *map (str, KIRABAN_VIEWS_COUNTS)], cwd = '/root/nizika_nico', env = os.environ, capture_output = True, text = True) kl: list[list[int | str]] = json.loads (result.stdout) return map (lambda k: (cast (int, k[0]), nicolib.fetch_video_info (cast (str, k[1])), datetime.strptime (cast (str, k[2]), '%Y-%m-%d %H:%M:%S.%f')), kl) def fetch_comments ( video_code: str, ) -> list[CommentDict]: """ 動画のコメント・リストを取得する. Parameters ---------- video_code: str ニコニコの動画コード Return ------ list[CommentDict] コメント・リスト """ result = subprocess.run ( ['python3', video_code], cwd = '/root/nizika_nico', env = os.environ, capture_output = True, text = True) rows: list[dict[str, Any]] = json.loads (result.stdout) comments: list[CommentDict] = [] for row in comments: row['posted_at'] = datetime.strptime (row['posted_at'], '%Y-%m-%d %H:%M:%S.%f') comments.append (cast (CommentDict, row)) return comments def fetch_latest_deerjika ( ) -> VideoInfo | None: """ 最新のぼざクリ動画を取得する. Return ------ VideoInfo | None 動画情報 """ return nicolib.fetch_latest_video (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ', 'ぼざろクリーチャーシリーズ外伝']) async def report_nico ( ) -> None: """ ニコニコから最新のぼざクリを取得し,まだ報知してゐなかったら報知する. """ while True: latest_deerjika = fetch_latest_deerjika () if latest_deerjika and latest_deerjika['contentId'] not in watched_videos: video = latest_deerjika watched_videos.add (video['contentId']) prompt = f"""ニコニコに『{ video['title'] }』という動画がアップされました。 つけられたタグは「{ '」、「'.join (video['tags']) }」です。 概要には次のように書かれています: ```html { video['description'] } ``` このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。""" _add_query (prompt, QueryType.NICO_REPORT, { 'video_code': video['contentId'] }) await asyncio.sleep (60) async def wait_until ( t: time, ) -> None: """ 指定した時刻まで待つ. Parameters ---------- t: time 次に実行を続行するまでの時刻 """ dt = datetime.now () d = dt.date () if dt.time () >= t: d += timedelta (days = 1) await asyncio.sleep ((datetime.combine (d, t) - dt).total_seconds ()) async def report_snack_time ( ) -> None: """ おやつタイムを報知する. """ while True: await wait_until (time (15, 0)) _add_query ('おやつタイムだ!!!!', QueryType.SNACK_TIME) async def report_hot_spring_time ( ) -> None: """ 温泉タイムを報知する. """ while True: await wait_until (time (21, 0)) _add_query ('温泉に入ろう!!!', QueryType.HOT_SPRING) def _add_query ( content: str, query_type: QueryType, transfer_data: dict | None = None, ) -> None: query = Query () query.user_id = None query.target_character = Character.DEERJIKA.value query.content = content query.query_type = query_type.value query.model = GPTModel.GPT3_TURBO.value query.sent_at = datetime.now () query.answered = False query.transfer_data = transfer_data query.save () def _format_elapsed ( uploaded_at: datetime, ) -> str: """ 指定した時刻から現在までの時間を見やすぃ文字列に変換する. Parameters ---------- uploaded_at: datetime 基準日時 Return ------ str 変換後文字列 """ delta = datetime.now () - uploaded_at days = delta.days seconds = delta.seconds (hours, seconds) = divmod (seconds, 3600) (mins, seconds) = divmod (seconds, 60) return f"{ days }日{ hours }時間{ mins }分{ seconds }秒" class CommentDict (TypedDict): id: int video_id: int comment_no: int user_id: int content: str posted_at: datetime nico_count: int vpos_ms: int kiriban_list = ( fetch_kiriban_list ((now := datetime.now ()).date () - timedelta (days = 1 if now.hour < 15 else 0))) if __name__ == '__main__': asyncio.run (main ())