|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072 |
- """
- 日次で実行し,ぼざクリ DB を最新に更新する.
- """
-
- from __future__ import annotations
-
- import json
- import os
- import random
- import string
- import time
- from dataclasses import dataclass
- from datetime import date, datetime, timedelta
- from typing import Any, Type, TypedDict, cast
-
- import mysql.connector
- import requests
- from mysql.connector.connection import MySQLConnectionAbstract
-
-
class DbNull:
    """
    Sentinel standing for an SQL NULL.

    The class object itself is the marker: DTO fields default to `DbNull`
    and the DAOs compare with `is DbNull`, converting it to None before
    binding query parameters.  The class is never meant to be instantiated.
    """

    def __new__ (cls):
        # `__init__` is not defined on this class itself, so removing it
        # raises AttributeError -- every attempt to instantiate fails.
        del cls.__init__


# Type of the sentinel as used in annotations (the class, not an instance).
DbNullType = Type[DbNull]
-
-
class VideoSearchParam (TypedDict):
    """Query-string parameters that search_nico_by_tags sends to the snapshot search endpoint."""
    q: str           # search expression; the tags joined with ' OR '
    targets: str     # what the expression is matched against ('tagsExact' in this file)
    _sort: str       # sort key ('-viewCounter' in this file)
    fields: str      # comma-separated names of the fields to return for each video
    _limit: int      # number of results requested per call
    jsonFilter: str  # JSON-encoded filter object (a startTime range in this file)
-
-
class VideoResult (TypedDict):
    """One video entry of the search response, restricted to the fields requested by search_nico_by_tags."""
    contentId: str           # stored as the video's `code`
    title: str
    tags: str                # tag names separated by whitespace (update_tables splits it)
    description: str | None  # update_tables stores '' when this is None
    viewCounter: int         # recorded as `views_count` in the video history
    startTime: str           # parsed with datetime.fromisoformat into `uploaded_at`
-
-
class CommentResult (TypedDict):
    """
    One comment entry as returned by fetch_comments.

    update_tables only reads `no`, `vposMs`, `body`, `userId`, `postedAt`
    and `nicoruCount`; the remaining keys are declared but unused in this file.
    """
    id: str
    no: int            # stored as `comment_no`
    vposMs: int        # stored as `vpos_ms`
    body: str          # stored as `content`
    commands: list[str]
    userId: str        # looked up / stored as the user's `code`
    isPremium: bool
    score: int
    postedAt: str      # parsed with datetime.fromisoformat into `posted_at`
    nicoruCount: int   # stored as `nico_count`
    nicoruId: Any
    source: str
    isMyPost: bool
-
-
class CommentRow (TypedDict):
    """Dictionary-cursor row selected from the `comments` table by CommentDao."""
    id: int
    video_id: int
    comment_no: int
    user_id: int
    content: str
    posted_at: datetime
    nico_count: int
    vpos_ms: int | None  # None when the column is NULL
-
-
class TagRow (TypedDict):
    """Dictionary-cursor row selected from the `tags` table by TagDao."""
    id: int
    name: str
-
-
class UserRow (TypedDict):
    """Dictionary-cursor row selected from the `users` table by UserDao."""
    id: int
    code: str  # the commenter identifier taken from a comment's `userId`
-
-
class VideoRow (TypedDict):
    """Dictionary-cursor row selected from the `videos` table by VideoDao."""
    id: int
    code: str                    # the video's `contentId` from the search API
    title: str
    description: str
    uploaded_at: datetime
    deleted_at: datetime | None  # None while the video is considered alive
-
-
class VideoHistoryRow (TypedDict):
    """Dictionary-cursor row selected from the `video_histories` table by VideoHistoryDao."""
    id: int
    video_id: int
    fetched_at: date
    views_count: int
-
-
class VideoTagRow (TypedDict):
    """Dictionary-cursor row selected from the `video_tags` table by VideoTagDao."""
    id: int
    video_id: int
    tag_id: int
    tagged_at: date
    untagged_at: date | None  # None while the tag is still attached to the video
