from __future__ import annotations import asyncio from datetime import date, datetime, time, timedelta from typing import TypedDict, cast import requests from bs4 import BeautifulSoup from requests.exceptions import Timeout import queries_to_answers as q2a from db.models import Video, VideoHistory KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000), *range (10_000, 1_000_001, 10_000), 114_514, 1_940, 2_450, 5_100, 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919, 4_545, 194_245, 245_194, 510_245 }, reverse = True) kiriban_list: list[tuple[int, VideoInfo, datetime]] async def main ( ) -> None: await asyncio.gather ( queries_to_answers (), report_kiriban (), report_nico (), update_kiriban_list ()) async def queries_to_answers ( ) -> None: while True: q2a.main () await asyncio.sleep (10) async def report_kiriban ( ) -> None: while True: # キリ番祝ひ (views_count, video_info, uploaded_at) = ( kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) since_posted = datetime.now () - uploaded_at uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }" (title, description, thumbnail) = fetch_embed_info (uri) try: upload = client.com.atproto.repo.upload_blob ( io.BytesIO (requests.get (thumbnail, timeout = 60).content)) thumb = upload.blob except Timeout: thumb = None comments = nico.get_comments (video_info['contentId']) popular_comments = sorted (comments, key = lambda c: c.nico_count, reverse = True)[:10] latest_comments = sorted (comments, key = lambda c: c.posted_at, reverse = True)[:10] embed_external = models.AppBskyEmbedExternal.Main ( external = models.AppBskyEmbedExternal.External ( title = title, description = description, thumb = thumb, uri = uri)) prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" prompt += f"コメント数は{ len (comments) }件です。\n" if video_info['tags']: prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" if comments: prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n" prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n" prompt += f""" 概要には次のように書かれています: ```html { video_info['description'] } ``` このことについて、ニジカちゃんからのお祝いメッセージを下さい。 ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。 また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。 好きなコメントがあったら教えてね。""" # 待ち時間計算 dt = datetime.now () d = dt.date () if dt.hour >= 15: d += timedelta (days = 1) td = datetime.combine (d, time (15, 0)) - dt if kiriban_list: td /= len (kiriban_list) await asyncio.sleep (td.total_seconds ()) async def update_kiriban_list ( ) -> None: while True: await wait_until (time (15, 0)) kiriban_list += fetch_kiriban_list (datetime.now ().date ()) def fetch_kiriban_list ( base_date: date, ) -> list[tuple[int, VideoInfo, datetime]]: _kiriban_list: list[tuple[int, VideoInfo, datetime]] = [] latest_fetched_at = cast (date, (VideoHistory .where ('fetched_at', '<=', base_date) .max ('fetched_at'))) for kiriban_views_count in KIRIBAN_VIEWS_COUNTS: targets = { vh.video.code for vh in (VideoHistory .where ('fetched_at', latest_fetched_at) .where ('views_count', '>=', kiriban_views_count) .get ()) } for code in targets: if code in [kiriban[1]['contentId'] for kiriban in _kiriban_list]: continue previous_views_count: int | None = ( VideoHistory .where_has ('video', lambda q: q.where ('code', code)) .where ('fetched_at', '<', latest_fetched_at) .max ('views_count')) if previous_views_count is None: previous_views_count = 0 if previous_views_count >= kiriban_views_count: continue video_info = fetch_video_info (code) if video_info is not None: _kiriban_list.append ((kiriban_views_count, video_info, cast (Video, Video.where ('code', code).first ()).uploaded_at)) return _kiriban_list def fetch_video_info ( video_code: str, ) -> VideoInfo | None: video_info: dict[str, str | list[str]] = { 'contentId': video_code } bs = create_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }") if bs is None: return None try: title = bs.find ('title') if title is None: return None video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)] tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore video_info['tags'] = tags.split (',') video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore except Exception: return None return cast (VideoInfo, video_info) def create_bs_from_url ( url: str, params: dict | None = None, ) -> BeautifulSoup | None: """ URL から BeautifulSoup インスタンス生成 Parameters ---------- url: str 捜査する URL params: dict パラメータ Return ------ BeautifulSoup | None BeautifulSoup オブゼクト(失敗したら None) """ if params is None: params = { } try: req = requests.get (url, params = params, timeout = 60) except Timeout: return None if req.status_code != 200: return None req.encoding = req.apparent_encoding return BeautifulSoup (req.text, 'hecoml.parser') async def report_nico ( ) -> None: ... async def wait_until ( t: time, ): dt = datetime.now () d = dt.date () if dt.time () >= t: d += timedelta (days = 1) await asyncio.sleep ((datetime.combine (d, t) - dt).total_seconds ()) class VideoInfo (TypedDict): contentId: str title: str tags: list[str] description: str kiriban_list = ( fetch_kiriban_list ((d := datetime.now ()).date () - timedelta (days = d.hour < 15))) if __name__ == '__main__': asyncio.run (main ())