|
- """
- ニコニコのニジカ動画取得モヂュール
- """
-
- from __future__ import annotations
-
- import os
- from datetime import date, datetime, timedelta
- from typing import TypedDict, cast
-
- import requests
- from bs4 import BeautifulSoup
- from requests.exceptions import Timeout
- from eloquent import DatabaseManager, Model
-
- from db.models import Comment, Tag, Video, VideoHistory, VideoTag
-
# Connection settings handed to DatabaseManager.  The credentials are read
# from the MYSQL_USER / MYSQL_PASS environment variables while this module is
# being imported, so a missing variable raises KeyError at import time.
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
                                           'host': 'localhost',
                                           'database': 'nizika_nico',
                                           'user': os.environ['MYSQL_USER'],
                                           'password': os.environ['MYSQL_PASS'],
                                           'prefix': '' } }

# Create the database manager and register it as the connection resolver
# of the model base class.
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
-
# View counts that are treated as milestones ("kiriban"):
#   - every 1,000 from 1,000 up to 9,000
#   - every 10,000 from 10,000 up to 1,000,000
#   - a hand-picked list of additional numbers
KIRIBAN_VIEWS_COUNTS: set[int] = (
        set (range (1_000, 10_000, 1_000))
        | set (range (10_000, 1_000_001, 10_000))
        | { 194, 245, 510, 810, 114_514,
            1_940, 2_450, 5_100,
            19_400, 24_500, 51_000, 93_194,
            2_424, 242_424, 1_919, 4_545,
            194_245, 245_194, 510_245 })
-
class VideoInfo (TypedDict):
    """
    Video information scraped from a watch page
    """

    # video code used in the watch page URL
    contentId: str
    # text of the <title> element with its last '-' separated part removed
    title: str
    # comma separated values of the 'keywords' meta tag
    tags: list[str]
    # content of the 'description' meta tag
    description: str
-
-
def get_latest_deerjika (
) -> VideoInfo | None:
    """
    Fetch the information of the first video listed on the tag search page

    The tag page is requested with `sort=f` and `order=d`
    (presumably "newest upload first", as the function name suggests;
    confirm against the site).  The video code is read from the first
    `li.item` inside the second `ul.videoListInner` element and passed on
    to `get_video_info`.

    Return
    ------
    VideoInfo | None
        Information of the listed video (None if the page could not be
        fetched, the expected elements were missing, or the watch page
        could not be parsed)
    """

    tag = '伊地知ニジカ OR ぼざろクリーチャーシリーズ'
    url = f"https://www.nicovideo.jp/tag/{ tag }"

    params = { 'sort': 'f',
               'order': 'd' }

    bs = get_bs_from_url (url, params)
    if bs is None:
        return None

    try:
        # The first `videoListInner` list is skipped on purpose by the
        # original code (NOTE(review): presumably it is not the regular
        # result list; confirm against the page markup).
        video = (bs.find_all ('ul', class_ = 'videoListInner')[1]
                 .find ('li', class_ = 'item'))
        video_code = video['data-video-id']
    except (IndexError, KeyError, TypeError, AttributeError):
        # fewer than two lists, no item in the list, or no video id on it
        return None

    return get_video_info (video_code)
-
-
def get_bs_from_url (
        url: str,
        params: dict | None = None,
) -> BeautifulSoup | None:
    """
    Fetch a URL and build a BeautifulSoup instance from the response

    Parameters
    ----------
    url: str
        URL to fetch
    params: dict | None
        Query parameters passed to `requests.get`
        (None sends no parameters)

    Return
    ------
    BeautifulSoup | None
        Parsed document (None if the request failed or
        the status code was not 200)
    """

    try:
        response = requests.get (url, params = params, timeout = 60)
    except requests.exceptions.RequestException:
        # Timeout is a RequestException too; connection errors and the
        # like are reported as None as well, as documented above.
        return None

    if response.status_code != 200:
        return None

    # decode the body with the encoding detected from its content
    response.encoding = response.apparent_encoding

    return BeautifulSoup (response.text, 'html.parser')
