""" ニコニコのニジカ動画取得モヂュール """ from __future__ import annotations import os from datetime import date, timedelta from typing import TypedDict, cast import requests from bs4 import BeautifulSoup from requests.exceptions import Timeout from eloquent import DatabaseManager, Model from db.models import Tag, Video, VideoHistory, VideoTag CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql', 'host': 'localhost', 'database': 'nizika_nico', 'user': os.environ['MYSQL_USER'], 'password': os.environ['MYSQL_PASS'], 'prefix': '' } } DB = DatabaseManager (CONFIG) Model.set_connection_resolver (DB) KIRIBAN_VIEWS_COUNTS: set[int] = { *range (100, 1_000, 100), *range (1_000, 10_000, 1_000), *range (10_000, 1_000_001, 10_000), 194, 245, 510, 114_514, 1_940, 2_450, 5_100, 24_500, 51_000, 2_424 } class VideoInfo (TypedDict): contentId: str title: str tags: list[str] description: str def get_latest_deerjika ( ) -> VideoInfo | None: tag = '伊地知ニジカ OR ぼざろクリーチャーシリーズ' url = f"https://www.nicovideo.jp/tag/{ tag }" params = { 'sort': 'f', 'order': 'd' } video_info = { } bs = get_bs_from_url (url, params) if bs is None: return None try: video = (bs.find_all ('ul', class_ = 'videoListInner')[1] .find ('li', class_ = 'item')) video_info['contentId'] = video['data-video-id'] except Exception: return None return get_video_info (video_info['contentId']) def get_bs_from_url ( url: str, params: dict = { }, ) -> BeautifulSoup | None: """ URL から BeautifulSoup インスタンス生成 Parameters ---------- url: str 捜査する URL params: dict パラメータ Return ------ BeautifulSoup | None BeautifulSoup オブゼクト(失敗したら None) """ try: req = requests.get (url, params = params, timeout = 60) except Timeout: return None if req.status_code != 200: return None req.encoding = req.apparent_encoding return BeautifulSoup (req.text, 'html.parser') def get_video_info ( video_code: str, ) -> VideoInfo | None: video_info: dict[str, str | list[str]] = { 'contentId': video_code } bs = get_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }") if bs is None: return None try: title = bs.find ('title') if title is None: return None video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)] tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore video_info['tags'] = tags.split (',') video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore except Exception: return None return cast (VideoInfo, video_info) def get_kiriban_list ( base_date: date, ) -> list[tuple[int, VideoInfo]]: kiriban_list: list[tuple[int, VideoInfo]] = [] latest_fetched_at = cast (date, VideoHistory.max ('fetched_at')) previous_fetched_at = cast (date, (VideoHistory .where ('fetched_at', '<', latest_fetched_at) .max ('fetched_at'))) for kiriban_views_count in KIRIBAN_VIEWS_COUNTS: targets = ({ vh.video.code for vh in (VideoHistory .where ('fetched_at', latest_fetched_at) .where ('views_count', '>=', kiriban_views_count) .get ()) } - { vh.video.code for vh in (VideoHistory .where ('fetched_at', previous_fetched_at) .where ('views_count', '>=', kiriban_views_count) .get ()) }) for code in targets: video_info = get_video_info (code) if video_info is not None: kiriban_list.append ((kiriban_views_count, video_info)) return kiriban_list class DbConfig (TypedDict): driver: str host: str database: str user: str password: str prefix: str