Browse Source

Merge pull request '統合 AI への移行' (#16) from ai-migration into main

Reviewed-on: #16
main
みてるぞ 1 week ago
parent
commit
a103093e9c
9 changed files with 259 additions and 534 deletions
  1. +6
    -3
      .gitmodules
  2. +0
    -1
      ai
  3. +0
    -58
      atproto.pyi
  4. +0
    -5
      atproto/models/AppBskyFeedDefs.pyi
  5. +0
    -5
      atproto_client/models/app/bsky/feed/get_timeline.pyi
  6. +251
    -288
      main.py
  7. +0
    -174
      nico.py
  8. +1
    -0
      nicolib
  9. +1
    -0
      nizika_ai

+ 6
- 3
.gitmodules View File

@@ -1,6 +1,9 @@
[submodule "nizika_nico"]
path = nizika_nico
url = https://git.miteruzo.com/miteruzo/nizika_nico
[submodule "ai"]
path = ai
url = https://git.miteruzo.com/miteruzo/nizika_broadcast
[submodule "nizika_ai"]
path = nizika_ai
url = https://git.miteruzo.com/miteruzo/nizika_ai.git
[submodule "nicolib"]
path = nicolib
url = https://git.miteruzo.com/miteruzo/nicolib.git

+ 0
- 1
ai

@@ -1 +0,0 @@
Subproject commit 299a3acdff6312f4ea7188f3606bf66e751669c1

+ 0
- 58
atproto.pyi View File

@@ -1,58 +0,0 @@
from datetime import datetime

from atproto.models.AppBskyFeedDefs import BlockedPost, NotFoundPost
from atproto_client.models.app.bsky.feed import get_timeline


class Client:
app: AppNamespace

def get_current_time_iso (self) -> datetime: ...

def get_post_thread (
self,
uri: str,
parent_height: int | None = None
) -> Response: ...

def get_timeline (self) -> get_timeline.Response: ...

def follow (self, did: str) -> None: ...

def like (self, uri: str, cid: str) -> None: ...


class AppNamespace:
bsky: AppBskyNamespace


class AppBskyNamespace:
notification: AppBskyNotificationNamespace


class AppBskyNotificationNamespace:
def list_notifications (self) -> Response: ...

def update_seen (self, seen: dict[str, datetime]) -> None: ...


class Response:
notifications: list[Notification]
thread: (ThreadViewPost
| NotFoundPost
| BlockedPost)


class ThreadViewPost:
pass


class Notification:
is_read: bool
reason: str
uri: str
author: ProfileView


class ProfileView:
did: str

+ 0
- 5
atproto/models/AppBskyFeedDefs.pyi View File

@@ -1,5 +0,0 @@
class NotFoundPost:
pass

class BlockedPost:
pass

+ 0
- 5
atproto_client/models/app/bsky/feed/get_timeline.pyi View File

@@ -1,5 +0,0 @@
from atproto.models.AppBskyFeedDefs import FeedViewPost


class Response:
feed: list[FeedViewPost]

+ 251
- 288
main.py View File

@@ -1,28 +1,24 @@
"""
Bluesky のニジカがいろいろする.
(近々機能ごとにファイル分けて systemd でイベント管理する予定)
"""

from __future__ import annotations

import io
import random
import sys
import asyncio
import os
import time
from datetime import date, datetime
from datetime import time as dt_time
from datetime import timedelta
from typing import TypedDict, cast
from datetime import datetime
from io import BytesIO
from typing import Any, TypedDict

import atproto # type: ignore
import requests
from atproto import Client, models
from atproto_client.models.app.bsky.feed.get_timeline import Response
from bs4 import BeautifulSoup
from atproto import Client # type: ignore
from atproto.models import AppBskyEmbedExternal, AppBskyEmbedImages # type: ignore
from atproto.models.AppBskyFeedPost import ReplyRef # type: ignore
from atproto_client.models.app.bsky.feed.get_timeline import Response # type: ignore
from requests.exceptions import Timeout

import account
import nico
from ai.talk import Talk
import nicolib
from nizika_ai.consts import Character, GPTModel, Platform, QueryType
from nizika_ai.models import Answer, AnsweredFlag, Query, User

TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
@@ -33,317 +29,284 @@ TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バ
'ルイズマリー', '餅', 'ニジゴ', 'ゴニジ', 'ニジニジ',
'新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']

time.sleep (60)

client = Client (base_url = 'https://bsky.social')
client.login (account.USER_ID, account.PASSWORD)


