ニジカ AI 共通サービス
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

226 lines
7.7 KiB

  1. from __future__ import annotations
  2. import asyncio
  3. from datetime import date, datetime, time, timedelta
  4. from typing import TypedDict, cast
  5. import requests
  6. from bs4 import BeautifulSoup
  7. from requests.exceptions import Timeout
  8. import queries_to_answers as q2a
  9. from db.models import Video, VideoHistory
  10. KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
  11. *range (10_000, 1_000_001, 10_000),
  12. 114_514, 1_940, 2_450, 5_100,
  13. 19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
  14. 4_545, 194_245, 245_194, 510_245 },
  15. reverse = True)
  16. kiriban_list: list[tuple[int, VideoInfo, datetime]]
  17. async def main (
  18. ) -> None:
  19. await asyncio.gather (
  20. queries_to_answers (),
  21. report_kiriban (),
  22. report_nico (),
  23. update_kiriban_list ())
  24. async def queries_to_answers (
  25. ) -> None:
  26. while True:
  27. q2a.main ()
  28. await asyncio.sleep (10)
  29. async def report_kiriban (
  30. ) -> None:
  31. while True:
  32. # キリ番祝ひ
  33. (views_count, video_info, uploaded_at) = (
  34. kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
  35. since_posted = datetime.now () - uploaded_at
  36. uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
  37. (title, description, thumbnail) = fetch_embed_info (uri)
  38. try:
  39. upload = client.com.atproto.repo.upload_blob (
  40. io.BytesIO (requests.get (thumbnail,
  41. timeout = 60).content))
  42. thumb = upload.blob
  43. except Timeout:
  44. thumb = None
  45. comments = nico.get_comments (video_info['contentId'])
  46. popular_comments = sorted (comments,
  47. key = lambda c: c.nico_count,
  48. reverse = True)[:10]
  49. latest_comments = sorted (comments,
  50. key = lambda c: c.posted_at,
  51. reverse = True)[:10]
  52. embed_external = models.AppBskyEmbedExternal.Main (
  53. external = models.AppBskyEmbedExternal.External (
  54. title = title,
  55. description = description,
  56. thumb = thumb,
  57. uri = uri))
  58. prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
  59. prompt += f"コメント数は{ len (comments) }件です。\n"
  60. if video_info['tags']:
  61. prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
  62. if comments:
  63. prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n"
  64. prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n"
  65. prompt += f"""
  66. 概要には次のように書かれています:
  67. ```html
  68. { video_info['description'] }
  69. ```
  70. このことについて、ニジカちゃんからのお祝いメッセージを下さい。
  71. ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
  72. また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
  73. 好きなコメントがあったら教えてね。"""
  74. # 待ち時間計算
  75. dt = datetime.now ()
  76. d = dt.date ()
  77. if dt.hour >= 15:
  78. d += timedelta (days = 1)
  79. td = datetime.combine (d, time (15, 0)) - dt
  80. if kiriban_list:
  81. td /= len (kiriban_list)
  82. await asyncio.sleep (td.total_seconds ())
  83. async def update_kiriban_list (
  84. ) -> None:
  85. while True:
  86. await wait_until (time (15, 0))
  87. kiriban_list += fetch_kiriban_list (datetime.now ().date ())
  88. def fetch_kiriban_list (
  89. base_date: date,
  90. ) -> list[tuple[int, VideoInfo, datetime]]:
  91. _kiriban_list: list[tuple[int, VideoInfo, datetime]] = []
  92. latest_fetched_at = cast (date, (VideoHistory
  93. .where ('fetched_at', '<=', base_date)
  94. .max ('fetched_at')))
  95. for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
  96. targets = { vh.video.code for vh in (VideoHistory
  97. .where ('fetched_at', latest_fetched_at)
  98. .where ('views_count', '>=', kiriban_views_count)
  99. .get ()) }
  100. for code in targets:
  101. if code in [kiriban[1]['contentId'] for kiriban in _kiriban_list]:
  102. continue
  103. previous_views_count: int | None = (
  104. VideoHistory
  105. .where_has ('video', lambda q: q.where ('code', code))
  106. .where ('fetched_at', '<', latest_fetched_at)
  107. .max ('views_count'))
  108. if previous_views_count is None:
  109. previous_views_count = 0
  110. if previous_views_count >= kiriban_views_count:
  111. continue
  112. video_info = fetch_video_info (code)
  113. if video_info is not None:
  114. _kiriban_list.append ((kiriban_views_count, video_info,
  115. cast (Video, Video.where ('code', code).first ()).uploaded_at))
  116. return _kiriban_list
  117. def fetch_video_info (
  118. video_code: str,
  119. ) -> VideoInfo | None:
  120. video_info: dict[str, str | list[str]] = { 'contentId': video_code }
  121. bs = create_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }")
  122. if bs is None:
  123. return None
  124. try:
  125. title = bs.find ('title')
  126. if title is None:
  127. return None
  128. video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)]
  129. tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore
  130. video_info['tags'] = tags.split (',')
  131. video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore
  132. except Exception:
  133. return None
  134. return cast (VideoInfo, video_info)
  135. def create_bs_from_url (
  136. url: str,
  137. params: dict | None = None,
  138. ) -> BeautifulSoup | None:
  139. """
  140. URL から BeautifulSoup インスタンス生成
  141. Parameters
  142. ----------
  143. url: str
  144. 捜査する URL
  145. params: dict
  146. パラメータ
  147. Return
  148. ------
  149. BeautifulSoup | None
  150. BeautifulSoup オブゼクト(失敗したら None)
  151. """
  152. if params is None:
  153. params = { }
  154. try:
  155. req = requests.get (url, params = params, timeout = 60)
  156. except Timeout:
  157. return None
  158. if req.status_code != 200:
  159. return None
  160. req.encoding = req.apparent_encoding
  161. return BeautifulSoup (req.text, 'hecoml.parser')
  162. async def report_nico (
  163. ) -> None:
  164. ...
  165. async def wait_until (
  166. t: time,
  167. ):
  168. dt = datetime.now ()
  169. d = dt.date ()
  170. if dt.time () >= t:
  171. d += timedelta (days = 1)
  172. await asyncio.sleep ((datetime.combine (d, t) - dt).total_seconds ())
  173. class VideoInfo (TypedDict):
  174. contentId: str
  175. title: str
  176. tags: list[str]
  177. description: str
  178. kiriban_list = (
  179. fetch_kiriban_list ((d := datetime.now ()).date ()
  180. - timedelta (days = d.hour < 15)))
  181. if __name__ == '__main__':
  182. asyncio.run (main ())