Browse Source

本日作業分

feature/query
みてるぞ 1 month ago
parent
commit
48f8b35b93
1 changed files with 143 additions and 24 deletions
  1. +143
    -24
      update_db.py

+ 143
- 24
update_db.py View File

@@ -16,6 +16,23 @@ def main (
user = os.environ['MYSQL_USER'],
password = os.environ['MYSQL_PASS'])

video_dao = VideoDao (conn)

api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])

update_video_table (video_dao, api_data)

# TODO: 書くこと


def update_video_table (
video_dao,
api_data: list[dict],
) -> None:
videos = video_dao.fetch_all ()

# TODO: 書くこと


def fetch_comments (
video_id: str,
@@ -36,7 +53,7 @@ def fetch_comments (

try:
nv_comment = res['data']['comment']['nvComment']
except (NameError, AttributeError):
except KeyError:
return []
if nv_comment is None:
return []
@@ -58,7 +75,7 @@ def fetch_comments (

try:
return res['data']['threads'][1]['comments']
except (NameError, AttributeError):
except (IndexError, KeyError):
return []


@@ -74,6 +91,8 @@ def search_nico_by_tags (
url = ('https://snapshot.search.nicovideo.jp'
+ '/api/v2/snapshot/video/contents/search')

# TODO: 年月日の設定ができてゐなぃのと,100 件までしか取得できなぃので何とかすること

query_filter = json.dumps ({ 'type': 'or',
'filters': [
{ 'type': 'range',
@@ -102,7 +121,33 @@ class VideoDao:
):
self.conn = conn

def fetch (
self,
video_id: int,
with_relation_tables: bool = True,
) -> VideoDto | None:
with self.conn.cursor () as c:
c.execute ("""
SELECT
id,
code,
title,
description,
uploaded_at,
deleted_at
FROM
videos
WHERE
id = %s
ORDER BY
id""", video_id)
row = c.fetchone ()
if row is None:
return None
return self._create_dto_from_row (row, with_relation_tables)

def fetch_all (
self,
with_relation_tables: bool = True,
) -> list[VideoDto]:
with self.conn.cursor () as c:
@@ -115,25 +160,39 @@ class VideoDao:
uploaded_at,
deleted_at
FROM
videos""")
for video in c.fetchall ():
if with_relation_tables:
video_tags = VideoTagDao (conn).fetch_all (False)
comments = CommentDao (conn).fetch_all (False)
video_histories = VideoHistoryDao (conn).fetch_all (False)
else:
video_tags = None
comments = None
video_histories = None
return VideoDto (id_ = video['id'],
code = video['code'],
title = video['title'],
description = video['description'],
uploaded_at = video['uploaded_at'],
deleted_at = video['deleted_at'],
video_tags = video_tags,
comments = comments,
video_histories = video_histories)
videos
ORDER BY
id""")
videos: list[VideoDto] = []
for row in c.fetchall ():
videos.append (self._create_dto_from_row (row, with_relation_tables))
return videos

def _create_dto_from_row (
self,
row,
with_relation_tables: bool,
) -> VideoDto:
video = VideoDto (id_ = row['id'],
code = row['code'],
title = row['title'],
description = row['description'],
uploaded_at = row['uploaded_at'],
deleted_at = row['deleted_at'],
video_tags = None,
comments = None,
video_histories = None)
if with_relation_tables:
video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
for i in range (len (video.video_tags)):
video.video_tags[i].video = video
video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
for i in range (len (video.comments)):
video.comments[i].video = video
video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
for i in range (len (video.video_histories)):
video.video_histories[i].video = video
return video


@dataclass (slots = True)
@@ -144,9 +203,69 @@ class VideoDto:
description: str
uploaded_at: datetime
deleted_at: datetime | None
video_tags: VideoTagDto | None
comments: CommentDto | None
video_histories: VideoHistoryDto | None
video_tags: list[VideoTagDto] | None
comments: list[CommentDto] | None
video_histories: list[VideoHistoryDto] | None


class VideoTagDao:
def __init__ (
self,
conn,
):
self.conn = conn

def fetch_by_video_id (
self,
video_id: int,
with_relation_tables: bool = True,
) -> list[VideoTagDto]:
with self.conn.cursor () as c:
c.execute ("""
SELECT
id,
video_id,
tag_id,
tagged_at,
untagged_at
FROM
video_tags
WHERE
video_id = %s
ORDER BY
id""", video_id)
video_tags: list[VideoTagDto] = []
for row in c.fetchall ():
video_tags.append (self._create_dto_from_row (row, with_relation_tables))
return video_tags

def _create_dto_from_row (
self,
row,
with_relation_tables: bool,
) -> VideoTagDto:
video_tag = VideoTagDto (id_ = row['id'],
video_id = row['video_id'],
tag_id = row['tag_id'],
tagged_at = row['tagged_at'],
untagged_at = row['untagged_at'],
video = None,
tag = None)
if with_relation_tables:
video_tag.video = VideoDao (self.conn).fetch (video_tag.video_id, True)
video_tag.tag = TagDao (self.conn).fetch (video_tag.tag_id, True)
return video_tag


@dataclass (slots = True)
class VideoTagDto:
id_: int
video_id: int
tag_id: int
tagged_at: datetime
untagged_at: datetime
video: VideoDto
tag: TagDto


if __name__ == '__main__':


Loading…
Cancel
Save