diff --git a/main.py b/main.py index cb38918..cf0991b 100644 --- a/main.py +++ b/main.py @@ -5,33 +5,15 @@ Bluesky のニジカがいろいろする. from __future__ import annotations -import io -import random import sys import time -from datetime import date, datetime -from datetime import time as dt_time -from datetime import timedelta -from typing import TypedDict, cast +from datetime import datetime, timedelta +from typing import TypedDict -import requests from atproto import Client, models from atproto_client.models.app.bsky.feed.get_timeline import Response -from bs4 import BeautifulSoup -from requests.exceptions import Timeout import account -import nico -from ai.talk import Talk - -TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ', - 'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう', - 'ぼざクリ', 'オオミソカ', '伊地知', '喜多ちゃん', - '喜タイ', '洗澡鹿', 'シーザオ', '今日は本当に', - 'ダイソーで', '変なチンチン', 'daisoで', 'だね~(笑)', - 'おやつタイム', 'わさしが', 'わさび県', 'たぬマ', 'にくまる', - 'ルイズマリー', '餅', 'ニジゴ', 'ゴニジ', 'ニジニジ', - '新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ'] def main ( @@ -42,23 +24,15 @@ def main ( client.login (account.USER_ID, account.PASSWORD) - got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15) - kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = ( - nico.get_kiriban_list (got_kiriban_at)) - kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ()) - / len (kiriban_list)) - next_kiriban_at = datetime.now () - last_posted_at = datetime.now () - timedelta (hours = 6) has_got_snack_time = False has_taken_hot_spring = False - watched_videos = [] while True: now = datetime.now () for uri in check_notifications (client): records = get_thread_contents (client, uri, 20) - if len (records) > 0: + if records: answer = Talk.main ((records[0]['text'] if (records[0]['embed'] is None or not hasattr (records[0]['embed'], @@ -66,7 +40,9 @@ def main ( else [ { 'type': 'text', 'text': records[0]['text'] }, { 'type': 'image_url', 'image_url': { - 'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]), + 'url': (f"https://cdn.bsky.app/img/feed_fullsize" + f"/plain/{ records[0]['did'] }" + f"/{ records[0]['embed'].images[0].image.ref.link }") } }]), records[0]['name'], [*map (lambda record: { 'role': ('assistant' @@ -81,98 +57,7 @@ def main ( parent = records[0]['strong_ref'], root = records[-1]['strong_ref'])) - like_posts (client) - - if kiriban_list and datetime.now () >= next_kiriban_at: - (views_count, video_info, uploaded_at) = ( - kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) - since_posted = datetime.now () - uploaded_at - uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }" - (title, description, thumbnail) = get_embed_info (uri) - try: - upload = client.com.atproto.repo.upload_blob ( - io.BytesIO (requests.get (thumbnail, - timeout = 60).content)) - thumb = upload.blob - except Timeout: - thumb = None - comments = nico.get_comments (video_info['contentId']) - popular_comments = sorted (comments, - key = lambda c: c.nico_count, - reverse = True)[:10] - latest_comments = sorted (comments, - key = lambda c: c.posted_at, - reverse = True)[:10] - embed_external = models.AppBskyEmbedExternal.Main ( - external = models.AppBskyEmbedExternal.External ( - title = title, - description = description, - thumb = thumb, - uri = uri)) - prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" - prompt += f"コメント数は{ len (comments) }件です。\n" - if video_info['tags']: - prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" - if comments: - prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n" - prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n" - prompt += f""" -概要には次のように書かれています: -```html -{ video_info['description'] } -``` -このことについて、ニジカちゃんからのお祝いメッセージを下さい。 -ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。 -また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。 -好きなコメントがあったら教えてね。""" - client.post (Talk.main (prompt), embed = embed_external) - next_kiriban_at += kiriban_interval - last_posted_at = now - - latest_deerjika = nico.get_latest_deerjika () - if latest_deerjika is not None: - for datum in [e for e in [latest_deerjika] - if e['contentId'] not in watched_videos]: - watched_videos += [datum['contentId']] - - uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }" - (title, description, thumbnail) = get_embed_info (uri) - try: - upload = client.com.atproto.repo.upload_blob ( - io.BytesIO (requests.get (thumbnail, - timeout = 60).content)) - thumb = upload.blob - except Timeout: - thumb = None - - embed_external = models.AppBskyEmbedExternal.Main ( - external = models.AppBskyEmbedExternal.External ( - title = title, - description = description, - thumb = thumb, - uri = uri)) - client.post (Talk.main (f""" -ニコニコに『{ datum['title'] }』という動画がアップされました。 -つけられたタグは「{ '」、「'.join (datum['tags']) }」です。 -概要には次のように書かれています: -```html -{ datum['description'] } -``` -このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """), - embed = embed_external) - last_posted_at = now - - if now.hour == 14 and has_got_snack_time: - has_got_snack_time = False - if now.hour == 15: - if got_kiriban_at < datetime.now ().date (): - kiriban_list = nico.get_kiriban_list (datetime.now ().date ()) - got_kiriban_at = datetime.now ().date () - kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ()) - / len (kiriban_list)) - next_kiriban_at = datetime.now () - if not has_got_snack_time: try: with open ('./assets/snack-time.jpg', 'rb') as f: @@ -223,10 +108,6 @@ def main ( pass has_taken_hot_spring = True - if now - last_posted_at >= timedelta (hours = 6): - client.post (Talk.main ('今どうしてる?')) - last_posted_at = now - time.sleep (60) @@ -270,76 +151,6 @@ def get_thread_contents ( return records -def get_embed_info ( - url: str -) -> tuple[str, str, str]: - title: str = '' - description: str = '' - thumbnail: str = '' - - try: - res = requests.get (url, timeout = 60) - except Timeout: - return ('', '', '') - - if res.status_code != 200: - return ('', '', '') - - soup = BeautifulSoup (res.text, 'html.parser') - - tmp = soup.find ('title') - if tmp is not None: - title = tmp.text - - tmp = soup.find ('meta', attrs = { 'name': 'description' }) - if tmp is not None and hasattr (tmp, 'get'): - try: - description = cast (str, tmp.get ('content')) - except Exception: - pass - - tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' }) - if tmp is not None and hasattr (tmp, 'get'): - try: - thumbnail = cast (str, tmp.get ('content')) - except Exception: - pass - - return (title, description, thumbnail) - - -def get_kiriban_dt_to_update ( -) -> datetime: - now = datetime.now () - today = now.date () - dt = datetime.combine (today, dt_time (15, 0)) - if dt <= now: - dt += timedelta (days = 1) - return dt - - -def like_posts ( - client: Client, -) -> None: - for post in get_target_posts (client): - client.like (**post) - - -def get_target_posts ( - client: Client, -) -> list[LikeParams]: - posts = [] - - timeline: Response = client.get_timeline () - for feed in timeline.feed: - if (feed.post.author.did != client.me.did - and (feed.post.viewer.like is None) - and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)): - posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid })) - - return posts - - class LikeParams (TypedDict): uri: str cid: str diff --git a/nico.py b/nico.py deleted file mode 100644 index 0bc2fbc..0000000 --- a/nico.py +++ /dev/null @@ -1,150 +0,0 @@ -""" -ニコニコのニジカ動画取得モヂュール -""" - -from __future__ import annotations - -import os -from datetime import date, datetime, timedelta -from typing import TypedDict, cast - -import requests -from bs4 import BeautifulSoup -from requests.exceptions import Timeout -from eloquent import DatabaseManager, Model - -import nicolib -from db.models import Comment, Tag, Video, VideoHistory, VideoTag - -CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql', - 'host': 'localhost', - 'database': 'nizika_nico', - 'user': os.environ['MYSQL_USER'], - 'password': os.environ['MYSQL_PASS'], - 'prefix': '' } } - -DB = DatabaseManager (CONFIG) -Model.set_connection_resolver (DB) - -KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000), - *range (10_000, 1_000_001, 10_000), - 194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100, - 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919, - 4_545, 194_245, 245_194, 510_245 }, - reverse = True) - -class VideoInfo (TypedDict): - contentId: str - title: str - tags: list[str] - description: str - - -def get_latest_deerjika ( -) -> VideoInfo | None: - tag = '伊地知ニジカ OR ぼざろクリーチャーシリーズ' - url = f"https://www.nicovideo.jp/tag/{ tag }" - - params = { 'sort': 'f', - 'order': 'd' } - - video_info = { } - - bs = get_bs_from_url (url, params) - if bs is None: - return None - - try: - video = (bs.find_all ('ul', class_ = 'videoListInner')[1] - .find ('li', class_ = 'item')) - - video_info['contentId'] = video['data-video-id'] - except Exception: - return None - - return get_video_info (video_info['contentId']) - - -def get_bs_from_url ( - url: str, - params: dict = { }, -) -> BeautifulSoup | None: - """ - URL から BeautifulSoup インスタンス生成 - - Parameters - ---------- - url: str - 捜査する URL - params: dict - パラメータ - - Return - ------ - BeautifulSoup | None - BeautifulSoup オブゼクト(失敗したら None) - """ - - try: - req = requests.get (url, params = params, timeout = 60) - except Timeout: - return None - - if req.status_code != 200: - return None - - req.encoding = req.apparent_encoding - - return BeautifulSoup (req.text, 'html.parser') - - -def get_kiriban_list ( - base_date: date, -) -> list[tuple[int, VideoInfo, datetime]]: - kiriban_list: list[tuple[int, VideoInfo, datetime]] = [] - - latest_fetched_at = cast (date, (VideoHistory - .where ('fetched_at', '<=', base_date) - .max ('fetched_at'))) - - for kiriban_views_count in KIRIBAN_VIEWS_COUNTS: - targets = { vh.video.code for vh in (VideoHistory - .where ('fetched_at', latest_fetched_at) - .where ('views_count', '>=', kiriban_views_count) - .get ()) } - for code in targets: - if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]: - continue - previous_views_count: int | None = ( - VideoHistory - .where_has ('video', lambda q: q.where ('code', code)) - .where ('fetched_at', '<', latest_fetched_at) - .max ('views_count')) - if previous_views_count is None: - previous_views_count = 0 - if previous_views_count >= kiriban_views_count: - continue - video_info = get_video_info (code) - if video_info is not None: - kiriban_list.append ((kiriban_views_count, video_info, - cast (Video, Video.where ('code', code).first ()).uploaded_at)) - - return kiriban_list - - -def get_comments ( - video_code: str, -) -> list[Comment]: - video = Video.where ('code', video_code).first () - if video is None: - return [] - return video.comments - - -class DbConfig (TypedDict): - driver: str - host: str - database: str - user: str - password: str - prefix: str diff --git a/test.py b/test.py index bb678d5..60f1798 100644 --- a/test.py +++ b/test.py @@ -35,7 +35,8 @@ async def main ( """ await asyncio.gather (like_posts (), - reply ()) + reply (), + check_mentions ()) async def like_posts (