def main (
async def main (
) -> None:
time.sleep (60)
"""
メーン処理
"""

client = Client (base_url = 'https://bsky.social')
await asyncio.gather (like_posts (),
check_mentions (),
answer ())

client.login (account.USER_ID, account.PASSWORD)

got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = (
nico.get_kiriban_list (got_kiriban_at))
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()
async def like_posts (
) -> None:
while True:
try:
for post in fetch_target_posts ():
client.like (**post)
except Exception as e:
print (f"[like_posts] { type (e).__name__ }: { e }")

await asyncio.sleep (60)

last_posted_at = datetime.now () - timedelta (hours = 6)
has_got_snack_time = False
has_taken_hot_spring = False
watched_videos = []

async def check_mentions (
) -> None:
while True:
now = datetime.now ()

for uri in check_notifications (client):
records = get_thread_contents (client, uri, 20)
if len (records) > 0:
answer = Talk.main ((records[0]['text']
if (records[0]['embed'] is None
or not hasattr (records[0]['embed'],
'images'))
else [
{ 'type': 'text', 'text': records[0]['text'] },
{ 'type': 'image_url', 'image_url': {
'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]),
records[0]['name'],
[*map (lambda record: {
'role': ('assistant'
if (record['handle']
== account.USER_ID)
else 'user'),
'content':
record['text']},
reversed (records[1:]))])
client.post (answer,
reply_to = models.AppBskyFeedPost.ReplyRef (
parent = records[0]['strong_ref'],
root = records[-1]['strong_ref']))

like_posts (client)

if kiriban_list and datetime.now () >= next_kiriban_at:
(views_count, video_info, uploaded_at) = (
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
since_posted = datetime.now () - uploaded_at
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
upload = client.com.atproto.repo.upload_blob (
io.BytesIO (requests.get (thumbnail,
timeout = 60).content))
thumb = upload.blob
except Timeout:
thumb = None
comments = nico.get_comments (video_info['contentId'])
popular_comments = sorted (comments,
key = lambda c: c.nico_count,
reverse = True)[:10]
latest_comments = sorted (comments,
key = lambda c: c.posted_at,
reverse = True)[:10]
embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
title = title,
description = description,
thumb = thumb,
uri = uri))
prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
prompt += f"コメント数は{ len (comments) }件です。\n"
if video_info['tags']:
prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
if comments:
prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n"
prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n"
prompt += f"""
概要には次のように書かれています:
```html
{ video_info['description'] }
```
このことについて、ニジカちゃんからのお祝いメッセージを下さい。
ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
好きなコメントがあったら教えてね。"""
client.post (Talk.main (prompt), embed = embed_external)
next_kiriban_at += kiriban_interval
last_posted_at = now

latest_deerjika = nico.get_latest_deerjika ()
if latest_deerjika is not None:
for datum in [e for e in [latest_deerjika]
if e['contentId'] not in watched_videos]:
watched_videos += [datum['contentId']]

uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
upload = client.com.atproto.repo.upload_blob (
io.BytesIO (requests.get (thumbnail,
timeout = 60).content))
thumb = upload.blob
except Timeout:
thumb = None

embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
try:
for uri in check_notifications ():
records = fetch_thread_contents (uri, 20)
if records:
record = records[0]
image_url: str | None = None
if record['embed'] and hasattr (record['embed'], 'images'):
image_url = ('https://cdn.bsky.app/img/feed_fullsize/plain'
f"/{ record['did'] }"
f"/{ record['embed'].images[0].image.ref.link }")
user = _fetch_user (record['did'], record['name'])
_add_query (user, record['text'], image_url, {
'uri': record['strong_ref']['uri'],
'cid': record['strong_ref']['cid'] })
except Exception as e:
print (f"[check_mentions] { type (e).__name__ }: { e }")

await asyncio.sleep (60)


async def answer (
) -> None:
while True:
answered_flags = (
AnsweredFlag
.where ('platform', Platform.BLUESKY.value)
.where ('answered', False)
.get ())
for answered_flag in answered_flags:
td: dict[str, Any]
answer = answered_flag.answer
match QueryType (answer.query_rel.query_type):
case QueryType.BLUESKY_COMMENT:
td = answer.query_rel.transfer_data or { }
uri: str | None = td.get ('uri')
cid: str | None = td.get ('cid')
if (not uri) or (not cid):
continue

sref = { 'uri': uri, 'cid': cid }
strong_ref = atproto.models.create_strong_ref (sref) # type: ignore
reply_ref = ReplyRef (root = strong_ref, parent = strong_ref)
try:
client.post (answer.content, reply_to = reply_ref)
except Exception as e:
print (f"[answer/reply] { type (e).__name__ }: { e }")
continue
answered_flag.answered = True
answered_flag.save ()
case QueryType.KIRIBAN | QueryType.NICO_REPORT:
td = answer.query_rel.transfer_data or { }
video_code: str | None = td.get ('video_code')
if not video_code:
continue

uri = f"https://www.nicovideo.jp/watch/{ video_code }"
(title, description, thumbnail) = nicolib.fetch_embed_info (uri)
try:
resp = requests.get (thumbnail, timeout = 60)
resp.raise_for_status ()
upload = client.com.atproto.repo.upload_blob (BytesIO (resp.content))
thumb = upload.blob
except Timeout:
thumb = None
except Exception as e:
print (f"[answer/nico-thumb] { type (e).__name__ }: { e }")
thumb = None

external = AppBskyEmbedExternal.External (
title = title,
description = description,
thumb = thumb,
uri = uri))
client.post (Talk.main (f"""
ニコニコに『{ datum['title'] }』という動画がアップされました。
つけられたタグは「{ '」、「'.join (datum['tags']) }」です。
概要には次のように書かれています:
```html
{ datum['description'] }
```
このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """),
embed = embed_external)
last_posted_at = now

if now.hour == 14 and has_got_snack_time:
has_got_snack_time = False

if now.hour == 15:
if got_kiriban_at < datetime.now ().date ():
kiriban_list = nico.get_kiriban_list (datetime.now ().date ())
got_kiriban_at = datetime.now ().date ()
kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()

if not has_got_snack_time:
try:
with open ('./assets/snack-time.jpg', 'rb') as f:
image = models.AppBskyEmbedImages.Image (
alt = ('左に喜多ちゃん、右に人面鹿のニジカが'
'V字に並んでいる。'
'喜多ちゃんは右手でピースサインをして'
'片目をウインクしている。'
'ニジカは両手を広げ、'
'右手にスプーンを持って'
'ポーズを取っている。'
'背景には'
'赤と黄色の放射線状の模様が広がり、'
'下部に「おやつタイムだ!!!!」という'
'日本語のテキストが表示されている。'),
image = client.com.atproto.repo.upload_blob (f).blob)
client.post (Talk.main ('おやつタイムだ!!!!'),
embed = models.app.bsky.embed.images.Main (
images = [image]))
last_posted_at = now
except Exception:
pass
has_got_snack_time = True

if now.hour == 20 and has_taken_hot_spring:
has_taken_hot_spring = False

if now.hour == 21 and not has_taken_hot_spring:
try:
with open ('./assets/hot-spring.jpg', 'rb') as f:
image = models.AppBskyEmbedImages.Image (
alt = ('左に喜多ちゃん、右にわさび県産滋賀県が'
'V字に並んでいる。'
'喜多ちゃんは右手でピースサインをして'
'片目をウインクしている。'
'わさび県産滋賀県はただ茫然と'
'立ち尽くしている。'
'背景には'
'血と空の色をした放射線状の模様が広がり、'
'下部に「温泉に入ろう!!!」という'
'日本語のテキストが表示されている。'),
image = client.com.atproto.repo.upload_blob (f).blob)
client.post (Talk.main ('温泉に入ろう!!!'),
embed = models.app.bsky.embed.images.Main (
images = [image]))
last_posted_at = now
except Exception:
pass
has_taken_hot_spring = True

if now - last_posted_at >= timedelta (hours = 6):
client.post (Talk.main ('今どうしてる?'))
last_posted_at = now

time.sleep (60)
uri = uri)
embed_external = AppBskyEmbedExternal.Main (external = external)
try:
client.post (answer.content, embed = embed_external)
except Exception as e:
print (f"[answer/nico-post] { type (e).__name__ }: { e }")
continue
answered_flag.answered = True
answered_flag.save ()
case QueryType.SNACK_TIME:
try:
with open ('./assets/snack-time.jpg', 'rb') as f:
image = AppBskyEmbedImages.Image (
alt = (
'左に喜多ちゃん、右に人面鹿のニジカが'
'V字に並んでいる。'
'喜多ちゃんは右手でピースサインをして'
'片目をウインクしている。'
'ニジカは両手を広げ、'
'右手にスプーンを持って'
'ポーズを取っている。'
'背景には'
'赤と黄色の放射線状の模様が広がり、'
'下部に「おやつタイムだ!!!!」という'
'日本語のテキストが表示されている。'),
image = client.com.atproto.repo.upload_blob (f).blob)
client.post (answer.content,
embed = AppBskyEmbedImages.Main (images = [image]))
answered_flag.answered = True
answered_flag.save ()
except Exception:
pass
case QueryType.HOT_SPRING:
try:
with open ('./assets/hot-spring.jpg', 'rb') as f:
image = AppBskyEmbedImages.Image (
alt = ('左に喜多ちゃん、右にわさび県産滋賀県が'
'V字に並んでいる。'
'喜多ちゃんは右手でピースサインをして'
'片目をウインクしている。'
'わさび県産滋賀県はただ茫然と'
'立ち尽くしている。'
'背景には'
'血と空の色をした放射線状の模様が広がり、'
'下部に「温泉に入ろう!!!」という'
'日本語のテキストが表示されている。'),
image = client.com.atproto.repo.upload_blob (f).blob)
client.post (answer.content,
embed = AppBskyEmbedImages.Main (images = [image]))
answered_flag.answered = True
answered_flag.save ()
except Exception:
pass
await asyncio.sleep (10)


def check_notifications (
client: Client,
) -> list:
(uris, last_seen_at) = ([], client.get_current_time_iso ())
) -> list[str]:
uris: list[str] = []
last_seen_at = client.get_current_time_iso ()

for notification in (client.app.bsky.notification.list_notifications ()
.notifications):
notifications = client.app.bsky.notification.list_notifications ()
for notification in notifications.notifications:
if not notification.is_read:
if notification.reason in ['mention', 'reply', 'quote']:
uris += [notification.uri]
elif notification.reason == 'follow':
client.follow (notification.author.did)
match notification.reason:
case 'mention' | 'reply' | 'quote':
uris.append (notification.uri)
case 'follow':
client.follow (notification.author.did)

client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })

