DbNull の修正と型定義整備
このコミットが含まれているのは:
+121
-41
@@ -10,15 +10,20 @@ import random
|
||||
import string
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, TypedDict, cast
|
||||
from datetime import date, datetime, timedelta
|
||||
from typing import Any, Self, Type, TypedDict, cast
|
||||
|
||||
import mysql.connector
|
||||
import requests
|
||||
from mysql.connector.connection import MySQLConnectionAbstract
|
||||
|
||||
# TODO: “何もしなぃ” を意味する None と区別可能にするため,NULL クラスを別途用意すること
|
||||
DbNull = None
|
||||
DbNullType = type (None)
|
||||
|
||||
class DbNull:
|
||||
def __new__ (
|
||||
cls,
|
||||
):
|
||||
delattr (cls, '__init__')
|
||||
DbNullType = Type[DbNull]
|
||||
|
||||
|
||||
class VideoSearchParam (TypedDict):
|
||||
@@ -55,11 +60,58 @@ class CommentResult (TypedDict):
|
||||
isMyPost: bool
|
||||
|
||||
|
||||
class CommentRow (TypedDict):
|
||||
id: int
|
||||
video_id: int
|
||||
comment_no: int
|
||||
user_id: int
|
||||
content: str
|
||||
posted_at: datetime
|
||||
nico_count: int
|
||||
vpos_ms: int | None
|
||||
|
||||
|
||||
class TagRow (TypedDict):
|
||||
id: int
|
||||
name: str
|
||||
|
||||
|
||||
class UserRow (TypedDict):
|
||||
id: int
|
||||
code: str
|
||||
|
||||
|
||||
class VideoRow (TypedDict):
|
||||
id: int
|
||||
code: str
|
||||
title: str
|
||||
description: str
|
||||
uploaded_at: datetime
|
||||
deleted_at: datetime | None
|
||||
|
||||
|
||||
class VideoHistoryRow (TypedDict):
|
||||
id: int
|
||||
video_id: int
|
||||
fetched_at: date
|
||||
views_count: int
|
||||
|
||||
|
||||
class VideoTagRow (TypedDict):
|
||||
id: int
|
||||
video_id: int
|
||||
tag_id: int
|
||||
tagged_at: date
|
||||
untagged_at: date | None
|
||||
|
||||
|
||||
def main (
|
||||
) -> None:
|
||||
conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
|
||||
password = os.environ['MYSQL_PASS'],
|
||||
database = 'nizika_nico')
|
||||
if not isinstance (conn, MySQLConnectionAbstract):
|
||||
raise TypeError
|
||||
|
||||
now = datetime.now ()
|
||||
|
||||
@@ -248,7 +300,7 @@ def search_nico_by_tags (
|
||||
class VideoDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -272,7 +324,7 @@ class VideoDao:
|
||||
id = %s
|
||||
ORDER BY
|
||||
id""", (video_id,))
|
||||
row = c.fetchone ()
|
||||
row = cast (VideoRow | None, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row, with_relation_tables)
|
||||
@@ -295,7 +347,7 @@ class VideoDao:
|
||||
ORDER BY
|
||||
id""")
|
||||
videos: list[VideoDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoRow], c.fetchall ()):
|
||||
videos.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return videos
|
||||
|
||||
@@ -316,7 +368,7 @@ class VideoDao:
|
||||
WHERE
|
||||
deleted_at IS NULL""")
|
||||
videos: list[VideoDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoRow], c.fetchall ()):
|
||||
videos.append (self._create_dto_from_row (row, False))
|
||||
return videos
|
||||
|
||||
@@ -325,6 +377,13 @@ class VideoDao:
|
||||
video: VideoDto,
|
||||
with_relation_tables: bool,
|
||||
) -> None:
|
||||
deleted_at: datetime | DbNullType | None = video.deleted_at
|
||||
if deleted_at is None:
|
||||
raise TypeError ('未実装')
|
||||
if deleted_at is DbNull:
|
||||
deleted_at = None
|
||||
deleted_at = cast (datetime | None, deleted_at)
|
||||
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
INSERT INTO
|
||||
@@ -350,7 +409,7 @@ class VideoDao:
|
||||
video.title,
|
||||
video.description,
|
||||
video.uploaded_at,
|
||||
video.deleted_at))
|
||||
deleted_at))
|
||||
video.id_ = c.lastrowid
|
||||
if with_relation_tables:
|
||||
if video.video_tags is not None:
|
||||
@@ -386,7 +445,7 @@ class VideoDao:
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: VideoRow,
|
||||
with_relation_tables: bool,
|
||||
) -> VideoDto:
|
||||
video = VideoDto (id_ = row['id'],
|
||||
@@ -394,7 +453,7 @@ class VideoDao:
|
||||
title = row['title'],
|
||||
description = row['description'],
|
||||
uploaded_at = row['uploaded_at'],
|
||||
deleted_at = row['deleted_at'])
|
||||
deleted_at = row['deleted_at'] or DbNull)
|
||||
if with_relation_tables and video.id_ is not None:
|
||||
video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
|
||||
for i in range (len (video.video_tags)):
|
||||
@@ -424,7 +483,7 @@ class VideoDto:
|
||||
class VideoTagDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -448,7 +507,7 @@ class VideoTagDao:
|
||||
ORDER BY
|
||||
id""", (video_id,))
|
||||
video_tags: list[VideoTagDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoTagRow], c.fetchall ()):
|
||||
video_tags.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return video_tags
|
||||
|
||||
@@ -473,7 +532,7 @@ class VideoTagDao:
|
||||
ORDER BY
|
||||
id""", (video_id,))
|
||||
video_tags: list[VideoTagDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoTagRow], c.fetchall ()):
|
||||
video_tags.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return video_tags
|
||||
|
||||
@@ -496,7 +555,7 @@ class VideoTagDao:
|
||||
WHERE
|
||||
video_id = %s
|
||||
AND tag_id = %s""", (video_id, tag_id))
|
||||
row = c.fetchone ()
|
||||
row = cast (VideoTagRow, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row, with_relation_tables)
|
||||
@@ -506,6 +565,13 @@ class VideoTagDao:
|
||||
video_tag: VideoTagDto,
|
||||
with_relation_tables: bool,
|
||||
) -> None:
|
||||
untagged_at: date | DbNullType | None = video_tag.untagged_at
|
||||
if untagged_at is None:
|
||||
raise TypeError ('未実装')
|
||||
if untagged_at is DbNull:
|
||||
untagged_at = None
|
||||
untagged_at = cast (date | None, untagged_at)
|
||||
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
INSERT INTO
|
||||
@@ -520,7 +586,7 @@ class VideoTagDao:
|
||||
%s,
|
||||
%s,
|
||||
%s)""", (video_tag.video_id, video_tag.tag_id,
|
||||
video_tag.tagged_at, video_tag.untagged_at))
|
||||
video_tag.tagged_at, untagged_at))
|
||||
video_tag.id_ = c.lastrowid
|
||||
if with_relation_tables:
|
||||
if video_tag.video is not None:
|
||||
@@ -533,6 +599,13 @@ class VideoTagDao:
|
||||
video_tag: VideoTagDto,
|
||||
with_relation_tables: bool,
|
||||
) -> None:
|
||||
untagged_at: date | DbNullType | None = video_tag.untagged_at
|
||||
if untagged_at is None:
|
||||
raise TypeError ('未実装')
|
||||
if untagged_at is DbNull:
|
||||
untagged_at = None
|
||||
untagged_at = cast (date | None, untagged_at)
|
||||
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
INSERT INTO
|
||||
@@ -554,7 +627,7 @@ class VideoTagDao:
|
||||
untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
|
||||
video_tag.tag_id,
|
||||
video_tag.tagged_at,
|
||||
video_tag.untagged_at))
|
||||
untagged_at))
|
||||
video_tag.id_ = c.lastrowid
|
||||
if with_relation_tables:
|
||||
if video_tag.video is not None:
|
||||
@@ -590,14 +663,14 @@ class VideoTagDao:
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: VideoTagRow,
|
||||
with_relation_tables: bool,
|
||||
) -> VideoTagDto:
|
||||
video_tag = VideoTagDto (id_ = row['id'],
|
||||
video_id = row['video_id'],
|
||||
tag_id = row['tag_id'],
|
||||
tagged_at = row['tagged_at'],
|
||||
untagged_at = row['untagged_at'])
|
||||
untagged_at = row['untagged_at'] or DbNull)
|
||||
if with_relation_tables:
|
||||
video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
|
||||
video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
|
||||
@@ -608,17 +681,17 @@ class VideoTagDao:
|
||||
class VideoTagDto:
|
||||
video_id: int
|
||||
tag_id: int
|
||||
tagged_at: datetime
|
||||
id_: int | None = None
|
||||
untagged_at: datetime | DbNullType = DbNull
|
||||
video: VideoDto | None = None
|
||||
tag: TagDto | None = None
|
||||
tagged_at: date
|
||||
id_: int | None = None
|
||||
untagged_at: date | DbNullType = DbNull
|
||||
video: VideoDto | None = None
|
||||
tag: TagDto | None = None
|
||||
|
||||
|
||||
class TagDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -635,7 +708,7 @@ class TagDao:
|
||||
tags
|
||||
WHERE
|
||||
id = %s""", (tag_id,))
|
||||
row = c.fetchone ()
|
||||
row = cast (TagRow | None, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row)
|
||||
@@ -653,7 +726,7 @@ class TagDao:
|
||||
tags
|
||||
WHERE
|
||||
name = %s""", (tag_name,))
|
||||
row = c.fetchone ()
|
||||
row = cast (TagRow | None, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row)
|
||||
@@ -686,7 +759,7 @@ class TagDao:
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: TagRow,
|
||||
) -> TagDto:
|
||||
return TagDto (id_ = row['id'],
|
||||
name = row['name'])
|
||||
@@ -701,7 +774,7 @@ class TagDto:
|
||||
class VideoHistoryDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -722,7 +795,7 @@ class VideoHistoryDao:
|
||||
WHERE
|
||||
video_id = %s""", (video_id,))
|
||||
video_histories: list[VideoHistoryDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoHistoryRow], c.fetchall ()):
|
||||
video_histories.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return video_histories
|
||||
|
||||
@@ -777,7 +850,7 @@ class VideoHistoryDao:
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: VideoHistoryRow,
|
||||
with_relation_tables: bool,
|
||||
) -> VideoHistoryDto:
|
||||
video_history = VideoHistoryDto (id_ = row['id'],
|
||||
@@ -792,7 +865,7 @@ class VideoHistoryDao:
|
||||
@dataclass (slots = True)
|
||||
class VideoHistoryDto:
|
||||
video_id: int
|
||||
fetched_at: datetime
|
||||
fetched_at: date
|
||||
views_count: int
|
||||
id_: int | None = None
|
||||
video: VideoDto | None = None
|
||||
@@ -801,7 +874,7 @@ class VideoHistoryDto:
|
||||
class CommentDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -826,7 +899,7 @@ class CommentDao:
|
||||
WHERE
|
||||
video_id = %s""", (video_id,))
|
||||
comments: list[CommentDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[CommentRow], c.fetchall ()):
|
||||
comments.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return comments
|
||||
|
||||
@@ -835,6 +908,13 @@ class CommentDao:
|
||||
comment: CommentDto,
|
||||
with_relation_tables: bool,
|
||||
) -> None:
|
||||
vpos_ms: int | DbNullType | None = comment.vpos_ms
|
||||
if vpos_ms is None:
|
||||
raise TypeError ('未実装')
|
||||
if vpos_ms is DbNull:
|
||||
vpos_ms = None
|
||||
vpos_ms = cast (int | None, vpos_ms)
|
||||
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
INSERT INTO
|
||||
@@ -868,7 +948,7 @@ class CommentDao:
|
||||
comment.content,
|
||||
comment.posted_at,
|
||||
comment.nico_count,
|
||||
comment.vpos_ms))
|
||||
vpos_ms))
|
||||
|
||||
def upsert_all (
|
||||
self,
|
||||
@@ -880,7 +960,7 @@ class CommentDao:
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: CommentRow,
|
||||
with_relation_tables: bool,
|
||||
) -> CommentDto:
|
||||
comment = CommentDto (id_ = row['id'],
|
||||
@@ -890,7 +970,7 @@ class CommentDao:
|
||||
content = row['content'],
|
||||
posted_at = row['posted_at'],
|
||||
nico_count = row['nico_count'],
|
||||
vpos_ms = row['vpos_ms'])
|
||||
vpos_ms = row['vpos_ms'] or DbNull)
|
||||
if with_relation_tables:
|
||||
comment.video = VideoDao (self.conn).find (comment.video_id, True)
|
||||
return comment
|
||||
@@ -913,7 +993,7 @@ class CommentDto:
|
||||
class UserDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -930,7 +1010,7 @@ class UserDao:
|
||||
users
|
||||
WHERE
|
||||
code = %s""", (user_code,))
|
||||
row = c.fetchone ()
|
||||
row = cast (UserRow | None, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row)
|
||||
@@ -949,7 +1029,7 @@ class UserDao:
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: UserRow,
|
||||
) -> UserDto:
|
||||
return UserDto (id_ = row['id'],
|
||||
code = row['code'])
|
||||
|
||||
新しい課題から参照
ユーザをブロックする