diff --git a/update_db.py b/update_db.py index 3b4a8db..cdbf138 100644 --- a/update_db.py +++ b/update_db.py @@ -27,102 +27,177 @@ db = DatabaseManager (config) Model.set_connection_resolver (db) -class VideoSearchParam (TypedDict): - q: str - targets: str - _sort: str - fields: str - _limit: int - jsonFilter: str +def main ( +) -> None: + now = datetime.now () + api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) + update_tables (api_data, now) -class VideoResult (TypedDict): - contentId: str - title: str - tags: str - description: str | None - viewCounter: int - startTime: str +def update_tables ( + api_data: list[VideoResult], + now: datetime, +) -> None: + alive_video_codes: list[str] = [] -class CommentResult (TypedDict): - id: str - no: int - vposMs: int - body: str - commands: list[str] - userId: str - isPremium: bool - score: int - postedAt: str - nicoruCount: int - nicoruId: Any - source: str - isMyPost: bool + for datum in api_data: + tag_names: list[str] = datum['tags'].split () + video = Video () + video.code = datum['contentId'] + video.title = datum['title'] + video.description = datum['description'] or '' + video.uploaded_at = datetime.fromisoformat (datum['startTime']) + video.deleted_at = None + video.upsert () + alive_video_codes.append (video.code) + video_history = VideoHistory () + video_history.video_id = video.id + video_history.fetched_at = now + video_history.views_count = datum['viewCounter'] + video_history.save () + video_tags = video.video_tags.where_not_null ('untagged_at').get () + for video_tag in video_tags: + tag = video_tag.tag + if tag is not None and tag.name not in tag_names: + video_tag.untagged_at = now + video_tag.save () + tags: list[Tag] = [] + for tag_name in tag_names: + tag = Tag.where ('name', tag_name).first () + if tag is None: + tag = Tag () + tag.name = tag_name + tag.save () + video_tag = (Video.where ('video_id', video.id) + .where ('tag_id', tag.id) + .where_null ('untagged_at') + .first ()) + if video_tag is None: + video_tag = VideoTag () + video_tag.video_id = video.id + video_tag.tag_id = tag.id + video_tag.tagged_at = now + video_tag.untagged_at = None + video_tag.save () + for com in fetch_comments (video.code): + user = User.where ('code', com['userId']).first () + if user is None: + user = User () + user.code = com['userId'] + user.save () + comment = Comment () + comment.video_id = video.id + comment.comment_no = com['no'] + comment.user_id = user.id + comment.content = com['body'] + comment.posted_at = datetime.fromisoformat (com['postedAt']) + comment.nico_count = com['nicoruCount'] + comment.vpos_ms = com['vposMs'] + comment.upsert () + + # 削除動画 + videos = (Video.where_not_in ('code', alive_video_codes) + .where_null ('deleted_at') + .get ()) + for video in videos: + if video.code not in alive_video_codes: + video.deleted_at = now + video.save () -class CommentRow (TypedDict): - id: int - video_id: int - comment_no: int - user_id: int - content: str - posted_at: datetime - nico_count: int - vpos_ms: int | None +def fetch_comments ( + video_code: str, +) -> list[CommentResult]: + time.sleep (1.2) + headers = { 'X-Frontend-Id': '6', + 'X-Frontend-Version': '0' } -class TagRow (TypedDict): - id: int - name: str + action_track_id = ( + ''.join (random.choice (string.ascii_letters + string.digits) + for _ in range (10)) + + '_' + + str (random.randrange (10 ** 12, 10 ** 13))) + url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }" + + f"?actionTrackId={ action_track_id }") -class UserRow (TypedDict): - id: int - code: str + res = requests.post (url, headers = headers, timeout = 60).json () + try: + nv_comment = res['data']['comment']['nvComment'] + except KeyError: + return [] + if nv_comment is None: + return [] -class VideoRow (TypedDict): - id: int - code: str - title: str - description: str - uploaded_at: datetime - deleted_at: datetime | None + headers = { 'X-Frontend-Id': '6', + 'X-Frontend-Version': '0', + 'Content-Type': 'application/json' } + params = { 'params': nv_comment['params'], + 'additionals': { }, + 'threadKey': nv_comment['threadKey'] } -class VideoHistoryRow (TypedDict): - id: int - video_id: int - fetched_at: date - views_count: int + url = nv_comment['server'] + '/v1/threads' + res = (requests.post (url, json.dumps (params), + headers = headers, + timeout = 60) + .json ()) -class VideoTagRow (TypedDict): - id: int - video_id: int - tag_id: int - tagged_at: date - untagged_at: date | None + try: + return res['data']['threads'][1]['comments'] + except (IndexError, KeyError): + return [] -class DbConfig (TypedDict): - driver: str - host: str - database: str - user: str - password: str - prefix: str +def search_nico_by_tag ( + tag: str, +) -> list[VideoResult]: + return search_nico_by_tags ([tag]) -def main ( -) -> None: - api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ']) +def search_nico_by_tags ( + tags: list[str], +) -> list[VideoResult]: + today = datetime.now () - update_tables (api_data, now) + url = ('https://snapshot.search.nicovideo.jp' + + '/api/v2/snapshot/video/contents/search') + + result_data: list[VideoResult] = [] + to = datetime (2022, 12, 3) + while to <= today: + time.sleep (1.2) + until = to + timedelta (days = 14) + query_filter = json.dumps ({ 'type': 'or', + 'filters': [ + { 'type': 'range', + 'field': 'startTime', + 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day), + 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day), + 'include_lower': True }] }) + params: VideoSearchParam = { 'q': ' OR '.join (tags), + 'targets': 'tagsExact', + '_sort': '-viewCounter', + 'fields': ('contentId,' + 'title,' + 'tags,' + 'description,' + 'viewCounter,' + 'startTime'), + '_limit': 100, + 'jsonFilter': query_filter } + res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json () + try: + result_data += res['data'] + except KeyError: + pass + to = until + timedelta (days = 1) - conn.commit () - conn.close () + return result_data class Comment (Model): @@ -150,18 +225,12 @@ class Tag (Model): ) -> DynamicProperty: return self.has_many (VideoTag) - @property - def videos ( - self, - ) -> DynamicProperty: - return self.belongs_to_many (Video) - class User (Model): __timestamps__ = False @property - del comments ( + def comments ( self, ) -> DynamicProperty: return self.has_many (Comment) @@ -182,12 +251,6 @@ class Video (Model): ) -> DynamicProperty: return self.has_many (VideoTag) - @property - def tags ( - self, - ) -> DynamicProperty: - return self.belongs_to_many (Tag) - @property def comments ( self, @@ -251,841 +314,92 @@ class VideoTag (Model): self.save () -def update_tables ( - api_data: list[VideoResult], - now: datetime, -) -> None: - video_ids: list[int] = [] - for datum in api_data: - tag_names: list[str] = datum['tags'].split () - video = Video () - video.code = datum['contentId'], - video.title = datum['title'], - video.description = datum['description'] or '', - video.uploaded_at = datetime.fromisoformat (datum['startTime']) - video.upsert () - if video.id is not None: - video_ids.append (video.id_) - video_history = VideoHistoryDto (video_id = video.id_, - fetched_at = now, - views_count = datum['viewCounter']) - video_history_dao.insert (video_history) - tag_ids: list[int] = [] - video_tags = video_tag_dao.fetch_alive_by_video_id (video.id_, False) - for vt in video_tags: - tag = tag_dao.find (vt.tag_id) - if (tag is not None - and (tag.name not in tag_names) - and (tag.id_ is not None)): - tag_ids.append (tag.id_) - video_tag_dao.untag_all (video.id_, tag_ids, now) - tags: list[TagDto] = [] - for tag_name in tag_names: - tag = tag_dao.fetch_by_name (tag_name) - if tag is None: - tag = TagDto (name = tag_name) - tag_dao.insert (tag) - if video.id_ is not None and tag.id_ is not None: - video_tag = video_tag_dao.fetch_alive_by_ids (video.id_, tag.id_, False) - if video_tag is None: - video_tag = VideoTagDto (video_id = video.id_, - tag_id = tag.id_, - tagged_at = now) - video_tag_dao.insert (video_tag, False) - for com in fetch_comments (video.code): - user = user_dao.fetch_by_code (com['userId']) - if user is None: - user = UserDto (code = com['userId']) - user_dao.insert (user) - if video.id_ is not None and user.id_ is not None: - comment = CommentDto (video_id = video.id_, - comment_no = com['no'], - user_id = user.id_, - content = com['body'], - posted_at = datetime.fromisoformat (com['postedAt']), - nico_count = com['nicoruCount'], - vpos_ms = com['vposMs']) - comment_dao.upsert (comment, False) - - alive_video_codes = [d['contentId'] for d in api_data] - - lost_video_ids: list[int] = [] - videos = video_dao.fetch_alive () - for video in videos: - if video.id_ is not None and video.code not in alive_video_codes: - lost_video_ids.append (video.id_) - - video_dao.delete (lost_video_ids, now) - - -def fetch_comments ( - video_code: str, -) -> list[CommentResult]: - time.sleep (1.2) - - headers = { 'X-Frontend-Id': '6', - 'X-Frontend-Version': '0' } - - action_track_id = ( - ''.join (random.choice (string.ascii_letters + string.digits) - for _ in range (10)) - + '_' - + str (random.randrange (10 ** 12, 10 ** 13))) - - url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }" - + f"?actionTrackId={ action_track_id }") - - res = requests.post (url, headers = headers, timeout = 60).json () - - try: - nv_comment = res['data']['comment']['nvComment'] - except KeyError: - return [] - if nv_comment is None: - return [] - - headers = { 'X-Frontend-Id': '6', - 'X-Frontend-Version': '0', - 'Content-Type': 'application/json' } - - params = { 'params': nv_comment['params'], - 'additionals': { }, - 'threadKey': nv_comment['threadKey'] } - - url = nv_comment['server'] + '/v1/threads' - - res = (requests.post (url, json.dumps (params), - headers = headers, - timeout = 60) - .json ()) - - try: - return res['data']['threads'][1]['comments'] - except (IndexError, KeyError): - return [] - - -def search_nico_by_tag ( - tag: str, -) -> list[VideoResult]: - return search_nico_by_tags ([tag]) +class DbConfig (TypedDict): + driver: str + host: str + database: str + user: str + password: str + prefix: str -def search_nico_by_tags ( - tags: list[str], -) -> list[VideoResult]: - today = datetime.now () +class VideoSearchParam (TypedDict): + q: str + targets: str + _sort: str + fields: str + _limit: int + jsonFilter: str - url = ('https://snapshot.search.nicovideo.jp' - + '/api/v2/snapshot/video/contents/search') - result_data: list[VideoResult] = [] - to = datetime (2022, 12, 3) - while to <= today: - time.sleep (1.2) - until = to + timedelta (days = 14) - query_filter = json.dumps ({ 'type': 'or', - 'filters': [ - { 'type': 'range', - 'field': 'startTime', - 'from': '%04d-%02d-%02dT00:00:00+09:00' % (to.year, to.month, to.day), - 'to': '%04d-%02d-%02dT23:59:59+09:00' % (until.year, until.month, until.day), - 'include_lower': True }] }) - params: VideoSearchParam = { 'q': ' OR '.join (tags), - 'targets': 'tagsExact', - '_sort': '-viewCounter', - 'fields': ('contentId,' - 'title,' - 'tags,' - 'description,' - 'viewCounter,' - 'startTime'), - '_limit': 100, - 'jsonFilter': query_filter } - res = requests.get (url, params = cast (dict[str, int | str], params), timeout = 60).json () - try: - result_data += res['data'] - except KeyError: - pass - to = until + timedelta (days = 1) +class VideoResult (TypedDict): + contentId: str + title: str + tags: str + description: str | None + viewCounter: int + startTime: str - return result_data +class CommentResult (TypedDict): + id: str + no: int + vposMs: int + body: str + commands: list[str] + userId: str + isPremium: bool + score: int + postedAt: str + nicoruCount: int + nicoruId: Any + source: str + isMyPost: bool -class VideoDao: - def find ( - self, - video_id: int, - with_relation_tables: bool, - ) -> VideoDto | None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - code, - title, - description, - uploaded_at, - deleted_at - FROM - videos - WHERE - id = %s - ORDER BY - id""", (video_id,)) - print (c._executed) - row = cast (VideoRow | None, c.fetchone ()) - if row is None: - return None - return self._create_dto_from_row (row, with_relation_tables) - - def fetch_all ( - self, - with_relation_tables: bool, - ) -> list[VideoDto]: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - code, - title, - description, - uploaded_at, - deleted_at - FROM - videos - ORDER BY - id""") - print (c._executed) - videos: list[VideoDto] = [] - for row in cast (list[VideoRow], c.fetchall ()): - videos.append (self._create_dto_from_row (row, with_relation_tables)) - return videos - - def fetch_alive ( - self, - ) -> list[VideoDto]: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - code, - title, - description, - uploaded_at, - deleted_at - FROM - videos - WHERE - deleted_at IS NULL""") - print (c._executed) - videos: list[VideoDto] = [] - for row in cast (list[VideoRow], c.fetchall ()): - videos.append (self._create_dto_from_row (row, False)) - return videos - def upsert ( - self, - video: VideoDto, - with_relation_tables: bool, - ) -> None: - deleted_at: datetime | DbNullType | None = video.deleted_at - if deleted_at is None: - raise TypeError ('未実装') - if deleted_at is DbNull: - deleted_at = None - deleted_at = cast (datetime | None, deleted_at) - - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - videos( - code, - title, - description, - uploaded_at, - deleted_at) - VALUES - ( - %s, - %s, - %s, - %s, - %s) - ON DUPLICATE KEY UPDATE - id = LAST_INSERT_ID(id), - code = VALUES(code), - title = VALUES(title), - description = VALUES(description), - uploaded_at = VALUES(uploaded_at), - deleted_at = VALUES(deleted_at)""", (video.code, - video.title, - video.description, - video.uploaded_at, - deleted_at)) - print (c._executed) - video.id_ = c.lastrowid - if with_relation_tables: - if video.video_tags is not None: - VideoTagDao (self.conn).upsert_all (video.video_tags, False) - if video.comments is not None: - CommentDao (self.conn).upsert_all (video.comments, False) - if video.video_histories is not None: - VideoHistoryDao (self.conn).upsert_all (video.video_histories) - - def upsert_all ( - self, - videos: list[VideoDto], - with_relation_tables: bool, - ) -> None: - for video in videos: - self.upsert (video, with_relation_tables) +class CommentRow (TypedDict): + id: int + video_id: int + comment_no: int + user_id: int + content: str + posted_at: datetime + nico_count: int + vpos_ms: int | None - def delete ( - self, - video_ids: list[int], - at: datetime, - ) -> None: - if not video_ids: - return - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - UPDATE - videos - SET - deleted_at = %%s - WHERE - id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids)) - print (c._executed) - - def _create_dto_from_row ( - self, - row: VideoRow, - with_relation_tables: bool, - ) -> VideoDto: - video = VideoDto (id_ = row['id'], - code = row['code'], - title = row['title'], - description = row['description'], - uploaded_at = row['uploaded_at'], - deleted_at = row['deleted_at'] or DbNull) - if with_relation_tables and video.id_ is not None: - video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False) - for i in range (len (video.video_tags)): - video.video_tags[i].video = video - video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False) - for i in range (len (video.comments)): - video.comments[i].video = video - video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False) - for i in range (len (video.video_histories)): - video.video_histories[i].video = video - return video - - -class VideoTagDao: - def fetch_by_video_id ( - self, - video_id: int, - with_relation_tables: bool, - ) -> list[VideoTagDto]: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - video_id, - tag_id, - tagged_at, - untagged_at - FROM - video_tags - WHERE - video_id = %s - ORDER BY - id""", (video_id,)) - print (c._executed) - video_tags: list[VideoTagDto] = [] - for row in cast (list[VideoTagRow], c.fetchall ()): - video_tags.append (self._create_dto_from_row (row, with_relation_tables)) - return video_tags - - def fetch_alive_by_video_id ( - self, - video_id: int, - with_relation_tables: bool, - ) -> list[VideoTagDto]: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - video_id, - tag_id, - tagged_at, - untagged_at - FROM - video_tags - WHERE - video_id = %s - AND (untagged_at IS NULL) - ORDER BY - id""", (video_id,)) - print (c._executed) - video_tags: list[VideoTagDto] = [] - for row in cast (list[VideoTagRow], c.fetchall ()): - video_tags.append (self._create_dto_from_row (row, with_relation_tables)) - return video_tags - - def fetch_alive_by_ids ( - self, - video_id: int, - tag_id: int, - with_relation_tables: bool, - ) -> VideoTagDto | None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - video_id, - tag_id, - tagged_at, - untagged_at - FROM - video_tags - WHERE - video_id = %s - AND tag_id = %s""", (video_id, tag_id)) - print (c._executed) - row = cast (VideoTagRow, c.fetchone ()) - if row is None: - return None - return self._create_dto_from_row (row, with_relation_tables) - - def insert ( - self, - video_tag: VideoTagDto, - with_relation_tables: bool, - ) -> None: - untagged_at: date | DbNullType | None = video_tag.untagged_at - if untagged_at is None: - raise TypeError ('未実装') - if untagged_at is DbNull: - untagged_at = None - untagged_at = cast (date | None, untagged_at) - - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - video_tags( - video_id, - tag_id, - tagged_at, - untagged_at) - VALUES - ( - %s, - %s, - %s, - %s)""", (video_tag.video_id, video_tag.tag_id, - video_tag.tagged_at, untagged_at)) - print (c._executed) - video_tag.id_ = c.lastrowid - if with_relation_tables: - if video_tag.video is not None: - VideoDao (self.conn).upsert (video_tag.video, True) - if video_tag.tag is not None: - TagDao (self.conn).upsert (video_tag.tag) - def upsert ( - self, - video_tag: VideoTagDto, - with_relation_tables: bool, - ) -> None: - untagged_at: date | DbNullType | None = video_tag.untagged_at - if untagged_at is None: - raise TypeError ('未実装') - if untagged_at is DbNull: - untagged_at = None - untagged_at = cast (date | None, untagged_at) - - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - video_tags( - video_id, - tag_id, - tagged_at, - untagged_at) - VALUES - ( - %s, - %s, - %s, - %s) - ON DUPLICATE KEY UPDATE - id = LAST_INSERT_ID(id), - video_id = VALUES(video_id), - tag_id = VALUES(tag_id), - tagged_at = VALUES(tagged_at), - untagged_at = VALUES(untagged_at)""", (video_tag.video_id, - video_tag.tag_id, - video_tag.tagged_at, - untagged_at)) - print (c._executed) - video_tag.id_ = c.lastrowid - if with_relation_tables: - if video_tag.video is not None: - VideoDao (self.conn).upsert (video_tag.video, True) - if video_tag.tag is not None: - TagDao (self.conn).upsert (video_tag.tag) - - def upsert_all ( - self, - video_tags: list[VideoTagDto], - with_relation_tables: bool, - ) -> None: - for video_tag in video_tags: - self.upsert (video_tag, with_relation_tables) +class TagRow (TypedDict): + id: int + name: str - def untag_all ( - self, - video_id: int, - tag_ids: list[int], - now: datetime, - ) -> None: - if not tag_ids: - return - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - UPDATE - video_tags - SET - untagged_at = %%s - WHERE - video_id = %%s - AND tag_id IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids)) - print (c._executed) - - def _create_dto_from_row ( - self, - row: VideoTagRow, - with_relation_tables: bool, - ) -> VideoTagDto: - video_tag = VideoTagDto (id_ = row['id'], - video_id = row['video_id'], - tag_id = row['tag_id'], - tagged_at = row['tagged_at'], - untagged_at = row['untagged_at'] or DbNull) - if with_relation_tables: - video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True) - video_tag.tag = TagDao (self.conn).find (video_tag.tag_id) - return video_tag - - -class TagDao: - def find ( - self, - tag_id: int, - ) -> TagDto | None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - name - FROM - tags - WHERE - id = %s""", (tag_id,)) - print (c._executed) - row = cast (TagRow | None, c.fetchone ()) - if row is None: - return None - return self._create_dto_from_row (row) - - def fetch_by_name ( - self, - tag_name: str, - ) -> TagDto | None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - name - FROM - tags - WHERE - name = %s""", (tag_name,)) - print (c._executed) - row = cast (TagRow | None, c.fetchone ()) - if row is None: - return None - return self._create_dto_from_row (row) - - def insert ( - self, - tag: TagDto, - ) -> None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - tags(name) - VALUES - (%s)""", (tag.name,)) - print (c._executed) - tag.id_ = c.lastrowid - def upsert ( - self, - tag: TagDto, - ) -> None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - tags(name) - VALUES - (%s) - ON DUPLICATE KEY UPDATE - id = LAST_INSERT_ID(id), - name = VALUES(name)""", (tag.name,)) - print (c._executed) - tag.id_ = c.lastrowid - - def _create_dto_from_row ( - self, - row: TagRow, - ) -> TagDto: - return TagDto (id_ = row['id'], - name = row['name']) +class UserRow (TypedDict): + id: int + code: str -class VideoHistoryDao: - def fetch_by_video_id ( - self, - video_id: int, - with_relation_tables: bool, - ) -> list[VideoHistoryDto]: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - video_id, - fetched_at, - views_count - FROM - video_histories - WHERE - video_id = %s""", (video_id,)) - print (c._executed) - video_histories: list[VideoHistoryDto] = [] - for row in cast (list[VideoHistoryRow], c.fetchall ()): - video_histories.append (self._create_dto_from_row (row, with_relation_tables)) - return video_histories - - def insert ( - self, - video_history: VideoHistoryDto, - ) -> None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - video_histories( - video_id, - fetched_at, - views_count) - VALUES - ( - %s, - %s, - %s)""", (video_history.video_id, - video_history.fetched_at, - video_history.views_count)) - print (c._executed) +class VideoRow (TypedDict): + id: int + code: str + title: str + description: str + uploaded_at: datetime + deleted_at: datetime | None - def upsert ( - self, - video_history: VideoHistoryDto, - ) -> None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - video_histories( - video_id, - fetched_at, - views_count) - VALUES - ( - %s, - %s, - %s) - ON DUPLICATE KEY UPDATE - id = LAST_INSERT_ID(id), - video_id = VALUES(video_id), - fetched_at = VALUES(fetched_at), - views_count = VALUES(views_count)""", (video_history.video_id, - video_history.fetched_at, - video_history.views_count)) - print (c._executed) - - def upsert_all ( - self, - video_histories: list[VideoHistoryDto], - ) -> None: - for video_history in video_histories: - self.upsert (video_history) - def _create_dto_from_row ( - self, - row: VideoHistoryRow, - with_relation_tables: bool, - ) -> VideoHistoryDto: - video_history = VideoHistoryDto (id_ = row['id'], - video_id = row['video_id'], - fetched_at = row['fetched_at'], - views_count = row['views_count']) - if with_relation_tables: - video_history.video = VideoDao (self.conn).find (video_history.video_id, True) - return video_history - - -class CommentDao: - def fetch_by_video_id ( - self, - video_id: int, - with_relation_tables: bool, - ) -> list[CommentDto]: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - video_id, - comment_no, - user_id, - content, - posted_at, - nico_count, - vpos_ms - FROM - comments - WHERE - video_id = %s""", (video_id,)) - print (c._executed) - comments: list[CommentDto] = [] - for row in cast (list[CommentRow], c.fetchall ()): - comments.append (self._create_dto_from_row (row, with_relation_tables)) - return comments +class VideoHistoryRow (TypedDict): + id: int + video_id: int + fetched_at: date + views_count: int - def upsert ( - self, - comment: CommentDto, - with_relation_tables: bool, - ) -> None: - vpos_ms: int | DbNullType | None = comment.vpos_ms - if vpos_ms is None: - raise TypeError ('未実装') - if vpos_ms is DbNull: - vpos_ms = None - vpos_ms = cast (int | None, vpos_ms) - - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - comments( - video_id, - comment_no, - user_id, - content, - posted_at, - nico_count, - vpos_ms) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s) - ON DUPLICATE KEY UPDATE - id = LAST_INSERT_ID(id), - video_id = VALUES(video_id), - comment_no = VALUES(comment_no), - user_id = VALUES(user_id), - content = VALUES(content), - posted_at = VALUES(posted_at), - nico_count = VALUES(nico_count), - vpos_ms = VALUES(vpos_ms)""", (comment.video_id, - comment.comment_no, - comment.user_id, - comment.content, - comment.posted_at, - comment.nico_count, - vpos_ms)) - print (c._executed) - - def upsert_all ( - self, - comments: list[CommentDto], - with_relation_tables: bool, - ) -> None: - for comment in comments: - self.upsert (comment, with_relation_tables) - def _create_dto_from_row ( - self, - row: CommentRow, - with_relation_tables: bool, - ) -> CommentDto: - comment = CommentDto (id_ = row['id'], - video_id = row['video_id'], - comment_no = row['comment_no'], - user_id = row['user_id'], - content = row['content'], - posted_at = row['posted_at'], - nico_count = row['nico_count'], - vpos_ms = row['vpos_ms'] or DbNull) - if with_relation_tables: - comment.video = VideoDao (self.conn).find (comment.video_id, True) - return comment - - -class UserDao: - def fetch_by_code ( - self, - user_code: str - ) -> UserDto | None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - SELECT - id, - code - FROM - users - WHERE - code = %s""", (user_code,)) - print (c._executed) - row = cast (UserRow | None, c.fetchone ()) - if row is None: - return None - return self._create_dto_from_row (row) - - def insert ( - self, - user: UserDto, - ) -> None: - with self.conn.cursor (dictionary = True) as c: - c.execute (""" - INSERT INTO - users(code) - VALUES - (%s)""", (user.code,)) - print (c.execute) - user.id_ = c.lastrowid - - def _create_dto_from_row ( - self, - row: UserRow, - ) -> UserDto: - return UserDto (id_ = row['id'], - code = row['code']) +class VideoTagRow (TypedDict): + id: int + video_id: int + tag_id: int + tagged_at: date + untagged_at: date | None if __name__ == '__main__':