コミットを比較
2 コミット
| 作成者 | SHA1 | 日付 | |
|---|---|---|---|
| 28471cb318 | |||
| d6bf49033b |
+160
-78
@@ -10,15 +10,20 @@ import random
|
||||
import string
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, TypedDict, cast
|
||||
from datetime import date, datetime, timedelta
|
||||
from typing import Any, Self, Type, TypedDict, cast
|
||||
|
||||
import mysql.connector
|
||||
import requests
|
||||
from mysql.connector.connection import MySQLConnectionAbstract
|
||||
|
||||
# TODO: “何もしなぃ” を意味する None と区別可能にするため,NULL クラスを別途用意すること
|
||||
DbNull = None
|
||||
DbNullType = type (None)
|
||||
|
||||
class DbNull:
|
||||
def __new__ (
|
||||
cls,
|
||||
):
|
||||
delattr (cls, '__init__')
|
||||
DbNullType = Type[DbNull]
|
||||
|
||||
|
||||
class VideoSearchParam (TypedDict):
|
||||
@@ -55,11 +60,58 @@ class CommentResult (TypedDict):
|
||||
isMyPost: bool
|
||||
|
||||
|
||||
class CommentRow (TypedDict):
|
||||
id: int
|
||||
video_id: int
|
||||
comment_no: int
|
||||
user_id: int
|
||||
content: str
|
||||
posted_at: datetime
|
||||
nico_count: int
|
||||
vpos_ms: int | None
|
||||
|
||||
|
||||
class TagRow (TypedDict):
|
||||
id: int
|
||||
name: str
|
||||
|
||||
|
||||
class UserRow (TypedDict):
|
||||
id: int
|
||||
code: str
|
||||
|
||||
|
||||
class VideoRow (TypedDict):
|
||||
id: int
|
||||
code: str
|
||||
title: str
|
||||
description: str
|
||||
uploaded_at: datetime
|
||||
deleted_at: datetime | None
|
||||
|
||||
|
||||
class VideoHistoryRow (TypedDict):
|
||||
id: int
|
||||
video_id: int
|
||||
fetched_at: date
|
||||
views_count: int
|
||||
|
||||
|
||||
class VideoTagRow (TypedDict):
|
||||
id: int
|
||||
video_id: int
|
||||
tag_id: int
|
||||
tagged_at: date
|
||||
untagged_at: date | None
|
||||
|
||||
|
||||
def main (
|
||||
) -> None:
|
||||
conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
|
||||
password = os.environ['MYSQL_PASS'],
|
||||
database = 'nizika_nico')
|
||||
if not isinstance (conn, MySQLConnectionAbstract):
|
||||
raise TypeError
|
||||
|
||||
now = datetime.now ()
|
||||
|
||||
@@ -76,6 +128,8 @@ def main (
|
||||
api_data, now)
|
||||
|
||||
conn.commit ()
|
||||
print ('Committed.')
|
||||
|
||||
conn.close ()
|
||||
|
||||
|
||||
@@ -248,7 +302,7 @@ def search_nico_by_tags (
|
||||
class VideoDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -258,7 +312,7 @@ class VideoDao:
|
||||
with_relation_tables: bool,
|
||||
) -> VideoDto | None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
code,
|
||||
@@ -271,8 +325,8 @@ class VideoDao:
|
||||
WHERE
|
||||
id = %s
|
||||
ORDER BY
|
||||
id""", (video_id,))
|
||||
row = c.fetchone ()
|
||||
id""", (video_id,)))
|
||||
row = cast (VideoRow | None, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row, with_relation_tables)
|
||||
@@ -282,7 +336,7 @@ class VideoDao:
|
||||
with_relation_tables: bool,
|
||||
) -> list[VideoDto]:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
code,
|
||||
@@ -293,9 +347,9 @@ class VideoDao:
|
||||
FROM
|
||||
videos
|
||||
ORDER BY
|
||||
id""")
|
||||
id"""))
|
||||
videos: list[VideoDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoRow], c.fetchall ()):
|
||||
videos.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return videos
|
||||
|
||||
@@ -303,7 +357,7 @@ class VideoDao:
|
||||
self,
|
||||
) -> list[VideoDto]:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
code,
|
||||
@@ -314,9 +368,9 @@ class VideoDao:
|
||||
FROM
|
||||
videos
|
||||
WHERE
|
||||
deleted_at IS NULL""")
|
||||
deleted_at IS NULL"""))
|
||||
videos: list[VideoDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoRow], c.fetchall ()):
|
||||
videos.append (self._create_dto_from_row (row, False))
|
||||
return videos
|
||||
|
||||
@@ -325,8 +379,15 @@ class VideoDao:
|
||||
video: VideoDto,
|
||||
with_relation_tables: bool,
|
||||
) -> None:
|
||||
deleted_at: datetime | DbNullType | None = video.deleted_at
|
||||
if deleted_at is None:
|
||||
raise TypeError ('未実装')
|
||||
if deleted_at is DbNull:
|
||||
deleted_at = None
|
||||
deleted_at = cast (datetime | None, deleted_at)
|
||||
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
videos(
|
||||
code,
|
||||
@@ -350,7 +411,7 @@ class VideoDao:
|
||||
video.title,
|
||||
video.description,
|
||||
video.uploaded_at,
|
||||
video.deleted_at))
|
||||
deleted_at)))
|
||||
video.id_ = c.lastrowid
|
||||
if with_relation_tables:
|
||||
if video.video_tags is not None:
|
||||
@@ -376,17 +437,17 @@ class VideoDao:
|
||||
if not video_ids:
|
||||
return
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
UPDATE
|
||||
videos
|
||||
SET
|
||||
deleted_at = %%s
|
||||
WHERE
|
||||
id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids))
|
||||
id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids)))
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: VideoRow,
|
||||
with_relation_tables: bool,
|
||||
) -> VideoDto:
|
||||
video = VideoDto (id_ = row['id'],
|
||||
@@ -394,7 +455,7 @@ class VideoDao:
|
||||
title = row['title'],
|
||||
description = row['description'],
|
||||
uploaded_at = row['uploaded_at'],
|
||||
deleted_at = row['deleted_at'])
|
||||
deleted_at = row['deleted_at'] or DbNull)
|
||||
if with_relation_tables and video.id_ is not None:
|
||||
video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
|
||||
for i in range (len (video.video_tags)):
|
||||
@@ -424,7 +485,7 @@ class VideoDto:
|
||||
class VideoTagDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -434,7 +495,7 @@ class VideoTagDao:
|
||||
with_relation_tables: bool,
|
||||
) -> list[VideoTagDto]:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
video_id,
|
||||
@@ -446,9 +507,9 @@ class VideoTagDao:
|
||||
WHERE
|
||||
video_id = %s
|
||||
ORDER BY
|
||||
id""", (video_id,))
|
||||
id""", (video_id,)))
|
||||
video_tags: list[VideoTagDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoTagRow], c.fetchall ()):
|
||||
video_tags.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return video_tags
|
||||
|
||||
@@ -458,7 +519,7 @@ class VideoTagDao:
|
||||
with_relation_tables: bool,
|
||||
) -> list[VideoTagDto]:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
video_id,
|
||||
@@ -471,9 +532,9 @@ class VideoTagDao:
|
||||
video_id = %s
|
||||
AND (untagged_at IS NULL)
|
||||
ORDER BY
|
||||
id""", (video_id,))
|
||||
id""", (video_id,)))
|
||||
video_tags: list[VideoTagDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoTagRow], c.fetchall ()):
|
||||
video_tags.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return video_tags
|
||||
|
||||
@@ -484,7 +545,7 @@ class VideoTagDao:
|
||||
with_relation_tables: bool,
|
||||
) -> VideoTagDto | None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
video_id,
|
||||
@@ -495,8 +556,8 @@ class VideoTagDao:
|
||||
video_tags
|
||||
WHERE
|
||||
video_id = %s
|
||||
AND tag_id = %s""", (video_id, tag_id))
|
||||
row = c.fetchone ()
|
||||
AND tag_id = %s""", (video_id, tag_id)))
|
||||
row = cast (VideoTagRow, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row, with_relation_tables)
|
||||
@@ -506,8 +567,15 @@ class VideoTagDao:
|
||||
video_tag: VideoTagDto,
|
||||
with_relation_tables: bool,
|
||||
) -> None:
|
||||
untagged_at: date | DbNullType | None = video_tag.untagged_at
|
||||
if untagged_at is None:
|
||||
raise TypeError ('未実装')
|
||||
if untagged_at is DbNull:
|
||||
untagged_at = None
|
||||
untagged_at = cast (date | None, untagged_at)
|
||||
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
video_tags(
|
||||
video_id,
|
||||
@@ -520,7 +588,7 @@ class VideoTagDao:
|
||||
%s,
|
||||
%s,
|
||||
%s)""", (video_tag.video_id, video_tag.tag_id,
|
||||
video_tag.tagged_at, video_tag.untagged_at))
|
||||
video_tag.tagged_at, untagged_at)))
|
||||
video_tag.id_ = c.lastrowid
|
||||
if with_relation_tables:
|
||||
if video_tag.video is not None:
|
||||
@@ -533,8 +601,15 @@ class VideoTagDao:
|
||||
video_tag: VideoTagDto,
|
||||
with_relation_tables: bool,
|
||||
) -> None:
|
||||
untagged_at: date | DbNullType | None = video_tag.untagged_at
|
||||
if untagged_at is None:
|
||||
raise TypeError ('未実装')
|
||||
if untagged_at is DbNull:
|
||||
untagged_at = None
|
||||
untagged_at = cast (date | None, untagged_at)
|
||||
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
video_tags(
|
||||
video_id,
|
||||
@@ -554,7 +629,7 @@ class VideoTagDao:
|
||||
untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
|
||||
video_tag.tag_id,
|
||||
video_tag.tagged_at,
|
||||
video_tag.untagged_at))
|
||||
untagged_at)))
|
||||
video_tag.id_ = c.lastrowid
|
||||
if with_relation_tables:
|
||||
if video_tag.video is not None:
|
||||
@@ -579,25 +654,25 @@ class VideoTagDao:
|
||||
if not tag_ids:
|
||||
return
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
UPDATE
|
||||
video_tags
|
||||
SET
|
||||
untagged_at = %%s
|
||||
WHERE
|
||||
video_id = %%s
|
||||
AND tag_ids IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids))
|
||||
AND tag_ids IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids)))
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: VideoTagRow,
|
||||
with_relation_tables: bool,
|
||||
) -> VideoTagDto:
|
||||
video_tag = VideoTagDto (id_ = row['id'],
|
||||
video_id = row['video_id'],
|
||||
tag_id = row['tag_id'],
|
||||
tagged_at = row['tagged_at'],
|
||||
untagged_at = row['untagged_at'])
|
||||
untagged_at = row['untagged_at'] or DbNull)
|
||||
if with_relation_tables:
|
||||
video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
|
||||
video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
|
||||
@@ -608,9 +683,9 @@ class VideoTagDao:
|
||||
class VideoTagDto:
|
||||
video_id: int
|
||||
tag_id: int
|
||||
tagged_at: datetime
|
||||
tagged_at: date
|
||||
id_: int | None = None
|
||||
untagged_at: datetime | DbNullType = DbNull
|
||||
untagged_at: date | DbNullType = DbNull
|
||||
video: VideoDto | None = None
|
||||
tag: TagDto | None = None
|
||||
|
||||
@@ -618,7 +693,7 @@ class VideoTagDto:
|
||||
class TagDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -627,15 +702,15 @@ class TagDao:
|
||||
tag_id: int,
|
||||
) -> TagDto | None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
name
|
||||
FROM
|
||||
tags
|
||||
WHERE
|
||||
id = %s""", (tag_id,))
|
||||
row = c.fetchone ()
|
||||
id = %s""", (tag_id,)))
|
||||
row = cast (TagRow | None, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row)
|
||||
@@ -645,15 +720,15 @@ class TagDao:
|
||||
tag_name: str,
|
||||
) -> TagDto | None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
name
|
||||
FROM
|
||||
tags
|
||||
WHERE
|
||||
name = %s""", (tag_name,))
|
||||
row = c.fetchone ()
|
||||
name = %s""", (tag_name,)))
|
||||
row = cast (TagRow | None, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row)
|
||||
@@ -663,11 +738,11 @@ class TagDao:
|
||||
tag: TagDto,
|
||||
) -> None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
tags(name)
|
||||
VALUES
|
||||
(%s)""", (tag.name,))
|
||||
(%s)""", (tag.name,)))
|
||||
tag.id_ = c.lastrowid
|
||||
|
||||
def upsert (
|
||||
@@ -675,18 +750,18 @@ class TagDao:
|
||||
tag: TagDto,
|
||||
) -> None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
tags(name)
|
||||
VALUES
|
||||
(%s)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
name = VALUES(name)""", (tag.name,))
|
||||
name = VALUES(name)""", (tag.name,)))
|
||||
tag.id_ = c.lastrowid
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: TagRow,
|
||||
) -> TagDto:
|
||||
return TagDto (id_ = row['id'],
|
||||
name = row['name'])
|
||||
@@ -701,7 +776,7 @@ class TagDto:
|
||||
class VideoHistoryDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -711,7 +786,7 @@ class VideoHistoryDao:
|
||||
with_relation_tables: bool,
|
||||
) -> list[VideoHistoryDto]:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
video_id,
|
||||
@@ -720,9 +795,9 @@ class VideoHistoryDao:
|
||||
FROM
|
||||
video_histories
|
||||
WHERE
|
||||
video_id = %s""", (video_id,))
|
||||
video_id = %s""", (video_id,)))
|
||||
video_histories: list[VideoHistoryDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[VideoHistoryRow], c.fetchall ()):
|
||||
video_histories.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return video_histories
|
||||
|
||||
@@ -731,7 +806,7 @@ class VideoHistoryDao:
|
||||
video_history: VideoHistoryDto,
|
||||
) -> None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
video_histories(
|
||||
video_id,
|
||||
@@ -743,14 +818,14 @@ class VideoHistoryDao:
|
||||
%s,
|
||||
%s)""", (video_history.video_id,
|
||||
video_history.fetched_at,
|
||||
video_history.views_count))
|
||||
video_history.views_count)))
|
||||
|
||||
def upsert (
|
||||
self,
|
||||
video_history: VideoHistoryDto,
|
||||
) -> None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
video_histories(
|
||||
video_id,
|
||||
@@ -766,7 +841,7 @@ class VideoHistoryDao:
|
||||
fetched_at,
|
||||
views_count""", (video_history.video_id,
|
||||
video_history.fetched_at,
|
||||
video_history.views_count))
|
||||
video_history.views_count)))
|
||||
|
||||
def upsert_all (
|
||||
self,
|
||||
@@ -777,7 +852,7 @@ class VideoHistoryDao:
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: VideoHistoryRow,
|
||||
with_relation_tables: bool,
|
||||
) -> VideoHistoryDto:
|
||||
video_history = VideoHistoryDto (id_ = row['id'],
|
||||
@@ -792,7 +867,7 @@ class VideoHistoryDao:
|
||||
@dataclass (slots = True)
|
||||
class VideoHistoryDto:
|
||||
video_id: int
|
||||
fetched_at: datetime
|
||||
fetched_at: date
|
||||
views_count: int
|
||||
id_: int | None = None
|
||||
video: VideoDto | None = None
|
||||
@@ -801,7 +876,7 @@ class VideoHistoryDto:
|
||||
class CommentDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -811,7 +886,7 @@ class CommentDao:
|
||||
with_relation_tables: bool,
|
||||
) -> list[CommentDto]:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
video_id,
|
||||
@@ -824,9 +899,9 @@ class CommentDao:
|
||||
FROM
|
||||
comments
|
||||
WHERE
|
||||
video_id = %s""", (video_id,))
|
||||
video_id = %s""", (video_id,)))
|
||||
comments: list[CommentDto] = []
|
||||
for row in c.fetchall ():
|
||||
for row in cast (list[CommentRow], c.fetchall ()):
|
||||
comments.append (self._create_dto_from_row (row, with_relation_tables))
|
||||
return comments
|
||||
|
||||
@@ -835,8 +910,15 @@ class CommentDao:
|
||||
comment: CommentDto,
|
||||
with_relation_tables: bool,
|
||||
) -> None:
|
||||
vpos_ms: int | DbNullType | None = comment.vpos_ms
|
||||
if vpos_ms is None:
|
||||
raise TypeError ('未実装')
|
||||
if vpos_ms is DbNull:
|
||||
vpos_ms = None
|
||||
vpos_ms = cast (int | None, vpos_ms)
|
||||
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
comments(
|
||||
video_id,
|
||||
@@ -868,7 +950,7 @@ class CommentDao:
|
||||
comment.content,
|
||||
comment.posted_at,
|
||||
comment.nico_count,
|
||||
comment.vpos_ms))
|
||||
vpos_ms)))
|
||||
|
||||
def upsert_all (
|
||||
self,
|
||||
@@ -880,7 +962,7 @@ class CommentDao:
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: CommentRow,
|
||||
with_relation_tables: bool,
|
||||
) -> CommentDto:
|
||||
comment = CommentDto (id_ = row['id'],
|
||||
@@ -890,7 +972,7 @@ class CommentDao:
|
||||
content = row['content'],
|
||||
posted_at = row['posted_at'],
|
||||
nico_count = row['nico_count'],
|
||||
vpos_ms = row['vpos_ms'])
|
||||
vpos_ms = row['vpos_ms'] or DbNull)
|
||||
if with_relation_tables:
|
||||
comment.video = VideoDao (self.conn).find (comment.video_id, True)
|
||||
return comment
|
||||
@@ -913,7 +995,7 @@ class CommentDto:
|
||||
class UserDao:
|
||||
def __init__ (
|
||||
self,
|
||||
conn,
|
||||
conn: MySQLConnectionAbstract,
|
||||
):
|
||||
self.conn = conn
|
||||
|
||||
@@ -922,15 +1004,15 @@ class UserDao:
|
||||
user_code: str
|
||||
) -> UserDto | None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
SELECT
|
||||
id,
|
||||
code
|
||||
FROM
|
||||
users
|
||||
WHERE
|
||||
code = %s""", (user_code,))
|
||||
row = c.fetchone ()
|
||||
code = %s""", (user_code,)))
|
||||
row = cast (UserRow | None, c.fetchone ())
|
||||
if row is None:
|
||||
return None
|
||||
return self._create_dto_from_row (row)
|
||||
@@ -940,16 +1022,16 @@ class UserDao:
|
||||
user: UserDto,
|
||||
) -> None:
|
||||
with self.conn.cursor (dictionary = True) as c:
|
||||
c.execute ("""
|
||||
print (c.execute ("""
|
||||
INSERT INTO
|
||||
users(code)
|
||||
VALUES
|
||||
(%s)""", (user.code,))
|
||||
(%s)""", (user.code,)))
|
||||
user.id_ = c.lastrowid
|
||||
|
||||
def _create_dto_from_row (
|
||||
self,
|
||||
row,
|
||||
row: UserRow,
|
||||
) -> UserDto:
|
||||
return UserDto (id_ = row['id'],
|
||||
code = row['code'])
|
||||
|
||||
新しい課題から参照
ユーザをブロックする