from __future__ import annotations import asyncio import random from datetime import date, datetime, time, timedelta from typing import TypedDict, cast import requests from bs4 import BeautifulSoup from requests.exceptions import Timeout import queries_to_answers as q2a from db.models import Comment, Video, VideoHistory from nizika_ai.consts import Character, GPTModel, QueryType from nizika_ai.models import Query KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000), *range (10_000, 1_000_001, 10_000), 114_514, 1_940, 2_450, 5_100, 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919, 4_545, 194_245, 245_194, 510_245 }, reverse = True) kiriban_list: list[tuple[int, VideoInfo, datetime]] async def main ( ) -> None: await asyncio.gather ( queries_to_answers (), report_kiriban (), report_nico (), update_kiriban_list ()) async def queries_to_answers ( ) -> None: while True: q2a.main () await asyncio.sleep (10) async def report_kiriban ( ) -> None: while True: # キリ番祝ひ (views_count, video_info, uploaded_at) = ( kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) since_posted = datetime.now () - uploaded_at video_code = video_info['contentId'] uri = f"https://www.nicovideo.jp/watch/{ video_code }" (title, description, _) = fetch_embed_info (uri) comments = fetch_comments (video_code) popular_comments = sorted (comments, key = lambda c: c.nico_count, reverse = True)[:10] latest_comments = sorted (comments, key = lambda c: c.posted_at, reverse = True)[:10] prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" prompt += f"コメント数は{ len (comments) }件です。\n" if video_info['tags']: prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" if comments: prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n" prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n" prompt += f""" 概要には次のように書かれています: ```html { 
video_info['description'] } ``` このことについて、何かお祝いメッセージを下さい。 ただし、そのメッセージ内には再生数の数値を添えてください。 また、つけられたタグ、コメントからどのような動画か想像し、説明してください。""" query = Query () query.user_id = None query.target_character = Character.DEERJIKA.value query.content = prompt query.query_type = QueryType.KIRIBAN.value query.model = GPTModel.GPT3_TURBO.value query.sent_at = datetime.now () query.answered = False query.transfer_data = { 'video_code': video_code } query.save () # 待ち時間計算 dt = datetime.now () d = dt.date () if dt.hour >= 15: d += timedelta (days = 1) td = datetime.combine (d, time (15, 0)) - dt if kiriban_list: td /= len (kiriban_list) await asyncio.sleep (td.total_seconds ()) async def update_kiriban_list ( ) -> None: while True: await wait_until (time (15, 0)) kiriban_list += fetch_kiriban_list (datetime.now ().date ()) def fetch_kiriban_list ( base_date: date, ) -> list[tuple[int, VideoInfo, datetime]]: _kiriban_list: list[tuple[int, VideoInfo, datetime]] = [] latest_fetched_at = cast (date, (VideoHistory .where ('fetched_at', '<=', base_date) .max ('fetched_at'))) for kiriban_views_count in KIRIBAN_VIEWS_COUNTS: targets = { vh.video.code for vh in (VideoHistory .where ('fetched_at', latest_fetched_at) .where ('views_count', '>=', kiriban_views_count) .get ()) } for code in targets: if code in [kiriban[1]['contentId'] for kiriban in _kiriban_list]: continue previous_views_count: int | None = ( VideoHistory .where_has ('video', lambda q: q.where ('code', code)) .where ('fetched_at', '<', latest_fetched_at) .max ('views_count')) if previous_views_count is None: previous_views_count = 0 if previous_views_count >= kiriban_views_count: continue video_info = fetch_video_info (code) if video_info is not None: _kiriban_list.append ((kiriban_views_count, video_info, cast (Video, Video.where ('code', code).first ()).uploaded_at)) return _kiriban_list def fetch_video_info ( video_code: str, ) -> VideoInfo | None: video_info: dict[str, str | list[str]] = { 'contentId': video_code } bs = 
create_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }") if bs is None: return None try: title = bs.find ('title') if title is None: return None video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)] tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore video_info['tags'] = tags.split (',') video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore except Exception: return None return cast (VideoInfo, video_info) def create_bs_from_url ( url: str, params: dict | None = None, ) -> BeautifulSoup | None: """ URL から BeautifulSoup インスタンス生成 Parameters ---------- url: str 捜査する URL params: dict パラメータ Return ------ BeautifulSoup | None BeautifulSoup オブゼクト(失敗したら None) """ if params is None: params = { } try: req = requests.get (url, params = params, timeout = 60) except Timeout: return None if req.status_code != 200: return None req.encoding = req.apparent_encoding return BeautifulSoup (req.text, 'hecoml.parser') def fetch_embed_info ( url: str, ) -> tuple[str, str, str]: title: str = '' description: str = '' thumbnail: str = '' try: res = requests.get (url, timeout = 60) except Timeout: return ('', '', '') if res.status_code != 200: return ('', '', '') soup = BeautifulSoup (res.text, 'html.parser') tmp = soup.find ('title') if tmp is not None: title = tmp.text tmp = soup.find ('meta', attrs = { 'name': 'description' }) if tmp is not None and hasattr (tmp, 'get'): try: description = cast (str, tmp.get ('content')) except Exception: pass tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' }) if tmp is not None and hasattr (tmp, 'get'): try: thumbnail = cast (str, tmp.get ('content')) except Exception: pass return (title, description, thumbnail) def fetch_comments ( video_code: str, ) -> list[Comment]: video = Video.where ('code', video_code).first () if video is None: return [] return video.comments async def report_nico ( ) -> None: ... 
async def wait_until(
        t: time,
) -> None:
    """Sleep until the next occurrence of the local wall-clock time ``t``.

    If the current time is already at or past ``t``, the target is ``t`` on
    the following day.
    """
    now = datetime.now()
    target = datetime.combine(now.date(), t)
    if now.time() >= t:
        target += timedelta(days=1)

    await asyncio.sleep((target - now).total_seconds())


class VideoInfo(TypedDict):
    """Metadata of a single video as passed around by the kiriban jobs."""

    contentId: str      # video code
    title: str
    tags: list[str]
    description: str


# Initial pending list: before 15:00 the base date is yesterday, otherwise
# today.
d = datetime.now()
kiriban_list = fetch_kiriban_list(
    d.date() if d.hour >= 15 else d.date() - timedelta(days=1))


if __name__ == '__main__':
    asyncio.run(main())