このコミットが含まれているのは:
2026-01-04 03:27:25 +09:00
コミット b88f4b0ee1
4個のファイルの変更155行の追加8行の削除
+3
ファイルの表示
@@ -3,3 +3,6 @@ __pycache__
/nizika_talking.wav
/youtube.py
/log.txt
/outputs/**
!/outputs/**/
!/outputs/**/.gitkeep
+56 -8
ファイルの表示
@@ -11,7 +11,7 @@ import wave
from datetime import datetime, timedelta
from enum import Enum, auto
from io import BytesIO
from typing import Callable, TypedDict, cast
from typing import Any, Callable, TypedDict, cast
import cv2
import emoji
@@ -30,8 +30,10 @@ from pygame.time import Clock
from pytchat.core.pytchat import PytchatCore
from pytchat.processors.default.processor import Chat
import video
from aques import Aques
from common_module import CommonModule
from niconico import NicoNico
from nizika_ai.config import DB
from nizika_ai.consts import Character, GPTModel, Platform, QueryType
from nizika_ai.models import Answer, AnsweredFlag, Query, User
@@ -59,6 +61,8 @@ def main (
y = CWindow.HEIGHT - 56.25,
balloon = balloon)
snack_time = SnackTime (game)
nico_video = NicoVideo (game)
nico_video_layer = nico_video.layer
CurrentTime (game, DEERJIKA_FONT)
broadcast: Broadcast | None = None
@@ -79,13 +83,14 @@ def main (
pygame.quit ()
sys.exit ()
if (not balloon.enabled) and (not snack_time.enabled):
if ((not balloon.enabled)
and (not snack_time.enabled)
and (not nico_video.enabled)):
if waiting_balloon[0]:
deerjika.talk (waiting_balloon[1], waiting_balloon[2])
if waiting_balloon[2] == WATCHING_MSG:
...
else:
waiting_balloon = (False, '', '')
nico_video.play ()
waiting_balloon = (False, '', '')
if now_m - last_flags_poll >= 1:
last_flags_poll = now_m
@@ -117,6 +122,8 @@ def main (
query = Query.find (answer.query_id)
deerjika.talk (None, answer.content)
waiting_balloon = (True, None, WATCHING_MSG)
nico_video = NicoVideo (game, query.transfer_data.get ('video_code'))
nico_video.layer = nico_video_layer
answer_flag.answered = True
answer_flag.save ()
case _:
@@ -975,10 +982,19 @@ class NicoVideo (Video):
def __init__ (
self,
game: Game,
video_code: str,
video_code: str | None = None,
):
comments = fetch_comments (video_code)
#super ().__init__ (game, )
if video_code is None:
super ().__init__ (game, './assets/snack_time.mp4')
else:
try:
comments = fetch_comments (video_code)
fetch_nico_video (video_code, comments)
super ().__init__ (game, './outputs/output.mp4')
except Exception:
super ().__init__ (game, './assets/snack_time.mp4')
(self.width, self.height) = (CWindow.HEIGHT * 16 // 9, CWindow.HEIGHT)
(self.x, self.y) = ((CWindow.WIDTH - self.width) / 2, 0)
class SnackTime (Video):
@@ -1000,6 +1016,37 @@ def fetch_bytes_from_url (
return res.content
def fetch_nico_video (
video_code: str,
comments: list[CommentDict],
) -> None:
client = NicoNico ()
watch_data = client.video.watch.get_watch_data (video_code)
outputs = client.video.watch.get_outputs (watch_data)
output_label = next (iter (outputs))
downloaded_path = client.video.watch.download_video (
watch_data, output_label, '%(title)s.%(ext)s')
os.rename (downloaded_path, './outputs/base.mp4')
ass = video.build_ass (cast (Any, comments), 640, 480)
with open ('./outputs/comments.ass', 'w', encoding = 'utf-8') as f:
f.write (ass)
subprocess.run (
['ffmpeg',
'-i', './outputs/base.mp4',
'-vf', 'ass=./outputs/comments.ass',
'-c:v', 'libx264',
'-crf', '18',
'-preset', 'veryfast',
'-c:a', 'copy',
'./outputs/output.mp4'],
check = True)
def fetch_comments (
video_code: str,
) -> list[CommentDict]:
@@ -1011,6 +1058,7 @@ def fetch_comments (
text = True)
return cast(list[CommentDict], json.loads (result.stdout))
def add_query (
broadcast: Broadcast,
) -> None:
ファイルの表示
+96
ファイルの表示
@@ -0,0 +1,96 @@
from typing import Any
DEFAULT_W, DEFAULT_H = 1280, 720
FONT_NAME = 'Noto Sans CJK JP'
BASE_FONT = 42
SCROLL_DURATION = 4.
STATIC_DURATION = 3.
MARGIN_X = 20
LANE_PADDING = 6
def ass_time (
t: float,
) -> str:
if t < 0:
t = 0
cs = int (round (t * 100))
s, cs = divmod (cs, 100)
m, s = divmod (s, 60)
h, m = divmod (m, 60)
return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
def escape_ass (
text: str,
) -> str:
text = text.replace ('\n', ' ').replace ('\r', ' ')
text = text.replace ('{', r'\{').replace ('}', r'\}')
text = text.replace ('\\', r'\\')
return text.strip ()
def approx_text_width_px (
text: str,
font_size: float,
) -> float:
return max (40., .62 * font_size * len (text))
def build_ass (
comments: list[dict[str, Any]],
w: int,
h: int,
) -> str:
header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {w}
PlayResY: {h}
ScaledBorderAndShadow: yes
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Danmaku,{FONT_NAME},{BASE_FONT},&H00FFFFFF,&H00000000,&H00000000,&H64000000,0,0,0,0,100,100,0,0,1,2,1,7,{MARGIN_X},{MARGIN_X},10,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
events: list[str] = []
lane_h = int (BASE_FONT + LANE_PADDING)
lane_count = max (6, (h - 80) // lane_h)
lane_next_free = [0.] * lane_count
top_rows = max (1, (h // 5) // lane_h)
bottom_rows = top_rows
top_next_free = [0.] * top_rows
bottom_next_free = [0.] * bottom_rows
def pick_lane (
next_free: list[float],
start: float,
) -> int:
for i, nf in enumerate (next_free):
if nf <= start:
return i
return min (range (len (next_free)), key = lambda i: next_free[i])
for c in comments:
text = escape_ass (c['content'])
if not text:
continue
start = c['vpos_ms']
end = c['vpos_ms'] + SCROLL_DURATION
lane = pick_lane (lane_next_free, start)
y = 40 + lane * lane_h
tw = approx_text_width_px (text, 1.)
x1 = w + MARGIN_X
x2 = -tw - MARGIN_X
lane_next_free[lane] = start + (SCROLL_DURATION * .65)
override = rf"{{\fs1\c&H00FFFFFF\move({int(x1)},{int(y)},{int(x2)},{int(y)})}}"
events.append (
f"Dialogue: 0,{ass_time(start)},{ass_time(end)},Danmaku,,0,0,0,,{override}{text}")
return header + '\n'.join (events) + '\n'