Browse Source

#1 型定義が未完成だが,動作は問題ないと思ãはれるため本番に移す.

kiriban
みてるぞ 3 weeks ago
parent
commit
d1b1bf2a14
6 changed files with 141 additions and 33 deletions
  1. +49
    -0
      atproto.pyi
  2. +0
    -0
      atproto/models.pyi
  3. +1
    -0
      eloquent.pyi
  4. +15
    -6
      main.py
  5. +75
    -26
      nico.py
  6. +1
    -1
      nizika_nico

+ 49
- 0
atproto.pyi View File

@@ -0,0 +1,49 @@
from datetime import datetime

from atproto import models


class Client:
app: AppNamespace

def get_current_time_iso (self) -> datetime: ...

def get_post_thread (
self,
uri: str,
parent_height: int | None = None
) -> Response: ...

def follow (self, did: str) -> None: ...


class AppNamespace:
bsky: AppBskyNamespace


class AppBskyNamespace:
notification: AppBskyNotificationNamespace


class AppBskyNotificationNamespace:
def list_notifications (self) -> Response: ...

def update_seen (self, seen: dict[str, datetime]) -> None: ...


class Response:
notifications: list[Notification]
thread: (ThreadViewPost
| models.AppBskyFeedDefs.NotFoundPost
| models.AppBskyFeedDefs.BlockedPost)


class Notification:
is_read: bool
reason: str
uri: str
author: ProfileView


class ProfileView:
did: str

+ 0
- 0
atproto/models.pyi View File


+ 1
- 0
eloquent.pyi View File

@@ -0,0 +1 @@
db/eloquent.pyi

+ 15
- 6
main.py View File

@@ -70,8 +70,8 @@ def main (

client.login (account.USER_ID, account.PASSWORD)

kiriban_list: list[tuple[int, nico.VideoInfo]] = nico.get_kiriban_list ()
got_kiriban_at: date = datetime.now ().date () - timedelta (days = datetime.now ().hour < 15)
kiriban_list: list[tuple[int, nico.VideoInfo]] = nico.get_kiriban_list (got_kiriban_at)
kiriban_interval: timedelta = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))
next_kiriban_at = datetime.now ()
@@ -109,8 +109,17 @@ def main (
root = records[-1]['strong_ref']))

