From d6bf49033b25e611c409cc1b3a2d4b592b7df701 Mon Sep 17 00:00:00 2001 From: miteruzo Date: Thu, 10 Oct 2024 00:26:56 +0900 Subject: [PATCH] =?UTF-8?q?DbNull=20=E3=81=AE=E4=BF=AE=E6=AD=A3=E3=81=A8?= =?UTF-8?q?=E5=9E=8B=E5=AE=9A=E7=BE=A9=E6=95=B4=E5=82=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- update_db.py | 162 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 121 insertions(+), 41 deletions(-) diff --git a/update_db.py b/update_db.py index 7434363..0fdcd74 100644 --- a/update_db.py +++ b/update_db.py @@ -10,15 +10,20 @@ import random import string import time from dataclasses import dataclass -from datetime import datetime, timedelta -from typing import Any, TypedDict, cast +from datetime import date, datetime, timedelta +from typing import Any, Self, Type, TypedDict, cast import mysql.connector import requests +from mysql.connector.connection import MySQLConnectionAbstract -# TODO: “何もしなぃ” を意味する None と区別可能にするため,NULL クラスを別途用意すること -DbNull = None -DbNullType = type (None) + +class DbNull: + def __new__ ( + cls, + ): + delattr (cls, '__init__') +DbNullType = Type[DbNull] class VideoSearchParam (TypedDict): @@ -55,11 +60,58 @@ class CommentResult (TypedDict): isMyPost: bool +class CommentRow (TypedDict): + id: int + video_id: int + comment_no: int + user_id: int + content: str + posted_at: datetime + nico_count: int + vpos_ms: int | None + + +class TagRow (TypedDict): + id: int + name: str + + +class UserRow (TypedDict): + id: int + code: str + + +class VideoRow (TypedDict): + id: int + code: str + title: str + description: str + uploaded_at: datetime + deleted_at: datetime | None + + +class VideoHistoryRow (TypedDict): + id: int + video_id: int + fetched_at: date + views_count: int + + +class VideoTagRow (TypedDict): + id: int + video_id: int + tag_id: int + tagged_at: date + untagged_at: date | None + + def main ( ) -> None: conn = mysql.connector.connect (user = os.environ['MYSQL_USER'], password = os.environ['MYSQL_PASS'], database = 'nizika_nico') + if not isinstance (conn, MySQLConnectionAbstract): + raise TypeError now = datetime.now () @@ -248,7 +300,7 @@ def search_nico_by_tags ( class VideoDao: def __init__ ( self, - conn + conn: MySQLConnectionAbstract, ): self.conn = conn @@ -272,7 +324,7 @@ class VideoDao: id = %s ORDER BY id""", (video_id,)) - row = c.fetchone () + row = cast (VideoRow | None, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row, with_relation_tables) @@ -295,7 +347,7 @@ class VideoDao: ORDER BY id""") videos: list[VideoDto] = [] - for row in c.fetchall (): + for row in cast (list[VideoRow], c.fetchall ()): videos.append (self._create_dto_from_row (row, with_relation_tables)) return videos @@ -316,7 +368,7 @@ class VideoDao: WHERE deleted_at IS NULL""") videos: list[VideoDto] = [] - for row in c.fetchall (): + for row in cast (list[VideoRow], c.fetchall ()): videos.append (self._create_dto_from_row (row, False)) return videos @@ -325,6 +377,13 @@ class VideoDao: video: VideoDto, with_relation_tables: bool, ) -> None: + deleted_at: datetime | DbNullType | None = video.deleted_at + if deleted_at is None: + raise TypeError ('未実装') + if deleted_at is DbNull: + deleted_at = None + deleted_at = cast (datetime | None, deleted_at) + with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO @@ -350,7 +409,7 @@ class VideoDao: video.title, video.description, video.uploaded_at, - video.deleted_at)) + deleted_at)) video.id_ = c.lastrowid if with_relation_tables: if video.video_tags is not None: @@ -386,7 +445,7 @@ class VideoDao: def _create_dto_from_row ( self, - row, + row: VideoRow, with_relation_tables: bool, ) -> VideoDto: video = VideoDto (id_ = row['id'], @@ -394,7 +453,7 @@ class VideoDao: title = row['title'], description = row['description'], uploaded_at = row['uploaded_at'], - deleted_at = row['deleted_at']) + deleted_at = row['deleted_at'] or DbNull) if with_relation_tables and video.id_ is not None: video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False) for i in range (len (video.video_tags)): @@ -424,7 +483,7 @@ class VideoDto: class VideoTagDao: def __init__ ( self, - conn, + conn: MySQLConnectionAbstract, ): self.conn = conn @@ -448,7 +507,7 @@ class VideoTagDao: ORDER BY id""", (video_id,)) video_tags: list[VideoTagDto] = [] - for row in c.fetchall (): + for row in cast (list[VideoTagRow], c.fetchall ()): video_tags.append (self._create_dto_from_row (row, with_relation_tables)) return video_tags @@ -473,7 +532,7 @@ class VideoTagDao: ORDER BY id""", (video_id,)) video_tags: list[VideoTagDto] = [] - for row in c.fetchall (): + for row in cast (list[VideoTagRow], c.fetchall ()): video_tags.append (self._create_dto_from_row (row, with_relation_tables)) return video_tags @@ -496,7 +555,7 @@ class VideoTagDao: WHERE video_id = %s AND tag_id = %s""", (video_id, tag_id)) - row = c.fetchone () + row = cast (VideoTagRow, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row, with_relation_tables) @@ -506,6 +565,13 @@ class VideoTagDao: video_tag: VideoTagDto, with_relation_tables: bool, ) -> None: + untagged_at: date | DbNullType | None = video_tag.untagged_at + if untagged_at is None: + raise TypeError ('未実装') + if untagged_at is DbNull: + untagged_at = None + untagged_at = cast (date | None, untagged_at) + with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO @@ -520,7 +586,7 @@ class VideoTagDao: %s, %s, %s)""", (video_tag.video_id, video_tag.tag_id, - video_tag.tagged_at, video_tag.untagged_at)) + video_tag.tagged_at, untagged_at)) video_tag.id_ = c.lastrowid if with_relation_tables: if video_tag.video is not None: @@ -533,6 +599,13 @@ class VideoTagDao: video_tag: VideoTagDto, with_relation_tables: bool, ) -> None: + untagged_at: date | DbNullType | None = video_tag.untagged_at + if untagged_at is None: + raise TypeError ('未実装') + if untagged_at is DbNull: + untagged_at = None + untagged_at = cast (date | None, untagged_at) + with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO @@ -554,7 +627,7 @@ class VideoTagDao: untagged_at = VALUES(untagged_at)""", (video_tag.video_id, video_tag.tag_id, video_tag.tagged_at, - video_tag.untagged_at)) + untagged_at)) video_tag.id_ = c.lastrowid if with_relation_tables: if video_tag.video is not None: @@ -590,14 +663,14 @@ class VideoTagDao: def _create_dto_from_row ( self, - row, + row: VideoTagRow, with_relation_tables: bool, ) -> VideoTagDto: video_tag = VideoTagDto (id_ = row['id'], video_id = row['video_id'], tag_id = row['tag_id'], tagged_at = row['tagged_at'], - untagged_at = row['untagged_at']) + untagged_at = row['untagged_at'] or DbNull) if with_relation_tables: video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True) video_tag.tag = TagDao (self.conn).find (video_tag.tag_id) @@ -608,17 +681,17 @@ class VideoTagDao: class VideoTagDto: video_id: int tag_id: int - tagged_at: datetime - id_: int | None = None - untagged_at: datetime | DbNullType = DbNull - video: VideoDto | None = None - tag: TagDto | None = None + tagged_at: date + id_: int | None = None + untagged_at: date | DbNullType = DbNull + video: VideoDto | None = None + tag: TagDto | None = None class TagDao: def __init__ ( self, - conn, + conn: MySQLConnectionAbstract, ): self.conn = conn @@ -635,7 +708,7 @@ class TagDao: tags WHERE id = %s""", (tag_id,)) - row = c.fetchone () + row = cast (TagRow | None, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row) @@ -653,7 +726,7 @@ class TagDao: tags WHERE name = %s""", (tag_name,)) - row = c.fetchone () + row = cast (TagRow | None, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row) @@ -686,7 +759,7 @@ class TagDao: def _create_dto_from_row ( self, - row, + row: TagRow, ) -> TagDto: return TagDto (id_ = row['id'], name = row['name']) @@ -701,7 +774,7 @@ class TagDto: class VideoHistoryDao: def __init__ ( self, - conn, + conn: MySQLConnectionAbstract, ): self.conn = conn @@ -722,7 +795,7 @@ class VideoHistoryDao: WHERE video_id = %s""", (video_id,)) video_histories: list[VideoHistoryDto] = [] - for row in c.fetchall (): + for row in cast (list[VideoHistoryRow], c.fetchall ()): video_histories.append (self._create_dto_from_row (row, with_relation_tables)) return video_histories @@ -777,7 +850,7 @@ class VideoHistoryDao: def _create_dto_from_row ( self, - row, + row: VideoHistoryRow, with_relation_tables: bool, ) -> VideoHistoryDto: video_history = VideoHistoryDto (id_ = row['id'], @@ -792,7 +865,7 @@ class VideoHistoryDao: @dataclass (slots = True) class VideoHistoryDto: video_id: int - fetched_at: datetime + fetched_at: date views_count: int id_: int | None = None video: VideoDto | None = None @@ -801,7 +874,7 @@ class VideoHistoryDto: class CommentDao: def __init__ ( self, - conn, + conn: MySQLConnectionAbstract, ): self.conn = conn @@ -826,7 +899,7 @@ class CommentDao: WHERE video_id = %s""", (video_id,)) comments: list[CommentDto] = [] - for row in c.fetchall (): + for row in cast (list[CommentRow], c.fetchall ()): comments.append (self._create_dto_from_row (row, with_relation_tables)) return comments @@ -835,6 +908,13 @@ class CommentDao: comment: CommentDto, with_relation_tables: bool, ) -> None: + vpos_ms: int | DbNullType | None = comment.vpos_ms + if vpos_ms is None: + raise TypeError ('未実装') + if vpos_ms is DbNull: + vpos_ms = None + vpos_ms = cast (int | None, vpos_ms) + with self.conn.cursor (dictionary = True) as c: c.execute (""" INSERT INTO @@ -868,7 +948,7 @@ class CommentDao: comment.content, comment.posted_at, comment.nico_count, - comment.vpos_ms)) + vpos_ms)) def upsert_all ( self, @@ -880,7 +960,7 @@ class CommentDao: def _create_dto_from_row ( self, - row, + row: CommentRow, with_relation_tables: bool, ) -> CommentDto: comment = CommentDto (id_ = row['id'], @@ -890,7 +970,7 @@ class CommentDao: content = row['content'], posted_at = row['posted_at'], nico_count = row['nico_count'], - vpos_ms = row['vpos_ms']) + vpos_ms = row['vpos_ms'] or DbNull) if with_relation_tables: comment.video = VideoDao (self.conn).find (comment.video_id, True) return comment @@ -913,7 +993,7 @@ class CommentDto: class UserDao: def __init__ ( self, - conn, + conn: MySQLConnectionAbstract, ): self.conn = conn @@ -930,7 +1010,7 @@ class UserDao: users WHERE code = %s""", (user_code,)) - row = c.fetchone () + row = cast (UserRow | None, c.fetchone ()) if row is None: return None return self._create_dto_from_row (row) @@ -949,7 +1029,7 @@ class UserDao: def _create_dto_from_row ( self, - row, + row: UserRow, ) -> UserDto: return UserDto (id_ = row['id'], code = row['code'])