-
-
def main (
) -> None:
    """
    Run one full update: search the videos, then synchronise every table.

    Connects to the `nizika_nico` database with the credentials found in
    the MYSQL_USER / MYSQL_PASS environment variables, fetches the videos
    for the two fixed tags and hands everything to update_tables.  The
    work is committed once at the very end.

    Raises:
        KeyError: if one of the environment variables is missing.
        TypeError: if the connector returns an unexpected connection type.
    """
    conn = mysql.connector.connect (user = os.environ['MYSQL_USER'],
                                    password = os.environ['MYSQL_PASS'],
                                    database = 'nizika_nico')
    if not isinstance (conn, MySQLConnectionAbstract):
        raise TypeError

    try:
        # A single timestamp is shared by every row written during this run.
        now = datetime.now ()

        video_dao = VideoDao (conn)
        tag_dao = TagDao (conn)
        video_tag_dao = VideoTagDao (conn)
        video_history_dao = VideoHistoryDao (conn)
        comment_dao = CommentDao (conn)
        user_dao = UserDao (conn)

        api_data = search_nico_by_tags (['伊地知ニジカ', 'ぼざろクリーチャーシリーズ'])

        update_tables (video_dao, tag_dao, video_tag_dao, video_history_dao, comment_dao, user_dao,
                       api_data, now)

        conn.commit ()
    finally:
        # Always release the connection; when an error occurred before the
        # commit, nothing has been committed by this function.
        conn.close ()
-
-
def update_tables (
        video_dao: VideoDao,
        tag_dao: TagDao,
        video_tag_dao: VideoTagDao,
        video_history_dao: VideoHistoryDao,
        comment_dao: CommentDao,
        user_dao: UserDao,
        api_data: list[VideoResult],
        now: datetime,
) -> None:
    """
    Synchronise all tables with the videos returned by the search API.

    For every video in *api_data*:
      * upsert the video row,
      * append a view-count history row stamped with *now*,
      * mark as untagged (at *now*) every live tag that is no longer in
        the video's tag list (names compared case-insensitively),
      * create missing tags and missing video/tag links,
      * fetch the video's comments and upsert them, creating users on
        first sight.

    Finally every video that is alive in the database but absent from
    *api_data* is marked as deleted at *now*.

    NOTE(review): an empty *api_data* (e.g. after a failed search) marks
    every live video as deleted -- confirm that this is intended.
    """
    for datum in api_data:
        tag_names: list[str] = datum['tags'].split ()
        video = VideoDto (code = datum['contentId'],
                          title = datum['title'],
                          description = datum['description'] or '',
                          uploaded_at = datetime.fromisoformat (datum['startTime']))
        video_dao.upsert (video, False)
        if video.id_ is None:
            # Without a row id nothing can be linked to this video, so do
            # not create orphan tags or users for it.
            continue
        video_id = video.id_

        video_history_dao.insert (VideoHistoryDto (video_id = video_id,
                                                   fetched_at = now,
                                                   views_count = datum['viewCounter']))

        # Untag the live tags that disappeared from the video.
        current_names = { name.upper () for name in tag_names }
        removed_tag_ids: list[int] = []
        for vt in video_tag_dao.fetch_alive_by_video_id (video_id, False):
            tag = tag_dao.find (vt.tag_id)
            if (tag is not None
                    and tag.id_ is not None
                    and tag.name.upper () not in current_names):
                removed_tag_ids.append (tag.id_)
        video_tag_dao.untag_all (video_id, removed_tag_ids, now)

        # Make sure every current tag exists and is linked to the video.
        for tag_name in tag_names:
            tag = tag_dao.fetch_by_name (tag_name)
            if tag is None:
                tag = TagDto (name = tag_name)
                tag_dao.insert (tag)
            if tag.id_ is None:
                continue
            if video_tag_dao.fetch_alive_by_ids (video_id, tag.id_, False) is None:
                video_tag_dao.insert (VideoTagDto (video_id = video_id,
                                                   tag_id = tag.id_,
                                                   tagged_at = now),
                                      False)

        # Store the comments, registering unknown commenters on the way.
        for com in fetch_comments (video.code):
            user = user_dao.fetch_by_code (com['userId'])
            if user is None:
                user = UserDto (code = com['userId'])
                user_dao.insert (user)
            if user.id_ is None:
                continue
            comment_dao.upsert (CommentDto (video_id = video_id,
                                            comment_no = com['no'],
                                            user_id = user.id_,
                                            content = com['body'],
                                            posted_at = datetime.fromisoformat (com['postedAt']),
                                            nico_count = com['nicoruCount'],
                                            vpos_ms = com['vposMs']),
                                False)

    # Videos known to the database but missing from the API result are
    # treated as lost and soft-deleted.
    alive_video_codes = { d['contentId'] for d in api_data }
    lost_video_ids = [video.id_
                      for video in video_dao.fetch_alive ()
                      if video.id_ is not None and video.code not in alive_video_codes]
    video_dao.delete (lost_video_ids, now)
