統合 AI への移行 #16
+6
-3
@@ -1,6 +1,9 @@
|
||||
[submodule "nizika_nico"]
|
||||
path = nizika_nico
|
||||
url = https://git.miteruzo.com/miteruzo/nizika_nico
|
||||
[submodule "ai"]
|
||||
path = ai
|
||||
url = https://git.miteruzo.com/miteruzo/nizika_broadcast
|
||||
[submodule "nizika_ai"]
|
||||
path = nizika_ai
|
||||
url = https://git.miteruzo.com/miteruzo/nizika_ai.git
|
||||
[submodule "nicolib"]
|
||||
path = nicolib
|
||||
url = https://git.miteruzo.com/miteruzo/nicolib.git
|
||||
|
||||
-1
サブモジュール ai が 299a3acdff から削除されました
-58
@@ -1,58 +0,0 @@
|
||||
from datetime import datetime
|
||||
|
||||
from atproto.models.AppBskyFeedDefs import BlockedPost, NotFoundPost
|
||||
from atproto_client.models.app.bsky.feed import get_timeline
|
||||
|
||||
|
||||
class Client:
|
||||
app: AppNamespace
|
||||
|
||||
def get_current_time_iso (self) -> datetime: ...
|
||||
|
||||
def get_post_thread (
|
||||
self,
|
||||
uri: str,
|
||||
parent_height: int | None = None
|
||||
) -> Response: ...
|
||||
|
||||
def get_timeline (self) -> get_timeline.Response: ...
|
||||
|
||||
def follow (self, did: str) -> None: ...
|
||||
|
||||
def like (self, uri: str, cid: str) -> None: ...
|
||||
|
||||
|
||||
class AppNamespace:
|
||||
bsky: AppBskyNamespace
|
||||
|
||||
|
||||
class AppBskyNamespace:
|
||||
notification: AppBskyNotificationNamespace
|
||||
|
||||
|
||||
class AppBskyNotificationNamespace:
|
||||
def list_notifications (self) -> Response: ...
|
||||
|
||||
def update_seen (self, seen: dict[str, datetime]) -> None: ...
|
||||
|
||||
|
||||
class Response:
|
||||
notifications: list[Notification]
|
||||
thread: (ThreadViewPost
|
||||
| NotFoundPost
|
||||
| BlockedPost)
|
||||
|
||||
|
||||
class ThreadViewPost:
|
||||
pass
|
||||
|
||||
|
||||
class Notification:
|
||||
is_read: bool
|
||||
reason: str
|
||||
uri: str
|
||||
author: ProfileView
|
||||
|
||||
|
||||
class ProfileView:
|
||||
did: str
|
||||
@@ -1,5 +0,0 @@
|
||||
class NotFoundPost:
|
||||
pass
|
||||
|
||||
class BlockedPost:
|
||||
pass
|
||||
@@ -1,5 +0,0 @@
|
||||
from atproto.models.AppBskyFeedDefs import FeedViewPost
|
||||
|
||||
|
||||
class Response:
|
||||
feed: list[FeedViewPost]
|
||||
@@ -1,28 +1,24 @@
|
||||
"""
|
||||
Bluesky のニジカがいろいろする.
|
||||
(近々機能ごとにファイル分けて systemd でイベント管理する予定)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import random
|
||||
import sys
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from datetime import date, datetime
|
||||
from datetime import time as dt_time
|
||||
from datetime import timedelta
|
||||
from typing import TypedDict, cast
|
||||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
from typing import Any, TypedDict
|
||||
|
||||
import atproto # type: ignore
|
||||
import requests
|
||||
from atproto import Client, models
|
||||
from atproto_client.models.app.bsky.feed.get_timeline import Response
|
||||
from bs4 import BeautifulSoup
|
||||
from atproto import Client # type: ignore
|
||||
from atproto.models import AppBskyEmbedExternal, AppBskyEmbedImages # type: ignore
|
||||
from atproto.models.AppBskyFeedPost import ReplyRef # type: ignore
|
||||
from atproto_client.models.app.bsky.feed.get_timeline import Response # type: ignore
|
||||
from requests.exceptions import Timeout
|
||||
|
||||
import account
|
||||
import nico
|
||||
from ai.talk import Talk
|
||||
import nicolib
|
||||
from nizika_ai.consts import Character, GPTModel, Platform, QueryType
|
||||
from nizika_ai.models import Answer, AnsweredFlag, Query, User
|
||||
|
||||
TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
|
||||
'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
|
||||
@@ -33,317 +29,284 @@ TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バ
|
||||
'ルイズマリー', '餅', 'ニジゴ', 'ゴニジ', 'ニジニジ',
|
||||
'新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']
|
||||
|
||||
time.sleep (60)
|
||||
|
||||
def main (
|
||||
client = Client (base_url = 'https://bsky.social')
|
||||
client.login (account.USER_ID, account.PASSWORD)
|
||||
|
||||
|
||||
async def main (
|
||||
) -> None:
|
||||
time.sleep (60)
|
||||
"""
|
||||
メーン処理
|
||||
"""
|
||||
|
||||
client = Client (base_url = 'https://bsky.social')
|
||||
await asyncio.gather (like_posts (),
|
||||
check_mentions (),
|
||||
answer ())
|
||||
|
||||
client.login (account.USER_ID, account.PASSWORD)
|
||||
|
||||
got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
|
||||
kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = (
|
||||
nico.get_kiriban_list (got_kiriban_at))
|
||||
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
|
||||
/ len (kiriban_list))
|
||||
next_kiriban_at = datetime.now ()
|
||||
|
||||
last_posted_at = datetime.now () - timedelta (hours = 6)
|
||||
has_got_snack_time = False
|
||||
has_taken_hot_spring = False
|
||||
watched_videos = []
|
||||
async def like_posts (
|
||||
) -> None:
|
||||
while True:
|
||||
now = datetime.now ()
|
||||
try:
|
||||
for post in fetch_target_posts ():
|
||||
client.like (**post)
|
||||
except Exception as e:
|
||||
print (f"[like_posts] { type (e).__name__ }: { e }")
|
||||
|
||||
for uri in check_notifications (client):
|
||||
records = get_thread_contents (client, uri, 20)
|
||||
if len (records) > 0:
|
||||
answer = Talk.main ((records[0]['text']
|
||||
if (records[0]['embed'] is None
|
||||
or not hasattr (records[0]['embed'],
|
||||
'images'))
|
||||
else [
|
||||
{ 'type': 'text', 'text': records[0]['text'] },
|
||||
{ 'type': 'image_url', 'image_url': {
|
||||
'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]),
|
||||
records[0]['name'],
|
||||
[*map (lambda record: {
|
||||
'role': ('assistant'
|
||||
if (record['handle']
|
||||
== account.USER_ID)
|
||||
else 'user'),
|
||||
'content':
|
||||
record['text']},
|
||||
reversed (records[1:]))])
|
||||
client.post (answer,
|
||||
reply_to = models.AppBskyFeedPost.ReplyRef (
|
||||
parent = records[0]['strong_ref'],
|
||||
root = records[-1]['strong_ref']))
|
||||
await asyncio.sleep (60)
|
||||
|
||||
like_posts (client)
|
||||
|
||||
if kiriban_list and datetime.now () >= next_kiriban_at:
|
||||
(views_count, video_info, uploaded_at) = (
|
||||
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
|
||||
since_posted = datetime.now () - uploaded_at
|
||||
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
|
||||
(title, description, thumbnail) = get_embed_info (uri)
|
||||
try:
|
||||
upload = client.com.atproto.repo.upload_blob (
|
||||
io.BytesIO (requests.get (thumbnail,
|
||||
timeout = 60).content))
|
||||
thumb = upload.blob
|
||||
except Timeout:
|
||||
thumb = None
|
||||
comments = nico.get_comments (video_info['contentId'])
|
||||
popular_comments = sorted (comments,
|
||||
key = lambda c: c.nico_count,
|
||||
reverse = True)[:10]
|
||||
latest_comments = sorted (comments,
|
||||
key = lambda c: c.posted_at,
|
||||
reverse = True)[:10]
|
||||
embed_external = models.AppBskyEmbedExternal.Main (
|
||||
external = models.AppBskyEmbedExternal.External (
|
||||
title = title,
|
||||
description = description,
|
||||
thumb = thumb,
|
||||
uri = uri))
|
||||
prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
|
||||
prompt += f"コメント数は{ len (comments) }件です。\n"
|
||||
if video_info['tags']:
|
||||
prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
|
||||
if comments:
|
||||
prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n"
|
||||
prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n"
|
||||
prompt += f"""
|
||||
概要には次のように書かれています:
|
||||
```html
|
||||
{ video_info['description'] }
|
||||
```
|
||||
このことについて、ニジカちゃんからのお祝いメッセージを下さい。
|
||||
ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
|
||||
また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
|
||||
好きなコメントがあったら教えてね。"""
|
||||
client.post (Talk.main (prompt), embed = embed_external)
|
||||
next_kiriban_at += kiriban_interval
|
||||
last_posted_at = now
|
||||
async def check_mentions (
|
||||
) -> None:
|
||||
while True:
|
||||
try:
|
||||
for uri in check_notifications ():
|
||||
records = fetch_thread_contents (uri, 20)
|
||||
if records:
|
||||
record = records[0]
|
||||
image_url: str | None = None
|
||||
if record['embed'] and hasattr (record['embed'], 'images'):
|
||||
image_url = ('https://cdn.bsky.app/img/feed_fullsize/plain'
|
||||
f"/{ record['did'] }"
|
||||
f"/{ record['embed'].images[0].image.ref.link }")
|
||||
user = _fetch_user (record['did'], record['name'])
|
||||
_add_query (user, record['text'], image_url, {
|
||||
'uri': record['strong_ref']['uri'],
|
||||
'cid': record['strong_ref']['cid'] })
|
||||
except Exception as e:
|
||||
print (f"[check_mentions] { type (e).__name__ }: { e }")
|
||||
|
||||
latest_deerjika = nico.get_latest_deerjika ()
|
||||
if latest_deerjika is not None:
|
||||
for datum in [e for e in [latest_deerjika]
|
||||
if e['contentId'] not in watched_videos]:
|
||||
watched_videos += [datum['contentId']]
|
||||
await asyncio.sleep (60)
|
||||
|
||||
uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }"
|
||||
(title, description, thumbnail) = get_embed_info (uri)
|
||||
try:
|
||||
upload = client.com.atproto.repo.upload_blob (
|
||||
io.BytesIO (requests.get (thumbnail,
|
||||
timeout = 60).content))
|
||||
thumb = upload.blob
|
||||
except Timeout:
|
||||
thumb = None
|
||||
|
||||
embed_external = models.AppBskyEmbedExternal.Main (
|
||||
external = models.AppBskyEmbedExternal.External (
|
||||
async def answer (
|
||||
) -> None:
|
||||
while True:
|
||||
answered_flags = (
|
||||
AnsweredFlag
|
||||
.where ('platform', Platform.BLUESKY.value)
|
||||
.where ('answered', False)
|
||||
.get ())
|
||||
for answered_flag in answered_flags:
|
||||
td: dict[str, Any]
|
||||
answer = answered_flag.answer
|
||||
match QueryType (answer.query_rel.query_type):
|
||||
case QueryType.BLUESKY_COMMENT:
|
||||
td = answer.query_rel.transfer_data or { }
|
||||
uri: str | None = td.get ('uri')
|
||||
cid: str | None = td.get ('cid')
|
||||
if (not uri) or (not cid):
|
||||
continue
|
||||
|
||||
sref = { 'uri': uri, 'cid': cid }
|
||||
strong_ref = atproto.models.create_strong_ref (sref) # type: ignore
|
||||
reply_ref = ReplyRef (root = strong_ref, parent = strong_ref)
|
||||
try:
|
||||
client.post (answer.content, reply_to = reply_ref)
|
||||
except Exception as e:
|
||||
print (f"[answer/reply] { type (e).__name__ }: { e }")
|
||||
continue
|
||||
answered_flag.answered = True
|
||||
answered_flag.save ()
|
||||
case QueryType.KIRIBAN | QueryType.NICO_REPORT:
|
||||
td = answer.query_rel.transfer_data or { }
|
||||
video_code: str | None = td.get ('video_code')
|
||||
if not video_code:
|
||||
continue
|
||||
|
||||
uri = f"https://www.nicovideo.jp/watch/{ video_code }"
|
||||
(title, description, thumbnail) = nicolib.fetch_embed_info (uri)
|
||||
try:
|
||||
resp = requests.get (thumbnail, timeout = 60)
|
||||
resp.raise_for_status ()
|
||||
upload = client.com.atproto.repo.upload_blob (BytesIO (resp.content))
|
||||
thumb = upload.blob
|
||||
except Timeout:
|
||||
thumb = None
|
||||
except Exception as e:
|
||||
print (f"[answer/nico-thumb] { type (e).__name__ }: { e }")
|
||||
thumb = None
|
||||
|
||||
external = AppBskyEmbedExternal.External (
|
||||
title = title,
|
||||
description = description,
|
||||
thumb = thumb,
|
||||
uri = uri))
|
||||
client.post (Talk.main (f"""
|
||||
ニコニコに『{ datum['title'] }』という動画がアップされました。
|
||||
つけられたタグは「{ '」、「'.join (datum['tags']) }」です。
|
||||
概要には次のように書かれています:
|
||||
```html
|
||||
{ datum['description'] }
|
||||
```
|
||||
このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """),
|
||||
embed = embed_external)
|
||||
last_posted_at = now
|
||||
|
||||
if now.hour == 14 and has_got_snack_time:
|
||||
has_got_snack_time = False
|
||||
|
||||
if now.hour == 15:
|
||||
if got_kiriban_at < datetime.now ().date ():
|
||||
kiriban_list = nico.get_kiriban_list (datetime.now ().date ())
|
||||
got_kiriban_at = datetime.now ().date ()
|
||||
kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ())
|
||||
/ len (kiriban_list))
|
||||
next_kiriban_at = datetime.now ()
|
||||
|
||||
if not has_got_snack_time:
|
||||
try:
|
||||
with open ('./assets/snack-time.jpg', 'rb') as f:
|
||||
image = models.AppBskyEmbedImages.Image (
|
||||
alt = ('左に喜多ちゃん、右に人面鹿のニジカが'
|
||||
'V字に並んでいる。'
|
||||
'喜多ちゃんは右手でピースサインをして'
|
||||
'片目をウインクしている。'
|
||||
'ニジカは両手を広げ、'
|
||||
'右手にスプーンを持って'
|
||||
'ポーズを取っている。'
|
||||
'背景には'
|
||||
'赤と黄色の放射線状の模様が広がり、'
|
||||
'下部に「おやつタイムだ!!!!」という'
|
||||
'日本語のテキストが表示されている。'),
|
||||
image = client.com.atproto.repo.upload_blob (f).blob)
|
||||
client.post (Talk.main ('おやつタイムだ!!!!'),
|
||||
embed = models.app.bsky.embed.images.Main (
|
||||
images = [image]))
|
||||
last_posted_at = now
|
||||
except Exception:
|
||||
pass
|
||||
has_got_snack_time = True
|
||||
|
||||
if now.hour == 20 and has_taken_hot_spring:
|
||||
has_taken_hot_spring = False
|
||||
|
||||
if now.hour == 21 and not has_taken_hot_spring:
|
||||
try:
|
||||
with open ('./assets/hot-spring.jpg', 'rb') as f:
|
||||
image = models.AppBskyEmbedImages.Image (
|
||||
alt = ('左に喜多ちゃん、右にわさび県産滋賀県が'
|
||||
'V字に並んでいる。'
|
||||
'喜多ちゃんは右手でピースサインをして'
|
||||
'片目をウインクしている。'
|
||||
'わさび県産滋賀県はただ茫然と'
|
||||
'立ち尽くしている。'
|
||||
'背景には'
|
||||
'血と空の色をした放射線状の模様が広がり、'
|
||||
'下部に「温泉に入ろう!!!」という'
|
||||
'日本語のテキストが表示されている。'),
|
||||
image = client.com.atproto.repo.upload_blob (f).blob)
|
||||
client.post (Talk.main ('温泉に入ろう!!!'),
|
||||
embed = models.app.bsky.embed.images.Main (
|
||||
images = [image]))
|
||||
last_posted_at = now
|
||||
except Exception:
|
||||
pass
|
||||
has_taken_hot_spring = True
|
||||
|
||||
if now - last_posted_at >= timedelta (hours = 6):
|
||||
client.post (Talk.main ('今どうしてる?'))
|
||||
last_posted_at = now
|
||||
|
||||
time.sleep (60)
|
||||
uri = uri)
|
||||
embed_external = AppBskyEmbedExternal.Main (external = external)
|
||||
try:
|
||||
client.post (answer.content, embed = embed_external)
|
||||
except Exception as e:
|
||||
print (f"[answer/nico-post] { type (e).__name__ }: { e }")
|
||||
continue
|
||||
answered_flag.answered = True
|
||||
answered_flag.save ()
|
||||
case QueryType.SNACK_TIME:
|
||||
try:
|
||||
with open ('./assets/snack-time.jpg', 'rb') as f:
|
||||
image = AppBskyEmbedImages.Image (
|
||||
alt = (
|
||||
'左に喜多ちゃん、右に人面鹿のニジカが'
|
||||
'V字に並んでいる。'
|
||||
'喜多ちゃんは右手でピースサインをして'
|
||||
'片目をウインクしている。'
|
||||
'ニジカは両手を広げ、'
|
||||
'右手にスプーンを持って'
|
||||
'ポーズを取っている。'
|
||||
'背景には'
|
||||
'赤と黄色の放射線状の模様が広がり、'
|
||||
'下部に「おやつタイムだ!!!!」という'
|
||||
'日本語のテキストが表示されている。'),
|
||||
image = client.com.atproto.repo.upload_blob (f).blob)
|
||||
client.post (answer.content,
|
||||
embed = AppBskyEmbedImages.Main (images = [image]))
|
||||
answered_flag.answered = True
|
||||
answered_flag.save ()
|
||||
except Exception:
|
||||
pass
|
||||
case QueryType.HOT_SPRING:
|
||||
try:
|
||||
with open ('./assets/hot-spring.jpg', 'rb') as f:
|
||||
image = AppBskyEmbedImages.Image (
|
||||
alt = ('左に喜多ちゃん、右にわさび県産滋賀県が'
|
||||
'V字に並んでいる。'
|
||||
'喜多ちゃんは右手でピースサインをして'
|
||||
'片目をウインクしている。'
|
||||
'わさび県産滋賀県はただ茫然と'
|
||||
'立ち尽くしている。'
|
||||
'背景には'
|
||||
'血と空の色をした放射線状の模様が広がり、'
|
||||
'下部に「温泉に入ろう!!!」という'
|
||||
'日本語のテキストが表示されている。'),
|
||||
image = client.com.atproto.repo.upload_blob (f).blob)
|
||||
client.post (answer.content,
|
||||
embed = AppBskyEmbedImages.Main (images = [image]))
|
||||
answered_flag.answered = True
|
||||
answered_flag.save ()
|
||||
except Exception:
|
||||
pass
|
||||
await asyncio.sleep (10)
|
||||
|
||||
|
||||
def check_notifications (
|
||||
client: Client,
|
||||
) -> list:
|
||||
(uris, last_seen_at) = ([], client.get_current_time_iso ())
|
||||
) -> list[str]:
|
||||
uris: list[str] = []
|
||||
last_seen_at = client.get_current_time_iso ()
|
||||
|
||||
for notification in (client.app.bsky.notification.list_notifications ()
|
||||
.notifications):
|
||||
notifications = client.app.bsky.notification.list_notifications ()
|
||||
for notification in notifications.notifications:
|
||||
if not notification.is_read:
|
||||
if notification.reason in ['mention', 'reply', 'quote']:
|
||||
uris += [notification.uri]
|
||||
elif notification.reason == 'follow':
|
||||
client.follow (notification.author.did)
|
||||
match notification.reason:
|
||||
case 'mention' | 'reply' | 'quote':
|
||||
uris.append (notification.uri)
|
||||
case 'follow':
|
||||
client.follow (notification.author.did)
|
||||
|
||||
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
|
||||
|
||||
return uris
|
||||
|
||||
|
||||
def get_thread_contents (
|
||||
client: Client,
|
||||
def fetch_thread_contents (
|
||||
uri: str,
|
||||
parent_height: int,
|
||||
) -> list:
|
||||
response = (client.get_post_thread (uri = uri,
|
||||
parent_height = parent_height)
|
||||
.thread)
|
||||
records = []
|
||||
while response is not None:
|
||||
records += [{ 'strong_ref': models.create_strong_ref (response.post),
|
||||
'did': response.post.author.did,
|
||||
'handle': response.post.author.handle,
|
||||
'name': response.post.author.display_name,
|
||||
'datetime': response.post.record.created_at,
|
||||
'text': response.post.record.text,
|
||||
'embed': response.post.record.embed }]
|
||||
response = response.parent
|
||||
) -> list[Record]:
|
||||
post_thread = client.get_post_thread (uri = uri, parent_height = parent_height)
|
||||
if not post_thread:
|
||||
return []
|
||||
|
||||
res = post_thread.thread
|
||||
|
||||
records: list[Record] = []
|
||||
while res:
|
||||
if hasattr (res, 'post'):
|
||||
records.append ({ 'strong_ref': { 'uri': res.post.uri,
|
||||
'cid': res.post.cid },
|
||||
'did': res.post.author.did,
|
||||
'handle': res.post.author.handle,
|
||||
'name': (res.post.author.display_name
|
||||
or res.post.author.handle),
|
||||
'datetime': res.post.record.created_at,
|
||||
'text': getattr (res.post.record, 'text', None) or '',
|
||||
'embed': getattr (res.post.record, 'embed', None) })
|
||||
res = res.parent
|
||||
else:
|
||||
break
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def get_embed_info (
|
||||
url: str
|
||||
) -> tuple[str, str, str]:
|
||||
title: str = ''
|
||||
description: str = ''
|
||||
thumbnail: str = ''
|
||||
|
||||
try:
|
||||
res = requests.get (url, timeout = 60)
|
||||
except Timeout:
|
||||
return ('', '', '')
|
||||
|
||||
if res.status_code != 200:
|
||||
return ('', '', '')
|
||||
|
||||
soup = BeautifulSoup (res.text, 'html.parser')
|
||||
|
||||
tmp = soup.find ('title')
|
||||
if tmp is not None:
|
||||
title = tmp.text
|
||||
|
||||
tmp = soup.find ('meta', attrs = { 'name': 'description' })
|
||||
if tmp is not None and hasattr (tmp, 'get'):
|
||||
try:
|
||||
description = cast (str, tmp.get ('content'))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' })
|
||||
if tmp is not None and hasattr (tmp, 'get'):
|
||||
try:
|
||||
thumbnail = cast (str, tmp.get ('content'))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return (title, description, thumbnail)
|
||||
|
||||
|
||||
def get_kiriban_dt_to_update (
|
||||
) -> datetime:
|
||||
now = datetime.now ()
|
||||
today = now.date ()
|
||||
dt = datetime.combine (today, dt_time (15, 0))
|
||||
if dt <= now:
|
||||
dt += timedelta (days = 1)
|
||||
return dt
|
||||
|
||||
|
||||
def like_posts (
|
||||
client: Client,
|
||||
) -> None:
|
||||
for post in get_target_posts (client):
|
||||
client.like (**post)
|
||||
|
||||
|
||||
def get_target_posts (
|
||||
client: Client,
|
||||
def fetch_target_posts (
|
||||
) -> list[LikeParams]:
|
||||
posts = []
|
||||
posts: list[LikeParams] = []
|
||||
|
||||
timeline: Response = client.get_timeline ()
|
||||
for feed in timeline.feed:
|
||||
if (feed.post.author.did != client.me.did
|
||||
me = getattr (client, 'me', None)
|
||||
my_did = me.did if me else ''
|
||||
if (feed.post.author.did != my_did
|
||||
and (feed.post.viewer.like is None)
|
||||
and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)):
|
||||
posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
|
||||
and any (target_word in (feed.post.record.text or '').casefold ()
|
||||
for target_word in TARGET_WORDS)):
|
||||
posts.append (LikeParams ({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
|
||||
|
||||
return posts
|
||||
|
||||
|
||||
def _add_query (
|
||||
user: User,
|
||||
content: str,
|
||||
image_url: str | None = None,
|
||||
transfer_data: dict[str, Any] | None = None,
|
||||
) -> None:
|
||||
query = Query ()
|
||||
query.user_id = user.id
|
||||
query.target_character = Character.DEERJIKA.value
|
||||
query.content = content
|
||||
query.image_url = image_url or None
|
||||
query.query_type = QueryType.BLUESKY_COMMENT.value
|
||||
query.model = GPTModel.GPT3_TURBO.value
|
||||
query.sent_at = datetime.now ()
|
||||
query.answered = False
|
||||
query.transfer_data = transfer_data
|
||||
query.save ()
|
||||
# TODO: 履歴情報の追加
|
||||
|
||||
|
||||
def _fetch_user (
|
||||
did: str,
|
||||
name: str,
|
||||
) -> User:
|
||||
user = User.where ('platform', Platform.BLUESKY.value).where ('code', did).first ()
|
||||
if user is None:
|
||||
user = User ()
|
||||
user.platform = Platform.BLUESKY.value
|
||||
user.code = did
|
||||
user.name = name
|
||||
user.save ()
|
||||
return user
|
||||
|
||||
|
||||
class LikeParams (TypedDict):
|
||||
uri: str
|
||||
cid: str
|
||||
|
||||
|
||||
class Record (TypedDict):
|
||||
strong_ref: StrongRef
|
||||
did: str
|
||||
handle: str
|
||||
name: str
|
||||
datetime: str
|
||||
text: str
|
||||
embed: object
|
||||
|
||||
|
||||
class StrongRef (TypedDict):
|
||||
uri: str
|
||||
cid: str
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main (*sys.argv[1:])
|
||||
asyncio.run (main ())
|
||||
|
||||
@@ -1,174 +0,0 @@
|
||||
"""
|
||||
ニコニコのニジカ動画取得モヂュール
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from datetime import date, datetime, timedelta
|
||||
from typing import TypedDict, cast
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from requests.exceptions import Timeout
|
||||
from eloquent import DatabaseManager, Model
|
||||
|
||||
from db.models import Comment, Tag, Video, VideoHistory, VideoTag
|
||||
|
||||
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
|
||||
'host': 'localhost',
|
||||
'database': 'nizika_nico',
|
||||
'user': os.environ['MYSQL_USER'],
|
||||
'password': os.environ['MYSQL_PASS'],
|
||||
'prefix': '' } }
|
||||
|
||||
DB = DatabaseManager (CONFIG)
|
||||
Model.set_connection_resolver (DB)
|
||||
|
||||
KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
|
||||
*range (10_000, 1_000_001, 10_000),
|
||||
194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100,
|
||||
19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
|
||||
4_545, 194_245, 245_194, 510_245 },
|
||||
reverse = True)
|
||||
|
||||
class VideoInfo (TypedDict):
|
||||
contentId: str
|
||||
title: str
|
||||
tags: list[str]
|
||||
description: str
|
||||
|
||||
|
||||
def get_latest_deerjika (
|
||||
) -> VideoInfo | None:
|
||||
tag = '伊地知ニジカ OR ぼざろクリーチャーシリーズ'
|
||||
url = f"https://www.nicovideo.jp/tag/{ tag }"
|
||||
|
||||
params = { 'sort': 'f',
|
||||
'order': 'd' }
|
||||
|
||||
video_info = { }
|
||||
|
||||
bs = get_bs_from_url (url, params)
|
||||
if bs is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
video = (bs.find_all ('ul', class_ = 'videoListInner')[1]
|
||||
.find ('li', class_ = 'item'))
|
||||
|
||||
video_info['contentId'] = video['data-video-id']
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
return get_video_info (video_info['contentId'])
|
||||
|
||||
|
||||
def get_bs_from_url (
|
||||
url: str,
|
||||
params: dict = { },
|
||||
) -> BeautifulSoup | None:
|
||||
"""
|
||||
URL から BeautifulSoup インスタンス生成
|
||||
|
||||
Parameters
|
||||
----------
|
||||
url: str
|
||||
捜査する URL
|
||||
params: dict
|
||||
パラメータ
|
||||
|
||||
Return
|
||||
------
|
||||
BeautifulSoup | None
|
||||
BeautifulSoup オブゼクト(失敗したら None)
|
||||
"""
|
||||
|
||||
try:
|
||||
req = requests.get (url, params = params, timeout = 60)
|
||||
except Timeout:
|
||||
return None
|
||||
|
||||
if req.status_code != 200:
|
||||
return None
|
||||
|
||||
req.encoding = req.apparent_encoding
|
||||
|
||||
return BeautifulSoup (req.text, 'html.parser')
|
||||
|
||||
|
||||
def get_video_info (
|
||||
video_code: str,
|
||||
) -> VideoInfo | None:
|
||||
video_info: dict[str, str | list[str]] = { 'contentId': video_code }
|
||||
|
||||
bs = get_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }")
|
||||
if bs is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
title = bs.find ('title')
|
||||
if title is None:
|
||||
return None
|
||||
video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)]
|
||||
|
||||
tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore
|
||||
video_info['tags'] = tags.split (',')
|
||||
|
||||
video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
return cast (VideoInfo, video_info)
|
||||
|
||||
|
||||
def get_kiriban_list (
|
||||
base_date: date,
|
||||
) -> list[tuple[int, VideoInfo, datetime]]:
|
||||
kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
|
||||
|
||||
latest_fetched_at = cast (date, (VideoHistory
|
||||
.where ('fetched_at', '<=', base_date)
|
||||
.max ('fetched_at')))
|
||||
|
||||
for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
|
||||
targets = { vh.video.code for vh in (VideoHistory
|
||||
.where ('fetched_at', latest_fetched_at)
|
||||
.where ('views_count', '>=', kiriban_views_count)
|
||||
.get ()) }
|
||||
for code in targets:
|
||||
if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]:
|
||||
continue
|
||||
previous_views_count: int | None = (
|
||||
VideoHistory
|
||||
.where_has ('video', lambda q: q.where ('code', code))
|
||||
.where ('fetched_at', '<', latest_fetched_at)
|
||||
.max ('views_count'))
|
||||
if previous_views_count is None:
|
||||
previous_views_count = 0
|
||||
if previous_views_count >= kiriban_views_count:
|
||||
continue
|
||||
video_info = get_video_info (code)
|
||||
if video_info is not None:
|
||||
kiriban_list.append ((kiriban_views_count, video_info,
|
||||
cast (Video, Video.where ('code', code).first ()).uploaded_at))
|
||||
|
||||
return kiriban_list
|
||||
|
||||
|
||||
def get_comments (
|
||||
video_code: str,
|
||||
) -> list[Comment]:
|
||||
video = Video.where ('code', video_code).first ()
|
||||
if video is None:
|
||||
return []
|
||||
return video.comments
|
||||
|
||||
|
||||
class DbConfig (TypedDict):
|
||||
driver: str
|
||||
host: str
|
||||
database: str
|
||||
user: str
|
||||
password: str
|
||||
prefix: str
|
||||
サブモジュール
+1
サブモジュール nicolib が 32ecf2d00f で追加されました
サブモジュール
+1
サブモジュール nizika_ai が 3be6d9063c で追加されました
新しい課題から参照
ユーザをブロックする