|
- """
- 日次で実行し,ぼざクリ DB を最新に更新する.
- """
-
- from __future__ import annotations
-
- import json
- import os
- import random
- import string
- import time
- import unicodedata
- from dataclasses import dataclass
- from datetime import date, datetime, timedelta
- from typing import Any, Type, TypedDict, cast
-
- import requests
- from eloquent import DatabaseManager, Model
- from eloquent.orm.relations.dynamic_property import DynamicProperty
-
# Connection settings handed to the ORM.  The MySQL credentials are read from
# the environment at import time, so a missing MYSQL_USER or MYSQL_PASS raises
# KeyError before anything else in this module runs.
config: dict[str, DbConfig] = {
    'mysql': {
        'driver': 'mysql',
        'host': 'localhost',
        'database': 'nizika_nico',
        'user': os.environ['MYSQL_USER'],
        'password': os.environ['MYSQL_PASS'],
        'prefix': '',
    },
}

# Install the manager as the connection resolver on the Model base class.
db = DatabaseManager (config)
Model.set_connection_resolver (db)
-
-
def main () -> None:
    """
    Run one full refresh: search the tracked tags, then sync the tables.

    The timestamp is sampled before the search starts, so every row written
    by this run carries the same value regardless of how long the search
    takes.
    """
    started_at = datetime.now ()

    tracked_tags = ['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']
    update_tables (search_nico_by_tags (tracked_tags), started_at)
-
-
def update_tables (
        api_data: list[VideoResult],
        now: datetime,
) -> None:
    """
    Synchronise the database with one batch of search results.

    For every entry in ``api_data`` this upserts the video row, appends a
    view-count history row stamped with ``now``, reconciles the video's tag
    links and stores its comments.  Afterwards, every video that is not
    already flagged as deleted and whose code is absent from ``api_data``
    gets ``deleted_at = now``.

    Args:
        api_data: Search results as returned by ``search_nico_by_tags``.
        now: Timestamp recorded on every row touched by this run.
    """
    alive_video_codes: list[str] = []

    for datum in api_data:
        video = Video ()
        video.code = datum['contentId']
        video.title = datum['title']
        video.description = datum['description'] or ''  # API may send null
        video.uploaded_at = datetime.fromisoformat (datum['startTime'])
        video.deleted_at = None  # seen in the search, so it is alive
        video.upsert ()
        alive_video_codes.append (video.code)

        video_history = VideoHistory ()
        video_history.video_id = video.id
        video_history.fetched_at = now
        video_history.views_count = datum['viewCounter']
        video_history.save ()

        # 'tags' is a single whitespace-separated string.
        _sync_video_tags (video, datum['tags'].split (), now)
        _store_comments (video)

    # Deleted videos: anything still marked alive that the search no longer
    # returned.  The query already excludes the alive codes, so no further
    # filtering is needed here.
    videos = (Video.where_not_in ('code', alive_video_codes)
              .where_null ('deleted_at')
              .get ())
    for video in videos:
        video.deleted_at = now
        video.save ()


def _sync_video_tags (
        video: Video,
        tag_names: list[str],
        now: datetime,
) -> None:
    """
    Close tag links that vanished from the video and open links for new tags.
    """
    # Build the comparison set once instead of once per existing link.
    normalised_names = { normalise (name) for name in tag_names }

    # Only links that are still open (untagged_at IS NULL) can be closed;
    # already-closed links must keep their original untagged_at.
    video_tags = video.video_tags.where_null ('untagged_at').get ()
    for video_tag in video_tags:
        tag = video_tag.tag
        if tag is not None and normalise (tag.name) not in normalised_names:
            video_tag.untagged_at = now
            video_tag.save ()

    for tag_name in tag_names:
        tag = Tag.where ('name', tag_name).first ()
        if tag is None:
            tag = Tag ()
            tag.name = tag_name
            tag.save ()

        # The link lives in the VideoTag table, not in Video.
        video_tag = (VideoTag.where ('video_id', video.id)
                     .where ('tag_id', tag.id)
                     .where_null ('untagged_at')
                     .first ())
        if video_tag is None:
            video_tag = VideoTag ()
            video_tag.video_id = video.id
            video_tag.tag_id = tag.id
            video_tag.tagged_at = now
            video_tag.untagged_at = None
            video_tag.save ()


