20 コミット

作成者 SHA1 メッセージ 日付
みてるぞ 495c1381c7 #22 インポート漏れ修正 2026-04-24 23:08:11 +09:00
みてるぞ 1074f09b96 #22 2026-04-24 09:46:34 +00:00
みてるぞ 2b706f1247 #22 2026-04-24 09:33:17 +00:00
みてるぞ cb72b8dd99 削除フラグが誤って付与されるバグ修正(#20) (#21)
#20

#20

#20

#020

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #21
2026-04-11 05:13:29 +09:00
みてるぞ b2adf62090 投稿者情報追加(#17) (#18)
#17

#17

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #18
2026-03-05 21:03:16 +09:00
みてるぞ a3d9d0bfd7 feat: タグと関係なしに追跡する動画リスト追加 (#16)
#15 タグ関係なく追跡する動画リスト

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #16
2026-01-01 14:00:11 +09:00
みてるぞ f44637d274 コメント取得追加 2025-10-26 05:07:07 +09:00
みてるぞ f809e9faae キリ番修正 2025-10-26 01:19:15 +09:00
みてるぞ de8fd8634a キリ番修正 2025-10-26 01:07:15 +09:00
みてるぞ 88be511f6e キリ番修正 2025-10-26 01:06:41 +09:00
みてるぞ ea339f1ec9 キリ番修正 2025-10-26 01:05:48 +09:00
みてるぞ 06328a89b2 キリ番追加 2025-10-26 00:45:29 +09:00
みてるぞ 463e8bbec7 config に移した. 2025-10-22 23:25:49 +09:00
みてるぞ baa75d68ba ゑぐぃバグ修正 2025-08-15 02:51:06 +09:00
みてるぞ 48e51f97d0 'update_db.py' を更新 2025-07-19 19:33:37 +09:00
みてるぞ c5204383ed #13 2025-07-01 23:53:11 +09:00
みてるぞ bf36d05ed3 #13 2025-07-01 23:48:30 +09:00
みてるぞ c9bd6fdfa7 #13 2025-07-01 23:39:57 +09:00
みてるぞ b2f5f81ca8 型定義追加 2024-11-06 03:56:28 +09:00
みてるぞ 67b76e6dd4 #12 2024-11-05 12:34:10 +09:00
12個のファイルの変更647行の追加250行の削除
+25
ファイルの表示
@@ -0,0 +1,25 @@
from __future__ import annotations
import os
from typing import TypedDict
from eloquent import DatabaseManager, Model # type: ignore
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
+145
ファイルの表示
@@ -0,0 +1,145 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
# pylint: disable = missing-module-docstring
# pylint: disable = unused-argument
from __future__ import annotations
from typing import Any, Generic, Type, TypeVar, overload
from typing_extensions import Self
_ModelT = TypeVar ('_ModelT', bound = 'Model')
class Connection:
def select (self, query: str, bindings: dict[str, Any] | None = None) -> Any: ...
def insert (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def update (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def delete (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def transaction (self, callback: Any) -> Any: ...
def begin_transaction (self) -> None: ...
def commit (self) -> None: ...
def rollback (self) -> None: ...
class ConnectionResolver:
def connection (self, name: str | None = None) -> Any: ...
def get_default_connection (self) -> str: ...
def set_default_connection (self, name: str) -> None: ...
class DatabaseManager:
connections: dict[str, Connection]
def __init__ (self, config: dict[str, Any]) -> None: ...
def connection (self, name: str | None = None) -> Connection: ...
def disconnect (self, name: str | None = None) -> None: ...
def reconnect (self, name: str | None = None) -> Connection: ...
def get_connections (self) -> dict[str, Connection]: ...
class Model:
id: int
_Model__exists: bool
def has_one (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> _ModelT: ...
def has_many (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> list[_ModelT]: ...
def belongs_to (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> _ModelT: ...
def belongs_to_many (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> list[_ModelT]: ...
def save (self) -> None: ...
def delete (self) -> None: ...
@classmethod
def find (cls, id_: int) -> Self | None: ...
@classmethod
def query (
cls,
) -> QueryBuilder[Self]: ...
@overload
@classmethod
def where (
cls,
field: str,
operator: str,
value: Any,
) -> QueryBuilder[Self]: ...
@overload
@classmethod
def where (cls, field: str, value: Any) -> QueryBuilder[Self]: ...
@classmethod
def where_not_in (
cls,
column: str,
values: list[Any] | tuple
) -> QueryBuilder[Self]: ...
@classmethod
def where_not_null (cls, field: str) -> QueryBuilder[Self]: ...
@classmethod
def max (cls, column: str) -> Any: ...
@classmethod
def set_connection_resolver (cls, resolver: DatabaseManager) -> None: ...
class QueryBuilder (Generic[_ModelT]):
def first (self) -> _ModelT | None: ...
def get (self) -> list[_ModelT]: ...
@overload
def where (
self,
field: str,
operator: str,
value: Any,
) -> QueryBuilder[_ModelT]: ...
@overload
def where (self, field: str, value: Any) -> QueryBuilder[_ModelT]: ...
def where_null (self, field: str) -> QueryBuilder[_ModelT]: ...
def max (self, column: str) -> Any: ...
def _load_relation (self, relation_name: str) -> QueryBuilder[_ModelT]: ...
+23 -1
ファイルの表示
@@ -9,7 +9,7 @@ from __future__ import annotations
from datetime import date, datetime
from my_eloquent import Model
from db.my_eloquent import Model
class Comment (Model):
@@ -58,6 +58,19 @@ class Tag (Model):
return self.has_many (VideoTag)
class TrackedVideo (Model):
id: int
code: str
__timestamps__ = False
def upsert (
self,
*args: str,
) -> None:
super ().upsert ('code')
class User (Model):
id: int
code: str
@@ -74,6 +87,7 @@ class User (Model):
class Video (Model):
id: int
code: str
user_id: int | None
title: str
description: str
uploaded_at: datetime
@@ -81,6 +95,14 @@ class Video (Model):
__timestamps__ = False
@property
def user (
self,
) -> User | None:
if self.user_id is None:
return None
return self.belongs_to (User)
@property
def video_histories (
self,
+23 -4
ファイルの表示
@@ -19,13 +19,32 @@ class Model (eloquent.Model):
self,
*args: str,
) -> None:
q = self.query ()
for arg in args:
q = q.where (arg, getattr (self, arg))
row = q.first ()
row = self._find_upsert_row (*args)
if row is not None:
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
return
try:
self.save ()
except Exception:
row = self._find_upsert_row (*args)
if row is None:
raise
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
def _find_upsert_row (
self,
*args: str,
):
q = self.query ()
for arg in args:
q = q.where (arg, getattr (self, arg))
return q.first ()
-140
ファイルの表示
@@ -1,140 +0,0 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
# pylint: disable = missing-module-docstring
# pylint: disable = unused-argument
from __future__ import annotations
from typing import Any, Generic, Type, TypeVar, overload
from typing_extensions import Self
_ModelT = TypeVar ('_ModelT', bound = 'Model')
class Connection:
def select (self, query: str, bindings: dict[str, Any] | None = None) -> Any: ...
def insert (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def update (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def delete (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def transaction (self, callback: Any) -> Any: ...
def begin_transaction (self) -> None: ...
def commit (self) -> None: ...
def rollback (self) -> None: ...
class ConnectionResolver:
def connection (self, name: str | None = None) -> Any: ...
def get_default_connection (self) -> str: ...
def set_default_connection (self, name: str) -> None: ...
class DatabaseManager:
connections: dict[str, Connection]
def __init__ (self, config: dict[str, Any]) -> None: ...
def connection (self, name: str | None = None) -> Connection: ...
def disconnect (self, name: str | None = None) -> None: ...
def reconnect (self, name: str | None = None) -> Connection: ...
def get_connections (self) -> dict[str, Connection]: ...
class Model:
id: int
_Model__exists: bool
def has_one (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> _ModelT: ...
def has_many (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> list[_ModelT]: ...
def belongs_to (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> _ModelT: ...
def belongs_to_many (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> list[_ModelT]: ...
def save (self) -> None: ...
def delete (self) -> None: ...
@classmethod
def find (cls, id_: int) -> Self | None: ...
@classmethod
def query (
cls,
) -> QueryBuilder[Self]: ...
@overload
@classmethod
def where (
cls,
field: str,
operator: str,
value: Any,
) -> QueryBuilder[Self]: ...
@overload
@classmethod
def where (cls, field: str, value: Any) -> QueryBuilder[Self]: ...
@classmethod
def where_not_in (
cls,
column: str,
values: list[Any] | tuple
) -> QueryBuilder[Self]: ...
@classmethod
def where_not_null (cls, field: str) -> QueryBuilder[Self]: ...
@classmethod
def set_connection_resolver (cls, resolver: DatabaseManager) -> None: ...
class QueryBuilder (Generic[_ModelT]):
def first (self) -> _ModelT | None: ...
def get (self) -> list[_ModelT]: ...
@overload
def where (
self,
field: str,
operator: str,
value: Any,
) -> QueryBuilder[_ModelT]: ...
@overload
def where (self, field: str, value: Any) -> QueryBuilder[_ModelT]: ...
def where_null (self, field: str) -> QueryBuilder[_ModelT]: ...
def _load_relation (self, relation_name: str) -> QueryBuilder[_ModelT]: ...
シンボリックリンク
+1
ファイルの表示
@@ -0,0 +1 @@
db/eloquent.pyi
+55
ファイルの表示
@@ -0,0 +1,55 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画コードからコメントのリストを取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
DB
def main (
video_code: str,
) -> None:
video = Video.where ('code', video_code).first ()
if video:
comments: list[CommentDict] = []
for row in video.comments:
comment: CommentDict = {
'id': row.id,
'video_id': row.video_id,
'comment_no': row.comment_no,
'user_id': row.user_id,
'content': row.content,
'posted_at': row.posted_at,
'nico_count': row.nico_count,
'vpos_ms': row.vpos_ms }
comments.append (comment)
print (json.dumps (comments, default = str))
else:
print ('[]')
class CommentDict (TypedDict):
id: int
video_id: int
comment_no: int
user_id: int
content: str
posted_at: datetime
nico_count: int
vpos_ms: int
if __name__ == '__main__':
main (sys.argv[1])
+65
ファイルの表示
@@ -0,0 +1,65 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画履歴の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import date, datetime
from typing import cast
from db.config import DB
from db.models import Video, VideoHistory
DB
def main (
views_counts: list[int],
base_date: date,
) -> None:
kiriban_list: list[tuple[int, str, str]] = []
latest_fetched_at = cast (date | None,
(VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))
if latest_fetched_at is None:
print ('[]')
return
for views_count in views_counts:
targets = { vh.video.code for vh in (
VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', views_count)
.get ()) }
for code in targets:
if code in [kiriban[1] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q, code = code: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= views_count:
continue
kiriban_list.append ((views_count, code,
(cast (Video, Video.where ('code', code).first ())
.uploaded_at)))
print (json.dumps (kiriban_list, default = str))
if __name__ == '__main__':
main (list (map (int, sys.argv[2:])),
datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
+52
ファイルの表示
@@ -0,0 +1,52 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
全動画の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
from datetime import date, datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
DB
def main (
) -> None:
videos: list[VideoDict] = []
for row in Video.all ():
deleted_at = row.deleted_at.date () if row.deleted_at else None
video: VideoDict = { 'id': row.id,
'code': row.code,
'user': getattr (row.user, 'code', None),
'title': row.title,
'description': row.description,
'tags': [],
'uploaded_at': row.uploaded_at,
'deleted_at': deleted_at }
for video_tag in row.video_tags:
if video_tag.untagged_at is None:
video['tags'].append (video_tag.tag.name)
videos.append (video)
print (json.dumps (videos, default = str))
class VideoDict (TypedDict):
id: int
code: str
user: str | None
title: str
description: str
tags: list[str]
uploaded_at: datetime
deleted_at: date | None
if __name__ == '__main__':
main ()
+2
ファイルの表示
@@ -0,0 +1,2 @@
CREATE TABLE `nizika_nico`.`tracked_videos` (`id` BIGINT NOT NULL AUTO_INCREMENT , `code` VARCHAR(16) NOT NULL COMMENT '動画コード' , PRIMARY KEY (`id`)) ENGINE = InnoDB COMMENT = '追跡対象動画';
ALTER TABLE `tracked_videos` ADD UNIQUE(`code`);
+3
ファイルの表示
@@ -0,0 +1,3 @@
ALTER TABLE `videos` ADD `user_id` BIGINT NULL DEFAULT NULL COMMENT 'ユーザ Id.' AFTER `code`;
ALTER TABLE `videos` ADD INDEX(`user_id`);
ALTER TABLE `videos` ADD FOREIGN KEY (`user_id`) REFERENCES `users`(`id`) ON DELETE RESTRICT ON UPDATE CASCADE;
+19
ファイルの表示
@@ -0,0 +1,19 @@
import sys
from db.config import DB
from db.models import TrackedVideo
DB
def main (
video_codes: list[str],
) -> None:
for code in video_codes:
tv = TrackedVideo ()
tv.code = code
tv.upsert ()
if __name__ == '__main__':
main (sys.argv[1:])
+209 -80
ファイルの表示
@@ -8,115 +8,157 @@
from __future__ import annotations
import json
import os
import logging
import random
import string
import time
import unicodedata
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from typing import Any, TypedDict, cast
import jaconv
import requests
from eloquent import DatabaseManager, Model
from models import Comment, Tag, User, Video, VideoHistory, VideoTag
from db.config import DB
from db.models import (Comment,
Tag,
TrackedVideo,
User,
Video,
VideoHistory,
VideoTag)
logger = logging.getLogger (__name__)
logging.basicConfig (
level = logging.INFO,
format = '%(asctime)s %(levelname)s %(message)s')
def main (
) -> None:
config: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }
db = DatabaseManager (config)
Model.set_connection_resolver (db)
now = datetime.now ()
today = now.date ()
api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
update_tables (api_data, now)
search_result = search_nico_by_tags (['伊地知ニジカ',
'ぼざろクリーチャーシリーズ',
'ぼざろクリーチャーシリーズ外伝'])
comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
context: UpdateContext = { 'api_data': search_result['videos'],
'comments_by_video_code': comments_by_video_code,
'deletable': search_result['is_complete'] }
connection = DB.connection ()
connection.begin_transaction ()
try:
update_tables (context, now, today)
connection.commit ()
except Exception:
connection.rollback ()
raise
def update_tables (
api_data: list[VideoResult],
context: UpdateContext,
now: datetime,
today: date,
) -> None:
alive_video_codes: list[str] = []
for datum in api_data:
tag_names: list[str] = datum['tags'].split ()
for datum in context['api_data']:
tag_names = datum['tags'].split ()
normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
user: User | None = None
if datum['userId'] is not None:
user = User.where ('code', str (datum['userId'])).first ()
if user is None:
user = User ()
user.code = str (datum['userId'])
user.save ()
video = Video ()
video.code = datum['contentId']
video.user_id = user.id if user else None
video.title = datum['title']
video.description = datum['description'] or ''
video.uploaded_at = datetime.fromisoformat (datum['startTime'])
video.deleted_at = None
video.upsert ()
alive_video_codes.append (video.code)
video_history = VideoHistory ()
video_history.video_id = video.id
video_history.fetched_at = now
video_history.fetched_at = today
video_history.views_count = datum['viewCounter']
video_history.upsert ()
video_tags = [video_tag for video_tag in video.video_tags
if video_tag.untagged_at is not None]
tag: Tag | None
video_tag: VideoTag | None
if video_tag.untagged_at is None]
for video_tag in video_tags:
tag = video_tag.tag
if (tag is not None
and (normalise (tag.name) not in map (normalise, tag_names))):
video_tag.untagged_at = now
if tag is None:
continue
if normalise (tag.name) in normalised_tag_names:
continue
video_tag.untagged_at = today
video_tag.save ()
for tag_name in tag_names:
tag = Tag.where ('name', tag_name).first ()
if tag is None:
tag = Tag ()
tag.name = tag_name
tag.save ()
video_tag = (VideoTag.where ('video_id', video.id)
.where ('tag_id', tag.id)
.where_null ('untagged_at')
.first ())
if video_tag is None:
video_tag = VideoTag ()
video_tag.video_id = video.id
video_tag.tag_id = tag.id
video_tag.tagged_at = now
video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
video_tag.untagged_at = None
video_tag.save ()
for com in fetch_comments (video.code):
video_tag.upsert ()
for com in context['comments_by_video_code'].get (video.code, []):
user = User.where ('code', com['userId']).first ()
if user is None:
user = User ()
user.code = com['userId']
user.save ()
comment = Comment ()
comment.video_id = video.id
comment.comment_no = com['no']
comment.user_id = user.id
comment.content = com['body']
comment.posted_at = datetime.fromisoformat (com['postedAt'])
comment.nico_count = com['nicoruCount']
comment.vpos_ms = com['vposMs']
comment.nico_count = com.get ('nicoruCount', 0)
comment.vpos_ms = com.get ('vposMs', 0)
comment.upsert ()
# 削除動画
if not context['deletable']:
logger.warning ('skip soft-delete because the latest fetch was incomplete')
return
if not alive_video_codes:
logger.warning ('skip soft-delete because no alive videos were fetched')
return
videos = (Video.where_not_in ('code', alive_video_codes)
.where_null ('deleted_at')
.get ())
for video in videos:
if video.code not in alive_video_codes:
video.deleted_at = now
video.save ()
def fetch_comments (
def fetch_video_data (
video_code: str,
) -> list[CommentResult]:
) -> dict[str, Any]:
time.sleep (1.2)
headers = { 'X-Frontend-Id': '6',
@@ -128,15 +170,38 @@ def fetch_comments (
+ '_'
+ str (random.randrange (10 ** 12, 10 ** 13)))
url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
+ f"?actionTrackId={ action_track_id }")
url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+ f'?actionTrackId={ action_track_id }')
res = requests.post (url, headers = headers, timeout = 60).json ()
return requests.post (url, headers = headers, timeout = 60).json ()
def fetch_comments_by_video_code (
videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
comments_by_video_code: dict[str, list[CommentResult]] = {}
for video in videos:
video_code = video['contentId']
try:
nv_comment = res['data']['comment']['nvComment']
except KeyError:
return []
comments_by_video_code[video_code] = fetch_comments (video_code)
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('failed to fetch comments: %s (%s)', video_code, exc)
comments_by_video_code[video_code] = []
return comments_by_video_code
def fetch_comments (
video_code: str,
) -> list[CommentResult]:
video_data = fetch_video_data (video_code)
nv_comment = (video_data.get ('data', {})
.get ('comment', {})
.get ('nvComment'))
if nv_comment is None:
return []
@@ -145,37 +210,68 @@ def fetch_comments (
'Content-Type': 'application/json' }
params = { 'params': nv_comment['params'],
'additionals': { },
'additionals': {},
'threadKey': nv_comment['threadKey'] }
url = nv_comment['server'] + '/v1/threads'
res = (requests.post (url, json.dumps (params),
response = requests.post (url,
json = params,
headers = headers,
timeout = 60)
.json ())
response.raise_for_status ()
res = response.json ()
try:
return res['data']['threads'][1]['comments']
except (IndexError, KeyError):
return select_comments_from_threads (res)
def select_comments_from_threads (
response: dict[str, Any],
) -> list[CommentResult]:
threads = response.get ('data', {}).get ('threads', [])
if not isinstance (threads, list):
return []
main_comments: list[CommentResult] = []
fallback_comments: list[CommentResult] = []
def search_nico_by_tag (
tag: str,
) -> list[VideoResult]:
return search_nico_by_tags ([tag])
for thread in threads:
comments = thread.get ('comments') if isinstance (thread, dict) else None
if not isinstance (comments, list):
continue
casted_comments = cast (list[CommentResult], comments)
if len (casted_comments) > len (fallback_comments):
fallback_comments = casted_comments
fork = str (thread.get ('fork', '')).lower ()
label = str (thread.get ('label', '')).lower ()
thread_id = str (thread.get ('id', '')).lower ()
if fork == 'main' or 'main' in label or 'main' in thread_id:
main_comments = casted_comments
selected_comments = main_comments or fallback_comments
deduped_comments: dict[int, CommentResult] = {}
for comment in selected_comments:
comment_no = comment.get ('no')
if not isinstance (comment_no, int):
continue
deduped_comments[comment_no] = comment
return [deduped_comments[comment_no]
for comment_no in sorted (deduped_comments)]
def search_nico_by_tags (
tags: list[str],
) -> list[VideoResult]:
) -> SearchNicoResult:
today = datetime.now ()
url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search')
result_data: list[VideoResult] = []
result_by_video_code: dict[str, VideoResult] = {}
is_complete = True
to = datetime (2022, 12, 3)
while to <= today:
time.sleep (1.2)
@@ -194,6 +290,7 @@ def search_nico_by_tags (
'targets': 'tagsExact',
'_sort': '-viewCounter',
'fields': ('contentId,'
'userId,'
'title,'
'tags,'
'description,'
@@ -201,23 +298,67 @@ def search_nico_by_tags (
'startTime'),
'_limit': 100,
'jsonFilter': query_filter }
res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
try:
result_data += res['data']
except KeyError:
pass
response = requests.get (
url,
params = cast (dict[str, int | str], params),
timeout = 60)
response.raise_for_status ()
res = response.json ()
for datum in cast (list[VideoResult], res.get ('data', [])):
result_by_video_code[datum['contentId']] = datum
except (ValueError, requests.RequestException) as exc:
logger.warning ('snapshot fetch failed: %s - %s (%s)',
to.date (),
until.date (),
exc)
is_complete = False
to = until + timedelta (days = 1)
return result_data
for video in TrackedVideo.get ():
if video.code in result_by_video_code:
continue
try:
tracked_video = video
video_data = fetch_video_data (tracked_video.code)['data']
owner = video_data.get ('owner') or {}
video_info = video_data['video']
result_by_video_code[tracked_video.code] = {
'contentId': tracked_video.code,
'userId': owner.get ('id'),
'title': video_info['title'],
'tags': ' '.join (map (lambda t: t['name'],
video_data['tag']['items'])),
'description': video_info['description'],
'viewCounter': video_info['count']['view'],
'startTime': video_info['registeredAt'] }
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
is_complete = False
return { 'videos': list (result_by_video_code.values ()),
'is_complete': is_complete }
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
def normalise (
text: str,
) -> str:
return jaconv.hira2kata (
unicodedata.normalize ('NFKC', text.strip ())).lower ()
class SearchNicoResult (TypedDict):
videos: list['VideoResult']
is_complete: bool
class UpdateContext (TypedDict):
api_data: list['VideoResult']
comments_by_video_code: dict[str, list['CommentResult']]
deletable: bool
class VideoSearchParam (TypedDict):
@@ -231,6 +372,7 @@ class VideoSearchParam (TypedDict):
class VideoResult (TypedDict):
contentId: str
userId: int | None
title: str
tags: str
description: str | None
@@ -239,25 +381,12 @@ class VideoResult (TypedDict):
class CommentResult (TypedDict):
id: str
no: int
vposMs: int
body: str
commands: list[str]
userId: str
isPremium: bool
score: int
body: str
postedAt: str
nicoruCount: int
nicoruId: Any
source: str
isMyPost: bool
def normalise (
s: str,
) -> str:
return jaconv.hira2kata (unicodedata.normalize ('NFKC', s)).lower ()
vposMs: int
if __name__ == '__main__':