from datetime import datetime, timedelta import json import time import sys from atproto import Client, models import requests from ai.talk import Talk import account def check_notifications ( client: Client, ) -> list: (uris, last_seen_at) = ([], client.get_current_time_iso ()) for notification in (client.app.bsky.notification.list_notifications () .notifications): if not notification.is_read: if notification.reason in ['mention', 'reply']: uris += [notification.uri] elif notification.reason == 'follow': client.follow (notification.author.did) client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at }) return uris def get_thread_contents ( client: Client, uri: str, parent_height: int, ) -> list: response = (client.get_post_thread (uri = uri, parent_height = parent_height) .thread) records = [] while response is not None: records += [{ 'strong_ref': models.create_strong_ref (response.post), 'did': response.post.author.did, 'handle': response.post.author.handle, 'name': response.post.author.display_name, 'datetime': response.post.record.created_at, 'text': response.post.record.text, 'embed': response.post.record.embed }] response = response.parent return records def main ( ) -> None: client = Client (base_url = 'https://bsky.social') client.login (account.USER_ID, account.PASSWORD) last_posted_at = datetime.now () - timedelta (hours = 6) has_got_snack_time = False watched_videos = [] while True: now = datetime.now () for uri in check_notifications (client): records = get_thread_contents (client, uri, 20) if len (records) > 0: answer = Talk.main ((records[0]['text'] if (records[0]['embed'] is None or not hasattr (records[0]['embed'], 'images')) else [ { 'type': 'text', 'text': records[0]['text'] }, { 'type': 'image_url', 'image_url': { 'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]), records[0]['name'], [*map (lambda record: { 'role': ('assistant' if (record['handle'] == account.USER_ID) else 'user'), 'content': record['text']}, reversed (records[1:]))]) client.send_post (answer, reply_to = models.AppBskyFeedPost.ReplyRef ( parent = records[0]['strong_ref'], root = records[-1]['strong_ref'])) for datum in [e for e in get_nico_deerjika () if e['contentId'] not in watched_videos]: watched_videos += [datum['contentId']] client.send_post (Talk.main (f""" ニコニコに『{ datum['title'] }』という動画がアップされました。 つけられたタグは「{ '」、「'.join (datum['tags']) }」です。 概要には次のように書かれています: ```html { datum['description'] } ``` このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """)) if now.hour == 14 and has_got_snack_time: has_got_snack_time = False if now.hour == 15 and not has_got_snack_time: try: with open ('./assets/snack-time.jpg', 'rb') as f: image = models.AppBskyEmbedImages.Image ( alt = ('左に喜多ちゃん、右に人面鹿のニジカが' 'V字に並んでいる。' '喜多ちゃんは右手でピースサインをして' '片目をウインクしている。' 'ニジカは両手を広げ、' '右手にスプーンを持って' 'ポーズを取っている。' '背景には' '赤と黄色の放射線状の模様が広がり、' '下部に「おやつタイムだ!!!!」という' '日本語のテキストが表示されている。'), image = client.com.atproto.repo.upload_blob (f).blob) client.send_post (Talk.main ('おやつタイムだ!!!!'), embed = models.app.bsky.embed.images.Main ( images = [image])) last_posted_at = now except Exception: pass has_got_snack_time = True if now - last_posted_at >= timedelta (hours = 6): client.send_post (Talk.main ('今どうしてる?')) last_posted_at = now time.sleep (60) def get_nico_deerjika (): URL = ('https://snapshot.search.nicovideo.jp/api/v2/snapshot/video' '/contents/search') now = datetime.now () base = now - timedelta (hours = 1) params = { 'q': '伊地知ニジカ', 'targets': 'tags', '_sort': '-startTime', 'fields': 'contentId,title,description,tags,startTime', '_limit': 20, 'jsonFilter': json.dumps ({ 'type': 'or', 'filters': [{ 'type': 'range', 'field': 'startTime', 'from': ('%04d-%02d-%02dT%02d:%02d:00+09:00' % (base.year, base.month, base.day, base.hour, base.minute)), 'to': ('%04d-%02d-%02dT23:59:59+09:00' % (now.year, now.month, now.day)), 'include_lower': True }] }) } res = requests.get (URL, params = params).json () data = [] for datum in res['data']: datum['tags'] = datum['tags'].split () data.append (datum) return data if __name__ == '__main__': main (*sys.argv[1:])