ぼざろクリーチャーシリーズ DB 兼 API(自分用)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

264 lines
8.3 KiB

  1. # pylint: disable = missing-class-docstring
  2. # pylint: disable = missing-function-docstring
  3. """
  4. 日次で実行し,ぼざクリ DB を最新に更新する.
  5. """
  6. from __future__ import annotations
  7. import json
  8. import os
  9. import random
  10. import string
  11. import time
  12. import unicodedata
  13. from datetime import datetime, timedelta
  14. from typing import Any, TypedDict, cast
  15. import requests
  16. from eloquent import DatabaseManager, Model
  17. from models import Comment, Tag, User, Video, VideoHistory, VideoTag
  18. def main (
  19. ) -> None:
  20. config: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
  21. 'host': 'localhost',
  22. 'database': 'nizika_nico',
  23. 'user': os.environ['MYSQL_USER'],
  24. 'password': os.environ['MYSQL_PASS'],
  25. 'prefix': '' } }
  26. db = DatabaseManager (config)
  27. Model.set_connection_resolver (db)
  28. now = datetime.now ()
  29. api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])
  30. update_tables (api_data, now)
  31. def update_tables (
  32. api_data: list[VideoResult],
  33. now: datetime,
  34. ) -> None:
  35. alive_video_codes: list[str] = []
  36. for datum in api_data:
  37. tag_names: list[str] = datum['tags'].split ()
  38. video = Video ()
  39. video.code = datum['contentId']
  40. video.title = datum['title']
  41. video.description = datum['description'] or ''
  42. video.uploaded_at = datetime.fromisoformat (datum['startTime'])
  43. video.deleted_at = None
  44. video.upsert ()
  45. alive_video_codes.append (video.code)
  46. video_history = VideoHistory ()
  47. video_history.video_id = video.id
  48. video_history.fetched_at = now
  49. video_history.views_count = datum['viewCounter']
  50. video_history.upsert ()
  51. video_tags = [video_tag for video_tag in video.video_tags
  52. if video_tag.untagged_at is not None]
  53. tag: Tag | None
  54. video_tag: VideoTag | None
  55. for video_tag in video_tags:
  56. tag = video_tag.tag
  57. if (tag is not None
  58. and (normalise (tag.name) not in map (normalise, tag_names))):
  59. video_tag.untagged_at = now
  60. video_tag.save ()
  61. for tag_name in tag_names:
  62. tag = Tag.where ('name', tag_name).first ()
  63. if tag is None:
  64. tag = Tag ()
  65. tag.name = tag_name
  66. tag.save ()
  67. video_tag = (VideoTag.where ('video_id', video.id)
  68. .where ('tag_id', tag.id)
  69. .where_null ('untagged_at')
  70. .first ())
  71. if video_tag is None:
  72. video_tag = VideoTag ()
  73. video_tag.video_id = video.id
  74. video_tag.tag_id = tag.id
  75. video_tag.tagged_at = now
  76. video_tag.untagged_at = None
  77. video_tag.save ()
  78. for com in fetch_comments (video.code):
  79. user = User.where ('code', com['userId']).first ()
  80. if user is None:
  81. user = User ()
  82. user.code = com['userId']
  83. user.save ()
  84. comment = Comment ()
  85. comment.video_id = video.id
  86. comment.comment_no = com['no']
  87. comment.user_id = user.id
  88. comment.content = com['body']
  89. comment.posted_at = datetime.fromisoformat (com['postedAt'])
  90. comment.nico_count = com['nicoruCount']
  91. comment.vpos_ms = com['vposMs']
  92. comment.upsert ()
  93. # 削除動画
  94. videos = (Video.where_not_in ('code', alive_video_codes)
  95. .where_null ('deleted_at')
  96. .get ())
  97. for video in videos:
  98. if video.code not in alive_video_codes:
  99. video.deleted_at = now
  100. video.save ()
  101. def fetch_comments (
  102. video_code: str,
  103. ) -> list[CommentResult]:
  104. time.sleep (1.2)
  105. headers = { 'X-Frontend-Id': '6',
  106. 'X-Frontend-Version': '0' }
  107. action_track_id = (
  108. ''.join (random.choice (string.ascii_letters + string.digits)
  109. for _ in range (10))
  110. + '_'
  111. + str (random.randrange (10 ** 12, 10 ** 13)))
  112. url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
  113. + f"?actionTrackId={ action_track_id }")
  114. res = requests.post (url, headers = headers, timeout = 60).json ()
  115. try:
  116. nv_comment = res['data']['comment']['nvComment']
  117. except KeyError:
  118. return []
  119. if nv_comment is None:
  120. return []
  121. headers = { 'X-Frontend-Id': '6',
  122. 'X-Frontend-Version': '0',
  123. 'Content-Type': 'application/json' }
  124. params = { 'params': nv_comment['params'],
  125. 'additionals': { },
  126. 'threadKey': nv_comment['threadKey'] }
  127. url = nv_comment['server'] + '/v1/threads'
  128. res = (requests.post (url, json.dumps (params),
  129. headers = headers,
  130. timeout = 60)
  131. .json ())
  132. try:
  133. return res['data']['threads'][1]['comments']
  134. except (IndexError, KeyError):
  135. return []
  136. def search_nico_by_tag (
  137. tag: str,
  138. ) -> list[VideoResult]:
  139. return search_nico_by_tags ([tag])
  140. def search_nico_by_tags (
  141. tags: list[str],
  142. ) -> list[VideoResult]:
  143. today = datetime.now ()
  144. url = ('https://snapshot.search.nicovideo.jp'
  145. + '/api/v2/snapshot/video/contents/search')
  146. result_data: list[VideoResult] = []
  147. to = datetime (2022, 12, 3)
  148. while to <= today:
  149. time.sleep (1.2)
  150. until = to + timedelta (days = 14)
  151. # pylint: disable = consider-using-f-string
  152. query_filter = json.dumps ({ 'type': 'or',
  153. 'filters': [
  154. { 'type': 'range',
  155. 'field': 'startTime',
  156. 'from': ('%04d-%02d-%02dT00:00:00+09:00'
  157. % (to.year, to.month, to.day)),
  158. 'to': ('%04d-%02d-%02dT23:59:59+09:00'
  159. % (until.year, until.month, until.day)),
  160. 'include_lower': True }] })
  161. params: VideoSearchParam = { 'q': ' OR '.join (tags),
  162. 'targets': 'tagsExact',
  163. '_sort': '-viewCounter',
  164. 'fields': ('contentId,'
  165. 'title,'
  166. 'tags,'
  167. 'description,'
  168. 'viewCounter,'
  169. 'startTime'),
  170. '_limit': 100,
  171. 'jsonFilter': query_filter }
  172. res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json ()
  173. try:
  174. result_data += res['data']
  175. except KeyError:
  176. pass
  177. to = until + timedelta (days = 1)
  178. return result_data
  179. class DbConfig (TypedDict):
  180. driver: str
  181. host: str
  182. database: str
  183. user: str
  184. password: str
  185. prefix: str
  186. class VideoSearchParam (TypedDict):
  187. q: str
  188. targets: str
  189. _sort: str
  190. fields: str
  191. _limit: int
  192. jsonFilter: str
  193. class VideoResult (TypedDict):
  194. contentId: str
  195. title: str
  196. tags: str
  197. description: str | None
  198. viewCounter: int
  199. startTime: str
  200. class CommentResult (TypedDict):
  201. id: str
  202. no: int
  203. vposMs: int
  204. body: str
  205. commands: list[str]
  206. userId: str
  207. isPremium: bool
  208. score: int
  209. postedAt: str
  210. nicoruCount: int
  211. nicoruId: Any
  212. source: str
  213. isMyPost: bool
  214. def normalise (
  215. s: str,
  216. ) -> str:
  217. return unicodedata.normalize ('NFKC', s).lower ()
  218. if __name__ == '__main__':
  219. main ()