ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

265 lines
8.3 KiB

  1. # pylint: disable = missing-class-docstring
  2. # pylint: disable = missing-function-docstring
  3. """
  4. 日次で実行し,ぼざクリ DB を最新に更新する.
  5. """
  6. from __future__ import annotations
  7. import json
  8. import os
  9. import random
  10. import string
  11. import time
  12. import unicodedata
  13. from datetime import datetime, timedelta
  14. from typing import Any, TypedDict, cast
  15. import jaconv
  16. import requests
  17. from eloquent import DatabaseManager, Model
  18. from models import Comment, Tag, User, Video, VideoHistory, VideoTag
  19. def main (
  20. ) -> None:
  21. config: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
  22. 'host': 'localhost',
  23. 'database': 'nizika_nico',
  24. 'user': os.environ['MYSQL_USER'],
  25. 'password': os.environ['MYSQL_PASS'],
  26. 'prefix': '' } }
  27. db = DatabaseManager (config)
  28. Model.set_connection_resolver (db)
  29. now = datetime.now ()
  30. api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
  31. update_tables (api_data, now)
  32. def update_tables (
  33. api_data: list[VideoResult],
  34. now: datetime,
  35. ) -> None:
  36. alive_video_codes: list[str] = []
  37. for datum in api_data:
  38. tag_names: list[str] = datum['tags'].split ()
  39. video = Video ()
  40. video.code = datum['contentId']
  41. video.title = datum['title']
  42. video.description = datum['description'] or ''
  43. video.uploaded_at = datetime.fromisoformat (datum['startTime'])
  44. video.deleted_at = None
  45. video.upsert ()
  46. alive_video_codes.append (video.code)
  47. video_history = VideoHistory ()
  48. video_history.video_id = video.id
  49. video_history.fetched_at = now
  50. video_history.views_count = datum['viewCounter']
  51. video_history.upsert ()
  52. video_tags = [video_tag for video_tag in video.video_tags
  53. if video_tag.untagged_at is not None]
  54. tag: Tag | None
  55. video_tag: VideoTag | None
  56. for video_tag in video_tags:
  57. tag = video_tag.tag
  58. if (tag is not None
  59. and (normalise (tag.name) not in map (normalise, tag_names))):
  60. video_tag.untagged_at = now
  61. video_tag.save ()
  62. for tag_name in tag_names:
  63. tag = Tag.where ('name', tag_name).first ()
  64. if tag is None:
  65. tag = Tag ()
  66. tag.name = tag_name
  67. tag.save ()
  68. video_tag = (VideoTag.where ('video_id', video.id)
  69. .where ('tag_id', tag.id)
  70. .where_null ('untagged_at')
  71. .first ())
  72. if video_tag is None:
  73. video_tag = VideoTag ()
  74. video_tag.video_id = video.id
  75. video_tag.tag_id = tag.id
  76. video_tag.tagged_at = now
  77. video_tag.untagged_at = None
  78. video_tag.save ()
  79. for com in fetch_comments (video.code):
  80. user = User.where ('code', com['userId']).first ()
  81. if user is None:
  82. user = User ()
  83. user.code = com['userId']
  84. user.save ()
  85. comment = Comment ()
  86. comment.video_id = video.id
  87. comment.comment_no = com['no']
  88. comment.user_id = user.id
  89. comment.content = com['body']
  90. comment.posted_at = datetime.fromisoformat (com['postedAt'])
  91. comment.nico_count = com['nicoruCount']
  92. comment.vpos_ms = com['vposMs']
  93. comment.upsert ()
  94. # 削除動画
  95. videos = (Video.where_not_in ('code', alive_video_codes)
  96. .where_null ('deleted_at')
  97. .get ())
  98. for video in videos:
  99. if video.code not in alive_video_codes:
  100. video.deleted_at = now
  101. video.save ()
  102. def fetch_comments (
  103. video_code: str,
  104. ) -> list[CommentResult]:
  105. time.sleep (1.2)
  106. headers = { 'X-Frontend-Id': '6',
  107. 'X-Frontend-Version': '0' }
  108. action_track_id = (
  109. ''.join (random.choice (string.ascii_letters + string.digits)
  110. for _ in range (10))
  111. + '_'
  112. + str (random.randrange (10 ** 12, 10 ** 13)))
  113. url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
  114. + f"?actionTrackId={ action_track_id }")
  115. res = requests.post (url, headers = headers, timeout = 60).json ()
  116. try:
  117. nv_comment = res['data']['comment']['nvComment']
  118. except KeyError:
  119. return []
  120. if nv_comment is None:
  121. return []
  122. headers = { 'X-Frontend-Id': '6',
  123. 'X-Frontend-Version': '0',
  124. 'Content-Type': 'application/json' }
  125. params = { 'params': nv_comment['params'],
  126. 'additionals': { },
  127. 'threadKey': nv_comment['threadKey'] }
  128. url = nv_comment['server'] + '/v1/threads'
  129. res = (requests.post (url, json.dumps (params),
  130. headers = headers,
  131. timeout = 60)
  132. .json ())
  133. try:
  134. return res['data']['threads'][1]['comments']
  135. except (IndexError, KeyError):
  136. return []
  137. def search_nico_by_tag (
  138. tag: str,
  139. ) -> list[VideoResult]:
  140. return search_nico_by_tags ([tag])
  141. def search_nico_by_tags (
  142. tags: list[str],
  143. ) -> list[VideoResult]:
  144. today = datetime.now ()
  145. url = ('https://snapshot.search.nicovideo.jp'
  146. + '/api/v2/snapshot/video/contents/search')
  147. result_data: list[VideoResult] = []
  148. to = datetime (2022, 12, 3)
  149. while to <= today:
  150. time.sleep (1.2)
  151. until = to + timedelta (days = 14)
  152. # pylint: disable = consider-using-f-string
  153. query_filter = json.dumps ({ 'type': 'or',
  154. 'filters': [
  155. { 'type': 'range',
  156. 'field': 'startTime',
  157. 'from': ('%04d-%02d-%02dT00:00:00+09:00'
  158. % (to.year, to.month, to.day)),
  159. 'to': ('%04d-%02d-%02dT23:59:59+09:00'
  160. % (until.year, until.month, until.day)),
  161. 'include_lower': True }] })
  162. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  163. 'targets': 'tagsExact',
  164. '_sort': '-viewCounter',
  165. 'fields': ('contentId,'
  166. 'title,'
  167. 'tags,'
  168. 'description,'
  169. 'viewCounter,'
  170. 'startTime'),
  171. '_limit': 100,
  172. 'jsonFilter': query_filter }
  173. res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
  174. try:
  175. result_data += res['data']
  176. except KeyError:
  177. pass
  178. to = until + timedelta (days = 1)
  179. return result_data
  180. class DbConfig (TypedDict):
  181. driver: str
  182. host: str
  183. database: str
  184. user: str
  185. password: str
  186. prefix: str
  187. class VideoSearchParam (TypedDict):
  188. q: str
  189. targets: str
  190. _sort: str
  191. fields: str
  192. _limit: int
  193. jsonFilter: str
  194. class VideoResult (TypedDict):
  195. contentId: str
  196. title: str
  197. tags: str
  198. description: str | None
  199. viewCounter: int
  200. startTime: str
  201. class CommentResult (TypedDict):
  202. id: str
  203. no: int
  204. vposMs: int
  205. body: str
  206. commands: list[str]
  207. userId: str
  208. isPremium: bool
  209. score: int
  210. postedAt: str
  211. nicoruCount: int
  212. nicoruId: Any
  213. source: str
  214. isMyPost: bool
  215. def normalise (
  216. s: str,
  217. ) -> str:
  218. return jaconv.hira2kata (unicodedata.normalize ('NFKC', s)).lower ()
  219. if __name__ == '__main__':
  220. main ()