|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- """
- Bluesky のニジカがいろいろする.
- (近々機能ごとにファイル分けて systemd でイベント管理する予定)
- """
-
- from __future__ import annotations
-
- import io
- import random
- import sys
- import time
- from datetime import date, datetime
- from datetime import time as dt_time
- from datetime import timedelta
- from typing import TypedDict, cast
-
- import requests
- from atproto import Client, models
- from atproto_client.models.app.bsky.feed.get_timeline import Response
- from bs4 import BeautifulSoup
- from requests.exceptions import Timeout
-
- import account
- import nico
- from ai.talk import Talk
-
- TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
- 'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
- 'ぼざクリ', 'オオミソカ', '伊地知', '喜多ちゃん',
- '喜タイ', '洗澡鹿', 'シーザオ', '今日は本当に',
- 'ダイソーで', '変なチンチン', 'daisoで', 'だね~(笑)',
- 'おやつタイム', 'わさしが', 'わさび県', 'たぬマ', 'にくまる',
- 'ルイズマリー', '餅', 'ニジゴ', 'ゴニジ', 'ニジニジ',
- '新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']
-
-
- def main (
- ) -> None:
- time.sleep (60)
-
- client = Client (base_url = 'https://bsky.social')
-
- client.login (account.USER_ID, account.PASSWORD)
-
- got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
- kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = (
- nico.get_kiriban_list (got_kiriban_at))
- kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
- / len (kiriban_list))
- next_kiriban_at = datetime.now ()
-
- last_posted_at = datetime.now () - timedelta (hours = 6)
- has_got_snack_time = False
- has_taken_hot_spring = False
- watched_videos = []
- while True:
- now = datetime.now ()
-
- for uri in check_notifications (client):
- records = get_thread_contents (client, uri, 20)
- if len (records) > 0:
- answer = Talk.main ((records[0]['text']
- if (records[0]['embed'] is None
- or not hasattr (records[0]['embed'],
- 'images'))
- else [
- { 'type': 'text', 'text': records[0]['text'] },
- { 'type': 'image_url', 'image_url': {
- 'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]),
- records[0]['name'],
- [*map (lambda record: {
- 'role': ('assistant'
- if (record['handle']
- == account.USER_ID)
- else 'user'),
- 'content':
- record['text']},
- reversed (records[1:]))])
- client.post (answer,
- reply_to = models.AppBskyFeedPost.ReplyRef (
- parent = records[0]['strong_ref'],
- root = records[-1]['strong_ref']))
-
- like_posts (client)
-
- if kiriban_list and datetime.now () >= next_kiriban_at:
- (views_count, video_info, uploaded_at) = (
- kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
- since_posted = datetime.now () - uploaded_at
- uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
- (title, description, thumbnail) = get_embed_info (uri)
- try:
- upload = client.com.atproto.repo.upload_blob (
- io.BytesIO (requests.get (thumbnail,
- timeout = 60).content))
- thumb = upload.blob
- except Timeout:
- thumb = None
- comments = nico.get_comments (video_info['contentId'])
- popular_comments = sorted (comments,
- key = lambda c: c.nico_count,
- reverse = True)[:10]
- latest_comments = sorted (comments,
- key = lambda c: c.posted_at,
- reverse = True)[:10]
- embed_external = models.AppBskyEmbedExternal.Main (
- external = models.AppBskyEmbedExternal.External (
- title = title,
- description = description,
- thumb = thumb,
- uri = uri))
- prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
- prompt += f"コメント数は{ len (comments) }件です。\n"
- if video_info['tags']:
- prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
- if comments:
- prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n"
- prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n"
- prompt += f"""
- 概要には次のように書かれています:
- ```html
- { video_info['description'] }
- ```
- このことについて、ニジカちゃんからのお祝いメッセージを下さい。
- ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
- ちなみに1000再生以上はまじですげぇです.
- また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
- 好きなコメントがあったら教えてね。"""
- client.post (Talk.main (prompt), embed = embed_external)
- next_kiriban_at += kiriban_interval
- last_posted_at = now
-
- latest_deerjika = nico.get_latest_deerjika ()
- if latest_deerjika is not None:
- for datum in [e for e in [latest_deerjika]
- if e['contentId'] not in watched_videos]:
- watched_videos += [datum['contentId']]
-
- uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }"
- (title, description, thumbnail) = get_embed_info (uri)
- try:
- upload = client.com.atproto.repo.upload_blob (
- io.BytesIO (requests.get (thumbnail,
- timeout = 60).content))
- thumb = upload.blob
- except Timeout:
- thumb = None
-
- embed_external = models.AppBskyEmbedExternal.Main (
- external = models.AppBskyEmbedExternal.External (
- title = title,
- description = description,
- thumb = thumb,
- uri = uri))
- client.post (Talk.main (f"""
- ニコニコに『{ datum['title'] }』という動画がアップされました。
- つけられたタグは「{ '」、「'.join (datum['tags']) }」です。
- 概要には次のように書かれています:
- ```html
- { datum['description'] }
- ```
- このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """),
- embed = embed_external)
- last_posted_at = now
-
- if now.hour == 14 and has_got_snack_time:
- has_got_snack_time = False
-
- if now.hour == 15:
- if got_kiriban_at < datetime.now ().date ():
- kiriban_list = nico.get_kiriban_list (datetime.now ().date ())
- got_kiriban_at = datetime.now ().date ()
- kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ())
- / len (kiriban_list))
- next_kiriban_at = datetime.now ()
-
- if not has_got_snack_time:
- try:
- with open ('./assets/snack-time.jpg', 'rb') as f:
- image = models.AppBskyEmbedImages.Image (
- alt = ('左に喜多ちゃん、右に人面鹿のニジカが'
- 'V字に並んでいる。'
- '喜多ちゃんは右手でピースサインをして'
- '片目をウインクしている。'
- 'ニジカは両手を広げ、'
- '右手にスプーンを持って'
- 'ポーズを取っている。'
- '背景には'
- '赤と黄色の放射線状の模様が広がり、'
- '下部に「おやつタイムだ!!!!」という'
- '日本語のテキストが表示されている。'),
- image = client.com.atproto.repo.upload_blob (f).blob)
- client.post (Talk.main ('おやつタイムだ!!!!'),
- embed = models.app.bsky.embed.images.Main (
- images = [image]))
- last_posted_at = now
- except Exception:
- pass
- has_got_snack_time = True
-
- if now.hour == 20 and has_taken_hot_spring:
- has_taken_hot_spring = False
-
- if now.hour == 21 and not has_taken_hot_spring:
- try:
- with open ('./assets/hot-spring.jpg', 'rb') as f:
- image = models.AppBskyEmbedImages.Image (
- alt = ('左に喜多ちゃん、右にわさび県産滋賀県が'
- 'V字に並んでいる。'
- '喜多ちゃんは右手でピースサインをして'
- '片目をウインクしている。'
- 'わさび県産滋賀県はただ茫然と'
- '立ち尽くしている。'
- '背景には'
- '血と空の色をした放射線状の模様が広がり、'
- '下部に「温泉に入ろう!!!」という'
- '日本語のテキストが表示されている。'),
- image = client.com.atproto.repo.upload_blob (f).blob)
- client.post (Talk.main ('温泉に入ろう!!!'),
- embed = models.app.bsky.embed.images.Main (
- images = [image]))
- last_posted_at = now
- except Exception:
- pass
- has_taken_hot_spring = True
-
- if now - last_posted_at >= timedelta (hours = 6):
- client.post (Talk.main ('今どうしてる?'))
- last_posted_at = now
-
- time.sleep (60)
-
-
- def check_notifications (
- client: Client,
- ) -> list:
- (uris, last_seen_at) = ([], client.get_current_time_iso ())
-
- for notification in (client.app.bsky.notification.list_notifications ()
- .notifications):
- if not notification.is_read:
- if notification.reason in ['mention', 'reply', 'quote']:
- uris += [notification.uri]
- elif notification.reason == 'follow':
- client.follow (notification.author.did)
-
- client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
-
- return uris
-
-
- def get_thread_contents (
- client: Client,
- uri: str,
- parent_height: int,
- ) -> list:
- response = (client.get_post_thread (uri = uri,
- parent_height = parent_height)
- .thread)
- records = []
- while response is not None:
- records += [{ 'strong_ref': models.create_strong_ref (response.post),
- 'did': response.post.author.did,
- 'handle': response.post.author.handle,
- 'name': response.post.author.display_name,
- 'datetime': response.post.record.created_at,
- 'text': response.post.record.text,
- 'embed': response.post.record.embed }]
- response = response.parent
-
- return records
-
-
- def get_embed_info (
- url: str
- ) -> tuple[str, str, str]:
- title: str = ''
- description: str = ''
- thumbnail: str = ''
-
- try:
- res = requests.get (url, timeout = 60)
- except Timeout:
- return ('', '', '')
-
- if res.status_code != 200:
- return ('', '', '')
-
- soup = BeautifulSoup (res.text, 'html.parser')
-
- tmp = soup.find ('title')
- if tmp is not None:
- title = tmp.text
-
- tmp = soup.find ('meta', attrs = { 'name': 'description' })
- if tmp is not None and hasattr (tmp, 'get'):
- try:
- description = cast (str, tmp.get ('content'))
- except Exception:
- pass
-
- tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' })
- if tmp is not None and hasattr (tmp, 'get'):
- try:
- thumbnail = cast (str, tmp.get ('content'))
- except Exception:
- pass
-
- return (title, description, thumbnail)
-
-
- def get_kiriban_dt_to_update (
- ) -> datetime:
- now = datetime.now ()
- today = now.date ()
- dt = datetime.combine (today, dt_time (15, 0))
- if dt <= now:
- dt += timedelta (days = 1)
- return dt
-
-
- def like_posts (
- client: Client,
- ) -> None:
- for post in get_target_posts (client):
- client.like (**post)
-
-
- def get_target_posts (
- client: Client,
- ) -> list[LikeParams]:
- posts = []
-
- timeline: Response = client.get_timeline ()
- for feed in timeline.feed:
- if (feed.post.author.did != client.me.did
- and (feed.post.viewer.like is None)
- and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)):
- posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
-
- return posts
-
-
- class LikeParams (TypedDict):
- uri: str
- cid: str
-
-
- if __name__ == '__main__':
- main (*sys.argv[1:])
|