from __future__ import annotations import math import os import random import sys import wave from datetime import datetime, timedelta from enum import Enum, auto from io import BytesIO from typing import Callable, TypedDict import cv2 import emoji import ephem import numpy as np import pygame import pygame.gfxdraw import pytchat import requests from cv2 import VideoCapture from ephem import Moon, Observer, Sun from pygame import Rect, Surface from pygame.font import Font from pygame.mixer import Sound from pygame.time import Clock from pytchat.core.pytchat import PytchatCore from pytchat.processors.default.processor import Chat from aques import Aques from common_module import CommonModule from nizika_ai.config import DB from nizika_ai.consts import (AnswerType, Character, GPTModel, Platform, QueryType) from nizika_ai.models import Answer, AnsweredFlag, Query, User pygame.init () FPS = 30 SYSTEM_FONT = pygame.font.SysFont ('notosanscjkjp', 24, bold = True) USER_FONT = pygame.font.SysFont ('notosanscjkjp', 32, italic = True) DEERJIKA_FONT = pygame.font.SysFont ('07nikumarufont', 50) def main ( ) -> None: game = Game () Bg (game) balloon = Balloon (game) deerjika = Deerjika (game, DeerjikaPattern.RELAXED, x = CWindow.WIDTH * 3 / 4, y = CWindow.HEIGHT - 120, balloon = balloon) CurrentTime (game, SYSTEM_FONT) try: broadcast = Broadcast (os.environ['BROADCAST_CODE']) except Exception: pass while True: for event in pygame.event.get (): if event.type == pygame.QUIT: pygame.quit () sys.exit () if not balloon.enabled: try: DB.begin_transaction () answer_flags = (AnsweredFlag.where ('platform', Platform.YOUTUBE.value) .where ('answered', False) .get ()) if answer_flags: answer_flag = random.choice (answer_flags) answer = Answer.find (answer_flag.answer_id) if answer.answer_type == AnswerType.YOUTUBE_REPLY.value: query = Query.find (answer.query_id) deerjika.talk (query.content, answer.content) answer_flag.answered = True answer_flag.save () DB.commit () add_query 
(broadcast) except Exception: pass game.redraw () class Bg: """ 背景オブゼクト管理用クラス Attributes: base (BgBase): 最背面 grass (BgGrass): 草原部分 jojoko (Jojoko): 大月ヨヨコ kita (KitaSun): き太く陽 """ base: BgBase grass: BgGrass jojoko: Jojoko kita: KitaSun def __init__ ( self, game: Game, ): self.base = BgBase (game) self.jojoko = Jojoko (game) self.kita = KitaSun (game) self.grass = BgGrass (game) class DeerjikaPattern (Enum): """ ニジカの状態 Members: NORMAL: 通常 RELAXED: 足パタパタ SLEEPING: 寝ニジカ DANCING: ダンシング・ニジカ """ NORMAL = auto () RELAXED = auto () SLEEPING = auto () DANCING = auto () class Direction (Enum): """ クリーチャの向き Members: LEFT: 左向き RIGHT: 右向き """ LEFT = auto () RIGHT = auto () class Game: """ ゲーム・クラス Attributes: clock (Clock): Clock オブゼクト frame (int): フレーム・カウンタ last_answered_at (datetime): 最後に回答した時刻 now (datetime): 基準日時 objects (list[GameObject]): 再描画するクラスのリスト screen (Surface): 基底スクリーン sky (Sky): 天体情報 """ bgm: Sound clock: Clock fps: float frame: int last_answered_at: datetime now: datetime objects: list[GameObject] screen: Surface sky: Sky def __init__ ( self, ): self.now = datetime.now () self.screen = pygame.display.set_mode ((CWindow.WIDTH, CWindow.HEIGHT)) self.clock = Clock () self.fps = FPS self.frame = 0 self.objects = [] self.bgm = Sound ('assets/bgm.mp3') self.bgm.set_volume (.15) self.bgm.play (loops = -1) self._create_sky () def redraw ( self, ) -> None: self.now = datetime.now () self.sky.observer.date = self.now - timedelta (hours = 9) for obj in sorted (self.objects, key = lambda obj: obj.layer): if obj.enabled: obj.redraw () pygame.display.update () delta_time = self.clock.tick (FPS) / 1000 self.fps = 1 / delta_time if delta_time > 1 / FPS: for _ in range (int (FPS * delta_time) - 1): for obj in self.objects: if obj.enabled: obj.update () def _create_sky ( self, ) -> None: self.sky = Sky () self.sky.observer = Observer () self.sky.observer.lat = '35' self.sky.observer.lon = '139' class GameObject: """ 各ゲーム・オブゼクトの基底クラス Attributes: arg (float): 回転角度 (rad) ax (float): 
X 軸に対する加速度 (px/frame^2) ay (float): y 軸に対する加速度 (px/frame^2) enabled (bool): オブゼクトの表示可否 frame (int): フレーム・カウンタ game (Game): ゲーム基盤 height (int): 高さ (px) vx (float): x 軸に対する速度 (px/frame) vy (float): y 軸に対する速度 (px/frame) width (int): 幅 (px) x (float): X 座標 (px) y (float): Y 座標 (px) """ arg: float = 0 ax: float = 0 ay: float = 0 enabled: bool = True frame: int game: Game height: int layer: float vx: float = 0 vy: float = 0 width: int x: float y: float def __init__ ( self, game: Game, layer: float | None = None, enabled: bool = True, x: float = 0, y: float = 0, ): self.game = game self.enabled = enabled self.frame = 0 if layer is None: if self.game.objects: layer = max (obj.layer for obj in self.game.objects) + 10 else: layer = 0 self.layer = layer self.x = x self.y = y self.game.objects.append (self) def redraw ( self, ) -> None: self.update () def update ( self, ) -> None: self.x += self.vx self.y += self.vy self.vx += self.ax self.vy += self.ay self.frame += 1 class BgBase (GameObject): """ 背景 Attributes: surface (Surface): 背景 Surface """ bg: Surface bg_evening: Surface bg_grass: Surface bg_night: Surface surface: Surface def __init__ ( self, game: Game, ): super ().__init__ (game) self.bg = pygame.image.load ('assets/bg.jpg') self.bg_evening = pygame.image.load ('assets/bg-evening.jpg') self.bg_grass = pygame.image.load ('assets/bg-grass.png') self.bg_night = pygame.image.load ('assets/bg-night.jpg') self.surface = pygame.transform.scale (self.bg, (CWindow.WIDTH, CWindow.HEIGHT)) def redraw ( self, ) -> None: self.game.screen.blit (self.surface, (self.x, self.y)) super ().redraw () class BgGrass (GameObject): """ 背景の草原部分 Attributes: surface (Surface): 草原 Surface """ surface: Surface def __init__ ( self, game: Game, ): super ().__init__ (game) self.game = game self.surface = pygame.image.load ('assets/bg-grass.png') self.surface = pygame.transform.scale (self.surface, (CWindow.WIDTH, CWindow.HEIGHT)) def redraw ( self, ) -> None: self.game.screen.blit (self.surface, 
(self.x, self.y)) super ().redraw () class Creature (GameObject): sound: Sound def bell ( self, ) -> None: self.sound.play () class Deerjika (Creature): """ 伊地知ニジカ Attributes: height (int): 高さ (px) scale (float): 拡大率 surfaces (list[Surface]): ニジカの各フレームを Surface にしたリスト width (int): 幅 (px) """ FPS = 30 height: int scale: float = .8 surfaces: list[Surface] width: int talking: bool = False wav: bytearray | None = None balloon: Balloon def __init__ ( self, game: Game, pattern: DeerjikaPattern = DeerjikaPattern.NORMAL, direction: Direction = Direction.LEFT, layer: float | None = None, x: float = 0, y: float = 0, balloon: Balloon | None = None, ): if balloon is None: raise Exception super ().__init__ (game, layer, x = x, y = y) self.pattern = pattern self.direction = direction self.balloon = balloon match pattern: case DeerjikaPattern.NORMAL: ... case DeerjikaPattern.RELAXED: match direction: case Direction.LEFT: self.width = 1280 self.height = 720 surface = pygame.image.load ('assets/deerjika_relax_left.png') self.surfaces = [] for x in range (0, surface.get_width (), self.width): self.surfaces.append ( surface.subsurface (x, 0, self.width, self.height)) case Direction.RIGHT: ... 
self.sound = Sound ('assets/noon.wav') def redraw ( self, ) -> None: surface = pygame.transform.scale (self.surfaces[self.frame * self.FPS // FPS % len (self.surfaces)], (self.width * self.scale, self.height * self.scale)) self.game.screen.blit (surface, surface.get_rect (center = (self.x, self.y))) super ().redraw () def update ( self, ) -> None: if (not self.balloon.enabled) and self.talking: self.talking = False if (self.balloon.enabled and self.balloon.frame >= FPS * 1.5 and not self.talking): self.read_out () super ().update () def talk ( self, query: str, answer: str, ) -> None: self.bell () self._create_wav (answer) length = 300 if self.wav is not None: with wave.open ('./nizika_talking.wav', 'rb') as f: length = int (FPS * (f.getnframes () / f.getframerate () + 4)) self.balloon.talk (query, answer, length = length) def read_out ( self, ) -> None: Sound ('./nizika_talking.wav').play () self.talking = True def _create_wav ( self, message: str, ) -> None: try: self.wav = Aques.main (message, False) except: self.wav = None if self.wav is None: return with open ('./nizika_talking.wav', 'wb') as f: f.write (self.wav) class CurrentTime (GameObject): """ 現在日時表示 Attributes: font (Font): フォント """ font: Font def __init__ ( self, game: Game, font: Font, ): super ().__init__ (game) self.font = font def redraw ( self, ) -> None: for i in range (4): self.game.screen.blit ( self.font.render (f"{ self.game.now } { self.game.fps } fps", True, (0, 0, 0)), (i % 2, i // 2 * 2)) super ().redraw () class Balloon (GameObject): """ 吹出し Attributes: answer (str): 回答テキスト image_url (str, None): 画像 URL length (int): 表示する時間 (frame) query (str): 質問テキスト surface (Surface): 吹出し Surface x_flip (bool): 左右反転フラグ y_flip (bool): 上下反転フラグ """ answer: str = '' image_url: str | None = None length: int = 300 query: str = '' surface: Surface x_flip: bool = False y_flip: bool = False def __init__ ( self, game: Game, x_flip: bool = False, y_flip: bool = False, ): super ().__init__ (game, enabled = False) 
self.x_flip = x_flip self.y_flip = y_flip self.surface = pygame.transform.scale (pygame.image.load ('assets/balloon.png'), (CWindow.WIDTH, CWindow.HEIGHT / 2)) self.surface = pygame.transform.flip (self.surface, self.x_flip, self.y_flip) def redraw ( self, ) -> None: if self.frame >= self.length: self.enabled = False self.game.last_answered_at = self.game.now return query = self.query if CommonModule.len_by_full (query) > 21: query = CommonModule.mid_by_full (query, 0, 19.5) + '...' answer = Surface ((800, ((CommonModule.len_by_full (self.answer) - 1) // 16 + 1) * 50), pygame.SRCALPHA) for i in range (int (CommonModule.len_by_full (self.answer) - 1) // 16 + 1): answer.blit (DEERJIKA_FONT.render ( CommonModule.mid_by_full (self.answer, 16 * i, 16), True, (192, 0, 0)), (0, 50 * i)) surface = self.surface.copy () surface.blit (USER_FONT.render ('>' + query, True, (0, 0, 0)), (120, 70)) y: int if self.frame < 30: y = 0 elif self.frame >= self.length - 90: y = answer.get_height () - 100 else: y = int ((answer.get_height () - 100) * (self.frame - 30) / (self.length - 120)) surface.blit (answer, (100, 150), Rect (0, y, 800, 100)) self.game.screen.blit (surface, (0, 0)) super ().redraw () def talk ( self, query: str, answer: str, image_url: str | None = None, length: int = 300, ) -> None: self.query = query self.answer = answer self.image_url = image_url self.length = length self.frame = 0 self.enabled = True class KitaSun (GameObject): """ き太く陽 Attributes: sun (Sun): ephem の太陽オブゼクト surface (Surface): き太く陽 Surface """ alt: float az: float sun: Sun surface: Surface def __init__ ( self, game: Game, ): super ().__init__ (game) self.surface = pygame.transform.scale (pygame.image.load ('assets/sun.png'), (200, 200)) self.sun = Sun () def redraw ( self, ) -> None: surface = pygame.transform.rotate (self.surface, -(90 + math.degrees (self.arg))) self.game.screen.blit (surface, surface.get_rect (center = (self.x, self.y))) super ().redraw () def update ( self, ) -> None: 
self.sun.compute (self.game.sky.observer) self.alt = self.sun.alt self.az = self.sun.az if abs (self.new_arg - self.arg) > math.radians (15): self.arg = self.new_arg self.x = self.new_x self.y = self.new_y super ().update () @property def new_x ( self, ) -> float: return CWindow.WIDTH * (math.degrees (self.az) - 80) / 120 @property def new_y ( self, ) -> float: return ((CWindow.HEIGHT / 2) - ((CWindow.HEIGHT / 2 + 100) * math.sin (self.alt) / math.sin (math.radians (60)))) @property def new_arg ( self, ) -> float: return math.atan2 (self.new_y - self.y, self.new_x - self.x) class Jojoko (GameObject): """ 大月ヨヨコ Attributes: base (Surface): 満月ヨヨコ Surface moon (Moon): ephem の月オブゼクト surface (Surface): 缺けたヨヨコ """ alt: float az: float base: Surface moon: Moon surface: Surface def __init__ ( self, game: Game, ): super ().__init__ (game) self.base = pygame.transform.scale (pygame.image.load ('assets/moon.png'), (200, 200)) self.moon = Moon () self.surface = self._get_surface () def redraw ( self, ) -> None: self.moon.compute (self.game.sky.observer) self.alt = self.moon.alt self.az = self.moon.az if abs (self.new_arg - self.arg) > math.radians (15): self.arg = self.new_arg self.x = self.new_x self.y = self.new_y if self.frame % (FPS * 3600) == 0: self.surface = self._get_surface () surface = pygame.transform.rotate (self.surface, -(90 + math.degrees (self.arg))) surface.set_colorkey ((0, 255, 0)) self.game.screen.blit (surface, surface.get_rect (center = (self.x, self.y))) super ().redraw () @property def phase ( self, ) -> float: dt: datetime = ephem.localtime (ephem.previous_new_moon (self.game.sky.observer.date)) return (self.game.now - dt).total_seconds () / 60 / 60 / 24 def _get_surface ( self, ) -> Surface: """ ヨヨコを月齢に応じて缺かす. 
Returns: Surface: 缺けたヨヨコ """ jojoko = self.base.copy () for i in range (200): if 1 <= self.phase < 15: pygame.gfxdraw.bezier (jojoko, ((0, 100 + i), (100, 180 * self.phase / 7 - 80 + i), (200, 100 + i)), 3, (0, 255, 0)) elif self.phase < 16: pass elif self.phase < 30: pygame.gfxdraw.bezier (jojoko, ((0, 100 - i), (100, 180 * (self.phase - 15) / 7 - 80 - i), (200, 100 - i)), 3, (0, 255, 0)) else: jojoko.fill ((0, 255, 0)) return jojoko @property def new_x ( self, ) -> float: return CWindow.WIDTH * (math.degrees (self.az) - 80) / 120 @property def new_y ( self, ) -> float: return ((CWindow.HEIGHT / 2) - ((CWindow.HEIGHT / 2 + 100) * math.sin (self.alt) / math.sin (math.radians (60)))) @property def new_arg ( self, ) -> float: return math.atan2 (self.new_y - self.y, self.new_x - self.x) class Sky: """ 天体に関する情報を保持するクラス Attributes: observer (Observer): 観測値 """ observer: Observer class CWindow: """ ウィンドゥに関する定数クラス Attributes: WIDTH (int): ウィンドゥ幅 HEIGHT (int): ウィンドゥ高さ """ WIDTH = 1024 HEIGHT = 768 class Broadcast: chat: PytchatCore def __init__ ( self, broadcast_code, ): self.chat = pytchat.create (broadcast_code) def fetch_chat ( self, ) -> Chat | None: if not self.chat.is_alive (): return None chats = self.chat.get ().items if not chats: return None print (f"{ datetime.now () }: { chats }") return random.choice (chats) class Video (GameObject): fps: int pausing: bool = False sound: Sound | None surfaces: list[Surface] def __init__ ( self, game: Game, path: str, ): super ().__init__ (game) self.pausing = False (self.surfaces, self.fps) = self._create_surfaces (path) self.sound = self._create_sound (path) self.stop () def _create_sound ( self, path: str, ) -> Sound | None: bytes_io = BytesIO () try: from pydub import AudioSegment audio = AudioSegment.from_file (path, format = path.split ('.')[-1]) except ModuleNotFoundError: return None audio.export (bytes_io, format = 'wav') bytes_io.seek (0) return pygame.mixer.Sound (bytes_io) def _create_surfaces ( self, path: str, ) 
-> tuple[list[Surface], int]: cap = self._load (path) surfaces: list[Surface] = [] if cap is None: return ([], FPS) fps = int (cap.get (cv2.CAP_PROP_FPS)) while cap.isOpened (): frame = self._read_frame (cap) if frame is None: break surfaces.append (self._convert_to_surface (frame)) new_surfaces: list[Surface] = [] for i in range (len (surfaces) * FPS // fps): new_surfaces.append (surfaces[i * fps // FPS]) return (new_surfaces, fps) def _load ( self, path: str, ) -> VideoCapture | None: """ OpenCV で動画を読込む. """ cap = VideoCapture (path) if cap.isOpened (): return cap return None def _read_frame ( self, cap: VideoCapture, ) -> np.ndarray | None: """ 動画のフレームを読込む. """ ret: bool frame: np.ndarray (ret, frame) = cap.read () if ret: return frame return None def _convert_to_surface ( self, frame: np.ndarray, ) -> Surface: frame = cv2.cvtColor (frame, cv2.COLOR_BGR2RGB) frame_surface = pygame.surfarray.make_surface (frame) frame_surface = pygame.transform.rotate (frame_surface, -90) frame_surface = pygame.transform.flip (frame_surface, True, False) return frame_surface def play ( self, ) -> None: self.enabled = True self.pausing = False if self.sound is not None: self.sound.play () def stop ( self, ) -> None: self.enabled = False self.frame = 0 def pause ( self, ) -> None: self.pausing = True def redraw ( self, ) -> None: self.game.screen.blit (self.surfaces[self.frame], (self.x, self.y)) super ().redraw () def update ( self, ) -> None: if self.frame >= len (self.surfaces) - 1: self.pause () if self.pausing: self.frame -= 1 super ().update () class NicoVideo (Video): ... 
def fetch_bytes_from_url(
        url: str,
) -> bytes | None:
    """
    Download `url` and return the response body.

    Returns:
        bytes | None:   Body on HTTP 200, None for any other status

    Raises:
        requests.RequestException: On connection errors or after the
            60-second timeout.
    """
    res = requests.get(url, timeout=60)
    if res.status_code != 200:
        return None
    return res.content


def add_query(
        broadcast: Broadcast,
) -> None:
    """
    Store one live-chat message as a pending query for Deerjika.

    Fetches a chat from `broadcast`; does nothing when none is available.
    The author is created or refreshed and the query inserted in a single
    transaction, which is rolled back if any step fails.

    Raises:
        Exception: Whatever the download or the database raised, after the
            rollback.
    """
    chat = broadcast.fetch_chat()
    if chat is None:
        return

    # Turn emoji shortcodes in the message into real emoji characters.
    chat.message = emoji.emojize(chat.message)

    DB.begin_transaction()
    try:
        user = (User.where('platform', Platform.YOUTUBE.value)
                .where('code', chat.author.channelId)
                .first())
        if user is None:
            user = User()
            user.platform = Platform.YOUTUBE.value
            user.code = chat.author.channelId
        # NOTE(review): nesting reconstructed from collapsed source; name and
        # icon are assumed to be refreshed for existing users too.
        user.name = chat.author.name
        user.icon = fetch_bytes_from_url(chat.author.imageUrl)
        user.save()

        query = Query()
        query.user_id = user.id
        query.target_character = Character.DEERJIKA.value
        query.content = chat.message
        query.query_type = QueryType.YOUTUBE_COMMENT.value
        query.model = GPTModel.GPT3_TURBO.value
        query.sent_at = datetime.now()
        query.answered = False
        query.save()
    except Exception:
        # Do not leave the transaction open for the next caller.
        DB.rollback()
        raise
    DB.commit()


if __name__ == '__main__':
    main()