"""Bluesky bot.

Replies to mentions and replies through ``Talk``, follows back new
followers, posts a daily "snack time" image, and posts a periodic status
message when nothing has been posted for six hours.
"""

from datetime import datetime, timedelta, timezone
import json
import logging
import sys
import time

from atproto import Client, models
import requests

from ai.talk import Talk
import account

logger = logging.getLogger(__name__)

# The niconico snapshot query hard-codes a "+09:00" offset, so the date it
# is built from must be taken in Japan Standard Time as well.
JST = timezone(timedelta(hours=9))

# Alt text attached to the snack-time image (user-facing, kept verbatim).
SNACK_TIME_ALT = ('左に喜多ちゃん、右に人面鹿のニジカが'
                  'V字に並んでいる。'
                  '喜多ちゃんは右手でピースサインをして'
                  '片目をウインクしている。'
                  'ニジカは両手を広げ、'
                  '右手にスプーンを持って'
                  'ポーズを取っている。'
                  '背景には'
                  '赤と黄色の放射線状の模様が広がり、'
                  '下部に「おやつタイムだ!!!!」という'
                  '日本語のテキストが表示されている。')


def check_notifications(
        client: Client,
) -> list:
    """Process unread notifications and return the URIs that need a reply.

    Unread ``mention`` and ``reply`` notifications contribute their URI to
    the result; unread ``follow`` notifications are answered by following
    the author back.  Afterwards the notifications are marked as seen.

    Args:
        client: Logged-in atproto client.

    Returns:
        List of post URIs (mentions and replies) that were unread.
    """
    # Taken before listing so that notifications arriving while we iterate
    # are not marked as seen without having been looked at.
    last_seen_at = client.get_current_time_iso()

    uris = []
    listing = client.app.bsky.notification.list_notifications()
    for notification in listing.notifications:
        if notification.is_read:
            continue
        if notification.reason in ['mention', 'reply']:
            uris.append(notification.uri)
        elif notification.reason == 'follow':
            client.follow(notification.author.did)

    client.app.bsky.notification.update_seen({'seen_at': last_seen_at})

    return uris


def get_thread_contents(
        client: Client,
        uri: str,
        parent_height: int,
) -> list:
    """Collect the post at *uri* and its ancestors, newest first.

    Args:
        client: Logged-in atproto client.
        uri: URI of the post whose thread should be fetched.
        parent_height: Maximum number of ancestors requested from the API.

    Returns:
        A list of dicts, one per post, starting with the post at *uri* and
        walking towards the thread root.  Each dict has the keys
        ``strong_ref``, ``root_ref``, ``did``, ``handle``, ``name``,
        ``datetime``, ``text`` and ``embed``.  ``root_ref`` is the thread
        root declared by the post itself (or the post's own reference when
        it is not a reply).  The list is empty when the requested post
        itself cannot be read.
    """
    node = client.get_post_thread(uri=uri,
                                  parent_height=parent_height).thread

    records = []
    while node is not None:
        # A thread node is not guaranteed to carry a post (the SDK appears
        # to use placeholder nodes for deleted or blocked posts).  Nothing
        # useful lies beyond such a node, so stop instead of raising
        # AttributeError.
        post = getattr(node, 'post', None)
        if post is None:
            break

        strong_ref = models.create_strong_ref(post)
        reply = getattr(post.record, 'reply', None)
        records.append({
            'strong_ref': strong_ref,
            'root_ref': reply.root if reply is not None else strong_ref,
            'did': post.author.did,
            'handle': post.author.handle,
            'name': post.author.display_name,
            'datetime': post.record.created_at,
            'text': post.record.text,
            'embed': post.record.embed,
        })
        node = node.parent

    return records


def _build_prompt(record: dict):
    """Return the ``Talk`` message for *record*: text, or text plus image."""
    images = getattr(record['embed'], 'images', None)
    if not images:
        # No embed, a non-image embed, or an empty image list.
        return record['text']

    image_url = ('https://cdn.bsky.app/img/feed_fullsize/plain/'
                 f"{record['did']}/{images[0].image.ref.link}")
    return [
        {'type': 'text', 'text': record['text']},
        {'type': 'image_url', 'image_url': {'url': image_url}},
    ]


