ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

394 lines
13 KiB

  1. # pylint: disable = missing-class-docstring
  2. # pylint: disable = missing-function-docstring
  3. """
  4. 日次で実行し,ぼざクリ DB を最新に更新する.
  5. """
  6. from __future__ import annotations
  7. import json
  8. import logging
  9. import random
  10. import string
  11. import time
  12. import unicodedata
  13. from datetime import date, datetime, timedelta
  14. from typing import Any, TypedDict, cast
  15. import jaconv
  16. import requests
  17. from db.config import DB
  18. from db.models import (Comment,
  19. Tag,
  20. TrackedVideo,
  21. User,
  22. Video,
  23. VideoHistory,
  24. VideoTag)
# Module-level logger; basicConfig is configured here because this file runs
# as a standalone daily job (see the __main__ guard at the bottom).
logger = logging.getLogger (__name__)
logging.basicConfig (
    level = logging.INFO,
    format = '%(asctime)s %(levelname)s %(message)s')
  29. def main (
  30. ) -> None:
  31. now = datetime.now ()
  32. today = now.date ()
  33. search_result = search_nico_by_tags (['伊地知ニジカ',
  34. 'ぼざろクリーチャーシリーズ',
  35. 'ぼざろクリーチャーシリーズ外伝'])
  36. comments_by_video_code = fetch_comments_by_video_code (search_result['videos'])
  37. context: UpdateContext = { 'api_data': search_result['videos'],
  38. 'comments_by_video_code': comments_by_video_code,
  39. 'deletable': search_result['is_complete'] }
  40. connection = DB.connection ()
  41. connection.begin_transaction ()
  42. try:
  43. update_tables (context, now, today)
  44. connection.commit ()
  45. except Exception:
  46. connection.rollback ()
  47. raise
  48. def update_tables (
  49. context: UpdateContext,
  50. now: datetime,
  51. today: date,
  52. ) -> None:
  53. alive_video_codes: list[str] = []
  54. for datum in context['api_data']:
  55. tag_names = datum['tags'].split ()
  56. normalised_tag_names = {normalise (tag_name) for tag_name in tag_names}
  57. user: User | None = None
  58. if datum['userId'] is not None:
  59. user = User.where ('code', str (datum['userId'])).first ()
  60. if user is None:
  61. user = User ()
  62. user.code = str (datum['userId'])
  63. user.save ()
  64. video = Video ()
  65. video.code = datum['contentId']
  66. video.user_id = user.id if user else None
  67. video.title = datum['title']
  68. video.description = datum['description'] or ''
  69. video.uploaded_at = datetime.fromisoformat (datum['startTime'])
  70. video.deleted_at = None
  71. video.upsert ()
  72. alive_video_codes.append (video.code)
  73. video_history = VideoHistory ()
  74. video_history.video_id = video.id
  75. video_history.fetched_at = today
  76. video_history.views_count = datum['viewCounter']
  77. video_history.upsert ()
  78. video_tags = [video_tag for video_tag in video.video_tags
  79. if video_tag.untagged_at is None]
  80. for video_tag in video_tags:
  81. tag = video_tag.tag
  82. if tag is None:
  83. continue
  84. if normalise (tag.name) in normalised_tag_names:
  85. continue
  86. video_tag.untagged_at = today
  87. video_tag.save ()
  88. for tag_name in tag_names:
  89. tag = Tag.where ('name', tag_name).first ()
  90. if tag is None:
  91. tag = Tag ()
  92. tag.name = tag_name
  93. tag.save ()
  94. video_tag = (VideoTag.where ('video_id', video.id)
  95. .where ('tag_id', tag.id)
  96. .first ())
  97. if video_tag is None:
  98. video_tag = VideoTag ()
  99. video_tag.video_id = video.id
  100. video_tag.tag_id = tag.id
  101. video_tag.tagged_at = getattr (video_tag, 'tagged_at', None) or today
  102. video_tag.untagged_at = None
  103. video_tag.upsert ()
  104. for com in context['comments_by_video_code'].get (video.code, []):
  105. user = User.where ('code', com['userId']).first ()
  106. if user is None:
  107. user = User ()
  108. user.code = com['userId']
  109. user.save ()
  110. comment = Comment ()
  111. comment.video_id = video.id
  112. comment.comment_no = com['no']
  113. comment.user_id = user.id
  114. comment.content = com['body']
  115. comment.posted_at = datetime.fromisoformat (com['postedAt'])
  116. comment.nico_count = com.get ('nicoruCount', 0)
  117. comment.vpos_ms = com.get ('vposMs', 0)
  118. comment.upsert ()
  119. if not context['deletable']:
  120. logger.warning ('skip soft-delete because the latest fetch was incomplete')
  121. return
  122. if not alive_video_codes:
  123. logger.warning ('skip soft-delete because no alive videos were fetched')
  124. return
  125. videos = (Video.where_not_in ('code', alive_video_codes)
  126. .where_null ('deleted_at')
  127. .get ())
  128. for video in videos:
  129. video.deleted_at = now
  130. video.save ()
  131. def fetch_video_data (
  132. video_code: str,
  133. ) -> dict[str, Any]:
  134. time.sleep (1.2)
  135. headers = { 'X-Frontend-Id': '6',
  136. 'X-Frontend-Version': '0' }
  137. action_track_id = (
  138. ''.join (random.choice (string.ascii_letters + string.digits)
  139. for _ in range (10))
  140. + '_'
  141. + str (random.randrange (10 ** 12, 10 ** 13)))
  142. url = (f'https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }'
  143. + f'?actionTrackId={ action_track_id }')
  144. return requests.post (url, headers = headers, timeout = 60).json ()
  145. def fetch_comments_by_video_code (
  146. videos: list[VideoResult],
  147. ) -> dict[str, list[CommentResult]]:
  148. comments_by_video_code: dict[str, list[CommentResult]] = {}
  149. for video in videos:
  150. video_code = video['contentId']
  151. try:
  152. comments_by_video_code[video_code] = fetch_comments (video_code)
  153. except (KeyError,
  154. TypeError,
  155. ValueError,
  156. requests.RequestException) as exc:
  157. logger.warning ('failed to fetch comments: %s (%s)', video_code, exc)
  158. comments_by_video_code[video_code] = []
  159. return comments_by_video_code
  160. def fetch_comments (
  161. video_code: str,
  162. ) -> list[CommentResult]:
  163. video_data = fetch_video_data (video_code)
  164. nv_comment = (video_data.get ('data', {})
  165. .get ('comment', {})
  166. .get ('nvComment'))
  167. if nv_comment is None:
  168. return []
  169. headers = { 'X-Frontend-Id': '6',
  170. 'X-Frontend-Version': '0',
  171. 'Content-Type': 'application/json' }
  172. params = { 'params': nv_comment['params'],
  173. 'additionals': {},
  174. 'threadKey': nv_comment['threadKey'] }
  175. url = nv_comment['server'] + '/v1/threads'
  176. response = requests.post (url,
  177. json = params,
  178. headers = headers,
  179. timeout = 60)
  180. response.raise_for_status ()
  181. res = response.json ()
  182. return select_comments_from_threads (res)
  183. def select_comments_from_threads (
  184. response: dict[str, Any],
  185. ) -> list[CommentResult]:
  186. threads = response.get ('data', {}).get ('threads', [])
  187. if not isinstance (threads, list):
  188. return []
  189. main_comments: list[CommentResult] = []
  190. fallback_comments: list[CommentResult] = []
  191. for thread in threads:
  192. comments = thread.get ('comments') if isinstance (thread, dict) else None
  193. if not isinstance (comments, list):
  194. continue
  195. casted_comments = cast (list[CommentResult], comments)
  196. if len (casted_comments) > len (fallback_comments):
  197. fallback_comments = casted_comments
  198. fork = str (thread.get ('fork', '')).lower ()
  199. label = str (thread.get ('label', '')).lower ()
  200. thread_id = str (thread.get ('id', '')).lower ()
  201. if fork == 'main' or 'main' in label or 'main' in thread_id:
  202. main_comments = casted_comments
  203. selected_comments = main_comments or fallback_comments
  204. deduped_comments: dict[int, CommentResult] = {}
  205. for comment in selected_comments:
  206. comment_no = comment.get ('no')
  207. if not isinstance (comment_no, int):
  208. continue
  209. deduped_comments[comment_no] = comment
  210. return [deduped_comments[comment_no]
  211. for comment_no in sorted (deduped_comments)]
  212. def search_nico_by_tags (
  213. tags: list[str],
  214. ) -> SearchNicoResult:
  215. today = datetime.now ()
  216. url = ('https://snapshot.search.nicovideo.jp'
  217. + '/api/v2/snapshot/video/contents/search')
  218. result_by_video_code: dict[str, VideoResult] = {}
  219. is_complete = True
  220. to = datetime (2022, 12, 3)
  221. while to <= today:
  222. time.sleep (1.2)
  223. until = to + timedelta (days = 14)
  224. # pylint: disable = consider-using-f-string
  225. query_filter = json.dumps ({ 'type': 'or',
  226. 'filters': [
  227. { 'type': 'range',
  228. 'field': 'startTime',
  229. 'from': ('%04d-%02d-%02dT00:00:00+09:00'
  230. % (to.year, to.month, to.day)),
  231. 'to': ('%04d-%02d-%02dT23:59:59+09:00'
  232. % (until.year, until.month, until.day)),
  233. 'include_lower': True }] })
  234. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  235. 'targets': 'tagsExact',
  236. '_sort': '-viewCounter',
  237. 'fields': ('contentId,'
  238. 'userId,'
  239. 'title,'
  240. 'tags,'
  241. 'description,'
  242. 'viewCounter,'
  243. 'startTime'),
  244. '_limit': 100,
  245. 'jsonFilter': query_filter }
  246. try:
  247. response = requests.get (
  248. url,
  249. params = cast (dict[str, int | str], params),
  250. timeout = 60)
  251. response.raise_for_status ()
  252. res = response.json ()
  253. for datum in cast (list[VideoResult], res.get ('data', [])):
  254. result_by_video_code[datum['contentId']] = datum
  255. except (ValueError, requests.RequestException) as exc:
  256. logger.warning ('snapshot fetch failed: %s - %s (%s)',
  257. to.date (),
  258. until.date (),
  259. exc)
  260. is_complete = False
  261. to = until + timedelta (days = 1)
  262. for video in TrackedVideo.get ():
  263. if video.code in result_by_video_code:
  264. continue
  265. try:
  266. tracked_video = video
  267. video_data = fetch_video_data (tracked_video.code)['data']
  268. owner = video_data.get ('owner') or {}
  269. video_info = video_data['video']
  270. result_by_video_code[tracked_video.code] = {
  271. 'contentId': tracked_video.code,
  272. 'userId': owner.get ('id'),
  273. 'title': video_info['title'],
  274. 'tags': ' '.join (map (lambda t: t['name'],
  275. video_data['tag']['items'])),
  276. 'description': video_info['description'],
  277. 'viewCounter': video_info['count']['view'],
  278. 'startTime': video_info['registeredAt'] }
  279. except (KeyError,
  280. TypeError,
  281. ValueError,
  282. requests.RequestException) as exc:
  283. logger.warning ('tracked video fetch failed: %s (%s)', video.code, exc)
  284. is_complete = False
  285. return { 'videos': list (result_by_video_code.values ()),
  286. 'is_complete': is_complete }
  287. def normalise (
  288. text: str,
  289. ) -> str:
  290. return jaconv.hira2kata (
  291. unicodedata.normalize ('NFKC', text.strip ())).lower ()
