| @@ -2,22 +2,89 @@ from datetime import datetime, timedelta | |||||
| import time | import time | ||||
| import sys | import sys | ||||
| from atproto import Client | |||||
| from atproto import Client, models | |||||
| from ai.talk import Talk | from ai.talk import Talk | ||||
| import account | import account | ||||
| def check_notifications ( | |||||
| client: Client, | |||||
| ) -> list: | |||||
| (uris, last_seen_at) = ([], client.get_current_time_iso ()) | |||||
| for notification in (client.app.bsky.notification.list_notifications () | |||||
| .notifications): | |||||
| if ((not notification.is_read) | |||||
| and (notification.reason in ['mention', 'reply'])): | |||||
| uris += [notification.uri] | |||||
| client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at }) | |||||
| return uris | |||||
| def get_thread_contents ( | |||||
| client: Client, | |||||
| uri: str, | |||||
| parent_height: int, | |||||
| ) -> list: | |||||
| response = (client.get_post_thread (uri = uri, | |||||
| parent_height = parent_height) | |||||
| .thread) | |||||
| records = [] | |||||
| while response is not None: | |||||
| records += [{ 'strong_ref': models.create_strong_ref (response.post), | |||||
| 'did': response.post.author.did, | |||||
| 'handle': response.post.author.handle, | |||||
| 'name': response.post.author.display_name, | |||||
| 'datetime': response.post.record.created_at, | |||||
| 'text': response.post.record.text }] | |||||
| response = response.parent | |||||
| return records | |||||
| def main () -> None: | def main () -> None: | ||||
| api = Client () | |||||
| client = Client (base_url = 'https://bsky.social') | |||||
| api.login (account.USER_ID, account.PASSWORD) | |||||
| client.login (account.USER_ID, account.PASSWORD) | |||||
| last_posted_at = datetime.now () - timedelta (hours = 6) | last_posted_at = datetime.now () - timedelta (hours = 6) | ||||
| has_got_snack_time = False | |||||
| while True: | while True: | ||||
| if datetime.now () - last_posted_at >= timedelta (hours = 6): | |||||
| api.send_post (Talk.main ('')) | |||||
| last_posted_at = datetime.now () | |||||
| now = datetime.now () | |||||
| for uri in check_notifications (client): | |||||
| records = get_thread_contents (client, uri, 20) | |||||
| if len (records) > 0: | |||||
| answer = Talk.main (records[0]['text'], | |||||
| records[0]['name'], | |||||
| [*map (lambda record: { | |||||
| 'role': ('assistant' | |||||
| if (record['handle'] | |||||
| == account.USER_ID) | |||||
| else 'user'), | |||||
| 'content': | |||||
| record['text']}, | |||||
| reversed (records[1:]))]) | |||||
| client.send_post (answer, | |||||
| reply_to = models.AppBskyFeedPost.ReplyRef ( | |||||
| parent = records[0]['strong_ref'], | |||||
| root = records[-1]['strong_ref'])) | |||||
| if now.hour == 14 and has_got_snack_time: | |||||
| has_got_snack_time = False | |||||
| if now.hour == 15 and not has_got_snack_time: | |||||
| client.send_post (Talk.main ('おやつタイムだ!!!!')) | |||||
| last_posted_at = now | |||||
| has_got_snack_time = True | |||||
| if now - last_posted_at >= timedelta (hours = 6): | |||||
| client.send_post (Talk.main ('')) | |||||
| last_posted_at = now | |||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||