| 
							- """
 - AI ニジカ常時稼動バッチ
 - """
 - 
 - from __future__ import annotations
 - 
 - import asyncio
 - import random
 - from asyncio import Lock
 - from datetime import date, datetime, time, timedelta
 - from typing import cast
 - 
 - import nicolib
 - import queries_to_answers as q2a
 - from db.config import DB as NicoDB
 - from db.models import Comment, Video, VideoHistory
 - from nicolib import VideoInfo
 - from nizika_ai.config import DB as AIDB
 - from nizika_ai.consts import Character, GPTModel, QueryType
 - from nizika_ai.models import Query
 - 
 - KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
 -                                             *range (10_000, 1_000_001, 10_000),
 -                                             114_514, 1_940, 2_450, 5_100,
 -                                             19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
 -                                             4_545, 194_245, 245_194, 510_245 },
 -                                           reverse = True)
 - 
 - kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
 - 
 - watched_videos: set[str] = set ()
 - 
 - lock = Lock ()
 - 
 - 
 - async def main (
 - ) -> None:
 -     """
 -     メーン処理
 -     """
 - 
 -     # DB 準備
 -     NicoDB
 -     AIDB
 - 
 -     await asyncio.gather (
 -             queries_to_answers (),
 -             report_kiriban (),
 -             report_nico (),
 -             update_kiriban_list (),
 -             report_snack_time (),
 -             report_hot_spring_time ())
 - 
 - 
 - async def queries_to_answers (
 - ) -> None:
 -     """
 -     クエリ処理
 -     """
 - 
 -     while True:
 -         loop = asyncio.get_running_loop ()
 -         await loop.run_in_executor (None, q2a.main)
 -         await asyncio.sleep (10)
 - 
 - 
 - async def report_kiriban (
 - ) -> None:
 -     """
 -     キリ番祝ひ
 -     """
 - 
 -     while True:
 -         if not kiriban_list:
 -             await wait_until (time (15, 0))
 -             continue
 - 
 -         # キリ番祝ひ
 -         async with lock:
 -             (views_count, video_info, uploaded_at) = (
 -                     kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
 - 
 -         video_code = video_info['contentId']
 -         comments = fetch_comments (video_code)
 -         popular_comments = sorted (comments,
 -                                    key      = lambda c: c.nico_count,
 -                                    reverse  = True)[:10]
 -         latest_comments = sorted (comments,
 -                                   key       = lambda c: c.posted_at,
 -                                   reverse   = True)[:10]
 -         prompt = (f"{ _format_elapsed (uploaded_at) }前にニコニコに投稿された"
 -                   f"『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
 -                   f"コメント数は{ len (comments) }件です。\n")
 -         if video_info['tags']:
 -             prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
 -         if comments:
 -             prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n"
 -             if latest_comments != popular_comments:
 -                 prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n"
 -         prompt += f"""
 - 概要には次のように書かれています:
 - ```html
 - { video_info['description'] }
 - ```
 - このことについて、何かお祝いメッセージを下さい。
 - ただし、そのメッセージ内には再生数の数値を添えてください。
 - また、つけられたタグ、コメントからどのような動画か想像し、説明してください。"""
 -         _add_query (prompt, QueryType.KIRIBAN, { 'video_code': video_code })
 - 
 -         # 待ち時間計算
 -         dt = datetime.now ()
 -         d = dt.date ()
 -         if dt.hour >= 15:
 -             d += timedelta (days = 1)
 -         remain = max (len (kiriban_list), 1)
 -         td = (datetime.combine (d, time (15, 0)) - dt) / remain
 -         # まれに時刻跨ぎでマイナスになるため
 -         if td.total_seconds () < 0:
 -             td = timedelta (seconds = 0)
 - 
 -         await asyncio.sleep (td.total_seconds ())
 - 
 - 
 - async def update_kiriban_list (
 - ) -> None:
 -     """
 -     キリ番リストの更新
 -     """
 - 
 -     while True:
 -         await wait_until (time (15, 0))
 - 
 -         new_list = fetch_kiriban_list (datetime.now ().date ())
 -         if not new_list:
 -             continue
 - 
 -         async with lock:
 -             have = { k[1]['contentId'] for k in kiriban_list }
 -             for item in new_list:
 -                 if item[1]['contentId'] not in have:
 -                     kiriban_list.append (item)
 -                     have.add (item[1]['contentId'])
 - 
 - 
 - def fetch_kiriban_list (
 -         base_date:  date,
 - ) -> list[tuple[int, VideoInfo, datetime]]:
 -     """
 -     キリ番を迎へた動画のリストを取得する.
 - 
 -     Parameters
 -     ----------
 -     base_date:  date
 -         基準日
 - 
 -     Return
 -     ------
 -     list[tuple[int, VideoInfo, datetime]]
 -         動画リスト(キリ番基準再生数、対象動画情報、投稿日時のタプル)
 -     """
 - 
 -     _kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
 - 
 -     latest_fetched_at = cast (date, (VideoHistory
 -                                      .where ('fetched_at', '<=', base_date)
 -                                      .max ('fetched_at')))
 - 
 -     for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
 -         targets = { vh.video.code for vh in (VideoHistory
 -                                              .where ('fetched_at', latest_fetched_at)
 -                                              .where ('views_count', '>=', kiriban_views_count)
 -                                              .get ()) }
 -         for code in targets:
 -             if code in [kiriban[1]['contentId'] for kiriban in _kiriban_list]:
 -                 continue
 -             previous_views_count: int | None = (
 -                     VideoHistory
 -                     .where_has ('video', lambda q, code = code: q.where ('code', code))
 -                     .where ('fetched_at', '<', latest_fetched_at)
 -                     .max ('views_count'))
 -             if previous_views_count is None:
 -                 previous_views_count = 0
 -             if previous_views_count >= kiriban_views_count:
 -                 continue
 -             video_info = nicolib.fetch_video_info (code)
 -             if video_info is not None:
 -                 _kiriban_list.append ((kiriban_views_count, video_info,
 -                                        (cast (Video, Video.where ('code', code).first ())
 -                                         .uploaded_at)))
 - 
 -     return _kiriban_list
 - 
 - 
 - def fetch_comments (
 -         video_code: str,
 - ) -> list[Comment]:
 -     """
 -     動画のコメント・リストを取得する.
 - 
 -     Parameters
 -     ----------
 -     video_code: str
 -         ニコニコの動画コード
 - 
 -     Return
 -     ------
 -     list[Comment]
 -         コメント・リスト
 -     """
 - 
 -     video = Video.where ('code', video_code).first ()
 -     if video is None:
 -         return []
 -     return video.comments
 - 
 - 
 - def fetch_latest_deerjika (
 - ) -> VideoInfo | None:
 -     """
 -     最新のぼざクリ動画を取得する.
 - 
 -     Return
 -     ------
 -     VideoInfo | None
 -         動画情報
 -     """
 - 
 -     return nicolib.fetch_latest_video (['伊地知ニジカ',
 -                                         'ぼざろクリーチャーシリーズ',
 -                                         'ぼざろクリーチャーシリーズ外伝'])
 - 
 - 
 - async def report_nico (
 - ) -> None:
 -     """
 -     ニコニコから最新のぼざクリを取得し,まだ報知してゐなかったら報知する.
 -     """
 - 
 -     while True:
 -         latest_deerjika = fetch_latest_deerjika ()
 -         if latest_deerjika and latest_deerjika['contentId'] not in watched_videos:
 -             video = latest_deerjika
 -             watched_videos.add (video['contentId'])
 - 
 -             prompt = f"""ニコニコに『{ video['title'] }』という動画がアップされました。
 - つけられたタグは「{ '」、「'.join (video['tags']) }」です。
 - 概要には次のように書かれています:
 - ```html
 - { video['description'] }
 - ```
 - このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。"""
 -             _add_query (prompt, QueryType.NICO_REPORT, { 'video_code': video['contentId'] })
 - 
 -         await asyncio.sleep (60)
 - 
 - 
 - async def wait_until (
 -         t:  time,
 - ) -> None:
 -     """
 -     指定した時刻まで待つ.
 - 
 -     Parameters
 -     ----------
 -     t:  time
 -         次に実行を続行するまでの時刻
 -     """
 - 
 -     dt = datetime.now ()
 -     d = dt.date ()
 -     if dt.time () >= t:
 -         d += timedelta (days = 1)
 -     await asyncio.sleep ((datetime.combine (d, t) - dt).total_seconds ())
 - 
 - 
 - async def report_snack_time (
 - ) -> None:
 -     """
 -     おやつタイムを報知する.
 -     """
 - 
 -     while True:
 -         await wait_until (time (15, 0))
 -         _add_query ('おやつタイムだ!!!!', QueryType.SNACK_TIME)
 - 
 - 
 - async def report_hot_spring_time (
 - ) -> None:
 -     """
 -     温泉タイムを報知する.
 -     """
 - 
 -     while True:
 -         await wait_until (time (21, 0))
 -         _add_query ('温泉に入ろう!!!', QueryType.HOT_SPRING)
 - 
 - 
 - def _add_query (
 -         content:        str,
 -         query_type:     QueryType,
 -         transfer_data:  dict | None = None,
 - ) -> None:
 -     query = Query ()
 -     query.user_id = None
 -     query.target_character = Character.DEERJIKA.value
 -     query.content = content
 -     query.query_type = query_type.value
 -     query.model = GPTModel.GPT3_TURBO.value
 -     query.sent_at = datetime.now ()
 -     query.answered = False
 -     query.transfer_data = transfer_data
 -     query.save ()
 - 
 - 
 - def _format_elapsed (
 -         uploaded_at:    datetime,
 - ) -> str:
 -     """
 -     指定した時刻から現在までの時間を見やすぃ文字列に変換する.
 - 
 -     Parameters
 -     ----------
 -     uploaded_at:    datetime
 -         基準日時
 - 
 -     Return
 -     ------
 -     str
 -         変換後文字列
 -     """
 - 
 -     delta = datetime.now () - uploaded_at
 -     days = delta.days
 -     seconds = delta.seconds
 -     (hours, seconds) = divmod (seconds, 3600)
 -     (mins, seconds) = divmod (seconds, 60)
 - 
 -     return f"{ days }日{ hours }時間{ mins }分{ seconds }秒"
 - 
 - 
 - kiriban_list = (
 -         fetch_kiriban_list ((now := datetime.now ()).date ()
 -                             - timedelta (days = 1 if now.hour < 15 else 0)))
 - 
 - if __name__ == '__main__':
 -     asyncio.run (main ())
 
 
  |