return uris


def get_thread_contents (
client: Client,
def fetch_thread_contents (
uri: str,
parent_height: int,
) -> list:
response = (client.get_post_thread (uri = uri,
parent_height = parent_height)
.thread)
records = []
while response is not None:
records += [{ 'strong_ref': models.create_strong_ref (response.post),
'did': response.post.author.did,
'handle': response.post.author.handle,
'name': response.post.author.display_name,
'datetime': response.post.record.created_at,
'text': response.post.record.text,
'embed': response.post.record.embed }]
response = response.parent
) -> list[Record]:
post_thread = client.get_post_thread (uri = uri, parent_height = parent_height)
if not post_thread:
return []

res = post_thread.thread

records: list[Record] = []
while res:
if hasattr (res, 'post'):
records.append ({ 'strong_ref': { 'uri': res.post.uri,
'cid': res.post.cid },
'did': res.post.author.did,
'handle': res.post.author.handle,
'name': (res.post.author.display_name
or res.post.author.handle),
'datetime': res.post.record.created_at,
'text': getattr (res.post.record, 'text', None) or '',
'embed': getattr (res.post.record, 'embed', None) })
res = res.parent
else:
break

return records


def get_embed_info (
url: str
) -> tuple[str, str, str]:
title: str = ''
description: str = ''
thumbnail: str = ''

