Browse Source

コミット忘れ

ai-migration
みてるぞ 2 weeks ago
parent
commit
82fabebe2f
3 changed files with 8 additions and 346 deletions
  1. +6
    -195
      main.py
  2. +0
    -150
      nico.py
  3. +2
    -1
      test.py

+ 6
- 195
main.py View File

@@ -5,33 +5,15 @@ Bluesky のニジカがいろいろする.

from __future__ import annotations

import io
import random
import sys
import time
from datetime import date, datetime
from datetime import time as dt_time
from datetime import timedelta
from typing import TypedDict, cast
from datetime import datetime, timedelta
from typing import TypedDict

import requests
from atproto import Client, models
from atproto_client.models.app.bsky.feed.get_timeline import Response
from bs4 import BeautifulSoup
from requests.exceptions import Timeout

import account
import nico
from ai.talk import Talk

TARGET_WORDS = ['deerjika', 'ニジカ', 'ぼっち', '虹夏', '郁代', 'バーカ',
'kfif', 'kita-flatten-ikuyo-flatten', 'ラマ田', 'ゴートう',
'ぼざクリ', 'オオミソカ', '伊地知', '喜多ちゃん',
'喜タイ', '洗澡鹿', 'シーザオ', '今日は本当に',
'ダイソーで', '変なチンチン', 'daisoで', 'だね~(笑)',
'おやつタイム', 'わさしが', 'わさび県', 'たぬマ', 'にくまる',
'ルイズマリー', '餅', 'ニジゴ', 'ゴニジ', 'ニジニジ',
'新年だよね', 'うんこじゃん', 'ほくほくのジャガイモ']


