|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- from __future__ import annotations
-
- import asyncio
- from datetime import date, datetime, time, timedelta
- from typing import TypedDict, cast
-
- import requests
- from bs4 import BeautifulSoup
- from requests.exceptions import Timeout
-
- import queries_to_answers as q2a
- from db.models import Video, VideoHistory
-
- KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
- *range (10_000, 1_000_001, 10_000),
- 114_514, 1_940, 2_450, 5_100,
- 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
- 4_545, 194_245, 245_194, 510_245 },
- reverse = True)
-
- kiriban_list: list[tuple[int, VideoInfo, datetime]]
-
-
- async def main (
- ) -> None:
- await asyncio.gather (
- queries_to_answers (),
- report_kiriban (),
- report_nico (),
- update_kiriban_list ())
-
-
- async def queries_to_answers (
- ) -> None:
- while True:
- q2a.main ()
- await asyncio.sleep (10)
-
-
- async def report_kiriban (
- ) -> None:
- while True:
- # キリ番祝ひ
- (views_count, video_info, uploaded_at) = (
- kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
- since_posted = datetime.now () - uploaded_at
- uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
- (title, description, thumbnail) = fetch_embed_info (uri)
- try:
- upload = client.com.atproto.repo.upload_blob (
- io.BytesIO (requests.get (thumbnail,
- timeout = 60).content))
- thumb = upload.blob
- except Timeout:
- thumb = None
- comments = nico.get_comments (video_info['contentId'])
- popular_comments = sorted (comments,
- key = lambda c: c.nico_count,
- reverse = True)[:10]
- latest_comments = sorted (comments,
- key = lambda c: c.posted_at,
- reverse = True)[:10]
- embed_external = models.AppBskyEmbedExternal.Main (
- external = models.AppBskyEmbedExternal.External (
- title = title,
- description = description,
- thumb = thumb,
- uri = uri))
- prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
- prompt += f"コメント数は{ len (comments) }件です。\n"
- if video_info['tags']:
- prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
- if comments:
- prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n"
- prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n"
- prompt += f"""
- 概要には次のように書かれています:
- ```html
- { video_info['description'] }
- ```
- このことについて、ニジカちゃんからのお祝いメッセージを下さい。
- ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
- また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
- 好きなコメントがあったら教えてね。"""
- # 待ち時間計算
- dt = datetime.now ()
- d = dt.date ()
- if dt.hour >= 15:
- d += timedelta (days = 1)
- td = datetime.combine (d, time (15, 0)) - dt
- if kiriban_list:
- td /= len (kiriban_list)
- await asyncio.sleep (td.total_seconds ())
-
-
- async def update_kiriban_list (
- ) -> None:
- while True:
- await wait_until (time (15, 0))
- kiriban_list += fetch_kiriban_list (datetime.now ().date ())
-
-
- def fetch_kiriban_list (
- base_date: date,
- ) -> list[tuple[int, VideoInfo, datetime]]:
- _kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
-
- latest_fetched_at = cast (date, (VideoHistory
- .where ('fetched_at', '<=', base_date)
- .max ('fetched_at')))
-
- for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
- targets = { vh.video.code for vh in (VideoHistory
- .where ('fetched_at', latest_fetched_at)
- .where ('views_count', '>=', kiriban_views_count)
- .get ()) }
- for code in targets:
- if code in [kiriban[1]['contentId'] for kiriban in _kiriban_list]:
- continue
- previous_views_count: int | None = (
- VideoHistory
- .where_has ('video', lambda q: q.where ('code', code))
- .where ('fetched_at', '<', latest_fetched_at)
- .max ('views_count'))
- if previous_views_count is None:
- previous_views_count = 0
- if previous_views_count >= kiriban_views_count:
- continue
- video_info = fetch_video_info (code)
- if video_info is not None:
- _kiriban_list.append ((kiriban_views_count, video_info,
- cast (Video, Video.where ('code', code).first ()).uploaded_at))
-
- return _kiriban_list
-
-
- def fetch_video_info (
- video_code: str,
- ) -> VideoInfo | None:
- video_info: dict[str, str | list[str]] = { 'contentId': video_code }
-
- bs = create_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }")
- if bs is None:
- return None
-
- try:
- title = bs.find ('title')
- if title is None:
- return None
- video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)]
-
- tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore
- video_info['tags'] = tags.split (',')
-
- video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore
- except Exception:
- return None
-
- return cast (VideoInfo, video_info)
-
-
- def create_bs_from_url (
- url: str,
- params: dict | None = None,
- ) -> BeautifulSoup | None:
- """
- URL から BeautifulSoup インスタンス生成
-
- Parameters
- ----------
- url: str
- 捜査する URL
- params: dict
- パラメータ
-
- Return
- ------
- BeautifulSoup | None
- BeautifulSoup オブゼクト(失敗したら None)
- """
-
- if params is None:
- params = { }
-
- try:
- req = requests.get (url, params = params, timeout = 60)
- except Timeout:
- return None
-
- if req.status_code != 200:
- return None
-
- req.encoding = req.apparent_encoding
-
- return BeautifulSoup (req.text, 'hecoml.parser')
-
-
- async def report_nico (
- ) -> None:
- ...
-
-
- async def wait_until (
- t: time,
- ):
- dt = datetime.now ()
- d = dt.date ()
- if dt.time () >= t:
- d += timedelta (days = 1)
- await asyncio.sleep ((datetime.combine (d, t) - dt).total_seconds ())
-
-
- class VideoInfo (TypedDict):
- contentId: str
- title: str
- tags: list[str]
- description: str
-
-
- kiriban_list = (
- fetch_kiriban_list ((d := datetime.now ()).date ()
- - timedelta (days = d.hour < 15)))
-
- if __name__ == '__main__':
- asyncio.run (main ())
|