0e1e87ec05
#40 #40 #40 Co-authored-by: miteruzo <miteruzo@naver.com> Reviewed-on: #41
1019 行
29 KiB
Python
1019 行
29 KiB
Python
from __future__ import annotations
|
|
|
|
import math
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
import wave
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum, auto
|
|
from io import BytesIO
|
|
from typing import Callable, TypedDict
|
|
|
|
import cv2
|
|
import emoji
|
|
import ephem
|
|
import numpy as np
|
|
import pygame
|
|
import pygame.gfxdraw
|
|
import pytchat
|
|
import requests
|
|
from cv2 import VideoCapture
|
|
from ephem import Moon, Observer, Sun
|
|
from pygame import Rect, Surface
|
|
from pygame.font import Font
|
|
from pygame.mixer import Sound
|
|
from pygame.time import Clock
|
|
from pytchat.core.pytchat import PytchatCore
|
|
from pytchat.processors.default.processor import Chat
|
|
|
|
from aques import Aques
|
|
from common_module import CommonModule
|
|
from nizika_ai.config import DB
|
|
from nizika_ai.consts import Character, GPTModel, Platform, QueryType
|
|
from nizika_ai.models import Answer, AnsweredFlag, Query, User
|
|
|
|
# Initialise pygame before the font objects below are created.
pygame.init ()

# Frame rate passed to Clock.tick by Game.redraw (frames per second).
FPS = 30

# Fonts created once at import time and shared by the on-screen widgets.
SYSTEM_FONT = pygame.font.SysFont ('notosanscjkjp', 11, bold = True)
USER_FONT = pygame.font.SysFont ('notosanscjkjp', 15, italic = True)
DEERJIKA_FONT = pygame.font.SysFont ('07nikumarufont', 23)
|
|
|
|
|
|
def main (
|
|
) -> None:
|
|
game = Game ()
|
|
Bg (game)
|
|
balloon = Balloon (game)
|
|
deerjika = Deerjika (game, DeerjikaPattern.RELAXED,
|
|
x = CWindow.WIDTH * 3 / 4,
|
|
y = CWindow.HEIGHT - 56.25,
|
|
balloon = balloon)
|
|
snack_time = SnackTime (game)
|
|
CurrentTime (game, DEERJIKA_FONT)
|
|
|
|
broadcast: Broadcast | None = None
|
|
try:
|
|
broadcast = Broadcast (os.environ['BROADCAST_CODE'])
|
|
except KeyError:
|
|
broadcast = None
|
|
|
|
waiting_balloon = (False, '', '')
|
|
last_flags_poll: float = 0
|
|
traced_af_ids: list[int] = []
|
|
|
|
while True:
|
|
now_m = time.monotonic ()
|
|
|
|
for event in pygame.event.get ():
|
|
if event.type == pygame.QUIT:
|
|
pygame.quit ()
|
|
sys.exit ()
|
|
|
|
if (not balloon.enabled) and (not snack_time.enabled):
|
|
if waiting_balloon[0]:
|
|
deerjika.talk (waiting_balloon[1], waiting_balloon[2])
|
|
waiting_balloon = (False, '', '')
|
|
|
|
if now_m - last_flags_poll >= 1:
|
|
last_flags_poll = now_m
|
|
log (f"balloon: { balloon.enabled }, snack: { snack_time.enabled }")
|
|
try:
|
|
DB.begin_transaction ()
|
|
answer_flags = (AnsweredFlag.where ('platform', Platform.YOUTUBE.value)
|
|
.where ('answered', False)
|
|
.where_not_in ('id', traced_af_ids)
|
|
.get ())
|
|
log (f"pending: { len (answer_flags) }")
|
|
if answer_flags:
|
|
answer_flag = random.choice (answer_flags)
|
|
log (f"flag_id: { answer_flag.id }, answer_id: { answer_flag.answer_id }")
|
|
answer = Answer.find (answer_flag.answer_id)
|
|
match QueryType (answer.query_rel.query_type):
|
|
case QueryType.YOUTUBE_COMMENT:
|
|
query = Query.find (answer.query_id)
|
|
deerjika.talk (query.content, answer.content)
|
|
answer_flag.answered = True
|
|
answer_flag.save ()
|
|
case QueryType.SNACK_TIME:
|
|
snack_time.play ()
|
|
query = Query.find (answer.query_id)
|
|
waiting_balloon = (True, query.content, answer.content)
|
|
answer_flag.answered = True
|
|
answer_flag.save ()
|
|
case _:
|
|
traced_af_ids.append (answer_flag.id)
|
|
DB.commit ()
|
|
except Exception as ex:
|
|
DB.rollback ()
|
|
log ('EXCEPTION in poll loop')
|
|
print (ex)
|
|
|
|
try:
|
|
if broadcast is not None:
|
|
add_query (broadcast)
|
|
except Exception as ex:
|
|
log ('EXCEPTION in adding a query')
|
|
print (ex)
|
|
game.redraw ()
|
|
|
|
|
|
class Bg:
    """
    Builds and holds the background game objects.

    Attributes:
        base (BgBase): rearmost background image
        grass (BgGrass): grassland part of the background
        jojoko (Jojoko): the moon character
        kita (KitaSun): the sun character
    """

    base: BgBase
    grass: BgGrass
    jojoko: Jojoko
    kita: KitaSun

    def __init__ (self, game: Game):
        """
        Create the four background objects and register them with *game*.

        The creation order matters: objects without an explicit layer are
        stacked above everything created before them, and the base image
        is placed 5 layers below the sun.
        """
        sun_object = KitaSun (game)
        self.kita = sun_object
        self.base = BgBase (game, sun_object.sun, layer = sun_object.layer - 5)
        self.jojoko = Jojoko (game)
        self.grass = BgGrass (game)
