""" Bluesky のニジカがいろいろする. (近々機能ごとにファイル分けて systemd でイベント管理する予定) """ from __future__ import annotations import io import random import sys import time from datetime import date, datetime from datetime import time as dt_time from datetime import timedelta from typing import TypedDict, cast import requests from atproto import Client, models from atproto_client.models.app.bsky.feed.get_timeline import Response from bs4 import BeautifulSoup from requests.exceptions import Timeout import account import nico from ai.talk import Talk TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ', 'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう', 'ぼざクリ', 'オオミソカ', '伊地知', '喜多ちゃん', '喜タイ', '洗澡鹿', 'シーザオ', '今日は本当に', 'ダイソーで', '変なチンチン', 'daisoで', 'だね~(笑)', 'おやつタイム', 'わさしが', 'わさび県', 'たぬマ', 'にくまる', 'ルイズマリー', '餅', 'ニジゴ', 'ゴニジ', 'ニジニジ', '新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ'] def main ( ) -> None: time.sleep (60) client = Client (base_url = 'https://bsky.social') client.login (account.USER_ID, account.PASSWORD) got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15) kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = ( nico.get_kiriban_list (got_kiriban_at)) kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ()) / len (kiriban_list)) next_kiriban_at = datetime.now () last_posted_at = datetime.now () - timedelta (hours = 6) has_got_snack_time = False has_taken_hot_spring = False watched_videos = [] while True: now = datetime.now () for uri in check_notifications (client): records = get_thread_contents (client, uri, 20) if len (records) > 0: answer = Talk.main ((records[0]['text'] if (records[0]['embed'] is None or not hasattr (records[0]['embed'], 'images')) else [ { 'type': 'text', 'text': records[0]['text'] }, { 'type': 'image_url', 'image_url': { 'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]), records[0]['name'], [*map (lambda record: { 'role': ('assistant' if (record['handle'] == account.USER_ID) else 'user'), 'content': record['text']}, reversed (records[1:]))]) client.post (answer, reply_to = models.AppBskyFeedPost.ReplyRef ( parent = records[0]['strong_ref'], root = records[-1]['strong_ref'])) like_posts (client) if kiriban_list and datetime.now () >= next_kiriban_at: (views_count, video_info, uploaded_at) = ( kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) since_posted = datetime.now () - uploaded_at uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }" (title, description, thumbnail) = get_embed_info (uri) try: upload = client.com.atproto.repo.upload_blob ( io.BytesIO (requests.get (thumbnail, timeout = 60).content)) thumb = upload.blob except Timeout: thumb = None comments = nico.get_comments (video_info['contentId']) popular_comments = sorted (comments, key = lambda c: c.nico_count, reverse = True)[:10] latest_comments = sorted (comments, key = lambda c: c.posted_at, reverse = True)[:10] embed_external = models.AppBskyEmbedExternal.Main ( external = models.AppBskyEmbedExternal.External ( title = title, description = description, thumb = thumb, uri = uri)) prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" prompt += f"コメント数は{ len (comments) }件です。\n" if video_info['tags']: prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" if comments: prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n" prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n" prompt += f""" 概要には次のように書かれています: ```html { video_info['description'] } ``` このことについて、ニジカちゃんからのお祝いメッセージを下さい。 ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。 ちなみに1000再生以下はしょぼいです。 また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。 好きなコメントがあったら教えてね。""" client.post (Talk.main (prompt), embed = embed_external) next_kiriban_at += kiriban_interval last_posted_at = now latest_deerjika = nico.get_latest_deerjika () if latest_deerjika is not None: for datum in [e for e in [latest_deerjika] if e['contentId'] not in watched_videos]: watched_videos += [datum['contentId']] uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }" (title, description, thumbnail) = get_embed_info (uri) try: upload = client.com.atproto.repo.upload_blob ( io.BytesIO (requests.get (thumbnail, timeout = 60).content)) thumb = upload.blob except Timeout: thumb = None embed_external = models.AppBskyEmbedExternal.Main ( external = models.AppBskyEmbedExternal.External ( title = title, description = description, thumb = thumb, uri = uri)) client.post (Talk.main (f""" ニコニコに『{ datum['title'] }』という動画がアップされました。 つけられたタグは「{ '」、「'.join (datum['tags']) }」です。 概要には次のように書かれています: ```html { datum['description'] } ``` このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """), embed = embed_external) last_posted_at = now if now.hour == 14 and has_got_snack_time: has_got_snack_time = False if now.hour == 15: if got_kiriban_at < datetime.now ().date (): kiriban_list = nico.get_kiriban_list (datetime.now ().date ()) got_kiriban_at = datetime.now ().date () kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ()) / len (kiriban_list)) next_kiriban_at = datetime.now () if not has_got_snack_time: try: with open ('./assets/snack-time.jpg', 'rb') as f: image = models.AppBskyEmbedImages.Image ( alt = ('左に喜多ちゃん、右に人面鹿のニジカが' 'V字に並んでいる。' '喜多ちゃんは右手でピースサインをして' '片目をウインクしている。' 'ニジカは両手を広げ、' '右手にスプーンを持って' 'ポーズを取っている。' '背景には' '赤と黄色の放射線状の模様が広がり、' '下部に「おやつタイムだ!!!!」という' '日本語のテキストが表示されている。'), image = client.com.atproto.repo.upload_blob (f).blob) client.post (Talk.main ('おやつタイムだ!!!!'), embed = models.app.bsky.embed.images.Main ( images = [image])) last_posted_at = now except Exception: pass has_got_snack_time = True if now.hour == 20 and has_taken_hot_spring: has_taken_hot_spring = False if now.hour == 21 and not has_taken_hot_spring: try: with open ('./assets/hot-spring.jpg', 'rb') as f: image = models.AppBskyEmbedImages.Image ( alt = ('左に喜多ちゃん、右にわさび県産滋賀県が' 'V字に並んでいる。' '喜多ちゃんは右手でピースサインをして' '片目をウインクしている。' 'わさび県産滋賀県はただ茫然と' '立ち尽くしている。' '背景には' '血と空の色をした放射線状の模様が広がり、' '下部に「温泉に入ろう!!!」という' '日本語のテキストが表示されている。'), image = client.com.atproto.repo.upload_blob (f).blob) client.post (Talk.main ('温泉に入ろう!!!'), embed = models.app.bsky.embed.images.Main ( images = [image])) last_posted_at = now except Exception: pass has_taken_hot_spring = True if now - last_posted_at >= timedelta (hours = 6): client.post (Talk.main ('今どうしてる?')) last_posted_at = now time.sleep (60) def check_notifications ( client: Client, ) -> list: (uris, last_seen_at) = ([], client.get_current_time_iso ()) for notification in (client.app.bsky.notification.list_notifications () .notifications): if not notification.is_read: if notification.reason in ['mention', 'reply', 'quote']: uris += [notification.uri] elif notification.reason == 'follow': client.follow (notification.author.did) client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at }) return uris def get_thread_contents ( client: Client, uri: str, parent_height: int, ) -> list: response = (client.get_post_thread (uri = uri, parent_height = parent_height) .thread) records = [] while response is not None: records += [{ 'strong_ref': models.create_strong_ref (response.post), 'did': response.post.author.did, 'handle': response.post.author.handle, 'name': response.post.author.display_name, 'datetime': response.post.record.created_at, 'text': response.post.record.text, 'embed': response.post.record.embed }] response = response.parent return records def get_embed_info ( url: str ) -> tuple[str, str, str]: title: str = '' description: str = '' thumbnail: str = '' try: res = requests.get (url, timeout = 60) except Timeout: return ('', '', '') if res.status_code != 200: return ('', '', '') soup = BeautifulSoup (res.text, 'html.parser') tmp = soup.find ('title') if tmp is not None: title = tmp.text tmp = soup.find ('meta', attrs = { 'name': 'description' }) if tmp is not None and hasattr (tmp, 'get'): try: description = cast (str, tmp.get ('content')) except Exception: pass tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' }) if tmp is not None and hasattr (tmp, 'get'): try: thumbnail = cast (str, tmp.get ('content')) except Exception: pass return (title, description, thumbnail) def get_kiriban_dt_to_update ( ) -> datetime: now = datetime.now () today = now.date () dt = datetime.combine (today, dt_time (15, 0)) if dt <= now: dt += timedelta (days = 1) return dt def like_posts ( client: Client, ) -> None: for post in get_target_posts (client): client.like (**post) def get_target_posts ( client: Client, ) -> list[LikeParams]: posts = [] timeline: Response = client.get_timeline () for feed in timeline.feed: if (feed.post.author.did != client.me.did and (feed.post.viewer.like is None) and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)): posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid })) return posts class LikeParams (TypedDict): uri: str cid: str if __name__ == '__main__': main (*sys.argv[1:])