Compare commits

...

4 Commits

Author SHA1 Message Date
  みてるぞ a8239f3de3 #42 2 weeks ago
  みてるぞ 1f46dc3974 #42 2 weeks ago
  みてるぞ b88f4b0ee1 #42 2 weeks ago
  みてるぞ f34dd36b6a #42 2 weeks ago
5 changed files with 245 additions and 96 deletions
Split View
  1. +3
    -0
      .gitignore
  2. BIN
      assets/tv.png
  3. +146
    -96
      main.py
  4. +0
    -0
      outputs/.gitkeep
  5. +96
    -0
      video.py

+ 3
- 0
.gitignore View File

@@ -3,3 +3,6 @@ __pycache__
/nizika_talking.wav
/youtube.py
/log.txt
/outputs/**
!/outputs/**/
!/outputs/**/.gitkeep

BIN
assets/tv.png View File

Before After
Width: 658  |  Height: 791  |  Size: 463 KiB

+ 146
- 96
main.py View File

@@ -1,15 +1,17 @@
from __future__ import annotations

import json
import math
import os
import random
import subprocess
import sys
import time
import wave
from datetime import datetime, timedelta
from enum import Enum, auto
from io import BytesIO
from typing import Callable, TypedDict
from typing import Any, Callable, TypedDict, cast

import cv2
import emoji
@@ -28,12 +30,16 @@ from pygame.time import Clock
from pytchat.core.pytchat import PytchatCore
from pytchat.processors.default.processor import Chat

import video
from aques import Aques
from common_module import CommonModule
from niconico import NicoNico
from nizika_ai.config import DB
from nizika_ai.consts import Character, GPTModel, Platform, QueryType
from nizika_ai.models import Answer, AnsweredFlag, Query, User

NIZIKA_NICO_DIR = os.environ.get ('NIZIKA_NICO_DIR') or '/root/nizika_nico'

pygame.init ()

FPS = 30
@@ -42,6 +48,8 @@ SYSTEM_FONT = pygame.font.SysFont ('notosanscjkjp', 11, bold = True)
USER_FONT = pygame.font.SysFont ('notosanscjkjp', 15, italic = True)
DEERJIKA_FONT = pygame.font.SysFont ('07nikumarufont', 23)

WATCHING_MSG = 'それじゃあ、さっそく見てみるぬ゛ん゛!'


def main (
) -> None:
@@ -53,6 +61,8 @@ def main (
y = CWindow.HEIGHT - 56.25,
balloon = balloon)
snack_time = SnackTime (game)
nico_video = NicoVideo (game)
nico_video_layer = nico_video.layer
CurrentTime (game, DEERJIKA_FONT)

