|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- import json
- import os
- import random
- import string
- import time
- from dataclasses import dataclass
- from datetime import datetime
-
- import mysql.connector
- import requests
-
-
def main() -> None:
    """Open a MySQL connection configured from the environment.

    Reads ``MYSQL_HOST``, ``MYSQL_USER`` and ``MYSQL_PASS`` from
    ``os.environ`` and connects with ``mysql.connector``.  The
    connection is always closed before returning so it is not leaked.

    Raises:
        KeyError: If any of the three environment variables is unset.
    """
    conn = mysql.connector.connect(
        host=os.environ['MYSQL_HOST'],
        user=os.environ['MYSQL_USER'],
        password=os.environ['MYSQL_PASS'],
    )
    try:
        # Placeholder: nothing uses the connection yet.  Work that needs
        # the database belongs inside this ``try`` so that the
        # ``finally`` below still releases the connection on error.
        pass
    finally:
        conn.close()
-
-
def fetch_comments(
        video_id: str,
) -> list:
    """Fetch the comments of a nicovideo.jp video via the guest API.

    Two HTTP requests are made: one to the guest watch endpoint to
    obtain the ``nvComment`` descriptor (``server``, ``params`` and
    ``threadKey``), and one to that server's ``/v1/threads`` endpoint
    to retrieve the comment threads.

    Args:
        video_id: Video identifier inserted into the watch API URL.

    Returns:
        The ``comments`` value of the thread at index 1 of the threads
        response, or an empty list when the watch response has no
        ``nvComment`` or either response lacks the expected structure.

    Raises:
        KeyError: If ``nvComment`` is present but lacks ``params``,
            ``threadKey`` or ``server``.

    Errors raised by ``requests`` (network failure, timeout, a body
    that is not JSON) are not handled here and propagate to the caller.
    """
    # Identifier of the form "<10 alphanumerics>_<13-digit number>".
    alphabet = string.ascii_letters + string.digits
    action_track_id = (
        ''.join(random.choices(alphabet, k=10))
        + '_'
        + str(random.randrange(10 ** 12, 10 ** 13))
    )

    watch_url = (f"https://www.nicovideo.jp/api/watch/v3_guest/{video_id}"
                 f"?actionTrackId={action_track_id}")
    watch_headers = {'X-Frontend-Id': '6',
                     'X-Frontend-Version': '0'}
    watch = requests.post(watch_url, headers=watch_headers,
                          timeout=60).json()

    # Subscripting decoded JSON raises KeyError for a missing key and
    # TypeError when an intermediate value is None or not a mapping.
    try:
        nv_comment = watch['data']['comment']['nvComment']
    except (KeyError, TypeError):
        return []
    if nv_comment is None:
        return []

    threads_url = nv_comment['server'] + '/v1/threads'
    threads_headers = {'X-Frontend-Id': '6',
                       'X-Frontend-Version': '0',
                       'Content-Type': 'application/json'}
    payload = {'params': nv_comment['params'],
               'additionals': {},
               'threadKey': nv_comment['threadKey']}

    threads = requests.post(threads_url,
                            data=json.dumps(payload),
                            headers=threads_headers,
                            timeout=60).json()

    # IndexError covers a response with fewer than two threads.
    try:
        return threads['data']['threads'][1]['comments']
    except (KeyError, IndexError, TypeError):
        return []
-
-
def search_nico_by_tag(
        tag: str,
) -> list[dict]:
    """Search videos for a single tag.

    Wraps ``tag`` in a one-element list and delegates to
    ``search_nico_by_tags``; the result is returned unchanged.
    """
    single_tag = [tag]
    return search_nico_by_tags(single_tag)