|
|
|
|
|
|
class DeerjikaPattern (Enum):
    """
    Animation state of Deerjika.

    Members:
        NORMAL: default state
        RELAXED: lying down, kicking her legs
        SLEEPING: asleep
        DANCING: dancing
    """

    # Values are assigned by auto () in declaration order; keep the order.
    NORMAL = auto ()
    RELAXED = auto ()
    SLEEPING = auto ()
    DANCING = auto ()
|
|
|
|
|
|
class Direction (Enum):
    """
    Direction a creature is facing.

    Members:
        LEFT: facing left
        RIGHT: facing right
    """

    # Values are assigned by auto () in declaration order; keep the order.
    LEFT = auto ()
    RIGHT = auto ()
|
|
|
|
|
|
class Game:
    """
    Game core: owns the window, the clock and every drawable object.

    Attributes:
        bgm (Sound): background music, looped from start-up
        clock (Clock): pygame clock used to cap the frame rate
        fps (float): frame rate measured during the last redraw
        frame (int): frame counter
        last_answered_at (datetime): time the last answer finished being shown
        now (datetime): reference date/time, refreshed on every redraw
        objects (list[GameObject]): objects that get redrawn
        screen (Surface): base screen surface
        sky (Sky): celestial information
    """

    bgm: Sound
    clock: Clock
    fps: float
    frame: int
    last_answered_at: datetime
    now: datetime
    objects: list[GameObject]
    screen: Surface
    sky: Sky

    def __init__ (
            self,
    ):
        """Open the window, start the background music and create the sky."""
        self.now = datetime.now ()
        # Give the documented attribute a value from the start; otherwise
        # it only exists after the first balloon has finished.
        self.last_answered_at = self.now
        self.screen = pygame.display.set_mode ((CWindow.WIDTH, CWindow.HEIGHT))
        self.clock = Clock ()
        self.fps = FPS
        self.frame = 0
        self.objects = []
        self.bgm = Sound ('./assets/bgm.mp3')
        self.bgm.set_volume (.15)
        self.bgm.play (loops = -1)
        self._create_sky ()

    def redraw (
            self,
    ) -> None:
        """
        Draw one frame and advance the simulation.

        Enabled objects are drawn in ascending layer order.  When the
        frame took longer than 1 / FPS seconds, extra update () calls are
        issued so object state keeps up with the elapsed time.
        """
        self.now = datetime.now ()
        # NOTE(review): presumably converts local time (UTC+9) to the UTC
        # value the ephem observer expects — confirm the host time zone.
        self.sky.observer.date = self.now - timedelta (hours = 9)
        for obj in sorted (self.objects, key = lambda obj: obj.layer):
            if obj.enabled:
                obj.redraw ()
        pygame.display.update ()
        delta_time = self.clock.tick (FPS) / 1000
        # Keep the previous reading rather than divide by zero should the
        # clock ever report a 0 ms frame.
        if delta_time > 0:
            self.fps = 1 / delta_time
        if delta_time > 1 / FPS:
            for _ in range (int (FPS * delta_time) - 1):
                for obj in self.objects:
                    if obj.enabled:
                        obj.update ()

    def _create_sky (
            self,
    ) -> None:
        """Create the Sky with an observer at latitude 35, longitude 139."""
        self.sky = Sky ()
        self.sky.observer = Observer ()
        self.sky.observer.lat = '35'
        self.sky.observer.lon = '139'
