5 コミット

作成者 SHA1 メッセージ 日付
みてるぞ 495c1381c7 #22 インポート漏れ修正 2026-04-24 23:08:11 +09:00
みてるぞ 1074f09b96 #22 2026-04-24 09:46:34 +00:00
みてるぞ 2b706f1247 #22 2026-04-24 09:33:17 +00:00
みてるぞ cb72b8dd99 削除フラグが誤って付与されるバグ修正(#20) (#21)
#20

#20

#20

#020

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #21
2026-04-11 05:13:29 +09:00
みてるぞ b2adf62090 投稿者情報追加(#17) (#18)
#17

#17

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #18
2026-03-05 21:03:16 +09:00
8 個のファイルの変更、286 行の追加、129 行の削除
+15
ファイルの表示
@@ -64,6 +64,12 @@ class TrackedVideo (Model):
__timestamps__ = False __timestamps__ = False
def upsert (
        self,
        *args: str,
) -> None:
    """Upsert this record through the parent class, keyed on ``'code'``.

    ``args`` is accepted for signature compatibility but is not used: the
    parent ``upsert`` is always called with ``'code'`` as its only argument.
    """
    key_column = 'code'
    super ().upsert (key_column)
class User (Model): class User (Model):
id: int id: int
@@ -81,6 +87,7 @@ class User (Model):
class Video (Model): class Video (Model):
id: int id: int
code: str code: str
user_id: int | None
title: str title: str
description: str description: str
uploaded_at: datetime uploaded_at: datetime
@@ -88,6 +95,14 @@ class Video (Model):
__timestamps__ = False __timestamps__ = False
@property
def user (
        self,
) -> User | None:
    """Return the ``User`` this video belongs to.

    Returns ``None`` without touching the relation when ``user_id`` is unset;
    otherwise returns the result of ``belongs_to (User)``.
    """
    return None if self.user_id is None else self.belongs_to (User)
@property @property
def video_histories ( def video_histories (
self, self,
+24 -5
ファイルの表示
@@ -19,13 +19,32 @@ class Model (eloquent.Model):
self, self,
*args: str, *args: str,
) -> None: ) -> None:
q = self.query () row = self._find_upsert_row (*args)
for arg in args:
q = q.where (arg, getattr (self, arg))
row = q.first ()
if row is not None: if row is not None:
self.id = row.id self.id = row.id
# pylint: disable = invalid-name # pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init # pylint: disable = attribute-defined-outside-init
self._Model__exists = True self._Model__exists = True
self.save () self.save ()
return
try:
self.save ()
except Exception:
row = self._find_upsert_row (*args)
if row is None:
raise
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
def _find_upsert_row (
        self,
        *args: str,
):
    """Look up the stored row that matches this instance on the given columns.

    For every column name in ``args`` an equality condition against this
    instance's attribute of the same name is added, and the result of
    ``first ()`` on that query is returned.

    NOTE: when ``args`` is empty no condition is added, so the query is
    unfiltered.
    """
    query = self.query ()
    for column in args:
        value = getattr (self, column)
        query = query.where (column, value)
    return query.first ()
+2 -5
ファイルの表示
@@ -8,12 +8,9 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os
import sys import sys
from datetime import date, datetime from datetime import datetime
from typing import TypedDict, cast from typing import TypedDict
from eloquent import DatabaseManager, Model
from db.config import DB from db.config import DB
from db.models import Video from db.models import Video
+9 -11
ファイルの表示
@@ -8,12 +8,9 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os
import sys import sys
from datetime import date, datetime from datetime import date, datetime
from typing import TypedDict, cast from typing import cast
from eloquent import DatabaseManager, Model
from db.config import DB from db.config import DB
from db.models import Video, VideoHistory from db.models import Video, VideoHistory
@@ -25,14 +22,15 @@ def main (
views_counts: list[int], views_counts: list[int],
base_date: date, base_date: date,
) -> None: ) -> None:
if not base_date:
base_date = datetime.now ().date ()
kiriban_list: list[tuple[int, str, str]] = [] kiriban_list: list[tuple[int, str, str]] = []
latest_fetched_at = cast (date, (VideoHistory latest_fetched_at = cast (date | None,
.where ('fetched_at', '<=', base_date) (VideoHistory
.max ('fetched_at'))) .where ('fetched_at', '<=', base_date)
.max ('fetched_at')))
if latest_fetched_at is None:
print ('[]')
return
for views_count in views_counts: for views_count in views_counts:
targets = { vh.video.code for vh in ( targets = { vh.video.code for vh in (
@@ -63,5 +61,5 @@ def main (
if __name__ == '__main__': if __name__ == '__main__':
main (map (int, sys.argv[2:]), main (list (map (int, sys.argv[2:])),
datetime.strptime (sys.argv[1], '%Y-%m-%d').date ()) datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
+3 -14
ファイルの表示
@@ -8,12 +8,9 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os
from datetime import date, datetime from datetime import date, datetime
from typing import TypedDict from typing import TypedDict
from eloquent import DatabaseManager, Model
from db.config import DB from db.config import DB
from db.models import Video from db.models import Video
@@ -27,6 +24,7 @@ def main (
deleted_at = row.deleted_at.date () if row.deleted_at else None deleted_at = row.deleted_at.date () if row.deleted_at else None
video: VideoDict = { 'id': row.id, video: VideoDict = { 'id': row.id,
'code': row.code, 'code': row.code,
'user': getattr (row.user, 'code', None),
'title': row.title, 'title': row.title,
'description': row.description, 'description': row.description,
'tags': [], 'tags': [],
@@ -35,23 +33,14 @@ def main (
for video_tag in row.video_tags: for video_tag in row.video_tags:
if video_tag.untagged_at is None: if video_tag.untagged_at is None:
video['tags'].append (video_tag.tag.name) video['tags'].append (video_tag.tag.name)
videos.append(video) videos.append (video)
print (json.dumps (videos, default = str)) print (json.dumps (videos, default = str))
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
class VideoDict (TypedDict): class VideoDict (TypedDict):
id: int id: int
code: str code: str
user: str | None
title: str title: str
description: str description: str
tags: list[str] tags: list[str]
+3
ファイルの表示
@@ -0,0 +1,3 @@
ALTER TABLE `videos` ADD `user_id` BIGINT NULL DEFAULT NULL COMMENT 'ユーザ Id.' AFTER `code`;
ALTER TABLE `videos` ADD INDEX(`user_id`);
ALTER TABLE `videos` ADD FOREIGN KEY (`user_id`) REFERENCES `users`(`id`) ON DELETE RESTRICT ON UPDATE CASCADE;
+19
ファイルの表示
@@ -0,0 +1,19 @@
import sys
from db.config import DB
from db.models import TrackedVideo
DB
def main (
        video_codes: list[str],
) -> None:
    """Upsert one ``TrackedVideo`` per given video code.

    Args:
        video_codes: Video codes to register; each is assigned to a new
            ``TrackedVideo`` instance's ``code`` before ``upsert ()`` is
            called on it.
    """
    for video_code in video_codes:
        tracked_video = TrackedVideo ()
        tracked_video.code = video_code
        tracked_video.upsert ()
if __name__ == '__main__':
    # Every command-line argument after the script name is passed on as a
    # video code.
    cli_video_codes = sys.argv[1:]
    main (cli_video_codes)
+211 -94
ファイルの表示
@@ -8,17 +8,16 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os import logging
import random import random
import string import string
import time import time
import unicodedata import unicodedata
from datetime import datetime, timedelta from datetime import date, datetime, timedelta
from typing import Any, TypedDict, cast from typing import Any, TypedDict, cast
import jaconv import jaconv
import requests import requests
from eloquent import DatabaseManager, Model
from db.config import DB from db.config import DB
from db.models import (Comment, from db.models import (Comment,
@@ -29,96 +28,132 @@ from db.models import (Comment,
VideoHistory, VideoHistory,
VideoTag) VideoTag)
logger = logging.getLogger (__name__)
logging.basicConfig (
level = logging.INFO,
format = '%(asctime)s %(levelname)s %(message)s')
def main ( def main (
) -> None: ) -> None:
now = datetime.now () now = datetime.now ()
today = now.date ()
api_data = search_nico_by_tags (['伊地知ニジカ', search_result = search_nico_by_tags (['伊地知ニジカ',
'ぼざろクリーチャーシリーズ', 'ぼざろクリーチャーシリーズ',
'ぼざろクリーチャーシリーズ外伝']) 'ぼざろクリーチャーシリーズ外伝'])
comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
DB.begin_transaction () context: UpdateContext = { 'api_data': search_result['videos'],
'comments_by_video_code': comments_by_video_code,
'deletable': search_result['is_complete'] }
connection = DB.connection ()
connection.begin_transaction ()
try: try:
update_tables (api_data, now) update_tables (context, now, today)
DB.commit () connection.commit ()
except Exception: except Exception:
DB.rollback () connection.rollback ()
raise raise
def update_tables ( def update_tables (
api_data: list[VideoResult], context: UpdateContext,
now: datetime, now: datetime,
today: date,
) -> None: ) -> None:
alive_video_codes: list[str] = [] alive_video_codes: list[str] = []
for datum in api_data: for datum in context['api_data']:
tag_names: list[str] = datum['tags'].split () tag_names = datum['tags'].split ()
normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
user: User | None = None
if datum['userId'] is not None:
user = User.where ('code', str (datum['userId'])).first ()
if user is None:
user = User ()
user.code = str (datum['userId'])
user.save ()
video = Video () video = Video ()
video.code = datum['contentId'] video.code = datum['contentId']
video.user_id = user.id if user else None
video.title = datum['title'] video.title = datum['title']
video.description = datum['description'] or '' video.description = datum['description'] or ''
video.uploaded_at = datetime.fromisoformat (datum['startTime']) video.uploaded_at = datetime.fromisoformat (datum['startTime'])
video.deleted_at = None video.deleted_at = None
video.upsert () video.upsert ()
alive_video_codes.append (video.code) alive_video_codes.append (video.code)
video_history = VideoHistory () video_history = VideoHistory ()
video_history.video_id = video.id video_history.video_id = video.id
video_history.fetched_at = now video_history.fetched_at = today
video_history.views_count = datum['viewCounter'] video_history.views_count = datum['viewCounter']
video_history.upsert () video_history.upsert ()
video_tags = [video_tag for video_tag in video.video_tags video_tags = [video_tag for video_tag in video.video_tags
if video_tag.untagged_at is None] if video_tag.untagged_at is None]
tag: Tag | None
video_tag: VideoTag | None
for video_tag in video_tags: for video_tag in video_tags:
tag = video_tag.tag tag = video_tag.tag
if (tag is not None if tag is None:
and (normalise (tag.name) not in map (normalise, tag_names))): continue
video_tag.untagged_at = now if normalise (tag.name) in normalised_tag_names:
video_tag.save () continue
video_tag.untagged_at = today
video_tag.save ()
for tag_name in tag_names: for tag_name in tag_names:
tag = Tag.where ('name', tag_name).first () tag = Tag.where ('name', tag_name).first ()
if tag is None: if tag is None:
tag = Tag () tag = Tag ()
tag.name = tag_name tag.name = tag_name
tag.save () tag.save ()
video_tag = (VideoTag.where ('video_id', video.id) video_tag = (VideoTag.where ('video_id', video.id)
.where ('tag_id', tag.id) .where ('tag_id', tag.id)
.where_null ('untagged_at')
.first ()) .first ())
if video_tag is None: if video_tag is None:
video_tag = VideoTag () video_tag = VideoTag ()
video_tag.video_id = video.id video_tag.video_id = video.id
video_tag.tag_id = tag.id video_tag.tag_id = tag.id
video_tag.tagged_at = now
video_tag.untagged_at = None video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
video_tag.save () video_tag.untagged_at = None
for com in fetch_comments (video.code): video_tag.upsert ()
for com in context['comments_by_video_code'].get (video.code, []):
user = User.where ('code', com['userId']).first () user = User.where ('code', com['userId']).first ()
if user is None: if user is None:
user = User () user = User ()
user.code = com['userId'] user.code = com['userId']
user.save () user.save ()
comment = Comment () comment = Comment ()
comment.video_id = video.id comment.video_id = video.id
comment.comment_no = com['no'] comment.comment_no = com['no']
comment.user_id = user.id comment.user_id = user.id
comment.content = com['body'] comment.content = com['body']
comment.posted_at = datetime.fromisoformat (com['postedAt']) comment.posted_at = datetime.fromisoformat (com['postedAt'])
comment.nico_count = com['nicoruCount'] comment.nico_count = com.get ('nicoruCount', 0)
comment.vpos_ms = com['vposMs'] comment.vpos_ms = com.get ('vposMs', 0)
comment.upsert () comment.upsert ()
# 削除動画 if not context['deletable']:
logger.warning ('skip soft-delete because the latest fetch was incomplete')
return
if not alive_video_codes:
logger.warning ('skip soft-delete because no alive videos were fetched')
return
videos = (Video.where_not_in ('code', alive_video_codes) videos = (Video.where_not_in ('code', alive_video_codes)
.where_null ('deleted_at') .where_null ('deleted_at')
.get ()) .get ())
for video in videos: for video in videos:
if video.code not in alive_video_codes: video.deleted_at = now
video.deleted_at = now video.save ()
video.save ()
def fetch_video_data ( def fetch_video_data (
@@ -135,52 +170,108 @@ def fetch_video_data (
+ '_' + '_'
+ str (random.randrange (10 ** 12, 10 ** 13))) + str (random.randrange (10 ** 12, 10 ** 13)))
url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }" url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+ f"?actionTrackId={ action_track_id }") + f'?actionTrackId={ action_track_id }')
return requests.post (url, headers = headers, timeout = 60).json () return requests.post (url, headers = headers, timeout = 60).json ()
def fetch_comments_by_video_code (
        videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
    """Fetch the comments of every given video, keyed by video code.

    A failure for one video is logged as a warning and recorded as an empty
    comment list, so that a single broken response does not abort the run.

    Args:
        videos: Video entries; only each entry's ``contentId`` is read.

    Returns:
        A mapping with one entry per ``contentId`` found in ``videos``.
    """
    comments_by_video_code: dict[str, list[CommentResult]] = {}

    for video in videos:
        video_code = video['contentId']
        try:
            comments = fetch_comments (video_code)
        except (AttributeError,
                KeyError,
                TypeError,
                ValueError,
                requests.RequestException) as exc:
            # AttributeError covers a JSON null where an object was expected:
            # the chained ``.get`` look-ups in the fetch path then fail on
            # ``None`` instead of raising KeyError/TypeError.
            logger.warning ('failed to fetch comments: %s (%s)', video_code, exc)
            comments = []
        comments_by_video_code[video_code] = comments

    return comments_by_video_code
def fetch_comments ( def fetch_comments (
video_code: str, video_code: str,
) -> list[CommentResult]: ) -> list[CommentResult]:
try: video_data = fetch_video_data (video_code)
nv_comment = fetch_video_data (video_code)['data']['comment']['nvComment'] nv_comment = (video_data.get ('data', {})
except KeyError: .get ('comment', {})
return [] .get ('nvComment'))
if nv_comment is None: if nv_comment is None:
return [] return []
headers = { 'X-Frontend-Id': '6', headers = { 'X-Frontend-Id': '6',
'X-Frontend-Version': '0', 'X-Frontend-Version': '0',
'Content-Type': 'application/json' } 'Content-Type': 'application/json' }
params = { 'params': nv_comment['params'], params = { 'params': nv_comment['params'],
'additionals': { }, 'additionals': {},
'threadKey': nv_comment['threadKey'] } 'threadKey': nv_comment['threadKey'] }
url = nv_comment['server'] + '/v1/threads' url = nv_comment['server'] + '/v1/threads'
res = (requests.post (url, json.dumps (params), response = requests.post (url,
headers = headers, json = params,
timeout = 60) headers = headers,
.json ()) timeout = 60)
response.raise_for_status ()
res = response.json ()
try: return select_comments_from_threads (res)
return res['data']['threads'][1]['comments']
except (IndexError, KeyError):
def select_comments_from_threads (
        response: dict[str, Any],
) -> list['CommentResult']:
    """Pick the comment list to keep from a threads response.

    A thread counts as "main" when its ``fork`` equals ``main`` or its
    ``label`` or ``id`` contains ``main`` (case-insensitive); if several
    match, the last one wins.  When no main thread has any comments, the
    thread with the most comments is used instead.  The chosen comments are
    de-duplicated by ``no`` (the last occurrence wins) and returned in
    ascending ``no`` order.

    Malformed parts of the payload are skipped instead of raising: a missing
    or null ``data``, a non-list ``threads``, non-dict threads, non-list
    ``comments``, non-dict comments, and comments whose ``no`` is not an int.

    Args:
        response: Decoded JSON body of the threads request.

    Returns:
        The selected comments, possibly empty.
    """
    data = response.get ('data') if isinstance (response, dict) else None
    threads = data.get ('threads') if isinstance (data, dict) else None
    if not isinstance (threads, list):
        return []

    main_comments: list[CommentResult] = []
    fallback_comments: list[CommentResult] = []
    for thread in threads:
        if not isinstance (thread, dict):
            continue
        comments = thread.get ('comments')
        if not isinstance (comments, list):
            continue

        # Quoted so the name is not resolved when the cast runs.
        casted_comments = cast (list['CommentResult'], comments)

        # Remember the longest thread in case no main thread is found.
        if len (casted_comments) > len (fallback_comments):
            fallback_comments = casted_comments

        fork = str (thread.get ('fork', '')).lower ()
        label = str (thread.get ('label', '')).lower ()
        thread_id = str (thread.get ('id', '')).lower ()
        if fork == 'main' or 'main' in label or 'main' in thread_id:
            main_comments = casted_comments

    selected_comments = main_comments or fallback_comments

    deduped_comments: dict[int, CommentResult] = {}
    for comment in selected_comments:
        if not isinstance (comment, dict):
            continue
        comment_no = comment.get ('no')
        if not isinstance (comment_no, int):
            continue
        deduped_comments[comment_no] = comment

    return [deduped_comments[comment_no]
            for comment_no in sorted (deduped_comments)]
def search_nico_by_tags ( def search_nico_by_tags (
tags: list[str], tags: list[str],
) -> list[VideoResult]: ) -> SearchNicoResult:
today = datetime.now () today = datetime.now ()
url = ('https://snapshot.search.nicovideo.jp' url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search') + '/api/v2/snapshot/video/contents/search')
result_data: list[VideoResult] = [] result_by_video_code: dict[str, VideoResult] = {}
is_complete = True
to = datetime (2022, 12, 3) to = datetime (2022, 12, 3)
while to <= today: while to <= today:
time.sleep (1.2) time.sleep (1.2)
@@ -199,6 +290,7 @@ def search_nico_by_tags (
'targets': 'tagsExact', 'targets': 'tagsExact',
'_sort': '-viewCounter', '_sort': '-viewCounter',
'fields': ('contentId,' 'fields': ('contentId,'
'userId,'
'title,' 'title,'
'tags,' 'tags,'
'description,' 'description,'
@@ -206,30 +298,67 @@ def search_nico_by_tags (
'startTime'), 'startTime'),
'_limit': 100, '_limit': 100,
'jsonFilter': query_filter } 'jsonFilter': query_filter }
res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
try: try:
result_data += res['data'] response = requests.get (
except KeyError: url,
pass params = cast (dict[str, int | str], params),
timeout = 60)
response.raise_for_status ()
res = response.json ()
for datum in cast (list[VideoResult], res.get ('data', [])):
result_by_video_code[datum['contentId']] = datum
except (ValueError, requests.RequestException) as exc:
logger.warning ('snapshot fetch failed: %s - %s (%s)',
to.date (),
until.date (),
exc)
is_complete = False
to = until + timedelta (days = 1) to = until + timedelta (days = 1)
for video in TrackedVideo.get (): for video in TrackedVideo.get ():
if video.code in map (lambda v: v['contentId'], result_data): if video.code in result_by_video_code:
continue continue
try: try:
video_data = fetch_video_data (video.code)['data'] tracked_video = video
result_data.append ({ video_data = fetch_video_data (tracked_video.code)['data']
'contentId': video.code, owner = video_data.get ('owner') or {}
'title': video_data['video']['title'], video_info = video_data['video']
result_by_video_code[tracked_video.code] = {
'contentId': tracked_video.code,
'userId': owner.get ('id'),
'title': video_info['title'],
'tags': ' '.join (map (lambda t: t['name'], 'tags': ' '.join (map (lambda t: t['name'],
video_data['tag']['items'])), video_data['tag']['items'])),
'description': video_data['video']['description'], 'description': video_info['description'],
'viewCounter': video_data['video']['count']['view'], 'viewCounter': video_info['count']['view'],
'startTime': video_data['video']['registeredAt'] }) 'startTime': video_info['registeredAt'] }
except Exception: except (KeyError,
pass TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
is_complete = False
return result_data return { 'videos': list (result_by_video_code.values ()),
'is_complete': is_complete }
def normalise (
        text: str,
) -> str:
    """Return a canonical form of ``text`` for comparing tag names.

    The text is stripped of surrounding whitespace, NFKC-normalised, passed
    through ``jaconv.hira2kata`` and finally lower-cased.
    """
    stripped = text.strip ()
    nfkc = unicodedata.normalize ('NFKC', stripped)
    katakana = jaconv.hira2kata (nfkc)
    return katakana.lower ()
# Return value of search_nico_by_tags: the collected videos plus a flag
# telling whether every fetch succeeded.
class SearchNicoResult (TypedDict):
# Collected video entries, one per video code.
videos: list['VideoResult']
# False when any snapshot or tracked-video fetch failed.
is_complete: bool
# Everything update_tables needs besides the timestamps, gathered before the
# database transaction is opened.
class UpdateContext (TypedDict):
# Video entries to upsert (the 'videos' of the search result).
api_data: list['VideoResult']
# Pre-fetched comments, keyed by video code.
comments_by_video_code: dict[str, list['CommentResult']]
# Taken from the search result's 'is_complete'; when False, update_tables
# skips the soft-delete step.
deletable: bool
class VideoSearchParam (TypedDict): class VideoSearchParam (TypedDict):
@@ -242,34 +371,22 @@ class VideoSearchParam (TypedDict):
class VideoResult (TypedDict): class VideoResult (TypedDict):
contentId: str contentId: str
title: str userId: int | None
tags: str title: str
description: str | None tags: str
viewCounter: int description: str | None
startTime: str viewCounter: int
startTime: str
class CommentResult (TypedDict): class CommentResult (TypedDict):
id: str no: int
no: int userId: str
vposMs: int body: str
body: str postedAt: str
commands: list[str] nicoruCount: int
userId: str vposMs: int
isPremium: bool
score: int
postedAt: str
nicoruCount: int
nicoruId: Any
source: str
isMyPost: bool
def normalise (
s: str,
) -> str:
return jaconv.hira2kata (unicodedata.normalize ('NFKC', s)).lower ()
if __name__ == '__main__': if __name__ == '__main__':