broadcast: Broadcast | None = None
@@ -61,7 +71,7 @@ def main (
except KeyError:
broadcast = None

waiting_balloon = (False, '', '')
waiting_balloon: tuple[bool, str | None, str] = (False, '', '')
last_flags_poll: float = 0
traced_af_ids: list[int] = []

@@ -73,9 +83,13 @@ def main (
pygame.quit ()
sys.exit ()

if (not balloon.enabled) and (not snack_time.enabled):
if ((not balloon.enabled)
and (not snack_time.enabled)
and (not nico_video.enabled)):
if waiting_balloon[0]:
deerjika.talk (waiting_balloon[1], waiting_balloon[2])
if waiting_balloon[2] == WATCHING_MSG:
nico_video.play ()
waiting_balloon = (False, '', '')

if now_m - last_flags_poll >= 1:
@@ -104,6 +118,14 @@ def main (
waiting_balloon = (True, query.content, answer.content)
answer_flag.answered = True
answer_flag.save ()
case QueryType.KIRIBAN:
query = Query.find (answer.query_id)
deerjika.talk (None, answer.content)
waiting_balloon = (True, None, WATCHING_MSG)
nico_video = NicoVideo (game, query.transfer_data.get ('video_code'))
nico_video.layer = nico_video_layer
answer_flag.answered = True
answer_flag.save ()
case _:
traced_af_ids.append (answer_flag.id)
DB.commit ()
@@ -490,7 +512,7 @@ class Deerjika (Creature):

def talk (
self,
query: str,
query: str | None,
answer: str,
) -> None:
self.bell ()
@@ -560,7 +582,7 @@ class Balloon (GameObject):
answer (str): 回答テキスト
image_url (str, None): 画像 URL
length (int): 表示する時間 (frame)
query (str): 質問テキスト
query (str, None): 質問テキスト
surface (Surface): 吹出し Surface
x_flip (bool): 左右反転フラグ
y_flip (bool): 上下反転フラグ
@@ -569,7 +591,7 @@ class Balloon (GameObject):
answer: str = ''
image_url: str | None = None
length: int = 300
query: str = ''
query: str | None = None
surface: Surface
x_flip: bool = False
y_flip: bool = False
@@ -595,8 +617,10 @@ class Balloon (GameObject):
self.game.last_answered_at = self.game.now
return
query = self.query
if CommonModule.len_by_full (query) > 21:
if query and CommonModule.len_by_full (query) > 21:
query = CommonModule.mid_by_full (query, 0, 19.5) + '...'
if query is not None:
query = '>' + query
answer = Surface (
(375, int (((CommonModule.len_by_full (self.answer) - 1) // 16 + 1) * 23.4375)),
pygame.SRCALPHA)
@@ -605,7 +629,7 @@ class Balloon (GameObject):
CommonModule.mid_by_full (self.answer, 16 * i, 16), True, (192, 0, 0)),
(0, 23.4375 * i))
surface = self.surface.copy ()
surface.blit (USER_FONT.render ('>' + query, True, (0, 0, 0)), (56.25, 32.8125))
surface.blit (USER_FONT.render (query, True, (0, 0, 0)), (56.25, 32.8125))
y: float
if self.frame < 30:
y = 0
@@ -619,7 +643,7 @@ class Balloon (GameObject):

def talk (
self,
query: str,
query: str | None,
answer: str,
image_url: str | None = None,
length: int = 300,
@@ -815,7 +839,7 @@ class Broadcast:

def __init__ (
self,
broadcast_code,
broadcast_code: str,
):
self.code = broadcast_code
self.chat = pytchat.create (self.code)
@@ -833,8 +857,11 @@ class Broadcast:


class Video (GameObject):
cap: VideoCapture | None
current: Surface | None
fps: int
pausing: bool = False
path: str
pausing: bool = False
sound: Sound | None
surfaces: list[Surface]

@@ -844,86 +871,19 @@ class Video (GameObject):
path: str,
):
super ().__init__ (game)
self.pausing = False
(self.surfaces, self.fps) = self._create_surfaces (path)
self.path = path
self.cap = None
self.current = None
self.sound = self._create_sound (path)
self.stop ()

def _create_sound (
self,
path: str,
) -> Sound | None:
bytes_io = BytesIO ()
try:
from pydub import AudioSegment
audio = AudioSegment.from_file (path, format = path.split ('.')[-1])
except ModuleNotFoundError:
return None
audio.export (bytes_io, format = 'wav')
bytes_io.seek (0)
return pygame.mixer.Sound (bytes_io)

def _create_surfaces (
self,
path: str,
) -> tuple[list[Surface], int]:
cap = self._load (path)
surfaces: list[Surface] = []
if cap is None:
return ([], FPS)
fps = int (cap.get (cv2.CAP_PROP_FPS))
while cap.isOpened ():
frame = self._read_frame (cap)
if frame is None:
break
surfaces.append (self._convert_to_surface (frame))
new_surfaces: list[Surface] = []
for i in range (len (surfaces) * FPS // fps):
new_surfaces.append (surfaces[i * fps // FPS])
return (new_surfaces, fps)

def _load (
self,
path: str,
) -> VideoCapture | None:
"""
OpenCV で動画を読込む.
"""
cap = VideoCapture (path)
if cap.isOpened ():
return cap
return None

def _read_frame (
self,
cap: VideoCapture,
) -> np.ndarray | None:
"""
動画のフレームを読込む.
"""
ret: bool
frame: np.ndarray
(ret, frame) = cap.read ()
if ret:
return frame
return None

def _convert_to_surface (
self,
frame: np.ndarray,
) -> Surface:
frame = cv2.cvtColor (frame, cv2.COLOR_BGR2RGB)
frame_surface = pygame.surfarray.make_surface (frame)
frame_surface = pygame.transform.rotate (frame_surface, -90)
frame_surface = pygame.transform.flip (frame_surface, True, False)
return frame_surface

def play (
self,
) -> None:
self.enabled = True
self.pausing = False
if self.sound is not None:
self.cap = VideoCapture (self.path)
self.frame = 0
if self.sound:
self.sound.play ()

def stop (
@@ -931,31 +891,72 @@ class Video (GameObject):
) -> None:
self.enabled = False
self.frame = 0

def pause (
    self,
) -> None:
    """Pause playback, releasing the OpenCV capture so the file handle is freed."""
    self.pausing = True
    if self.cap:
        self.cap.release ()
    self.cap = None

def redraw (
self,
) -> None:
surface = pygame.transform.scale (self.surfaces[self.frame], (self.width, self.height))
self.game.screen.blit (surface, (self.x, self.y))
if (not self.enabled) or (self.cap is None):
return
ret, frame = self.cap.read ()
if not ret:
self.stop ()
return
surf = self._convert_to_surface (frame)
surf = pygame.transform.scale (surf, (self.width, self.height))
self.game.screen.blit (surf, (self.x, self.y))
super ().redraw ()

def update (
self,
) -> None:
if self.frame >= len (self.surfaces) - 1:
self.stop ()
if self.pausing:
self.frame -= 1
super ().update ()

def _convert_to_surface (
    self,
    frame: np.ndarray,
) -> Surface:
    """Convert an OpenCV BGR frame into a pygame Surface with correct orientation."""
    rgb = cv2.cvtColor (frame, cv2.COLOR_BGR2RGB)
    surface = pygame.surfarray.make_surface (rgb)
    # make_surface treats axis 0 as x; rotate then mirror to restore the
    # row-major orientation of the source frame.
    rotated = pygame.transform.rotate (surface, -90)
    return pygame.transform.flip (rotated, True, False)

def _create_sound (
    self,
    path: str,
) -> Sound | None:
    """Extract the audio track of the video at *path* as a pygame Sound.

    Returns None when pydub is not installed, in which case the video
    plays silently.

    Args:
        path (str): path to the video file; its extension is used as the
            pydub/ffmpeg format hint.
    """
    bytes_io = BytesIO ()
    try:
        # Lazy import: pydub is an optional dependency.
        from pydub import AudioSegment
        audio = AudioSegment.from_file (path, format = path.split ('.')[-1])
    except ModuleNotFoundError:
        return None
    # Re-encode to WAV in memory so pygame's mixer can load it.
    audio.export (bytes_io, format = 'wav')
    bytes_io.seek (0)
    return pygame.mixer.Sound (bytes_io)


class NicoVideo (Video):
...
def __init__ (
    self,
    game: Game,
    video_code: str | None = None,
):
    """Initialise the niconico video layer.

    Args:
        game (Game): owning game instance.
        video_code (str, None): niconico video code; when None, the stock
            snack-time clip is used instead of downloading anything.
    """
    if video_code is None:
        super ().__init__ (game, './assets/snack_time.mp4')
    else:
        try:
            comments = fetch_comments (video_code)
            fetch_nico_video (video_code, comments)
            super ().__init__ (game, './outputs/output.mp4')
        except Exception as ex:
            # Best-effort: any download/render failure falls back to the
            # stock clip; the error is only printed, not re-raised.
            print (ex)
            super ().__init__ (game, './assets/snack_time.mp4')
    # Letterbox to a 16:9 area centred horizontally, full window height.
    (self.width, self.height) = (CWindow.HEIGHT * 16 // 9, CWindow.HEIGHT)
    (self.x, self.y) = ((CWindow.WIDTH - self.width) / 2, 0)


class SnackTime (Video):
@@ -977,6 +978,50 @@ def fetch_bytes_from_url (
return res.content


def fetch_nico_video (
    video_code: str,
    comments: list[CommentDict],
) -> None:
    """Download a niconico video and burn the given comments into it.

    Side effects: writes ./outputs/base.mp4, ./outputs/comments.ass and
    ./outputs/output.mp4 (the latter overwritten via ffmpeg -y).

    Args:
        video_code (str): niconico video code to download.
        comments (list[CommentDict]): comments rendered as scrolling subtitles.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero (check = True).
    """
    client = NicoNico ()

    watch_data = client.video.watch.get_watch_data (video_code)

    # Pick the first advertised output quality.
    outputs = client.video.watch.get_outputs (watch_data)
    output_label = next (iter (outputs))

    downloaded_path = client.video.watch.download_video (
        watch_data, output_label, '%(title)s.%(ext)s')
    os.rename (downloaded_path, './outputs/base.mp4')

    # Render the comments to an ASS subtitle script.
    # NOTE(review): assumes a 640x480 playback canvas — confirm against the
    # actual downloaded video resolution.
    ass = video.build_ass (cast (Any, comments), 640, 480)
    with open ('./outputs/comments.ass', 'w', encoding = 'utf-8') as f:
        f.write (ass)

    # Burn the subtitles into the video; the audio stream is copied untouched.
    subprocess.run (
        ['ffmpeg',
         '-i', './outputs/base.mp4',
         '-vf', 'ass=./outputs/comments.ass',
         '-c:v', 'libx264',
         '-crf', '18',
         '-preset', 'veryfast',
         '-c:a', 'copy',
         './outputs/output.mp4',
         '-y'],
        check = True)


def fetch_comments (
    video_code: str,
) -> list[CommentDict]:
    """Fetch comments for a video by running the helper script in NIZIKA_NICO_DIR.

    Args:
        video_code (str): niconico video code passed to the helper script.

    Returns:
        list[CommentDict]: parsed comments from the script's JSON stdout.

    Raises:
        subprocess.CalledProcessError: if the helper script exits non-zero.
        json.JSONDecodeError: if the script prints malformed JSON.
    """
    result = subprocess.run (
        ['python3', 'get_comments_by_video_code.py', video_code],
        cwd = NIZIKA_NICO_DIR,
        env = os.environ,
        capture_output = True,
        text = True,
        # Fail loudly on script failure instead of json-decoding empty stdout.
        check = True)
    return cast (list[CommentDict], json.loads (result.stdout))


def add_query (
broadcast: Broadcast,
) -> None:
@@ -1007,6 +1052,11 @@ def add_query (
DB.commit ()


class CommentDict (TypedDict):
    """Shape of a single niconico comment as exchanged with the fetch script."""
    content: str  # comment body text
    vpos_ms: int  # playback position of the comment, in milliseconds


def log (
msg: str,
) -> None:


+ 0
- 0
outputs/.gitkeep View File


+ 96
- 0
video.py View File

@@ -0,0 +1,96 @@
from typing import Any

DEFAULT_W, DEFAULT_H = 1280, 720
FONT_NAME = 'Noto Sans CJK JP'
BASE_FONT = 42
SCROLL_DURATION = 4.
STATIC_DURATION = 3.
MARGIN_X = 20
LANE_PADDING = 6


def ass_time (
    t: float,
) -> str:
    """Format a time in seconds as an ASS timestamp (H:MM:SS.CC).

    Negative inputs are clamped to zero; sub-centisecond precision is rounded.
    """
    total_cs = max (0, int (round (t * 100)))
    hours, rem = divmod (total_cs, 360000)
    minutes, rem = divmod (rem, 6000)
    seconds, centis = divmod (rem, 100)
    return f"{hours}:{minutes:02d}:{seconds:02d}.{centis:02d}"

def escape_ass (
    text: str,
) -> str:
    """Sanitize comment text for embedding in an ASS Dialogue line.

    Newlines become spaces, braces are escaped so they cannot open override
    blocks, and literal backslashes are doubled.

    Note: backslashes must be doubled FIRST — the original order corrupted
    the freshly inserted brace escapes (turning '\\{' into '\\\\{').
    """
    text = text.replace ('\\', r'\\')
    text = text.replace ('\n', ' ').replace ('\r', ' ')
    text = text.replace ('{', r'\{').replace ('}', r'\}')
    return text.strip ()

def approx_text_width_px (
    text: str,
    font_size: float,
) -> float:
    """Estimate rendered text width in pixels: ~0.62 em per character, floor 40 px."""
    estimated = .62 * font_size * len (text)
    return estimated if estimated > 40. else 40.


def build_ass (
    comments: list[dict[str, Any]],
    w: int,
    h: int,
) -> str:
    """Build an ASS subtitle document that scrolls comments niconico-style.

    Args:
        comments: dicts carrying 'content' (comment text) and 'vpos_ms'
            (playback position in milliseconds).
        w: playback resolution width (PlayResX).
        h: playback resolution height (PlayResY).

    Returns:
        The complete ASS file contents (header plus one Dialogue per comment).
    """
    header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {w}
PlayResY: {h}
ScaledBorderAndShadow: yes

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Danmaku,{FONT_NAME},{BASE_FONT},&H00FFFFFF,&H00000000,&H00000000,&H64000000,0,0,0,0,100,100,0,0,1,2,1,7,{MARGIN_X},{MARGIN_X},10,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    events: list[str] = []

    # Horizontal scroll lanes; a lane is considered free again once the
    # previous comment has travelled 65% of its scroll duration.
    lane_h = int (BASE_FONT + LANE_PADDING)
    lane_count = max (6, (h - 80) // lane_h)
    lane_next_free = [0.] * lane_count

    def pick_lane (
        next_free: list[float],
        start: float,
    ) -> int:
        # First lane already free at `start`, else the one that frees soonest.
        for i, nf in enumerate (next_free):
            if nf <= start:
                return i
        return min (range (len (next_free)), key = lambda i: next_free[i])

    for c in comments:
        text = escape_ass (c['content'])
        if not text:
            continue

        # BUGFIX: vpos_ms is in milliseconds, but ass_time () and
        # SCROLL_DURATION work in seconds — convert before use.
        start = c['vpos_ms'] / 1000
        end = start + SCROLL_DURATION
        lane = pick_lane (lane_next_free, start)
        y = 40 + lane * lane_h

        # NOTE(review): the style forces \fs1 (1 px font) and the width is
        # estimated at size 1 — looks intentional but confirm.
        tw = approx_text_width_px (text, 1.)
        x1 = w + MARGIN_X
        x2 = -tw - MARGIN_X

        lane_next_free[lane] = start + (SCROLL_DURATION * .65)

        override = rf"{{\fs1\c&H00FFFFFF\move({int(x1)},{int(y)},{int(x2)},{int(y)})}}"
        events.append (
            f"Dialogue: 0,{ass_time(start)},{ass_time(end)},Danmaku,,0,0,0,,{override}{text}")

    return header + '\n'.join (events) + '\n'

Loading…
Cancel
Save