-
-
def fetch_comments (
        video_code: str,
) -> list[CommentResult]:
    """
    Fetch the comments of the video identified by *video_code*.

    Two requests are made: the guest watch API yields the comment-server
    address, thread key and parameters (`nvComment`), which are then
    posted to `<server>/v1/threads`.  The comments of the second thread
    in that response are returned.

    Returns:
        The list of comment entries, or an empty list when any expected
        part of either response is missing or null.
    """
    # Pause before every video's requests (presumably to go easy on the API).
    time.sleep (1.2)

    headers = { 'X-Frontend-Id': '6',
                'X-Frontend-Version': '0' }

    # actionTrackId: 10 random alphanumerics, '_', and a 13-digit number.
    alphabet = string.ascii_letters + string.digits
    action_track_id = (
            ''.join (random.choice (alphabet) for _ in range (10))
            + '_'
            + str (random.randrange (10 ** 12, 10 ** 13)))

    url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{ video_code }"
           + f"?actionTrackId={ action_track_id }")

    res = requests.post (url, headers = headers, timeout = 60).json ()

    try:
        nv_comment = res['data']['comment']['nvComment']
    except (KeyError, TypeError):
        # TypeError: an intermediate level was null instead of an object.
        return []
    if nv_comment is None:
        return []

    headers = { 'X-Frontend-Id': '6',
                'X-Frontend-Version': '0',
                'Content-Type': 'application/json' }

    try:
        params = { 'params': nv_comment['params'],
                   'additionals': { },
                   'threadKey': nv_comment['threadKey'] }
        url = nv_comment['server'] + '/v1/threads'
    except KeyError:
        return []

    res = (requests.post (url, json.dumps (params),
                          headers = headers,
                          timeout = 60)
           .json ())

    try:
        return res['data']['threads'][1]['comments']
    except (IndexError, KeyError, TypeError):
        return []
-
-
def search_nico_by_tag (
        tag: str,
) -> list[VideoResult]:
    """Search videos carrying the single *tag*; shorthand for search_nico_by_tags with one tag."""
    single_tag_list = [tag]
    return search_nico_by_tags (single_tag_list)
