| 
							- from datetime import datetime, timedelta
 - import io
 - import json
 - import time
 - import sys
 - 
 - from atproto import Client, models
 - from bs4 import BeautifulSoup
 - import requests
 - 
 - from ai.talk import Talk
 - import account
 - 
 - 
 - def check_notifications (
 -         client: Client,
 - ) -> list:
 -     (uris, last_seen_at) = ([], client.get_current_time_iso ())
 - 
 -     for notification in (client.app.bsky.notification.list_notifications ()
 -                          .notifications):
 -         if not notification.is_read:
 -             if notification.reason in ['mention', 'reply']:
 -                 uris += [notification.uri]
 -             elif notification.reason == 'follow':
 -                 client.follow (notification.author.did)
 - 
 -     client.app.bsky.notification.update_seen ({ 'seen_at': last_seen_at })
 - 
 -     return uris
 - 
 - 
 - def get_thread_contents (
 -         client:         Client,
 -         uri:            str,
 -         parent_height:  int,
 - ) -> list:
 -     response = (client.get_post_thread (uri           = uri,
 -                                         parent_height = parent_height)
 -                 .thread)
 -     records = []
 -     while response is not None:
 -         records += [{ 'strong_ref': models.create_strong_ref (response.post),
 -                       'did':        response.post.author.did,
 -                       'handle':     response.post.author.handle,
 -                       'name':       response.post.author.display_name,
 -                       'datetime':   response.post.record.created_at,
 -                       'text':       response.post.record.text,
 -                       'embed':      response.post.record.embed }]
 -         response = response.parent
 - 
 -     return records
 - 
 - 
 - def main (
 - ) -> None:
 -     client = Client (base_url = 'https://bsky.social')
 - 
 -     client.login (account.USER_ID, account.PASSWORD)
 - 
 -     last_posted_at = datetime.now () - timedelta (hours = 6)
 -     has_got_snack_time = False
 -     watched_videos = []
 -     while True:
 -         now = datetime.now ()
 - 
 -         for uri in check_notifications (client):
 -             records = get_thread_contents (client, uri, 20)
 -             if len (records) > 0:
 -                 answer = Talk.main ((records[0]['text']
 -                                      if (records[0]['embed'] is None
 -                                          or not hasattr (records[0]['embed'],
 -                                                          'images'))
 -                                      else [
 -                         { 'type': 'text', 'text': records[0]['text'] },
 -                         { 'type': 'image_url', 'image_url': {
 -                             'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]),
 -                                     records[0]['name'],
 -                                     [*map (lambda record: {
 -                                                'role': ('assistant'
 -                                                         if (record['handle']
 -                                                             == account.USER_ID)
 -                                                         else 'user'),
 -                                                'content':
 -                                                    record['text']},
 -                                            reversed (records[1:]))])
 -                 client.post (answer,
 -                              reply_to = models.AppBskyFeedPost.ReplyRef (
 -                                  parent = records[0]['strong_ref'],
 -                                  root   = records[-1]['strong_ref']))
 - 
 -         for datum in [e for e in get_nico_deerjika ()
 -                         if e['contentId'] not in watched_videos]:
 -             watched_videos += [datum['contentId']]
 - 
 -             uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }"
 -             (title, description, thumbnail) = get_embed_info (uri)
 -             upload = client.com.atproto.repo.upload_blob (
 -                     io.BytesIO (requests.get (thumbnail).content))
 - 
 -             embed_external = models.AppBskyEmbedExternal.Main (
 -                     external = models.AppBskyEmbedExternal.External (
 -                         title       = title,
 -                         description = description,
 -                         thumb       = upload.blob,
 -                         uri         = uri))
 -             client.post (Talk.main (f"""
 - ニコニコに『{ datum['title'] }』という動画がアップされました。
 - つけられたタグは「{ '」、「'.join (datum['tags']) }」です。
 - 概要には次のように書かれています:
 - ```html
 - { datum['description'] }
 - ```
 - このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """),
 -                          embed = embed_external)
 - 
 -         if now.hour == 14 and has_got_snack_time:
 -             has_got_snack_time = False
 - 
 -         if now.hour == 15 and not has_got_snack_time:
 -             try:
 -                 with open ('./assets/snack-time.jpg', 'rb') as f:
 -                     image = models.AppBskyEmbedImages.Image (
 -                             alt   = ('左に喜多ちゃん、右に人面鹿のニジカが'
 -                                      'V字に並んでいる。'
 -                                      '喜多ちゃんは右手でピースサインをして'
 -                                      '片目をウインクしている。'
 -                                      'ニジカは両手を広げ、'
 -                                      '右手にスプーンを持って'
 -                                      'ポーズを取っている。'
 -                                      '背景には'
 -                                      '赤と黄色の放射線状の模様が広がり、'
 -                                      '下部に「おやつタイムだ!!!!」という'
 -                                      '日本語のテキストが表示されている。'),
 -                             image = client.com.atproto.repo.upload_blob (f).blob)
 - 
 -                 client.post (Talk.main ('おやつタイムだ!!!!'),
 -                              embed = models.app.bsky.embed.images.Main (
 -                                  images = [image]))
 -                 last_posted_at = now
 -             except Exception:
 -                 pass
 - 
 -             has_got_snack_time = True
 - 
 -         if now - last_posted_at >= timedelta (hours = 6):
 -             client.post (Talk.main ('今どうしてる?'))
 -             last_posted_at = now
 - 
 -         time.sleep (60)
 - 
 - 
 - def get_nico_deerjika ():
 -     URL = ('https://snapshot.search.nicovideo.jp/api/v2/snapshot/video'
 -            '/contents/search')
 - 
 -     now = datetime.now ()
 -     base = now - timedelta (hours = 1)
 - 
 -     params = { 'q':          '伊地知ニジカ',
 -                'targets':    'tags',
 -                '_sort':      '-startTime',
 -                'fields':     'contentId,title,description,tags,startTime',
 -                '_limit':     20,
 -                'jsonFilter': json.dumps ({ 'type':    'or',
 -                                            'filters': [{
 -             'type':          'range',
 -             'field':         'startTime',
 -             'from':          ('%04d-%02d-%02dT%02d:%02d:00+09:00'
 -                               % (base.year, base.month, base.day,
 -                                  base.hour, base.minute)),
 -             'to':            ('%04d-%02d-%02dT23:59:59+09:00'
 -                               % (now.year, now.month, now.day)),
 -             'include_lower': True }] }) }
 - 
 -     res = requests.get (URL, params = params).json ()
 - 
 -     data = []
 -     for datum in res['data']:
 -         datum['tags'] = datum['tags'].split ()
 -         data.append (datum)
 - 
 -     return data
 - 
 - 
 - def get_embed_info (
 -         url:    str
 - ) -> (str, str, str):
 -     title:          str = ''
 -     description:    str = ''
 -     thumbnail:      str = ''
 - 
 -     try:
 -         res = requests.get (url, timeout = 60)
 -     except Exception:
 -         return ('', '', '')
 - 
 -     if res.status_code != 200:
 -         return ('', '', '')
 - 
 -     soup = BeautifulSoup (res.text, 'html.parser')
 - 
 -     tmp = soup.find ('title')
 -     if tmp is not None:
 -         title = tmp.text
 - 
 -     tmp = soup.find ('meta', attrs = { 'name': 'description' })
 -     if tmp is not None:
 -         description = tmp.get ('content')
 - 
 -     tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' })
 -     if tmp is not None:
 -         thumbnail = tmp.get ('content')
 - 
 -     return (title, description, thumbnail)
 - 
 - 
 - if __name__ == '__main__':
 -     main (*sys.argv[1:])
 
 
  |