from datetime import datetime, timedelta import time import sys from atproto import Client, models from ai.talk import Talk import account def check_notifications ( client: Client, ) -> list: (uris, last_seen_at) = ([], client.get_current_time_iso ()) for notification in (client.app.bsky.notification.list_notifications () .notifications): if not notification.is_read: if notification.reason in ['mention', 'reply']: uris += [notification.uri] elif notification.reason == 'follow': client.follow (notification.author.did) client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at }) return uris def get_thread_contents ( client: Client, uri: str, parent_height: int, ) -> list: response = (client.get_post_thread (uri = uri, parent_height = parent_height) .thread) records = [] while response is not None: records += [{ 'strong_ref': models.create_strong_ref (response.post), 'did': response.post.author.did, 'handle': response.post.author.handle, 'name': response.post.author.display_name, 'datetime': response.post.record.created_at, 'text': response.post.record.text, 'embed': response.post.record.embed }] response = response.parent return records def main () -> None: client = Client (base_url = 'https://bsky.social') client.login (account.USER_ID, account.PASSWORD) last_posted_at = datetime.now () - timedelta (hours = 6) has_got_snack_time = False while True: now = datetime.now () for uri in check_notifications (client): records = get_thread_contents (client, uri, 20) if len (records) > 0: answer = Talk.main ((records[0]['text'] if (records[0]['embed'] is None or not hasattr (records[0]['embed'], 'images')) else [ { 'type': 'text', 'text': records[0]['text'] }, { 'type': 'image_url', 'image_url': { 'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]), records[0]['name'], [*map (lambda record: { 'role': ('assistant' if (record['handle'] == account.USER_ID) else 'user'), 'content': record['text']}, reversed (records[1:]))]) client.send_post (answer, reply_to = models.AppBskyFeedPost.ReplyRef ( parent = records[0]['strong_ref'], root = records[-1]['strong_ref'])) if now.hour == 14 and has_got_snack_time: has_got_snack_time = False if now.hour == 15 and not has_got_snack_time: client.send_post (Talk.main ('おやつタイムだ!!!!')) last_posted_at = now has_got_snack_time = True if now - last_posted_at >= timedelta (hours = 6): client.send_post (Talk.main ('今どうしてる?')) last_posted_at = now time.sleep (60) if __name__ == '__main__': main (*sys.argv[1:])