-
-
def search_nico_by_tags (
        tags: list[str],
) -> list[VideoResult]:
    """
    Collect the videos that carry any of *tags* from the snapshot search API.

    The period from 2022-12-03 up to now is walked in consecutive windows
    of 15 calendar days (start day through start + 14 days); one request
    is sent per window, filtered on `startTime`, and all returned entries
    are concatenated.

    NOTE(review): each request asks for `_limit` 100 results and no
    further pages are requested -- confirm no window can exceed that.
    """
    endpoint = ('https://snapshot.search.nicovideo.jp'
                + '/api/v2/snapshot/video/contents/search')
    requested_fields = ('contentId,'
                        'title,'
                        'tags,'
                        'description,'
                        'viewCounter,'
                        'startTime')
    query = ' OR '.join (tags)
    now = datetime.now ()

    found: list[VideoResult] = []
    window_start = datetime (2022, 12, 3)
    while window_start <= now:
        time.sleep (1.2)  # pause before every request
        window_end = window_start + timedelta (days = 14)

        start_time_range = {
                'type': 'range',
                'field': 'startTime',
                'from': '%04d-%02d-%02dT00:00:00+09:00' % (window_start.year,
                                                           window_start.month,
                                                           window_start.day),
                'to': '%04d-%02d-%02dT23:59:59+09:00' % (window_end.year,
                                                         window_end.month,
                                                         window_end.day),
                'include_lower': True }
        params: VideoSearchParam = {
                'q': query,
                'targets': 'tagsExact',
                '_sort': '-viewCounter',
                'fields': requested_fields,
                '_limit': 100,
                'jsonFilter': json.dumps ({ 'type': 'or',
                                            'filters': [start_time_range] }) }

        res = requests.get (endpoint,
                            params = cast (dict[str, int | str], params),
                            timeout = 60).json ()
        try:
            found.extend (res['data'])
        except KeyError:
            # A response without 'data' contributes nothing for this window.
            pass

        # The next window begins the day after this one ended.
        window_start = window_end + timedelta (days = 1)

    return found