try:
res = requests.get (url, timeout = 60)
except Timeout:
return ('', '', '')

if res.status_code != 200:
return ('', '', '')

soup = BeautifulSoup (res.text, 'html.parser')

tmp = soup.find ('title')
if tmp is not None:
title = tmp.text

tmp = soup.find ('meta', attrs = { 'name': 'description' })
if tmp is not None and hasattr (tmp, 'get'):
try:
description = cast (str, tmp.get ('content'))
except Exception:
pass

tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' })
if tmp is not None and hasattr (tmp, 'get'):
try:
thumbnail = cast (str, tmp.get ('content'))
except Exception:
pass

return (title, description, thumbnail)
def fetch_target_posts (
) -> list[LikeParams]:
posts: list[LikeParams] = []

timeline: Response = client.get_timeline ()
for feed in timeline.feed:
me = getattr (client, 'me', None)
my_did = me.did if me else ''
if (feed.post.author.did != my_did
and (feed.post.viewer.like is None)
and any (target_word in (feed.post.record.text or '').casefold ()
for target_word in TARGET_WORDS)):
posts.append (LikeParams ({ 'uri': feed.post.uri, 'cid': feed.post.cid }))

def get_kiriban_dt_to_update (
) -> datetime:
now = datetime.now ()
today = now.date ()
dt = datetime.combine (today, dt_time (15, 0))
if dt <= now:
dt += timedelta (days = 1)
return dt
return posts


