2 コミット

作成者 SHA1 メッセージ 日付
みてるぞ ac17cb2bbf #17 2026-03-05 20:56:40 +09:00
みてるぞ 442097f037 #17 2026-03-05 20:50:00 +09:00
7個のファイルの変更133行の追加265行の削除
-6
ファイルの表示
@@ -64,12 +64,6 @@ class TrackedVideo (Model):
__timestamps__ = False
def upsert (
self,
*args: str,
) -> None:
super ().upsert ('code')
class User (Model):
id: int
+4 -23
ファイルの表示
@@ -19,32 +19,13 @@ class Model (eloquent.Model):
self,
*args: str,
) -> None:
row = self._find_upsert_row (*args)
q = self.query ()
for arg in args:
q = q.where (arg, getattr (self, arg))
row = q.first ()
if row is not None:
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
return
try:
self.save ()
except Exception:
row = self._find_upsert_row (*args)
if row is None:
raise
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
def _find_upsert_row (
self,
*args: str,
):
q = self.query ()
for arg in args:
q = q.where (arg, getattr (self, arg))
return q.first ()
+5 -2
ファイルの表示
@@ -8,9 +8,12 @@
from __future__ import annotations
import json
import os
import sys
from datetime import datetime
from typing import TypedDict
from datetime import date, datetime
from typing import TypedDict, cast
from eloquent import DatabaseManager, Model
from db.config import DB
from db.models import Video
+9 -7
ファイルの表示
@@ -8,9 +8,12 @@
from __future__ import annotations
import json
import os
import sys
from datetime import date, datetime
from typing import cast
from typing import TypedDict, cast
from eloquent import DatabaseManager, Model
from db.config import DB
from db.models import Video, VideoHistory
@@ -22,15 +25,14 @@ def main (
views_counts: list[int],
base_date: date,
) -> None:
if not base_date:
base_date = datetime.now ().date ()
kiriban_list: list[tuple[int, str, str]] = []
latest_fetched_at = cast (date | None,
(VideoHistory
latest_fetched_at = cast (date, (VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))
if latest_fetched_at is None:
print ('[]')
return
for views_count in views_counts:
targets = { vh.video.code for vh in (
@@ -61,5 +63,5 @@ def main (
if __name__ == '__main__':
main (list (map (int, sys.argv[2:])),
main (map (int, sys.argv[2:]),
datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
+13
ファイルの表示
@@ -8,9 +8,12 @@
from __future__ import annotations
import json
import os
from datetime import date, datetime
from typing import TypedDict
from eloquent import DatabaseManager, Model
from db.config import DB
from db.models import Video
@@ -37,6 +40,16 @@ def main (
print (json.dumps (videos, default = str))
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
class VideoDict (TypedDict):
id: int
code: str
-19
ファイルの表示
@@ -1,19 +0,0 @@
import sys
from db.config import DB
from db.models import TrackedVideo
DB
def main (
video_codes: list[str],
) -> None:
for code in video_codes:
tv = TrackedVideo ()
tv.code = code
tv.upsert ()
if __name__ == '__main__':
main (sys.argv[1:])
+70 -176
ファイルの表示
@@ -8,16 +8,17 @@
from __future__ import annotations
import json
import logging
import os
import random
import string
import time
import unicodedata
from datetime import date, datetime, timedelta
from datetime import datetime, timedelta
from typing import Any, TypedDict, cast
import jaconv
import requests
from eloquent import DatabaseManager, Model
from db.config import DB
from db.models import (Comment,
@@ -28,55 +29,39 @@ from db.models import (Comment,
VideoHistory,
VideoTag)
logger = logging.getLogger (__name__)
logging.basicConfig (
level = logging.INFO,
format = '%(asctime)s %(levelname)s %(message)s')
def main (
) -> None:
now = datetime.now ()
today = now.date ()
search_result = search_nico_by_tags (['伊地知ニジカ',
api_data = search_nico_by_tags (['伊地知ニジカ',
'ぼざろクリーチャーシリーズ',
'ぼざろクリーチャーシリーズ外伝'])
comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
context: UpdateContext = { 'api_data': search_result['videos'],
'comments_by_video_code': comments_by_video_code,
'deletable': search_result['is_complete'] }
connection = DB.connection ()
connection.begin_transaction ()
DB.begin_transaction ()
try:
update_tables (context, now, today)
connection.commit ()
update_tables (api_data, now)
DB.commit ()
except Exception:
connection.rollback ()
DB.rollback ()
raise
def update_tables (
context: UpdateContext,
api_data: list[VideoResult],
now: datetime,
today: date,
) -> None:
alive_video_codes: list[str] = []
for datum in context['api_data']:
tag_names = datum['tags'].split ()
normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
for datum in api_data:
tag_names: list[str] = datum['tags'].split ()
user: User | None = None
if datum['userId'] is not None:
if datum['userId']:
user = User.where('code', str (datum['userId'])).first ()
if user is None:
user = User ()
user.code = str (datum['userId'])
user.save ()
video = Video ()
video.code = datum['contentId']
video.user_id = user.id if user else None
@@ -86,72 +71,60 @@ def update_tables (
video.deleted_at = None
video.upsert ()
alive_video_codes.append (video.code)
video_history = VideoHistory ()
video_history.video_id = video.id
video_history.fetched_at = today
video_history.fetched_at = now
video_history.views_count = datum['viewCounter']
video_history.upsert ()
video_tags = [video_tag for video_tag in video.video_tags
if video_tag.untagged_at is None]
tag: Tag | None
video_tag: VideoTag | None
for video_tag in video_tags:
tag = video_tag.tag
if tag is None:
continue
if normalise (tag.name) in normalised_tag_names:
continue
video_tag.untagged_at = today
if (tag is not None
and (normalise (tag.name) not in map (normalise, tag_names))):
video_tag.untagged_at = now
video_tag.save ()
for tag_name in tag_names:
tag = Tag.where ('name', tag_name).first ()
if tag is None:
tag = Tag ()
tag.name = tag_name
tag.save ()
video_tag = (VideoTag.where ('video_id', video.id)
.where ('tag_id', tag.id)
.where_null ('untagged_at')
.first ())
if video_tag is None:
video_tag = VideoTag ()
video_tag.video_id = video.id
video_tag.tag_id = tag.id
video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
video_tag.tagged_at = now
video_tag.untagged_at = None
video_tag.upsert ()
for com in context['comments_by_video_code'].get (video.code, []):
video_tag.save ()
for com in fetch_comments (video.code):
user = User.where ('code', com['userId']).first ()
if user is None:
user = User ()
user.code = com['userId']
user.save ()
comment = Comment ()
comment.video_id = video.id
comment.comment_no = com['no']
comment.user_id = user.id
comment.content = com['body']
comment.posted_at = datetime.fromisoformat (com['postedAt'])
comment.nico_count = com.get ('nicoruCount', 0)
comment.vpos_ms = com.get ('vposMs', 0)
comment.nico_count = com['nicoruCount']
comment.vpos_ms = com['vposMs']
comment.upsert ()
if not context['deletable']:
logger.warning ('skip soft-delete because the latest fetch was incomplete')
return
if not alive_video_codes:
logger.warning ('skip soft-delete because no alive videos were fetched')
return
# 削除動画
videos = (Video.where_not_in ('code', alive_video_codes)
.where_null ('deleted_at')
.get ())
for video in videos:
if video.code not in alive_video_codes:
video.deleted_at = now
video.save ()
@@ -170,38 +143,19 @@ def fetch_video_data (
+ '_'
+ str (random.randrange (10 ** 12, 10 ** 13)))
url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+ f'?actionTrackId={ action_track_id }')
url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
+ f"?actionTrackId={ action_track_id }")
return requests.post (url, headers = headers, timeout = 60).json ()
def fetch_comments_by_video_code (
videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
comments_by_video_code: dict[str, list[CommentResult]] = {}
for video in videos:
video_code = video['contentId']
try:
comments_by_video_code[video_code] = fetch_comments (video_code)
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('failed to fetch comments: %s (%s)', video_code, exc)
comments_by_video_code[video_code] = []
return comments_by_video_code
def fetch_comments (
video_code: str,
) -> list[CommentResult]:
video_data = fetch_video_data (video_code)
nv_comment = (video_data.get ('data', {})
.get ('comment', {})
.get ('nvComment'))
try:
nv_comment = fetch_video_data (video_code)['data']['comment']['nvComment']
except KeyError:
return []
if nv_comment is None:
return []
@@ -215,63 +169,26 @@ def fetch_comments (
url = nv_comment['server'] + '/v1/threads'
response = requests.post (url,
json = params,
res = (requests.post (url, json.dumps (params),
headers = headers,
timeout = 60)
response.raise_for_status ()
res = response.json ()
.json ())
return select_comments_from_threads (res)
def select_comments_from_threads (
response: dict[str, Any],
) -> list[CommentResult]:
threads = response.get ('data', {}).get ('threads', [])
if not isinstance (threads, list):
try:
return res['data']['threads'][1]['comments']
except (IndexError, KeyError):
return []
main_comments: list[CommentResult] = []
fallback_comments: list[CommentResult] = []
for thread in threads:
comments = thread.get ('comments') if isinstance (thread, dict) else None
if not isinstance (comments, list):
continue
casted_comments = cast (list[CommentResult], comments)
if len (casted_comments) > len (fallback_comments):
fallback_comments = casted_comments
fork = str (thread.get ('fork', '')).lower ()
label = str (thread.get ('label', '')).lower ()
thread_id = str (thread.get ('id', '')).lower ()
if fork == 'main' or 'main' in label or 'main' in thread_id:
main_comments = casted_comments
selected_comments = main_comments or fallback_comments
deduped_comments: dict[int, CommentResult] = {}
for comment in selected_comments:
comment_no = comment.get ('no')
if not isinstance (comment_no, int):
continue
deduped_comments[comment_no] = comment
return [deduped_comments[comment_no]
for comment_no in sorted (deduped_comments)]
def search_nico_by_tags (
tags: list[str],
) -> SearchNicoResult:
) -> list[VideoResult]:
today = datetime.now ()
url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search')
result_by_video_code: dict[str, VideoResult] = {}
is_complete = True
result_data: list[VideoResult] = []
to = datetime (2022, 12, 3)
while to <= today:
time.sleep (1.2)
@@ -298,67 +215,31 @@ def search_nico_by_tags (
'startTime'),
'_limit': 100,
'jsonFilter': query_filter }
res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
try:
response = requests.get (
url,
params = cast (dict[str, int | str], params),
timeout = 60)
response.raise_for_status ()
res = response.json ()
for datum in cast (list[VideoResult], res.get ('data', [])):
result_by_video_code[datum['contentId']] = datum
except (ValueError, requests.RequestException) as exc:
logger.warning ('snapshot fetch failed: %s - %s (%s)',
to.date (),
until.date (),
exc)
is_complete = False
result_data += res['data']
except KeyError:
pass
to = until + timedelta (days = 1)
for video in TrackedVideo.get ():
if video.code in result_by_video_code:
if video.code in map (lambda v: v['contentId'], result_data):
continue
try:
tracked_video = video
video_data = fetch_video_data (tracked_video.code)['data']
owner = video_data.get ('owner') or {}
video_info = video_data['video']
result_by_video_code[tracked_video.code] = {
'contentId': tracked_video.code,
'userId': owner.get ('id'),
'title': video_info['title'],
video_data = fetch_video_data (video.code)['data']
result_data.append ({
'contentId': video.code,
'userId': video_data['video']['userId'],
'title': video_data['video']['title'],
'tags': ' '.join (map (lambda t: t['name'],
video_data['tag']['items'])),
'description': video_info['description'],
'viewCounter': video_info['count']['view'],
'startTime': video_info['registeredAt'] }
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
is_complete = False
'description': video_data['video']['description'],
'viewCounter': video_data['video']['count']['view'],
'startTime': video_data['video']['registeredAt'] })
except Exception:
pass
return { 'videos': list (result_by_video_code.values ()),
'is_complete': is_complete }
def normalise (
text: str,
) -> str:
return jaconv.hira2kata (
unicodedata.normalize ('NFKC', text.strip ())).lower ()
class SearchNicoResult (TypedDict):
videos: list['VideoResult']
is_complete: bool
class UpdateContext (TypedDict):
api_data: list['VideoResult']
comments_by_video_code: dict[str, list['CommentResult']]
deletable: bool
return result_data
class VideoSearchParam (TypedDict):
@@ -381,12 +262,25 @@ class VideoResult (TypedDict):
class CommentResult (TypedDict):
id: str
no: int
userId: str
vposMs: int
body: str
commands: list[str]
userId: str
isPremium: bool
score: int
postedAt: str
nicoruCount: int
vposMs: int
nicoruId: Any
source: str
isMyPost: bool
def normalise (
s: str,
) -> str:
return jaconv.hira2kata (unicodedata.normalize ('NFKC', s)).lower ()
if __name__ == '__main__':