34 コミット

作成者 SHA1 メッセージ 日付
みてるぞ e6f58e621d 追跡対象動画バルク UPSERT (#22) (#23)
#22 インポート漏れ修正

#22

#22

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #23
2026-04-24 23:25:34 +09:00
みてるぞ cb72b8dd99 削除フラグが誤って付与されるバグ修正(#20) (#21)
#20

#20

#20

#020

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #21
2026-04-11 05:13:29 +09:00
みてるぞ b2adf62090 投稿者情報追加(#17) (#18)
#17

#17

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #18
2026-03-05 21:03:16 +09:00
みてるぞ a3d9d0bfd7 feat: タグと関係なしに追跡する動画リスト追加 (#16)
#15 タグ関係なく追跡する動画リスト

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #16
2026-01-01 14:00:11 +09:00
みてるぞ f44637d274 コメント取得追加 2025-10-26 05:07:07 +09:00
みてるぞ f809e9faae キリ番修正 2025-10-26 01:19:15 +09:00
みてるぞ de8fd8634a キリ番修正 2025-10-26 01:07:15 +09:00
みてるぞ 88be511f6e キリ番修正 2025-10-26 01:06:41 +09:00
みてるぞ ea339f1ec9 キリ番修正 2025-10-26 01:05:48 +09:00
みてるぞ 06328a89b2 キリ番追加 2025-10-26 00:45:29 +09:00
みてるぞ 463e8bbec7 config に移した. 2025-10-22 23:25:49 +09:00
みてるぞ baa75d68ba ゑぐぃバグ修正 2025-08-15 02:51:06 +09:00
みてるぞ 48e51f97d0 'update_db.py' を更新 2025-07-19 19:33:37 +09:00
みてるぞ c5204383ed #13 2025-07-01 23:53:11 +09:00
みてるぞ bf36d05ed3 #13 2025-07-01 23:48:30 +09:00
みてるぞ c9bd6fdfa7 #13 2025-07-01 23:39:57 +09:00
みてるぞ b2f5f81ca8 型定義追加 2024-11-06 03:56:28 +09:00
みてるぞ 67b76e6dd4 #12 2024-11-05 12:34:10 +09:00
みてるぞ 6a5e6dfade ごみを削除 2024-10-17 18:59:25 +09:00
みてるぞ 6e99da7326 Python 3.10 に適合したのと __pycache__ 除外 2024-10-17 18:57:03 +09:00
みてるぞ 53ba658319 #8 の対応 2024-10-17 12:49:13 +09:00
みてるぞ 067c90890e video_histories への書込みを Upsert に 2024-10-16 22:40:42 +09:00
みてるぞ db14af1a73 もろもろ修正 2024-10-16 22:17:35 +09:00
みてるぞ da17333a80 typing.Self は Python 3.10 には未実装か,さぅか. 2024-10-16 22:16:04 +09:00
みてるぞ ee971997ad #6 の対応 2024-10-16 22:08:07 +09:00
みてるぞ b448335851 もろもろのモロヘイヤ 2024-10-16 20:20:50 +09:00
みてるぞ 9f810b23f0 Upsert 問題の修正とその他無駄を排除 2024-10-15 23:01:03 +09:00
みてるぞ c91cf19926 Eloquent の型定義ファイルと型安全性確認;本番環境に取込み可能 2024-10-15 00:18:35 +09:00
みてるぞ 6185788456 Merge branch 'main' into feature/query 2024-10-14 19:15:22 +09:00
みてるぞ 283b628053 課題 #5 に対する対応 2024-10-14 19:12:37 +09:00
みてるぞ 05182b251f Merge branch 'main' into feature/query 2024-10-14 18:23:39 +09:00
みてるぞ 5eb3fb6037 外されたタグを再登録できてなぃバグ修正 2024-10-14 18:16:56 +09:00
みてるぞ b6c041ddad Merge branch 'main' into feature/query 2024-10-12 22:21:21 +09:00
みてるぞ e23d919919 タグ削除チェックは大文字に変換して行ふやぅ修正 2024-10-12 22:10:33 +09:00
13個のファイルの変更839行の追加257行の削除
+1
ファイルの表示
@@ -0,0 +1 @@
__pycache__
+25
ファイルの表示
@@ -0,0 +1,25 @@
from __future__ import annotations
import os
from typing import TypedDict
from eloquent import DatabaseManager, Model # type: ignore
# Connection settings handed to eloquent's DatabaseManager, keyed by
# connection name.  The MySQL credentials are read from the MYSQL_USER and
# MYSQL_PASS environment variables at import time, so importing this module
# raises KeyError when either variable is unset.
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
                                           'host': 'localhost',
                                           'database': 'nizika_nico',
                                           'user': os.environ['MYSQL_USER'],
                                           'password': os.environ['MYSQL_PASS'],
                                           'prefix': '' } }

# Module-level side effect: build the manager and register it as the
# connection resolver of eloquent's Model.
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
class DbConfig (TypedDict):
    """Shape of one connection entry stored in ``CONFIG``."""
    driver: str     # driver name ('mysql' in CONFIG)
    host: str       # database host
    database: str   # database (schema) name
    user: str       # login user
    password: str   # login password
    prefix: str     # empty string in CONFIG; presumably a table-name prefix -- confirm
+145
ファイルの表示
@@ -0,0 +1,145 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
# pylint: disable = missing-module-docstring
# pylint: disable = unused-argument
from __future__ import annotations
from typing import Any, Generic, Type, TypeVar, overload
from typing_extensions import Self
_ModelT = TypeVar ('_ModelT', bound = 'Model')
class Connection:
    # Type stub for the connection object returned by
    # DatabaseManager.connection ().  Only signatures are declared here.

    # Raw statement helpers taking a query string and optional bindings.
    def select (self, query: str, bindings: dict[str, Any] | None = None) -> Any: ...
    def insert (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
    def update (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
    def delete (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...

    # Transaction control.
    def transaction (self, callback: Any) -> Any: ...
    def begin_transaction (self) -> None: ...
    def commit (self) -> None: ...
    def rollback (self) -> None: ...
class ConnectionResolver:
    # Type stub: looks connections up by name and tracks the name of the
    # default connection.
    def connection (self, name: str | None = None) -> Any: ...
    def get_default_connection (self) -> str: ...
    def set_default_connection (self, name: str) -> None: ...
class DatabaseManager:
    # Type stub for the manager that is built from a configuration dict and
    # hands out Connection objects by name.
    connections: dict[str, Connection]

    def __init__ (self, config: dict[str, Any]) -> None: ...
    def connection (self, name: str | None = None) -> Connection: ...
    def disconnect (self, name: str | None = None) -> None: ...
    def reconnect (self, name: str | None = None) -> Connection: ...
    def get_connections (self) -> dict[str, Connection]: ...
class Model:
    # Type stub for the part of eloquent's Model API that this project uses.
    # Only signatures are declared; every body is `...`.

    id: int
    # Name-mangled private flag of the library's Model class; declared here so
    # that project code may assign to it.
    _Model__exists: bool

    # --- relationship helpers (instance side) ---

    def has_one (
            self,
            related_model: Type[_ModelT],
            foreign_key: str | None = None,
    ) -> _ModelT: ...

    def has_many (
            self,
            related_model: Type[_ModelT],
            foreign_key: str | None = None,
    ) -> list[_ModelT]: ...

    def belongs_to (
            self,
            related_model: Type[_ModelT],
            foreign_key: str | None = None,
    ) -> _ModelT: ...

    def belongs_to_many (
            self,
            related_model: Type[_ModelT],
            foreign_key: str | None = None,
    ) -> list[_ModelT]: ...

    # --- persistence ---

    def save (self) -> None: ...

    def delete (self) -> None: ...

    # --- query entry points (class side) ---

    @classmethod
    def find (cls, id_: int) -> Self | None: ...

    @classmethod
    def query (
            cls,
    ) -> QueryBuilder[Self]: ...

    # NOTE(review): `all`, `get` and `where_has` are declared from the way the
    # project calls them (`Video.all ()`, `TrackedVideo.get ()`,
    # `VideoHistory.where_has ('video', callback)`); confirm parameter names
    # and return types against the installed eloquent package.
    @classmethod
    def all (cls) -> list[Self]: ...

    @classmethod
    def get (cls) -> list[Self]: ...

    @classmethod
    def where_has (
            cls,
            relation: str,
            callback: Any,
    ) -> QueryBuilder[Self]: ...

    @overload
    @classmethod
    def where (
            cls,
            field: str,
            operator: str,
            value: Any,
    ) -> QueryBuilder[Self]: ...

    @overload
    @classmethod
    def where (cls, field: str, value: Any) -> QueryBuilder[Self]: ...

    @classmethod
    def where_not_in (
            cls,
            column: str,
            values: list[Any] | tuple
    ) -> QueryBuilder[Self]: ...

    @classmethod
    def where_not_null (cls, field: str) -> QueryBuilder[Self]: ...

    @classmethod
    def max (cls, column: str) -> Any: ...

    @classmethod
    def set_connection_resolver (cls, resolver: DatabaseManager) -> None: ...
class QueryBuilder (Generic[_ModelT]):
    # Type stub for the chainable query object returned by Model.query () and
    # the Model.where* class methods; _ModelT is the model class being queried.

    # Terminal operations.
    def first (self) -> _ModelT | None: ...
    def get (self) -> list[_ModelT]: ...

    # Chainable filters; each returns a builder for the same model type.
    @overload
    def where (
            self,
            field: str,
            operator: str,
            value: Any,
    ) -> QueryBuilder[_ModelT]: ...
    @overload
    def where (self, field: str, value: Any) -> QueryBuilder[_ModelT]: ...
    def where_null (self, field: str) -> QueryBuilder[_ModelT]: ...

    # Aggregate over a column.
    def max (self, column: str) -> Any: ...

    def _load_relation (self, relation_name: str) -> QueryBuilder[_ModelT]: ...
+177
ファイルの表示
@@ -0,0 +1,177 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
ぼざクリ DB の構成
"""
from __future__ import annotations
from datetime import date, datetime
from db.my_eloquent import Model
class Comment (Model):
    """A comment posted on a video."""
    # pylint: disable = too-many-instance-attributes

    id: int
    video_id: int
    comment_no: int
    user_id: int
    content: str
    posted_at: datetime
    nico_count: int
    vpos_ms: int

    __timestamps__ = False

    @property
    def video (self) -> Video:
        """The video this comment belongs to."""
        return self.belongs_to (Video)

    @property
    def user (self) -> User:
        """The user this comment belongs to."""
        return self.belongs_to (User)

    def upsert (self, *args: str) -> None:
        """Upsert keyed on ``video_id`` and ``comment_no``; *args* is ignored."""
        super ().upsert ('video_id', 'comment_no')
class Tag (Model):
    """A tag name that can be attached to videos."""

    id: int
    name: str

    __timestamps__ = False

    @property
    def video_tags (self) -> list[VideoTag]:
        """The video/tag link rows that reference this tag."""
        return self.has_many (VideoTag)
class TrackedVideo (Model):
    """A video code that is tracked explicitly."""

    id: int
    code: str

    __timestamps__ = False

    def upsert (self, *args: str) -> None:
        """Upsert keyed on ``code``; *args* is ignored."""
        super ().upsert ('code')
class User (Model):
    """A user, identified by ``code``."""

    id: int
    code: str

    __timestamps__ = False

    @property
    def comments (self) -> list[Comment]:
        """The comments that reference this user."""
        return self.has_many (Comment)
class Video (Model):
    """A video, identified by ``code``."""

    id: int
    code: str
    user_id: int | None
    title: str
    description: str
    uploaded_at: datetime
    deleted_at: datetime | None

    __timestamps__ = False

    @property
    def user (self) -> User | None:
        """The related user, or None when ``user_id`` is unset."""
        return None if self.user_id is None else self.belongs_to (User)

    @property
    def video_histories (self) -> list[VideoHistory]:
        """The history rows recorded for this video."""
        return self.has_many (VideoHistory)

    @property
    def video_tags (self) -> list[VideoTag]:
        """The video/tag link rows of this video."""
        return self.has_many (VideoTag)

    @property
    def comments (self) -> list[Comment]:
        """The comments of this video."""
        return self.has_many (Comment)

    def upsert (self, *args: str) -> None:
        """Upsert keyed on ``code``; *args* is ignored."""
        super ().upsert ('code')
class VideoHistory (Model):
    """The view count of one video as fetched on one date."""

    id: int
    video_id: int
    fetched_at: date
    views_count: int

    __timestamps__ = False

    @property
    def video (self) -> Video:
        """The video this history row belongs to."""
        return self.belongs_to (Video)

    def upsert (self, *args: str) -> None:
        """Upsert keyed on ``video_id`` and ``fetched_at``; *args* is ignored."""
        super ().upsert ('video_id', 'fetched_at')
class VideoTag (Model):
    """Link row between a video and a tag, with tagged / untagged dates."""

    id: int
    video_id: int
    tag_id: int
    tagged_at: date
    untagged_at: date | None

    __timestamps__ = False

    @property
    def video (self) -> Video:
        """The video side of the link."""
        return self.belongs_to (Video)

    @property
    def tag (self) -> Tag:
        """The tag side of the link."""
        return self.belongs_to (Tag)

    def upsert (self, *args: str) -> None:
        """Upsert keyed on ``video_id`` and ``tag_id``; *args* is ignored."""
        super ().upsert ('video_id', 'tag_id')
+50
ファイルの表示
@@ -0,0 +1,50 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
みてるぞ式魔改造(言ふほどか?)版 Eloquent
"""
import eloquent
class DatabaseManager (eloquent.DatabaseManager):
    """Project-local alias of eloquent's DatabaseManager; adds nothing."""
class Model (eloquent.Model):
    """eloquent Model extended with an ``upsert`` keyed on chosen columns."""

    id: int

    def upsert (
            self,
            *args: str,
    ) -> None:
        """
        Save this model, updating the existing row that has the same values
        in the key columns instead of inserting a duplicate.

        *args* are the attribute (column) names that identify a row; their
        current values on ``self`` are used for the lookup.

        Raises ValueError when no key column is given: without a filter the
        lookup would return the first row of the table and that unrelated row
        would be overwritten.  Any exception raised by ``save`` is re-raised
        when no matching row exists afterwards.
        """
        if not args:
            raise ValueError ('upsert requires at least one key column')

        row = self._find_upsert_row (*args)
        if row is None:
            try:
                self.save ()
                return
            except Exception:
                # The insert failed; look the key up once more and fall back
                # to an update if a matching row exists now.
                # NOTE(review): presumably this covers a concurrent insert of
                # the same key -- confirm.
                row = self._find_upsert_row (*args)
                if row is None:
                    raise

        self._adopt_row (row)
        self.save ()

    def _adopt_row (
            self,
            row: 'Model',
    ) -> None:
        """Take over the identity of *row* so the next ``save`` updates it."""
        self.id = row.id
        # pylint: disable = invalid-name
        # pylint: disable = attribute-defined-outside-init
        self._Model__exists = True

    def _find_upsert_row (
            self,
            *args: str,
    ):
        """Return the first row whose *args* columns equal this model's values."""
        q = self.query ()
        for arg in args:
            q = q.where (arg, getattr (self, arg))
        return q.first ()
シンボリックリンク
+1
ファイルの表示
@@ -0,0 +1 @@
db/eloquent.pyi
+55
ファイルの表示
@@ -0,0 +1,55 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画コードからコメントのリストを取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
# Bare reference to the imported manager; evaluating the name does nothing.
# NOTE(review): presumably kept so the `db.config` import is not reported as
# unused -- confirm.
DB
def main (
        video_code: str,
) -> None:
    """
    Print the comments of the video whose code is *video_code* as a JSON
    array on standard output.  Prints ``[]`` when no such video is found.
    Values that JSON cannot encode natively are written via ``str``.
    """
    video = Video.where ('code', video_code).first ()
    if not video:
        print ('[]')
        return

    comments: list[CommentDict] = [
        { 'id': row.id,
          'video_id': row.video_id,
          'comment_no': row.comment_no,
          'user_id': row.user_id,
          'content': row.content,
          'posted_at': row.posted_at,
          'nico_count': row.nico_count,
          'vpos_ms': row.vpos_ms }
        for row in video.comments]
    print (json.dumps (comments, default = str))
class CommentDict (TypedDict):
    """One comment as emitted in the JSON output; keys mirror Comment's fields."""
    id: int
    video_id: int
    comment_no: int
    user_id: int
    content: str
    posted_at: datetime     # written through `default = str` by json.dumps
    nico_count: int
    vpos_ms: int
if __name__ == '__main__':
    # The first command-line argument is the video code to look up.
    main (sys.argv[1])
+65
ファイルの表示
@@ -0,0 +1,65 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画履歴の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import date, datetime
from typing import cast
from db.config import DB
from db.models import Video, VideoHistory
# Bare reference to the imported manager; evaluating the name does nothing.
# NOTE(review): presumably kept so the `db.config` import is not reported as
# unused -- confirm.
DB
def main (
        views_counts: list[int],
        base_date: date,
) -> None:
    """
    Print, as a JSON array, the videos that newly reached one of the given
    view-count thresholds ("kiriban") at the latest fetch on or before
    *base_date*.

    A video is reported for a threshold when its view count at the latest
    fetch date is at least the threshold and its highest view count on any
    earlier fetch date is below it (0 when there is no earlier history).
    Thresholds are examined in the order given and each video is reported at
    most once, for the first threshold that matches.

    Every output element is ``[views_count, code, uploaded_at]``; prints
    ``[]`` when no history exists on or before *base_date*.
    """
    kiriban_list: list[tuple[int, str, datetime]] = []
    # Codes already reported, kept as a set so the duplicate check is O(1).
    recorded_codes: set[str] = set ()

    latest_fetched_at = cast (date | None,
                              (VideoHistory
                               .where ('fetched_at', '<=', base_date)
                               .max ('fetched_at')))
    if latest_fetched_at is None:
        print ('[]')
        return

    for views_count in views_counts:
        # Codes of the videos that are at or above the threshold right now.
        targets = { vh.video.code for vh in (
                VideoHistory
                .where ('fetched_at', latest_fetched_at)
                .where ('views_count', '>=', views_count)
                .get ()) }
        for code in targets:
            if code in recorded_codes:
                continue

            # `code = code` binds the current value into the lambda.
            previous_views_count: int | None = (
                    VideoHistory
                    .where_has ('video', lambda q, code = code: q.where ('code', code))
                    .where ('fetched_at', '<', latest_fetched_at)
                    .max ('views_count'))
            if previous_views_count is None:
                previous_views_count = 0
            if previous_views_count >= views_count:
                # The threshold had already been reached before this fetch.
                continue

            video = cast (Video, Video.where ('code', code).first ())
            kiriban_list.append ((views_count, code, video.uploaded_at))
            recorded_codes.add (code)

    print (json.dumps (kiriban_list, default = str))
if __name__ == '__main__':
    # argv[1] is the base date as YYYY-MM-DD; every following argument is a
    # view-count threshold.
    main (list (map (int, sys.argv[2:])),
          datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
+52
ファイルの表示
@@ -0,0 +1,52 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
全動画の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
from datetime import date, datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
# Bare reference to the imported manager; evaluating the name does nothing.
# NOTE(review): presumably kept so the `db.config` import is not reported as
# unused -- confirm.
DB
def main (
) -> None:
    """
    Print every video as a JSON array on standard output.

    Each element carries the video's own columns, the code of its user (or
    null), and the names of the tags whose link row has no ``untagged_at``.
    ``deleted_at`` is reduced to a date.
    """
    videos: list[VideoDict] = []

    for row in Video.all ():
        if row.deleted_at:
            deleted_on = row.deleted_at.date ()
        else:
            deleted_on = None

        entry: VideoDict = { 'id': row.id,
                             'code': row.code,
                             'user': getattr (row.user, 'code', None),
                             'title': row.title,
                             'description': row.description,
                             'tags': [],
                             'uploaded_at': row.uploaded_at,
                             'deleted_at': deleted_on }
        # Only tags that are still attached are listed.
        entry['tags'].extend (link.tag.name
                              for link in row.video_tags
                              if link.untagged_at is None)
        videos.append (entry)

    print (json.dumps (videos, default = str))
class VideoDict (TypedDict):
    """One video as emitted in the JSON output."""
    id: int
    code: str
    user: str | None            # code of the related user, if any
    title: str
    description: str
    tags: list[str]             # names of the tags that are not untagged
    uploaded_at: datetime       # written through `default = str` by json.dumps
    deleted_at: date | None     # date part of Video.deleted_at
if __name__ == '__main__':
    # Takes no command-line arguments.
    main ()
+2
ファイルの表示
@@ -0,0 +1,2 @@
-- Create the table of explicitly tracked videos: a surrogate key plus the
-- video code.
CREATE TABLE `nizika_nico`.`tracked_videos` (`id` BIGINT NOT NULL AUTO_INCREMENT , `code` VARCHAR(16) NOT NULL COMMENT '動画コード' , PRIMARY KEY (`id`)) ENGINE = InnoDB COMMENT = '追跡対象動画';
-- Allow each video code to be registered only once.
-- NOTE(review): this statement is not schema-qualified, unlike the CREATE
-- above; it relies on `nizika_nico` being the selected database.
ALTER TABLE `tracked_videos` ADD UNIQUE(`code`);
+3
ファイルの表示
@@ -0,0 +1,3 @@
-- Add a nullable reference from a video to a user, placed after `code`.
ALTER TABLE `videos` ADD `user_id` BIGINT NULL DEFAULT NULL COMMENT 'ユーザ Id.' AFTER `code`;
-- Index the new column.
ALTER TABLE `videos` ADD INDEX(`user_id`);
-- A user that is still referenced cannot be deleted; changes of `users`.`id`
-- are propagated to `videos`.
ALTER TABLE `videos` ADD FOREIGN KEY (`user_id`) REFERENCES `users`(`id`) ON DELETE RESTRICT ON UPDATE CASCADE;
+19
ファイルの表示
@@ -0,0 +1,19 @@
import sys
from db.config import DB
from db.models import TrackedVideo
# Bare reference to the imported manager; evaluating the name does nothing.
# NOTE(review): presumably kept so the `db.config` import is not reported as
# unused -- confirm.
DB
def main (
        video_codes: list[str],
) -> None:
    """
    Register each code in *video_codes* as a tracked video, one upsert per
    code in the order given.
    """
    for video_code in video_codes:
        tracked_video = TrackedVideo ()
        tracked_video.code = video_code
        tracked_video.upsert ()
if __name__ == '__main__':
    # Every command-line argument is a video code to track.
    main (sys.argv[1:])
+218 -231
ファイルの表示
@@ -1,3 +1,6 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
""" """
日次で実行し,ぼざクリ DB を最新に更新する. 日次で実行し,ぼざクリ DB を最新に更新する.
""" """
@@ -5,110 +8,157 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os import logging
import random import random
import string import string
import time import time
from dataclasses import dataclass import unicodedata
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from typing import Any, Type, TypedDict, cast from typing import Any, TypedDict, cast
import jaconv
import requests import requests
from eloquent import DatabaseManager, Model
from eloquent.orm.relations.dynamic_property import DynamicProperty
config: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql', from db.config import DB
'host': 'localhost', from db.models import (Comment,
'database': 'nizika_nico', Tag,
'user': os.environ['MYSQL_USER'], TrackedVideo,
'password': os.environ['MYSQL_PASS'], User,
'prefix': '' } } Video,
db = DatabaseManager (config) VideoHistory,
Model.set_connection_resolver (db) VideoTag)
logger = logging.getLogger (__name__)
logging.basicConfig (
level = logging.INFO,
format = '%(asctime)s %(levelname)s %(message)s')
def main ( def main (
) -> None: ) -> None:
now = datetime.now () now = datetime.now ()
today = now.date ()
api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) search_result = search_nico_by_tags (['伊地知ニジカ',
update_tables (api_data, now) 'ぼざろクリーチャーシリーズ',
'ぼざろクリーチャーシリーズ外伝'])
comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
context: UpdateContext = { 'api_data': search_result['videos'],
'comments_by_video_code': comments_by_video_code,
'deletable': search_result['is_complete'] }
connection = DB.connection ()
connection.begin_transaction ()
try:
update_tables (context, now, today)
connection.commit ()
except Exception:
connection.rollback ()
raise
def update_tables ( def update_tables (
api_data: list[VideoResult], context: UpdateContext,
now: datetime, now: datetime,
today: date,
) -> None: ) -> None:
alive_video_codes: list[str] = [] alive_video_codes: list[str] = []
for datum in api_data: for datum in context['api_data']:
tag_names: list[str] = datum['tags'].split () tag_names = datum['tags'].split ()
normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
user: User | None = None
if datum['userId'] is not None:
user = User.where ('code', str (datum['userId'])).first ()
if user is None:
user = User ()
user.code = str (datum['userId'])
user.save ()
video = Video () video = Video ()
video.code = datum['contentId'] video.code = datum['contentId']
video.user_id = user.id if user else None
video.title = datum['title'] video.title = datum['title']
video.description = datum['description'] or '' video.description = datum['description'] or ''
video.uploaded_at = datetime.fromisoformat (datum['startTime']) video.uploaded_at = datetime.fromisoformat (datum['startTime'])
video.deleted_at = None video.deleted_at = None
video.upsert () video.upsert ()
alive_video_codes.append (video.code) alive_video_codes.append (video.code)
video_history = VideoHistory () video_history = VideoHistory ()
video_history.video_id = video.id video_history.video_id = video.id
video_history.fetched_at = now video_history.fetched_at = today
video_history.views_count = datum['viewCounter'] video_history.views_count = datum['viewCounter']
video_history.save () video_history.upsert ()
video_tags = video.video_tags.where_not_null ('untagged_at').get ()
video_tags = [video_tag for video_tag in video.video_tags
if video_tag.untagged_at is None]
for video_tag in video_tags: for video_tag in video_tags:
tag = video_tag.tag tag = video_tag.tag
if tag is not None and tag.name not in tag_names: if tag is None:
video_tag.untagged_at = now continue
if normalise (tag.name) in normalised_tag_names:
continue
video_tag.untagged_at = today
video_tag.save () video_tag.save ()
tags: list[Tag] = []
for tag_name in tag_names: for tag_name in tag_names:
tag = Tag.where ('name', tag_name).first () tag = Tag.where ('name', tag_name).first ()
if tag is None: if tag is None:
tag = Tag () tag = Tag ()
tag.name = tag_name tag.name = tag_name
tag.save () tag.save ()
video_tag = (Video.where ('video_id', video.id)
video_tag = (VideoTag.where ('video_id', video.id)
.where ('tag_id', tag.id) .where ('tag_id', tag.id)
.where_null ('untagged_at')
.first ()) .first ())
if video_tag is None: if video_tag is None:
video_tag = VideoTag () video_tag = VideoTag ()
video_tag.video_id = video.id video_tag.video_id = video.id
video_tag.tag_id = tag.id video_tag.tag_id = tag.id
video_tag.tagged_at = now
video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
video_tag.untagged_at = None video_tag.untagged_at = None
video_tag.save () video_tag.upsert ()
for com in fetch_comments (video.code):
for com in context['comments_by_video_code'].get (video.code, []):
user = User.where ('code', com['userId']).first () user = User.where ('code', com['userId']).first ()
if user is None: if user is None:
user = User () user = User ()
user.code = com['userId'] user.code = com['userId']
user.save () user.save ()
comment = Comment () comment = Comment ()
comment.video_id = video.id comment.video_id = video.id
comment.comment_no = com['no'] comment.comment_no = com['no']
comment.user_id = user.id comment.user_id = user.id
comment.content = com['body'] comment.content = com['body']
comment.posted_at = datetime.fromisoformat (com['postedAt']) comment.posted_at = datetime.fromisoformat (com['postedAt'])
comment.nico_count = com['nicoruCount'] comment.nico_count = com.get ('nicoruCount', 0)
comment.vpos_ms = com['vposMs'] comment.vpos_ms = com.get ('vposMs', 0)
comment.upsert () comment.upsert ()
# 削除動画 if not context['deletable']:
logger.warning ('skip soft-delete because the latest fetch was incomplete')
return
if not alive_video_codes:
logger.warning ('skip soft-delete because no alive videos were fetched')
return
videos = (Video.where_not_in ('code', alive_video_codes) videos = (Video.where_not_in ('code', alive_video_codes)
.where_null ('deleted_at') .where_null ('deleted_at')
.get ()) .get ())
for video in videos: for video in videos:
if video.code not in alive_video_codes:
video.deleted_at = now video.deleted_at = now
video.save () video.save ()
def fetch_comments ( def fetch_video_data (
video_code: str, video_code: str,
) -> list[CommentResult]: ) -> dict[str, Any]:
time.sleep (1.2) time.sleep (1.2)
headers = { 'X-Frontend-Id': '6', headers = { 'X-Frontend-Id': '6',
@@ -120,15 +170,38 @@ def fetch_comments (
+ '_' + '_'
+ str (random.randrange (10 ** 12, 10 ** 13))) + str (random.randrange (10 ** 12, 10 ** 13)))
url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }" url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+ f"?actionTrackId={ action_track_id }") + f'?actionTrackId={ action_track_id }')
res = requests.post (url, headers = headers, timeout = 60).json () return requests.post (url, headers = headers, timeout = 60).json ()
def fetch_comments_by_video_code (
videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
comments_by_video_code: dict[str, list[CommentResult]] = {}
for video in videos:
video_code = video['contentId']
try: try:
nv_comment = res['data']['comment']['nvComment'] comments_by_video_code[video_code] = fetch_comments (video_code)
except KeyError: except (KeyError,
return [] TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('failed to fetch comments: %s (%s)', video_code, exc)
comments_by_video_code[video_code] = []
return comments_by_video_code
def fetch_comments (
video_code: str,
) -> list[CommentResult]:
video_data = fetch_video_data (video_code)
nv_comment = (video_data.get ('data', {})
.get ('comment', {})
.get ('nvComment'))
if nv_comment is None: if nv_comment is None:
return [] return []
@@ -137,52 +210,87 @@ def fetch_comments (
'Content-Type': 'application/json' } 'Content-Type': 'application/json' }
params = { 'params': nv_comment['params'], params = { 'params': nv_comment['params'],
'additionals': { }, 'additionals': {},
'threadKey': nv_comment['threadKey'] } 'threadKey': nv_comment['threadKey'] }
url = nv_comment['server'] + '/v1/threads' url = nv_comment['server'] + '/v1/threads'
res = (requests.post (url, json.dumps (params), response = requests.post (url,
json = params,
headers = headers, headers = headers,
timeout = 60) timeout = 60)
.json ()) response.raise_for_status ()
res = response.json ()
try: return select_comments_from_threads (res)
return res['data']['threads'][1]['comments']
except (IndexError, KeyError):
def select_comments_from_threads (
response: dict[str, Any],
) -> list[CommentResult]:
threads = response.get ('data', {}).get ('threads', [])
if not isinstance (threads, list):
return [] return []
main_comments: list[CommentResult] = []
fallback_comments: list[CommentResult] = []
def search_nico_by_tag ( for thread in threads:
tag: str, comments = thread.get ('comments') if isinstance (thread, dict) else None
) -> list[VideoResult]: if not isinstance (comments, list):
return search_nico_by_tags ([tag]) continue
casted_comments = cast (list[CommentResult], comments)
if len (casted_comments) > len (fallback_comments):
fallback_comments = casted_comments
fork = str (thread.get ('fork', '')).lower ()
label = str (thread.get ('label', '')).lower ()
thread_id = str (thread.get ('id', '')).lower ()
if fork == 'main' or 'main' in label or 'main' in thread_id:
main_comments = casted_comments
selected_comments = main_comments or fallback_comments
deduped_comments: dict[int, CommentResult] = {}
for comment in selected_comments:
comment_no = comment.get ('no')
if not isinstance (comment_no, int):
continue
deduped_comments[comment_no] = comment
return [deduped_comments[comment_no]
for comment_no in sorted (deduped_comments)]
def search_nico_by_tags ( def search_nico_by_tags (
tags: list[str], tags: list[str],
) -> list[VideoResult]: ) -> SearchNicoResult:
today = datetime.now () today = datetime.now ()
url = ('https://snapshot.search.nicovideo.jp' url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search') + '/api/v2/snapshot/video/contents/search')
result_data: list[VideoResult] = [] result_by_video_code: dict[str, VideoResult] = {}
is_complete = True
to = datetime (2022, 12, 3) to = datetime (2022, 12, 3)
while to <= today: while to <= today:
time.sleep (1.2) time.sleep (1.2)
until = to + timedelta (days = 14) until = to + timedelta (days = 14)
# pylint: disable = consider-using-f-string
query_filter = json.dumps ({ 'type': 'or', query_filter = json.dumps ({ 'type': 'or',
'filters': [ 'filters': [
{ 'type': 'range', { 'type': 'range',
'field': 'startTime', 'field': 'startTime',
'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day), 'from': ('%04d-%02d-%02dT00:00:00+09:00'
'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day), % (to.year, to.month, to.day)),
'to': ('%04d-%02d-%02dT23:59:59+09:00'
% (until.year, until.month, until.day)),
'include_lower': True }] }) 'include_lower': True }] })
params: VideoSearchParam = { 'q': ' OR '.join (tags), params: VideoSearchParam = { 'q': ' OR '.join (tags),
'targets': 'tagsExact', 'targets': 'tagsExact',
'_sort': '-viewCounter', '_sort': '-viewCounter',
'fields': ('contentId,' 'fields': ('contentId,'
'userId,'
'title,' 'title,'
'tags,' 'tags,'
'description,' 'description,'
@@ -190,137 +298,67 @@ def search_nico_by_tags (
'startTime'), 'startTime'),
'_limit': 100, '_limit': 100,
'jsonFilter': query_filter } 'jsonFilter': query_filter }
res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
try: try:
result_data += res['data'] response = requests.get (
except KeyError: url,
pass params = cast (dict[str, int | str], params),
timeout = 60)
response.raise_for_status ()
res = response.json ()
for datum in cast (list[VideoResult], res.get ('data', [])):
result_by_video_code[datum['contentId']] = datum
except (ValueError, requests.RequestException) as exc:
logger.warning ('snapshot fetch failed: %s - %s (%s)',
to.date (),
until.date (),
exc)
is_complete = False
to = until + timedelta (days = 1) to = until + timedelta (days = 1)
return result_data for video in TrackedVideo.get ():
if video.code in result_by_video_code:
continue
try:
tracked_video = video
video_data = fetch_video_data (tracked_video.code)['data']
owner = video_data.get ('owner') or {}
video_info = video_data['video']
result_by_video_code[tracked_video.code] = {
'contentId': tracked_video.code,
'userId': owner.get ('id'),
'title': video_info['title'],
'tags': ' '.join (map (lambda t: t['name'],
video_data['tag']['items'])),
'description': video_info['description'],
'viewCounter': video_info['count']['view'],
'startTime': video_info['registeredAt'] }
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
is_complete = False
return { 'videos': list (result_by_video_code.values ()),
'is_complete': is_complete }
class Comment (Model): def normalise (
__timestamps__ = False text: str,
) -> str:
@property return jaconv.hira2kata (
def video ( unicodedata.normalize ('NFKC', text.strip ())).lower ()
self,
) -> DynamicProperty:
return self.belongs_to (Video)
@property
def user (
self,
) -> DynamicProperty:
return self.belongs_to (User)
class Tag (Model): class SearchNicoResult (TypedDict):
__timestamps__ = False videos: list['VideoResult']
is_complete: bool
@property
def video_tags (
self,
) -> DynamicProperty:
return self.has_many (VideoTag)
class User (Model): class UpdateContext (TypedDict):
__timestamps__ = False api_data: list['VideoResult']
comments_by_video_code: dict[str, list['CommentResult']]
@property deletable: bool
def comments (
self,
) -> DynamicProperty:
return self.has_many (Comment)
class Video (Model):
__timestamps__ = False
@property
def video_histories (
self,
) -> DynamicProperty:
return self.has_many (VideoHistory)
@property
def video_tags (
self,
) -> DynamicProperty:
return self.has_many (VideoTag)
@property
def comments (
self,
) -> DynamicProperty:
return self.has_many (Comment)
def upsert (
self,
) -> None:
row = Video.where ('code', self.code).first ()
if row is not None:
self.id = row.id
self.save ()
class VideoHistory (Model):
__timestamps__ = False
@property
def video (
self,
) -> DynamicProperty:
return self.belongs_to (Video)
def upsert (
self,
) -> None:
row = (Video
.where ('video_id', self.video_id)
.where ('fetched_at', self.fetched_at)
.first ())
if row is not None:
self.id = row.id
self.save ()
class VideoTag (Model):
__timestamps__ = False
@property
def video (
self,
) -> DynamicProperty:
return self.belongs_to (Video)
@property
def tag (
self,
) -> DynamicProperty:
return self.belongs_to (Tag)
def upsert (
self,
) -> None:
row = (Video
.where ('video_id', self.video_id)
.where ('tag_id', self.tag_id)
.first ())
if row is not None:
self.id = row.id
self.save ()
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
class VideoSearchParam (TypedDict): class VideoSearchParam (TypedDict):
@@ -334,6 +372,7 @@ class VideoSearchParam (TypedDict):
class VideoResult (TypedDict): class VideoResult (TypedDict):
contentId: str contentId: str
userId: int | None
title: str title: str
tags: str tags: str
description: str | None description: str | None
@@ -342,64 +381,12 @@ class VideoResult (TypedDict):
class CommentResult (TypedDict): class CommentResult (TypedDict):
id: str
no: int no: int
vposMs: int
body: str
commands: list[str]
userId: str userId: str
isPremium: bool body: str
score: int
postedAt: str postedAt: str
nicoruCount: int nicoruCount: int
nicoruId: Any vposMs: int
source: str
isMyPost: bool
class CommentRow (TypedDict):
id: int
video_id: int
comment_no: int
user_id: int
content: str
posted_at: datetime
nico_count: int
vpos_ms: int | None
class TagRow (TypedDict):
id: int
name: str
class UserRow (TypedDict):
id: int
code: str
class VideoRow (TypedDict):
id: int
code: str
title: str
description: str
uploaded_at: datetime
deleted_at: datetime | None
class VideoHistoryRow (TypedDict):
id: int
video_id: int
fetched_at: date
views_count: int
class VideoTagRow (TypedDict):
id: int
video_id: int
tag_id: int
tagged_at: date
untagged_at: date | None
if __name__ == '__main__': if __name__ == '__main__':