19 コミット

作成者 SHA1 メッセージ 日付
みてるぞ f34ff2a0f6 #20 2026-03-29 17:58:13 +09:00
みてるぞ 0ddf29cfd8 #20 2026-03-29 16:40:38 +09:00
みてるぞ af4cc8d618 #20 2026-03-29 16:17:02 +09:00
みてるぞ 516d5ebfa6 #020 2026-03-29 15:16:51 +09:00
みてるぞ b2adf62090 投稿者情報追加(#17) (#18)
#17

#17

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #18
2026-03-05 21:03:16 +09:00
みてるぞ a3d9d0bfd7 feat: タグと関係なしに追跡する動画リスト追加 (#16)
#15 タグ関係なく追跡する動画リスト

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #16
2026-01-01 14:00:11 +09:00
みてるぞ f44637d274 コメント取得追加 2025-10-26 05:07:07 +09:00
みてるぞ f809e9faae キリ番修正 2025-10-26 01:19:15 +09:00
みてるぞ de8fd8634a キリ番修正 2025-10-26 01:07:15 +09:00
みてるぞ 88be511f6e キリ番修正 2025-10-26 01:06:41 +09:00
みてるぞ ea339f1ec9 キリ番修正 2025-10-26 01:05:48 +09:00
みてるぞ 06328a89b2 キリ番追加 2025-10-26 00:45:29 +09:00
みてるぞ 463e8bbec7 config に移した. 2025-10-22 23:25:49 +09:00
みてるぞ baa75d68ba ゑぐぃバグ修正 2025-08-15 02:51:06 +09:00
みてるぞ 48e51f97d0 'update_db.py' を更新 2025-07-19 19:33:37 +09:00
みてるぞ c5204383ed #13 2025-07-01 23:53:11 +09:00
みてるぞ bf36d05ed3 #13 2025-07-01 23:48:30 +09:00
みてるぞ c9bd6fdfa7 #13 2025-07-01 23:39:57 +09:00
みてるぞ b2f5f81ca8 型定義追加 2024-11-06 03:56:28 +09:00
10個のファイルの変更480行の追加109行の削除
+25
ファイルの表示
@@ -0,0 +1,25 @@
from __future__ import annotations
import os
from typing import TypedDict
from eloquent import DatabaseManager, Model # type: ignore
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
+5
ファイルの表示
@@ -115,6 +115,9 @@ class Model:
@classmethod @classmethod
def where_not_null (cls, field: str) -> QueryBuilder[Self]: ... def where_not_null (cls, field: str) -> QueryBuilder[Self]: ...
@classmethod
def max (cls, column: str) -> Any: ...
@classmethod @classmethod
def set_connection_resolver (cls, resolver: DatabaseManager) -> None: ... def set_connection_resolver (cls, resolver: DatabaseManager) -> None: ...
@@ -137,4 +140,6 @@ class QueryBuilder (Generic[_ModelT]):
def where_null (self, field: str) -> QueryBuilder[_ModelT]: ... def where_null (self, field: str) -> QueryBuilder[_ModelT]: ...
def max (self, column: str) -> Any: ...
def _load_relation (self, relation_name: str) -> QueryBuilder[_ModelT]: ... def _load_relation (self, relation_name: str) -> QueryBuilder[_ModelT]: ...
+16
ファイルの表示
@@ -58,6 +58,13 @@ class Tag (Model):
return self.has_many (VideoTag) return self.has_many (VideoTag)
class TrackedVideo (Model):
id: int
code: str
__timestamps__ = False
class User (Model): class User (Model):
id: int id: int
code: str code: str
@@ -74,6 +81,7 @@ class User (Model):
class Video (Model): class Video (Model):
id: int id: int
code: str code: str
user_id: int | None
title: str title: str
description: str description: str
uploaded_at: datetime uploaded_at: datetime
@@ -81,6 +89,14 @@ class Video (Model):
__timestamps__ = False __timestamps__ = False
@property
def user (
self,
) -> User | None:
if self.user_id is None:
return None
return self.belongs_to (User)
@property @property
def video_histories ( def video_histories (
self, self,
+23 -4
ファイルの表示
@@ -19,13 +19,32 @@ class Model (eloquent.Model):
self, self,
*args: str, *args: str,
) -> None: ) -> None:
q = self.query () row = self._find_upsert_row (*args)
for arg in args:
q = q.where (arg, getattr (self, arg))
row = q.first ()
if row is not None: if row is not None:
self.id = row.id self.id = row.id
# pylint: disable = invalid-name # pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init # pylint: disable = attribute-defined-outside-init
self._Model__exists = True self._Model__exists = True
self.save () self.save ()
return
try:
self.save ()
except Exception:
row = self._find_upsert_row (*args)
if row is None:
raise
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
def _find_upsert_row (
self,
*args: str,
):
q = self.query ()
for arg in args:
q = q.where (arg, getattr (self, arg))
return q.first ()
+55
ファイルの表示
@@ -0,0 +1,55 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画コードからコメントのリストを取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
DB
def main (
video_code: str,
) -> None:
video = Video.where ('code', video_code).first ()
if video:
comments: list[CommentDict] = []
for row in video.comments:
comment: CommentDict = {
'id': row.id,
'video_id': row.video_id,
'comment_no': row.comment_no,
'user_id': row.user_id,
'content': row.content,
'posted_at': row.posted_at,
'nico_count': row.nico_count,
'vpos_ms': row.vpos_ms }
comments.append (comment)
print (json.dumps (comments, default = str))
else:
print ('[]')
class CommentDict (TypedDict):
id: int
video_id: int
comment_no: int
user_id: int
content: str
posted_at: datetime
nico_count: int
vpos_ms: int
if __name__ == '__main__':
main (sys.argv[1])
+65
ファイルの表示
@@ -0,0 +1,65 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画履歴の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import date, datetime
from typing import cast
from db.config import DB
from db.models import Video, VideoHistory
DB
def main (
views_counts: list[int],
base_date: date,
) -> None:
kiriban_list: list[tuple[int, str, str]] = []
latest_fetched_at = cast (date | None,
(VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))
if latest_fetched_at is None:
print ('[]')
return
for views_count in views_counts:
targets = { vh.video.code for vh in (
VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', views_count)
.get ()) }
for code in targets:
if code in [kiriban[1] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q, code = code: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= views_count:
continue
kiriban_list.append ((views_count, code,
(cast (Video, Video.where ('code', code).first ())
.uploaded_at)))
print (json.dumps (kiriban_list, default = str))
if __name__ == '__main__':
main (list (map (int, sys.argv[2:])),
datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
+52
ファイルの表示
@@ -0,0 +1,52 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
全動画の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
from datetime import date, datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
DB
def main (
) -> None:
videos: list[VideoDict] = []
for row in Video.all ():
deleted_at = row.deleted_at.date () if row.deleted_at else None
video: VideoDict = { 'id': row.id,
'code': row.code,
'user': getattr (row.user, 'code', None),
'title': row.title,
'description': row.description,
'tags': [],
'uploaded_at': row.uploaded_at,
'deleted_at': deleted_at }
for video_tag in row.video_tags:
if video_tag.untagged_at is None:
video['tags'].append (video_tag.tag.name)
videos.append (video)
print (json.dumps (videos, default = str))
class VideoDict (TypedDict):
id: int
code: str
user: str | None
title: str
description: str
tags: list[str]
uploaded_at: datetime
deleted_at: date | None
if __name__ == '__main__':
main ()
+2
ファイルの表示
@@ -0,0 +1,2 @@
CREATE TABLE `nizika_nico`.`tracked_videos` (`id` BIGINT NOT NULL AUTO_INCREMENT , `code` VARCHAR(16) NOT NULL COMMENT '動画コード' , PRIMARY KEY (`id`)) ENGINE = InnoDB COMMENT = '追跡対象動画';
ALTER TABLE `tracked_videos` ADD UNIQUE(`code`);
+3
ファイルの表示
@@ -0,0 +1,3 @@
ALTER TABLE `videos` ADD `user_id` BIGINT NULL DEFAULT NULL COMMENT 'ユーザ Id.' AFTER `code`;
ALTER TABLE `videos` ADD INDEX(`user_id`);
ALTER TABLE `videos` ADD FOREIGN KEY (`user_id`) REFERENCES `users`(`id`) ON DELETE RESTRICT ON UPDATE CASCADE;
+208 -79
ファイルの表示
@@ -8,115 +8,157 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os import logging
import random import random
import string import string
import time import time
import unicodedata import unicodedata
from datetime import datetime, timedelta from datetime import date, datetime, timedelta
from typing import Any, TypedDict, cast from typing import Any, TypedDict, cast
import jaconv import jaconv
import requests import requests
from eloquent import DatabaseManager, Model
from db.models import Comment, Tag, User, Video, VideoHistory, VideoTag from db.config import DB
from db.models import (Comment,
Tag,
TrackedVideo,
User,
Video,
VideoHistory,
VideoTag)
logger = logging.getLogger (__name__)
logging.basicConfig (
level = logging.INFO,
format = '%(asctime)s %(levelname)s %(message)s')
def main ( def main (
) -> None: ) -> None:
config: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }
db = DatabaseManager (config)
Model.set_connection_resolver (db)
now = datetime.now () now = datetime.now ()
today = now.date ()
api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) search_result = search_nico_by_tags (['伊地知ニジカ',
update_tables (api_data, now) 'ぼざろクリーチャーシリーズ',
'ぼざろクリーチャーシリーズ外伝'])
comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
context: UpdateContext = { 'api_data': search_result['videos'],
'comments_by_video_code': comments_by_video_code,
'deletable': search_result['is_complete'] }
connection = DB.connection ()
connection.begin_transaction ()
try:
update_tables (context, now, today)
connection.commit ()
except Exception:
connection.rollback ()
raise
def update_tables ( def update_tables (
api_data: list[VideoResult], context: UpdateContext,
now: datetime, now: datetime,
today: date,
) -> None: ) -> None:
alive_video_codes: list[str] = [] alive_video_codes: list[str] = []
for datum in api_data: for datum in context['api_data']:
tag_names: list[str] = datum['tags'].split () tag_names = datum['tags'].split ()
normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
user: User | None = None
if datum['userId'] is not None:
user = User.where ('code', str (datum['userId'])).first ()
if user is None:
user = User ()
user.code = str (datum['userId'])
user.save ()
video = Video () video = Video ()
video.code = datum['contentId'] video.code = datum['contentId']
video.user_id = user.id if user else None
video.title = datum['title'] video.title = datum['title']
video.description = datum['description'] or '' video.description = datum['description'] or ''
video.uploaded_at = datetime.fromisoformat (datum['startTime']) video.uploaded_at = datetime.fromisoformat (datum['startTime'])
video.deleted_at = None video.deleted_at = None
video.upsert () video.upsert ()
alive_video_codes.append (video.code) alive_video_codes.append (video.code)
video_history = VideoHistory () video_history = VideoHistory ()
video_history.video_id = video.id video_history.video_id = video.id
video_history.fetched_at = now video_history.fetched_at = today
video_history.views_count = datum['viewCounter'] video_history.views_count = datum['viewCounter']
video_history.upsert () video_history.upsert ()
video_tags = [video_tag for video_tag in video.video_tags video_tags = [video_tag for video_tag in video.video_tags
if video_tag.untagged_at is not None] if video_tag.untagged_at is None]
tag: Tag | None
video_tag: VideoTag | None
for video_tag in video_tags: for video_tag in video_tags:
tag = video_tag.tag tag = video_tag.tag
if (tag is not None if tag is None:
and (normalise (tag.name) not in map (normalise, tag_names))): continue
video_tag.untagged_at = now if normalise (tag.name) in normalised_tag_names:
continue
video_tag.untagged_at = today
video_tag.save () video_tag.save ()
for tag_name in tag_names: for tag_name in tag_names:
tag = Tag.where ('name', tag_name).first () tag = Tag.where ('name', tag_name).first ()
if tag is None: if tag is None:
tag = Tag () tag = Tag ()
tag.name = tag_name tag.name = tag_name
tag.save () tag.save ()
video_tag = (VideoTag.where ('video_id', video.id) video_tag = (VideoTag.where ('video_id', video.id)
.where ('tag_id', tag.id) .where ('tag_id', tag.id)
.where_null ('untagged_at')
.first ()) .first ())
if video_tag is None: if video_tag is None:
video_tag = VideoTag () video_tag = VideoTag ()
video_tag.video_id = video.id video_tag.video_id = video.id
video_tag.tag_id = tag.id video_tag.tag_id = tag.id
video_tag.tagged_at = now
video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
video_tag.untagged_at = None video_tag.untagged_at = None
video_tag.save () video_tag.upsert ()
for com in fetch_comments (video.code):
for com in context['comments_by_video_code'].get (video.code, []):
user = User.where ('code', com['userId']).first () user = User.where ('code', com['userId']).first ()
if user is None: if user is None:
user = User () user = User ()
user.code = com['userId'] user.code = com['userId']
user.save () user.save ()
comment = Comment () comment = Comment ()
comment.video_id = video.id comment.video_id = video.id
comment.comment_no = com['no'] comment.comment_no = com['no']
comment.user_id = user.id comment.user_id = user.id
comment.content = com['body'] comment.content = com['body']
comment.posted_at = datetime.fromisoformat (com['postedAt']) comment.posted_at = datetime.fromisoformat (com['postedAt'])
comment.nico_count = com['nicoruCount'] comment.nico_count = com.get ('nicoruCount', 0)
comment.vpos_ms = com['vposMs'] comment.vpos_ms = com.get ('vposMs', 0)
comment.upsert () comment.upsert ()
# 削除動画 if not context['deletable']:
logger.warning ('skip soft-delete because the latest fetch was incomplete')
return
if not alive_video_codes:
logger.warning ('skip soft-delete because no alive videos were fetched')
return
videos = (Video.where_not_in ('code', alive_video_codes) videos = (Video.where_not_in ('code', alive_video_codes)
.where_null ('deleted_at') .where_null ('deleted_at')
.get ()) .get ())
for video in videos: for video in videos:
if video.code not in alive_video_codes:
video.deleted_at = now video.deleted_at = now
video.save () video.save ()
def fetch_comments ( def fetch_video_data (
video_code: str, video_code: str,
) -> list[CommentResult]: ) -> dict[str, Any]:
time.sleep (1.2) time.sleep (1.2)
headers = { 'X-Frontend-Id': '6', headers = { 'X-Frontend-Id': '6',
@@ -128,15 +170,38 @@ def fetch_comments (
+ '_' + '_'
+ str (random.randrange (10 ** 12, 10 ** 13))) + str (random.randrange (10 ** 12, 10 ** 13)))
url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }" url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+ f"?actionTrackId={ action_track_id }") + f'?actionTrackId={ action_track_id }')
res = requests.post (url, headers = headers, timeout = 60).json () return requests.post (url, headers = headers, timeout = 60).json ()
def fetch_comments_by_video_code (
videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
comments_by_video_code: dict[str, list[CommentResult]] = {}
for video in videos:
video_code = video['contentId']
try: try:
nv_comment = res['data']['comment']['nvComment'] comments_by_video_code[video_code] = fetch_comments (video_code)
except KeyError: except (KeyError,
return [] TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('failed to fetch comments: %s (%s)', video_code, exc)
comments_by_video_code[video_code] = []
return comments_by_video_code
def fetch_comments (
video_code: str,
) -> list[CommentResult]:
video_data = fetch_video_data (video_code)
nv_comment = (video_data.get ('data', {})
.get ('comment', {})
.get ('nvComment'))
if nv_comment is None: if nv_comment is None:
return [] return []
@@ -150,32 +215,63 @@ def fetch_comments (
url = nv_comment['server'] + '/v1/threads' url = nv_comment['server'] + '/v1/threads'
res = (requests.post (url, json.dumps (params), response = requests.post (url,
json = params,
headers = headers, headers = headers,
timeout = 60) timeout = 60)
.json ()) response.raise_for_status ()
res = response.json ()
try: return select_comments_from_threads (res)
return res['data']['threads'][1]['comments']
except (IndexError, KeyError):
def select_comments_from_threads (
response: dict[str, Any],
) -> list[CommentResult]:
threads = response.get ('data', {}).get ('threads', [])
if not isinstance (threads, list):
return [] return []
main_comments: list[CommentResult] = []
fallback_comments: list[CommentResult] = []
def search_nico_by_tag ( for thread in threads:
tag: str, comments = thread.get ('comments') if isinstance (thread, dict) else None
) -> list[VideoResult]: if not isinstance (comments, list):
return search_nico_by_tags ([tag]) continue
casted_comments = cast (list[CommentResult], comments)
if len (casted_comments) > len (fallback_comments):
fallback_comments = casted_comments
fork = str (thread.get ('fork', '')).lower ()
label = str (thread.get ('label', '')).lower ()
thread_id = str (thread.get ('id', '')).lower ()
if fork == 'main' or 'main' in label or 'main' in thread_id:
main_comments = casted_comments
selected_comments = main_comments or fallback_comments
deduped_comments: dict[int, CommentResult] = {}
for comment in selected_comments:
comment_no = comment.get ('no')
if not isinstance (comment_no, int):
continue
deduped_comments[comment_no] = comment
return [deduped_comments[comment_no]
for comment_no in sorted (deduped_comments)]
def search_nico_by_tags ( def search_nico_by_tags (
tags: list[str], tags: list[str],
) -> list[VideoResult]: ) -> SearchNicoResult:
today = datetime.now () today = datetime.now ()
url = ('https://snapshot.search.nicovideo.jp' url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search') + '/api/v2/snapshot/video/contents/search')
result_data: list[VideoResult] = [] result_by_video_code: dict[str, VideoResult] = {}
is_complete = True
to = datetime (2022, 12, 3) to = datetime (2022, 12, 3)
while to <= today: while to <= today:
time.sleep (1.2) time.sleep (1.2)
@@ -194,6 +290,7 @@ def search_nico_by_tags (
'targets': 'tagsExact', 'targets': 'tagsExact',
'_sort': '-viewCounter', '_sort': '-viewCounter',
'fields': ('contentId,' 'fields': ('contentId,'
'userId,'
'title,' 'title,'
'tags,' 'tags,'
'description,' 'description,'
@@ -201,23 +298,67 @@ def search_nico_by_tags (
'startTime'), 'startTime'),
'_limit': 100, '_limit': 100,
'jsonFilter': query_filter } 'jsonFilter': query_filter }
res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
try: try:
result_data += res['data'] response = requests.get (
except KeyError: url,
pass params = cast (dict[str, int | str], params),
timeout = 60)
response.raise_for_status ()
res = response.json ()
for datum in cast (list[VideoResult], res.get ('data', [])):
result_by_video_code[datum['contentId']] = datum
except (ValueError, requests.RequestException) as exc:
logger.warning ('snapshot fetch failed: %s - %s (%s)',
to.date (),
until.date (),
exc)
is_complete = False
to = until + timedelta (days = 1) to = until + timedelta (days = 1)
return result_data for video in TrackedVideo.get ():
if video.code in result_by_video_code:
continue
try:
tracked_video = video
video_data = fetch_video_data (tracked_video.code)['data']
owner = video_data.get ('owner') or {}
video_info = video_data['video']
result_by_video_code[tracked_video.code] = {
'contentId': tracked_video.code,
'userId': owner.get ('id'),
'title': video_info['title'],
'tags': ' '.join (map (lambda t: t['name'],
video_data['tag']['items'])),
'description': video_info['description'],
'viewCounter': video_info['count']['view'],
'startTime': video_info['registeredAt'] }
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
is_complete = False
return { 'videos': list (result_by_video_code.values ()),
'is_complete': is_complete }
class DbConfig (TypedDict): def normalise (
driver: str text: str,
host: str ) -> str:
database: str return jaconv.hira2kata (
user: str unicodedata.normalize ('NFKC', text.strip ())).lower ()
password: str
prefix: str
class SearchNicoResult (TypedDict):
videos: list['VideoResult']
is_complete: bool
class UpdateContext (TypedDict):
api_data: list['VideoResult']
comments_by_video_code: dict[str, list['CommentResult']]
deletable: bool
class VideoSearchParam (TypedDict): class VideoSearchParam (TypedDict):
@@ -231,6 +372,7 @@ class VideoSearchParam (TypedDict):
class VideoResult (TypedDict): class VideoResult (TypedDict):
contentId: str contentId: str
userId: int | None
title: str title: str
tags: str tags: str
description: str | None description: str | None
@@ -239,25 +381,12 @@ class VideoResult (TypedDict):
class CommentResult (TypedDict): class CommentResult (TypedDict):
id: str
no: int no: int
vposMs: int
body: str
commands: list[str]
userId: str userId: str
isPremium: bool body: str
score: int
postedAt: str postedAt: str
nicoruCount: int nicoruCount: int
nicoruId: Any vposMs: int
source: str
isMyPost: bool
def normalise (
s: str,
) -> str:
return jaconv.hira2kata (unicodedata.normalize ('NFKC', s)).lower ()
if __name__ == '__main__': if __name__ == '__main__':