統合 AI への移行 #16

マージ済み
みてるぞ が 6 個のコミットを ai-migration から main へマージ 2025-10-22 21:50:02 +09:00
9個のファイルの変更255行の追加530行の削除
+6 -3
ファイルの表示
@@ -1,6 +1,9 @@
[submodule "nizika_nico"] [submodule "nizika_nico"]
path = nizika_nico path = nizika_nico
url = https://git.miteruzo.com/miteruzo/nizika_nico url = https://git.miteruzo.com/miteruzo/nizika_nico
[submodule "ai"] [submodule "nizika_ai"]
path = ai path = nizika_ai
url = https://git.miteruzo.com/miteruzo/nizika_broadcast url = https://git.miteruzo.com/miteruzo/nizika_ai.git
[submodule "nicolib"]
path = nicolib
url = https://git.miteruzo.com/miteruzo/nicolib.git
-1
サブモジュール ai299a3acdff から削除されました
-58
ファイルの表示
@@ -1,58 +0,0 @@
from datetime import datetime
from atproto.models.AppBskyFeedDefs import BlockedPost, NotFoundPost
from atproto_client.models.app.bsky.feed import get_timeline
class Client:
app: AppNamespace
def get_current_time_iso (self) -> datetime: ...
def get_post_thread (
self,
uri: str,
parent_height: int | None = None
) -> Response: ...
def get_timeline (self) -> get_timeline.Response: ...
def follow (self, did: str) -> None: ...
def like (self, uri: str, cid: str) -> None: ...
class AppNamespace:
bsky: AppBskyNamespace
class AppBskyNamespace:
notification: AppBskyNotificationNamespace
class AppBskyNotificationNamespace:
def list_notifications (self) -> Response: ...
def update_seen (self, seen: dict[str, datetime]) -> None: ...
class Response:
notifications: list[Notification]
thread: (ThreadViewPost
| NotFoundPost
| BlockedPost)
class ThreadViewPost:
pass
class Notification:
is_read: bool
reason: str
uri: str
author: ProfileView
class ProfileView:
did: str
-5
ファイルの表示
@@ -1,5 +0,0 @@
class NotFoundPost:
pass
class BlockedPost:
pass
-5
ファイルの表示
@@ -1,5 +0,0 @@
from atproto.models.AppBskyFeedDefs import FeedViewPost
class Response:
feed: list[FeedViewPost]
+247 -284
ファイルの表示
@@ -1,28 +1,24 @@
"""
Bluesky のニジカがいろいろする.
(近々機能ごとにファイル分けて systemd でイベント管理する予定)
"""
from __future__ import annotations from __future__ import annotations
import io import asyncio
import random import os
import sys
import time import time
from datetime import date, datetime from datetime import datetime
from datetime import time as dt_time from io import BytesIO
from datetime import timedelta from typing import Any, TypedDict
from typing import TypedDict, cast
import atproto # type: ignore
import requests import requests
from atproto import Client, models from atproto import Client # type: ignore
from atproto_client.models.app.bsky.feed.get_timeline import Response from atproto.models import AppBskyEmbedExternal, AppBskyEmbedImages # type: ignore
from bs4 import BeautifulSoup from atproto.models.AppBskyFeedPost import ReplyRef # type: ignore
from atproto_client.models.app.bsky.feed.get_timeline import Response # type: ignore
from requests.exceptions import Timeout from requests.exceptions import Timeout
import account import account
import nico import nicolib
from ai.talk import Talk from nizika_ai.consts import Character, GPTModel, Platform, QueryType
from nizika_ai.models import Answer, AnsweredFlag, Query, User
TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ', TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう', 'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
@@ -33,317 +29,284 @@ TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バ
'ルイズマリー', '', 'ニジゴ', 'ゴニジ', 'ニジニジ', 'ルイズマリー', '', 'ニジゴ', 'ゴニジ', 'ニジニジ',
'新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ'] '新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']
time.sleep (60)
def main ( client = Client (base_url = 'https://bsky.social')
client.login (account.USER_ID, account.PASSWORD)
async def main (
) -> None: ) -> None:
time.sleep (60) """
メーン処理
"""
client = Client (base_url = 'https://bsky.social') await asyncio.gather (like_posts (),
check_mentions (),
answer ())
client.login (account.USER_ID, account.PASSWORD)
got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15) async def like_posts (
kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = ( ) -> None:
nico.get_kiriban_list (got_kiriban_at))
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()
last_posted_at = datetime.now () - timedelta (hours = 6)
has_got_snack_time = False
has_taken_hot_spring = False
watched_videos = []
while True: while True:
now = datetime.now () try:
for post in fetch_target_posts ():
client.like (**post)
except Exception as e:
print (f"[like_posts] { type (e).__name__ }: { e }")
for uri in check_notifications (client): await asyncio.sleep (60)
records = get_thread_contents (client, uri, 20)
if len (records) > 0:
answer = Talk.main ((records[0]['text']
if (records[0]['embed'] is None
or not hasattr (records[0]['embed'],
'images'))
else [
{ 'type': 'text', 'text': records[0]['text'] },
{ 'type': 'image_url', 'image_url': {
'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]),
records[0]['name'],
[*map (lambda record: {
'role': ('assistant'
if (record['handle']
== account.USER_ID)
else 'user'),
'content':
record['text']},
reversed (records[1:]))])
client.post (answer,
reply_to = models.AppBskyFeedPost.ReplyRef (
parent = records[0]['strong_ref'],
root = records[-1]['strong_ref']))
like_posts (client)
if kiriban_list and datetime.now () >= next_kiriban_at: async def check_mentions (
(views_count, video_info, uploaded_at) = ( ) -> None:
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1))) while True:
since_posted = datetime.now () - uploaded_at try:
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }" for uri in check_notifications ():
(title, description, thumbnail) = get_embed_info (uri) records = fetch_thread_contents (uri, 20)
try: if records:
upload = client.com.atproto.repo.upload_blob ( record = records[0]
io.BytesIO (requests.get (thumbnail, image_url: str | None = None
timeout = 60).content)) if record['embed'] and hasattr (record['embed'], 'images'):
thumb = upload.blob image_url = ('https://cdn.bsky.app/img/feed_fullsize/plain'
except Timeout: f"/{ record['did'] }"
thumb = None f"/{ record['embed'].images[0].image.ref.link }")
comments = nico.get_comments (video_info['contentId']) user = _fetch_user (record['did'], record['name'])
popular_comments = sorted (comments, _add_query (user, record['text'], image_url, {
key = lambda c: c.nico_count, 'uri': record['strong_ref']['uri'],
reverse = True)[:10] 'cid': record['strong_ref']['cid'] })
latest_comments = sorted (comments, except Exception as e:
key = lambda c: c.posted_at, print (f"[check_mentions] { type (e).__name__ }: { e }")
reverse = True)[:10]
embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
title = title,
description = description,
thumb = thumb,
uri = uri))
prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
prompt += f"コメント数は{ len (comments) }件です。\n"
if video_info['tags']:
prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
if comments:
prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }\n"
prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }\n"
prompt += f"""
概要には次のように書かれています:
```html
{ video_info['description'] }
```
このことについて、ニジカちゃんからのお祝いメッセージを下さい。
ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
好きなコメントがあったら教えてね。"""
client.post (Talk.main (prompt), embed = embed_external)
next_kiriban_at += kiriban_interval
last_posted_at = now
latest_deerjika = nico.get_latest_deerjika () await asyncio.sleep (60)
if latest_deerjika is not None:
for datum in [e for e in [latest_deerjika]
if e['contentId'] not in watched_videos]:
watched_videos += [datum['contentId']]
uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
upload = client.com.atproto.repo.upload_blob (
io.BytesIO (requests.get (thumbnail,
timeout = 60).content))
thumb = upload.blob
except Timeout:
thumb = None
embed_external = models.AppBskyEmbedExternal.Main ( async def answer (
external = models.AppBskyEmbedExternal.External ( ) -> None:
while True:
answered_flags = (
AnsweredFlag
.where ('platform', Platform.BLUESKY.value)
.where ('answered', False)
.get ())
for answered_flag in answered_flags:
td: dict[str, Any]
answer = answered_flag.answer
match QueryType (answer.query_rel.query_type):
case QueryType.BLUESKY_COMMENT:
td = answer.query_rel.transfer_data or { }
uri: str | None = td.get ('uri')
cid: str | None = td.get ('cid')
if (not uri) or (not cid):
continue
sref = { 'uri': uri, 'cid': cid }
strong_ref = atproto.models.create_strong_ref (sref) # type: ignore
reply_ref = ReplyRef (root = strong_ref, parent = strong_ref)
try:
client.post (answer.content, reply_to = reply_ref)
except Exception as e:
print (f"[answer/reply] { type (e).__name__ }: { e }")
continue
answered_flag.answered = True
answered_flag.save ()
case QueryType.KIRIBAN | QueryType.NICO_REPORT:
td = answer.query_rel.transfer_data or { }
video_code: str | None = td.get ('video_code')
if not video_code:
continue
uri = f"https://www.nicovideo.jp/watch/{ video_code }"
(title, description, thumbnail) = nicolib.fetch_embed_info (uri)
try:
resp = requests.get (thumbnail, timeout = 60)
resp.raise_for_status ()
upload = client.com.atproto.repo.upload_blob (BytesIO (resp.content))
thumb = upload.blob
except Timeout:
thumb = None
except Exception as e:
print (f"[answer/nico-thumb] { type (e).__name__ }: { e }")
thumb = None
external = AppBskyEmbedExternal.External (
title = title, title = title,
description = description, description = description,
thumb = thumb, thumb = thumb,
uri = uri)) uri = uri)
client.post (Talk.main (f""" embed_external = AppBskyEmbedExternal.Main (external = external)
ニコニコに『{ datum['title'] }』という動画がアップされました。 try:
つけられたタグは「{ '」、「'.join (datum['tags']) }」です。 client.post (answer.content, embed = embed_external)
概要には次のように書かれています: except Exception as e:
```html print (f"[answer/nico-post] { type (e).__name__ }: { e }")
{ datum['description'] } continue
``` answered_flag.answered = True
このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """), answered_flag.save ()
embed = embed_external) case QueryType.SNACK_TIME:
last_posted_at = now try:
with open ('./assets/snack-time.jpg', 'rb') as f:
if now.hour == 14 and has_got_snack_time: image = AppBskyEmbedImages.Image (
has_got_snack_time = False alt = (
'左に喜多ちゃん、右に人面鹿のニジカが'
if now.hour == 15: 'V字に並んでいる。'
if got_kiriban_at < datetime.now ().date (): '喜多ちゃんは右手でピースサインをして'
kiriban_list = nico.get_kiriban_list (datetime.now ().date ()) '片目をウインクしている。'
got_kiriban_at = datetime.now ().date () 'ニジカは両手を広げ、'
kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ()) '右手にスプーンを持って'
/ len (kiriban_list)) 'ポーズを取っている。'
next_kiriban_at = datetime.now () '背景には'
'赤と黄色の放射線状の模様が広がり、'
if not has_got_snack_time: '下部に「おやつタイムだ!!!!」という'
try: '日本語のテキストが表示されている。'),
with open ('./assets/snack-time.jpg', 'rb') as f: image = client.com.atproto.repo.upload_blob (f).blob)
image = models.AppBskyEmbedImages.Image ( client.post (answer.content,
alt = ('左に喜多ちゃん、右に人面鹿のニジカが' embed = AppBskyEmbedImages.Main (images = [image]))
'V字に並んでいる。' answered_flag.answered = True
'喜多ちゃんは右手でピースサインをして' answered_flag.save ()
'片目をウインクしている。' except Exception:
'ニジカは両手を広げ、' pass
'右手にスプーンを持って' case QueryType.HOT_SPRING:
'ポーズを取っている。' try:
'背景には' with open ('./assets/hot-spring.jpg', 'rb') as f:
'赤と黄色の放射線状の模様が広がり、' image = AppBskyEmbedImages.Image (
'下部に「おやつタイムだ!!!!」という' alt = ('左に喜多ちゃん、右にわさび県産滋賀県が'
'日本語のテキストが表示されている。'), 'V字に並んでいる。'
image = client.com.atproto.repo.upload_blob (f).blob) '喜多ちゃんは右手でピースサインをして'
client.post (Talk.main ('おやつタイムだ!!!!'), '片目をウインクしている。'
embed = models.app.bsky.embed.images.Main ( 'わさび県産滋賀県はただ茫然と'
images = [image])) '立ち尽くしている。'
last_posted_at = now '背景には'
except Exception: '血と空の色をした放射線状の模様が広がり、'
pass '下部に「温泉に入ろう!!!」という'
has_got_snack_time = True '日本語のテキストが表示されている。'),
image = client.com.atproto.repo.upload_blob (f).blob)
if now.hour == 20 and has_taken_hot_spring: client.post (answer.content,
has_taken_hot_spring = False embed = AppBskyEmbedImages.Main (images = [image]))
answered_flag.answered = True
if now.hour == 21 and not has_taken_hot_spring: answered_flag.save ()
try: except Exception:
with open ('./assets/hot-spring.jpg', 'rb') as f: pass
image = models.AppBskyEmbedImages.Image ( await asyncio.sleep (10)
alt = ('左に喜多ちゃん、右にわさび県産滋賀県が'
'V字に並んでいる。'
'喜多ちゃんは右手でピースサインをして'
'片目をウインクしている。'
'わさび県産滋賀県はただ茫然と'
'立ち尽くしている。'
'背景には'
'血と空の色をした放射線状の模様が広がり、'
'下部に「温泉に入ろう!!!」という'
'日本語のテキストが表示されている。'),
image = client.com.atproto.repo.upload_blob (f).blob)
client.post (Talk.main ('温泉に入ろう!!!'),
embed = models.app.bsky.embed.images.Main (
images = [image]))
last_posted_at = now
except Exception:
pass
has_taken_hot_spring = True
if now - last_posted_at >= timedelta (hours = 6):
client.post (Talk.main ('今どうしてる?'))
last_posted_at = now
time.sleep (60)
def check_notifications ( def check_notifications (
client: Client, ) -> list[str]:
) -> list: uris: list[str] = []
(uris, last_seen_at) = ([], client.get_current_time_iso ()) last_seen_at = client.get_current_time_iso ()
for notification in (client.app.bsky.notification.list_notifications () notifications = client.app.bsky.notification.list_notifications ()
.notifications): for notification in notifications.notifications:
if not notification.is_read: if not notification.is_read:
if notification.reason in ['mention', 'reply', 'quote']: match notification.reason:
uris += [notification.uri] case 'mention' | 'reply' | 'quote':
elif notification.reason == 'follow': uris.append (notification.uri)
client.follow (notification.author.did) case 'follow':
client.follow (notification.author.did)
client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at }) client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
return uris return uris
def get_thread_contents ( def fetch_thread_contents (
client: Client,
uri: str, uri: str,
parent_height: int, parent_height: int,
) -> list: ) -> list[Record]:
response = (client.get_post_thread (uri = uri, post_thread = client.get_post_thread (uri = uri, parent_height = parent_height)
parent_height = parent_height) if not post_thread:
.thread) return []
records = []
while response is not None: res = post_thread.thread
records += [{ 'strong_ref': models.create_strong_ref (response.post),
'did': response.post.author.did, records: list[Record] = []
'handle': response.post.author.handle, while res:
'name': response.post.author.display_name, if hasattr (res, 'post'):
'datetime': response.post.record.created_at, records.append ({ 'strong_ref': { 'uri': res.post.uri,
'text': response.post.record.text, 'cid': res.post.cid },
'embed': response.post.record.embed }] 'did': res.post.author.did,
response = response.parent 'handle': res.post.author.handle,
'name': (res.post.author.display_name
or res.post.author.handle),
'datetime': res.post.record.created_at,
'text': getattr (res.post.record, 'text', None) or '',
'embed': getattr (res.post.record, 'embed', None) })
res = res.parent
else:
break
return records return records
def get_embed_info ( def fetch_target_posts (
url: str
) -> tuple[str, str, str]:
title: str = ''
description: str = ''
thumbnail: str = ''
try:
res = requests.get (url, timeout = 60)
except Timeout:
return ('', '', '')
if res.status_code != 200:
return ('', '', '')
soup = BeautifulSoup (res.text, 'html.parser')
tmp = soup.find ('title')
if tmp is not None:
title = tmp.text
tmp = soup.find ('meta', attrs = { 'name': 'description' })
if tmp is not None and hasattr (tmp, 'get'):
try:
description = cast (str, tmp.get ('content'))
except Exception:
pass
tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' })
if tmp is not None and hasattr (tmp, 'get'):
try:
thumbnail = cast (str, tmp.get ('content'))
except Exception:
pass
return (title, description, thumbnail)
def get_kiriban_dt_to_update (
) -> datetime:
now = datetime.now ()
today = now.date ()
dt = datetime.combine (today, dt_time (15, 0))
if dt <= now:
dt += timedelta (days = 1)
return dt
def like_posts (
client: Client,
) -> None:
for post in get_target_posts (client):
client.like (**post)
def get_target_posts (
client: Client,
) -> list[LikeParams]: ) -> list[LikeParams]:
posts = [] posts: list[LikeParams] = []
timeline: Response = client.get_timeline () timeline: Response = client.get_timeline ()
for feed in timeline.feed: for feed in timeline.feed:
if (feed.post.author.did != client.me.did me = getattr (client, 'me', None)
my_did = me.did if me else ''
if (feed.post.author.did != my_did
and (feed.post.viewer.like is None) and (feed.post.viewer.like is None)
and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)): and any (target_word in (feed.post.record.text or '').casefold ()
posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid })) for target_word in TARGET_WORDS)):
posts.append (LikeParams ({ 'uri': feed.post.uri, 'cid': feed.post.cid }))
return posts return posts
def _add_query (
user: User,
content: str,
image_url: str | None = None,
transfer_data: dict[str, Any] | None = None,
) -> None:
query = Query ()
query.user_id = user.id
query.target_character = Character.DEERJIKA.value
query.content = content
query.image_url = image_url or None
query.query_type = QueryType.BLUESKY_COMMENT.value
query.model = GPTModel.GPT3_TURBO.value
query.sent_at = datetime.now ()
query.answered = False
query.transfer_data = transfer_data
query.save ()
# TODO: 履歴情報の追加
def _fetch_user (
did: str,
name: str,
) -> User:
user = User.where ('platform', Platform.BLUESKY.value).where ('code', did).first ()
if user is None:
user = User ()
user.platform = Platform.BLUESKY.value
user.code = did
user.name = name
user.save ()
return user
class LikeParams (TypedDict): class LikeParams (TypedDict):
uri: str uri: str
cid: str cid: str
class Record (TypedDict):
strong_ref: StrongRef
did: str
handle: str
name: str
datetime: str
text: str
embed: object
class StrongRef (TypedDict):
uri: str
cid: str
if __name__ == '__main__': if __name__ == '__main__':
main (*sys.argv[1:]) asyncio.run (main ())
-174
ファイルの表示
@@ -1,174 +0,0 @@
"""
ニコニコのニジカ動画取得モヂュール
"""
from __future__ import annotations
import os
from datetime import date, datetime, timedelta
from typing import TypedDict, cast
import requests
from bs4 import BeautifulSoup
from requests.exceptions import Timeout
from eloquent import DatabaseManager, Model
from db.models import Comment, Tag, Video, VideoHistory, VideoTag
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
*range (10_000, 1_000_001, 10_000),
194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100,
19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
4_545, 194_245, 245_194, 510_245 },
reverse = True)
class VideoInfo (TypedDict):
contentId: str
title: str
tags: list[str]
description: str
def get_latest_deerjika (
) -> VideoInfo | None:
tag = '伊地知ニジカ OR ぼざろクリーチャーシリーズ'
url = f"https://www.nicovideo.jp/tag/{ tag }"
params = { 'sort': 'f',
'order': 'd' }
video_info = { }
bs = get_bs_from_url (url, params)
if bs is None:
return None
try:
video = (bs.find_all ('ul', class_ = 'videoListInner')[1]
.find ('li', class_ = 'item'))
video_info['contentId'] = video['data-video-id']
except Exception:
return None
return get_video_info (video_info['contentId'])
def get_bs_from_url (
url: str,
params: dict = { },
) -> BeautifulSoup | None:
"""
URL から BeautifulSoup インスタンス生成
Parameters
----------
url: str
捜査する URL
params: dict
パラメータ
Return
------
BeautifulSoup | None
BeautifulSoup オブゼクト(失敗したら None)
"""
try:
req = requests.get (url, params = params, timeout = 60)
except Timeout:
return None
if req.status_code != 200:
return None
req.encoding = req.apparent_encoding
return BeautifulSoup (req.text, 'html.parser')
def get_video_info (
video_code: str,
) -> VideoInfo | None:
video_info: dict[str, str | list[str]] = { 'contentId': video_code }
bs = get_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }")
if bs is None:
return None
try:
title = bs.find ('title')
if title is None:
return None
video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)]
tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore
video_info['tags'] = tags.split (',')
video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore
except Exception:
return None
return cast (VideoInfo, video_info)
def get_kiriban_list (
base_date: date,
) -> list[tuple[int, VideoInfo, datetime]]:
kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
latest_fetched_at = cast (date, (VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))
for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
targets = { vh.video.code for vh in (VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) }
for code in targets:
if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= kiriban_views_count:
continue
video_info = get_video_info (code)
if video_info is not None:
kiriban_list.append ((kiriban_views_count, video_info,
cast (Video, Video.where ('code', code).first ()).uploaded_at))
return kiriban_list
def get_comments (
video_code: str,
) -> list[Comment]:
video = Video.where ('code', video_code).first ()
if video is None:
return []
return video.comments
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
サブモジュール
+1
サブモジュール nicolib32ecf2d00f で追加されました
サブモジュール
+1
サブモジュール nizika_ai3be6d9063c で追加されました