from __future__ import annotations import asyncio import random from asyncio import Lock from datetime import date, datetime, time, timedelta from typing import TypedDict, cast import nicolib import queries_to_answers as q2a from db.models import Comment, Video, VideoHistory from nicolib import VideoInfo from nizika_ai.consts import Character, GPTModel, QueryType from nizika_ai.models import Query KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000), *range (10_000, 1_000_001, 10_000), 114_514, 1_940, 2_450, 5_100, 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919, 4_545, 194_245, 245_194, 510_245 }, reverse = True) kiriban_list: list[tuple[int, VideoInfo, datetime]] lock = Lock () async def main ( ) -> None: await asyncio.gather ( queries_to_answers (), report_kiriban (), report_nico (), update_kiriban_list ()) async def queries_to_answers ( ) -> None: while True: loop = asyncio.get_running_loop () await loop.run_in_executor (None, q2a.main) await asyncio.sleep (10) async def report_kiriban ( ) -> None: while True: if not kiriban_list: await wait_until (time (15, 0)) continue # キリ番祝ひ async with lock: (views_count, video_info, uploaded_at) = ( kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) since_posted = datetime.now () - uploaded_at _days = since_posted.days _seconds = since_posted.seconds (_hours, _seconds) = divmod (_seconds, 3600) (_mins, _seconds) = divmod (_seconds, 60) video_code = video_info['contentId'] uri = f"https://www.nicovideo.jp/watch/{ video_code }" (title, description, _) = nicolib.fetch_embed_info (uri) comments = fetch_comments (video_code) popular_comments = sorted (comments, key = lambda c: c.nico_count, reverse = True)[:10] latest_comments = sorted (comments, key = lambda c: c.posted_at, reverse = True)[:10] prompt = f"{ _days }日{ _hours }時間{ _mins }分{ _seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" prompt += f"コメント数は{ len (comments) }件です。\n" if video_info['tags']: prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" if comments: prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n" prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n" prompt += f""" 概要には次のように書かれています: ```html { video_info['description'] } ``` このことについて、何かお祝いメッセージを下さい。 ただし、そのメッセージ内には再生数の数値を添えてください。 また、つけられたタグ、コメントからどのような動画か想像し、説明してください。""" query = Query () query.user_id = None query.target_character = Character.DEERJIKA.value query.content = prompt query.query_type = QueryType.KIRIBAN.value query.model = GPTModel.GPT3_TURBO.value query.sent_at = datetime.now () query.answered = False query.transfer_data = { 'video_code': video_code } query.save () # 待ち時間計算 dt = datetime.now () d = dt.date () if dt.hour >= 15: d += timedelta (days = 1) remain = max (len (kiriban_list), 1) td = (datetime.combine (d, time (15, 0)) - dt) / remain # まれに時刻跨ぎでマイナスになるため if td.total_seconds () < 0: td = timedelta (seconds = 0) await asyncio.sleep (td.total_seconds ()) async def update_kiriban_list ( ) -> None: while True: await wait_until (time (15, 0)) new_list = fetch_kiriban_list (datetime.now ().date ()) if not new_list: continue async with lock: have = { k[1]['contentId'] for k in kiriban_list } for item in new_list: if item[1]['contentId'] not in have: kiriban_list.append (item) have.add (item[1]['contentId']) def fetch_kiriban_list ( base_date: date, ) -> list[tuple[int, VideoInfo, datetime]]: _kiriban_list: list[tuple[int, VideoInfo, datetime]] = [] latest_fetched_at = cast (date, (VideoHistory .where ('fetched_at', '<=', base_date) .max ('fetched_at'))) for kiriban_views_count in KIRIBAN_VIEWS_COUNTS: targets = { vh.video.code for vh in (VideoHistory .where ('fetched_at', latest_fetched_at) .where ('views_count', '>=', kiriban_views_count) .get ()) } for code in targets: if code in [kiriban[1]['contentId'] for kiriban in _kiriban_list]: continue previous_views_count: int | None = ( VideoHistory .where_has ('video', lambda q: q.where ('code', code)) .where ('fetched_at', '<', latest_fetched_at) .max ('views_count')) if previous_views_count is None: previous_views_count = 0 if previous_views_count >= kiriban_views_count: continue video_info = nicolib.fetch_video_info (code) if video_info is not None: _kiriban_list.append ((kiriban_views_count, video_info, cast (Video, Video.where ('code', code).first ()).uploaded_at)) return _kiriban_list def fetch_comments ( video_code: str, ) -> list[Comment]: video = Video.where ('code', video_code).first () if video is None: return [] return video.comments async def report_nico ( ) -> None: ... async def wait_until ( t: time, ): dt = datetime.now () d = dt.date () if dt.time () >= t: d += timedelta (days = 1) await asyncio.sleep ((datetime.combine (d, t) - dt).total_seconds ()) kiriban_list = ( fetch_kiriban_list ((d := datetime.now ()).date () - timedelta (days = d.hour < 15))) if __name__ == '__main__': asyncio.run (main ())