2 コミット

作成者 SHA1 メッセージ 日付
みてるぞ 28471cb318 SQL ログ追加 2024-10-10 02:15:12 +09:00
みてるぞ d6bf49033b DbNull の修正と型定義整備 2024-10-10 00:26:56 +09:00
+163 -81
ファイルの表示
@@ -10,15 +10,20 @@ import random
import string
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, TypedDict, cast
from datetime import date, datetime, timedelta
from typing import Any, Self, Type, TypedDict, cast
import mysql.connector
import requests
from mysql.connector.connection import MySQLConnectionAbstract
# TODO: “何もしなぃ” を意味する None と区別可能にするため,NULL クラスを別途用意すること
DbNull = None
DbNullType = type (None)
class DbNull:
def __new__ (
cls,
):
delattr (cls, '__init__')
DbNullType = Type[DbNull]
class VideoSearchParam (TypedDict):
@@ -55,11 +60,58 @@ class CommentResult (TypedDict):
isMyPost: bool
class CommentRow (TypedDict):
id: int
video_id: int
comment_no: int
user_id: int
content: str
posted_at: datetime
nico_count: int
vpos_ms: int | None
class TagRow (TypedDict):
id: int
name: str
class UserRow (TypedDict):
id: int
code: str
class VideoRow (TypedDict):
id: int
code: str
title: str
description: str
uploaded_at: datetime
deleted_at: datetime | None
class VideoHistoryRow (TypedDict):
id: int
video_id: int
fetched_at: date
views_count: int
class VideoTagRow (TypedDict):
id: int
video_id: int
tag_id: int
tagged_at: date
untagged_at: date | None
def main (
) -> None:
conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
password = os.environ['MYSQL_PASS'],
database = 'nizika_nico')
if not isinstance (conn, MySQLConnectionAbstract):
raise TypeError
now = datetime.now ()
@@ -76,6 +128,8 @@ def main (
api_data, now)
conn.commit ()
print ('Committed.')
conn.close ()
@@ -248,7 +302,7 @@ def search_nico_by_tags (
class VideoDao:
def __init__ (
self,
conn
conn: MySQLConnectionAbstract,
):
self.conn = conn
@@ -258,7 +312,7 @@ class VideoDao:
with_relation_tables: bool,
) -> VideoDto | None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
code,
@@ -271,8 +325,8 @@ class VideoDao:
WHERE
id = %s
ORDER BY
id""", (video_id,))
row = c.fetchone ()
id""", (video_id,)))
row = cast (VideoRow | None, c.fetchone ())
if row is None:
return None
return self._create_dto_from_row (row, with_relation_tables)
@@ -282,7 +336,7 @@ class VideoDao:
with_relation_tables: bool,
) -> list[VideoDto]:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
code,
@@ -293,9 +347,9 @@ class VideoDao:
FROM
videos
ORDER BY
id""")
id"""))
videos: list[VideoDto] = []
for row in c.fetchall ():
for row in cast (list[VideoRow], c.fetchall ()):
videos.append (self._create_dto_from_row (row, with_relation_tables))
return videos
@@ -303,7 +357,7 @@ class VideoDao:
self,
) -> list[VideoDto]:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
code,
@@ -314,9 +368,9 @@ class VideoDao:
FROM
videos
WHERE
deleted_at IS NULL""")
deleted_at IS NULL"""))
videos: list[VideoDto] = []
for row in c.fetchall ():
for row in cast (list[VideoRow], c.fetchall ()):
videos.append (self._create_dto_from_row (row, False))
return videos
@@ -325,8 +379,15 @@ class VideoDao:
video: VideoDto,
with_relation_tables: bool,
) -> None:
deleted_at: datetime | DbNullType | None = video.deleted_at
if deleted_at is None:
raise TypeError ('未実装')
if deleted_at is DbNull:
deleted_at = None
deleted_at = cast (datetime | None, deleted_at)
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
videos(
code,
@@ -350,7 +411,7 @@ class VideoDao:
video.title,
video.description,
video.uploaded_at,
video.deleted_at))
deleted_at)))
video.id_ = c.lastrowid
if with_relation_tables:
if video.video_tags is not None:
@@ -376,17 +437,17 @@ class VideoDao:
if not video_ids:
return
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
UPDATE
videos
SET
deleted_at = %%s
WHERE
id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids))
id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids)))
def _create_dto_from_row (
self,
row,
row: VideoRow,
with_relation_tables: bool,
) -> VideoDto:
video = VideoDto (id_ = row['id'],
@@ -394,7 +455,7 @@ class VideoDao:
title = row['title'],
description = row['description'],
uploaded_at = row['uploaded_at'],
deleted_at = row['deleted_at'])
deleted_at = row['deleted_at'] or DbNull)
if with_relation_tables and video.id_ is not None:
video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
for i in range (len (video.video_tags)):
@@ -424,7 +485,7 @@ class VideoDto:
class VideoTagDao:
def __init__ (
self,
conn,
conn: MySQLConnectionAbstract,
):
self.conn = conn
@@ -434,7 +495,7 @@ class VideoTagDao:
with_relation_tables: bool,
) -> list[VideoTagDto]:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
video_id,
@@ -446,9 +507,9 @@ class VideoTagDao:
WHERE
video_id = %s
ORDER BY
id""", (video_id,))
id""", (video_id,)))
video_tags: list[VideoTagDto] = []
for row in c.fetchall ():
for row in cast (list[VideoTagRow], c.fetchall ()):
video_tags.append (self._create_dto_from_row (row, with_relation_tables))
return video_tags
@@ -458,7 +519,7 @@ class VideoTagDao:
with_relation_tables: bool,
) -> list[VideoTagDto]:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
video_id,
@@ -471,9 +532,9 @@ class VideoTagDao:
video_id = %s
AND (untagged_at IS NULL)
ORDER BY
id""", (video_id,))
id""", (video_id,)))
video_tags: list[VideoTagDto] = []
for row in c.fetchall ():
for row in cast (list[VideoTagRow], c.fetchall ()):
video_tags.append (self._create_dto_from_row (row, with_relation_tables))
return video_tags
@@ -484,7 +545,7 @@ class VideoTagDao:
with_relation_tables: bool,
) -> VideoTagDto | None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
video_id,
@@ -495,8 +556,8 @@ class VideoTagDao:
video_tags
WHERE
video_id = %s
AND tag_id = %s""", (video_id, tag_id))
row = c.fetchone ()
AND tag_id = %s""", (video_id, tag_id)))
row = cast (VideoTagRow, c.fetchone ())
if row is None:
return None
return self._create_dto_from_row (row, with_relation_tables)
@@ -506,8 +567,15 @@ class VideoTagDao:
video_tag: VideoTagDto,
with_relation_tables: bool,
) -> None:
untagged_at: date | DbNullType | None = video_tag.untagged_at
if untagged_at is None:
raise TypeError ('未実装')
if untagged_at is DbNull:
untagged_at = None
untagged_at = cast (date | None, untagged_at)
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
video_tags(
video_id,
@@ -520,7 +588,7 @@ class VideoTagDao:
%s,
%s,
%s)""", (video_tag.video_id, video_tag.tag_id,
video_tag.tagged_at, video_tag.untagged_at))
video_tag.tagged_at, untagged_at)))
video_tag.id_ = c.lastrowid
if with_relation_tables:
if video_tag.video is not None:
@@ -533,8 +601,15 @@ class VideoTagDao:
video_tag: VideoTagDto,
with_relation_tables: bool,
) -> None:
untagged_at: date | DbNullType | None = video_tag.untagged_at
if untagged_at is None:
raise TypeError ('未実装')
if untagged_at is DbNull:
untagged_at = None
untagged_at = cast (date | None, untagged_at)
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
video_tags(
video_id,
@@ -554,7 +629,7 @@ class VideoTagDao:
untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
video_tag.tag_id,
video_tag.tagged_at,
video_tag.untagged_at))
untagged_at)))
video_tag.id_ = c.lastrowid
if with_relation_tables:
if video_tag.video is not None:
@@ -579,25 +654,25 @@ class VideoTagDao:
if not tag_ids:
return
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
UPDATE
video_tags
SET
untagged_at = %%s
WHERE
video_id = %%s
AND tag_ids IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids))
AND tag_ids IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids)))
def _create_dto_from_row (
self,
row,
row: VideoTagRow,
with_relation_tables: bool,
) -> VideoTagDto:
video_tag = VideoTagDto (id_ = row['id'],
video_id = row['video_id'],
tag_id = row['tag_id'],
tagged_at = row['tagged_at'],
untagged_at = row['untagged_at'])
untagged_at = row['untagged_at'] or DbNull)
if with_relation_tables:
video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
@@ -608,17 +683,17 @@ class VideoTagDao:
class VideoTagDto:
video_id: int
tag_id: int
tagged_at: datetime
id_: int | None = None
untagged_at: datetime | DbNullType = DbNull
video: VideoDto | None = None
tag: TagDto | None = None
tagged_at: date
id_: int | None = None
untagged_at: date | DbNullType = DbNull
video: VideoDto | None = None
tag: TagDto | None = None
class TagDao:
def __init__ (
self,
conn,
conn: MySQLConnectionAbstract,
):
self.conn = conn
@@ -627,15 +702,15 @@ class TagDao:
tag_id: int,
) -> TagDto | None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
name
FROM
tags
WHERE
id = %s""", (tag_id,))
row = c.fetchone ()
id = %s""", (tag_id,)))
row = cast (TagRow | None, c.fetchone ())
if row is None:
return None
return self._create_dto_from_row (row)
@@ -645,15 +720,15 @@ class TagDao:
tag_name: str,
) -> TagDto | None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
name
FROM
tags
WHERE
name = %s""", (tag_name,))
row = c.fetchone ()
name = %s""", (tag_name,)))
row = cast (TagRow | None, c.fetchone ())
if row is None:
return None
return self._create_dto_from_row (row)
@@ -663,11 +738,11 @@ class TagDao:
tag: TagDto,
) -> None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
tags(name)
VALUES
(%s)""", (tag.name,))
(%s)""", (tag.name,)))
tag.id_ = c.lastrowid
def upsert (
@@ -675,18 +750,18 @@ class TagDao:
tag: TagDto,
) -> None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
tags(name)
VALUES
(%s)
ON DUPLICATE KEY UPDATE
name = VALUES(name)""", (tag.name,))
name = VALUES(name)""", (tag.name,)))
tag.id_ = c.lastrowid
def _create_dto_from_row (
self,
row,
row: TagRow,
) -> TagDto:
return TagDto (id_ = row['id'],
name = row['name'])
@@ -701,7 +776,7 @@ class TagDto:
class VideoHistoryDao:
def __init__ (
self,
conn,
conn: MySQLConnectionAbstract,
):
self.conn = conn
@@ -711,7 +786,7 @@ class VideoHistoryDao:
with_relation_tables: bool,
) -> list[VideoHistoryDto]:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
video_id,
@@ -720,9 +795,9 @@ class VideoHistoryDao:
FROM
video_histories
WHERE
video_id = %s""", (video_id,))
video_id = %s""", (video_id,)))
video_histories: list[VideoHistoryDto] = []
for row in c.fetchall ():
for row in cast (list[VideoHistoryRow], c.fetchall ()):
video_histories.append (self._create_dto_from_row (row, with_relation_tables))
return video_histories
@@ -731,7 +806,7 @@ class VideoHistoryDao:
video_history: VideoHistoryDto,
) -> None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
video_histories(
video_id,
@@ -743,14 +818,14 @@ class VideoHistoryDao:
%s,
%s)""", (video_history.video_id,
video_history.fetched_at,
video_history.views_count))
video_history.views_count)))
def upsert (
self,
video_history: VideoHistoryDto,
) -> None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
video_histories(
video_id,
@@ -766,7 +841,7 @@ class VideoHistoryDao:
fetched_at,
views_count""", (video_history.video_id,
video_history.fetched_at,
video_history.views_count))
video_history.views_count)))
def upsert_all (
self,
@@ -777,7 +852,7 @@ class VideoHistoryDao:
def _create_dto_from_row (
self,
row,
row: VideoHistoryRow,
with_relation_tables: bool,
) -> VideoHistoryDto:
video_history = VideoHistoryDto (id_ = row['id'],
@@ -792,7 +867,7 @@ class VideoHistoryDao:
@dataclass (slots = True)
class VideoHistoryDto:
video_id: int
fetched_at: datetime
fetched_at: date
views_count: int
id_: int | None = None
video: VideoDto | None = None
@@ -801,7 +876,7 @@ class VideoHistoryDto:
class CommentDao:
def __init__ (
self,
conn,
conn: MySQLConnectionAbstract,
):
self.conn = conn
@@ -811,7 +886,7 @@ class CommentDao:
with_relation_tables: bool,
) -> list[CommentDto]:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
video_id,
@@ -824,9 +899,9 @@ class CommentDao:
FROM
comments
WHERE
video_id = %s""", (video_id,))
video_id = %s""", (video_id,)))
comments: list[CommentDto] = []
for row in c.fetchall ():
for row in cast (list[CommentRow], c.fetchall ()):
comments.append (self._create_dto_from_row (row, with_relation_tables))
return comments
@@ -835,8 +910,15 @@ class CommentDao:
comment: CommentDto,
with_relation_tables: bool,
) -> None:
vpos_ms: int | DbNullType | None = comment.vpos_ms
if vpos_ms is None:
raise TypeError ('未実装')
if vpos_ms is DbNull:
vpos_ms = None
vpos_ms = cast (int | None, vpos_ms)
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
comments(
video_id,
@@ -868,7 +950,7 @@ class CommentDao:
comment.content,
comment.posted_at,
comment.nico_count,
comment.vpos_ms))
vpos_ms)))
def upsert_all (
self,
@@ -880,7 +962,7 @@ class CommentDao:
def _create_dto_from_row (
self,
row,
row: CommentRow,
with_relation_tables: bool,
) -> CommentDto:
comment = CommentDto (id_ = row['id'],
@@ -890,7 +972,7 @@ class CommentDao:
content = row['content'],
posted_at = row['posted_at'],
nico_count = row['nico_count'],
vpos_ms = row['vpos_ms'])
vpos_ms = row['vpos_ms'] or DbNull)
if with_relation_tables:
comment.video = VideoDao (self.conn).find (comment.video_id, True)
return comment
@@ -913,7 +995,7 @@ class CommentDto:
class UserDao:
def __init__ (
self,
conn,
conn: MySQLConnectionAbstract,
):
self.conn = conn
@@ -922,15 +1004,15 @@ class UserDao:
user_code: str
) -> UserDto | None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
SELECT
id,
code
FROM
users
WHERE
code = %s""", (user_code,))
row = c.fetchone ()
code = %s""", (user_code,)))
row = cast (UserRow | None, c.fetchone ())
if row is None:
return None
return self._create_dto_from_row (row)
@@ -940,16 +1022,16 @@ class UserDao:
user: UserDto,
) -> None:
with self.conn.cursor (dictionary = True) as c:
c.execute ("""
print (c.execute ("""
INSERT INTO
users(code)
VALUES
(%s)""", (user.code,))
(%s)""", (user.code,)))
user.id_ = c.lastrowid
def _create_dto_from_row (
self,
row,
row: UserRow,
) -> UserDto:
return UserDto (id_ = row['id'],
code = row['code'])