def _store_comments (
        video: Video,
) -> None:
    """
    Fetch the comments of ``video`` and upsert them, creating users on demand.
    """
    for com in fetch_comments (video.code):
        user = User.where ('code', com['userId']).first ()
        if user is None:
            user = User ()
            user.code = com['userId']
            user.save ()

        comment = Comment ()
        comment.video_id = video.id
        comment.comment_no = com['no']
        comment.user_id = user.id
        comment.content = com['body']
        comment.posted_at = datetime.fromisoformat (com['postedAt'])
        comment.nico_count = com['nicoruCount']
        comment.vpos_ms = com['vposMs']
        comment.upsert ()
-
-
def fetch_comments (
        video_code: str,
) -> list[CommentResult]:
    """
    Fetch the comments of one video through the guest watch API.

    Two requests are made: the first obtains the comment-server parameters
    (``nvComment``) for the video, the second asks that server for the
    threads.  Only the comments of the thread at index 1 are returned.

    Args:
        video_code: Content id of the video, interpolated into the watch URL.

    Returns:
        The comment list, or ``[]`` when the expected part of either
        response is missing or null.

    Raises:
        Whatever ``requests`` raises for transport failures or a body that
        is not JSON; these are not handled here.
    """
    # Pause before every call so consecutive videos are not fetched
    # back-to-back.
    time.sleep (1.2)

    headers = { 'X-Frontend-Id': '6',
                'X-Frontend-Version': '0' }

    # '<10 random alphanumerics>_<13-digit random number>'
    action_track_id = (
            ''.join (random.choice (string.ascii_letters + string.digits)
                     for _ in range (10))
            + '_'
            + str (random.randrange (10 ** 12, 10 ** 13)))

    url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{video_code}"
           + f"?actionTrackId={action_track_id}")

    res = requests.post (url, headers = headers, timeout = 60).json ()

    try:
        nv_comment = res['data']['comment']['nvComment']
    except (KeyError, TypeError):
        # TypeError covers a null (or otherwise non-mapping) node on the
        # path, which would otherwise abort the whole run.
        return []
    if nv_comment is None:
        return []

    headers = { 'X-Frontend-Id': '6',
                'X-Frontend-Version': '0',
                'Content-Type': 'application/json' }

    params = { 'params': nv_comment['params'],
               'additionals': { },
               'threadKey': nv_comment['threadKey'] }

    url = nv_comment['server'] + '/v1/threads'

    res = (requests.post (url, json.dumps (params),
                          headers = headers,
                          timeout = 60)
           .json ())

    try:
        return res['data']['threads'][1]['comments']
    except (IndexError, KeyError, TypeError):
        return []
-
-
def search_nico_by_tag (
        tag: str,
) -> list[VideoResult]:
    """
    Search videos carrying a single tag.

    Convenience wrapper that forwards a one-element list to
    ``search_nico_by_tags``.
    """
    single_tag = [tag]
    return search_nico_by_tags (single_tag)
-
-
def search_nico_by_tags (
        tags: list[str],
) -> list[VideoResult]:
    """
    Collect every video that exactly matches at least one of ``tags``.

    The snapshot search API is queried in consecutive 15-day upload-date
    windows starting at 2022-12-03 and running up to the current time.
    Each window is paged through with ``_offset`` so that windows holding
    more than one page of hits are returned completely instead of being cut
    off after the first page.

    Args:
        tags: Tag names; they are joined with ' OR ' into one query.

    Returns:
        The concatenated ``data`` entries of all responses.  A response
        without a ``data`` key contributes nothing and ends the paging of
        its window.
    """
    today = datetime.now ()

    url = ('https://snapshot.search.nicovideo.jp'
           + '/api/v2/snapshot/video/contents/search')

    limit = 100           # page size sent as '_limit'
    max_offset = 100_000  # presumed API ceiling for '_offset' -- TODO confirm

    result_data: list[VideoResult] = []
    to = datetime (2022, 12, 3)
    while to <= today:
        until = to + timedelta (days = 14)
        query_filter = json.dumps ({ 'type': 'or',
                                     'filters': [
                                         { 'type': 'range',
                                           'field': 'startTime',
                                           'from': f"{to:%Y-%m-%d}T00:00:00+09:00",
                                           'to': f"{until:%Y-%m-%d}T23:59:59+09:00",
                                           'include_lower': True }] })
        params: VideoSearchParam = { 'q': ' OR '.join (tags),
                                     'targets': 'tagsExact',
                                     '_sort': '-viewCounter',
                                     'fields': ('contentId,'
                                                'title,'
                                                'tags,'
                                                'description,'
                                                'viewCounter,'
                                                'startTime'),
                                     '_limit': limit,
                                     'jsonFilter': query_filter }

        offset = 0
        while offset <= max_offset:
            # Pause before every request, as the single-page version did.
            time.sleep (1.2)
            query = { **cast (dict[str, int | str], params),
                      '_offset': offset }
            res = requests.get (url, params = query, timeout = 60).json ()
            try:
                page = res['data']
            except KeyError:
                # Best effort: a response without 'data' is skipped.
                break
            result_data += page
            if len (page) < limit:
                # A short page means the window is exhausted.
                break
            offset += limit

        # The next window starts the day after this one ended.
        to = until + timedelta (days = 1)

    return result_data