|
|
|
|
|
|
class GameObject:
    """
    Base class of every game object.

    Attributes:
        arg (float): rotation angle (rad)
        ax (float): acceleration along the x axis (px/frame^2)
        ay (float): acceleration along the y axis (px/frame^2)
        enabled (bool): whether the object is drawn and updated
        frame (int): frame counter
        game (Game): owning game
        height (int): height (px)
        layer (float): drawing order; lower layers are drawn first
        vx (float): velocity along the x axis (px/frame)
        vy (float): velocity along the y axis (px/frame)
        width (int): width (px)
        x (float): x coordinate (px)
        y (float): y coordinate (px)
    """

    arg: float = 0
    ax: float = 0
    ay: float = 0
    enabled: bool = True
    frame: int
    game: Game
    height: int
    layer: float
    vx: float = 0
    vy: float = 0
    width: int
    x: float
    y: float

    def __init__ (self, game: Game, layer: float | None = None,
                  enabled: bool = True, x: float = 0, y: float = 0):
        """
        Register the object with *game*.

        Args:
            game: game whose ``objects`` list receives this object
            layer: drawing layer; when omitted the object is placed 10
                above the current top layer (0 if it is the first object)
            enabled: initial enabled state
            x: initial x coordinate
            y: initial y coordinate
        """
        self.game = game
        self.enabled = enabled
        self.frame = 0
        if layer is None:
            siblings = self.game.objects
            layer = (max (obj.layer for obj in siblings) + 10) if siblings else 0
        self.layer = layer
        self.x, self.y = x, y
        self.game.objects.append (self)

    def redraw (self) -> None:
        """Draw the object; the base implementation only advances its state."""
        self.update ()

    def update (self) -> None:
        """Advance by one frame: position by velocity, velocity by acceleration."""
        # Position uses the velocity from before this frame's acceleration.
        self.x, self.y = self.x + self.vx, self.y + self.vy
        self.vx, self.vy = self.vx + self.ax, self.vy + self.ay
        self.frame += 1
|
|
|
|
|
|
class BgBase (GameObject):
    """
    Rearmost background image, switched between day and night.

    Attributes:
        bg (Surface): daytime background
        bg_evening (Surface): overlay blended in around sunrise and sunset
        bg_grass (Surface): grass image (loaded here, not drawn by this class)
        bg_night (Surface): night background
        sun (Sun): ephem sun used to find sunrise and sunset
    """

    bg: Surface
    bg_evening: Surface
    bg_grass: Surface
    bg_night: Surface
    sun: Sun

    def __init__ (self, game: Game, sun: Sun, layer: float):
        """Load the background images, scaled to the window size."""
        super ().__init__ (game, layer = layer)
        load = self._load_image
        self.bg = load ('./assets/bg.jpg')
        self.bg_evening = load ('./assets/bg-evening.jpg')
        self.bg_grass = load ('./assets/bg-grass.png')
        self.bg_night = load ('./assets/bg-night.jpg')
        self.sun = sun

    @staticmethod
    def _load_image (path: str) -> Surface:
        """Load the image at *path* and scale it to the window size."""
        image = pygame.image.load (path)
        return pygame.transform.scale (image, (CWindow.WIDTH, CWindow.HEIGHT))

    def redraw (self) -> None:
        """
        Draw the day or night background with the evening overlay on top.

        Two one-hour windows are computed, each starting 30 minutes before
        the sunrise / sunset reported by the observer.  The day image is
        used between the centres of the two windows, the night image
        otherwise.
        """
        observer = self.game.sky.observer
        now = self.game.now
        half_hour = timedelta (minutes = 30)
        one_hour = timedelta (hours = 1)

        # Temporarily move the observer to today's date for the queries,
        # then restore the value Game.redraw had set.
        saved_date = observer.date
        observer.date = now.date ()
        sunrise_start: datetime = (
            ephem.localtime (observer.previous_rising (self.sun)) - half_hour)
        sunrise_end: datetime = sunrise_start + one_hour
        sunrise_centre: datetime = sunrise_start + (sunrise_end - sunrise_start) / 2
        sunset_start: datetime = (
            ephem.localtime (observer.next_setting (self.sun)) - half_hour)
        sunset_end: datetime = sunset_start + one_hour
        sunset_centre: datetime = sunset_start + (sunset_end - sunset_start) / 2
        observer.date = saved_date

        is_daytime = sunrise_centre <= now < sunset_centre
        surface: Surface = (self.bg if is_daytime else self.bg_night).copy ()

        # Pick the window we are in, if any.
        if sunrise_start <= now < sunrise_end:
            window = (sunrise_centre, sunrise_end)
        elif sunset_start <= now < sunset_end:
            window = (sunset_centre, sunset_end)
        else:
            window = None

        if window is None:
            alpha = 0
        else:
            (centre, end) = window
            # 255 at the centre, falling linearly with the distance from
            # it.  NOTE(review): the value goes negative in the outer
            # half of the window and relies on how pygame treats
            # out-of-range alpha — confirm.
            alpha = 255 - int ((abs (now - centre) * 510) / (end - centre))
        self.bg_evening.set_alpha (alpha)

        surface.blit (self.bg_evening, (0, 0))
        self.game.screen.blit (surface, (self.x, self.y))
        super ().redraw ()
