コミットを比較

...

16 コミット

7個のファイルの変更168行の追加73行の削除
+2
ファイルの表示
@@ -0,0 +1,2 @@
/db
/eloquent.pyi
+12 -3
ファイルの表示
@@ -1,6 +1,7 @@
from datetime import datetime
from atproto import models
from atproto.models.AppBskyFeedDefs import BlockedPost, NotFoundPost
from atproto_client.models.app.bsky.feed import get_timeline
class Client:
@@ -14,8 +15,12 @@ class Client:
parent_height: int | None = None
) -> Response: ...
def get_timeline (self) -> get_timeline.Response: ...
def follow (self, did: str) -> None: ...
def like (self, uri: str, cid: str) -> None: ...
class AppNamespace:
bsky: AppBskyNamespace
@@ -34,8 +39,12 @@ class AppBskyNotificationNamespace:
class Response:
notifications: list[Notification]
thread: (ThreadViewPost
| models.AppBskyFeedDefs.NotFoundPost
| models.AppBskyFeedDefs.BlockedPost)
| NotFoundPost
| BlockedPost)
class ThreadViewPost:
pass
class Notification:
ファイルの表示
+5
ファイルの表示
@@ -0,0 +1,5 @@
class NotFoundPost:
pass
class BlockedPost:
pass
+5
ファイルの表示
@@ -0,0 +1,5 @@
from atproto.models.AppBskyFeedDefs import FeedViewPost
class Response:
feed: list[FeedViewPost]
+105 -47
ファイルの表示
@@ -3,6 +3,8 @@ Bluesky のニジカがいろいろする.
(近々機能ごとにファイル分けて systemd でイベント管理する予定)
"""
from __future__ import annotations
import io
import random
import sys
@@ -10,10 +12,11 @@ import time
from datetime import date, datetime
from datetime import time as dt_time
from datetime import timedelta
from typing import cast
from typing import TypedDict, cast
import requests
from atproto import Client, models
from atproto_client.models.app.bsky.feed.get_timeline import Response
from bs4 import BeautifulSoup
from requests.exceptions import Timeout
@@ -21,45 +24,14 @@ import account
import nico
from ai.talk import Talk
def check_notifications (
client: Client,
) -> list:
(uris, last_seen_at) = ([], client.get_current_time_iso ())
for notification in (client.app.bsky.notification.list_notifications ()
.notifications):
if not notification.is_read:
if notification.reason in ['mention', 'reply', 'quote']:
uris += [notification.uri]
elif notification.reason == 'follow':
client.follow (notification.author.did)
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
return uris
def get_thread_contents (
client: Client,
uri: str,
parent_height: int,
) -> list:
response = (client.get_post_thread (uri = uri,
parent_height = parent_height)
.thread)
records = []
while response is not None:
records += [{ 'strong_ref': models.create_strong_ref (response.post),
'did': response.post.author.did,
'handle': response.post.author.handle,
'name': response.post.author.display_name,
'datetime': response.post.record.created_at,
'text': response.post.record.text,
'embed': response.post.record.embed }]
response = response.parent
return records
TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
'ぼざクリ', 'オオミソカ', '伊地知', '喜多ちゃん',
'喜タイ', '洗澡鹿', 'シーザオ', '今日は本当に',
'ダイソーで', '変なチンチン', 'daisoで', 'だね~(笑)',
'おやつタイム', 'わさしが', 'わさび県', 'たぬマ', 'にくまる',
'ルイズマリー', '', 'ニジゴ', 'ゴニジ', 'ニジニジ',
'新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']
def main (
@@ -71,7 +43,8 @@ def main (
client.login (account.USER_ID, account.PASSWORD)
got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
kiriban_list: list[tuple[int, nico.VideoInfo]] = nico.get_kiriban_list (got_kiriban_at)
kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = (
nico.get_kiriban_list (got_kiriban_at))
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()
@@ -108,9 +81,12 @@ def main (
parent = records[0]['strong_ref'],
root = records[-1]['strong_ref']))
like_posts (client)
if kiriban_list and datetime.now () >= next_kiriban_at:
(views_count, video_info) = (
(views_count, video_info, uploaded_at) = (
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
since_posted = datetime.now () - uploaded_at
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
@@ -120,21 +96,36 @@ def main (
thumb = upload.blob
except Timeout:
thumb = None
comments = nico.get_comments (video_info['contentId'])
popular_comments = sorted (comments,
key = lambda c: c.nico_count,
reverse = True)[:10]
latest_comments = sorted (comments,
key = lambda c: c.posted_at,
reverse = True)[:10]
embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
title = title,
description = description,
thumb = thumb,
uri = uri))
client.post (Talk.main (f"""
ニコニコの『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。
つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。
prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
prompt += f"コメント数は{ len (comments) }件です。\n"
if video_info['tags']:
prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
if comments:
prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }\n"
prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }\n"
prompt += f"""
概要には次のように書かれています:
```html
{ video_info['description'] }
```
このことについて、ニジカちゃんからのお祝いメッセージを下さい。"""),
embed = embed_external)
このことについて、ニジカちゃんからのお祝いメッセージを下さい。
ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
好きなコメントがあったら教えてね。"""
client.post (Talk.main (prompt), embed = embed_external)
next_kiriban_at += kiriban_interval
last_posted_at = now
@@ -239,6 +230,46 @@ def main (
time.sleep (60)
def check_notifications (
client: Client,
) -> list:
(uris, last_seen_at) = ([], client.get_current_time_iso ())
for notification in (client.app.bsky.notification.list_notifications ()
.notifications):
if not notification.is_read:
if notification.reason in ['mention', 'reply', 'quote']:
uris += [notification.uri]
elif notification.reason == 'follow':
client.follow (notification.author.did)
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
return uris
def get_thread_contents (
client: Client,
uri: str,
parent_height: int,
) -> list:
response = (client.get_post_thread (uri = uri,
parent_height = parent_height)
.thread)
records = []
while response is not None:
records += [{ 'strong_ref': models.create_strong_ref (response.post),
'did': response.post.author.did,
'handle': response.post.author.handle,
'name': response.post.author.display_name,
'datetime': response.post.record.created_at,
'text': response.post.record.text,
'embed': response.post.record.embed }]
response = response.parent
return records
def get_embed_info (
url: str
) -> tuple[str, str, str]:
@@ -287,5 +318,32 @@ def get_kiriban_dt_to_update (
return dt
def like_posts (
client: Client,
) -> None:
for post in get_target_posts (client):
client.like (**post)
def get_target_posts (
client: Client,
) -> list[LikeParams]:
posts = []
timeline: Response = client.get_timeline ()
for feed in timeline.feed:
if (feed.post.author.did != client.me.did
and (feed.post.viewer.like is None)
and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)):
posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
return posts
class LikeParams (TypedDict):
uri: str
cid: str
if __name__ == '__main__':
main (*sys.argv[1:])
+39 -23
ファイルの表示
@@ -5,7 +5,7 @@
from __future__ import annotations
import os
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from typing import TypedDict, cast
import requests
@@ -13,7 +13,7 @@ from bs4 import BeautifulSoup
from requests.exceptions import Timeout
from eloquent import DatabaseManager, Model
from db.models import Tag, Video, VideoHistory, VideoTag
from db.models import Comment, Tag, Video, VideoHistory, VideoTag
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
@@ -25,12 +25,12 @@ CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
KIRIBAN_VIEWS_COUNTS: set[int] = { *range (100, 1_000, 100),
*range (1_000, 10_000, 1_000),
*range (10_000, 1_000_001, 10_000),
194, 245, 510, 114_514, 1_940, 2_450, 5_100, 24_500,
51_000, 2_424 }
KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
*range (10_000, 1_000_001, 10_000),
194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100,
19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
4_545, 194_245, 245_194, 510_245 },
reverse = True)
class VideoInfo (TypedDict):
contentId: str
@@ -124,31 +124,47 @@ def get_video_info (
def get_kiriban_list (
base_date: date,
) -> list[tuple[int, VideoInfo]]:
kiriban_list: list[tuple[int, VideoInfo]] = []
) -> list[tuple[int, VideoInfo, datetime]]:
kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
latest_fetched_at = cast (date, VideoHistory.max ('fetched_at'))
previous_fetched_at = cast (date, (VideoHistory
.where ('fetched_at', '<', latest_fetched_at)
.max ('fetched_at')))
latest_fetched_at = cast (date, (VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))
for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
targets = ({ vh.video.code for vh in (VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) }
- { vh.video.code for vh in (VideoHistory
.where ('fetched_at', previous_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) })
targets = { vh.video.code for vh in (VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) }
for code in targets:
if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= kiriban_views_count:
continue
video_info = get_video_info (code)
if video_info is not None:
kiriban_list.append ((kiriban_views_count, video_info))
kiriban_list.append ((kiriban_views_count, video_info,
cast (Video, Video.where ('code', code).first ()).uploaded_at))
return kiriban_list
def get_comments (
video_code: str,
) -> list[Comment]:
video = Video.where ('code', video_code).first ()
if video is None:
return []
return video.comments
class DbConfig (TypedDict):
driver: str
host: str