-
-
class Comment (Model):
    """
    A single viewer comment attached to a video.
    """

    # presumably turns off the ORM's automatic timestamp columns -- confirm
    __timestamps__ = False

    video_id: int       # id of the Video row the comment belongs to
    comment_no: int     # the API's 'no' value for the comment
    user_id: int        # id of the User row of the commenter
    content: str        # comment text (the API's 'body')
    posted_at: datetime
    nico_count: int     # the API's 'nicoruCount'
    vpos_ms: int        # the API's 'vposMs'

    @property
    def video (self) -> DynamicProperty:
        """belongs_to relation to the owning Video."""
        return self.belongs_to (Video)

    @property
    def user (self) -> DynamicProperty:
        """belongs_to relation to the commenting User."""
        return self.belongs_to (User)

    def upsert (self) -> None:
        """
        Save this comment, adopting the id of a stored row that has the same
        (video_id, comment_no) pair when one exists.

        NOTE(review): callers build a fresh instance before calling this;
        confirm that save() issues an UPDATE, not an INSERT, once the id
        has been copied over.
        """
        existing = (Comment.where ('video_id', self.video_id)
                    .where ('comment_no', self.comment_no)
                    .first ())
        if existing is not None:
            self.id = existing.id
        self.save ()
-
-
class Tag (Model):
    """
    A tag name that can be linked to videos through VideoTag rows.
    """

    # presumably turns off the ORM's automatic timestamp columns -- confirm
    __timestamps__ = False

    name: str   # tag text exactly as received from the search API

    @property
    def video_tags (self) -> DynamicProperty:
        """has_many relation to the VideoTag links that use this tag."""
        return self.has_many (VideoTag)
-
-
class User (Model):
    """
    A commenter, identified by the user id string the comment API reports.
    """

    # presumably turns off the ORM's automatic timestamp columns -- confirm
    __timestamps__ = False

    code: str   # the API's 'userId' value

    @property
    def comments (self) -> DynamicProperty:
        """has_many relation to the comments written by this user."""
        return self.has_many (Comment)
-
-
class Video (Model):
    """
    A video tracked by the daily update.
    """

    # presumably turns off the ORM's automatic timestamp columns -- confirm
    __timestamps__ = False

    code: str                       # the API's 'contentId'
    title: str
    description: str                # '' when the API sends null
    uploaded_at: datetime           # parsed from the API's 'startTime'
    deleted_at: datetime | None     # None while the video is still found

    @property
    def video_histories (self) -> DynamicProperty:
        """has_many relation to the view-count history rows."""
        return self.has_many (VideoHistory)

    @property
    def video_tags (self) -> DynamicProperty:
        """has_many relation to the tag links of this video."""
        return self.has_many (VideoTag)

    @property
    def comments (self) -> DynamicProperty:
        """has_many relation to the comments on this video."""
        return self.has_many (Comment)

    def upsert (self) -> None:
        """
        Save this video, adopting the id of a stored row with the same code
        when one exists.

        NOTE(review): callers build a fresh instance before calling this;
        confirm that save() issues an UPDATE, not an INSERT, once the id
        has been copied over.
        """
        existing = Video.where ('code', self.code).first ()
        if existing is not None:
            self.id = existing.id
        self.save ()
