""" Bluesky のニジカがいろいろする. (近々機能ごとにファイル分けて systemd でイベント管理する予定) """ import io import random import sys import time from datetime import date, datetime from datetime import time as dt_time from datetime import timedelta from typing import cast import requests from atproto import Client, models from bs4 import BeautifulSoup from requests.exceptions import Timeout import account import nico from ai.talk import Talk def check_notifications ( client: Client, ) -> list: (uris, last_seen_at) = ([], client.get_current_time_iso ()) for notification in (client.app.bsky.notification.list_notifications () .notifications): if not notification.is_read: if notification.reason in ['mention', 'reply', 'quote']: uris += [notification.uri] elif notification.reason == 'follow': client.follow (notification.author.did) client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at }) return uris def get_thread_contents ( client: Client, uri: str, parent_height: int, ) -> list: response = (client.get_post_thread (uri = uri, parent_height = parent_height) .thread) records = [] while response is not None: records += [{ 'strong_ref': models.create_strong_ref (response.post), 'did': response.post.author.did, 'handle': response.post.author.handle, 'name': response.post.author.display_name, 'datetime': response.post.record.created_at, 'text': response.post.record.text, 'embed': response.post.record.embed }] response = response.parent return records def main ( ) -> None: time.sleep (60) client = Client (base_url = 'https://bsky.social') client.login (account.USER_ID, account.PASSWORD) got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15) kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = ( nico.get_kiriban_list (got_kiriban_at)) kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ()) / len (kiriban_list)) next_kiriban_at = datetime.now () last_posted_at = datetime.now () - timedelta (hours = 6) has_got_snack_time = False has_taken_hot_spring = False watched_videos = [] while True: now = datetime.now () for uri in check_notifications (client): records = get_thread_contents (client, uri, 20) if len (records) > 0: answer = Talk.main ((records[0]['text'] if (records[0]['embed'] is None or not hasattr (records[0]['embed'], 'images')) else [ { 'type': 'text', 'text': records[0]['text'] }, { 'type': 'image_url', 'image_url': { 'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]), records[0]['name'], [*map (lambda record: { 'role': ('assistant' if (record['handle'] == account.USER_ID) else 'user'), 'content': record['text']}, reversed (records[1:]))]) client.post (answer, reply_to = models.AppBskyFeedPost.ReplyRef ( parent = records[0]['strong_ref'], root = records[-1]['strong_ref'])) if kiriban_list and datetime.now () >= next_kiriban_at: (views_count, video_info, uploaded_at) = ( kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) since_posted = datetime.now () - uploaded_at uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }" (title, description, thumbnail) = get_embed_info (uri) try: upload = client.com.atproto.repo.upload_blob ( io.BytesIO (requests.get (thumbnail, timeout = 60).content)) thumb = upload.blob except Timeout: thumb = None comments = nico.get_comments (video_info['contentId']) popular_comments = sorted (comments, key = lambda c: c.nico_count, reverse = True)[:10] latest_comments = sorted (comments, key = lambda c: c.posted_at, reverse = True)[:10] embed_external = models.AppBskyEmbedExternal.Main ( external = models.AppBskyEmbedExternal.External ( title = title, description = description, thumb = thumb, uri = uri)) prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n" if video_info['tags']: prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n" if comments: prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n" prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n" prompt += f""" 概要には次のように書かれています: ```html { video_info['description'] } ``` このことについて、ニジカちゃんからのお祝いメッセージを下さい。 ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクション(「すっご~い!」や「しょぼ~い」など)を添えてください。""" client.post (Talk.main (prompt), embed = embed_external) next_kiriban_at += kiriban_interval last_posted_at = now latest_deerjika = nico.get_latest_deerjika () if latest_deerjika is not None: for datum in [e for e in [latest_deerjika] if e['contentId'] not in watched_videos]: watched_videos += [datum['contentId']] uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }" (title, description, thumbnail) = get_embed_info (uri) try: upload = client.com.atproto.repo.upload_blob ( io.BytesIO (requests.get (thumbnail, timeout = 60).content)) thumb = upload.blob except Timeout: thumb = None embed_external = models.AppBskyEmbedExternal.Main ( external = models.AppBskyEmbedExternal.External ( title = title, description = description, thumb = thumb, uri = uri)) client.post (Talk.main (f""" ニコニコに『{ datum['title'] }』という動画がアップされました。 つけられたタグは「{ '」、「'.join (datum['tags']) }」です。 概要には次のように書かれています: ```html { datum['description'] } ``` このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """), embed = embed_external) last_posted_at = now if now.hour == 14 and has_got_snack_time: has_got_snack_time = False if now.hour == 15: if got_kiriban_at < datetime.now ().date (): kiriban_list = nico.get_kiriban_list (datetime.now ().date ()) got_kiriban_at = datetime.now ().date () kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ()) / len (kiriban_list)) next_kiriban_at = datetime.now () if not has_got_snack_time: try: with open ('./assets/snack-time.jpg', 'rb') as f: image = models.AppBskyEmbedImages.Image ( alt = ('左に喜多ちゃん、右に人面鹿のニジカが' 'V字に並んでいる。' '喜多ちゃんは右手でピースサインをして' '片目をウインクしている。' 'ニジカは両手を広げ、' '右手にスプーンを持って' 'ポーズを取っている。' '背景には' '赤と黄色の放射線状の模様が広がり、' '下部に「おやつタイムだ!!!!」という' '日本語のテキストが表示されている。'), image = client.com.atproto.repo.upload_blob (f).blob) client.post (Talk.main ('おやつタイムだ!!!!'), embed = models.app.bsky.embed.images.Main ( images = [image])) last_posted_at = now except Exception: pass has_got_snack_time = True if now.hour == 20 and has_taken_hot_spring: has_taken_hot_spring = False if now.hour == 21 and not has_taken_hot_spring: try: with open ('./assets/hot-spring.jpg', 'rb') as f: image = models.AppBskyEmbedImages.Image ( alt = ('左に喜多ちゃん、右にわさび県産滋賀県が' 'V字に並んでいる。' '喜多ちゃんは右手でピースサインをして' '片目をウインクしている。' 'わさび県産滋賀県はただ茫然と' '立ち尽くしている。' '背景には' '血と空の色をした放射線状の模様が広がり、' '下部に「温泉に入ろう!!!」という' '日本語のテキストが表示されている。'), image = client.com.atproto.repo.upload_blob (f).blob) client.post (Talk.main ('温泉に入ろう!!!'), embed = models.app.bsky.embed.images.Main ( images = [image])) last_posted_at = now except Exception: pass has_taken_hot_spring = True if now - last_posted_at >= timedelta (hours = 6): client.post (Talk.main ('今どうしてる?')) last_posted_at = now time.sleep (60) def get_embed_info ( url: str ) -> tuple[str, str, str]: title: str = '' description: str = '' thumbnail: str = '' try: res = requests.get (url, timeout = 60) except Timeout: return ('', '', '') if res.status_code != 200: return ('', '', '') soup = BeautifulSoup (res.text, 'html.parser') tmp = soup.find ('title') if tmp is not None: title = tmp.text tmp = soup.find ('meta', attrs = { 'name': 'description' }) if tmp is not None and hasattr (tmp, 'get'): try: description = cast (str, tmp.get ('content')) except Exception: pass tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' }) if tmp is not None and hasattr (tmp, 'get'): try: thumbnail = cast (str, tmp.get ('content')) except Exception: pass return (title, description, thumbnail) def get_kiriban_dt_to_update ( ) -> datetime: now = datetime.now () today = now.date () dt = datetime.combine (today, dt_time (15, 0)) if dt <= now: dt += timedelta (days = 1) return dt if __name__ == '__main__': main (*sys.argv[1:])