From b88f4b0ee107af50acb4e252a7416bb836161a41 Mon Sep 17 00:00:00 2001 From: miteruzo Date: Sun, 4 Jan 2026 03:27:25 +0900 Subject: [PATCH] #42 --- .gitignore | 3 ++ main.py | 64 ++++++++++++++++++++++++++++---- outputs/.gitkeep | 0 video.py | 96 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 8 deletions(-) create mode 100644 outputs/.gitkeep create mode 100644 video.py diff --git a/.gitignore b/.gitignore index daa5fc1..c1046c3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ __pycache__ /nizika_talking.wav /youtube.py /log.txt +/outputs/** +!/outputs/**/ +!/outputs/**/.gitkeep diff --git a/main.py b/main.py index 62fb1ac..7f9c319 100644 --- a/main.py +++ b/main.py @@ -11,7 +11,7 @@ import wave from datetime import datetime, timedelta from enum import Enum, auto from io import BytesIO -from typing import Callable, TypedDict, cast +from typing import Any, Callable, TypedDict, cast import cv2 import emoji @@ -30,8 +30,10 @@ from pygame.time import Clock from pytchat.core.pytchat import PytchatCore from pytchat.processors.default.processor import Chat +import video from aques import Aques from common_module import CommonModule +from niconico import NicoNico from nizika_ai.config import DB from nizika_ai.consts import Character, GPTModel, Platform, QueryType from nizika_ai.models import Answer, AnsweredFlag, Query, User @@ -59,6 +61,8 @@ def main ( y = CWindow.HEIGHT - 56.25, balloon = balloon) snack_time = SnackTime (game) + nico_video = NicoVideo (game) + nico_video_layer = nico_video.layer CurrentTime (game, DEERJIKA_FONT) broadcast: Broadcast | None = None @@ -79,13 +83,14 @@ def main ( pygame.quit () sys.exit () - if (not balloon.enabled) and (not snack_time.enabled): + if ((not balloon.enabled) + and (not snack_time.enabled) + and (not nico_video.enabled)): if waiting_balloon[0]: deerjika.talk (waiting_balloon[1], waiting_balloon[2]) if waiting_balloon[2] == WATCHING_MSG: - ... - else: - waiting_balloon = (False, '', '') + nico_video.play () + waiting_balloon = (False, '', '') if now_m - last_flags_poll >= 1: last_flags_poll = now_m @@ -117,6 +122,8 @@ def main ( query = Query.find (answer.query_id) deerjika.talk (None, answer.content) waiting_balloon = (True, None, WATCHING_MSG) + nico_video = NicoVideo (game, query.transfer_data.get ('video_code')) + nico_video.layer = nico_video_layer answer_flag.answered = True answer_flag.save () case _: @@ -975,10 +982,19 @@ class NicoVideo (Video): def __init__ ( self, game: Game, - video_code: str, + video_code: str | None = None, ): - comments = fetch_comments (video_code) - #super ().__init__ (game, ) + if video_code is None: + super ().__init__ (game, './assets/snack_time.mp4') + else: + try: + comments = fetch_comments (video_code) + fetch_nico_video (video_code, comments) + super ().__init__ (game, './outputs/output.mp4') + except Exception: + super ().__init__ (game, './assets/snack_time.mp4') + (self.width, self.height) = (CWindow.HEIGHT * 16 // 9, CWindow.HEIGHT) + (self.x, self.y) = ((CWindow.WIDTH - self.width) / 2, 0) class SnackTime (Video): @@ -1000,6 +1016,37 @@ def fetch_bytes_from_url ( return res.content +def fetch_nico_video ( + video_code: str, + comments: list[CommentDict], +) -> None: + client = NicoNico () + + watch_data = client.video.watch.get_watch_data (video_code) + + outputs = client.video.watch.get_outputs (watch_data) + output_label = next (iter (outputs)) + + downloaded_path = client.video.watch.download_video ( + watch_data, output_label, '%(title)s.%(ext)s') + os.rename (downloaded_path, './outputs/base.mp4') + + ass = video.build_ass (cast (Any, comments), 640, 480) + with open ('./outputs/comments.ass', 'w', encoding = 'utf-8') as f: + f.write (ass) + + subprocess.run ( + ['ffmpeg', + '-i', './outputs/base.mp4', + '-vf', 'ass=./outputs/comments.ass', + '-c:v', 'libx264', + '-crf', '18', + '-preset', 'veryfast', + '-c:a', 'copy', + './outputs/output.mp4'], + check = True) + + def fetch_comments ( video_code: str, ) -> list[CommentDict]: @@ -1011,6 +1058,7 @@ def fetch_comments ( text = True) return cast(list[CommentDict], json.loads (result.stdout)) + def add_query ( broadcast: Broadcast, ) -> None: diff --git a/outputs/.gitkeep b/outputs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/video.py b/video.py new file mode 100644 index 0000000..b16c07e --- /dev/null +++ b/video.py @@ -0,0 +1,96 @@ +from typing import Any + +DEFAULT_W, DEFAULT_H = 1280, 720 +FONT_NAME = 'Noto Sans CJK JP' +BASE_FONT = 42 +SCROLL_DURATION = 4. +STATIC_DURATION = 3. +MARGIN_X = 20 +LANE_PADDING = 6 + + +def ass_time ( + t: float, +) -> str: + if t < 0: + t = 0 + cs = int (round (t * 100)) + s, cs = divmod (cs, 100) + m, s = divmod (s, 60) + h, m = divmod (m, 60) + return f"{h}:{m:02d}:{s:02d}.{cs:02d}" + +def escape_ass ( + text: str, +) -> str: + text = text.replace ('\n', ' ').replace ('\r', ' ') + text = text.replace ('{', r'\{').replace ('}', r'\}') + text = text.replace ('\\', r'\\') + return text.strip () + +def approx_text_width_px ( + text: str, + font_size: float, +) -> float: + return max (40., .62 * font_size * len (text)) + + +def build_ass ( + comments: list[dict[str, Any]], + w: int, + h: int, +) -> str: + header = f"""[Script Info] +ScriptType: v4.00+ +PlayResX: {w} +PlayResY: {h} +ScaledBorderAndShadow: yes + +[V4+ Styles] +Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding +Style: Danmaku,{FONT_NAME},{BASE_FONT},&H00FFFFFF,&H00000000,&H00000000,&H64000000,0,0,0,0,100,100,0,0,1,2,1,7,{MARGIN_X},{MARGIN_X},10,1 + +[Events] +Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text +""" + events: list[str] = [] + + lane_h = int (BASE_FONT + LANE_PADDING) + lane_count = max (6, (h - 80) // lane_h) + lane_next_free = [0.] * lane_count + + top_rows = max (1, (h // 5) // lane_h) + bottom_rows = top_rows + top_next_free = [0.] * top_rows + bottom_next_free = [0.] * bottom_rows + + def pick_lane ( + next_free: list[float], + start: float, + ) -> int: + for i, nf in enumerate (next_free): + if nf <= start: + return i + return min (range (len (next_free)), key = lambda i: next_free[i]) + + for c in comments: + text = escape_ass (c['content']) + if not text: + continue + + start = c['vpos_ms'] + end = c['vpos_ms'] + SCROLL_DURATION + lane = pick_lane (lane_next_free, start) + y = 40 + lane * lane_h + + tw = approx_text_width_px (text, 1.) + x1 = w + MARGIN_X + x2 = -tw - MARGIN_X + + lane_next_free[lane] = start + (SCROLL_DURATION * .65) + + override = rf"{{\fs1\c&H00FFFFFF\move({int(x1)},{int(y)},{int(x2)},{int(y)})}}" + events.append ( + f"Dialogue: 0,{ass_time(start)},{ass_time(end)},Danmaku,,0,0,0,,{override}{text}") + + return header + '\n'.join (events) + '\n'