2 コミット

作成者 SHA1 メッセージ 日付
みてるぞ 28471cb318 SQL ログ追加 2024-10-10 02:15:12 +09:00
みてるぞ d6bf49033b DbNull の修正と型定義整備 2024-10-10 00:26:56 +09:00
+160 -78
ファイルの表示
@@ -10,15 +10,20 @@ import random
import string import string
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta from datetime import date, datetime, timedelta
from typing import Any, TypedDict, cast from typing import Any, Self, Type, TypedDict, cast
import mysql.connector import mysql.connector
import requests import requests
from mysql.connector.connection import MySQLConnectionAbstract
# TODO: “何もしなぃ” を意味する None と区別可能にするため,NULL クラスを別途用意すること
DbNull = None class DbNull:
DbNullType = type (None) def __new__ (
cls,
):
delattr (cls, '__init__')
DbNullType = Type[DbNull]
class VideoSearchParam (TypedDict): class VideoSearchParam (TypedDict):
@@ -55,11 +60,58 @@ class CommentResult (TypedDict):
isMyPost: bool isMyPost: bool
class CommentRow (TypedDict):
id: int
video_id: int
comment_no: int
user_id: int
content: str
posted_at: datetime
nico_count: int
vpos_ms: int | None
class TagRow (TypedDict):
id: int
name: str
class UserRow (TypedDict):
id: int
code: str
class VideoRow (TypedDict):
id: int
code: str
title: str
description: str
uploaded_at: datetime
deleted_at: datetime | None
class VideoHistoryRow (TypedDict):
id: int
video_id: int
fetched_at: date
views_count: int
class VideoTagRow (TypedDict):
id: int
video_id: int
tag_id: int
tagged_at: date
untagged_at: date | None
def main ( def main (
) -> None: ) -> None:
conn = mysql.connector.connect (user = os.environ['MYSQL_USER'], conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
password = os.environ['MYSQL_PASS'], password = os.environ['MYSQL_PASS'],
database = 'nizika_nico') database = 'nizika_nico')
if not isinstance (conn, MySQLConnectionAbstract):
raise TypeError
now = datetime.now () now = datetime.now ()
@@ -76,6 +128,8 @@ def main (
api_data, now) api_data, now)
conn.commit () conn.commit ()
print ('Committed.')
conn.close () conn.close ()
@@ -248,7 +302,7 @@ def search_nico_by_tags (
class VideoDao: class VideoDao:
def __init__ ( def __init__ (
self, self,
conn conn: MySQLConnectionAbstract,
): ):
self.conn = conn self.conn = conn
@@ -258,7 +312,7 @@ class VideoDao:
with_relation_tables: bool, with_relation_tables: bool,
) -> VideoDto | None: ) -> VideoDto | None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
code, code,
@@ -271,8 +325,8 @@ class VideoDao:
WHERE WHERE
id = %s id = %s
ORDER BY ORDER BY
id""", (video_id,)) id""", (video_id,)))
row = c.fetchone () row = cast (VideoRow | None, c.fetchone ())
if row is None: if row is None:
return None return None
return self._create_dto_from_row (row, with_relation_tables) return self._create_dto_from_row (row, with_relation_tables)
@@ -282,7 +336,7 @@ class VideoDao:
with_relation_tables: bool, with_relation_tables: bool,
) -> list[VideoDto]: ) -> list[VideoDto]:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
code, code,
@@ -293,9 +347,9 @@ class VideoDao:
FROM FROM
videos videos
ORDER BY ORDER BY
id""") id"""))
videos: list[VideoDto] = [] videos: list[VideoDto] = []
for row in c.fetchall (): for row in cast (list[VideoRow], c.fetchall ()):
videos.append (self._create_dto_from_row (row, with_relation_tables)) videos.append (self._create_dto_from_row (row, with_relation_tables))
return videos return videos
@@ -303,7 +357,7 @@ class VideoDao:
self, self,
) -> list[VideoDto]: ) -> list[VideoDto]:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
code, code,
@@ -314,9 +368,9 @@ class VideoDao:
FROM FROM
videos videos
WHERE WHERE
deleted_at IS NULL""") deleted_at IS NULL"""))
videos: list[VideoDto] = [] videos: list[VideoDto] = []
for row in c.fetchall (): for row in cast (list[VideoRow], c.fetchall ()):
videos.append (self._create_dto_from_row (row, False)) videos.append (self._create_dto_from_row (row, False))
return videos return videos
@@ -325,8 +379,15 @@ class VideoDao:
video: VideoDto, video: VideoDto,
with_relation_tables: bool, with_relation_tables: bool,
) -> None: ) -> None:
deleted_at: datetime | DbNullType | None = video.deleted_at
if deleted_at is None:
raise TypeError ('未実装')
if deleted_at is DbNull:
deleted_at = None
deleted_at = cast (datetime | None, deleted_at)
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
videos( videos(
code, code,
@@ -350,7 +411,7 @@ class VideoDao:
video.title, video.title,
video.description, video.description,
video.uploaded_at, video.uploaded_at,
video.deleted_at)) deleted_at)))
video.id_ = c.lastrowid video.id_ = c.lastrowid
if with_relation_tables: if with_relation_tables:
if video.video_tags is not None: if video.video_tags is not None:
@@ -376,17 +437,17 @@ class VideoDao:
if not video_ids: if not video_ids:
return return
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
UPDATE UPDATE
videos videos
SET SET
deleted_at = %%s deleted_at = %%s
WHERE WHERE
id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids)) id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids)))
def _create_dto_from_row ( def _create_dto_from_row (
self, self,
row, row: VideoRow,
with_relation_tables: bool, with_relation_tables: bool,
) -> VideoDto: ) -> VideoDto:
video = VideoDto (id_ = row['id'], video = VideoDto (id_ = row['id'],
@@ -394,7 +455,7 @@ class VideoDao:
title = row['title'], title = row['title'],
description = row['description'], description = row['description'],
uploaded_at = row['uploaded_at'], uploaded_at = row['uploaded_at'],
deleted_at = row['deleted_at']) deleted_at = row['deleted_at'] or DbNull)
if with_relation_tables and video.id_ is not None: if with_relation_tables and video.id_ is not None:
video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False) video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
for i in range (len (video.video_tags)): for i in range (len (video.video_tags)):
@@ -424,7 +485,7 @@ class VideoDto:
class VideoTagDao: class VideoTagDao:
def __init__ ( def __init__ (
self, self,
conn, conn: MySQLConnectionAbstract,
): ):
self.conn = conn self.conn = conn
@@ -434,7 +495,7 @@ class VideoTagDao:
with_relation_tables: bool, with_relation_tables: bool,
) -> list[VideoTagDto]: ) -> list[VideoTagDto]:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
video_id, video_id,
@@ -446,9 +507,9 @@ class VideoTagDao:
WHERE WHERE
video_id = %s video_id = %s
ORDER BY ORDER BY
id""", (video_id,)) id""", (video_id,)))
video_tags: list[VideoTagDto] = [] video_tags: list[VideoTagDto] = []
for row in c.fetchall (): for row in cast (list[VideoTagRow], c.fetchall ()):
video_tags.append (self._create_dto_from_row (row, with_relation_tables)) video_tags.append (self._create_dto_from_row (row, with_relation_tables))
return video_tags return video_tags
@@ -458,7 +519,7 @@ class VideoTagDao:
with_relation_tables: bool, with_relation_tables: bool,
) -> list[VideoTagDto]: ) -> list[VideoTagDto]:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
video_id, video_id,
@@ -471,9 +532,9 @@ class VideoTagDao:
video_id = %s video_id = %s
AND (untagged_at IS NULL) AND (untagged_at IS NULL)
ORDER BY ORDER BY
id""", (video_id,)) id""", (video_id,)))
video_tags: list[VideoTagDto] = [] video_tags: list[VideoTagDto] = []
for row in c.fetchall (): for row in cast (list[VideoTagRow], c.fetchall ()):
video_tags.append (self._create_dto_from_row (row, with_relation_tables)) video_tags.append (self._create_dto_from_row (row, with_relation_tables))
return video_tags return video_tags
@@ -484,7 +545,7 @@ class VideoTagDao:
with_relation_tables: bool, with_relation_tables: bool,
) -> VideoTagDto | None: ) -> VideoTagDto | None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
video_id, video_id,
@@ -495,8 +556,8 @@ class VideoTagDao:
video_tags video_tags
WHERE WHERE
video_id = %s video_id = %s
AND tag_id = %s""", (video_id, tag_id)) AND tag_id = %s""", (video_id, tag_id)))
row = c.fetchone () row = cast (VideoTagRow, c.fetchone ())
if row is None: if row is None:
return None return None
return self._create_dto_from_row (row, with_relation_tables) return self._create_dto_from_row (row, with_relation_tables)
@@ -506,8 +567,15 @@ class VideoTagDao:
video_tag: VideoTagDto, video_tag: VideoTagDto,
with_relation_tables: bool, with_relation_tables: bool,
) -> None: ) -> None:
untagged_at: date | DbNullType | None = video_tag.untagged_at
if untagged_at is None:
raise TypeError ('未実装')
if untagged_at is DbNull:
untagged_at = None
untagged_at = cast (date | None, untagged_at)
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
video_tags( video_tags(
video_id, video_id,
@@ -520,7 +588,7 @@ class VideoTagDao:
%s, %s,
%s, %s,
%s)""", (video_tag.video_id, video_tag.tag_id, %s)""", (video_tag.video_id, video_tag.tag_id,
video_tag.tagged_at, video_tag.untagged_at)) video_tag.tagged_at, untagged_at)))
video_tag.id_ = c.lastrowid video_tag.id_ = c.lastrowid
if with_relation_tables: if with_relation_tables:
if video_tag.video is not None: if video_tag.video is not None:
@@ -533,8 +601,15 @@ class VideoTagDao:
video_tag: VideoTagDto, video_tag: VideoTagDto,
with_relation_tables: bool, with_relation_tables: bool,
) -> None: ) -> None:
untagged_at: date | DbNullType | None = video_tag.untagged_at
if untagged_at is None:
raise TypeError ('未実装')
if untagged_at is DbNull:
untagged_at = None
untagged_at = cast (date | None, untagged_at)
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
video_tags( video_tags(
video_id, video_id,
@@ -554,7 +629,7 @@ class VideoTagDao:
untagged_at = VALUES(untagged_at)""", (video_tag.video_id, untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
video_tag.tag_id, video_tag.tag_id,
video_tag.tagged_at, video_tag.tagged_at,
video_tag.untagged_at)) untagged_at)))
video_tag.id_ = c.lastrowid video_tag.id_ = c.lastrowid
if with_relation_tables: if with_relation_tables:
if video_tag.video is not None: if video_tag.video is not None:
@@ -579,25 +654,25 @@ class VideoTagDao:
if not tag_ids: if not tag_ids:
return return
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
UPDATE UPDATE
video_tags video_tags
SET SET
untagged_at = %%s untagged_at = %%s
WHERE WHERE
video_id = %%s video_id = %%s
AND tag_ids IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids)) AND tag_ids IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids)))
def _create_dto_from_row ( def _create_dto_from_row (
self, self,
row, row: VideoTagRow,
with_relation_tables: bool, with_relation_tables: bool,
) -> VideoTagDto: ) -> VideoTagDto:
video_tag = VideoTagDto (id_ = row['id'], video_tag = VideoTagDto (id_ = row['id'],
video_id = row['video_id'], video_id = row['video_id'],
tag_id = row['tag_id'], tag_id = row['tag_id'],
tagged_at = row['tagged_at'], tagged_at = row['tagged_at'],
untagged_at = row['untagged_at']) untagged_at = row['untagged_at'] or DbNull)
if with_relation_tables: if with_relation_tables:
video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True) video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
video_tag.tag = TagDao (self.conn).find (video_tag.tag_id) video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
@@ -608,9 +683,9 @@ class VideoTagDao:
class VideoTagDto: class VideoTagDto:
video_id: int video_id: int
tag_id: int tag_id: int
tagged_at: datetime tagged_at: date
id_: int | None = None id_: int | None = None
untagged_at: datetime | DbNullType = DbNull untagged_at: date | DbNullType = DbNull
video: VideoDto | None = None video: VideoDto | None = None
tag: TagDto | None = None tag: TagDto | None = None
@@ -618,7 +693,7 @@ class VideoTagDto:
class TagDao: class TagDao:
def __init__ ( def __init__ (
self, self,
conn, conn: MySQLConnectionAbstract,
): ):
self.conn = conn self.conn = conn
@@ -627,15 +702,15 @@ class TagDao:
tag_id: int, tag_id: int,
) -> TagDto | None: ) -> TagDto | None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
name name
FROM FROM
tags tags
WHERE WHERE
id = %s""", (tag_id,)) id = %s""", (tag_id,)))
row = c.fetchone () row = cast (TagRow | None, c.fetchone ())
if row is None: if row is None:
return None return None
return self._create_dto_from_row (row) return self._create_dto_from_row (row)
@@ -645,15 +720,15 @@ class TagDao:
tag_name: str, tag_name: str,
) -> TagDto | None: ) -> TagDto | None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
name name
FROM FROM
tags tags
WHERE WHERE
name = %s""", (tag_name,)) name = %s""", (tag_name,)))
row = c.fetchone () row = cast (TagRow | None, c.fetchone ())
if row is None: if row is None:
return None return None
return self._create_dto_from_row (row) return self._create_dto_from_row (row)
@@ -663,11 +738,11 @@ class TagDao:
tag: TagDto, tag: TagDto,
) -> None: ) -> None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
tags(name) tags(name)
VALUES VALUES
(%s)""", (tag.name,)) (%s)""", (tag.name,)))
tag.id_ = c.lastrowid tag.id_ = c.lastrowid
def upsert ( def upsert (
@@ -675,18 +750,18 @@ class TagDao:
tag: TagDto, tag: TagDto,
) -> None: ) -> None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
tags(name) tags(name)
VALUES VALUES
(%s) (%s)
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
name = VALUES(name)""", (tag.name,)) name = VALUES(name)""", (tag.name,)))
tag.id_ = c.lastrowid tag.id_ = c.lastrowid
def _create_dto_from_row ( def _create_dto_from_row (
self, self,
row, row: TagRow,
) -> TagDto: ) -> TagDto:
return TagDto (id_ = row['id'], return TagDto (id_ = row['id'],
name = row['name']) name = row['name'])
@@ -701,7 +776,7 @@ class TagDto:
class VideoHistoryDao: class VideoHistoryDao:
def __init__ ( def __init__ (
self, self,
conn, conn: MySQLConnectionAbstract,
): ):
self.conn = conn self.conn = conn
@@ -711,7 +786,7 @@ class VideoHistoryDao:
with_relation_tables: bool, with_relation_tables: bool,
) -> list[VideoHistoryDto]: ) -> list[VideoHistoryDto]:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
video_id, video_id,
@@ -720,9 +795,9 @@ class VideoHistoryDao:
FROM FROM
video_histories video_histories
WHERE WHERE
video_id = %s""", (video_id,)) video_id = %s""", (video_id,)))
video_histories: list[VideoHistoryDto] = [] video_histories: list[VideoHistoryDto] = []
for row in c.fetchall (): for row in cast (list[VideoHistoryRow], c.fetchall ()):
video_histories.append (self._create_dto_from_row (row, with_relation_tables)) video_histories.append (self._create_dto_from_row (row, with_relation_tables))
return video_histories return video_histories
@@ -731,7 +806,7 @@ class VideoHistoryDao:
video_history: VideoHistoryDto, video_history: VideoHistoryDto,
) -> None: ) -> None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
video_histories( video_histories(
video_id, video_id,
@@ -743,14 +818,14 @@ class VideoHistoryDao:
%s, %s,
%s)""", (video_history.video_id, %s)""", (video_history.video_id,
video_history.fetched_at, video_history.fetched_at,
video_history.views_count)) video_history.views_count)))
def upsert ( def upsert (
self, self,
video_history: VideoHistoryDto, video_history: VideoHistoryDto,
) -> None: ) -> None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
video_histories( video_histories(
video_id, video_id,
@@ -766,7 +841,7 @@ class VideoHistoryDao:
fetched_at, fetched_at,
views_count""", (video_history.video_id, views_count""", (video_history.video_id,
video_history.fetched_at, video_history.fetched_at,
video_history.views_count)) video_history.views_count)))
def upsert_all ( def upsert_all (
self, self,
@@ -777,7 +852,7 @@ class VideoHistoryDao:
def _create_dto_from_row ( def _create_dto_from_row (
self, self,
row, row: VideoHistoryRow,
with_relation_tables: bool, with_relation_tables: bool,
) -> VideoHistoryDto: ) -> VideoHistoryDto:
video_history = VideoHistoryDto (id_ = row['id'], video_history = VideoHistoryDto (id_ = row['id'],
@@ -792,7 +867,7 @@ class VideoHistoryDao:
@dataclass (slots = True) @dataclass (slots = True)
class VideoHistoryDto: class VideoHistoryDto:
video_id: int video_id: int
fetched_at: datetime fetched_at: date
views_count: int views_count: int
id_: int | None = None id_: int | None = None
video: VideoDto | None = None video: VideoDto | None = None
@@ -801,7 +876,7 @@ class VideoHistoryDto:
class CommentDao: class CommentDao:
def __init__ ( def __init__ (
self, self,
conn, conn: MySQLConnectionAbstract,
): ):
self.conn = conn self.conn = conn
@@ -811,7 +886,7 @@ class CommentDao:
with_relation_tables: bool, with_relation_tables: bool,
) -> list[CommentDto]: ) -> list[CommentDto]:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
video_id, video_id,
@@ -824,9 +899,9 @@ class CommentDao:
FROM FROM
comments comments
WHERE WHERE
video_id = %s""", (video_id,)) video_id = %s""", (video_id,)))
comments: list[CommentDto] = [] comments: list[CommentDto] = []
for row in c.fetchall (): for row in cast (list[CommentRow], c.fetchall ()):
comments.append (self._create_dto_from_row (row, with_relation_tables)) comments.append (self._create_dto_from_row (row, with_relation_tables))
return comments return comments
@@ -835,8 +910,15 @@ class CommentDao:
comment: CommentDto, comment: CommentDto,
with_relation_tables: bool, with_relation_tables: bool,
) -> None: ) -> None:
vpos_ms: int | DbNullType | None = comment.vpos_ms
if vpos_ms is None:
raise TypeError ('未実装')
if vpos_ms is DbNull:
vpos_ms = None
vpos_ms = cast (int | None, vpos_ms)
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
comments( comments(
video_id, video_id,
@@ -868,7 +950,7 @@ class CommentDao:
comment.content, comment.content,
comment.posted_at, comment.posted_at,
comment.nico_count, comment.nico_count,
comment.vpos_ms)) vpos_ms)))
def upsert_all ( def upsert_all (
self, self,
@@ -880,7 +962,7 @@ class CommentDao:
def _create_dto_from_row ( def _create_dto_from_row (
self, self,
row, row: CommentRow,
with_relation_tables: bool, with_relation_tables: bool,
) -> CommentDto: ) -> CommentDto:
comment = CommentDto (id_ = row['id'], comment = CommentDto (id_ = row['id'],
@@ -890,7 +972,7 @@ class CommentDao:
content = row['content'], content = row['content'],
posted_at = row['posted_at'], posted_at = row['posted_at'],
nico_count = row['nico_count'], nico_count = row['nico_count'],
vpos_ms = row['vpos_ms']) vpos_ms = row['vpos_ms'] or DbNull)
if with_relation_tables: if with_relation_tables:
comment.video = VideoDao (self.conn).find (comment.video_id, True) comment.video = VideoDao (self.conn).find (comment.video_id, True)
return comment return comment
@@ -913,7 +995,7 @@ class CommentDto:
class UserDao: class UserDao:
def __init__ ( def __init__ (
self, self,
conn, conn: MySQLConnectionAbstract,
): ):
self.conn = conn self.conn = conn
@@ -922,15 +1004,15 @@ class UserDao:
user_code: str user_code: str
) -> UserDto | None: ) -> UserDto | None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
SELECT SELECT
id, id,
code code
FROM FROM
users users
WHERE WHERE
code = %s""", (user_code,)) code = %s""", (user_code,)))
row = c.fetchone () row = cast (UserRow | None, c.fetchone ())
if row is None: if row is None:
return None return None
return self._create_dto_from_row (row) return self._create_dto_from_row (row)
@@ -940,16 +1022,16 @@ class UserDao:
user: UserDto, user: UserDto,
) -> None: ) -> None:
with self.conn.cursor (dictionary = True) as c: with self.conn.cursor (dictionary = True) as c:
c.execute (""" print (c.execute ("""
INSERT INTO INSERT INTO
users(code) users(code)
VALUES VALUES
(%s)""", (user.code,)) (%s)""", (user.code,)))
user.id_ = c.lastrowid user.id_ = c.lastrowid
def _create_dto_from_row ( def _create_dto_from_row (
self, self,
row, row: UserRow,
) -> UserDto: ) -> UserDto:
return UserDto (id_ = row['id'], return UserDto (id_ = row['id'],
code = row['code']) code = row['code'])