4 コミット

作成者 SHA1 メッセージ 日付
みてるぞ 495c1381c7 #22 インポート漏れ修正 2026-04-24 23:08:11 +09:00
みてるぞ 1074f09b96 #22 2026-04-24 09:46:34 +00:00
みてるぞ 2b706f1247 #22 2026-04-24 09:33:17 +00:00
みてるぞ cb72b8dd99 削除フラグが誤って付与されるバグ修正(#20) (#21)
#20

#20

#20

#020

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #21
2026-04-11 05:13:29 +09:00
7個のファイルの変更265行の追加133行の削除
+6
ファイルの表示
@@ -64,6 +64,12 @@ class TrackedVideo (Model):
__timestamps__ = False __timestamps__ = False
def upsert (
        self,
        *args: str,
) -> None:
    """
    Insert or update this tracked video, keyed by its ``code`` column.

    Args:
        *args: Accepted but not used; the parent ``upsert`` is always
            called with the single key column ``'code'``.
    """
    super ().upsert ('code')
class User (Model): class User (Model):
id: int id: int
+23 -4
ファイルの表示
@@ -19,13 +19,32 @@ class Model (eloquent.Model):
self, self,
*args: str, *args: str,
) -> None: ) -> None:
q = self.query () row = self._find_upsert_row (*args)
for arg in args:
q = q.where (arg, getattr (self, arg))
row = q.first ()
if row is not None: if row is not None:
self.id = row.id self.id = row.id
# pylint: disable = invalid-name # pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init # pylint: disable = attribute-defined-outside-init
self._Model__exists = True self._Model__exists = True
self.save () self.save ()
return
try:
self.save ()
except Exception:
row = self._find_upsert_row (*args)
if row is None:
raise
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
def _find_upsert_row (
self,
*args: str,
):
q = self.query ()
for arg in args:
q = q.where (arg, getattr (self, arg))
return q.first ()
+2 -5
ファイルの表示
@@ -8,12 +8,9 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os
import sys import sys
from datetime import date, datetime from datetime import datetime
from typing import TypedDict, cast from typing import TypedDict
from eloquent import DatabaseManager, Model
from db.config import DB from db.config import DB
from db.models import Video from db.models import Video
+7 -9
ファイルの表示
@@ -8,12 +8,9 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os
import sys import sys
from datetime import date, datetime from datetime import date, datetime
from typing import TypedDict, cast from typing import cast
from eloquent import DatabaseManager, Model
from db.config import DB from db.config import DB
from db.models import Video, VideoHistory from db.models import Video, VideoHistory
@@ -25,14 +22,15 @@ def main (
views_counts: list[int], views_counts: list[int],
base_date: date, base_date: date,
) -> None: ) -> None:
if not base_date:
base_date = datetime.now ().date ()
kiriban_list: list[tuple[int, str, str]] = [] kiriban_list: list[tuple[int, str, str]] = []
latest_fetched_at = cast (date, (VideoHistory latest_fetched_at = cast (date | None,
(VideoHistory
.where ('fetched_at', '<=', base_date) .where ('fetched_at', '<=', base_date)
.max ('fetched_at'))) .max ('fetched_at')))
if latest_fetched_at is None:
print ('[]')
return
for views_count in views_counts: for views_count in views_counts:
targets = { vh.video.code for vh in ( targets = { vh.video.code for vh in (
@@ -63,5 +61,5 @@ def main (
if __name__ == '__main__': if __name__ == '__main__':
main (map (int, sys.argv[2:]), main (list (map (int, sys.argv[2:])),
datetime.strptime (sys.argv[1], '%Y-%m-%d').date ()) datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
+1 -14
ファイルの表示
@@ -8,12 +8,9 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os
from datetime import date, datetime from datetime import date, datetime
from typing import TypedDict from typing import TypedDict
from eloquent import DatabaseManager, Model
from db.config import DB from db.config import DB
from db.models import Video from db.models import Video
@@ -36,20 +33,10 @@ def main (
for video_tag in row.video_tags: for video_tag in row.video_tags:
if video_tag.untagged_at is None: if video_tag.untagged_at is None:
video['tags'].append (video_tag.tag.name) video['tags'].append (video_tag.tag.name)
videos.append(video) videos.append (video)
print (json.dumps (videos, default = str)) print (json.dumps (videos, default = str))
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
class VideoDict (TypedDict): class VideoDict (TypedDict):
id: int id: int
code: str code: str
+19
ファイルの表示
@@ -0,0 +1,19 @@
import sys
from db.config import DB
from db.models import TrackedVideo
DB
def main (
        video_codes: list[str],
) -> None:
    """
    Register every given video code as a tracked video.

    Args:
        video_codes: Video codes to store.  One ``TrackedVideo`` is built
            per code and written through its ``upsert ()``.
    """
    for video_code in video_codes:
        tracked_video = TrackedVideo ()
        tracked_video.code = video_code
        tracked_video.upsert ()
if __name__ == '__main__':
main (sys.argv[1:])
+178 -72
ファイルの表示
@@ -8,17 +8,16 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os import logging
import random import random
import string import string
import time import time
import unicodedata import unicodedata
from datetime import datetime, timedelta from datetime import date, datetime, timedelta
from typing import Any, TypedDict, cast from typing import Any, TypedDict, cast
import jaconv import jaconv
import requests import requests
from eloquent import DatabaseManager, Model
from db.config import DB from db.config import DB
from db.models import (Comment, from db.models import (Comment,
@@ -29,39 +28,55 @@ from db.models import (Comment,
VideoHistory, VideoHistory,
VideoTag) VideoTag)
logger = logging.getLogger (__name__)
logging.basicConfig (
level = logging.INFO,
format = '%(asctime)s %(levelname)s %(message)s')
def main ( def main (
) -> None: ) -> None:
now = datetime.now () now = datetime.now ()
today = now.date ()
api_data = search_nico_by_tags (['伊地知ニジカ', search_result = search_nico_by_tags (['伊地知ニジカ',
'ぼざろクリーチャーシリーズ', 'ぼざろクリーチャーシリーズ',
'ぼざろクリーチャーシリーズ外伝']) 'ぼざろクリーチャーシリーズ外伝'])
comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
DB.begin_transaction () context: UpdateContext = { 'api_data': search_result['videos'],
'comments_by_video_code': comments_by_video_code,
'deletable': search_result['is_complete'] }
connection = DB.connection ()
connection.begin_transaction ()
try: try:
update_tables (api_data, now) update_tables (context, now, today)
DB.commit () connection.commit ()
except Exception: except Exception:
DB.rollback () connection.rollback ()
raise raise
def update_tables ( def update_tables (
api_data: list[VideoResult], context: UpdateContext,
now: datetime, now: datetime,
today: date,
) -> None: ) -> None:
alive_video_codes: list[str] = [] alive_video_codes: list[str] = []
for datum in api_data: for datum in context['api_data']:
tag_names: list[str] = datum['tags'].split () tag_names = datum['tags'].split ()
normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
user: User | None = None user: User | None = None
if datum['userId']: if datum['userId'] is not None:
user = User.where('code', str (datum['userId'])).first () user = User.where ('code', str (datum['userId'])).first ()
if user is None: if user is None:
user = User () user = User ()
user.code = str (datum['userId']) user.code = str (datum['userId'])
user.save () user.save ()
video = Video () video = Video ()
video.code = datum['contentId'] video.code = datum['contentId']
video.user_id = user.id if user else None video.user_id = user.id if user else None
@@ -71,60 +86,72 @@ def update_tables (
video.deleted_at = None video.deleted_at = None
video.upsert () video.upsert ()
alive_video_codes.append (video.code) alive_video_codes.append (video.code)
video_history = VideoHistory () video_history = VideoHistory ()
video_history.video_id = video.id video_history.video_id = video.id
video_history.fetched_at = now video_history.fetched_at = today
video_history.views_count = datum['viewCounter'] video_history.views_count = datum['viewCounter']
video_history.upsert () video_history.upsert ()
video_tags = [video_tag for video_tag in video.video_tags video_tags = [video_tag for video_tag in video.video_tags
if video_tag.untagged_at is None] if video_tag.untagged_at is None]
tag: Tag | None
video_tag: VideoTag | None
for video_tag in video_tags: for video_tag in video_tags:
tag = video_tag.tag tag = video_tag.tag
if (tag is not None if tag is None:
and (normalise (tag.name) not in map (normalise, tag_names))): continue
video_tag.untagged_at = now if normalise (tag.name) in normalised_tag_names:
continue
video_tag.untagged_at = today
video_tag.save () video_tag.save ()
for tag_name in tag_names: for tag_name in tag_names:
tag = Tag.where ('name', tag_name).first () tag = Tag.where ('name', tag_name).first ()
if tag is None: if tag is None:
tag = Tag () tag = Tag ()
tag.name = tag_name tag.name = tag_name
tag.save () tag.save ()
video_tag = (VideoTag.where ('video_id', video.id) video_tag = (VideoTag.where ('video_id', video.id)
.where ('tag_id', tag.id) .where ('tag_id', tag.id)
.where_null ('untagged_at')
.first ()) .first ())
if video_tag is None: if video_tag is None:
video_tag = VideoTag () video_tag = VideoTag ()
video_tag.video_id = video.id video_tag.video_id = video.id
video_tag.tag_id = tag.id video_tag.tag_id = tag.id
video_tag.tagged_at = now
video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
video_tag.untagged_at = None video_tag.untagged_at = None
video_tag.save () video_tag.upsert ()
for com in fetch_comments (video.code):
for com in context['comments_by_video_code'].get (video.code, []):
user = User.where ('code', com['userId']).first () user = User.where ('code', com['userId']).first ()
if user is None: if user is None:
user = User () user = User ()
user.code = com['userId'] user.code = com['userId']
user.save () user.save ()
comment = Comment () comment = Comment ()
comment.video_id = video.id comment.video_id = video.id
comment.comment_no = com['no'] comment.comment_no = com['no']
comment.user_id = user.id comment.user_id = user.id
comment.content = com['body'] comment.content = com['body']
comment.posted_at = datetime.fromisoformat (com['postedAt']) comment.posted_at = datetime.fromisoformat (com['postedAt'])
comment.nico_count = com['nicoruCount'] comment.nico_count = com.get ('nicoruCount', 0)
comment.vpos_ms = com['vposMs'] comment.vpos_ms = com.get ('vposMs', 0)
comment.upsert () comment.upsert ()
# 削除動画 if not context['deletable']:
logger.warning ('skip soft-delete because the latest fetch was incomplete')
return
if not alive_video_codes:
logger.warning ('skip soft-delete because no alive videos were fetched')
return
videos = (Video.where_not_in ('code', alive_video_codes) videos = (Video.where_not_in ('code', alive_video_codes)
.where_null ('deleted_at') .where_null ('deleted_at')
.get ()) .get ())
for video in videos: for video in videos:
if video.code not in alive_video_codes:
video.deleted_at = now video.deleted_at = now
video.save () video.save ()
@@ -143,19 +170,38 @@ def fetch_video_data (
+ '_' + '_'
+ str (random.randrange (10 ** 12, 10 ** 13))) + str (random.randrange (10 ** 12, 10 ** 13)))
url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }" url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+ f"?actionTrackId={ action_track_id }") + f'?actionTrackId={ action_track_id }')
return requests.post (url, headers = headers, timeout = 60).json () return requests.post (url, headers = headers, timeout = 60).json ()
def fetch_comments_by_video_code (
        videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
    """
    Collect the comments of every video, keyed by its ``contentId``.

    A failure while fetching one video's comments does not stop the loop:
    it is logged as a warning and that video is mapped to an empty list.

    Args:
        videos: Video records; only ``contentId`` is read here.

    Returns:
        A dict from video code to the comments fetched for it.
    """
    # Errors treated as "no comments for this video" rather than fatal.
    tolerated_errors = (KeyError,
                        TypeError,
                        ValueError,
                        requests.RequestException)

    collected: dict[str, list[CommentResult]] = {}
    for entry in videos:
        code = entry['contentId']
        try:
            collected[code] = fetch_comments (code)
        except tolerated_errors as error:
            logger.warning ('failed to fetch comments: %s (%s)', code, error)
            collected[code] = []

    return collected
def fetch_comments ( def fetch_comments (
video_code: str, video_code: str,
) -> list[CommentResult]: ) -> list[CommentResult]:
try: video_data = fetch_video_data (video_code)
nv_comment = fetch_video_data (video_code)['data']['comment']['nvComment'] nv_comment = (video_data.get ('data', {})
except KeyError: .get ('comment', {})
return [] .get ('nvComment'))
if nv_comment is None: if nv_comment is None:
return [] return []
@@ -164,31 +210,68 @@ def fetch_comments (
'Content-Type': 'application/json' } 'Content-Type': 'application/json' }
params = { 'params': nv_comment['params'], params = { 'params': nv_comment['params'],
'additionals': { }, 'additionals': {},
'threadKey': nv_comment['threadKey'] } 'threadKey': nv_comment['threadKey'] }
url = nv_comment['server'] + '/v1/threads' url = nv_comment['server'] + '/v1/threads'
res = (requests.post (url, json.dumps (params), response = requests.post (url,
json = params,
headers = headers, headers = headers,
timeout = 60) timeout = 60)
.json ()) response.raise_for_status ()
res = response.json ()
try: return select_comments_from_threads (res)
return res['data']['threads'][1]['comments']
except (IndexError, KeyError):
def select_comments_from_threads (
        response: dict[str, Any],
) -> 'list[CommentResult]':
    """
    Pick one thread's comments out of a threads response.

    Selection rules:
      * A thread counts as "main" when its ``fork`` equals ``main`` or its
        ``label`` / ``id`` contains ``main`` (case-insensitive).  If several
        threads match, the last one wins.
      * If no main thread is found, or its comment list is empty, the
        thread with the most comments is used instead.
      * Comments are de-duplicated by their integer ``no`` (the last
        occurrence wins) and returned in ascending ``no`` order.

    Malformed input never raises.  A response, ``data`` or thread that is
    not a dict, ``threads`` or ``comments`` that is not a list, and comment
    entries that are not dicts or lack an integer ``no`` are all skipped.

    Args:
        response: Decoded JSON body of the threads request.

    Returns:
        The selected comments, or an empty list when none are usable.
    """
    # `data` may be present but null, so `.get ('data', {})` is not enough.
    data = response.get ('data') if isinstance (response, dict) else None
    threads = data.get ('threads') if isinstance (data, dict) else None
    if not isinstance (threads, list):
        return []

    main_comments: list[CommentResult] = []
    fallback_comments: list[CommentResult] = []

    for thread in threads:
        if not isinstance (thread, dict):
            continue

        comments = thread.get ('comments')
        if not isinstance (comments, list):
            continue

        # A string type keeps `cast` free of a runtime name lookup.
        casted_comments = cast ('list[CommentResult]', comments)

        # Remember the longest thread in case no main thread is found.
        if len (casted_comments) > len (fallback_comments):
            fallback_comments = casted_comments

        fork = str (thread.get ('fork', '')).lower ()
        label = str (thread.get ('label', '')).lower ()
        thread_id = str (thread.get ('id', '')).lower ()
        if fork == 'main' or 'main' in label or 'main' in thread_id:
            main_comments = casted_comments

    selected_comments = main_comments or fallback_comments

    deduped_comments: dict[int, CommentResult] = {}
    for comment in selected_comments:
        if not isinstance (comment, dict):
            continue
        comment_no = comment.get ('no')
        if not isinstance (comment_no, int):
            continue
        deduped_comments[comment_no] = comment

    return [deduped_comments[comment_no]
            for comment_no in sorted (deduped_comments)]
def search_nico_by_tags ( def search_nico_by_tags (
tags: list[str], tags: list[str],
) -> list[VideoResult]: ) -> SearchNicoResult:
today = datetime.now () today = datetime.now ()
url = ('https://snapshot.search.nicovideo.jp' url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search') + '/api/v2/snapshot/video/contents/search')
result_data: list[VideoResult] = [] result_by_video_code: dict[str, VideoResult] = {}
is_complete = True
to = datetime (2022, 12, 3) to = datetime (2022, 12, 3)
while to <= today: while to <= today:
time.sleep (1.2) time.sleep (1.2)
@@ -215,31 +298,67 @@ def search_nico_by_tags (
'startTime'), 'startTime'),
'_limit': 100, '_limit': 100,
'jsonFilter': query_filter } 'jsonFilter': query_filter }
res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
try: try:
result_data += res['data'] response = requests.get (
except KeyError: url,
pass params = cast (dict[str, int | str], params),
timeout = 60)
response.raise_for_status ()
res = response.json ()
for datum in cast (list[VideoResult], res.get ('data', [])):
result_by_video_code[datum['contentId']] = datum
except (ValueError, requests.RequestException) as exc:
logger.warning ('snapshot fetch failed: %s - %s (%s)',
to.date (),
until.date (),
exc)
is_complete = False
to = until + timedelta (days = 1) to = until + timedelta (days = 1)
for video in TrackedVideo.get (): for video in TrackedVideo.get ():
if video.code in map (lambda v: v['contentId'], result_data): if video.code in result_by_video_code:
continue continue
try: try:
video_data = fetch_video_data (video.code)['data'] tracked_video = video
result_data.append ({ video_data = fetch_video_data (tracked_video.code)['data']
'contentId': video.code, owner = video_data.get ('owner') or {}
'userId': video_data['video']['userId'], video_info = video_data['video']
'title': video_data['video']['title'], result_by_video_code[tracked_video.code] = {
'contentId': tracked_video.code,
'userId': owner.get ('id'),
'title': video_info['title'],
'tags': ' '.join (map (lambda t: t['name'], 'tags': ' '.join (map (lambda t: t['name'],
video_data['tag']['items'])), video_data['tag']['items'])),
'description': video_data['video']['description'], 'description': video_info['description'],
'viewCounter': video_data['video']['count']['view'], 'viewCounter': video_info['count']['view'],
'startTime': video_data['video']['registeredAt'] }) 'startTime': video_info['registeredAt'] }
except Exception: except (KeyError,
pass TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
is_complete = False
return result_data return { 'videos': list (result_by_video_code.values ()),
'is_complete': is_complete }
def normalise (
        text: str,
) -> str:
    """
    Canonicalise a tag name so that spelling variants compare equal.

    Steps, in order: strip surrounding whitespace, apply Unicode NFKC
    normalisation, convert hiragana to katakana, then lower-case.

    Args:
        text: Raw tag name.

    Returns:
        The canonical form of ``text``.
    """
    trimmed = text.strip ()
    compatibility_form = unicodedata.normalize ('NFKC', trimmed)
    katakana_form = jaconv.hira2kata (compatibility_form)
    return katakana_form.lower ()
class SearchNicoResult (TypedDict):
    """Result of a tag search: the videos found plus a completeness flag."""
    # Video records collected by the search.
    videos: list['VideoResult']
    # Set to False by the search when a fetch fails.
    is_complete: bool
class UpdateContext (TypedDict):
    """Inputs gathered before the DB transaction and passed to the table update."""
    # Video records to write.
    api_data: list['VideoResult']
    # Comments per video code; a video with no entry is treated as having none.
    comments_by_video_code: dict[str, list['CommentResult']]
    # When False, the soft-delete pass over videos missing from api_data is skipped.
    deletable: bool
class VideoSearchParam (TypedDict): class VideoSearchParam (TypedDict):
@@ -262,25 +381,12 @@ class VideoResult (TypedDict):
class CommentResult (TypedDict): class CommentResult (TypedDict):
id: str
no: int no: int
vposMs: int
body: str
commands: list[str]
userId: str userId: str
isPremium: bool body: str
score: int
postedAt: str postedAt: str
nicoruCount: int nicoruCount: int
nicoruId: Any vposMs: int
source: str
isMyPost: bool
def normalise (
s: str,
) -> str:
return jaconv.hira2kata (unicodedata.normalize ('NFKC', s)).lower ()
if __name__ == '__main__': if __name__ == '__main__':