-
-
- class VideoDao:
- def __init__ (
- self,
- conn: MySQLConnectionAbstract,
- ):
- self.conn = conn
-
- def find (
- self,
- video_id: int,
- with_relation_tables: bool,
- ) -> VideoDto | None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- code,
- title,
- description,
- uploaded_at,
- deleted_at
- FROM
- videos
- WHERE
- id = %s
- ORDER BY
- id""", (video_id,))
- print (c._executed)
- row = cast (VideoRow | None, c.fetchone ())
- if row is None:
- return None
- return self._create_dto_from_row (row, with_relation_tables)
-
- def fetch_all (
- self,
- with_relation_tables: bool,
- ) -> list[VideoDto]:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- code,
- title,
- description,
- uploaded_at,
- deleted_at
- FROM
- videos
- ORDER BY
- id""")
- print (c._executed)
- videos: list[VideoDto] = []
- for row in cast (list[VideoRow], c.fetchall ()):
- videos.append (self._create_dto_from_row (row, with_relation_tables))
- return videos
-
- def fetch_alive (
- self,
- ) -> list[VideoDto]:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- code,
- title,
- description,
- uploaded_at,
- deleted_at
- FROM
- videos
- WHERE
- deleted_at IS NULL""")
- print (c._executed)
- videos: list[VideoDto] = []
- for row in cast (list[VideoRow], c.fetchall ()):
- videos.append (self._create_dto_from_row (row, False))
- return videos
-
- def upsert (
- self,
- video: VideoDto,
- with_relation_tables: bool,
- ) -> None:
- deleted_at: datetime | DbNullType | None = video.deleted_at
- if deleted_at is None:
- raise TypeError ('未実装')
- if deleted_at is DbNull:
- deleted_at = None
- deleted_at = cast (datetime | None, deleted_at)
-
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- videos(
- code,
- title,
- description,
- uploaded_at,
- deleted_at)
- VALUES
- (
- %s,
- %s,
- %s,
- %s,
- %s)
- ON DUPLICATE KEY UPDATE
- id = LAST_INSERT_ID(id),
- code = VALUES(code),
- title = VALUES(title),
- description = VALUES(description),
- uploaded_at = VALUES(uploaded_at),
- deleted_at = VALUES(deleted_at)""", (video.code,
- video.title,
- video.description,
- video.uploaded_at,
- deleted_at))
- print (c._executed)
- video.id_ = c.lastrowid
- if with_relation_tables:
- if video.video_tags is not None:
- VideoTagDao (self.conn).upsert_all (video.video_tags, False)
- if video.comments is not None:
- CommentDao (self.conn).upsert_all (video.comments, False)
- if video.video_histories is not None:
- VideoHistoryDao (self.conn).upsert_all (video.video_histories)
-
- def upsert_all (
- self,
- videos: list[VideoDto],
- with_relation_tables: bool,
- ) -> None:
- for video in videos:
- self.upsert (video, with_relation_tables)
-
- def delete (
- self,
- video_ids: list[int],
- at: datetime,
- ) -> None:
- if not video_ids:
- return
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- UPDATE
- videos
- SET
- deleted_at = %%s
- WHERE
- id IN (%s)""" % ', '.join (['%s'] * len (video_ids)), (at, *video_ids))
- print (c._executed)
-
- def _create_dto_from_row (
- self,
- row: VideoRow,
- with_relation_tables: bool,
- ) -> VideoDto:
- video = VideoDto (id_ = row['id'],
- code = row['code'],
- title = row['title'],
- description = row['description'],
- uploaded_at = row['uploaded_at'],
- deleted_at = row['deleted_at'] or DbNull)
- if with_relation_tables and video.id_ is not None:
- video.video_tags = VideoTagDao (self.conn).fetch_by_video_id (video.id_, False)
- for i in range (len (video.video_tags)):
- video.video_tags[i].video = video
- video.comments = CommentDao (self.conn).fetch_by_video_id (video.id_, False)
- for i in range (len (video.comments)):
- video.comments[i].video = video
- video.video_histories = VideoHistoryDao (self.conn).fetch_by_video_id (video.id_, False)
- for i in range (len (video.video_histories)):
- video.video_histories[i].video = video
- return video
-
-
# In-memory form of one `videos` row, plus its related rows when loaded.
@dataclass (slots = True)
class VideoDto:
    code: str
    title: str
    description: str
    uploaded_at: datetime
    id_: int | None = None                              # set by VideoDao.upsert from the cursor's lastrowid
    deleted_at: datetime | DbNullType = DbNull          # DbNull is stored as SQL NULL by VideoDao.upsert
    video_tags: list[VideoTagDto] | None = None         # filled by VideoDao only when relations are requested
    comments: list[CommentDto] | None = None            # filled by VideoDao only when relations are requested
    video_histories: list[VideoHistoryDto] | None = None  # filled by VideoDao only when relations are requested
-
-
- class VideoTagDao:
- def __init__ (
- self,
- conn: MySQLConnectionAbstract,
- ):
- self.conn = conn
-
- def fetch_by_video_id (
- self,
- video_id: int,
- with_relation_tables: bool,
- ) -> list[VideoTagDto]:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- video_id,
- tag_id,
- tagged_at,
- untagged_at
- FROM
- video_tags
- WHERE
- video_id = %s
- ORDER BY
- id""", (video_id,))
- print (c._executed)
- video_tags: list[VideoTagDto] = []
- for row in cast (list[VideoTagRow], c.fetchall ()):
- video_tags.append (self._create_dto_from_row (row, with_relation_tables))
- return video_tags
-
- def fetch_alive_by_video_id (
- self,
- video_id: int,
- with_relation_tables: bool,
- ) -> list[VideoTagDto]:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- video_id,
- tag_id,
- tagged_at,
- untagged_at
- FROM
- video_tags
- WHERE
- video_id = %s
- AND (untagged_at IS NULL)
- ORDER BY
- id""", (video_id,))
- print (c._executed)
- video_tags: list[VideoTagDto] = []
- for row in cast (list[VideoTagRow], c.fetchall ()):
- video_tags.append (self._create_dto_from_row (row, with_relation_tables))
- return video_tags
-
- def fetch_alive_by_ids (
- self,
- video_id: int,
- tag_id: int,
- with_relation_tables: bool,
- ) -> VideoTagDto | None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- video_id,
- tag_id,
- tagged_at,
- untagged_at
- FROM
- video_tags
- WHERE
- video_id = %s
- AND tag_id = %s""", (video_id, tag_id))
- print (c._executed)
- row = cast (VideoTagRow, c.fetchone ())
- if row is None:
- return None
- return self._create_dto_from_row (row, with_relation_tables)
-
- def insert (
- self,
- video_tag: VideoTagDto,
- with_relation_tables: bool,
- ) -> None:
- untagged_at: date | DbNullType | None = video_tag.untagged_at
- if untagged_at is None:
- raise TypeError ('未実装')
- if untagged_at is DbNull:
- untagged_at = None
- untagged_at = cast (date | None, untagged_at)
-
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- video_tags(
- video_id,
- tag_id,
- tagged_at,
- untagged_at)
- VALUES
- (
- %s,
- %s,
- %s,
- %s)""", (video_tag.video_id, video_tag.tag_id,
- video_tag.tagged_at, untagged_at))
- print (c._executed)
- video_tag.id_ = c.lastrowid
- if with_relation_tables:
- if video_tag.video is not None:
- VideoDao (self.conn).upsert (video_tag.video, True)
- if video_tag.tag is not None:
- TagDao (self.conn).upsert (video_tag.tag)
-
- def upsert (
- self,
- video_tag: VideoTagDto,
- with_relation_tables: bool,
- ) -> None:
- untagged_at: date | DbNullType | None = video_tag.untagged_at
- if untagged_at is None:
- raise TypeError ('未実装')
- if untagged_at is DbNull:
- untagged_at = None
- untagged_at = cast (date | None, untagged_at)
-
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- video_tags(
- video_id,
- tag_id,
- tagged_at,
- untagged_at)
- VALUES
- (
- %s,
- %s,
- %s,
- %s)
- ON DUPLICATE KEY UPDATE
- id = LAST_INSERT_ID(id),
- video_id = VALUES(video_id),
- tag_id = VALUES(tag_id),
- tagged_at = VALUES(tagged_at),
- untagged_at = VALUES(untagged_at)""", (video_tag.video_id,
- video_tag.tag_id,
- video_tag.tagged_at,
- untagged_at))
- print (c._executed)
- video_tag.id_ = c.lastrowid
- if with_relation_tables:
- if video_tag.video is not None:
- VideoDao (self.conn).upsert (video_tag.video, True)
- if video_tag.tag is not None:
- TagDao (self.conn).upsert (video_tag.tag)
-
- def upsert_all (
- self,
- video_tags: list[VideoTagDto],
- with_relation_tables: bool,
- ) -> None:
- for video_tag in video_tags:
- self.upsert (video_tag, with_relation_tables)
-
- def untag_all (
- self,
- video_id: int,
- tag_ids: list[int],
- now: datetime,
- ) -> None:
- if not tag_ids:
- return
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- UPDATE
- video_tags
- SET
- untagged_at = %%s
- WHERE
- video_id = %%s
- AND tag_id IN (%s)""" % ', '.join (['%s'] * len (tag_ids)), (now, video_id, *tag_ids))
- print (c._executed)
-
- def _create_dto_from_row (
- self,
- row: VideoTagRow,
- with_relation_tables: bool,
- ) -> VideoTagDto:
- video_tag = VideoTagDto (id_ = row['id'],
- video_id = row['video_id'],
- tag_id = row['tag_id'],
- tagged_at = row['tagged_at'],
- untagged_at = row['untagged_at'] or DbNull)
- if with_relation_tables:
- video_tag.video = VideoDao (self.conn).find (video_tag.video_id, True)
- video_tag.tag = TagDao (self.conn).find (video_tag.tag_id)
- return video_tag
-
-
# In-memory form of one `video_tags` row: a tag attached to a video.
@dataclass (slots = True)
class VideoTagDto:
    video_id: int
    tag_id: int
    tagged_at: date
    id_: int | None = None                   # set by VideoTagDao.insert / upsert from the cursor's lastrowid
    untagged_at: date | DbNullType = DbNull  # DbNull is stored as SQL NULL, i.e. the tag is still attached
    video: VideoDto | None = None            # loaded by VideoTagDao only when relations are requested
    tag: TagDto | None = None                # loaded by VideoTagDao only when relations are requested
-
-
- class TagDao:
- def __init__ (
- self,
- conn: MySQLConnectionAbstract,
- ):
- self.conn = conn
-
- def find (
- self,
- tag_id: int,
- ) -> TagDto | None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- name
- FROM
- tags
- WHERE
- id = %s""", (tag_id,))
- print (c._executed)
- row = cast (TagRow | None, c.fetchone ())
- if row is None:
- return None
- return self._create_dto_from_row (row)
-
- def fetch_by_name (
- self,
- tag_name: str,
- ) -> TagDto | None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- name
- FROM
- tags
- WHERE
- name = %s""", (tag_name,))
- print (c._executed)
- row = cast (TagRow | None, c.fetchone ())
- if row is None:
- return None
- return self._create_dto_from_row (row)
-
- def insert (
- self,
- tag: TagDto,
- ) -> None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- tags(name)
- VALUES
- (%s)""", (tag.name,))
- print (c._executed)
- tag.id_ = c.lastrowid
-
- def upsert (
- self,
- tag: TagDto,
- ) -> None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- tags(name)
- VALUES
- (%s)
- ON DUPLICATE KEY UPDATE
- id = LAST_INSERT_ID(id),
- name = VALUES(name)""", (tag.name,))
- print (c._executed)
- tag.id_ = c.lastrowid
-
- def _create_dto_from_row (
- self,
- row: TagRow,
- ) -> TagDto:
- return TagDto (id_ = row['id'],
- name = row['name'])
-
-
# In-memory form of one `tags` row.
@dataclass (slots = True)
class TagDto:
    name: str
    id_: int | None = None  # set by TagDao.insert / upsert from the cursor's lastrowid
-
-
- class VideoHistoryDao:
- def __init__ (
- self,
- conn: MySQLConnectionAbstract,
- ):
- self.conn = conn
-
- def fetch_by_video_id (
- self,
- video_id: int,
- with_relation_tables: bool,
- ) -> list[VideoHistoryDto]:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- video_id,
- fetched_at,
- views_count
- FROM
- video_histories
- WHERE
- video_id = %s""", (video_id,))
- print (c._executed)
- video_histories: list[VideoHistoryDto] = []
- for row in cast (list[VideoHistoryRow], c.fetchall ()):
- video_histories.append (self._create_dto_from_row (row, with_relation_tables))
- return video_histories
-
- def insert (
- self,
- video_history: VideoHistoryDto,
- ) -> None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- video_histories(
- video_id,
- fetched_at,
- views_count)
- VALUES
- (
- %s,
- %s,
- %s)""", (video_history.video_id,
- video_history.fetched_at,
- video_history.views_count))
- print (c._executed)
-
- def upsert (
- self,
- video_history: VideoHistoryDto,
- ) -> None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- video_histories(
- video_id,
- fetched_at,
- views_count)
- VALUES
- (
- %s,
- %s,
- %s)
- ON DUPLICATE KEY UPDATE
- id = LAST_INSERT_ID(id),
- video_id = VALUES(video_id),
- fetched_at = VALUES(fetched_at),
- views_count = VALUES(views_count)""", (video_history.video_id,
- video_history.fetched_at,
- video_history.views_count))
- print (c._executed)
-
- def upsert_all (
- self,
- video_histories: list[VideoHistoryDto],
- ) -> None:
- for video_history in video_histories:
- self.upsert (video_history)
-
- def _create_dto_from_row (
- self,
- row: VideoHistoryRow,
- with_relation_tables: bool,
- ) -> VideoHistoryDto:
- video_history = VideoHistoryDto (id_ = row['id'],
- video_id = row['video_id'],
- fetched_at = row['fetched_at'],
- views_count = row['views_count'])
- if with_relation_tables:
- video_history.video = VideoDao (self.conn).find (video_history.video_id, True)
- return video_history
-
-
# In-memory form of one `video_histories` row: a view count observed at a point in time.
@dataclass (slots = True)
class VideoHistoryDto:
    video_id: int
    fetched_at: date
    views_count: int
    id_: int | None = None         # only set when the DTO is built from a database row
    video: VideoDto | None = None  # loaded by VideoHistoryDao only when relations are requested