def like_posts (
client: Client,
def _add_query (
user: User,
content: str,
image_url: str | None = None,
transfer_data: dict[str, Any] | None = None,
) -> None:
for post in get_target_posts (client):
client.like (**post)
query = Query ()
query.user_id = user.id
query.target_character = Character.DEERJIKA.value
query.content = content
query.image_url = image_url or None
query.query_type = QueryType.BLUESKY_COMMENT.value
query.model = GPTModel.GPT3_TURBO.value
query.sent_at = datetime.now ()
query.answered = False
query.transfer_data = transfer_data
query.save ()
# TODO: 履歴情報の追加


def _fetch_user (
did: str,
name: str,
) -> User:
user = User.where ('platform', Platform.BLUESKY.value).where ('code', did).first ()
if user is None:
user = User ()
user.platform = Platform.BLUESKY.value
user.code = did
user.name = name
user.save ()
return user


def get_target_posts (
client: Client,
) -> list[LikeParams]:
posts = []
class LikeParams (TypedDict):
uri: str
cid: str

timeline: Response = client.get_timeline ()
for feed in timeline.feed:
if (feed.post.author.did != client.me.did
and (feed.post.viewer.like is None)
and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)):
posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid }))

return posts
class Record (TypedDict):
strong_ref: StrongRef
did: str
handle: str
name: str
datetime: str
text: str
embed: object


class LikeParams (TypedDict):
class StrongRef (TypedDict):
uri: str
cid: str


if __name__ == '__main__':
main (*sys.argv[1:])
asyncio.run (main ())

+ 0
- 174
nico.py View File

@@ -1,174 +0,0 @@
"""
ニコニコのニジカ動画取得モヂュール
"""

from __future__ import annotations

import os
from datetime import date, datetime, timedelta
from typing import TypedDict, cast

import requests
from bs4 import BeautifulSoup
from requests.exceptions import Timeout
from eloquent import DatabaseManager, Model

from db.models import Comment, Tag, Video, VideoHistory, VideoTag

CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }

DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)

KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
*range (10_000, 1_000_001, 10_000),
194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100,
19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
4_545, 194_245, 245_194, 510_245 },
reverse = True)

class VideoInfo (TypedDict):
contentId: str
title: str
tags: list[str]
description: str


def get_latest_deerjika (
) -> VideoInfo | None:
tag = '伊地知ニジカ OR ぼざろクリーチャーシリーズ'
url = f"https://www.nicovideo.jp/tag/{ tag }"

params = { 'sort': 'f',
'order': 'd' }

video_info = { }

bs = get_bs_from_url (url, params)
if bs is None:
return None

try:
video = (bs.find_all ('ul', class_ = 'videoListInner')[1]
.find ('li', class_ = 'item'))

video_info['contentId'] = video['data-video-id']
except Exception:
return None

return get_video_info (video_info['contentId'])


def get_bs_from_url (
url: str,
params: dict = { },
) -> BeautifulSoup | None:
"""
URL から BeautifulSoup インスタンス生成

Parameters
----------
url: str
捜査する URL
params: dict
パラメータ

Return
------
BeautifulSoup | None
BeautifulSoup オブゼクト(失敗したら None)
"""

try:
req = requests.get (url, params = params, timeout = 60)
except Timeout:
return None

if req.status_code != 200:
return None

req.encoding = req.apparent_encoding

return BeautifulSoup (req.text, 'html.parser')


def get_video_info (
video_code: str,
) -> VideoInfo | None:
video_info: dict[str, str | list[str]] = { 'contentId': video_code }

bs = get_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }")
if bs is None:
return None

try:
title = bs.find ('title')
if title is None:
return None
video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)]

tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore
video_info['tags'] = tags.split (',')

video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore
except Exception:
return None

return cast (VideoInfo, video_info)


def get_kiriban_list (
base_date: date,
) -> list[tuple[int, VideoInfo, datetime]]:
kiriban_list: list[tuple[int, VideoInfo, datetime]] = []

latest_fetched_at = cast (date, (VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))

for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
targets = { vh.video.code for vh in (VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) }
for code in targets:
if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= kiriban_views_count:
continue
video_info = get_video_info (code)
if video_info is not None:
kiriban_list.append ((kiriban_views_count, video_info,
cast (Video, Video.where ('code', code).first ()).uploaded_at))

return kiriban_list


def get_comments (
video_code: str,
) -> list[Comment]:
video = Video.where ('code', video_code).first ()
if video is None:
return []
return video.comments


class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str

+ 1
- 0
nicolib

@@ -0,0 +1 @@
Subproject commit 32ecf2d00fcba876ed5afe48626058b2b4795399

+ 1
- 0
nizika_ai

@@ -0,0 +1 @@
Subproject commit 3be6d9063c987deaceee24a1d16296d21319778c

Loading…
Cancel
Save