-
-
class VideoHistory (Model):
    """
    The view count of one video as observed at one fetch.
    """

    # presumably turns off the ORM's automatic timestamp columns -- confirm
    __timestamps__ = False

    video_id: int       # id of the Video row the sample belongs to
    fetched_at: date    # when the sample was taken
    views_count: int    # the API's 'viewCounter'

    @property
    def video (self) -> DynamicProperty:
        """belongs_to relation to the sampled Video."""
        return self.belongs_to (Video)

    def upsert (self) -> None:
        """
        Save this sample, adopting the id of a stored row that has the same
        (video_id, fetched_at) pair when one exists.

        NOTE(review): confirm that save() issues an UPDATE, not an INSERT,
        once the id has been copied onto a fresh instance.
        """
        existing = (VideoHistory.where ('video_id', self.video_id)
                    .where ('fetched_at', self.fetched_at)
                    .first ())
        if existing is not None:
            self.id = existing.id
        self.save ()
-
-
class VideoTag (Model):
    """
    A link between a video and a tag, with the period it was attached.
    """

    # presumably turns off the ORM's automatic timestamp columns -- confirm
    __timestamps__ = False

    video_id: int
    tag_id: int
    tagged_at: date             # when the link was first recorded
    untagged_at: date | None    # None while the tag is still attached

    @property
    def video (self) -> DynamicProperty:
        """belongs_to relation to the tagged Video."""
        return self.belongs_to (Video)

    @property
    def tag (self) -> DynamicProperty:
        """belongs_to relation to the linked Tag."""
        return self.belongs_to (Tag)

    def upsert (self) -> None:
        """
        Save this link, adopting the id of a stored row that has the same
        (video_id, tag_id) pair when one exists.

        NOTE(review): confirm that save() issues an UPDATE, not an INSERT,
        once the id has been copied onto a fresh instance.
        """
        existing = (VideoTag.where ('video_id', self.video_id)
                    .where ('tag_id', self.tag_id)
                    .first ())
        if existing is not None:
            self.id = existing.id
        self.save ()
-
-
class DbConfig (TypedDict):
    """
    Settings of one named database connection in the module-level config.
    """

    driver: str     # e.g. 'mysql'
    host: str
    database: str   # schema name
    user: str
    password: str
    prefix: str     # '' in this file's config
-
-
class VideoSearchParam (TypedDict):
    """
    Query-string parameters sent to the snapshot video search endpoint.
    """

    q: str              # search expression; tags joined with ' OR '
    targets: str        # field the expression is matched against
    _sort: str          # sort key; a leading '-' is used in this file
    fields: str         # comma-separated list of fields to return
    _limit: int         # number of hits requested per call
    jsonFilter: str     # JSON-encoded filter object
-
-
class VideoResult (TypedDict):
    """
    One hit of the snapshot video search, limited to the requested fields.
    """

    contentId: str              # stored as Video.code
    title: str
    tags: str                   # whitespace-separated tag names
    description: str | None     # may be null; stored as '' in that case
    viewCounter: int            # stored as VideoHistory.views_count
    startTime: str              # parsed with datetime.fromisoformat
-
-
class CommentResult (TypedDict):
    """
    One comment entry as returned by the comment thread endpoint.

    Only 'no', 'vposMs', 'body', 'userId', 'postedAt' and 'nicoruCount' are
    read by this file; the remaining keys are declared for completeness.
    """

    id: str
    no: int                 # stored as Comment.comment_no
    vposMs: int             # stored as Comment.vpos_ms
    body: str               # stored as Comment.content
    commands: list[str]
    userId: str             # stored as User.code
    isPremium: bool
    score: int
    postedAt: str           # parsed with datetime.fromisoformat
    nicoruCount: int        # stored as Comment.nico_count
    nicoruId: Any
    source: str
    isMyPost: bool
-
-
def normalise (
        s: str,
) -> str:
    """
    Return ``s`` in Unicode NFKC form, lower-cased.

    Used to compare tag names without regard to letter case or to
    full-width / half-width variants.
    """
    compat_form = unicodedata.normalize ('NFKC', s)
    return compat_form.lower ()
-
-
# Run the refresh only when executed as a script, not when imported.
if __name__ == "__main__":
    main ()
|