-
-
- class CommentDao:
- def __init__ (
- self,
- conn: MySQLConnectionAbstract,
- ):
- self.conn = conn
-
- def fetch_by_video_id (
- self,
- video_id: int,
- with_relation_tables: bool,
- ) -> list[CommentDto]:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- video_id,
- comment_no,
- user_id,
- content,
- posted_at,
- nico_count,
- vpos_ms
- FROM
- comments
- WHERE
- video_id = %s""", (video_id,))
- print (c._executed)
- comments: list[CommentDto] = []
- for row in cast (list[CommentRow], c.fetchall ()):
- comments.append (self._create_dto_from_row (row, with_relation_tables))
- return comments
-
- def upsert (
- self,
- comment: CommentDto,
- with_relation_tables: bool,
- ) -> None:
- vpos_ms: int | DbNullType | None = comment.vpos_ms
- if vpos_ms is None:
- raise TypeError ('未実装')
- if vpos_ms is DbNull:
- vpos_ms = None
- vpos_ms = cast (int | None, vpos_ms)
-
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- comments(
- video_id,
- comment_no,
- user_id,
- content,
- posted_at,
- nico_count,
- vpos_ms)
- VALUES
- (
- %s,
- %s,
- %s,
- %s,
- %s,
- %s,
- %s)
- ON DUPLICATE KEY UPDATE
- id = LAST_INSERT_ID(id),
- video_id = VALUES(video_id),
- comment_no = VALUES(comment_no),
- user_id = VALUES(user_id),
- content = VALUES(content),
- posted_at = VALUES(posted_at),
- nico_count = VALUES(nico_count),
- vpos_ms = VALUES(vpos_ms)""", (comment.video_id,
- comment.comment_no,
- comment.user_id,
- comment.content,
- comment.posted_at,
- comment.nico_count,
- vpos_ms))
- print (c._executed)
-
- def upsert_all (
- self,
- comments: list[CommentDto],
- with_relation_tables: bool,
- ) -> None:
- for comment in comments:
- self.upsert (comment, with_relation_tables)
-
- def _create_dto_from_row (
- self,
- row: CommentRow,
- with_relation_tables: bool,
- ) -> CommentDto:
- comment = CommentDto (id_ = row['id'],
- video_id = row['video_id'],
- comment_no = row['comment_no'],
- user_id = row['user_id'],
- content = row['content'],
- posted_at = row['posted_at'],
- nico_count = row['nico_count'],
- vpos_ms = row['vpos_ms'] or DbNull)
- if with_relation_tables:
- comment.video = VideoDao (self.conn).find (comment.video_id, True)
- return comment
-
-
# In-memory form of one `comments` row.
@dataclass (slots = True)
class CommentDto:
    video_id: int
    comment_no: int
    user_id: int
    content: str
    posted_at: datetime
    id_: int | None = None              # only set when the DTO is built from a database row
    nico_count: int = 0
    vpos_ms: int | DbNullType = DbNull  # DbNull is stored as SQL NULL by CommentDao.upsert
    video: VideoDto | None = None       # loaded by CommentDao only when relations are requested
    user: UserDto | None = None         # never populated by the DAOs in this file
-
-
- class UserDao:
- def __init__ (
- self,
- conn: MySQLConnectionAbstract,
- ):
- self.conn = conn
-
- def fetch_by_code (
- self,
- user_code: str
- ) -> UserDto | None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- SELECT
- id,
- code
- FROM
- users
- WHERE
- code = %s""", (user_code,))
- print (c._executed)
- row = cast (UserRow | None, c.fetchone ())
- if row is None:
- return None
- return self._create_dto_from_row (row)
-
- def insert (
- self,
- user: UserDto,
- ) -> None:
- with self.conn.cursor (dictionary = True) as c:
- c.execute ("""
- INSERT INTO
- users(code)
- VALUES
- (%s)""", (user.code,))
- print (c.execute)
- user.id_ = c.lastrowid
-
- def _create_dto_from_row (
- self,
- row: UserRow,
- ) -> UserDto:
- return UserDto (id_ = row['id'],
- code = row['code'])
-
-
# In-memory form of one `users` row.
@dataclass (slots = True)
class UserDto:
    code: str               # the `userId` value of a fetched comment
    id_: int | None = None  # set by UserDao.insert from the cursor's lastrowid
-
-
# Run the update only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main ()
|