-
-
def get_video_info (
        video_code: str,
) -> VideoInfo | None:
    """
    Scrape title, tags and description from the watch page of a video

    Parameters
    ----------
    video_code: str
        Video code used in the watch page URL

    Return
    ------
    VideoInfo | None
        Scraped information (None if the page could not be fetched, or
        the <title> element, the 'keywords' meta tag with its content,
        or the 'description' meta tag is missing)
    """

    bs = get_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }")
    if bs is None:
        return None

    title = bs.find ('title')
    keywords_meta = bs.find ('meta', attrs = { 'name': 'keywords' })
    description_meta = bs.find ('meta', attrs = { 'name': 'description' })
    if title is None or keywords_meta is None or description_meta is None:
        return None

    keywords = keywords_meta.get ('content')  # type: ignore
    if not isinstance (keywords, str):
        return None

    # NOTE: the description is passed through unchecked, so it is None
    # when the meta tag has no 'content' attribute.
    description = description_meta.get ('content')  # type: ignore

    return cast (VideoInfo, {
            'contentId': video_code,
            # drop the part after the last '-' and the character
            # in front of that '-'
            'title': title.text.rpartition ('-')[0][:(-1)],
            'tags': keywords.split (','),
            'description': description })
-
-
def _video_codes_with_views_at_least (
        fetched_at: date,
        views_count: int,
) -> set[str]:
    """
    Collect the codes of the videos whose history row fetched at
    `fetched_at` has at least `views_count` views
    """

    return { vh.video.code for vh in (VideoHistory
                                      .where ('fetched_at', fetched_at)
                                      .where ('views_count', '>=', views_count)
                                      .get ()) }


def get_kiriban_list (
        base_date: date,
) -> list[tuple[int, VideoInfo, datetime]]:
    """
    List the videos that reached a milestone view count

    The newest snapshot fetched on or before `base_date` is compared with
    the snapshot fetched before it.  A video is listed for a count in
    `KIRIBAN_VIEWS_COUNTS` when it has at least that many views in the
    newest snapshot but is not among the videos with that many views in
    the previous one.

    Parameters
    ----------
    base_date: date
        Only snapshots fetched on or before this date are considered

    Return
    ------
    list[tuple[int, VideoInfo, datetime]]
        Tuples of (milestone view count, video information,
        upload date and time of the video).  Videos whose watch page
        could not be scraped are left out.
    """

    latest_fetched_at = cast ('date | None', (VideoHistory
                                             .where ('fetched_at', '<=', base_date)
                                             .max ('fetched_at')))
    if latest_fetched_at is None:
        # no snapshot to look at, so no video can have reached a milestone
        return []

    previous_fetched_at = cast (date, (VideoHistory
                                       .where ('fetched_at', '<', latest_fetched_at)
                                       .max ('fetched_at')))

    kiriban_list: list[tuple[int, VideoInfo, datetime]] = []

    # A video may pass several milestones between two snapshots; scrape its
    # page and look up its upload date only once.  Failed scrapes are not
    # remembered, so they are retried for the next milestone.
    fetched: dict[str, tuple[VideoInfo, datetime]] = { }

    for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
        targets = (_video_codes_with_views_at_least (latest_fetched_at,
                                                     kiriban_views_count)
                   - _video_codes_with_views_at_least (previous_fetched_at,
                                                       kiriban_views_count))
        for code in targets:
            entry = fetched.get (code)
            if entry is None:
                video_info = get_video_info (code)
                if video_info is None:
                    continue
                uploaded_at = cast (Video, Video.where ('code', code).first ()).uploaded_at
                entry = (video_info, uploaded_at)
                fetched[code] = entry
            kiriban_list.append ((kiriban_views_count, entry[0], entry[1]))

    return kiriban_list
-
-
def get_comments (
        video_code: str,
) -> list[Comment]:
    """
    Look up the comments of a video

    Parameters
    ----------
    video_code: str
        Code of the video to look up

    Return
    ------
    list[Comment]
        The `comments` of the matching video
        (empty list if no video has this code)
    """

    matched = Video.where ('code', video_code).first ()
    return [] if matched is None else matched.comments
-
-
class DbConfig (TypedDict):
    """
    Settings of a single database connection as used in CONFIG
    """

    # name of the database driver
    driver: str
    # host name of the database server
    host: str
    # name of the database
    database: str
    # user name used to log in
    user: str
    # password used to log in
    password: str
    # prefix setting of the connection
    prefix: str
|