class SearchNicoResult (TypedDict):
    """Return value of search_nico_by_tags."""
    # All videos found (search windows plus individually tracked videos).
    videos: list['VideoResult']
    # False when any fetch failed; callers then skip the soft-delete pass.
    is_complete: bool
class UpdateContext (TypedDict):
    """Input bundle consumed by update_tables."""
    # Per-video records fetched from the search/watch APIs.
    api_data: list['VideoResult']
    # Comment lists keyed by video code (contentId).
    comments_by_video_code: dict[str, list['CommentResult']]
    # Whether it is safe to soft-delete videos missing from api_data.
    deletable: bool
class VideoSearchParam (TypedDict):
    """Query parameters sent to the snapshot search endpoint."""
    q: str           # ' OR '-joined tag keywords
    targets: str     # search target, 'tagsExact' here
    _sort: str       # sort key, '-viewCounter' here
    fields: str      # comma-separated response fields
    _limit: int      # page size (100 here — presumably the API cap; confirm)
    jsonFilter: str  # JSON-encoded filter (startTime range)
class VideoResult (TypedDict):
    """One video record as consumed by update_tables.

    Shaped like a snapshot-search API row; search_nico_by_tags also builds
    these by hand for tracked videos.
    """
    contentId: str           # video code
    userId: int | None       # uploader id; None when no owner is exposed
    title: str
    tags: str                # space-separated tag names
    description: str | None  # may be None; stored as '' in the DB
    viewCounter: int         # view count at fetch time
    startTime: str           # ISO-8601 upload timestamp
class CommentResult (TypedDict):
    """One comment as returned by the nvComment threads API."""
    no: int           # per-video comment number; used for de-duplication
    userId: str       # commenting user's code
    body: str         # comment text
    postedAt: str     # ISO-8601 timestamp
    nicoruCount: int  # may be absent upstream; read with .get(..., 0)
    vposMs: int       # playback position in ms; may be absent upstream
# Run the daily update when executed as a script.
if __name__ == '__main__':
    main ()