統合 AI への移行 #16

マージ済み
みてるぞ が 6 個のコミットを ai-migration から main へマージ 2025-10-22 21:50:02 +09:00
9個のファイルの変更255行の追加530行の削除
+6 -3
ファイルの表示
@@ -1,6 +1,9 @@
[submodule "nizika_nico"]
path = nizika_nico
url = https://git.miteruzo.com/miteruzo/nizika_nico
[submodule "ai"]
path = ai
url = https://git.miteruzo.com/miteruzo/nizika_broadcast
[submodule "nizika_ai"]
path = nizika_ai
url = https://git.miteruzo.com/miteruzo/nizika_ai.git
[submodule "nicolib"]
path = nicolib
url = https://git.miteruzo.com/miteruzo/nicolib.git
-1
サブモジュール ai299a3acdff から削除されました
-58
ファイルの表示
@@ -1,58 +0,0 @@
from datetime import datetime
from atproto.models.AppBskyFeedDefs import BlockedPost, NotFoundPost
from atproto_client.models.app.bsky.feed import get_timeline
class Client:
app: AppNamespace
def get_current_time_iso (self) -> datetime: ...
def get_post_thread (
self,
uri: str,
parent_height: int | None = None
) -> Response: ...
def get_timeline (self) -> get_timeline.Response: ...
def follow (self, did: str) -> None: ...
def like (self, uri: str, cid: str) -> None: ...
class AppNamespace:
bsky: AppBskyNamespace
class AppBskyNamespace:
notification: AppBskyNotificationNamespace
class AppBskyNotificationNamespace:
def list_notifications (self) -> Response: ...
def update_seen (self, seen: dict[str, datetime]) -> None: ...
class Response:
notifications: list[Notification]
thread: (ThreadViewPost
| NotFoundPost
| BlockedPost)
class ThreadViewPost:
pass
class Notification:
is_read: bool
reason: str
uri: str
author: ProfileView
class ProfileView:
did: str
-5
ファイルの表示
@@ -1,5 +0,0 @@
class NotFoundPost:
pass
class BlockedPost:
pass
-5
ファイルの表示
@@ -1,5 +0,0 @@
from atproto.models.AppBskyFeedDefs import FeedViewPost
class Response:
feed: list[FeedViewPost]
+216 -253
ファイルの表示
@@ -1,28 +1,24 @@
"""
Bluesky のニジカがいろいろする.
(近々機能ごとにファイル分けて systemd でイベント管理する予定)
"""
from __future__ import annotations
import io
import random
import sys
import asyncio
import os
import time
from datetime import date, datetime
from datetime import time as dt_time
from datetime import timedelta
from typing import TypedDict, cast
from datetime import datetime
from io import BytesIO
from typing import Any, TypedDict
import atproto # type: ignore
import requests
from atproto import Client, models
from atproto_client.models.app.bsky.feed.get_timeline import Response
from bs4 import BeautifulSoup
from atproto import Client # type: ignore
from atproto.models import AppBskyEmbedExternal, AppBskyEmbedImages # type: ignore
from atproto.models.AppBskyFeedPost import ReplyRef # type: ignore
from atproto_client.models.app.bsky.feed.get_timeline import Response # type: ignore
from requests.exceptions import Timeout
import account
import nico
from ai.talk import Talk
import nicolib
from nizika_ai.consts import Character, GPTModel, Platform, QueryType
from nizika_ai.models import Answer, AnsweredFlag, Query, User
TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
@@ -33,151 +29,125 @@ TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バ
'ルイズマリー', '', 'ニジゴ', 'ゴニジ', 'ニジニジ',
'新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']
time.sleep (60)
def main (
client = Client (base_url = 'https://bsky.social')
client.login (account.USER_ID, account.PASSWORD)
async def main (
) -> None:
time.sleep (60)
"""
メーン処理
"""
client = Client (base_url = 'https://bsky.social')
await asyncio.gather (like_posts (),
check_mentions (),
answer ())
client.login (account.USER_ID, account.PASSWORD)
got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = (
nico.get_kiriban_list (got_kiriban_at))
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()
last_posted_at = datetime.now () - timedelta (hours = 6)
has_got_snack_time = False
has_taken_hot_spring = False
watched_videos = []
async def like_posts (
) -> None:
while True:
now = datetime.now ()
for uri in check_notifications (client):
records = get_thread_contents (client, uri, 20)
if len (records) > 0:
answer = Talk.main ((records[0]['text']
if (records[0]['embed'] is None
or not hasattr (records[0]['embed'],
'images'))
else [
{ 'type': 'text', 'text': records[0]['text'] },
{ 'type': 'image_url', 'image_url': {
'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]),
records[0]['name'],
[*map (lambda record: {
'role': ('assistant'
if (record['handle']
== account.USER_ID)
else 'user'),
'content':
record['text']},
reversed (records[1:]))])
client.post (answer,
reply_to = models.AppBskyFeedPost.ReplyRef (
parent = records[0]['strong_ref'],
root = records[-1]['strong_ref']))
like_posts (client)
if kiriban_list and datetime.now () >= next_kiriban_at:
(views_count, video_info, uploaded_at) = (
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
since_posted = datetime.now () - uploaded_at
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
upload = client.com.atproto.repo.upload_blob (
io.BytesIO (requests.get (thumbnail,
timeout = 60).content))
for post in fetch_target_posts ():
client.like (**post)
except Exception as e:
print (f"[like_posts] { type (e).__name__ }: { e }")
await asyncio.sleep (60)
async def check_mentions (
) -> None:
while True:
try:
for uri in check_notifications ():
records = fetch_thread_contents (uri, 20)
if records:
record = records[0]
image_url: str | None = None
if record['embed'] and hasattr (record['embed'], 'images'):
image_url = ('https://cdn.bsky.app/img/feed_fullsize/plain'
f"/{ record['did'] }"
f"/{ record['embed'].images[0].image.ref.link }")
user = _fetch_user (record['did'], record['name'])
_add_query (user, record['text'], image_url, {
'uri': record['strong_ref']['uri'],
'cid': record['strong_ref']['cid'] })
except Exception as e:
print (f"[check_mentions] { type (e).__name__ }: { e }")
await asyncio.sleep (60)
async def answer (
) -> None:
while True:
answered_flags = (
AnsweredFlag
.where ('platform', Platform.BLUESKY.value)
.where ('answered', False)
.get ())
for answered_flag in answered_flags:
td: dict[str, Any]
answer = answered_flag.answer
match QueryType (answer.query_rel.query_type):
case QueryType.BLUESKY_COMMENT:
td = answer.query_rel.transfer_data or { }
uri: str | None = td.get ('uri')
cid: str | None = td.get ('cid')
if (not uri) or (not cid):
continue
sref = { 'uri': uri, 'cid': cid }
strong_ref = atproto.models.create_strong_ref (sref) # type: ignore
reply_ref = ReplyRef (root = strong_ref, parent = strong_ref)
try:
client.post (answer.content, reply_to = reply_ref)
except Exception as e:
print (f"[answer/reply] { type (e).__name__ }: { e }")
continue
answered_flag.answered = True
answered_flag.save ()
case QueryType.KIRIBAN | QueryType.NICO_REPORT:
td = answer.query_rel.transfer_data or { }
video_code: str | None = td.get ('video_code')
if not video_code:
continue
uri = f"https://www.nicovideo.jp/watch/{ video_code }"
(title, description, thumbnail) = nicolib.fetch_embed_info (uri)
try:
resp = requests.get (thumbnail, timeout = 60)
resp.raise_for_status ()
upload = client.com.atproto.repo.upload_blob (BytesIO (resp.content))
thumb = upload.blob
except Timeout:
thumb = None
comments = nico.get_comments (video_info['contentId'])
popular_comments = sorted (comments,
key = lambda c: c.nico_count,
reverse = True)[:10]
latest_comments = sorted (comments,
key = lambda c: c.posted_at,
reverse = True)[:10]
embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
title = title,
description = description,
thumb = thumb,
uri = uri))
prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
prompt += f"コメント数は{ len (comments) }件です。\n"
if video_info['tags']:
prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
if comments:
prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }\n"
prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }\n"
prompt += f"""
概要には次のように書かれています:
```html
{ video_info['description'] }
```
このことについて、ニジカちゃんからのお祝いメッセージを下さい。
ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
好きなコメントがあったら教えてね。"""
client.post (Talk.main (prompt), embed = embed_external)
next_kiriban_at += kiriban_interval
last_posted_at = now
latest_deerjika = nico.get_latest_deerjika ()
if latest_deerjika is not None:
for datum in [e for e in [latest_deerjika]
if e['contentId'] not in watched_videos]:
watched_videos += [datum['contentId']]
uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
upload = client.com.atproto.repo.upload_blob (
io.BytesIO (requests.get (thumbnail,
timeout = 60).content))
thumb = upload.blob
except Timeout:
except Exception as e:
print (f"[answer/nico-thumb] { type (e).__name__ }: { e }")
thumb = None
embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
external = AppBskyEmbedExternal.External (
title = title,
description = description,
thumb = thumb,
uri = uri))
client.post (Talk.main (f"""
ニコニコに『{ datum['title'] }』という動画がアップされました。
つけられたタグは「{ '」、「'.join (datum['tags']) }」です。
概要には次のように書かれています:
```html
{ datum['description'] }
```
このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """),
embed = embed_external)
last_posted_at = now
if now.hour == 14 and has_got_snack_time:
has_got_snack_time = False
if now.hour == 15:
if got_kiriban_at < datetime.now ().date ():
kiriban_list = nico.get_kiriban_list (datetime.now ().date ())
got_kiriban_at = datetime.now ().date ()
kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()
if not has_got_snack_time:
uri = uri)
embed_external = AppBskyEmbedExternal.Main (external = external)
try:
client.post (answer.content, embed = embed_external)
except Exception as e:
print (f"[answer/nico-post] { type (e).__name__ }: { e }")
continue
answered_flag.answered = True
answered_flag.save ()
case QueryType.SNACK_TIME:
try:
with open ('./assets/snack-time.jpg', 'rb') as f:
image = models.AppBskyEmbedImages.Image (
alt = ('左に喜多ちゃん、右に人面鹿のニジカが'
image = AppBskyEmbedImages.Image (
alt = (
'左に喜多ちゃん、右に人面鹿のニジカが'
'V字に並んでいる。'
'喜多ちゃんは右手でピースサインをして'
'片目をウインクしている。'
@@ -189,21 +159,16 @@ def main (
'下部に「おやつタイムだ!!!!」という'
'日本語のテキストが表示されている。'),
image = client.com.atproto.repo.upload_blob (f).blob)
client.post (Talk.main ('おやつタイムだ!!!!'),
embed = models.app.bsky.embed.images.Main (
images = [image]))
last_posted_at = now
client.post (answer.content,
embed = AppBskyEmbedImages.Main (images = [image]))
answered_flag.answered = True
answered_flag.save ()
except Exception:
pass
has_got_snack_time = True
if now.hour == 20 and has_taken_hot_spring:
has_taken_hot_spring = False
if now.hour == 21 and not has_taken_hot_spring:
case QueryType.HOT_SPRING:
try:
with open ('./assets/hot-spring.jpg', 'rb') as f:
image = models.AppBskyEmbedImages.Image (
image = AppBskyEmbedImages.Image (
alt = ('左に喜多ちゃん、右にわさび県産滋賀県が'
'V字に並んでいる。'
'喜多ちゃんは右手でピースサインをして'
@@ -215,32 +180,27 @@ def main (
'下部に「温泉に入ろう!!!」という'
'日本語のテキストが表示されている。'),
image = client.com.atproto.repo.upload_blob (f).blob)
client.post (Talk.main ('温泉に入ろう!!!'),
embed = models.app.bsky.embed.images.Main (
images = [image]))
last_posted_at = now
client.post (answer.content,
embed = AppBskyEmbedImages.Main (images = [image]))
answered_flag.answered = True
answered_flag.save ()
except Exception:
pass
has_taken_hot_spring = True
if now - last_posted_at >= timedelta (hours = 6):
client.post (Talk.main ('今どうしてる?'))
last_posted_at = now
time.sleep (60)
await asyncio.sleep (10)
def check_notifications (
client: Client,
) -> list:
(uris, last_seen_at) = ([], client.get_current_time_iso ())
) -> list[str]:
uris: list[str] = []
last_seen_at = client.get_current_time_iso ()
for notification in (client.app.bsky.notification.list_notifications ()
.notifications):
notifications = client.app.bsky.notification.list_notifications ()
for notification in notifications.notifications:
if not notification.is_read:
if notification.reason in ['mention', 'reply', 'quote']:
uris += [notification.uri]
elif notification.reason == 'follow':
match notification.reason:
case 'mention' | 'reply' | 'quote':
uris.append (notification.uri)
case 'follow':
client.follow (notification.author.did)
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
@@ -248,102 +208,105 @@ def check_notifications (
return uris
def get_thread_contents (
client: Client,
def fetch_thread_contents (
uri: str,
parent_height: int,
) -> list:
response = (client.get_post_thread (uri = uri,
parent_height = parent_height)
.thread)
records = []
while response is not None:
records += [{ 'strong_ref': models.create_strong_ref (response.post),
'did': response.post.author.did,
'handle': response.post.author.handle,
'name': response.post.author.display_name,
'datetime': response.post.record.created_at,
'text': response.post.record.text,
'embed': response.post.record.embed }]
response = response.parent
) -> list[Record]:
post_thread = client.get_post_thread (uri = uri, parent_height = parent_height)
if not post_thread:
return []
res = post_thread.thread
records: list[Record] = []
while res:
if hasattr (res, 'post'):
records.append ({ 'strong_ref': { 'uri': res.post.uri,
'cid': res.post.cid },
'did': res.post.author.did,
'handle': res.post.author.handle,
'name': (res.post.author.display_name
or res.post.author.handle),
'datetime': res.post.record.created_at,
'text': getattr (res.post.record, 'text', None) or '',
'embed': getattr (res.post.record, 'embed', None) })
res = res.parent
else:
break
return records
def get_embed_info (
url: str
) -> tuple[str, str, str]:
title: str = ''
description: str = ''
thumbnail: str = ''
try:
res = requests.get (url, timeout = 60)
except Timeout:
return ('', '', '')
if res.status_code != 200:
return ('', '', '')
soup = BeautifulSoup (res.text, 'html.parser')
tmp = soup.find ('title')
if tmp is not None:
title = tmp.text
tmp = soup.find ('meta', attrs = { 'name': 'description' })
if tmp is not None and hasattr (tmp, 'get'):
try:
description = cast (str, tmp.get ('content'))
except Exception:
pass
tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' })
if tmp is not None and hasattr (tmp, 'get'):
try:
thumbnail = cast (str, tmp.get ('content'))
except Exception:
pass
return (title, description, thumbnail)
def get_kiriban_dt_to_update (
) -> datetime:
now = datetime.now ()
today = now.date ()
dt = datetime.combine (today, dt_time (15, 0))
if dt <= now:
dt += timedelta (days = 1)
return dt
def like_posts (
client: Client,
) -> None:
for post in get_target_posts (client):
client.like (**post)
def get_target_posts (
client: Client,
def fetch_target_posts (
) -> list[LikeParams]:
posts = []
posts: list[LikeParams] = []
timeline: Response = client.get_timeline ()
for feed in timeline.feed:
if (feed.post.author.did != client.me.did
me = getattr (client, 'me', None)
my_did = me.did if me else ''
if (feed.post.author.did != my_did
and (feed.post.viewer.like is None)
and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)):
posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
and any (target_word in (feed.post.record.text or '').casefold ()
for target_word in TARGET_WORDS)):
posts.append (LikeParams ({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
return posts
def _add_query (
user: User,
content: str,
image_url: str | None = None,
transfer_data: dict[str, Any] | None = None,
) -> None:
query = Query ()
query.user_id = user.id
query.target_character = Character.DEERJIKA.value
query.content = content
query.image_url = image_url or None
query.query_type = QueryType.BLUESKY_COMMENT.value
query.model = GPTModel.GPT3_TURBO.value
query.sent_at = datetime.now ()
query.answered = False
query.transfer_data = transfer_data
query.save ()
# TODO: 履歴情報の追加
def _fetch_user (
did: str,
name: str,
) -> User:
user = User.where ('platform', Platform.BLUESKY.value).where ('code', did).first ()
if user is None:
user = User ()
user.platform = Platform.BLUESKY.value
user.code = did
user.name = name
user.save ()
return user
class LikeParams (TypedDict):
uri: str
cid: str
class Record (TypedDict):
strong_ref: StrongRef
did: str
handle: str
name: str
datetime: str
text: str
embed: object
class StrongRef (TypedDict):
uri: str
cid: str
if __name__ == '__main__':
main (*sys.argv[1:])
asyncio.run (main ())
-174
ファイルの表示
@@ -1,174 +0,0 @@
"""
ニコニコのニジカ動画取得モヂュール
"""
from __future__ import annotations
import os
from datetime import date, datetime, timedelta
from typing import TypedDict, cast
import requests
from bs4 import BeautifulSoup
from requests.exceptions import Timeout
from eloquent import DatabaseManager, Model
from db.models import Comment, Tag, Video, VideoHistory, VideoTag
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
*range (10_000, 1_000_001, 10_000),
194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100,
19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
4_545, 194_245, 245_194, 510_245 },
reverse = True)
class VideoInfo (TypedDict):
contentId: str
title: str
tags: list[str]
description: str
def get_latest_deerjika (
) -> VideoInfo | None:
tag = '伊地知ニジカ OR ぼざろクリーチャーシリーズ'
url = f"https://www.nicovideo.jp/tag/{ tag }"
params = { 'sort': 'f',
'order': 'd' }
video_info = { }
bs = get_bs_from_url (url, params)
if bs is None:
return None
try:
video = (bs.find_all ('ul', class_ = 'videoListInner')[1]
.find ('li', class_ = 'item'))
video_info['contentId'] = video['data-video-id']
except Exception:
return None
return get_video_info (video_info['contentId'])
def get_bs_from_url (
url: str,
params: dict = { },
) -> BeautifulSoup | None:
"""
URL から BeautifulSoup インスタンス生成
Parameters
----------
url: str
捜査する URL
params: dict
パラメータ
Return
------
BeautifulSoup | None
BeautifulSoup オブゼクト(失敗したら None)
"""
try:
req = requests.get (url, params = params, timeout = 60)
except Timeout:
return None
if req.status_code != 200:
return None
req.encoding = req.apparent_encoding
return BeautifulSoup (req.text, 'html.parser')
def get_video_info (
video_code: str,
) -> VideoInfo | None:
video_info: dict[str, str | list[str]] = { 'contentId': video_code }
bs = get_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }")
if bs is None:
return None
try:
title = bs.find ('title')
if title is None:
return None
video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)]
tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore
video_info['tags'] = tags.split (',')
video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore
except Exception:
return None
return cast (VideoInfo, video_info)
def get_kiriban_list (
base_date: date,
) -> list[tuple[int, VideoInfo, datetime]]:
kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
latest_fetched_at = cast (date, (VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))
for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
targets = { vh.video.code for vh in (VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) }
for code in targets:
if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= kiriban_views_count:
continue
video_info = get_video_info (code)
if video_info is not None:
kiriban_list.append ((kiriban_views_count, video_info,
cast (Video, Video.where ('code', code).first ()).uploaded_at))
return kiriban_list
def get_comments (
video_code: str,
) -> list[Comment]:
video = Video.where ('code', video_code).first ()
if video is None:
return []
return video.comments
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
サブモジュール
+1
サブモジュール nicolib32ecf2d00f で追加されました
サブモジュール
+1
サブモジュール nizika_ai3be6d9063c で追加されました