コミットを比較
16 コミット
| 作成者 | SHA1 | 日付 | |
|---|---|---|---|
| 6d4e826439 | |||
| e6f90611fa | |||
| 4f2056b347 | |||
| 64279b2eca | |||
| 4bd15d0cc9 | |||
| 26867f5269 | |||
| 0dd27b4ec7 | |||
| fb920a1f2c | |||
| 5041b91485 | |||
| 06b9d015d0 | |||
| fad7ffe969 | |||
| d6654c41ba | |||
| a4e0a5fcd8 | |||
| f3c8f5fa27 | |||
| f6ab471e04 | |||
| ad9f5256e5 |
@@ -0,0 +1,2 @@
|
||||
/db
|
||||
/eloquent.pyi
|
||||
+12
-3
@@ -1,6 +1,7 @@
|
||||
from datetime import datetime
|
||||
|
||||
from atproto import models
|
||||
from atproto.models.AppBskyFeedDefs import BlockedPost, NotFoundPost
|
||||
from atproto_client.models.app.bsky.feed import get_timeline
|
||||
|
||||
|
||||
class Client:
|
||||
@@ -14,8 +15,12 @@ class Client:
|
||||
parent_height: int | None = None
|
||||
) -> Response: ...
|
||||
|
||||
def get_timeline (self) -> get_timeline.Response: ...
|
||||
|
||||
def follow (self, did: str) -> None: ...
|
||||
|
||||
def like (self, uri: str, cid: str) -> None: ...
|
||||
|
||||
|
||||
class AppNamespace:
|
||||
bsky: AppBskyNamespace
|
||||
@@ -34,8 +39,12 @@ class AppBskyNotificationNamespace:
|
||||
class Response:
|
||||
notifications: list[Notification]
|
||||
thread: (ThreadViewPost
|
||||
| models.AppBskyFeedDefs.NotFoundPost
|
||||
| models.AppBskyFeedDefs.BlockedPost)
|
||||
| NotFoundPost
|
||||
| BlockedPost)
|
||||
|
||||
|
||||
class ThreadViewPost:
|
||||
pass
|
||||
|
||||
|
||||
class Notification:
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
class NotFoundPost:
|
||||
pass
|
||||
|
||||
class BlockedPost:
|
||||
pass
|
||||
@@ -0,0 +1,5 @@
|
||||
from atproto.models.AppBskyFeedDefs import FeedViewPost
|
||||
|
||||
|
||||
class Response:
|
||||
feed: list[FeedViewPost]
|
||||
@@ -3,6 +3,8 @@ Bluesky のニジカがいろいろする.
|
||||
(近々機能ごとにファイル分けて systemd でイベント管理する予定)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import random
|
||||
import sys
|
||||
@@ -10,10 +12,11 @@ import time
|
||||
from datetime import date, datetime
|
||||
from datetime import time as dt_time
|
||||
from datetime import timedelta
|
||||
from typing import cast
|
||||
from typing import TypedDict, cast
|
||||
|
||||
import requests
|
||||
from atproto import Client, models
|
||||
from atproto_client.models.app.bsky.feed.get_timeline import Response
|
||||
from bs4 import BeautifulSoup
|
||||
from requests.exceptions import Timeout
|
||||
|
||||
@@ -21,45 +24,14 @@ import account
|
||||
import nico
|
||||
from ai.talk import Talk
|
||||
|
||||
|
||||
def check_notifications (
|
||||
client: Client,
|
||||
) -> list:
|
||||
(uris, last_seen_at) = ([], client.get_current_time_iso ())
|
||||
|
||||
for notification in (client.app.bsky.notification.list_notifications ()
|
||||
.notifications):
|
||||
if not notification.is_read:
|
||||
if notification.reason in ['mention', 'reply', 'quote']:
|
||||
uris += [notification.uri]
|
||||
elif notification.reason == 'follow':
|
||||
client.follow (notification.author.did)
|
||||
|
||||
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
|
||||
|
||||
return uris
|
||||
|
||||
|
||||
def get_thread_contents (
|
||||
client: Client,
|
||||
uri: str,
|
||||
parent_height: int,
|
||||
) -> list:
|
||||
response = (client.get_post_thread (uri = uri,
|
||||
parent_height = parent_height)
|
||||
.thread)
|
||||
records = []
|
||||
while response is not None:
|
||||
records += [{ 'strong_ref': models.create_strong_ref (response.post),
|
||||
'did': response.post.author.did,
|
||||
'handle': response.post.author.handle,
|
||||
'name': response.post.author.display_name,
|
||||
'datetime': response.post.record.created_at,
|
||||
'text': response.post.record.text,
|
||||
'embed': response.post.record.embed }]
|
||||
response = response.parent
|
||||
|
||||
return records
|
||||
TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
|
||||
'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
|
||||
'ぼざクリ', 'オオミソカ', '伊地知', '喜多ちゃん',
|
||||
'喜タイ', '洗澡鹿', 'シーザオ', '今日は本当に',
|
||||
'ダイソーで', '変なチンチン', 'daisoで', 'だね~(笑)',
|
||||
'おやつタイム', 'わさしが', 'わさび県', 'たぬマ', 'にくまる',
|
||||
'ルイズマリー', '餅', 'ニジゴ', 'ゴニジ', 'ニジニジ',
|
||||
'新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']
|
||||
|
||||
|
||||
def main (
|
||||
@@ -71,7 +43,8 @@ def main (
|
||||
client.login (account.USER_ID, account.PASSWORD)
|
||||
|
||||
got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
|
||||
kiriban_list: list[tuple[int, nico.VideoInfo]] = nico.get_kiriban_list (got_kiriban_at)
|
||||
kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = (
|
||||
nico.get_kiriban_list (got_kiriban_at))
|
||||
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
|
||||
/ len (kiriban_list))
|
||||
next_kiriban_at = datetime.now ()
|
||||
@@ -108,9 +81,12 @@ def main (
|
||||
parent = records[0]['strong_ref'],
|
||||
root = records[-1]['strong_ref']))
|
||||
|
||||
like_posts (client)
|
||||
|
||||
if kiriban_list and datetime.now () >= next_kiriban_at:
|
||||
(views_count, video_info) = (
|
||||
(views_count, video_info, uploaded_at) = (
|
||||
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
|
||||
since_posted = datetime.now () - uploaded_at
|
||||
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
|
||||
(title, description, thumbnail) = get_embed_info (uri)
|
||||
try:
|
||||
@@ -120,21 +96,36 @@ def main (
|
||||
thumb = upload.blob
|
||||
except Timeout:
|
||||
thumb = None
|
||||
comments = nico.get_comments (video_info['contentId'])
|
||||
popular_comments = sorted (comments,
|
||||
key = lambda c: c.nico_count,
|
||||
reverse = True)[:10]
|
||||
latest_comments = sorted (comments,
|
||||
key = lambda c: c.posted_at,
|
||||
reverse = True)[:10]
|
||||
embed_external = models.AppBskyEmbedExternal.Main (
|
||||
external = models.AppBskyEmbedExternal.External (
|
||||
title = title,
|
||||
description = description,
|
||||
thumb = thumb,
|
||||
uri = uri))
|
||||
client.post (Talk.main (f"""
|
||||
ニコニコの『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。
|
||||
つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。
|
||||
prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
|
||||
prompt += f"コメント数は{ len (comments) }件です。\n"
|
||||
if video_info['tags']:
|
||||
prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
|
||||
if comments:
|
||||
prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n"
|
||||
prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n"
|
||||
prompt += f"""
|
||||
概要には次のように書かれています:
|
||||
```html
|
||||
{ video_info['description'] }
|
||||
```
|
||||
このことについて、ニジカちゃんからのお祝いメッセージを下さい。"""),
|
||||
embed = embed_external)
|
||||
このことについて、ニジカちゃんからのお祝いメッセージを下さい。
|
||||
ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
|
||||
また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
|
||||
好きなコメントがあったら教えてね。"""
|
||||
client.post (Talk.main (prompt), embed = embed_external)
|
||||
next_kiriban_at += kiriban_interval
|
||||
last_posted_at = now
|
||||
|
||||
@@ -239,6 +230,46 @@ def main (
|
||||
time.sleep (60)
|
||||
|
||||
|
||||
def check_notifications (
|
||||
client: Client,
|
||||
) -> list:
|
||||
(uris, last_seen_at) = ([], client.get_current_time_iso ())
|
||||
|
||||
for notification in (client.app.bsky.notification.list_notifications ()
|
||||
.notifications):
|
||||
if not notification.is_read:
|
||||
if notification.reason in ['mention', 'reply', 'quote']:
|
||||
uris += [notification.uri]
|
||||
elif notification.reason == 'follow':
|
||||
client.follow (notification.author.did)
|
||||
|
||||
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
|
||||
|
||||
return uris
|
||||
|
||||
|
||||
def get_thread_contents (
|
||||
client: Client,
|
||||
uri: str,
|
||||
parent_height: int,
|
||||
) -> list:
|
||||
response = (client.get_post_thread (uri = uri,
|
||||
parent_height = parent_height)
|
||||
.thread)
|
||||
records = []
|
||||
while response is not None:
|
||||
records += [{ 'strong_ref': models.create_strong_ref (response.post),
|
||||
'did': response.post.author.did,
|
||||
'handle': response.post.author.handle,
|
||||
'name': response.post.author.display_name,
|
||||
'datetime': response.post.record.created_at,
|
||||
'text': response.post.record.text,
|
||||
'embed': response.post.record.embed }]
|
||||
response = response.parent
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def get_embed_info (
|
||||
url: str
|
||||
) -> tuple[str, str, str]:
|
||||
@@ -287,5 +318,32 @@ def get_kiriban_dt_to_update (
|
||||
return dt
|
||||
|
||||
|
||||
def like_posts (
|
||||
client: Client,
|
||||
) -> None:
|
||||
for post in get_target_posts (client):
|
||||
client.like (**post)
|
||||
|
||||
|
||||
def get_target_posts (
|
||||
client: Client,
|
||||
) -> list[LikeParams]:
|
||||
posts = []
|
||||
|
||||
timeline: Response = client.get_timeline ()
|
||||
for feed in timeline.feed:
|
||||
if (feed.post.author.did != client.me.did
|
||||
and (feed.post.viewer.like is None)
|
||||
and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)):
|
||||
posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
|
||||
|
||||
return posts
|
||||
|
||||
|
||||
class LikeParams (TypedDict):
|
||||
uri: str
|
||||
cid: str
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main (*sys.argv[1:])
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from datetime import date, timedelta
|
||||
from datetime import date, datetime, timedelta
|
||||
from typing import TypedDict, cast
|
||||
|
||||
import requests
|
||||
@@ -13,7 +13,7 @@ from bs4 import BeautifulSoup
|
||||
from requests.exceptions import Timeout
|
||||
from eloquent import DatabaseManager, Model
|
||||
|
||||
from db.models import Tag, Video, VideoHistory, VideoTag
|
||||
from db.models import Comment, Tag, Video, VideoHistory, VideoTag
|
||||
|
||||
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
|
||||
'host': 'localhost',
|
||||
@@ -25,12 +25,12 @@ CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
|
||||
DB = DatabaseManager (CONFIG)
|
||||
Model.set_connection_resolver (DB)
|
||||
|
||||
KIRIBAN_VIEWS_COUNTS: set[int] = { *range (100, 1_000, 100),
|
||||
*range (1_000, 10_000, 1_000),
|
||||
*range (10_000, 1_000_001, 10_000),
|
||||
194, 245, 510, 114_514, 1_940, 2_450, 5_100, 24_500,
|
||||
51_000, 2_424 }
|
||||
|
||||
KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
|
||||
*range (10_000, 1_000_001, 10_000),
|
||||
194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100,
|
||||
19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
|
||||
4_545, 194_245, 245_194, 510_245 },
|
||||
reverse = True)
|
||||
|
||||
class VideoInfo (TypedDict):
|
||||
contentId: str
|
||||
@@ -124,31 +124,47 @@ def get_video_info (
|
||||
|
||||
def get_kiriban_list (
|
||||
base_date: date,
|
||||
) -> list[tuple[int, VideoInfo]]:
|
||||
kiriban_list: list[tuple[int, VideoInfo]] = []
|
||||
) -> list[tuple[int, VideoInfo, datetime]]:
|
||||
kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
|
||||
|
||||
latest_fetched_at = cast (date, VideoHistory.max ('fetched_at'))
|
||||
previous_fetched_at = cast (date, (VideoHistory
|
||||
.where ('fetched_at', '<', latest_fetched_at)
|
||||
.max ('fetched_at')))
|
||||
latest_fetched_at = cast (date, (VideoHistory
|
||||
.where ('fetched_at', '<=', base_date)
|
||||
.max ('fetched_at')))
|
||||
|
||||
for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
|
||||
targets = ({ vh.video.code for vh in (VideoHistory
|
||||
.where ('fetched_at', latest_fetched_at)
|
||||
.where ('views_count', '>=', kiriban_views_count)
|
||||
.get ()) }
|
||||
- { vh.video.code for vh in (VideoHistory
|
||||
.where ('fetched_at', previous_fetched_at)
|
||||
.where ('views_count', '>=', kiriban_views_count)
|
||||
.get ()) })
|
||||
targets = { vh.video.code for vh in (VideoHistory
|
||||
.where ('fetched_at', latest_fetched_at)
|
||||
.where ('views_count', '>=', kiriban_views_count)
|
||||
.get ()) }
|
||||
for code in targets:
|
||||
if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]:
|
||||
continue
|
||||
previous_views_count: int | None = (
|
||||
VideoHistory
|
||||
.where_has ('video', lambda q: q.where ('code', code))
|
||||
.where ('fetched_at', '<', latest_fetched_at)
|
||||
.max ('views_count'))
|
||||
if previous_views_count is None:
|
||||
previous_views_count = 0
|
||||
if previous_views_count >= kiriban_views_count:
|
||||
continue
|
||||
video_info = get_video_info (code)
|
||||
if video_info is not None:
|
||||
kiriban_list.append ((kiriban_views_count, video_info))
|
||||
kiriban_list.append ((kiriban_views_count, video_info,
|
||||
cast (Video, Video.where ('code', code).first ()).uploaded_at))
|
||||
|
||||
return kiriban_list
|
||||
|
||||
|
||||
def get_comments (
|
||||
video_code: str,
|
||||
) -> list[Comment]:
|
||||
video = Video.where ('code', video_code).first ()
|
||||
if video is None:
|
||||
return []
|
||||
return video.comments
|
||||
|
||||
|
||||
class DbConfig (TypedDict):
|
||||
driver: str
|
||||
host: str
|
||||
|
||||
新しい課題から参照
ユーザをブロックする