ニジカのスカトロ,ニジカトロ. https://bsky.app/profile/deerjika-bot.bsky.social
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

main.py 8.6 KiB

3 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
3 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
2 weeks ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. from datetime import datetime, timedelta
  2. import io
  3. import json
  4. import time
  5. import sys
  6. from atproto import Client, models
  7. from bs4 import BeautifulSoup
  8. from requests.exceptions import Timeout
  9. import requests
  10. from ai.talk import Talk
  11. import account
  def check_notifications(
      client: Client,
  ) -> list:
      """Process unread notifications and return the post URIs to answer.

      Unread 'mention' and 'reply' notifications contribute their URI to the
      result; unread 'follow' notifications trigger a follow-back of the
      author. Afterwards the notifications are marked as seen using the
      timestamp taken before the listing was requested.

      Args:
          client: Logged-in client used for every API call.

      Returns:
          URIs of the unread mention/reply notifications, in listing order.
      """
      # Capture the timestamp first so that anything arriving while we work
      # is not marked as seen by the update below.
      seen_at = client.get_current_time_iso()
      listing = client.app.bsky.notification.list_notifications()

      pending = []
      for item in listing.notifications:
          if item.is_read:
              continue
          if item.reason in ('mention', 'reply'):
              pending.append(item.uri)
          elif item.reason == 'follow':
              client.follow(item.author.did)

      client.app.bsky.notification.update_seen({'seen_at': seen_at})
      return pending
  def get_thread_contents(
      client: Client,
      uri: str,
      parent_height: int,
  ) -> list:
      """Fetch a post and its ancestors as a list of plain dicts.

      Args:
          client: Logged-in client used to request the thread.
          uri: URI of the post whose thread is requested.
          parent_height: Forwarded to ``get_post_thread`` to limit how many
              ancestors are fetched.

      Returns:
          One dict per post, starting with the requested post and walking up
          through its parents. Each dict has the keys 'strong_ref', 'did',
          'handle', 'name', 'datetime', 'text' and 'embed'. The list is empty
          when the requested post itself is unavailable.
      """
      node = client.get_post_thread(uri=uri,
                                    parent_height=parent_height).thread
      records = []
      # A thread node (or a parent) may be a "not found" / "blocked"
      # placeholder that has no `post`; stop there instead of raising
      # AttributeError on deleted or blocked ancestors.
      while node is not None:
          post = getattr(node, 'post', None)
          if post is None:
              break
          records.append({
              'strong_ref': models.create_strong_ref(post),
              'did': post.author.did,
              'handle': post.author.handle,
              'name': post.author.display_name,
              'datetime': post.record.created_at,
              'text': post.record.text,
              'embed': post.record.embed,
          })
          node = getattr(node, 'parent', None)
      return records
  def _reply_to_thread(client: Client, records: list) -> None:
      """Post a reply to the newest post of a thread.

      ``records`` is the non-empty list produced by ``get_thread_contents``:
      index 0 is the post being answered and the last entry is used as the
      reply root.
      """
      target = records[0]
      # Only embeds that expose a non-empty `images` list are sent as an image
      # prompt; an empty list used to raise IndexError.
      images = getattr(target['embed'], 'images', None)
      if not images:
          prompt = target['text']
      else:
          prompt = [
              {'type': 'text', 'text': target['text']},
              {'type': 'image_url', 'image_url': {
                  'url': ('https://cdn.bsky.app/img/feed_fullsize/plain/'
                          f"{target['did']}/{images[0].image.ref.link}")}},
          ]
      # Older posts first; posts written by this account count as 'assistant'.
      history = [{'role': ('assistant'
                           if record['handle'] == account.USER_ID
                           else 'user'),
                  'content': record['text']}
                 for record in reversed(records[1:])]
      answer = Talk.main(prompt, target['name'], history)
      client.post(answer,
                  reply_to=models.AppBskyFeedPost.ReplyRef(
                      parent=target['strong_ref'],
                      root=records[-1]['strong_ref']))


  def _fetch_thumbnail_blob(client: Client, thumbnail: str):
      """Upload the image at ``thumbnail`` and return its blob, or None.

      None is returned when there is no thumbnail URL or the download fails,
      so the caller can still post a link card without an image.
      """
      if not thumbnail:
          return None
      try:
          response = requests.get(thumbnail, timeout=60)
          response.raise_for_status()
      except requests.RequestException:
          return None
      return client.com.atproto.repo.upload_blob(response.content).blob


  def _announce_video(client: Client, datum: dict) -> None:
      """Post an announcement with a link card for one search result."""
      uri = f"https://www.nicovideo.jp/watch/{datum['contentId']}"
      (title, description, thumbnail) = get_embed_info(uri)
      embed_external = models.AppBskyEmbedExternal.Main(
          external=models.AppBskyEmbedExternal.External(
              title=title or '',
              description=description or '',
              # Was `upload.blob`, which is undefined (or stale from the
              # previous video) whenever the thumbnail download failed.
              thumb=_fetch_thumbnail_blob(client, thumbnail),
              uri=uri))
      prompt = (
          "\n"
          f"ニコニコに『{datum['title']}』という動画がアップされました。\n"
          f"つけられたタグは「{'」、「'.join(datum['tags'])}」です。\n"
          "概要には次のように書かれています:\n"
          "```html\n"
          f"{datum['description']}\n"
          "```\n"
          "このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 ")
      client.post(Talk.main(prompt), embed=embed_external)


  def _post_snack_time(client: Client) -> bool:
      """Post the snack-time image; return True when the post succeeded.

      The post is best-effort: any failure is reported on stderr and
      swallowed so that the main loop keeps running.
      """
      try:
          with open('./assets/snack-time.jpg', 'rb') as f:
              blob = client.com.atproto.repo.upload_blob(f.read()).blob
          image = models.AppBskyEmbedImages.Image(
              alt=('左に喜多ちゃん、右に人面鹿のニジカが'
                   'V字に並んでいる。'
                   '喜多ちゃんは右手でピースサインをして'
                   '片目をウインクしている。'
                   'ニジカは両手を広げ、'
                   '右手にスプーンを持って'
                   'ポーズを取っている。'
                   '背景には'
                   '赤と黄色の放射線状の模様が広がり、'
                   '下部に「おやつタイムだ!!!!」という'
                   '日本語のテキストが表示されている。'),
              image=blob)
          client.post(Talk.main('おやつタイムだ!!!!'),
                      embed=models.app.bsky.embed.images.Main(
                          images=[image]))
      except Exception as error:
          print(f'snack-time post failed: {error!r}', file=sys.stderr)
          return False
      return True


  def main() -> None:
      """Run the bot loop forever, polling once a minute.

      Each iteration:
        1. answers unread mentions/replies,
        2. announces search results whose contentId has not been seen yet,
        3. posts the snack-time image once during hour 15 (re-armed during
           hour 14),
        4. posts a status message when nothing was posted for six hours.
      """
      client = Client(base_url='https://bsky.social')
      client.login(account.USER_ID, account.PASSWORD)

      idle_limit = timedelta(hours=6)
      # Start "six hours ago" so the first iteration posts a status message.
      last_posted_at = datetime.now() - idle_limit
      has_got_snack_time = False
      watched_videos = set()

      while True:
          now = datetime.now()

          for uri in check_notifications(client):
              records = get_thread_contents(client, uri, 20)
              if records:
                  _reply_to_thread(client, records)

          for datum in get_nico_deerjika():
              content_id = datum['contentId']
              if content_id in watched_videos:
                  continue
              watched_videos.add(content_id)
              _announce_video(client, datum)

          if now.hour == 14 and has_got_snack_time:
              has_got_snack_time = False
          if now.hour == 15 and not has_got_snack_time:
              if _post_snack_time(client):
                  last_posted_at = now
              # Marked as done even on failure so it is tried once per day.
              has_got_snack_time = True

          if now - last_posted_at >= idle_limit:
              client.post(Talk.main('今どうしてる?'))
              last_posted_at = now

          time.sleep(60)
  def get_nico_deerjika():
      """Search the niconico snapshot API for recently uploaded tagged videos.

      Queries videos tagged '伊地知ニジカ' whose startTime lies between twelve
      hours ago and the end of the current day (Japan time), newest first,
      at most 20 results.

      Returns:
          A list of result dicts with the requested fields ('contentId',
          'title', 'description', 'tags', 'startTime'); 'tags' is converted
          from a whitespace-separated string into a list. An empty list is
          returned when the request fails or the response has no usable
          'data' entry.
      """
      # Local import: the file header only imports datetime and timedelta.
      from datetime import timezone

      url = ('https://snapshot.search.nicovideo.jp/api/v2/snapshot/video'
             '/contents/search')
      # The filter bounds are stamped "+09:00", so take the wall clock in JST
      # rather than in whatever zone the host happens to run.
      now = datetime.now(timezone(timedelta(hours=9)))
      base = now - timedelta(hours=12)
      params = {
          'q': '伊地知ニジカ',
          'targets': 'tags',
          '_sort': '-startTime',
          'fields': 'contentId,title,description,tags,startTime',
          '_limit': 20,
          'jsonFilter': json.dumps({
              'type': 'or',
              'filters': [{
                  'type': 'range',
                  'field': 'startTime',
                  'from': base.strftime('%Y-%m-%dT%H:%M:00+09:00'),
                  'to': now.strftime('%Y-%m-%dT23:59:59+09:00'),
                  'include_lower': True}]}),
      }
      try:
          res = requests.get(url, params=params, timeout=60).json()
      except (requests.RequestException, ValueError):
          # Network failure of any kind, or a body that is not valid JSON.
          return []
      if not isinstance(res, dict):
          return []

      data = []
      # Error payloads carry no 'data' key; treat them as "no results".
      for datum in res.get('data') or []:
          datum['tags'] = (datum.get('tags') or '').split()
          data.append(datum)
      return data
  def get_embed_info(
      url: str
  ) -> 'tuple[str, str, str]':
      """Fetch a page and extract the data needed for a link card.

      Args:
          url: Address of the page to inspect.

      Returns:
          ``(title, description, thumbnail)`` taken from the ``<title>``
          element and the ``<meta name="description">`` /
          ``<meta name="thumbnail">`` tags. Every element is a string; it is
          '' when the item is missing. All three are '' when the request
          fails or the status code is not 200.
      """
      try:
          res = requests.get(url, timeout=60)
      except requests.RequestException:
          # Timeout is a RequestException; connection errors are covered too.
          return ('', '', '')
      if res.status_code != 200:
          return ('', '', '')

      soup = BeautifulSoup(res.text, 'html.parser')

      def meta_content(name: str) -> str:
          # A <meta> tag without a content attribute yields None; normalise
          # that to '' so callers always receive strings.
          tag = soup.find('meta', attrs={'name': name})
          if tag is None:
              return ''
          return tag.get('content') or ''

      title_tag = soup.find('title')
      title = title_tag.text if title_tag is not None else ''
      return (title, meta_content('description'), meta_content('thumbnail'))
  if __name__ == '__main__':
      # main() takes no parameters; forwarding sys.argv made the script die
      # with TypeError as soon as any command-line argument was given.
      main()