def _reply_to_thread(client: Client, records: list) -> None:
    """Generate an answer to ``records[0]`` and post it as a reply."""
    target = records[0]

    # Older posts become the chat history, oldest first.
    history = [
        {
            'role': ('assistant' if record['handle'] == account.USER_ID
                     else 'user'),
            'content': record['text'],
        }
        for record in reversed(records[1:])
    ]

    answer = Talk.main(_build_prompt(target), target['name'], history)

    # The root must be the real thread root.  The oldest fetched ancestor
    # is not necessarily the root (thread deeper than the requested height,
    # or an unreadable ancestor), so use the root the target post declares.
    reply_to = models.AppBskyFeedPost.ReplyRef(
        parent=target['strong_ref'],
        root=target.get('root_ref', records[-1]['strong_ref']))
    client.send_post(answer, reply_to=reply_to)


def _post_snack_time(client: Client) -> None:
    """Upload the snack-time image and post it with a generated caption."""
    with open('./assets/snack-time.jpg', 'rb') as f:
        blob = client.com.atproto.repo.upload_blob(f.read()).blob

    image = models.AppBskyEmbedImages.Image(alt=SNACK_TIME_ALT, image=blob)
    client.send_post(Talk.main('おやつタイムだ!!!!'),
                     embed=models.app.bsky.embed.images.Main(images=[image]))


def main() -> None:
    """Log in and run the bot loop forever, polling once a minute."""
    client = Client(base_url='https://bsky.social')
    client.login(account.USER_ID, account.PASSWORD)

    # Backdated so that the first periodic post is due immediately.
    last_posted_at = datetime.now() - timedelta(hours=6)
    has_got_snack_time = False

    while True:
        now = datetime.now()

        for uri in check_notifications(client):
            records = get_thread_contents(client, uri, 20)
            if records:
                _reply_to_thread(client, records)

        # Re-arm the snack-time flag during the hour before snack time.
        if now.hour == 14 and has_got_snack_time:
            has_got_snack_time = False

        if now.hour == 15 and not has_got_snack_time:
            try:
                _post_snack_time(client)
                last_posted_at = now
            except Exception:
                # Best effort: a failed snack-time post must not stop the
                # bot, but the failure should be visible.
                logger.exception('Failed to post the snack-time image')
            # Set even on failure so the post is not retried every minute.
            has_got_snack_time = True

        if now - last_posted_at >= timedelta(hours=6):
            client.send_post(Talk.main('今どうしてる?'))
            last_posted_at = now

        time.sleep(60)


def get_nico_deerjika():
    """Return today's newest niconico video tagged '伊地知ニジカ', if any.

    "Today" is the current calendar day in Japan Standard Time, matching
    the ``+09:00`` offset used in the query.

    Returns:
        The first entry of the response's ``data`` list, or ``None`` when
        the list is empty.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    URL = ('https://snapshot.search.nicovideo.jp/api/v2/snapshot/video'
           '/contents/search')

    today = datetime.now(JST).date().isoformat()

    params = {
        'q': '伊地知ニジカ',
        'targets': 'tagsExact',
        '_sort': '-startTime',
        'fields': 'contentId,title,description,tags,startTime',
        '_limit': 1,
        'jsonFilter': json.dumps({
            'type': 'or',
            'filters': [{
                'type': 'range',
                'field': 'startTime',
                'from': f'{today}T00:00:00+09:00',
                'to': f'{today}T23:59:59+09:00',
                'include_lower': True,
            }],
        }),
    }

    # Without a timeout a stalled connection would block the caller forever.
    res = requests.get(URL, params=params, timeout=30).json()

    data = res['data']
    return data[0] if data else None


if __name__ == '__main__':
    # main() takes no parameters; forwarding argv would raise TypeError as
    # soon as any command-line argument is supplied.
    main()