def main (
@@ -42,23 +24,15 @@ def main (

client.login (account.USER_ID, account.PASSWORD)

got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
kiriban_list: list[tuple[int, nico.VideoInfo, datetime]] = (
nico.get_kiriban_list (got_kiriban_at))
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()

last_posted_at = datetime.now () - timedelta (hours = 6)
has_got_snack_time = False
has_taken_hot_spring = False
watched_videos = []
while True:
now = datetime.now ()

for uri in check_notifications (client):
records = get_thread_contents (client, uri, 20)
if len (records) > 0:
if records:
answer = Talk.main ((records[0]['text']
if (records[0]['embed'] is None
or not hasattr (records[0]['embed'],
@@ -66,7 +40,9 @@ def main (
else [
{ 'type': 'text', 'text': records[0]['text'] },
{ 'type': 'image_url', 'image_url': {
'url': f"https://cdn.bsky.app/img/feed_fullsize/plain/{ records[0]['did'] }/{ records[0]['embed'].images[0].image.ref.link }" } }]),
'url': (f"https://cdn.bsky.app/img/feed_fullsize"
f"/plain/{ records[0]['did'] }"
f"/{ records[0]['embed'].images[0].image.ref.link }") } }]),
records[0]['name'],
[*map (lambda record: {
'role': ('assistant'
@@ -81,98 +57,7 @@ def main (
parent = records[0]['strong_ref'],
root = records[-1]['strong_ref']))

like_posts (client)

if kiriban_list and datetime.now () >= next_kiriban_at:
(views_count, video_info, uploaded_at) = (
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
since_posted = datetime.now () - uploaded_at
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
upload = client.com.atproto.repo.upload_blob (
io.BytesIO (requests.get (thumbnail,
timeout = 60).content))
thumb = upload.blob
except Timeout:
thumb = None
comments = nico.get_comments (video_info['contentId'])
popular_comments = sorted (comments,
key = lambda c: c.nico_count,
reverse = True)[:10]
latest_comments = sorted (comments,
key = lambda c: c.posted_at,
reverse = True)[:10]
embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
title = title,
description = description,
thumb = thumb,
uri = uri))
prompt = f"{ since_posted.days }日と{ since_posted.seconds }秒前にニコニコに投稿された『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。\n"
prompt += f"コメント数は{ len (comments) }件です。\n"
if video_info['tags']:
prompt += f"つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。\n"
if comments:
prompt += f"人気のコメントは次の通りです:「{ '」、「'.join (c.content for c in popular_comments) }」\n"
prompt += f"最新のコメントは次の通りです:「{ '」、「'.join (c.content for c in latest_comments) }」\n"
prompt += f"""
概要には次のように書かれています:
```html
{ video_info['description'] }
```
このことについて、ニジカちゃんからのお祝いメッセージを下さい。
ただし、そのメッセージ内には再生数の数値とその多さに応じたリアクションを添えてください。
また、ぜひ投稿からこの再生数に至るまでにかかった時間や、つけられたタグ、コメントに対して思いを馳せてください。
好きなコメントがあったら教えてね。"""
client.post (Talk.main (prompt), embed = embed_external)
next_kiriban_at += kiriban_interval
last_posted_at = now

latest_deerjika = nico.get_latest_deerjika ()
if latest_deerjika is not None:
for datum in [e for e in [latest_deerjika]
if e['contentId'] not in watched_videos]:
watched_videos += [datum['contentId']]

uri = f"https://www.nicovideo.jp/watch/{ datum['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
upload = client.com.atproto.repo.upload_blob (
io.BytesIO (requests.get (thumbnail,
timeout = 60).content))
thumb = upload.blob
except Timeout:
thumb = None

embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
title = title,
description = description,
thumb = thumb,
uri = uri))
client.post (Talk.main (f"""
ニコニコに『{ datum['title'] }』という動画がアップされました。
つけられたタグは「{ '」、「'.join (datum['tags']) }」です。
概要には次のように書かれています:
```html
{ datum['description'] }
```
このことについて、みんなに告知するとともに、ニジカちゃんの感想を教えてください。 """),
embed = embed_external)
last_posted_at = now

if now.hour == 14 and has_got_snack_time:
has_got_snack_time = False

if now.hour == 15:
if got_kiriban_at < datetime.now ().date ():
kiriban_list = nico.get_kiriban_list (datetime.now ().date ())
got_kiriban_at = datetime.now ().date ()
kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()

if not has_got_snack_time:
try:
with open ('./assets/snack-time.jpg', 'rb') as f:
@@ -223,10 +108,6 @@ def main (
pass
has_taken_hot_spring = True

if now - last_posted_at >= timedelta (hours = 6):
client.post (Talk.main ('今どうしてる?'))
last_posted_at = now

time.sleep (60)


@@ -270,76 +151,6 @@ def get_thread_contents (
return records


def get_embed_info (
url: str
) -> tuple[str, str, str]:
title: str = ''
description: str = ''
thumbnail: str = ''

try:
res = requests.get (url, timeout = 60)
except Timeout:
return ('', '', '')

if res.status_code != 200:
return ('', '', '')

soup = BeautifulSoup (res.text, 'html.parser')

tmp = soup.find ('title')
if tmp is not None:
title = tmp.text

tmp = soup.find ('meta', attrs = { 'name': 'description' })
if tmp is not None and hasattr (tmp, 'get'):
try:
description = cast (str, tmp.get ('content'))
except Exception:
pass

tmp = soup.find ('meta', attrs = { 'name': 'thumbnail' })
if tmp is not None and hasattr (tmp, 'get'):
try:
thumbnail = cast (str, tmp.get ('content'))
except Exception:
pass

return (title, description, thumbnail)


def get_kiriban_dt_to_update (
) -> datetime:
now = datetime.now ()
today = now.date ()
dt = datetime.combine (today, dt_time (15, 0))
if dt <= now:
dt += timedelta (days = 1)
return dt


def like_posts (
client: Client,
) -> None:
for post in get_target_posts (client):
client.like (**post)


def get_target_posts (
client: Client,
) -> list[LikeParams]:
posts = []

timeline: Response = client.get_timeline ()
for feed in timeline.feed:
if (feed.post.author.did != client.me.did
and (feed.post.viewer.like is None)
and any (target_word in feed.post.record.text.lower () for target_word in TARGET_WORDS)):
posts.append (LikeParams({ 'uri': feed.post.uri, 'cid': feed.post.cid }))

return posts


class LikeParams (TypedDict):
uri: str
cid: str


+ 0
- 150
nico.py View File

@@ -1,150 +0,0 @@
"""
ニコニコのニジカ動画取得モヂュール
"""

from __future__ import annotations

import os
from datetime import date, datetime, timedelta
from typing import TypedDict, cast

import requests
from bs4 import BeautifulSoup
from requests.exceptions import Timeout
from eloquent import DatabaseManager, Model

import nicolib
from db.models import Comment, Tag, Video, VideoHistory, VideoTag

CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }

DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)

KIRIBAN_VIEWS_COUNTS: list[int] = sorted ({ *range (1_000, 10_000, 1_000),
*range (10_000, 1_000_001, 10_000),
194, 245, 510, 810, 114_514, 1_940, 2_450, 5_100,
19_400, 24_500, 51_000, 93_194, 2_424, 242_424, 1_919,
4_545, 194_245, 245_194, 510_245 },
reverse = True)

class VideoInfo (TypedDict):
contentId: str
title: str
tags: list[str]
description: str


def get_latest_deerjika (
) -> VideoInfo | None:
tag = '伊地知ニジカ OR ぼざろクリーチャーシリーズ'
url = f"https://www.nicovideo.jp/tag/{ tag }"

params = { 'sort': 'f',
'order': 'd' }

video_info = { }

bs = get_bs_from_url (url, params)
if bs is None:
return None

try:
video = (bs.find_all ('ul', class_ = 'videoListInner')[1]
.find ('li', class_ = 'item'))

video_info['contentId'] = video['data-video-id']
except Exception:
return None

return get_video_info (video_info['contentId'])


def get_bs_from_url (
url: str,
params: dict = { },
) -> BeautifulSoup | None:
"""
URL から BeautifulSoup インスタンス生成

Parameters
----------
url: str
捜査する URL
params: dict
パラメータ

Return
------
BeautifulSoup | None
BeautifulSoup オブゼクト(失敗したら None)
"""

try:
req = requests.get (url, params = params, timeout = 60)
except Timeout:
return None

if req.status_code != 200:
return None

req.encoding = req.apparent_encoding

return BeautifulSoup (req.text, 'html.parser')


def get_kiriban_list (
base_date: date,
) -> list[tuple[int, VideoInfo, datetime]]:
kiriban_list: list[tuple[int, VideoInfo, datetime]] = []

latest_fetched_at = cast (date, (VideoHistory
.where ('fetched_at', '<=', base_date)
.max ('fetched_at')))

for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
targets = { vh.video.code for vh in (VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) }
for code in targets:
if code in [kiriban[1]['contentId'] for kiriban in kiriban_list]:
continue
previous_views_count: int | None = (
VideoHistory
.where_has ('video', lambda q: q.where ('code', code))
.where ('fetched_at', '<', latest_fetched_at)
.max ('views_count'))
if previous_views_count is None:
previous_views_count = 0
if previous_views_count >= kiriban_views_count:
continue
video_info = get_video_info (code)
if video_info is not None:
kiriban_list.append ((kiriban_views_count, video_info,
cast (Video, Video.where ('code', code).first ()).uploaded_at))

return kiriban_list


def get_comments (
video_code: str,
) -> list[Comment]:
video = Video.where ('code', video_code).first ()
if video is None:
return []
return video.comments


class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str

+ 2
- 1
test.py View File

@@ -35,7 +35,8 @@ async def main (
"""

await asyncio.gather (like_posts (),
reply ())
reply (),
check_mentions ())


async def like_posts (


Loading…
Cancel
Save