from datetime import datetime, timedelta import io import json import time import sys from atproto import Client, models from bs4 import BeautifulSoup from requests.exceptions import Timeout import requests from ai.talk import Talk import account def check_notifications ( client: Client, ) -> list: (uris, last_seen_at) = ([], client.get_current_time_iso ()) for notification in (client.app.bsky.notification.list_notifications () .notifications): if not notification.is_read: if notification.reason in ['mention', 'reply']: uris += [notification.uri] elif notification.reason == 'follow': client.follow (notification.author.did) client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at }) return uris def get_thread_contents ( client: Client, uri: str, parent_height: int, ) -> list: response = (client.get_post_thread (uri = uri, parent_height = parent_height) .thread) records = [] while response is not None: records += [{ 'strong_ref': models.create_strong_ref (response.post), 'did': response.post.author.did, 'handle': response.post.author.handle, 'name': response.post.author.display_name, 'datetime': response.post.record.created_at, 'text': response.post.record.text, 'embed': response.post.record.embed }] response = response.parent return records def main ( ) -> None: client = Client (base_url = 'https://bsky.social') client.login (account.USER_ID, account.PASSWORD) last_posted_at = datetime.now () - timedelta (hours = 6) has_got_snack_time = False watched_videos = [] while True: now = datetime.now () for uri in check_notifications (client): records = get_thread_contents (client, uri, 20) if len (records) > 0: answer = Talk.main ((records[0]['text'] if (records[0]['embed'] is None or not hasattr (records[0]['embed'], 'images')) else [ { 'type': 'text', 'text': records[0]['text'] }, { 'type': 'image_url', 'image_url': { 'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]), records[0]['name'], [*map (lambda record: { 'role': ('assistant' if (record['handle'] == account.USER_ID) else 'user'), 'content': record['text']}, reversed (records[1:]))]) client.post (answer, reply_to = models.AppBskyFeedPost.ReplyRef ( parent = records[0]['strong_ref'], root = records[-1]['strong_ref'])) for datum in [e for e in get_nico_deerjika () if e['contentId'] not in watched_videos]: watched_videos += [datum['contentId']] uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }" (title, description, thumbnail) = get_embed_info (uri) try: upload = client.com.atproto.repo.upload_blob ( io.BytesIO (requests.get (thumbnail, timeout = 60).content)) thumb = upload.blob except Timeout: thumb = None embed_external = models.AppBskyEmbedExternal.Main ( external = models.AppBskyEmbedExternal.External ( title = title, description = description, thumb = upload.blob, uri = uri)) client.post (Talk.main (f""" ニコニコに『{ datum['title'] }』という動画がアップされました。 つけられたタグは「{ '」、「'.join (datum['tags']) }」です。 概要には次のように書かれています: ```html { datum['description'] } ``` このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """), embed = embed_external) if now.hour == 14 and has_got_snack_time: has_got_snack_time = False if now.hour == 15 and not has_got_snack_time: try: with open ('./assets/snack-time.jpg', 'rb') as f: image = models.AppBskyEmbedImages.Image ( alt = ('左に喜多ちゃん、右に人面鹿のニジカが' 'V字に並んでいる。' '喜多ちゃんは右手でピースサインをして' '片目をウインクしている。' 'ニジカは両手を広げ、' '右手にスプーンを持って' 'ポーズを取っている。' '背景には' '赤と黄色の放射線状の模様が広がり、' '下部に「おやつタイムだ!!!!」という' '日本語のテキストが表示されている。'), image = client.com.atproto.repo.upload_blob (f).blob) client.post (Talk.main ('おやつタイムだ!!!!'), embed = models.app.bsky.embed.images.Main ( images = [image])) last_posted_at = now except Exception: pass has_got_snack_time = True if now - last_posted_at >= timedelta (hours = 6): client.post (Talk.main ('今どうしてる?')) last_posted_at = now time.sleep (60) def get_nico_deerjika (): URL = ('https://snapshot.search.nicovideo.jp/api/v2/snapshot/video' '/contents/search') now = datetime.now () base = now - timedelta (hours = 4) params = { 'q': '伊地知ニジカ', 'targets': 'tags', '_sort': '-startTime', 'fields': 'contentId,title,description,tags,startTime', '_limit': 20, 'jsonFilter': json.dumps ({ 'type': 'or', 'filters': [{ 'type': 'range', 'field': 'startTime', 'from': ('%04d-%02d-%02dT%02d:%02d:00+09:00' % (base.year, base.month, base.day, base.hour, base.minute)), 'to': ('%04d-%02d-%02dT23:59:59+09:00' % (now.year, now.month, now.day)), 'include_lower': True }] }) } try: res = requests.get (URL, params = params, timeout = 60).json () except Timeout: return [] data = [] for datum in res['data']: datum['tags'] = datum['tags'].split () data.append (datum) return data def get_embed_info ( url: str ) -> (str, str, str): title: str = '' description: str = '' thumbnail: str = '' try: res = requests.get (url, timeout = 60) except Timeout: return ('', '', '') if res.status_code != 200: return ('', '', '') soup = BeautifulSoup (res.text, 'html.parser') tmp = soup.find ('title') if tmp is not None: title = tmp.text tmp = soup.find ('meta', attrs = { 'name': 'description' }) if tmp is not None: description = tmp.get ('content') tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' }) if tmp is not None: thumbnail = tmp.get ('content') return (title, description, thumbnail) if __name__ == '__main__': main (*sys.argv[1:])