from __future__ import annotations import asyncio import os import time from datetime import datetime from io import BytesIO from typing import TypedDict import atproto import requests from atproto import Client from atproto.models import AppBskyEmbedExternal, AppBskyEmbedImages from atproto.models.AppBskyFeedPost import ReplyRef from atproto.models.app.bsky.embed import images from atproto.models.com.atproto.repo import strong_ref from atproto_client.models.app.bsky.feed.get_timeline import Response from requests.exceptions import Timeout import account import nicolib from nizika_ai.consts import Character, GPTModel, Platform, QueryType from nizika_ai.models import Answer, AnsweredFlag, Query, User TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ', 'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう', 'ぼざクリ', 'オオミソカ', '伊地知', '喜多ちゃん', '喜タイ', '洗澡鹿', 'シーザオ', '今日は本当に', 'ダイソーで', '変なチンチン', 'daisoで', 'だね~(笑)', 'おやつタイム', 'わさしが', 'わさび県', 'たぬマ', 'にくまる', 'ルイズマリー', '餅', 'ニジゴ', 'ゴニジ', 'ニジニジ', '新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ'] time.sleep (60) client = Client (base_url = 'https://bsky.social') client.login (account.USER_ID, account.PASSWORD) async def main ( ) -> None: """ メーン処理 """ await asyncio.gather (like_posts (), check_mentions (), answer ()) async def like_posts ( ) -> None: while True: try: for post in fetch_target_posts (): client.like (**post) except Exception as e: print (f"[like_posts] { type (e).__name__ }: { e }") await asyncio.sleep (60) async def check_mentions ( ) -> None: while True: try: for uri in check_notifications (): records = fetch_thread_contents (uri, 20) if records: record = records[0] content = record['text'] image_url: str | None = None if record['embed'] and hasattr (record['embed'], 'images'): image_url = ('https://cdn.bsky.app/img/feed_fullsize/plain' f"/{ record['did'] }" f"/{ record['embed'].images[0].image.ref.link }") user = _fetch_user (record['did'], record['name']) _add_query (user, record['text'], image_url, { 'uri': record['strong_ref']['uri'], 'cid': record['strong_ref']['cid'] }) except Exception as e: print (f"[check_mentions] { type (e).__name__ }: { e }") await asyncio.sleep (60) async def answer ( ) -> None: answered_flags = ( AnsweredFlag .where ('platform', Platform.BLUESKY.value) .where ('answered', False) .get ()) for answered_flag in answered_flags: answer = answered_flag.answer match QueryType (answer.query.query_type): case QueryType.BLUESKY_COMMENT: td = answer.query.transfer_data or { } uri: str | None = td.get ('uri') cid: str | None = td.get ('cid') if (not uri) or (not cid): continue sref = { 'uri': uri, 'cid': cid } strong_ref = atproto.models.create_strong_ref (sref) reply_ref = ReplyRef (root = strong_ref, parent = strong_ref) client.post (answer.content, reply_to = reply_ref) flag.answered = True flag.save () case QueryType.KIRIBAN | QueryType.NICO_REPORT: td = answer.query.transfer_data or { } video_code: str | None = td.get ('video_code') if not video_code: continue uri = f"https://www.nicovideo.jp/watch/{ video_code }" (title, description, thumbnail) = nicolib.fetch_embed_info (uri) try: upload = client.com.atproto.repo.upload_blob ( BytesIO (requests.get (thumbnail, timeout = 60).content)) thumb = upload.blob except Timeout: thumb = None external = AppBskyEmbedExternal.External ( title = title, description = description, thumb = thumb, uri = uri) embed_external = AppBskyEmbedExternal.Main (external = external) client.post (answer.content, embed = embed_external) flag.answered = True flag.save () case QueryType.SNACK_TIME: try: with open ('./assets/snack-time.jpg', 'rb') as f: image = AppBskyEmbedImages.Image ( alt = ( '左に喜多ちゃん、右に人面鹿のニジカが' 'V字に並んでいる。' '喜多ちゃんは右手でピースサインをして' '片目をウインクしている。' 'ニジカは両手を広げ、' '右手にスプーンを持って' 'ポーズを取っている。' '背景には' '赤と黄色の放射線状の模様が広がり、' '下部に「おやつタイムだ!!!!」という' '日本語のテキストが表示されている。'), image = client.com.atproto.repo.upload_blob (f).blob) client.post (answer.content, embed = AppBskyEmbedImages.Main (images = [image])) flag.answered = True flag.save () except Exception: pass case QueryType.HOT_SPRING: try: with open ('./assets/hot-spring.jpg', 'rb') as f: image = AppBskyEmbedImages.Image ( alt = ('左に喜多ちゃん、右にわさび県産滋賀県が' 'V字に並んでいる。' '喜多ちゃんは右手でピースサインをして' '片目をウインクしている。' 'わさび県産滋賀県はただ茫然と' '立ち尽くしている。' '背景には' '血と空の色をした放射線状の模様が広がり、' '下部に「温泉に入ろう!!!」という' '日本語のテキストが表示されている。'), image = client.com.atproto.repo.upload_blob (f).blob) client.post (answer.content, embed = AppBskyEmbedImages.Main (images = [image])) flag.answered = True flag.save () except Exception: pass def check_notifications ( ) -> list[str]: uris: list[str] = [] last_seen_at = client.get_current_time_iso () notifications = client.app.bsky.notification.list_notifications () for notification in notifications.notifications: if not notification.is_read: match notification.reason: case 'mention' | 'reply' | 'quote': uris.append (notification.uri) case 'follow': client.follow (notification.author.did) client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at }) return uris def fetch_thread_contents ( uri: str, parent_height: int, ) -> list[Record]: post_thread = client.get_post_thread (uri = uri, parent_height = parent_height) if not post_thread: return [] res = post_thread.thread records: list[Record] = [] while res: records.append ({ 'strong_ref': { 'uri': res.post.uri, 'cid': res.post.cid }, 'did': res.post.author.did, 'handle': res.post.author.handle, 'name': res.post.author.display_name, 'datetime': res.post.record.created_at, 'text': res.post.record.text, 'embed': res.post.record.embed }) res = res.parent return records def fetch_target_posts ( ) -> list[LikeParams]: posts: list[LikeParams] = [] timeline: Response = client.get_timeline () for feed in timeline.feed: me = getattr (client, 'me', None) my_did = me.did if me else '' if (feed.post.author.did != my_did and (feed.post.viewer.like is None) and any (target_word in (feed.post.record.text or '').casefold () for target_word in TARGET_WORDS)): posts.append (LikeParams ({ 'uri': feed.post.uri, 'cid': feed.post.cid })) return posts def _add_query ( user: User, content: str, image_url: str | None = None, transfer_data: dict | None = None, ) -> None: query = Query () query.user_id = user.id query.target_character = Character.DEERJIKA.value query.content = content query.image_url = image_url or None query.query_type = QueryType.BLUESKY_COMMENT.value query.model = GPTModel.GPT3_TURBO.value query.sent_at = datetime.now () query.answered = False query.transfer_data = transfer_data query.save () # TODO: 履歴情報の追加 def _fetch_user ( did: str, name: str, ) -> User: user = User.where ('platform', Platform.BLUESKY.value).where ('code', did).first () if user is None: user = User () user.platform = Platform.BLUESKY.value user.code = did user.name = name user.save () return user class LikeParams (TypedDict): uri: str cid: str class Record (TypedDict): strong_ref: StrongRef did: str handle: str name: str datetime: str text: str embed: object class StrongRef (TypedDict): uri: str cid: str if __name__ == '__main__': asyncio.run (main ())