diff --git a/main.py b/main.py index 4cf8870..e1d239f 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,14 @@ +""" +AI ニジカ常時稼動バッチ +""" + from __future__ import annotations import asyncio import random from asyncio import Lock from datetime import date, datetime, time, timedelta -from typing import TypedDict, cast +from typing import cast import nicolib import queries_to_answers as q2a @@ -21,11 +25,18 @@ KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000), reverse = True) kiriban_list: list[tuple[int, VideoInfo, datetime]] + +watched_videos: set[str] = set () + lock = Lock () async def main ( ) -> None: + """ + メーン処理 + """ + await asyncio.gather ( queries_to_answers (), report_kiriban (), @@ -35,6 +46,10 @@ async def main ( async def queries_to_answers ( ) -> None: + """ + クエリ処理 + """ + while True: loop = asyncio.get_running_loop () await loop.run_in_executor (None, q2a.main) @@ -43,6 +58,10 @@ async def queries_to_answers ( async def report_kiriban ( ) -> None: + """ + キリ番祝ひ + """ + while True: if not kiriban_list: await wait_until (time (15, 0)) @@ -53,15 +72,7 @@ async def report_kiriban ( (views_count, video_info, uploaded_at) = ( kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) - since_posted = datetime.now () - uploaded_at - _days = since_posted.days - _seconds = since_posted.seconds - (_hours, _seconds) = divmod (_seconds, 3600) - (_mins, _seconds) = divmod (_seconds, 60) - video_code = video_info['contentId'] - uri = f"https://www.nicovideo.jp/watch/{ video_code }" - (title, description, _) = nicolib.fetch_embed_info (uri) comments = fetch_comments (video_code) popular_comments = sorted (comments, key = lambda c: c.nico_count, @@ -69,8 +80,9 @@ async def report_kiriban ( latest_comments = sorted (comments, key = lambda c: c.posted_at, reverse = True)[:10] - prompt = f"{ _days }日{ _hours }時間{ _mins }分{ _seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" - prompt += f"コメント数は{ len (comments) }件です。\n" + prompt = (f"{ _format_elapsed (uploaded_at) }前にニコニコに投稿された" + f"『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" + f"コメント数は{ len (comments) }件です。\n") if video_info['tags']: prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" if comments: @@ -111,6 +123,10 @@ async def report_kiriban ( async def update_kiriban_list ( ) -> None: + """ + キリ番リストの更新 + """ + while True: await wait_until (time (15, 0)) @@ -129,6 +145,20 @@ async def update_kiriban_list ( def fetch_kiriban_list ( base_date: date, ) -> list[tuple[int, VideoInfo, datetime]]: + """ + キリ番を迎へた動画のリストを取得する. + + Parameters + ---------- + base_date: date + 基準日 + + Return + ------ + list[tuple[int, VideoInfo, datetime]] + 動画リスト(キリ番基準再生数、対象動画情報、投稿日時のタプル) + """ + _kiriban_list: list[tuple[int, VideoInfo, datetime]] = [] latest_fetched_at = cast (date, (VideoHistory @@ -145,7 +175,7 @@ def fetch_kiriban_list ( continue previous_views_count: int | None = ( VideoHistory - .where_has ('video', lambda q: q.where ('code', code)) + .where_has ('video', lambda q, code = code: q.where ('code', code)) .where ('fetched_at', '<', latest_fetched_at) .max ('views_count')) if previous_views_count is None: @@ -155,7 +185,8 @@ def fetch_kiriban_list ( video_info = nicolib.fetch_video_info (code) if video_info is not None: _kiriban_list.append ((kiriban_views_count, video_info, - cast (Video, Video.where ('code', code).first ()).uploaded_at)) + (cast (Video, Video.where ('code', code).first ()) + .uploaded_at))) return _kiriban_list @@ -163,20 +194,87 @@ def fetch_kiriban_list ( def fetch_comments ( video_code: str, ) -> list[Comment]: + """ + 動画のコメント・リストを取得する. + + Parameters + ---------- + video_code: str + ニコニコの動画コード + + Return + ------ + list[Comment] + コメント・リスト + """ + video = Video.where ('code', video_code).first () if video is None: return [] return video.comments +def fetch_latest_deerjika ( +) -> VideoInfo | None: + """ + 最新のぼざクリ動画を取得する. + + Return + ------ + VideoInfo | None + 動画情報 + """ + + return nicolib.fetch_latest_video (['伊地知ニジカ', + 'ぼざろクリーチャーシリーズ', + 'ぼざろクリーチャーシリーズ外伝']) + + async def report_nico ( ) -> None: - ... + """ + ニコニコから最新のぼざクリを取得し,まだ報知してゐなかったら報知する. + """ + + while True: + latest_deerjika = fetch_latest_deerjika () + if latest_deerjika and latest_deerjika['contentId'] not in watched_videos: + video = latest_deerjika + watched_videos.add (video['contentId']) + + prompt = f"""ニコニコに『{ video['title'] }』という動画がアップされました。 +つけられたタグは「{ '」、「'.join (video['tags']) }」です。 +概要には次のように書かれています: +```html +{ video['description'] } +``` +このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。""" + query = Query () + query.user_id = None + query.target_character = Character.DEERJIKA.value + query.content = prompt + query.query_type = QueryType.NICO_REPORT.value + query.model = GPTModel.GPT3_TURBO.value + query.sent_at = datetime.now () + query.answered = False + query.transfer_data = { 'video_code': video['contentId'] } + query.save () + + await asyncio.sleep (60) async def wait_until ( t: time, -): +) -> None: + """ + 指定した時刻まで待つ. + + Parameters + ---------- + t: time + 次に実行を続行するまでの時刻 + """ + dt = datetime.now () d = dt.date () if dt.time () >= t: @@ -184,9 +282,35 @@ async def wait_until ( await asyncio.sleep ((datetime.combine (d, t) - dt).total_seconds ()) +def _format_elapsed ( + uploaded_at: datetime, +) -> str: + """ + 指定した時刻から現在までの時間を見やすぃ文字列に変換する. + + Parameters + ---------- + uploaded_at: datetime + 基準日時 + + Return + ------ + str + 変換後文字列 + """ + + delta = datetime.now () - uploaded_at + days = delta.days + seconds = delta.seconds + (hours, seconds) = divmod (seconds, 3600) + (mins, seconds) = divmod (seconds, 60) + + return f"{ days }日{ hours }時間{ mins }分{ seconds }秒" + + kiriban_list = ( - fetch_kiriban_list ((d := datetime.now ()).date () - - timedelta (days = d.hour < 15))) + fetch_kiriban_list ((now := datetime.now ()).date () + - timedelta (days = now.hour < 15))) if __name__ == '__main__': asyncio.run (main ()) diff --git a/nicolib b/nicolib index b7a88cc..9fec1cf 160000 --- a/nicolib +++ b/nicolib @@ -1 +1 @@ -Subproject commit b7a88cc774aa7869678c00abf9f93982e5b6cfb9 +Subproject commit 9fec1cf5f99ca2cb2a9b3ff158027a3416cc5f06 diff --git a/queries_to_answers.py b/queries_to_answers.py index 20c7475..9c2cc3d 100644 --- a/queries_to_answers.py +++ b/queries_to_answers.py @@ -1,3 +1,7 @@ +""" +DB の queries テーブルにたまってゐるクエリを AI に処理させ answers テーブルに流す. +""" + from __future__ import annotations import random @@ -6,12 +10,16 @@ from typing import TypedDict from nizika_ai.config import DB from nizika_ai.consts import AnswerType, Character, Platform -from nizika_ai.models import Answer, AnsweredFlag, Query, User +from nizika_ai.models import Answer, AnsweredFlag, Query from nizika_ai.talk import Talk def main ( ) -> None: + """ + メーン処理 + """ + queries: list[Query] = Query.where ('answered', False).get () if not queries: return @@ -48,6 +56,21 @@ def add_answer ( user_name: str | None, histories: list[History], ) -> None: + """ + AI の返答を DB に積む. + + Parameters + ---------- + query: Query + クエリ + character: Character + 返答するキャラクタ + user_name: str | None + クエリの主 + histories: list[History] + 履歴 + """ + message: str | list[dict[str, str | dict[str, str]]] if query.image_url: message = [{ 'type': 'text', 'text': query.content }, @@ -69,10 +92,19 @@ def add_answer ( def add_answered_flags ( answer: Answer, ) -> None: + """ + 返答済フラグを付与する. + + Parameters + ---------- + answer: Answer + 返答モデル + """ + answer_type: AnswerType try: answer_type = AnswerType (answer.answer_type) - except Exception: + except (TypeError, ValueError): return if answer_type in (AnswerType.YOUTUBE_REPLY,): @@ -85,6 +117,17 @@ def add_answered_flag ( answer: Answer, platform: Platform, ) -> None: + """ + 返答済フラグを付与する. + + Parameters + ---------- + answer: Answer + 返答モデル + platform: Platform + プラットフォーム + """ + answered_flag = AnsweredFlag () answered_flag.answer_id = answer.id answered_flag.platform = platform.value @@ -93,6 +136,10 @@ def add_answered_flag ( class History (TypedDict): + """ + 会話履歴の 1 要素;ユーザや AI の発話を簡易に保持する型 + """ + role: str content: str