from __future__ import annotations

import asyncio
from datetime import date, datetime, time, timedelta
from typing import TypedDict, cast

import requests
from bs4 import BeautifulSoup
from requests.exceptions import Timeout

import queries_to_answers as q2a
from db.models import Video, VideoHistory

# View counts treated as "kiriban" (milestone) numbers, largest first:
# every 1,000 below 10,000, every 10,000 from 10,000 to 1,000,000, plus a
# set of hand-picked numbers.
KIRIBAN_VIEWS_COUNTS: list[int] = sorted(
    {
        *range(1_000, 10_000, 1_000),
        *range(10_000, 1_000_001, 10_000),
        114_514,
        1_940,
        2_450,
        5_100,
        19_400,
        24_500,
        51_000,
        93_194,
        2_424,
        242_424,
        1_919,
        4_545,
        194_245,
        245_194,
        510_245,
    },
    reverse=True,
)

# Milestone entries as (milestone view count, video info, Video.uploaded_at).
# Only declared here; the value is assigned near the bottom of the module,
# after VideoInfo and the fetch helpers have been defined.
kiriban_list: list[tuple[int, VideoInfo, datetime]]


async def main() -> None:
    """Run all periodic tasks concurrently until one of them fails."""
    await asyncio.gather(
        queries_to_answers(),
        report_kiriban(),
        report_nico(),
        update_kiriban_list(),
    )


async def queries_to_answers() -> None:
    """Call ``q2a.main()`` every 10 seconds, forever."""
    while True:
        # NOTE(review): q2a.main() is called synchronously, so the event loop
        # is occupied for as long as it runs.
        q2a.main()
        await asyncio.sleep(10)


async def report_kiriban() -> None:
    """Periodically report a milestone, pacing reports until the next 15:00.

    The reporting step itself is still a placeholder; only the wait between
    iterations is implemented.
    """
    while True:
        # Celebrate a kiriban (milestone view count) -- not implemented yet.
        ...

        # Compute the wait: the time left until the next 15:00, divided
        # evenly by the number of entries currently in kiriban_list.
        now = datetime.now()
        target_day = now.date()
        if now.hour >= 15:
            target_day += timedelta(days=1)
        wait = datetime.combine(target_day, time(15, 0)) - now
        if kiriban_list:
            wait /= len(kiriban_list)
        await asyncio.sleep(wait.total_seconds())


async def update_kiriban_list() -> None:
    """Append newly reached milestones to ``kiriban_list`` at 15:00 each day."""
    while True:
        await wait_until(time(15, 0))
        # Mutate the module-level list in place.  The previous
        # ``kiriban_list += ...`` made the name local to this function and
        # raised UnboundLocalError on the first iteration.
        kiriban_list.extend(fetch_kiriban_list(datetime.now().date()))


def fetch_kiriban_list(
    base_date: date,
) -> list[tuple[int, VideoInfo, datetime]]:
    """
    Collect videos that newly reached a milestone view count.

    The latest ``fetched_at`` not after ``base_date`` is used as the current
    snapshot.  A video qualifies for a milestone when its views in that
    snapshot are at or above the milestone while its maximum views in all
    earlier snapshots (0 when there are none) are below it.  Milestones are
    scanned from largest to smallest and a video is recorded at most once,
    so it is listed under the largest milestone it newly reached.

    Parameters
    ----------
    base_date: date
        Upper bound for the snapshot date to inspect

    Return
    ------
    list[tuple[int, VideoInfo, datetime]]
        (milestone view count, video info, ``uploaded_at`` of the video);
        videos whose info could not be fetched are left out
    """
    found: list[tuple[int, VideoInfo, datetime]] = []
    # Codes already recorded in ``found``; a set avoids rebuilding a list of
    # every recorded contentId for each candidate.
    recorded_codes: set[str] = set()

    latest_fetched_at = cast(
        date,
        VideoHistory.where('fetched_at', '<=', base_date).max('fetched_at'),
    )

    for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
        histories = (
            VideoHistory
            .where('fetched_at', latest_fetched_at)
            .where('views_count', '>=', kiriban_views_count)
            .get()
        )
        targets = {vh.video.code for vh in histories}

        for code in targets:
            if code in recorded_codes:
                continue

            previous_views_count: int | None = (
                VideoHistory
                .where_has('video', lambda q: q.where('code', code))
                .where('fetched_at', '<', latest_fetched_at)
                .max('views_count')
            )
            if previous_views_count is None:
                previous_views_count = 0
            if previous_views_count >= kiriban_views_count:
                # The milestone had already been reached in an earlier snapshot.
                continue

            video_info = fetch_video_info(code)
            if video_info is None:
                # Not recorded, so the video is looked at again for the
                # smaller milestones that follow.
                continue

            video = cast(Video, Video.where('code', code).first())
            found.append((kiriban_views_count, video_info, video.uploaded_at))
            recorded_codes.add(code)

    return found