|
|
|
|
|
|
class BgGrass (GameObject):
    """
    Grassland part of the background.

    Attributes:
        surface (Surface): grass image scaled to the window size
    """

    surface: Surface

    def __init__ (self, game: Game):
        """Load the grass image and scale it to the window size."""
        # GameObject.__init__ already stores `game` on the instance.
        super ().__init__ (game)
        grass = pygame.image.load ('./assets/bg-grass.png')
        self.surface = pygame.transform.scale (grass, (CWindow.WIDTH, CWindow.HEIGHT))

    def redraw (self) -> None:
        """Blit the grass at the object's position."""
        position = (self.x, self.y)
        self.game.screen.blit (self.surface, position)
        super ().redraw ()
|
|
|
|
|
|
class Creature (GameObject):
    """
    Game object that owns a sound effect.

    Attributes:
        sound (Sound): sound played by bell (); subclasses must set it
    """

    sound: Sound

    def bell (self) -> None:
        """Play the creature's sound effect."""
        effect = self.sound
        effect.play ()
|
|
|
|
|
|
class Deerjika (Creature):
|
|
"""
|
|
伊地知ニジカ
|
|
|
|
Attributes:
|
|
height (int): 高さ (px)
|
|
scale (float): 拡大率
|
|
surfaces (list[Surface]): ニジカの各フレームを Surface にしたリスト
|
|
width (int): 幅 (px)
|
|
"""
|
|
|
|
FPS = 30
|
|
|
|
height: int
|
|
scale: float = 1
|
|
surfaces: list[Surface]
|
|
width: int
|
|
talking: bool = False
|
|
wav: bytearray | None = None
|
|
balloon: Balloon
|
|
|
|
def __init__ (
|
|
self,
|
|
game: Game,
|
|
pattern: DeerjikaPattern = DeerjikaPattern.NORMAL,
|
|
direction: Direction = Direction.LEFT,
|
|
layer: float | None = None,
|
|
x: float = 0,
|
|
y: float = 0,
|
|
balloon: Balloon | None = None,
|
|
):
|
|
if balloon is None:
|
|
raise Exception
|
|
super ().__init__ (game, layer, x = x, y = y)
|
|
self.pattern = pattern
|
|
self.direction = direction
|
|
self.balloon = balloon
|
|
match pattern:
|
|
case DeerjikaPattern.NORMAL:
|
|
...
|
|
case DeerjikaPattern.RELAXED:
|
|
match direction:
|
|
case Direction.LEFT:
|
|
self.width = 480
|
|
self.height = 270
|
|
surface = pygame.image.load ('./assets/deerjika_relax_left.png')
|
|
self.surfaces = []
|
|
for x in range (0, surface.get_width (), self.width):
|
|
self.surfaces.append (
|
|
surface.subsurface (x, 0, self.width, self.height))
|
|
case Direction.RIGHT:
|
|
...
|
|
self.sound = Sound ('./assets/noon.wav')
|
|
|
|
def redraw (
|
|
self,
|
|
) -> None:
|
|
surface = self.surfaces[self.frame * self.FPS // FPS % len (self.surfaces)]
|
|
if abs (self.scale - 1) > .05:
|
|
surface = pygame.transform.scale (
|
|
surface, (self.width * self.scale, self.height * self.scale))
|
|
self.game.screen.blit (surface, surface.get_rect (center = (self.x, self.y)))
|
|
super ().redraw ()
|
|
|
|
def update (
|
|
self,
|
|
) -> None:
|
|
if (not self.balloon.enabled) and self.talking:
|
|
self.talking = False
|
|
if (self.balloon.enabled and self.balloon.frame >= FPS * 1.5
|
|
and not self.talking):
|
|
self.read_out ()
|
|
super ().update ()
|
|
|
|
def talk (
|
|
self,
|
|
query: str,
|
|
answer: str,
|
|
) -> None:
|
|
self.bell ()
|
|
self._create_wav (answer)
|
|
length = 300
|
|
if self.wav is not None:
|
|
with wave.open ('./nizika_talking.wav', 'rb') as f:
|
|
length = int (FPS * (f.getnframes () / f.getframerate () + 4))
|
|
self.balloon.talk (query, answer, length = length)
|
|
|
|
def read_out (
|
|
self,
|
|
) -> None:
|
|
try:
|
|
Sound ('./nizika_talking.wav').play ()
|
|
except Exception:
|
|
pass
|
|
self.talking = True
|
|
|
|
def _create_wav (
|
|
self,
|
|
message: str,
|
|
) -> None:
|
|
try:
|
|
self.wav = Aques.main (message, False)
|
|
except Exception:
|
|
self.wav = None
|
|
if self.wav is None:
|
|
return
|
|
with open ('./nizika_talking.wav', 'wb') as f:
|
|
f.write (self.wav)
|
|
|
|
|
|
class CurrentTime (GameObject):
    """
    On-screen display of the current date/time and the frame rate.

    Attributes:
        font (Font): font used for both lines
    """

    font: Font

    def __init__ (self, game: Game, font: Font):
        """Register the widget and remember its font."""
        super ().__init__ (game)
        self.font = font

    def redraw (self) -> None:
        """Draw both lines in black, then again in white shifted by (-1, -1)."""
        screen = self.game.screen
        timestamp = str (self.game.now)
        fps_text = '%2.3f fps' % self.game.fps
        for (shift, shade) in enumerate ((0, 255)):
            colour = (shade, shade, shade)
            screen.blit (self.font.render (timestamp, True, colour), (-shift, -shift))
            screen.blit (self.font.render (fps_text, True, colour), (-shift, 24 - shift))
        super ().redraw ()
|
|
|
|
|
|
class Balloon (GameObject):
    """
    Speech balloon showing a query and its (scrolling) answer.

    Attributes:
        answer (str): answer text
        image_url (str, None): image URL
        length (int): display time (frames)
        query (str): query text
        surface (Surface): balloon image
        x_flip (bool): horizontal flip flag
        y_flip (bool): vertical flip flag
    """

    answer: str = ''
    image_url: str | None = None
    length: int = 300
    query: str = ''
    surface: Surface
    x_flip: bool = False
    y_flip: bool = False

    def __init__ (
            self,
            game: Game,
            x_flip: bool = False,
            y_flip: bool = False,
    ):
        """Load the balloon image; the balloon starts disabled."""
        super ().__init__ (game, enabled = False)
        self.x_flip = x_flip
        self.y_flip = y_flip
        self.surface = pygame.transform.scale (pygame.image.load ('./assets/balloon.png'),
                                               (CWindow.WIDTH, CWindow.HEIGHT / 2))
        self.surface = pygame.transform.flip (self.surface, self.x_flip, self.y_flip)

    def redraw (
            self,
    ) -> None:
        """
        Draw the balloon, or close it once ``length`` frames have passed.

        The answer is rendered 16 full-width characters per line onto a
        tall surface of which a 46.875 px high window is shown.  The
        window rests at the top for 30 frames, scrolls down linearly and
        rests at the bottom for the last 90 frames.
        """
        if self.frame >= self.length:
            self.enabled = False
            self.game.last_answered_at = self.game.now
            return

        query = self.query
        if CommonModule.len_by_full (query) > 21:
            query = CommonModule.mid_by_full (query, 0, 19.5) + '...'

        answer_len = CommonModule.len_by_full (self.answer)
        answer = Surface (
            (375, int (((answer_len - 1) // 16 + 1) * 23.4375)),
            pygame.SRCALPHA)
        for i in range (int (answer_len - 1) // 16 + 1):
            answer.blit (DEERJIKA_FONT.render (
                CommonModule.mid_by_full (self.answer, 16 * i, 16), True, (192, 0, 0)),
                         (0, 23.4375 * i))

        surface = self.surface.copy ()
        surface.blit (USER_FONT.render ('>' + query, True, (0, 0, 0)), (56.25, 32.8125))

        # Distance the window can scroll.  Clamped at 0 so an answer
        # shorter than the window stays put instead of getting a negative
        # source offset, which would push the text downwards.
        scroll_range = max (answer.get_height () - 46.875, 0)
        y: float
        if self.frame < 30:
            y = 0
        elif self.frame >= self.length - 90:
            y = scroll_range
        else:
            # Only reachable when length > 120, so the divisor is positive.
            y = int (scroll_range * (self.frame - 30) / (self.length - 120))
        surface.blit (answer, (46.875, 70.3125), Rect (0, y, 375, 46.875))
        self.game.screen.blit (surface, (0, 0))
        super ().redraw ()

    def talk (
            self,
            query: str,
            answer: str,
            image_url: str | None = None,
            length: int = 300,
    ) -> None:
        """
        Show the balloon with a new query / answer pair.

        Args:
            query: query text shown on the first line
            answer: answer text
            image_url: image URL (stored only)
            length: number of frames the balloon stays visible
        """
        self.query = query
        self.answer = answer
        self.image_url = image_url
        self.length = length
        self.frame = 0
        self.enabled = True
|
|
|
|
|
|
class KitaSun (GameObject):
    """
    The sun character, placed from the computed position of the sun.

    Attributes:
        alt (float): altitude of the sun as last computed
        az (float): azimuth of the sun as last computed
        sun (Sun): ephem sun object
        surface (Surface): sun image
    """

    alt: float
    az: float
    sun: Sun
    surface: Surface

    def __init__ (self, game: Game):
        """Load the sun image and create the ephem body."""
        super ().__init__ (game)
        image = pygame.image.load ('./assets/sun.png')
        self.surface = pygame.transform.scale (image, (93.75, 93.75))
        self.sun = Sun ()

    def redraw (self) -> None:
        """Draw the sun rotated by its heading, centred on (x, y)."""
        angle = -(90 + math.degrees (self.arg))
        rotated = pygame.transform.rotate (self.surface, angle)
        self.game.screen.blit (rotated, rotated.get_rect (center = (self.x, self.y)))
        super ().redraw ()

    def update (self) -> None:
        """Recompute the sun for the game's observer and move the sprite."""
        self.sun.compute (self.game.sky.observer)
        self.alt = self.sun.alt
        self.az = self.sun.az
        # The heading only follows the movement once it is off by more
        # than 15 degrees.
        heading = self.new_arg
        if abs (heading - self.arg) > math.radians (15):
            self.arg = heading
        self.x = self.new_x
        self.y = self.new_y
        super ().update ()

    @property
    def new_x (self) -> float:
        """Target x: azimuth 80..200 degrees mapped onto the window width."""
        azimuth_deg = math.degrees (self.az)
        return CWindow.WIDTH * (azimuth_deg - 80) / 120

    @property
    def new_y (self) -> float:
        """Target y: window centre minus an offset proportional to sin (alt)."""
        half_height = CWindow.HEIGHT / 2
        rise = (half_height + 46.875) * math.sin (self.alt) / math.sin (math.radians (60))
        return half_height - rise

    @property
    def new_arg (self) -> float:
        """Angle (rad) from the current position towards the target position."""
        dy = self.new_y - self.y
        dx = self.new_x - self.x
        return math.atan2 (dy, dx)
|
|
|
|
|
|
class Jojoko (GameObject):
    """
    The moon character, placed and shaded from the computed moon.

    Attributes:
        alt (float): altitude of the moon as last computed
        az (float): azimuth of the moon as last computed
        base (Surface): full-moon image
        moon (Moon): ephem moon object
        surface (Surface): image with the current phase masked out
    """

    alt: float
    az: float
    base: Surface
    moon: Moon
    surface: Surface

    def __init__ (
            self,
            game: Game,
    ):
        """Load the moon image and build the surface for the current phase."""
        super ().__init__ (game)
        self.base = pygame.transform.scale (pygame.image.load ('./assets/moon.png'), (93.75, 93.75))
        self.moon = Moon ()
        self.surface = self._get_surface ()

    def redraw (
            self,
    ) -> None:
        """Recompute the moon, move the sprite and draw it."""
        self.moon.compute (self.game.sky.observer)
        self.alt = self.moon.alt
        self.az = self.moon.az
        # The heading only follows the movement once it is off by more
        # than 15 degrees.
        if abs (self.new_arg - self.arg) > math.radians (15):
            self.arg = self.new_arg
        self.x = self.new_x
        self.y = self.new_y
        # Rebuild the phase mask every FPS * 3600 frames.
        if self.frame % (FPS * 3600) == 0:
            self.surface = self._get_surface ()
        surface = pygame.transform.rotate (self.surface, -(90 + math.degrees (self.arg)))
        # Pure green marks the masked-out part of the disc.
        surface.set_colorkey ((0, 255, 0))
        self.game.screen.blit (surface, surface.get_rect (center = (self.x, self.y)))
        super ().redraw ()

    @property
    def phase (
            self,
    ) -> float:
        """Days elapsed since the previous new moon."""
        dt: datetime = ephem.localtime (ephem.previous_new_moon (self.game.sky.observer.date))
        return (self.game.now - dt).total_seconds () / 60 / 60 / 24

    def _get_surface (
            self,
    ) -> Surface:
        """
        Mask the full-moon image according to the moon's age.

        Returns:
            Surface: copy of the base image with the dark part painted in
            the colour-key green
        """
        jojoko = self.base.copy ()
        # `phase` runs an ephem search on every access; evaluate it once
        # instead of several times per curve.
        phase = self.phase
        key = (0, 255, 0)
        if phase < 1 or phase >= 30:
            # New moon: hide the whole disc.  (Ages below one day used to
            # fall through to the unmasked full-moon case.)
            jojoko.fill (key)
        elif phase < 15:
            # First half of the cycle: sweep a stack of curves downwards.
            for i in range (200):
                pygame.gfxdraw.bezier (jojoko, ((0, (100 + i) * .468_75), (46.875, (180 * phase / 7 - 80 + i) * .468_75), (93.75, (100 + i) * .468_75)), 3, key)
        elif phase < 16:
            # Full moon: leave the image untouched.
            pass
        else:
            # Second half of the cycle: sweep a stack of curves upwards.
            for i in range (200):
                pygame.gfxdraw.bezier (jojoko, ((0, (100 - i) * .468_75), (46.875, (180 * (phase - 15) / 7 - 80 - i) * .468_75), (93.75, (100 - i) * .468_75)), 3, key)
        return jojoko

    @property
    def new_x (
            self,
    ) -> float:
        """Target x: azimuth 80..200 degrees mapped onto the window width."""
        return CWindow.WIDTH * (math.degrees (self.az) - 80) / 120

    @property
    def new_y (
            self,
    ) -> float:
        """Target y: window centre minus an offset proportional to sin (alt)."""
        return ((CWindow.HEIGHT / 2)
                - ((CWindow.HEIGHT / 2 + 46.875) * math.sin (self.alt)
                   / math.sin (math.radians (60))))

    @property
    def new_arg (
            self,
    ) -> float:
        """Angle (rad) from the current position towards the target position."""
        return math.atan2 (self.new_y - self.y, self.new_x - self.x)
|
|
|
|
|
|
class Sky:
    """
    Holder for celestial information.

    Attributes:
        observer (Observer): ephem observer; assigned by the code that
            creates the Sky, not by this class
    """

    observer: Observer
|
|
|
|
|
|
class CWindow:
    """
    Constants describing the window.

    Attributes:
        WIDTH (int): window width (px)
        HEIGHT (int): window height (px)
    """

    (WIDTH, HEIGHT) = (480, 360)
|
|
|
|
|
|
class Broadcast:
    """
    Live-chat reader for one broadcast.

    Attributes:
        chat (PytchatCore): pytchat handle created for ``code``
        code (str): broadcast code passed to pytchat
    """

    chat: PytchatCore
    code: str

    def __init__ (self, broadcast_code):
        """Open the chat of the broadcast identified by *broadcast_code*."""
        self.code = broadcast_code
        self.chat = pytchat.create (self.code)

    def fetch_chat (self) -> Chat | None:
        """
        Return one randomly chosen chat item from the latest batch.

        Returns:
            Chat | None: None when the batch is empty, or when the
            handle had died (it is recreated for the next call)
        """
        if not self.chat.is_alive ():
            self.chat = pytchat.create (self.code)
            return None
        batch = self.chat.get ().items
        return random.choice (batch) if batch else None
|
|
|
|
|
|
class Video (GameObject):
    """
    Video clip decoded up front into surfaces and played frame by frame.

    Attributes:
        fps (int): frame rate reported by the source file
        pausing (bool): whether playback is paused
        sound (Sound, None): audio track, or None when pydub is missing
        surfaces (list[Surface]): frames resampled to the game's FPS
    """

    fps: int
    pausing: bool = False
    sound: Sound | None
    surfaces: list[Surface]

    def __init__ (
            self,
            game: Game,
            path: str,
    ):
        """Decode the video and audio at *path*; the clip starts stopped."""
        super ().__init__ (game)
        self.pausing = False
        (self.surfaces, self.fps) = self._create_surfaces (path)
        self.sound = self._create_sound (path)
        self.stop ()

    def _create_sound (
            self,
            path: str,
    ) -> Sound | None:
        """Extract the audio of *path* as a Sound (None without pydub)."""
        bytes_io = BytesIO ()
        try:
            # Optional dependency, imported lazily.
            from pydub import AudioSegment
            audio = AudioSegment.from_file (path, format = path.split ('.')[-1])
        except ModuleNotFoundError:
            return None
        audio.export (bytes_io, format = 'wav')
        bytes_io.seek (0)
        return pygame.mixer.Sound (bytes_io)

    def _create_surfaces (
            self,
            path: str,
    ) -> tuple[list[Surface], int]:
        """
        Decode every frame of *path* and resample the list to FPS.

        Returns:
            tuple[list[Surface], int]: the resampled frames and the frame
            rate of the source (``([], FPS)`` if it cannot be opened)
        """
        cap = self._load (path)
        if cap is None:
            return ([], FPS)
        surfaces: list[Surface] = []
        try:
            fps = int (cap.get (cv2.CAP_PROP_FPS))
            while cap.isOpened ():
                frame = self._read_frame (cap)
                if frame is None:
                    break
                surfaces.append (self._convert_to_surface (frame))
        finally:
            # Free the decoder even when a frame fails to convert.
            cap.release ()
        if fps <= 0:
            # Some sources report no frame rate; assume the game's own
            # rate rather than divide by zero below.
            fps = FPS
        # Pick the nearest earlier source frame for each game frame.
        new_surfaces: list[Surface] = [
            surfaces[i * fps // FPS]
            for i in range (len (surfaces) * FPS // fps)]
        return (new_surfaces, fps)

    def _load (
            self,
            path: str,
    ) -> VideoCapture | None:
        """Open *path* with OpenCV; None if it cannot be opened."""
        cap = VideoCapture (path)
        if cap.isOpened ():
            return cap
        return None

    def _read_frame (
            self,
            cap: VideoCapture,
    ) -> np.ndarray | None:
        """Read the next frame from *cap*; None when no frame was read."""
        ret: bool
        frame: np.ndarray
        (ret, frame) = cap.read ()
        if ret:
            return frame
        return None

    def _convert_to_surface (
            self,
            frame: np.ndarray,
    ) -> Surface:
        """Convert a BGR OpenCV frame into an upright pygame Surface."""
        frame = cv2.cvtColor (frame, cv2.COLOR_BGR2RGB)
        frame_surface = pygame.surfarray.make_surface (frame)
        # make_surface is followed by a rotate and a flip to restore the
        # image orientation.
        frame_surface = pygame.transform.rotate (frame_surface, -90)
        frame_surface = pygame.transform.flip (frame_surface, True, False)
        return frame_surface

    def play (
            self,
    ) -> None:
        """Enable the clip, clear the pause flag and start the audio."""
        self.enabled = True
        self.pausing = False
        if self.sound is not None:
            self.sound.play ()

    def stop (
            self,
    ) -> None:
        """Disable the clip and rewind it to the first frame."""
        self.enabled = False
        self.frame = 0

    def pause (
            self,
    ) -> None:
        """Freeze playback on the current frame."""
        self.pausing = True

    def redraw (
            self,
    ) -> None:
        """Draw the current frame scaled to (width, height)."""
        if not self.surfaces:
            # Nothing was decoded (missing or unreadable file): end
            # playback instead of indexing an empty list.
            self.stop ()
            return
        surface = pygame.transform.scale (self.surfaces[self.frame], (self.width, self.height))
        self.game.screen.blit (surface, (self.x, self.y))
        super ().redraw ()

    def update (
            self,
    ) -> None:
        """Advance one frame; stop and rewind after the last frame."""
        if self.frame >= len (self.surfaces) - 1:
            self.stop ()
            # Return so the rewound counter stays at 0 and the next
            # play () starts from the first frame.
            return
        if self.pausing:
            # Cancels the increment done by GameObject.update.
            self.frame -= 1
        super ().update ()
|
|
|
|
|
|
class NicoVideo (Video):
    """Placeholder subclass of Video; it adds no behaviour of its own."""
|
|
|
|
|
|
class SnackTime (Video):
    """The snack-time clip, shown at window height and centred horizontally."""

    def __init__ (self, game: Game):
        """Load ./assets/snack_time.mp4 and size it to a 16:9 frame."""
        super ().__init__ (game, './assets/snack_time.mp4')
        # Full window height; width follows from the 16:9 aspect ratio.
        self.height = CWindow.HEIGHT
        self.width = CWindow.HEIGHT * 16 // 9
        # Centre horizontally in the window.
        self.x = (CWindow.WIDTH - self.width) / 2
        self.y = 0
|
|
|
|
|
|
def fetch_bytes_from_url (url: str) -> bytes | None:
    """
    Download *url* and return the response body.

    Returns:
        bytes | None: the body, or None unless the status code is 200
    """
    response = requests.get (url, timeout = 60)
    return response.content if response.status_code == 200 else None
|
|
|
|
|
|
def add_query (
        broadcast: Broadcast,
) -> None:
    """
    Store one live-chat message of *broadcast* as a new Query.

    Does nothing when no chat message is available.  The author is
    created as a User on first sight.  All writes happen in a single
    transaction, which is rolled back before any exception is re-raised.
    """
    chat = broadcast.fetch_chat ()
    if chat is None:
        return
    DB.begin_transaction ()
    try:
        chat.message = emoji.emojize (chat.message)
        user = (User.where ('platform', Platform.YOUTUBE.value)
                .where ('code', chat.author.channelId)
                .first ())
        if user is None:
            user = User ()
            user.platform = Platform.YOUTUBE.value
            user.code = chat.author.channelId
            user.name = chat.author.name
            # Network fetch; a failure here is undone by the rollback below.
            user.icon = fetch_bytes_from_url (chat.author.imageUrl)
            user.save ()
        query = Query ()
        query.user_id = user.id
        query.target_character = Character.DEERJIKA.value
        query.content = chat.message
        query.query_type = QueryType.YOUTUBE_COMMENT.value
        query.model = GPTModel.GPT3_TURBO.value
        query.sent_at = datetime.now ()
        query.answered = False
        query.save ()
        DB.commit ()
    except Exception:
        # Do not leave the transaction open for the caller's next call.
        DB.rollback ()
        raise
|
|
|
|
|
|
def log (msg: str) -> None:
    """Print *msg* prefixed with the current time (seconds precision), flushed."""
    stamp = datetime.now ().isoformat (sep = ' ', timespec = 'seconds')
    print (f"[{ stamp }] { msg }", flush = True)
|
|
|
|
|
|
# Run the broadcast loop only when executed as a script, not on import.
if __name__ == '__main__':
    main ()
|