58 コミット

作成者 SHA1 メッセージ 日付
みてるぞ f34ff2a0f6 #20 2026-03-29 17:58:13 +09:00
みてるぞ 0ddf29cfd8 #20 2026-03-29 16:40:38 +09:00
みてるぞ af4cc8d618 #20 2026-03-29 16:17:02 +09:00
みてるぞ 516d5ebfa6 #020 2026-03-29 15:16:51 +09:00
みてるぞ b2adf62090 投稿者情報追加(#17) (#18)
#17

#17

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #18
2026-03-05 21:03:16 +09:00
みてるぞ a3d9d0bfd7 feat: タグと関係なしに追跡する動画リスト追加 (#16)
#15 タグ関係なく追跡する動画リスト

Co-authored-by: miteruzo <miteruzo@naver.com>
Reviewed-on: #16
2026-01-01 14:00:11 +09:00
みてるぞ f44637d274 コメント取得追加 2025-10-26 05:07:07 +09:00
みてるぞ f809e9faae キリ番修正 2025-10-26 01:19:15 +09:00
みてるぞ de8fd8634a キリ番修正 2025-10-26 01:07:15 +09:00
みてるぞ 88be511f6e キリ番修正 2025-10-26 01:06:41 +09:00
みてるぞ ea339f1ec9 キリ番修正 2025-10-26 01:05:48 +09:00
みてるぞ 06328a89b2 キリ番追加 2025-10-26 00:45:29 +09:00
みてるぞ 463e8bbec7 config に移した. 2025-10-22 23:25:49 +09:00
みてるぞ baa75d68ba ゑぐぃバグ修正 2025-08-15 02:51:06 +09:00
みてるぞ 48e51f97d0 'update_db.py' を更新 2025-07-19 19:33:37 +09:00
みてるぞ c5204383ed #13 2025-07-01 23:53:11 +09:00
みてるぞ bf36d05ed3 #13 2025-07-01 23:48:30 +09:00
みてるぞ c9bd6fdfa7 #13 2025-07-01 23:39:57 +09:00
みてるぞ b2f5f81ca8 型定義追加 2024-11-06 03:56:28 +09:00
みてるぞ 67b76e6dd4 #12 2024-11-05 12:34:10 +09:00
みてるぞ 6a5e6dfade ごみを削除 2024-10-17 18:59:25 +09:00
みてるぞ 6e99da7326 Python 3.10 に適合したのと __pycache__ 除外 2024-10-17 18:57:03 +09:00
みてるぞ 53ba658319 #8 の対応 2024-10-17 12:49:13 +09:00
みてるぞ 067c90890e video_histories への書込みを Upsert に 2024-10-16 22:40:42 +09:00
みてるぞ db14af1a73 もろもろ修正 2024-10-16 22:17:35 +09:00
みてるぞ da17333a80 typing.Self は Python 3.10 には未実装か,さぅか. 2024-10-16 22:16:04 +09:00
みてるぞ ee971997ad #6 の対応 2024-10-16 22:08:07 +09:00
みてるぞ b448335851 もろもろのモロヘイヤ 2024-10-16 20:20:50 +09:00
みてるぞ 9f810b23f0 Upsert 問題の修正とその他無駄を排除 2024-10-15 23:01:03 +09:00
みてるぞ c91cf19926 Eloquent の型定義ファイルと型安全性確認;本番環境に取込み可能 2024-10-15 00:18:35 +09:00
みてるぞ 6185788456 Merge branch 'main' into feature/query 2024-10-14 19:15:22 +09:00
みてるぞ 283b628053 課題 #5 に対する対応 2024-10-14 19:12:37 +09:00
みてるぞ 05182b251f Merge branch 'main' into feature/query 2024-10-14 18:23:39 +09:00
みてるぞ 5eb3fb6037 外されたタグを再登録できてなぃバグ修正 2024-10-14 18:16:56 +09:00
みてるぞ b6c041ddad Merge branch 'main' into feature/query 2024-10-12 22:21:21 +09:00
みてるぞ e23d919919 タグ削除チェックは大文字に変換して行ふやぅ修正 2024-10-12 22:10:33 +09:00
みてるぞ 726b5dc47e Eloquent への移植いちわぅ完了(後で型定義する) 2024-10-12 18:39:21 +09:00
みてるぞ feb31dc10b Eloquent 導入(ソースはエラー起きるため main に取込まなぃこと) 2024-10-12 06:01:21 +09:00
みてるぞ 980dd0ac60 ログ出力追加 2024-10-10 20:42:47 +09:00
みてるぞ faf3b44745 括弧の対応がをかしかったので修正(コミット前に MyPy チェックして) 2024-10-10 12:21:57 +09:00
みてるぞ 988cf714f8 print で出力するのは無意味だったので取消し 2024-10-10 02:59:44 +09:00
みてるぞ 033652e1b7 凡ミス(よくこれで 1 回め通ったな……) 2024-10-10 02:46:13 +09:00
みてるぞ e4f9470bc5 Merge branch 'main' of https://git.miteruzo.com/miteruzo/nizika_nico 2024-10-10 02:40:38 +09:00
みてるぞ 6535cbed70 【お試し】Upsert 時に LAST_INSERT_ID 指定 2024-10-10 02:40:29 +09:00
みてるぞ 763375c69d Python 3.10 非対応機能削除 2024-10-10 02:20:44 +09:00
みてるぞ 28471cb318 SQL ログ追加 2024-10-10 02:15:12 +09:00
みてるぞ d6bf49033b DbNull の修正と型定義整備 2024-10-10 00:26:56 +09:00
みてるぞ d226b443d7 MyPy のエラー対応 2024-10-08 19:28:56 +09:00
みてるぞ bf09176bfe いろいろ 2024-10-08 12:48:36 +09:00
みてるぞ 86238a11ed タグ周りのバグ修正 2024-10-08 12:46:30 +09:00
みてるぞ 890f584010 バグ修正 2024-10-08 01:52:10 +09:00
みてるぞ a2beddfa12 DB 名忘れてたので追加 2024-10-07 22:57:47 +09:00
みてるぞ e4b541ecdc 完成(たぶん) 2024-10-07 22:53:13 +09:00
みてるぞ 9261d0c68d 100 件しか検索できなぃ問題修正 2024-10-07 01:49:37 +09:00
みてるぞ 7407664264 DAO および DTO の不足について対応 2024-10-06 23:38:41 +09:00
みてるぞ f7555345e8 本日作業分 2024-10-06 06:39:45 +09:00
みてるぞ fb87032fab 本日作業分 2024-09-27 04:06:37 +09:00
みてるぞ d8b83c8632 本日作業分 2024-09-25 01:48:58 +09:00
12個のファイルの変更901行の追加214行の削除
+1
ファイルの表示
@@ -0,0 +1 @@
__pycache__
+25
ファイルの表示
@@ -0,0 +1,25 @@
from __future__ import annotations
import os
from typing import TypedDict
from eloquent import DatabaseManager, Model # type: ignore
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }
DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)
class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str
+145
ファイルの表示
@@ -0,0 +1,145 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
# pylint: disable = missing-module-docstring
# pylint: disable = unused-argument
from __future__ import annotations
from typing import Any, Generic, Type, TypeVar, overload
from typing_extensions import Self
_ModelT = TypeVar ('_ModelT', bound = 'Model')
class Connection:
def select (self, query: str, bindings: dict[str, Any] | None = None) -> Any: ...
def insert (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def update (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def delete (self, query: str, bindings: dict[str, Any] | None = None) -> int: ...
def transaction (self, callback: Any) -> Any: ...
def begin_transaction (self) -> None: ...
def commit (self) -> None: ...
def rollback (self) -> None: ...
class ConnectionResolver:
def connection (self, name: str | None = None) -> Any: ...
def get_default_connection (self) -> str: ...
def set_default_connection (self, name: str) -> None: ...
class DatabaseManager:
connections: dict[str, Connection]
def __init__ (self, config: dict[str, Any]) -> None: ...
def connection (self, name: str | None = None) -> Connection: ...
def disconnect (self, name: str | None = None) -> None: ...
def reconnect (self, name: str | None = None) -> Connection: ...
def get_connections (self) -> dict[str, Connection]: ...
class Model:
id: int
_Model__exists: bool
def has_one (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> _ModelT: ...
def has_many (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> list[_ModelT]: ...
def belongs_to (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> _ModelT: ...
def belongs_to_many (
self,
related_model: Type[_ModelT],
foreign_key: str | None = None,
) -> list[_ModelT]: ...
def save (self) -> None: ...
def delete (self) -> None: ...
@classmethod
def find (cls, id_: int) -> Self | None: ...
@classmethod
def query (
cls,
) -> QueryBuilder[Self]: ...
@overload
@classmethod
def where (
cls,
field: str,
operator: str,
value: Any,
) -> QueryBuilder[Self]: ...
@overload
@classmethod
def where (cls, field: str, value: Any) -> QueryBuilder[Self]: ...
@classmethod
def where_not_in (
cls,
column: str,
values: list[Any] | tuple
) -> QueryBuilder[Self]: ...
@classmethod
def where_not_null (cls, field: str) -> QueryBuilder[Self]: ...
@classmethod
def max (cls, column: str) -> Any: ...
@classmethod
def set_connection_resolver (cls, resolver: DatabaseManager) -> None: ...
class QueryBuilder (Generic[_ModelT]):
def first (self) -> _ModelT | None: ...
def get (self) -> list[_ModelT]: ...
@overload
def where (
self,
field: str,
operator: str,
value: Any,
) -> QueryBuilder[_ModelT]: ...
@overload
def where (self, field: str, value: Any) -> QueryBuilder[_ModelT]: ...
def where_null (self, field: str) -> QueryBuilder[_ModelT]: ...
def max (self, column: str) -> Any: ...
def _load_relation (self, relation_name: str) -> QueryBuilder[_ModelT]: ...
+171
ファイルの表示
@@ -0,0 +1,171 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
ぼざクリ DB の構成
"""
from __future__ import annotations
from datetime import date, datetime
from db.my_eloquent import Model
class Comment (Model):
# pylint: disable = too-many-instance-attributes
id: int
video_id: int
comment_no: int
user_id: int
content: str
posted_at: datetime
nico_count: int
vpos_ms: int
__timestamps__ = False
@property
def video (
self,
) -> Video:
return self.belongs_to (Video)
@property
def user (
self,
) -> User:
return self.belongs_to (User)
def upsert (
self,
*args: str,
) -> None:
super ().upsert ('video_id', 'comment_no')
class Tag (Model):
id: int
name: str
__timestamps__ = False
@property
def video_tags (
self,
) -> list[VideoTag]:
return self.has_many (VideoTag)
class TrackedVideo (Model):
id: int
code: str
__timestamps__ = False
class User (Model):
id: int
code: str
__timestamps__ = False
@property
def comments (
self,
) -> list[Comment]:
return self.has_many (Comment)
class Video (Model):
id: int
code: str
user_id: int | None
title: str
description: str
uploaded_at: datetime
deleted_at: datetime | None
__timestamps__ = False
@property
def user (
self,
) -> User | None:
if self.user_id is None:
return None
return self.belongs_to (User)
@property
def video_histories (
self,
) -> list[VideoHistory]:
return self.has_many (VideoHistory)
@property
def video_tags (
self,
) -> list[VideoTag]:
return self.has_many (VideoTag)
@property
def comments (
self,
) -> list[Comment]:
return self.has_many (Comment)
def upsert (
self,
*args: str,
) -> None:
super ().upsert ('code')
class VideoHistory (Model):
id: int
video_id: int
fetched_at: date
views_count: int
__timestamps__ = False
@property
def video (
self,
) -> Video:
return self.belongs_to (Video)
def upsert (
self,
*args: str,
) -> None:
super ().upsert ('video_id', 'fetched_at')
class VideoTag (Model):
id: int
video_id: int
tag_id: int
tagged_at: date
untagged_at: date | None
__timestamps__ = False
@property
def video (
self,
) -> Video:
return self.belongs_to (Video)
@property
def tag (
self,
) -> Tag:
return self.belongs_to (Tag)
def upsert (
self,
*args: str,
) -> None:
super ().upsert ('video_id', 'tag_id')
+50
ファイルの表示
@@ -0,0 +1,50 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
みてるぞ式魔改造(言ふほどか?)版 Eloquent
"""
import eloquent
class DatabaseManager (eloquent.DatabaseManager):
pass
class Model (eloquent.Model):
id: int
def upsert (
self,
*args: str,
) -> None:
row = self._find_upsert_row (*args)
if row is not None:
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
return
try:
self.save ()
except Exception:
row = self._find_upsert_row (*args)
if row is None:
raise
self.id = row.id
# pylint: disable = invalid-name
# pylint: disable = attribute-defined-outside-init
self._Model__exists = True
self.save ()
def _find_upsert_row (
self,
*args: str,
):
q = self.query ()
for arg in args:
q = q.where (arg, getattr (self, arg))
return q.first ()
シンボリックリンク
+1
ファイルの表示
@@ -0,0 +1 @@
db/eloquent.pyi
+55
ファイルの表示
@@ -0,0 +1,55 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画コードからコメントのリストを取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
DB
def main (
video_code: str,
) -> None:
video = Video.where ('code', video_code).first ()
if video:
comments: list[CommentDict] = []
for row in video.comments:
comment: CommentDict = {
'id': row.id,
'video_id': row.video_id,
'comment_no': row.comment_no,
'user_id': row.user_id,
'content': row.content,
'posted_at': row.posted_at,
'nico_count': row.nico_count,
'vpos_ms': row.vpos_ms }
comments.append (comment)
print (json.dumps (comments, default = str))
else:
print ('[]')
class CommentDict (TypedDict):
id: int
video_id: int
comment_no: int
user_id: int
content: str
posted_at: datetime
nico_count: int
vpos_ms: int
if __name__ == '__main__':
main (sys.argv[1])
+65
ファイルの表示
@@ -0,0 +1,65 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
動画履歴の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
import sys
from datetime import date, datetime
from typing import cast
from db.config import DB
from db.models import Video, VideoHistory
DB
def main (
views_counts: list[int],
base_date: date,
) -> None:
kiriban_list: list[tuple[int, str, str]] = []
latest_fetched_at = cast (date | None,
(VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))
if latest_fetched_at is None:
print ('[]')
return
for views_count in views_counts:
targets = { vh.video.code for vh in (
VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', views_count)
.get ()) }
for code in targets:
if code in [kiriban[1] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q, code = code: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= views_count:
continue
kiriban_list.append ((views_count, code,
(cast (Video, Video.where ('code', code).first ())
.uploaded_at)))
print (json.dumps (kiriban_list, default = str))
if __name__ == '__main__':
main (list (map (int, sys.argv[2:])),
datetime.strptime (sys.argv[1], '%Y-%m-%d').date ())
+52
ファイルの表示
@@ -0,0 +1,52 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
全動画の情報を取得し,JSON 形式で出力する.
"""
from __future__ import annotations
import json
from datetime import date, datetime
from typing import TypedDict
from db.config import DB
from db.models import Video
DB
def main (
) -> None:
videos: list[VideoDict] = []
for row in Video.all ():
deleted_at = row.deleted_at.date () if row.deleted_at else None
video: VideoDict = { 'id': row.id,
'code': row.code,
'user': getattr (row.user, 'code', None),
'title': row.title,
'description': row.description,
'tags': [],
'uploaded_at': row.uploaded_at,
'deleted_at': deleted_at }
for video_tag in row.video_tags:
if video_tag.untagged_at is None:
video['tags'].append (video_tag.tag.name)
videos.append (video)
print (json.dumps (videos, default = str))
class VideoDict (TypedDict):
id: int
code: str
user: str | None
title: str
description: str
tags: list[str]
uploaded_at: datetime
deleted_at: date | None
if __name__ == '__main__':
main ()
+2
ファイルの表示
@@ -0,0 +1,2 @@
CREATE TABLE `nizika_nico`.`tracked_videos` (`id` BIGINT NOT NULL AUTO_INCREMENT , `code` VARCHAR(16) NOT NULL COMMENT '動画コード' , PRIMARY KEY (`id`)) ENGINE = InnoDB COMMENT = '追跡対象動画';
ALTER TABLE `tracked_videos` ADD UNIQUE(`code`);
+3
ファイルの表示
@@ -0,0 +1,3 @@
ALTER TABLE `videos` ADD `user_id` BIGINT NULL DEFAULT NULL COMMENT 'ユーザ Id.' AFTER `code`;
ALTER TABLE `videos` ADD INDEX(`user_id`);
ALTER TABLE `videos` ADD FOREIGN KEY (`user_id`) REFERENCES `users`(`id`) ON DELETE RESTRICT ON UPDATE CASCADE;
+331 -214
ファイルの表示
@@ -1,275 +1,392 @@
# pylint: disable = missing-class-docstring
# pylint: disable = missing-function-docstring
"""
日次で実行し,ぼざクリ DB を最新に更新する.
"""
from __future__ import annotations
import json
import os
import logging
import random
import string
import time
from dataclasses import dataclass
from datetime import datetime
import unicodedata
from datetime import date, datetime, timedelta
from typing import Any, TypedDict, cast
import mysql.connector
import jaconv
import requests
from db.config import DB
from db.models import (Comment,
Tag,
TrackedVideo,
User,
Video,
VideoHistory,
VideoTag)
logger = logging.getLogger (__name__)
logging.basicConfig (
level = logging.INFO,
format = '%(asctime)s %(levelname)s %(message)s')
def main (
) -> None:
conn = mysql.connector.connect (host = os.environ['MYSQL_HOST'],
user = os.environ['MYSQL_USER'],
password = os.environ['MYSQL_PASS'])
now = datetime.now ()
today = now.date ()
video_dao = VideoDao (conn)
search_result = search_nico_by_tags (['伊地知ニジカ',
'ぼざろクリーチャーシリーズ',
'ぼざろクリーチャーシリーズ外伝'])
comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
context: UpdateContext = { 'api_data': search_result['videos'],
'comments_by_video_code': comments_by_video_code,
'deletable': search_result['is_complete'] }
update_video_table (video_dao, api_data)
# TODO: 書くこと
connection = DB.connection ()
connection.begin_transaction ()
try:
update_tables (context, now, today)
connection.commit ()
except Exception:
connection.rollback ()
raise
def update_video_table (
video_dao,
api_data: list[dict],
def update_tables (
context: UpdateContext,
now: datetime,
today: date,
) -> None:
# TODO: 書くこと
alive_video_codes: list[str] = []
video_dao.upsert_all (videos)
for datum in context['api_data']:
tag_names = datum['tags'].split ()
normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
video_dao.delete_nonexistent_data (video_id_list)
user: User | None = None
if datum['userId'] is not None:
user = User.where ('code', str (datum['userId'])).first ()
if user is None:
user = User ()
user.code = str (datum['userId'])
user.save ()
# TODO: 書くこと
video = Video ()
video.code = datum['contentId']
video.user_id = user.id if user else None
video.title = datum['title']
video.description = datum['description'] or ''
video.uploaded_at = datetime.fromisoformat (datum['startTime'])
video.deleted_at = None
video.upsert ()
alive_video_codes.append (video.code)
video_history = VideoHistory ()
video_history.video_id = video.id
video_history.fetched_at = today
video_history.views_count = datum['viewCounter']
video_history.upsert ()
video_tags = [video_tag for video_tag in video.video_tags
if video_tag.untagged_at is None]
for video_tag in video_tags:
tag = video_tag.tag
if tag is None:
continue
if normalise (tag.name) in normalised_tag_names:
continue
video_tag.untagged_at = today
video_tag.save ()
for tag_name in tag_names:
tag = Tag.where ('name', tag_name).first ()
if tag is None:
tag = Tag ()
tag.name = tag_name
tag.save ()
video_tag = (VideoTag.where ('video_id', video.id)
.where ('tag_id', tag.id)
.first ())
if video_tag is None:
video_tag = VideoTag ()
video_tag.video_id = video.id
video_tag.tag_id = tag.id
video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
video_tag.untagged_at = None
video_tag.upsert ()
for com in context['comments_by_video_code'].get (video.code, []):
user = User.where ('code', com['userId']).first ()
if user is None:
user = User ()
user.code = com['userId']
user.save ()
comment = Comment ()
comment.video_id = video.id
comment.comment_no = com['no']
comment.user_id = user.id
comment.content = com['body']
comment.posted_at = datetime.fromisoformat (com['postedAt'])
comment.nico_count = com.get ('nicoruCount', 0)
comment.vpos_ms = com.get ('vposMs', 0)
comment.upsert ()
if not context['deletable']:
logger.warning ('skip soft-delete because the latest fetch was incomplete')
return
if not alive_video_codes:
logger.warning ('skip soft-delete because no alive videos were fetched')
return
videos = (Video.where_not_in ('code', alive_video_codes)
.where_null ('deleted_at')
.get ())
for video in videos:
video.deleted_at = now
video.save ()
def fetch_comments (
video_id: str,
) -> list:
def fetch_video_data (
video_code: str,
) -> dict[str, Any]:
time.sleep (1.2)
headers = { 'X-Frontend-Id': '6',
'X-Frontend-Version': '0' }
action_track_id = (
''.join (random.choice (string.ascii_letters + string.digits)
for _ in range (10))
for _ in range (10))
+ '_'
+ str (random.randrange (10 ** 12, 10 ** 13)))
url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_id }"
+ f"?actionTrackId={ action_track_id }")
url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
+ f'?actionTrackId={ action_track_id }')
res = requests.post (url, headers = headers, timeout = 60).json ()
return requests.post (url, headers = headers, timeout = 60).json ()
try:
nv_comment = res['data']['comment']['nvComment']
except KeyError:
return []
def fetch_comments_by_video_code (
videos: list[VideoResult],
) -> dict[str, list[CommentResult]]:
comments_by_video_code: dict[str, list[CommentResult]] = {}
for video in videos:
video_code = video['contentId']
try:
comments_by_video_code[video_code] = fetch_comments (video_code)
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('failed to fetch comments: %s (%s)', video_code, exc)
comments_by_video_code[video_code] = []
return comments_by_video_code
def fetch_comments (
video_code: str,
) -> list[CommentResult]:
video_data = fetch_video_data (video_code)
nv_comment = (video_data.get ('data', {})
.get ('comment', {})
.get ('nvComment'))
if nv_comment is None:
return []
headers = { 'X-Frontend-Id': '6',
'X-Frontend-Version': '0',
'Content-Type': 'application/json' }
headers = { 'X-Frontend-Id': '6',
'X-Frontend-Version': '0',
'Content-Type': 'application/json' }
params = { 'params': nv_comment['params'],
'additionals': { },
'threadKey': nv_comment['threadKey'] }
params = { 'params': nv_comment['params'],
'additionals': {},
'threadKey': nv_comment['threadKey'] }
url = nv_comment['server'] + '/v1/threads'
res = (requests.post (url, json.dumps (params),
headers = headers,
timeout = 60)
.json ())
response = requests.post (url,
json = params,
headers = headers,
timeout = 60)
response.raise_for_status ()
res = response.json ()
try:
return res['data']['threads'][1]['comments']
except (IndexError, KeyError):
return select_comments_from_threads (res)
def select_comments_from_threads (
response: dict[str, Any],
) -> list[CommentResult]:
threads = response.get ('data', {}).get ('threads', [])
if not isinstance (threads, list):
return []
main_comments: list[CommentResult] = []
fallback_comments: list[CommentResult] = []
def search_nico_by_tag (
tag: str,
) -> list[dict]:
return search_nico_by_tags ([tag])
for thread in threads:
comments = thread.get ('comments') if isinstance (thread, dict) else None
if not isinstance (comments, list):
continue
casted_comments = cast (list[CommentResult], comments)
if len (casted_comments) > len (fallback_comments):
fallback_comments = casted_comments
fork = str (thread.get ('fork', '')).lower ()
label = str (thread.get ('label', '')).lower ()
thread_id = str (thread.get ('id', '')).lower ()
if fork == 'main' or 'main' in label or 'main' in thread_id:
main_comments = casted_comments
selected_comments = main_comments or fallback_comments
deduped_comments: dict[int, CommentResult] = {}
for comment in selected_comments:
comment_no = comment.get ('no')
if not isinstance (comment_no, int):
continue
deduped_comments[comment_no] = comment
return [deduped_comments[comment_no]
for comment_no in sorted (deduped_comments)]
def search_nico_by_tags (
tags: list[str],
) -> list[dict]:
) -> SearchNicoResult:
today = datetime.now ()
url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search')
+ '/api/v2/snapshot/video/contents/search')
# TODO: 年月日の設定ができてゐなぃのと,100 件までしか取得できなぃので何とかすること
result_by_video_code: dict[str, VideoResult] = {}
is_complete = True
to = datetime (2022, 12, 3)
while to <= today:
time.sleep (1.2)
until = to + timedelta (days = 14)
# pylint: disable = consider-using-f-string
query_filter = json.dumps ({ 'type': 'or',
'filters': [
{ 'type': 'range',
'field': 'startTime',
'from': ('%04d-%02d-%02dT00:00:00+09:00'
% (to.year, to.month, to.day)),
'to': ('%04d-%02d-%02dT23:59:59+09:00'
% (until.year, until.month, until.day)),
'include_lower': True }] })
params: VideoSearchParam = { 'q': ' OR '.join (tags),
'targets': 'tagsExact',
'_sort': '-viewCounter',
'fields': ('contentId,'
'userId,'
'title,'
'tags,'
'description,'
'viewCounter,'
'startTime'),
'_limit': 100,
'jsonFilter': query_filter }
try:
response = requests.get (
url,
params = cast (dict[str, int | str], params),
timeout = 60)
response.raise_for_status ()
res = response.json ()
for datum in cast (list[VideoResult], res.get ('data', [])):
result_by_video_code[datum['contentId']] = datum
except (ValueError, requests.RequestException) as exc:
logger.warning ('snapshot fetch failed: %s - %s (%s)',
to.date (),
until.date (),
exc)
is_complete = False
to = until + timedelta (days = 1)
query_filter = json.dumps ({ 'type': 'or',
'filters': [
{ 'type': 'range',
'field': 'startTime',
'from': f"{year}-{start}T00:00:00+09:00",
'to': f"{year}-{end}T23:59:59+09:00",
'include_lower': True }] })
for video in TrackedVideo.get ():
if video.code in result_by_video_code:
continue
try:
tracked_video = video
video_data = fetch_video_data (tracked_video.code)['data']
owner = video_data.get ('owner') or {}
video_info = video_data['video']
result_by_video_code[tracked_video.code] = {
'contentId': tracked_video.code,
'userId': owner.get ('id'),
'title': video_info['title'],
'tags': ' '.join (map (lambda t: t['name'],
video_data['tag']['items'])),
'description': video_info['description'],
'viewCounter': video_info['count']['view'],
'startTime': video_info['registeredAt'] }
except (KeyError,
TypeError,
ValueError,
requests.RequestException) as exc:
logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
is_complete = False
params: dict[str, int | str]
params = { 'q': ' OR '.join (tags),
'targets': 'tagsExact',
'_sort': '-viewCounter',
'fields': 'contentId,title,tags,viewCounter,startTime',
'_limit': 100,
'jsonFilter': query_filter }
res = requests.get (url, params = params, timeout = 60).json ()
return res['data']
return { 'videos': list (result_by_video_code.values ()),
'is_complete': is_complete }
class VideoDao:
def __init__ (
self,
conn
):
self.conn = conn
def fetch (
self,
video_id: int,
with_relation_tables: bool = True,
) -> VideoDto | None:
with self.conn.cursor () as c:
c.execute ("""
SELECT
id,
code,
title,
description,
uploaded_at,
deleted_at
FROM
videos
WHERE
id = %s
ORDER BY
id""", video_id)
row = c.fetchone ()
if row is None:
return None
return self._create_dto_from_row (row, with_relation_tables)
def fetch_all (
self,
with_relation_tables: bool = True,
) -> list[VideoDto]:
with self.conn.cursor () as c:
c.execute ("""
SELECT
id,
code,
title,
description,
uploaded_at,
deleted_at
FROM
videos
ORDER BY
id""")
videos: list[VideoDto] = []
for row in c.fetchall ():
videos.append (self._create_dto_from_row (row, with_relation_tables))
return videos
def _create_dto_from_row (
self,
row,
with_relation_tables: bool,
) -> VideoDto:
video = VideoDto (id_ = row['id'],
code = row['code'],
title = row['title'],
description = row['description'],
uploaded_at = row['uploaded_at'],
deleted_at = row['deleted_at'],
video_tags = None,
comments = None,
video_histories = None)
if with_relation_tables:
video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
for i in range (len (video.video_tags)):
video.video_tags[i].video = video
video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
for i in range (len (video.comments)):
video.comments[i].video = video
video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
for i in range (len (video.video_histories)):
video.video_histories[i].video = video
return video
def normalise (
text: str,
) -> str:
return jaconv.hira2kata (
unicodedata.normalize ('NFKC', text.strip ())).lower ()
@dataclass (slots = True)
class VideoDto:
id_: int
code: str
title: str
description: str
uploaded_at: datetime
deleted_at: datetime | None
video_tags: list[VideoTagDto] | None
comments: list[CommentDto] | None
video_histories: list[VideoHistoryDto] | None
class SearchNicoResult (TypedDict):
videos: list['VideoResult']
is_complete: bool
class VideoTagDao:
def __init__ (
self,
conn,
):
self.conn = conn
def fetch_by_video_id (
self,
video_id: int,
with_relation_tables: bool = True,
) -> list[VideoTagDto]:
with self.conn.cursor () as c:
c.execute ("""
SELECT
id,
video_id,
tag_id,
tagged_at,
untagged_at
FROM
video_tags
WHERE
video_id = %s
ORDER BY
id""", video_id)
video_tags: list[VideoTagDto] = []
for row in c.fetchall ():
video_tags.append (self._create_dto_from_row (row, with_relation_tables))
return video_tags
def _create_dto_from_row (
self,
row,
with_relation_tables: bool,
) -> VideoTagDto:
video_tag = VideoTagDto (id_ = row['id'],
video_id = row['video_id'],
tag_id = row['tag_id'],
tagged_at = row['tagged_at'],
untagged_at = row['untagged_at'],
video = None,
tag = None)
if with_relation_tables:
video_tag.video = VideoDao (self.conn).fetch (video_tag.video_id, True)
video_tag.tag = TagDao (self.conn).fetch (video_tag.tag_id, True)
return video_tag
class UpdateContext (TypedDict):
api_data: list['VideoResult']
comments_by_video_code: dict[str, list['CommentResult']]
deletable: bool
@dataclass (slots = True)
class VideoTagDto:
id_: int
video_id: int
tag_id: int
tagged_at: datetime
untagged_at: datetime
video: VideoDto
tag: TagDto
class VideoSearchParam (TypedDict):
q: str
targets: str
_sort: str
fields: str
_limit: int
jsonFilter: str
class VideoResult (TypedDict):
contentId: str
userId: int | None
title: str
tags: str
description: str | None
viewCounter: int
startTime: str
class CommentResult (TypedDict):
no: int
userId: str
body: str
postedAt: str
nicoruCount: int
vposMs: int
if __name__ == '__main__':