def fetch_video_info(
    video_code: str,
) -> VideoInfo | None:
    """
    Scrape title, tags and description from a video's watch page.

    Parameters
    ----------
    video_code: str
        Video code appended to the watch-page URL; stored as ``contentId``

    Return
    ------
    VideoInfo | None
        Scraped video info (None if the page could not be fetched or parsed)
    """
    video_info: dict[str, str | list[str]] = {'contentId': video_code}

    bs = create_bs_from_url(f"https://www.nicovideo.jp/watch/{ video_code }")
    if bs is None:
        return None

    try:
        title = bs.find('title')
        if title is None:
            return None
        # Drop everything from the last '-' onwards, then the one character
        # that preceded it.
        video_info['title'] = '-'.join(title.text.split('-')[:(-1)])[:(-1)]

        tags: str = bs.find('meta', attrs={'name': 'keywords'}).get('content')  # type: ignore
        video_info['tags'] = tags.split(',')

        video_info['description'] = bs.find('meta', attrs={'name': 'description'}).get('content')  # type: ignore
    except Exception:
        # Best effort: any missing tag or attribute means "no info available".
        return None

    return cast(VideoInfo, video_info)


def create_bs_from_url(
    url: str,
    params: dict | None = None,
) -> BeautifulSoup | None:
    """
    Create a BeautifulSoup instance from a URL

    Parameters
    ----------
    url: str
        URL to fetch
    params: dict
        Query parameters

    Return
    ------
    BeautifulSoup | None
        BeautifulSoup object (None on failure)
    """
    if params is None:
        params = {}

    try:
        req = requests.get(url, params=params, timeout=60)
    except (Timeout, requests.ConnectionError):
        # Connection failures are reported as None like timeouts, matching
        # the documented "None on failure" contract.
        return None

    if req.status_code != 200:
        return None

    req.encoding = req.apparent_encoding

    # 'html.parser' is the parser bundled with the standard library; the
    # previous value 'hecoml.parser' is not a parser name Beautiful Soup knows.
    return BeautifulSoup(req.text, 'html.parser')


async def report_nico() -> None:
    """Placeholder; not implemented yet."""
    ...


async def wait_until(
    t: time,
) -> None:
    """
    Sleep until the next occurrence of the wall-clock time ``t``.

    If the current local time is already at or past ``t``, the target is
    ``t`` on the following day.

    Parameters
    ----------
    t: time
        Time of day to wait for
    """
    now = datetime.now()
    target_day = now.date()
    if now.time() >= t:
        target_day += timedelta(days=1)
    await asyncio.sleep((datetime.combine(target_day, t) - now).total_seconds())


class VideoInfo(TypedDict):
    """Video info as assembled by ``fetch_video_info``."""

    contentId: str
    title: str
    tags: list[str]
    description: str


# Initial list, built at import time: before 15:00 the base date is
# yesterday, from 15:00 onwards it is today.
kiriban_list = fetch_kiriban_list(
    (d := datetime.now()).date() - timedelta(days=int(d.hour < 15)))


if __name__ == '__main__':
    asyncio.run(main())