ニジカのスカトロ,ニジカトロ. https://bsky.app/profile/deerjika-bot.bsky.social
main.py 5.4 KiB

  1. from datetime import datetime, timedelta
  2. import json
  3. import time
  4. import sys
  5. from atproto import Client, models
  6. import requests
  7. from ai.talk import Talk
  8. import account
  9. def check_notifications (
  10. client: Client,
  11. ) -> list:
  12. (uris, last_seen_at) = ([], client.get_current_time_iso ())
  13. for notification in (client.app.bsky.notification.list_notifications ()
  14. .notifications):
  15. if not notification.is_read:
  16. if notification.reason in ['mention', 'reply']:
  17. uris += [notification.uri]
  18. elif notification.reason == 'follow':
  19. client.follow (notification.author.did)
  20. client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
  21. return uris
  22. def get_thread_contents (
  23. client: Client,
  24. uri: str,
  25. parent_height: int,
  26. ) -> list:
  27. response = (client.get_post_thread (uri = uri,
  28. parent_height = parent_height)
  29. .thread)
  30. records = []
  31. while response is not None:
  32. records += [{ 'strong_ref': models.create_strong_ref (response.post),
  33. 'did': response.post.author.did,
  34. 'handle': response.post.author.handle,
  35. 'name': response.post.author.display_name,
  36. 'datetime': response.post.record.created_at,
  37. 'text': response.post.record.text,
  38. 'embed': response.post.record.embed }]
  39. response = response.parent
  40. return records
  41. def main () -> None:
  42. client = Client (base_url = 'https://bsky.social')
  43. client.login (account.USER_ID, account.PASSWORD)
  44. last_posted_at = datetime.now () - timedelta (hours = 6)
  45. has_got_snack_time = False
  46. while True:
  47. now = datetime.now ()
  48. for uri in check_notifications (client):
  49. records = get_thread_contents (client, uri, 20)
  50. if len (records) > 0:
  51. answer = Talk.main ((records[0]['text']
  52. if (records[0]['embed'] is None
  53. or not hasattr (records[0]['embed'],
  54. 'images'))
  55. else [
  56. { 'type': 'text', 'text': records[0]['text'] },
  57. { 'type': 'image_url', 'image_url': {
  58. 'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]),
  59. records[0]['name'],
  60. [*map (lambda record: {
  61. 'role': ('assistant'
  62. if (record['handle']
  63. == account.USER_ID)
  64. else 'user'),
  65. 'content':
  66. record['text']},
  67. reversed (records[1:]))])
  68. client.send_post (answer,
  69. reply_to = models.AppBskyFeedPost.ReplyRef (
  70. parent = records[0]['strong_ref'],
  71. root = records[-1]['strong_ref']))
  72. if now.hour == 14 and has_got_snack_time:
  73. has_got_snack_time = False
  74. if now.hour == 15 and not has_got_snack_time:
  75. try:
  76. with open ('./assets/snack-time.jpg', 'rb') as f:
  77. image = models.AppBskyEmbedImages.Image (
  78. alt = 'おやつタイムだ!!!!',
  79. image = client.com.atproto.repo.upload_blob (f).blob)
  80. client.send_post (Talk.main ('おやつタイムだ!!!!'),
  81. embed = models.app.bsky.embed.images.Main (
  82. images = [image]))
  83. last_posted_at = now
  84. except Exception:
  85. pass
  86. has_got_snack_time = True
  87. if now - last_posted_at >= timedelta (hours = 6):
  88. client.send_post (Talk.main ('今どうしてる?'))
  89. last_posted_at = now
  90. time.sleep (60)
  91. def get_nico_deerjika ():
  92. URL = ('https://snapshot.search.nicovideo.jp/api/v2/snapshot/video'
  93. '/contents/search')
  94. now = datetime.now ()
  95. params = { 'q': '伊地知ニジカ',
  96. 'targets': 'tagsExact',
  97. '_sort': '-startTime',
  98. 'fields': 'contentId,title,description,tags,startTime',
  99. '_limit': 1,
  100. 'jsonFilter': json.dumps ({ 'type': 'or',
  101. 'filters': [{
  102. 'type': 'range',
  103. 'field': 'startTime',
  104. 'from': ('%04d-%02d-%02dT00:00:00+09:00'
  105. % (now.year, now.month, now.day)),
  106. 'to': ('%04d-%02d-%02dT23:59:59+09:00'
  107. % (now.year, now.month, now.day)),
  108. 'include_lower': True }] }) }
  109. res = requests.get (URL, params = params).json ()
  110. return res['data'][0] if len (res['data']) > 0 else None
  111. if __name__ == '__main__':
  112. main (*sys.argv[1:])