27 コミット

作成者 SHA1 メッセージ 日付
みてるぞ e6f58e621d 追跡対象動画バルク UPSERT (#22) (#23)
#22 インポート漏れ修正

#22

#22

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #23
2026-04-24 23:25:34 +09:00
みてるぞ cb72b8dd99 削除フラグが誤って付与されるバグ修正(#20) (#21)
#20

#20

#20

#020

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #21
2026-04-11 05:13:29 +09:00
みてるぞ b2adf62090 投稿者情報追加(#17) (#18)
#17

#17

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #18
2026-03-05 21:03:16 +09:00
みてるぞ a3d9d0bfd7 feat: タグと関係なしに追跡する動画リスト追加 (#16)
#15 タグ関係なく追跡する動画リスト

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #16
2026-01-01 14:00:11 +09:00
みてるぞ f44637d274 コメント取得追加 2025-10-26 05:07:07 +09:00
みてるぞ f809e9faae キリ番修正 2025-10-26 01:19:15 +09:00
みてるぞ de8fd8634a キリ番修正 2025-10-26 01:07:15 +09:00
みてるぞ 88be511f6e キリ番修正 2025-10-26 01:06:41 +09:00
みてるぞ ea339f1ec9 キリ番修正 2025-10-26 01:05:48 +09:00
みてるぞ 06328a89b2 キリ番追加 2025-10-26 00:45:29 +09:00
みてるぞ 463e8bbec7 config に移した. 2025-10-22 23:25:49 +09:00
みてるぞ baa75d68ba ゑぐぃバグ修正 2025-08-15 02:51:06 +09:00
みてるぞ 48e51f97d0 'update_db.py' を更新 2025-07-19 19:33:37 +09:00
みてるぞ c5204383ed #13 2025-07-01 23:53:11 +09:00
みてるぞ bf36d05ed3 #13 2025-07-01 23:48:30 +09:00
みてるぞ c9bd6fdfa7 #13 2025-07-01 23:39:57 +09:00
みてるぞ b2f5f81ca8 型定義追加 2024-11-06 03:56:28 +09:00
みてるぞ 67b76e6dd4 #12 2024-11-05 12:34:10 +09:00
みてるぞ 6a5e6dfade ごみを削除 2024-10-17 18:59:25 +09:00
みてるぞ 6e99da7326 Python 3.10 に適合したのと __pycache__ 除外 2024-10-17 18:57:03 +09:00
みてるぞ 53ba658319 #8 の対応 2024-10-17 12:49:13 +09:00
みてるぞ 067c90890e video_histories への書込みを Upsert に 2024-10-16 22:40:42 +09:00
みてるぞ db14af1a73 もろもろ修正 2024-10-16 22:17:35 +09:00
みてるぞ da17333a80 typing.Self は Python 3.10 には未実装か,さぅか. 2024-10-16 22:16:04 +09:00
みてるぞ ee971997ad #6 の対応 2024-10-16 22:08:07 +09:00
みてるぞ b448335851 もろもろのモロヘイヤ 2024-10-16 20:20:50 +09:00
みてるぞ 9f810b23f0 Upsert 問題の修正とその他無駄を排除 2024-10-15 23:01:03 +09:00
14個のファイルの変更838行の追加370行の削除
+1
ファイルの表示
@@ -0,0 +1 @@
__pycache__
+25
ファイルの表示
@@ -0,0 +1,25 @@
from __future__ import annotations
import os
from typing import TypedDict
from eloquent import DatabaseManager, Model # type: ignore
# Connection settings handed to Eloquent.  The credentials are read from the
# environment, so importing this module raises KeyError when MYSQL_USER or
# MYSQL_PASS is not set.
CONFIG: dict[str, DbConfig] = {
    'mysql': {
        'driver': 'mysql',
        'host': 'localhost',
        'database': 'nizika_nico',
        'user': os.environ['MYSQL_USER'],
        'password': os.environ['MYSQL_PASS'],
        'prefix': '',
    },
}

# Database manager built from CONFIG and registered as the connection
# resolver of the model base class.
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
class DbConfig (TypedDict):
    """Settings of one database connection as stored in ``CONFIG``."""

    # Driver name ('mysql' in CONFIG).
    driver: str
    # Server host name.
    host: str
    # Schema (database) name.
    database: str
    # Login credentials.
    user: str
    password: str
    # Table-name prefix; CONFIG uses an empty string.
    prefix: str
+145
ファイルの表示
@@ -0,0 +1,145 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
# pylint: disable = missing-module-docstring
# pylint: disable = unused-argument
from __future__ import annotations
from typing import Any, Generic, Type, TypeVar, overload
from typing_extensions import Self
_ModelT = TypeVar ('_ModelT', bound = 'Model')
class Connection:
    # Typed surface of a single database connection.

    # Raw statements; `bindings` carries the statement parameters.
    def select (
            self,
            query: str,
            bindings: dict[str, Any] | None = None,
    ) -> Any: ...
    def insert (
            self,
            query: str,
            bindings: dict[str, Any] | None = None,
    ) -> int: ...
    def update (
            self,
            query: str,
            bindings: dict[str, Any] | None = None,
    ) -> int: ...
    def delete (
            self,
            query: str,
            bindings: dict[str, Any] | None = None,
    ) -> int: ...

    # Transaction control.
    def transaction (self, callback: Any) -> Any: ...
    def begin_transaction (self) -> None: ...
    def commit (self) -> None: ...
    def rollback (self) -> None: ...
class ConnectionResolver:
    # Looks up connections by name and tracks which name is the default.
    def connection (
            self,
            name: str | None = None,
    ) -> Any: ...
    def get_default_connection (self) -> str: ...
    def set_default_connection (
            self,
            name: str,
    ) -> None: ...
class DatabaseManager:
    # Owns the named connections described by the config mapping.
    connections: dict[str, Connection]

    def __init__ (
            self,
            config: dict[str, Any],
    ) -> None: ...

    # With `name` omitted these take no explicit connection name.
    def connection (
            self,
            name: str | None = None,
    ) -> Connection: ...
    def disconnect (
            self,
            name: str | None = None,
    ) -> None: ...
    def reconnect (
            self,
            name: str | None = None,
    ) -> Connection: ...
    def get_connections (self) -> dict[str, Connection]: ...
class Model:
    # Typed surface of the ORM model base class as used by this project.
    id: int
    # Name-mangled private attribute of the real model class, declared here so
    # that assigning to it type-checks (presumably the "row already exists"
    # flag; confirm against the library).
    _Model__exists: bool

    # Relation helpers; the return type is what the relation evaluates to when
    # it is accessed through a property.
    def has_one (
            self,
            related_model: Type[_ModelT],
            foreign_key: str | None = None,
    ) -> _ModelT: ...
    def has_many (
            self,
            related_model: Type[_ModelT],
            foreign_key: str | None = None,
    ) -> list[_ModelT]: ...
    def belongs_to (
            self,
            related_model: Type[_ModelT],
            foreign_key: str | None = None,
    ) -> _ModelT: ...
    def belongs_to_many (
            self,
            related_model: Type[_ModelT],
            foreign_key: str | None = None,
    ) -> list[_ModelT]: ...

    # Persistence of a single instance.
    def save (self) -> None: ...
    def delete (self) -> None: ...

    # Class-level query entry points.
    @classmethod
    def find (cls, id_: int) -> Self | None: ...
    @classmethod
    def all (cls) -> list[Self]: ...
    @classmethod
    def get (cls) -> list[Self]: ...
    @classmethod
    def query (
            cls,
    ) -> QueryBuilder[Self]: ...
    @overload
    @classmethod
    def where (
            cls,
            field: str,
            operator: str,
            value: Any,
    ) -> QueryBuilder[Self]: ...
    @overload
    @classmethod
    def where (cls, field: str, value: Any) -> QueryBuilder[Self]: ...
    # `callback` receives the builder of the related model and constrains it.
    @classmethod
    def where_has (
            cls,
            relation: str,
            callback: Any,
    ) -> QueryBuilder[Self]: ...
    @classmethod
    def where_not_in (
            cls,
            column: str,
            values: list[Any] | tuple[Any, ...],
    ) -> QueryBuilder[Self]: ...
    @classmethod
    def where_not_null (cls, field: str) -> QueryBuilder[Self]: ...
    @classmethod
    def max (cls, column: str) -> Any: ...
    @classmethod
    def set_connection_resolver (cls, resolver: DatabaseManager) -> None: ...
class QueryBuilder (Generic[_ModelT]):
    # Chainable query over rows of one model type.

    # Terminal calls that run the query.
    def first (self) -> _ModelT | None: ...
    def get (self) -> list[_ModelT]: ...
    def max (self, column: str) -> Any: ...

    # Filters; each returns a builder so that calls can be chained.
    @overload
    def where (
            self,
            field: str,
            operator: str,
            value: Any,
    ) -> QueryBuilder[_ModelT]: ...
    @overload
    def where (
            self,
            field: str,
            value: Any,
    ) -> QueryBuilder[_ModelT]: ...
    def where_null (
            self,
            field: str,
    ) -> QueryBuilder[_ModelT]: ...

    def _load_relation (
            self,
            relation_name: str,
    ) -> QueryBuilder[_ModelT]: ...
+177
ファイルの表示
@@ -0,0 +1,177 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
ぼざクリ DB の構成
"""
from __future__ import annotations
from datetime import date, datetime
from db.my_eloquent import Model
class Comment (Model):
    """A comment posted on a video; upserted by (video_id, comment_no)."""
    # pylint: disable = too-many-instance-attributes

    __timestamps__ = False

    id: int
    video_id: int
    comment_no: int
    user_id: int
    content: str
    posted_at: datetime
    nico_count: int
    vpos_ms: int

    @property
    def video (self) -> Video:
        """The video this comment belongs to."""
        return self.belongs_to (Video)

    @property
    def user (self) -> User:
        """The user this comment belongs to."""
        return self.belongs_to (User)

    def upsert (self, *args: str) -> None:
        """Upsert on the fixed key (video_id, comment_no); ``args`` is ignored."""
        key_columns = ('video_id', 'comment_no')
        super ().upsert (*key_columns)
class Tag (Model):
    """A tag, identified by its name."""

    __timestamps__ = False

    id: int
    name: str

    @property
    def video_tags (self) -> list[VideoTag]:
        """The video/tag link rows that reference this tag."""
        return self.has_many (VideoTag)
class TrackedVideo (Model):
    """A video code registered for tracking; upserted by ``code``."""

    __timestamps__ = False

    id: int
    code: str

    def upsert (self, *args: str) -> None:
        """Upsert on the fixed key ``code``; ``args`` is ignored."""
        key_columns = ('code',)
        super ().upsert (*key_columns)
class User (Model):
    """A user, identified by ``code``."""

    __timestamps__ = False

    id: int
    code: str

    @property
    def comments (self) -> list[Comment]:
        """The comments that reference this user."""
        return self.has_many (Comment)
class Video (Model):
    """A video; upserted by ``code``."""

    __timestamps__ = False

    id: int
    code: str
    user_id: int | None
    title: str
    description: str
    uploaded_at: datetime
    deleted_at: datetime | None

    @property
    def user (self) -> User | None:
        """The related user, or None when ``user_id`` is not set."""
        return None if self.user_id is None else self.belongs_to (User)

    @property
    def video_histories (self) -> list[VideoHistory]:
        """The history rows recorded for this video."""
        return self.has_many (VideoHistory)

    @property
    def video_tags (self) -> list[VideoTag]:
        """The video/tag link rows of this video."""
        return self.has_many (VideoTag)

    @property
    def comments (self) -> list[Comment]:
        """The comments stored for this video."""
        return self.has_many (Comment)

    def upsert (self, *args: str) -> None:
        """Upsert on the fixed key ``code``; ``args`` is ignored."""
        key_columns = ('code',)
        super ().upsert (*key_columns)
class VideoHistory (Model):
    """View count of a video on one fetch date; upserted by (video_id, fetched_at)."""

    __timestamps__ = False

    id: int
    video_id: int
    fetched_at: date
    views_count: int

    @property
    def video (self) -> Video:
        """The video this history row belongs to."""
        return self.belongs_to (Video)

    def upsert (self, *args: str) -> None:
        """Upsert on the fixed key (video_id, fetched_at); ``args`` is ignored."""
        key_columns = ('video_id', 'fetched_at')
        super ().upsert (*key_columns)
class VideoTag (Model):
    """Link between a video and a tag; upserted by (video_id, tag_id)."""

    __timestamps__ = False

    id: int
    video_id: int
    tag_id: int
    tagged_at: date
    # None while the tag is still attached to the video.
    untagged_at: date | None

    @property
    def video (self) -> Video:
        """The video side of the link."""
        return self.belongs_to (Video)

    @property
    def tag (self) -> Tag:
        """The tag side of the link."""
        return self.belongs_to (Tag)

    def upsert (self, *args: str) -> None:
        """Upsert on the fixed key (video_id, tag_id); ``args`` is ignored."""
        key_columns = ('video_id', 'tag_id')
        super ().upsert (*key_columns)
+50
ファイルの表示
@@ -0,0 +1,50 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
みてるぞ式魔改造(言ふほどか?)版 Eloquent
"""
import eloquent
class DatabaseManager (eloquent.DatabaseManager):
    """Project-local alias of ``eloquent.DatabaseManager``; adds nothing."""
class Model (eloquent.Model):
    """Eloquent model extended with a key-based :meth:`upsert`."""

    id: int

    def upsert (
            self,
            *args: str,
    ) -> None:
        """
        Insert this record, or update the stored row sharing its key columns.

        Args:
            *args: Names of the attributes that identify an existing row.

        Raises:
            ValueError: If no key column is given.  An unfiltered lookup would
                match the first row of the table and overwrite it.
            Exception: Whatever ``save`` raised, when the insert fails and no
                matching row exists afterwards.
        """
        if not args:
            raise ValueError ('upsert requires at least one key column')

        row = self._find_upsert_row (*args)
        if row is None:
            try:
                self.save ()
                return
            except Exception:
                # The insert failed; if a matching row exists by now, fall
                # through and update it, otherwise the error is genuine.
                row = self._find_upsert_row (*args)
                if row is None:
                    raise

        self._adopt_row (row)
        self.save ()

    def _adopt_row (
            self,
            row: 'Model',
    ) -> None:
        """Take over the identity of ``row`` so that ``save`` updates it."""
        self.id = row.id
        # pylint: disable = invalid-name
        # pylint: disable = attribute-defined-outside-init
        self._Model__exists = True

    def _find_upsert_row (
            self,
            *args: str,
    ) -> 'Model | None':
        """Return the first stored row whose ``args`` columns equal this record's."""
        q = self.query ()
        for arg in args:
            q = q.where (arg, getattr (self, arg))
        return q.first ()
-113
ファイルの表示
@@ -1,113 +0,0 @@
from __future__ import annotations
from typing import Any, Generic, Type, TypeVar, overload
from eloquent.orm.relations.dynamic_property import DynamicProperty
_TModel = TypeVar ('_TModel', bound = 'Model')
class Connection:
def select (self, query: str, bindings: dict[str, Any] | None = None) -> Any: ...
def insert (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def update (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def delete (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def transaction (self, callback: Any) -> Any: ...
def begin_transaction (self) -> None: ...
def commit (self) -> None: ...
def rollback (self) -> None: ...
class ConnectionResolver:
def connection (self, name: str | None = None) -> Any: ...
def get_default_connection (self) -> str: ...
def set_default_connection (self, name: str) -> None: ...
class DatabaseManager:
connections: dict[str, Connection]
def __init__ (self, config: dict[str, Any]) -> None: ...
def connection (self, name: str | None = None) -> Connection: ...
def disconnect (self, name: str | None = None) -> None: ...
def reconnect (self, name: str | None = None) -> Connection: ...
def get_connections (self) -> dict[str, Connection]: ...
class Model:
id: int
def has_one (
self,
related_model: Type[Model],
foreign_key: str | None = None,
) -> DynamicProperty: ...
def has_many (
self,
related_model: Type[Model],
foreign_key: str | None = None,
) -> DynamicProperty: ...
def belongs_to (
self,
related_model: Type[Model],
foreign_key: str | None = None,
) -> DynamicProperty: ...
def belongs_to_many (
self,
related_model: Type[Model],
foreign_key: str | None = None,
) -> DynamicProperty: ...
def save (self) -> None: ...
def delete (self) -> None: ...
@classmethod
def find (cls, id: int) -> Model | None: ...
@overload
@classmethod
def where (cls, field: str, operator: str, value: Any) -> QueryBuilder: ...
@overload
@classmethod
def where (cls, field: str, value: Any) -> QueryBuilder: ...
@classmethod
def where_not_in (cls, column: str, values: list[Any] | tuple) -> QueryBuilder: ...
@classmethod
def where_not_null (cls, field: str) -> QueryBuilder: ...
@classmethod
def set_connection_resolver (cls, resolver: DatabaseManager) -> None: ...
class QueryBuilder (Generic[_TModel]):
def first (self) -> _TModel | None: ...
def get (self) -> list[_TModel]: ...
@overload
def where (self, field: str, operator: str, value: Any) -> QueryBuilder: ...
@overload
def where (self, field: str, value: Any) -> QueryBuilder: ...
def where_null (self, field: str) -> QueryBuilder: ...
シンボリックリンク
+1
ファイルの表示
@@ -0,0 +1 @@
db/eloquent.pyi
-4
ファイルの表示
@@ -1,4 +0,0 @@
from eloquent import Model
class DynamicProperty (Model): ...
+55
ファイルの表示
@@ -0,0 +1,55 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画コードからコメントのリストを取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
# Bare reference to the imported DB object; it has no runtime effect and
# presumably only marks the `db.config` import as used (TODO confirm).
DB
def main (
        video_code: str,
) -> None:
    """
    Print the stored comments of one video as a JSON array.

    Prints ``[]`` when no video with ``video_code`` is found.  Values that
    JSON cannot encode natively are written via ``str``.
    """
    video = Video.where ('code', video_code).first ()
    if not video:
        print ('[]')
        return

    comments: list[CommentDict] = [
        { 'id': row.id,
          'video_id': row.video_id,
          'comment_no': row.comment_no,
          'user_id': row.user_id,
          'content': row.content,
          'posted_at': row.posted_at,
          'nico_count': row.nico_count,
          'vpos_ms': row.vpos_ms }
        for row in video.comments]
    print (json.dumps (comments, default = str))
class CommentDict (TypedDict):
    """Shape of one comment in the JSON output; mirrors the Comment columns."""

    # Row ids.
    id: int
    video_id: int
    user_id: int
    # Comment payload.
    comment_no: int
    content: str
    posted_at: datetime
    nico_count: int
    vpos_ms: int
# Script entry point: the first command-line argument is the video code.
if __name__ == '__main__':
    main (sys.argv[1])
+65
ファイルの表示
@@ -0,0 +1,65 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画履歴の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import date, datetime
from typing import cast
from db.config import DB
from db.models import Video, VideoHistory
# Bare reference to the imported DB object; it has no runtime effect and
# presumably only marks the `db.config` import as used (TODO confirm).
DB
def main (
        views_counts: list[int],
        base_date: date,
) -> None:
    """
    Print the videos that newly reached one of the given view counts.

    The latest fetch date not after ``base_date`` is compared with all earlier
    history.  A video is reported when its view count on that date is at
    least a threshold while its highest earlier count is below it.  Each video
    is reported once, for the first matching threshold in ``views_counts``
    order.  The output is a JSON array of ``[threshold, code, uploaded_at]``;
    ``[]`` is printed when there is no history up to ``base_date``.

    Args:
        views_counts: View-count thresholds to check.
        base_date: Date up to which history is considered.
    """
    latest_fetched_at = cast (date | None,
                              (VideoHistory
                               .where ('fetched_at', '<=', base_date)
                               .max ('fetched_at')))
    if latest_fetched_at is None:
        print ('[]')
        return

    kiriban_list: list[tuple[int, str, datetime]] = []
    # Codes already reported, kept as a set so the check below is O(1).
    reported_codes: set[str] = set ()

    for views_count in views_counts:
        targets = { vh.video.code for vh in (
                VideoHistory
                .where ('fetched_at', latest_fetched_at)
                .where ('views_count', '>=', views_count)
                .get ()) }
        for code in targets:
            if code in reported_codes:
                continue

            # `code = code` binds the current value into the lambda.
            previous_views_count: int | None = (
                    VideoHistory
                    .where_has ('video', lambda q, code = code: q.where ('code', code))
                    .where ('fetched_at', '<', latest_fetched_at)
                    .max ('views_count'))
            if previous_views_count is None:
                # No earlier history: treat the video as starting from zero.
                previous_views_count = 0
            if previous_views_count >= views_count:
                continue

            video = cast (Video, Video.where ('code', code).first ())
            kiriban_list.append ((views_count, code, video.uploaded_at))
            reported_codes.add (code)

    print (json.dumps (kiriban_list, default = str))
# Script entry point: argv[1] is the base date (YYYY-MM-DD), the remaining
# arguments are the view-count thresholds.
if __name__ == '__main__':
    main (list (map (int, sys.argv[2:])),
          datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
+52
ファイルの表示
@@ -0,0 +1,52 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
全動画の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
from datetime import date, datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
# Bare reference to the imported DB object; it has no runtime effect and
# presumably only marks the `db.config` import as used (TODO confirm).
DB
def main () -> None:
    """
    Print every stored video as a JSON array.

    Each entry lists the names of the tags whose link row has no
    ``untagged_at``; ``deleted_at`` is reduced to its date part.  Values that
    JSON cannot encode natively are written via ``str``.
    """
    videos: list[VideoDict] = []
    for row in Video.all ():
        deleted_on = row.deleted_at.date () if row.deleted_at else None
        video: VideoDict = {
            'id': row.id,
            'code': row.code,
            # row.user may be None, in which case no code is available.
            'user': getattr (row.user, 'code', None),
            'title': row.title,
            'description': row.description,
            'tags': [video_tag.tag.name
                     for video_tag in row.video_tags
                     if video_tag.untagged_at is None],
            'uploaded_at': row.uploaded_at,
            'deleted_at': deleted_on }
        videos.append (video)
    print (json.dumps (videos, default = str))
class VideoDict (TypedDict):
    """Shape of one video in the JSON output."""

    id: int
    code: str
    # Code of the related user, or None when the video has none.
    user: str | None
    title: str
    description: str
    # Names of the tags currently attached to the video.
    tags: list[str]
    uploaded_at: datetime
    deleted_at: date | None
# Script entry point: run the export when executed directly.
if __name__ == '__main__':
    main ()
+2
ファイルの表示
@@ -0,0 +1,2 @@
-- Create the `tracked_videos` table: an auto-increment `id` primary key and a
-- `code` column of up to 16 characters (column comment: "video code"; table
-- comment: "tracked videos").
CREATE TABLE `nizika_nico`.`tracked_videos` (`id` BIGINT NOT NULL AUTO_INCREMENT , `code` VARCHAR(16) NOT NULL COMMENT '動画コード' , PRIMARY KEY (`id`)) ENGINE = InnoDB COMMENT = '追跡対象動画';
-- Allow each code to be stored only once.
ALTER TABLE `tracked_videos` ADD UNIQUE(`code`);
+3
ファイルの表示
@@ -0,0 +1,3 @@
-- Add a nullable `user_id` column to `videos`, placed after `code`
-- (column comment: "user id").
ALTER TABLE `videos` ADD `user_id` BIGINT NULL DEFAULT NULL COMMENT 'ユーザ Id.' AFTER `code`;
-- Index the new column.
ALTER TABLE `videos` ADD INDEX(`user_id`);
-- Reference `users`.`id`: deleting a referenced user is rejected, and a
-- changed user id is propagated to `videos`.
ALTER TABLE `videos` ADD FOREIGN KEY (`user_id`) REFERENCES `users`(`id`) ON DELETE RESTRICT ON UPDATE CASCADE;
+19
ファイルの表示
@@ -0,0 +1,19 @@
import sys
from db.config import DB
from db.models import TrackedVideo
# Bare reference to the imported DB object; it has no runtime effect and
# presumably only marks the `db.config` import as used (TODO confirm).
DB
def main (
        video_codes: list[str],
) -> None:
    """Upsert one TrackedVideo row for each of the given video codes."""
    for video_code in video_codes:
        tracked = TrackedVideo ()
        tracked.code = video_code
        tracked.upsert ()
# Script entry point: every command-line argument is a video code.
if __name__ == '__main__':
    main (sys.argv[1:])
+217 -227
ファイルの表示
@@ -1,3 +1,6 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
日次で実行し,ぼざクリ DB を最新に更新する.
"""
@@ -5,111 +8,157 @@
from __future__ import annotations
import json
import os
import logging
import random
import string
import time
import unicodedata
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import Any, Type, TypedDict, cast
from typing import Any, TypedDict, cast
import jaconv
import requests
from eloquent import DatabaseManager, Model
from eloquent.orm.relations.dynamic_property import DynamicProperty
config: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }
db = DatabaseManager (config)
Model.set_connection_resolver (db)
from db.config import DB
from db.models import (Comment,
Tag,
TrackedVideo,
User,
Video,
VideoHistory,
VideoTag)
logger = logging.getLogger (__name__)
logging.basicConfig (
level = logging.INFO,
format = '%(asctime)s %(levelname)s %(message)s')
def main () -> None:
    """
    Fetch the current video and comment data and store it in one transaction.

    All fetching happens before the transaction is opened.  Any exception
    raised while updating or committing rolls the transaction back and is
    re-raised.
    """
    now = datetime.now ()
    today = now.date ()

    tags = ['伊地知ニジカ',
            'ぼざろクリーチャーシリーズ',
            'ぼざろクリーチャーシリーズ外伝']
    search_result = search_nico_by_tags (tags)
    comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])

    # Soft-deletion is only allowed when the search result is complete.
    context: UpdateContext = {
        'api_data': search_result['videos'],
        'comments_by_video_code': comments_by_video_code,
        'deletable': search_result['is_complete'] }

    connection = DB.connection ()
    connection.begin_transaction ()
    try:
        update_tables (context, now, today)
        connection.commit ()
    except Exception:
        connection.rollback ()
        raise
def update_tables (
api_data: list[VideoResult],
context: UpdateContext,
now: datetime,
today: date,
) -> None:
alive_video_codes: list[str] = []
for datum in api_data:
tag_names: list[str] = datum['tags'].split ()
for datum in context['api_data']:
tag_names = datum['tags'].split ()
normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
user: User | None = None
if datum['userId'] is not None:
user = User.where ('code', str (datum['userId'])).first ()
if user is None:
user = User ()
user.code = str (datum['userId'])
user.save ()
video = Video ()
video.code = datum['contentId']
video.user_id = user.id if user else None
video.title = datum['title']
video.description = datum['description'] or ''
video.uploaded_at = datetime.fromisoformat (datum['startTime'])
video.deleted_at = None
video.upsert ()
alive_video_codes.append (video.code)
video_history = VideoHistory ()
video_history.video_id = video.id
video_history.fetched_at = now
video_history.fetched_at = today
video_history.views_count = datum['viewCounter']
video_history.save ()
video_tags = video.video_tags.where_not_null ('untagged_at').get ()
video_history.upsert ()
video_tags = [video_tag for video_tag in video.video_tags
if video_tag.untagged_at is None]
for video_tag in video_tags:
tag = video_tag.tag
if (tag is not None
and (normalise (tag.name) not in map (normalise, tag_names))):
video_tag.untagged_at = now
if tag is None:
continue
if normalise (tag.name) in normalised_tag_names:
continue
video_tag.untagged_at = today
video_tag.save ()
for tag_name in tag_names:
tag = Tag.where ('name', tag_name).first ()
if tag is None:
tag = Tag ()
tag.name = tag_name
tag.save ()
video_tag = (Video.where ('video_id', video.id)
video_tag = (VideoTag.where ('video_id', video.id)
.where ('tag_id', tag.id)
.where_null ('untagged_at')
.first ())
if video_tag is None:
video_tag = VideoTag ()
video_tag.video_id = video.id
video_tag.tag_id = tag.id
video_tag.tagged_at = now
video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
video_tag.untagged_at = None
video_tag.save ()
for com in fetch_comments (video.code):
video_tag.upsert ()
for com in context['comments_by_video_code'].get (video.code, []):
user = User.where ('code', com['userId']).first ()
if user is None:
user = User ()
user.code = com['userId']
user.save ()
comment = Comment ()
comment.video_id = video.id
comment.comment_no = com['no']
comment.user_id = user.id
comment.content = com['body']
comment.posted_at = datetime.fromisoformat (com['postedAt'])
comment.nico_count = com['nicoruCount']
comment.vpos_ms = com['vposMs']
comment.nico_count = com.get ('nicoruCount', 0)
comment.vpos_ms = com.get ('vposMs', 0)
comment.upsert ()
# 削除動画
if not context['deletable']:
logger.warning ('skip soft-delete because the latest fetch was incomplete')
return
if not alive_video_codes:
logger.warning ('skip soft-delete because no alive videos were fetched')
return
videos = (Video.where_not_in ('code', alive_video_codes)
.where_null ('deleted_at')
.get ())
for video in videos:
if video.code not in alive_video_codes:
video.deleted_at = now
video.save ()
def fetch_comments (
def fetch_video_data (
video_code: str,
) -> list[CommentResult]:
) -> dict[str, Any]:
time.sleep (1.2)
headers = { 'X-Frontend-Id': '6',
@@ -121,15 +170,38 @@ def fetch_comments (
+ '_'
+ str (random.randrange (10 ** 12, 10 ** 13)))
url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
+ f"?actionTrackId={ action_track_id }")
url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+ f'?actionTrackId={ action_track_id }')
res = requests.post (url, headers = headers, timeout = 60).json ()
return requests.post (url, headers = headers, timeout = 60).json ()
def fetch_comments_by_video_code (
        videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
    """
    Fetch the comments of every given video, keyed by video code.

    A video whose fetch fails with one of the handled errors is logged as a
    warning and mapped to an empty list, so one failure does not stop the rest.
    """
    handled_errors = (KeyError,
                      TypeError,
                      ValueError,
                      requests.RequestException)
    result: dict[str, list[CommentResult]] = {}

    for video in videos:
        code = video['contentId']
        try:
            comments = fetch_comments (code)
        except handled_errors as exc:
            logger.warning ('failed to fetch comments: %s (%s)', code, exc)
            comments = []
        result[code] = comments

    return result
def fetch_comments (
video_code: str,
) -> list[CommentResult]:
video_data = fetch_video_data (video_code)
nv_comment = (video_data.get ('data', {})
.get ('comment', {})
.get ('nvComment'))
if nv_comment is None:
return []
@@ -138,52 +210,87 @@ def fetch_comments (
'Content-Type': 'application/json' }
params = { 'params': nv_comment['params'],
'additionals': { },
'additionals': {},
'threadKey': nv_comment['threadKey'] }
url = nv_comment['server'] + '/v1/threads'
res = (requests.post (url, json.dumps (params),
response = requests.post (url,
json = params,
headers = headers,
timeout = 60)
.json ())
response.raise_for_status ()
res = response.json ()
try:
return res['data']['threads'][1]['comments']
except (IndexError, KeyError):
return select_comments_from_threads (res)
def select_comments_from_threads (
        response: dict[str, Any],
) -> list[CommentResult]:
    """
    Pick the comment list to store from a comment-threads response.

    The thread marked as "main" (through its ``fork``, ``label`` or ``id``)
    is preferred; when several match, the last one wins.  If none matches, or
    the match is empty, the thread with the most comments is used.  Comments
    are de-duplicated by their ``no`` (the last occurrence wins) and returned
    in ascending ``no`` order.

    Malformed input never raises: a missing or non-dict ``data``, a non-list
    ``threads``, non-dict threads and non-dict comments are all skipped.

    Args:
        response: Decoded JSON body of the threads request.

    Returns:
        The selected comments, or an empty list when there are none.
    """
    data = response.get ('data') if isinstance (response, dict) else None
    threads = data.get ('threads') if isinstance (data, dict) else None
    if not isinstance (threads, list):
        return []

    main_comments: list[CommentResult] = []
    fallback_comments: list[CommentResult] = []

    for thread in threads:
        if not isinstance (thread, dict):
            continue
        comments = thread.get ('comments')
        if not isinstance (comments, list):
            continue

        # The target type is given as a string so that it is not evaluated at
        # run time.
        casted_comments = cast ('list[CommentResult]', comments)
        if len (casted_comments) > len (fallback_comments):
            fallback_comments = casted_comments

        fork = str (thread.get ('fork', '')).lower ()
        label = str (thread.get ('label', '')).lower ()
        thread_id = str (thread.get ('id', '')).lower ()
        if fork == 'main' or 'main' in label or 'main' in thread_id:
            main_comments = casted_comments

    deduped_comments: dict[int, CommentResult] = {}
    for comment in main_comments or fallback_comments:
        if not isinstance (comment, dict):
            continue
        comment_no = comment.get ('no')
        if isinstance (comment_no, int):
            deduped_comments[comment_no] = comment

    return [deduped_comments[comment_no]
            for comment_no in sorted (deduped_comments)]
def search_nico_by_tags (
tags: list[str],
) -> list[VideoResult]:
) -> SearchNicoResult:
today = datetime.now ()
url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search')
result_data: list[VideoResult] = []
result_by_video_code: dict[str, VideoResult] = {}
is_complete = True
to = datetime (2022, 12, 3)
while to <= today:
time.sleep (1.2)
until = to + timedelta (days = 14)
# pylint: disable = consider-using-f-string
query_filter = json.dumps ({ 'type': 'or',
'filters': [
{ 'type': 'range',
'field': 'startTime',
'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day),
'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day),
'from': ('%04d-%02d-%02dT00:00:00+09:00'
% (to.year, to.month, to.day)),
'to': ('%04d-%02d-%02dT23:59:59+09:00'
% (until.year, until.month, until.day)),
'include_lower': True }] })
params: VideoSearchParam = { 'q': ' OR '.join (tags),
'targets': 'tagsExact',
'_sort': '-viewCounter',
'fields': ('contentId,'
'userId,'
'title,'
'tags,'
'description,'
@@ -191,172 +298,67 @@ def search_nico_by_tags (
'startTime'),
'_limit': 100,
'jsonFilter': query_filter }
res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
try:
result_data += res['data']
except KeyError:
pass
response = requests.get (
url,
params = cast (dict[str, int | str], params),
timeout = 60)
response.raise_for_status ()
res = response.json ()
for datum in cast (list[VideoResult], res.get ('data', [])):
result_by_video_code[datum['contentId']] = datum
except (ValueError, requests.RequestException) as exc:
logger.warning ('snapshot fetch failed: %s - %s (%s)',
to.date (),
until.date (),
exc)
is_complete = False
to = until + timedelta (days = 1)
return result_data
for video in TrackedVideo.get ():
if video.code in result_by_video_code:
continue
try:
tracked_video = video
video_data = fetch_video_data (tracked_video.code)['data']
owner = video_data.get ('owner') or {}
video_info = video_data['video']
result_by_video_code[tracked_video.code] = {
'contentId': tracked_video.code,
'userId': owner.get ('id'),
'title': video_info['title'],
'tags': ' '.join (map (lambda t: t['name'],
video_data['tag']['items'])),
'description': video_info['description'],
'viewCounter': video_info['count']['view'],
'startTime': video_info['registeredAt'] }
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
is_complete = False
return { 'videos': list (result_by_video_code.values ()),
'is_complete': is_complete }
class Comment (Model):
__timestamps__ = False
video_id: int
comment_no: int
user_id: int
content: str
posted_at: datetime
nico_count: int
vpos_ms: int
@property
def video (
self,
) -> DynamicProperty:
return self.belongs_to (Video)
@property
def user (
self,
) -> DynamicProperty:
return self.belongs_to (User)
def upsert (
self,
) -> None:
row = (Comment.where ('video_id', self.video_id)
.where ('comment_no', self.comment_no)
.first ())
if row is not None:
self.id = row.id
self.save ()
def normalise (
        text: str,
) -> str:
    """
    Return a canonical form of ``text`` for comparing tag names.

    The text is stripped, NFKC-normalised, passed through
    ``jaconv.hira2kata`` and lower-cased.
    """
    nfkc_text = unicodedata.normalize ('NFKC', text.strip ())
    katakana_text = jaconv.hira2kata (nfkc_text)
    return katakana_text.lower ()
class Tag (Model):
__timestamps__ = False
name: str
@property
def video_tags (
self,
) -> DynamicProperty:
return self.has_many (VideoTag)
class SearchNicoResult (TypedDict):
    """Result of a video search: the videos found and a completeness flag."""

    # Videos that were fetched.
    videos: list['VideoResult']
    # False when at least one fetch failed, i.e. the list may be partial.
    is_complete: bool
class User (Model):
__timestamps__ = False
code: str
@property
def comments (
self,
) -> DynamicProperty:
return self.has_many (Comment)
class Video (Model):
__timestamps__ = False
code: str
title: str
description: str
uploaded_at: datetime
deleted_at: datetime | None
@property
def video_histories (
self,
) -> DynamicProperty:
return self.has_many (VideoHistory)
@property
def video_tags (
self,
) -> DynamicProperty:
return self.has_many (VideoTag)
@property
def comments (
self,
) -> DynamicProperty:
return self.has_many (Comment)
def upsert (
self,
) -> None:
row = Video.where ('code', self.code).first ()
if row is not None:
self.id = row.id
self.save ()
class VideoHistory (Model):
__timestamps__ = False
video_id: int
fetched_at: date
views_count: int
@property
def video (
self,
) -> DynamicProperty:
return self.belongs_to (Video)
def upsert (
self,
) -> None:
row = (VideoHistory.where ('video_id', self.video_id)
.where ('fetched_at', self.fetched_at)
.first ())
if row is not None:
self.id = row.id
self.save ()
class VideoTag (Model):
__timestamps__ = False
video_id: int
tag_id: int
tagged_at: date
untagged_at: date | None
@property
def video (
self,
) -> DynamicProperty:
return self.belongs_to (Video)
@property
def tag (
self,
) -> DynamicProperty:
return self.belongs_to (Tag)
def upsert (
self,
) -> None:
row = (VideoTag.where ('video_id', self.video_id)
.where ('tag_id', self.tag_id)
.first ())
if row is not None:
self.id = row.id
self.save ()
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
class UpdateContext (TypedDict):
    """Fetched data handed to the table update."""

    # Videos to upsert.
    api_data: list['VideoResult']
    # Comments of each video, keyed by video code.
    comments_by_video_code: dict[str, list['CommentResult']]
    # Whether videos missing from api_data may be marked as deleted.
    deletable: bool
class VideoSearchParam (TypedDict):
@@ -370,6 +372,7 @@ class VideoSearchParam (TypedDict):
class VideoResult (TypedDict):
contentId: str
userId: int | None
title: str
tags: str
description: str | None
@@ -378,25 +381,12 @@ class VideoResult (TypedDict):
class CommentResult (TypedDict):
id: str
no: int
vposMs: int
body: str
commands: list[str]
userId: str
isPremium: bool
score: int
body: str
postedAt: str
nicoruCount: int
nicoruId: Any
source: str
isMyPost: bool
def normalise (
s: str,
) -> str:
return unicodedata.normalize ('NFKC', s).lower ()
vposMs: int
if __name__ == '__main__':