-
-
def search_nico_by_tags(
        tags: list[str],
        *,
        year: int | None = None,
        start: str = '01-01',
        end: str = '12-31',
) -> list[dict]:
    """Search the niconico snapshot API for videos matching any tag.

    The tags are joined with ``' OR '`` and matched against the
    ``tagsExact`` target.  The request asks for the fields
    ``contentId``, ``title``, ``tags``, ``viewCounter`` and
    ``startTime``, a ``_limit`` of 100 and a ``_sort`` of
    ``-viewCounter`` (presumably descending view count — confirm
    against the API documentation).

    Args:
        tags: Tag names to search for.
        year: When given, restrict results to videos whose
            ``startTime`` lies between ``start`` and ``end`` of this
            year (JST, ``+09:00``).  When ``None`` no date filter is
            sent.
        start: First day of the range as ``MM-DD``; used only when
            ``year`` is given.
        end: Last day of the range as ``MM-DD``; used only when
            ``year`` is given.

    Returns:
        The ``data`` member of the decoded JSON response.

    Raises:
        KeyError: If the response has no ``data`` member.

    Errors raised by ``requests`` propagate to the caller.
    """
    url = ('https://snapshot.search.nicovideo.jp'
           '/api/v2/snapshot/video/contents/search')

    params: dict[str, int | str] = {
        'q': ' OR '.join(tags),
        'targets': 'tagsExact',
        '_sort': '-viewCounter',
        'fields': 'contentId,title,tags,viewCounter,startTime',
        '_limit': 100,
    }

    # The date range used to be built from the names ``year``, ``start``
    # and ``end`` that were defined nowhere, so every call failed with
    # NameError.  They are now explicit, optional parameters.
    if year is not None:
        params['jsonFilter'] = json.dumps({
            'type': 'or',
            'filters': [
                {'type': 'range',
                 'field': 'startTime',
                 'from': f"{year}-{start}T00:00:00+09:00",
                 'to': f"{year}-{end}T23:59:59+09:00",
                 'include_lower': True},
            ],
        })

    res = requests.get(url, params=params, timeout=60).json()

    return res['data']
-
-
class VideoDao:
    """Data-access object that reads rows from the ``videos`` table."""

    def __init__(self, conn):
        """Keep the database connection used by later queries.

        Args:
            conn: Connection object whose ``cursor()`` accepts
                ``dictionary=True`` (as ``mysql.connector`` connections
                do) and returns a context manager.
        """
        self.conn = conn

    def fetch_all(
            self,
            with_relation_tables: bool = True,
    ) -> 'list[VideoDto]':
        """Return every row of ``videos`` as a ``VideoDto``.

        Args:
            with_relation_tables: When true, each DTO's ``video_tags``,
                ``comments`` and ``video_histories`` are filled from
                ``VideoTagDao``, ``CommentDao`` and ``VideoHistoryDao``;
                otherwise those three attributes are ``None``.

        Returns:
            One ``VideoDto`` per row; an empty list when the table has
            no rows.
        """
        # The return annotation is quoted because ``VideoDto`` is
        # defined after this class; an unquoted annotation would be
        # evaluated when the method is defined and raise NameError.

        # A dictionary cursor is required: rows are indexed by column
        # name below, which the default tuple rows do not support.
        with self.conn.cursor(dictionary=True) as c:
            c.execute("""
                SELECT
                    id,
                    code,
                    title,
                    description,
                    uploaded_at,
                    deleted_at
                FROM
                    videos""")
            rows = c.fetchall()

        videos = []
        for video in rows:
            if with_relation_tables:
                # NOTE(review): each related DAO is asked for all of its
                # rows, unfiltered by this video, once per video row --
                # confirm that this is the intended relation.
                video_tags = VideoTagDao(self.conn).fetch_all(False)
                comments = CommentDao(self.conn).fetch_all(False)
                video_histories = (
                    VideoHistoryDao(self.conn).fetch_all(False))
            else:
                video_tags = None
                comments = None
                video_histories = None
            videos.append(VideoDto(id_=video['id'],
                                   code=video['code'],
                                   title=video['title'],
                                   description=video['description'],
                                   uploaded_at=video['uploaded_at'],
                                   deleted_at=video['deleted_at'],
                                   video_tags=video_tags,
                                   comments=comments,
                                   video_histories=video_histories))
        return videos
-
-
- @dataclass (slots = True)
- class VideoDto:
- id_: int
- code: str
- title: str
- description: str
- uploaded_at: datetime
- deleted_at: datetime | None
- video_tags: VideoTagDto | None
- comments: CommentDto | None
- video_histories: VideoHistoryDto | None
-
-
# Run the entry point only when this file is executed as a script.
if __name__ == '__main__':
    main()
|