if kiriban_list and datetime.now () >= next_kiriban_at:
(views_count, video_code) = (
(views_count, video_info) = (
kiriban_list.pop (random.randint (0, len (kiriban_list) - 1)))
uri = f"https://www.nicovideo.jp/watch/{ video_info['contentId'] }"
(title, description, thumbnail) = get_embed_info (uri)
try:
upload = client.com.atproto.repo.upload_blob (
io.BytesIO (requests.get (thumbnail,
timeout = 60).content))
thumb = upload.blob
except Timeout:
thumb = None
embed_external = models.AppBskyEmbedExternal.Main (
external = models.AppBskyEmbedExternal.External (
title = title,
@@ -118,11 +127,11 @@ def main (
thumb = thumb,
uri = uri))
client.post (Talk.main (f"""
ニコニコの『{ datum['title'] }』という動画が{ views_count }再生を突破しました。
つけられたタグは「{ '」、「'.join (datum['tags']) }」です。
ニコニコの『{ video_info['title'] }』という動画が{ views_count }再生を突破しました。
つけられたタグは「{ '」、「'.join (video_info['tags']) }」です。
概要には次のように書かれています:
```html
{ datum['description'] }
{ video_info['description'] }
```
このことについて、ニジカちゃんからのお祝いメッセージを下さい。"""),
embed = embed_external)
@@ -167,7 +176,7 @@ def main (

if now.hour == 15:
if got_kiriban_at < datetime.now ().date ():
kiriban_list = nico.get_kiriban_list ()
kiriban_list = nico.get_kiriban_list (datetime.now ().date ())
got_kiriban_at = datetime.now ().date ()
kiriban_interval = ((get_kiriban_dt_to_update () - datetime.now ())
/ len (kiriban_list))


+ 75
- 26
nico.py View File

@@ -2,19 +2,32 @@
ニコニコのニジカ動画取得モヂュール
"""

import os
from datetime import date, timedelta
from typing import TypedDict, cast

import requests
from bs4 import BeautifulSoup
from requests.exceptions import Timeout
from eloquent import DatabaseManager, Model

from db.models import Video
from db.models import Tag, Video, VideoHistory, VideoTag

KIRIBAN_VIEWS_COUNTS = { *range (100, 1_000, 100),
*range (1_000, 10_000, 1_000),
*range (10_000, 1_000_001, 10_000),
194, 245, 510, 114_514, 1_940, 2_450, 5_100, 24_500,
51_000, 2_424 }
CONFIG: dict[str, DbConfig] = { 'mysql': { 'driver': 'mysql',
'host': 'localhost',
'database': 'nizika_nico',
'user': os.environ['MYSQL_USER'],
'password': os.environ['MYSQL_PASS'],
'prefix': '' } }

DB = DatabaseManager (CONFIG)
Model.set_connection_resolver (DB)

KIRIBAN_VIEWS_COUNTS: set[int] = { *range (100, 1_000, 100),
*range (1_000, 10_000, 1_000),
*range (10_000, 1_000_001, 10_000),
194, 245, 510, 114_514, 1_940, 2_450, 5_100, 24_500,
51_000, 2_424 }


class VideoInfo (TypedDict):
@@ -46,25 +59,7 @@ def get_latest_deerjika (
except Exception:
return None

bs = get_bs_from_url ('https://www.nicovideo.jp/watch/'
+ video_info['contentId'])
if bs is None:
return None

try:
title = bs.find ('title')
if title is None:
return None
video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)]

tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore
video_info['tags'] = tags.split (',')

video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore
except Exception:
return None

return cast (VideoInfo, video_info)
return get_video_info (video_info['contentId'])


def get_bs_from_url (
@@ -100,8 +95,62 @@ def get_bs_from_url (
return BeautifulSoup (req.text, 'html.parser')


def get_video_info (
video_code: str,
) -> VideoInfo | None:
video_info: dict[str, str | list[str]] = { 'contentId': video_code }

bs = get_bs_from_url (f"https://www.nicovideo.jp/watch/{ video_code }")
if bs is None:
return None

try:
title = bs.find ('title')
if title is None:
return None
video_info['title'] = '-'.join (title.text.split ('-')[:(-1)])[:(-1)]

tags: str = bs.find ('meta', attrs = { 'name': 'keywords' }).get ('content') # type: ignore
video_info['tags'] = tags.split (',')

video_info['description'] = bs.find ('meta', attrs = { 'name': 'description' }).get ('content') # type: ignore
except Exception:
return None

return cast (VideoInfo, video_info)


def get_kiriban_list (
base_date: date,
) -> list[tuple[int, VideoInfo]]:
kiriban_list: list[tuple[int, VideoInfo]] = []


latest_fetched_at = cast (date, VideoHistory.max ('fetched_at'))
previous_fetched_at = cast (date, (VideoHistory
.where ('fetched_at', '<', latest_fetched_at)
.max ('fetched_at')))

for kiriban_views_count in KIRIBAN_VIEWS_COUNTS:
targets = ({ vh.video.code for vh in (VideoHistory
.where ('fetched_at', latest_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) }
- { vh.video.code for vh in (VideoHistory
.where ('fetched_at', previous_fetched_at)
.where ('views_count', '>=', kiriban_views_count)
.get ()) })
for code in targets:
video_info = get_video_info (code)
if video_info is not None:
kiriban_list.append ((kiriban_views_count, video_info))

return kiriban_list


class DbConfig (TypedDict):
driver: str
host: str
database: str
user: str
password: str
prefix: str

+ 1
- 1
nizika_nico

@@ -1 +1 @@
Subproject commit 67b76e6dd4d9b9332ba6774656d0ffc8a94be997
Subproject commit b2f5f81ca8615781d79d807fe92f7824653cea22

Loading…
Cancel
Save