コミットを比較

...

16 コミット

7個のファイルの変更168行の追加73行の削除
+2
ファイルの表示
@@ -0,0 +1,2 @@
/db
/eloquent.pyi
+12 -3
ファイルの表示
@@ -1,6 +1,7 @@
from datetime import datetime from datetime import datetime
from atproto import models from atproto.models.AppBskyFeedDefs import BlockedPost, NotFoundPost
from atproto_client.models.app.bsky.feed import get_timeline
class Client: class Client:
@@ -14,8 +15,12 @@ class Client:
parent_height: int | None = None parent_height: int | None = None
) -> Response: ... ) -> Response: ...
def get_timeline (self) -> get_timeline.Response: ...
def follow (self, did: str) -> None: ... def follow (self, did: str) -> None: ...
def like (self, uri: str, cid: str) -> None: ...
class AppNamespace: class AppNamespace:
bsky: AppBskyNamespace bsky: AppBskyNamespace
@@ -34,8 +39,12 @@ class AppBskyNotificationNamespace:
class Response: class Response:
notifications: list[Notification] notifications: list[Notification]
thread: (ThreadViewPost thread: (ThreadViewPost
| models.AppBskyFeedDefs.NotFoundPost | NotFoundPost
| models.AppBskyFeedDefs.BlockedPost) | BlockedPost)
class ThreadViewPost:
pass
class Notification: class Notification:
ファイルの表示
+5
ファイルの表示
@@ -0,0 +1,5 @@
class NotFoundPost:
pass
class BlockedPost:
pass
+5
ファイルの表示
@@ -0,0 +1,5 @@
from atproto.models.AppBskyFeedDefs import FeedViewPost
class Response:
feed: list[FeedViewPost]
+105 -47
ファイルの表示
@@ -3,6 +3,8 @@ Bluesky のニジカがいろいろする.
(近々機能ごとにファイル分けて systemd でイベント管理する予定) (近々機能ごとにファイル分けて systemd でイベント管理する予定)
""" """
from __future__ import annotations
import io import io
import random import random
import sys import sys
@@ -10,10 +12,11 @@ import time
from datetime import date, datetime from datetime import date, datetime
from datetime import time as dt_time from datetime import time as dt_time
from datetime import timedelta from datetime import timedelta
from typing import cast from typing import TypedDict, cast
import requests import requests
from atproto import Client, models from atproto import Client, models
from atproto_client.models.app.bsky.feed.get_timeline import Response
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from requests.exceptions import Timeout from requests.exceptions import Timeout
@@ -21,45 +24,14 @@ import account
import nico import nico
from ai.talk import Talk from ai.talk import Talk
TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
def check_notifications ( 'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
client: Client, 'ぼざクリ', 'オオミソカ', '伊地知', '喜多ちゃん',
) -> list: '喜タイ', '洗澡鹿', 'シーザオ', '今日は本当に',
(uris, last_seen_at) = ([], client.get_current_time_iso ()) 'ダイソーで', '変なチンチン', 'daisoで', 'だね~(笑)',
'おやつタイム', 'わさしが', 'わさび県', 'たぬマ', 'にくまる',
for notification in (client.app.bsky.notification.list_notifications () 'ルイズマリー', '', 'ニジゴ', 'ゴニジ', 'ニジニジ',
.notifications): '新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']
if not notification.is_read:
if notification.reason in ['mention', 'reply', 'quote']:
uris += [notification.uri]
elif notification.reason == 'follow':
client.follow (notification.author.did)
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
return uris
def get_thread_contents (
client: Client,
uri: str,
parent_height: int,
) -> list:
response = (client.get_post_thread (uri = uri,
parent_height = parent_height)
.thread)
records = []
while response is not None:
records += [{ 'strong_ref': models.create_strong_ref (response.post),
'did': response.post.author.did,
'handle': response.post.author.handle,
'name': response.post.author.display_name,
'datetime': response.post.record.created_at,
'text': response.post.record.text,
'embed': response.post.record.embed }]
response = response.parent
return records
def main ( def main (
@@ -71,7 +43,8 @@ def main (
client.login (account.USER_ID, account.PASSWORD) client.login (account.USER_ID, account.PASSWORD)
got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15) got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
kiriban_list: list[tuple[int, nico.VideoInfo]] = nico.get_kiriban_list (got_kiriban_at) kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = (
nico.get_kiriban_list (got_kiriban_at))
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ()) kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list)) / len (kiriban_list))
next_kiriban_at = datetime.now () next_kiriban_at = datetime.now ()
@@ -108,9 +81,12 @@ def main (
parent = records[0]['strong_ref'], parent = records[0]['strong_ref'],
root = records[-1]['strong_ref'])) root = records[-1]['strong_ref']))
like_posts (client)
if kiriban_list and datetime.now () >= next_kiriban_at: if kiriban_list and datetime.now () >= next_kiriban_at:
(views_count, video_info) = ( (views_count, video_info, uploaded_at) = (
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
since_posted = datetime.now () - uploaded_at
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }" uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri) (title, description, thumbnail) = get_embed_info (uri)
try: try:
@@ -120,21 +96,36 @@ def main (
thumb = upload.blob thumb = upload.blob
except Timeout: except Timeout:
thumb = None thumb = None
comments = nico.get_comments (video_info['contentId'])
popular_comments = sorted (comments,
key = lambda c: c.nico_count,
reverse = True)[:10]
latest_comments = sorted (comments,
key = lambda c: c.posted_at,
reverse = True)[:10]
embed_external = models.AppBskyEmbedExternal.Main ( embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External ( external = models.AppBskyEmbedExternal.External (
title = title, title = title,
description = description, description = description,
thumb = thumb, thumb = thumb,
uri = uri)) uri = uri))
client.post (Talk.main (f""" prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
ニコニコの『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。 prompt += f"コメント数は{ len (comments) }件です。\n"
つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。 if video_info['tags']:
prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
if comments:
prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }\n"
prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }\n"
prompt += f"""
概要には次のように書かれています: 概要には次のように書かれています:
```html ```html
{ video_info['description'] } { video_info['description'] }
``` ```
このことについて、ニジカちゃんからのお祝いメッセージを下さい。"""), このことについて、ニジカちゃんからのお祝いメッセージを下さい。
embed = embed_external) ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
好きなコメントがあったら教えてね。"""
client.post (Talk.main (prompt), embed = embed_external)
next_kiriban_at += kiriban_interval next_kiriban_at += kiriban_interval
last_posted_at = now last_posted_at = now
@@ -239,6 +230,46 @@ def main (
time.sleep (60) time.sleep (60)
def check_notifications (
client: Client,
) -> list:
(uris, last_seen_at) = ([], client.get_current_time_iso ())
for notification in (client.app.bsky.notification.list_notifications ()
.notifications):
if not notification.is_read:
if notification.reason in ['mention', 'reply', 'quote']:
uris += [notification.uri]
elif notification.reason == 'follow':
client.follow (notification.author.did)
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
return uris
def get_thread_contents (
client: Client,
uri: str,
parent_height: int,
) -> list:
response = (client.get_post_thread (uri = uri,
parent_height = parent_height)
.thread)
records = []
while response is not None:
records += [{ 'strong_ref': models.create_strong_ref (response.post),
'did': response.post.author.did,
'handle': response.post.author.handle,
'name': response.post.author.display_name,
'datetime': response.post.record.created_at,
'text': response.post.record.text,
'embed': response.post.record.embed }]
response = response.parent
return records
def get_embed_info ( def get_embed_info (
url: str url: str
) -> tuple[str, str, str]: ) -> tuple[str, str, str]:
@@ -287,5 +318,32 @@ def get_kiriban_dt_to_update (
return dt return dt
def like_posts (
client: Client,
) -> None:
for post in get_target_posts (client):
client.like (**post)
def get_target_posts (
client: Client,
) -> list[LikeParams]:
posts = []
timeline: Response = client.get_timeline ()
for feed in timeline.feed:
if (feed.post.author.did != client.me.did
and (feed.post.viewer.like is None)
and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)):
posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
return posts
class LikeParams (TypedDict):
uri: str
cid: str
if __name__ == '__main__': if __name__ == '__main__':
main (*sys.argv[1:]) main (*sys.argv[1:])
+39 -23
ファイルの表示
@@ -5,7 +5,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
from datetime import date, timedelta from datetime import date, datetime, timedelta
from typing import TypedDict, cast from typing import TypedDict, cast
import requests import requests
@@ -13,7 +13,7 @@ from bs4 import BeautifulSoup
from requests.exceptions import Timeout from requests.exceptions import Timeout
from eloquent import DatabaseManager, Model from eloquent import DatabaseManager, Model
from db.models import Tag, Video, VideoHistory, VideoTag from db.models import Comment, Tag, Video, VideoHistory, VideoTag
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql', CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost', 'host': 'localhost',
@@ -25,12 +25,12 @@ CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
DB = DatabaseManager (CONFIG) DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB) Model.set_connection_resolver (DB)
KIRIBAN_VIEWS_COUNTS: set[int] = { *range (100, 1_000, 100), KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
*range (1_000, 10_000, 1_000), *range (10_000, 1_000_001, 10_000),
*range (10_000, 1_000_001, 10_000), 194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100,
194, 245, 510, 114_514, 1_940, 2_450, 5_100, 24_500, 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
51_000, 2_424 } 4_545, 194_245, 245_194, 510_245 },
reverse = True)
class VideoInfo (TypedDict): class VideoInfo (TypedDict):
contentId: str contentId: str
@@ -124,31 +124,47 @@ def get_video_info (
def get_kiriban_list ( def get_kiriban_list (
base_date: date, base_date: date,
) -> list[tuple[int, VideoInfo]]: ) -> list[tuple[int, VideoInfo, datetime]]:
kiriban_list: list[tuple[int, VideoInfo]] = [] kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
latest_fetched_at = cast (date, VideoHistory.max ('fetched_at')) latest_fetched_at = cast (date, (VideoHistory
previous_fetched_at = cast (date, (VideoHistory .where ('fetched_at', '<=', base_date)
.where ('fetched_at', '<', latest_fetched_at) .max ('fetched_at')))
.max ('fetched_at')))
for kiriban_views_count in KIRIBAN_VIEWS_COUNTS: for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
targets = ({ vh.video.code for vh in (VideoHistory targets = { vh.video.code for vh in (VideoHistory
.where ('fetched_at', latest_fetched_at) .where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', kiriban_views_count) .where ('views_count', '>=', kiriban_views_count)
.get ()) } .get ()) }
- { vh.video.code for vh in (VideoHistory
.where ('fetched_at', previous_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) })
for code in targets: for code in targets:
if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= kiriban_views_count:
continue
video_info = get_video_info (code) video_info = get_video_info (code)
if video_info is not None: if video_info is not None:
kiriban_list.append ((kiriban_views_count, video_info)) kiriban_list.append ((kiriban_views_count, video_info,
cast (Video, Video.where ('code', code).first ()).uploaded_at))
return kiriban_list return kiriban_list
def get_comments (
video_code: str,
) -> list[Comment]:
video = Video.where ('code', video_code).first ()
if video is None:
return []
return video.comments
class DbConfig (TypedDict): class DbConfig (TypedDict):
driver: str driver: str
host: str host: str