mirror of
https://github.com/SinTan1729/TvTimeToTrakt.git
synced 2025-04-19 09:30:01 -05:00
169 lines
5 KiB
Python
169 lines
5 KiB
Python
#!/usr/bin/env python3
|
|
import csv
|
|
import json
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
|
|
import trakt.core
|
|
from trakt import init
|
|
|
|
from processor import TVShowProcessor, MovieProcessor
|
|
from searcher import TVTimeTVShow, TVTimeMovie
|
|
|
|
# Setup logger
|
|
logging.basicConfig(
|
|
format="%(asctime)s [%(levelname)7s] :: %(message)s",
|
|
level=logging.INFO,
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
|
|
# Adjust this value to increase/decrease your requests between episodes.
|
|
# Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
|
|
DELAY_BETWEEN_ITEMS_IN_SECONDS = 1
|
|
|
|
|
|
@dataclass
|
|
class Config:
|
|
trakt_username: str
|
|
client_id: str
|
|
client_secret: str
|
|
gdpr_workspace_path: str
|
|
|
|
|
|
def is_authenticated() -> bool:
|
|
with open("pytrakt.json") as f:
|
|
data = json.load(f)
|
|
days_before_expiration = (
|
|
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
|
|
).days
|
|
return days_before_expiration >= 1
|
|
|
|
|
|
def get_configuration() -> Config:
|
|
try:
|
|
with open("config.json") as f:
|
|
data = json.load(f)
|
|
|
|
return Config(
|
|
data["TRAKT_USERNAME"],
|
|
data["CLIENT_ID"],
|
|
data["CLIENT_SECRET"],
|
|
data["GDPR_WORKSPACE_PATH"],
|
|
)
|
|
except FileNotFoundError:
|
|
logging.info("config.json not found prompting user for input")
|
|
return Config(
|
|
input("Enter your Trakt.tv username: "),
|
|
input("Enter you Client id: "),
|
|
input("Enter your Client secret: "),
|
|
input("Enter your GDPR workspace path: ")
|
|
)
|
|
|
|
|
|
config = get_configuration()
|
|
|
|
WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv"
|
|
FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv"
|
|
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
|
|
|
|
|
|
def init_trakt_auth() -> bool:
|
|
if is_authenticated():
|
|
return True
|
|
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
|
return init(
|
|
config.trakt_username,
|
|
store=True,
|
|
client_id=config.client_id,
|
|
client_secret=config.client_secret,
|
|
)
|
|
|
|
|
|
def process_watched_shows() -> None:
|
|
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
|
|
reader = csv.DictReader(csvfile, delimiter=",")
|
|
total_rows = len(list(reader))
|
|
csvfile.seek(0, 0)
|
|
|
|
# Ignore the header row
|
|
next(reader, None)
|
|
for rows_count, row in enumerate(reader):
|
|
tv_time_show = TVTimeTVShow(row)
|
|
TVShowProcessor().process_item(tv_time_show, rows_count / total_rows)
|
|
|
|
|
|
def process_watched_movies() -> None:
|
|
with open(MOVIES_PATH, newline="") as csvfile:
|
|
reader = filter(lambda p: p["movie_name"] != "", csv.DictReader(csvfile, delimeter=""))
|
|
watched_list = [row["movie_name"] for row in reader if row["type"] == "watch"]
|
|
csvfile.seek(0, 0)
|
|
total_rows = len(list(reader))
|
|
csvfile.seek(0, 0)
|
|
|
|
# Ignore the header row
|
|
next(reader, None)
|
|
for rows_count, row in enumerate(reader):
|
|
movie = TVTimeMovie(row)
|
|
MovieProcessor(watched_list).process_item(movie, rows_count / total_rows)
|
|
|
|
|
|
def menu_selection() -> int:
|
|
# Display a menu selection
|
|
print(">> What do you want to do?")
|
|
print(" 1) Import Watch History for TV Shows from TV Time")
|
|
print(" 2) Import Watched Movies from TV Time")
|
|
print(" 3) Do both 1 and 2 (default)")
|
|
print(" 4) Exit")
|
|
|
|
while True:
|
|
try:
|
|
selection = input("Enter your menu selection: ")
|
|
selection = 3 if not selection else int(selection)
|
|
break
|
|
except ValueError:
|
|
logging.warning("Invalid input. Please enter a numerical number.")
|
|
# Check if the input is valid
|
|
if not 1 <= selection <= 4:
|
|
logging.warning("Sorry - that's an unknown menu selection")
|
|
exit()
|
|
# Exit if the 4th option was chosen
|
|
if selection == 4:
|
|
logging.info("Exiting as per user's selection.")
|
|
exit()
|
|
|
|
return selection
|
|
|
|
|
|
def start():
|
|
selection = menu_selection()
|
|
|
|
# Create the initial authentication with Trakt, before starting the process
|
|
if not init_trakt_auth():
|
|
logging.error(
|
|
"ERROR: Unable to complete authentication to Trakt - please try again."
|
|
)
|
|
|
|
if selection == 1:
|
|
logging.info("Processing watched shows.")
|
|
process_watched_shows()
|
|
# TODO: Add support for followed shows
|
|
elif selection == 2:
|
|
logging.info("Processing movies.")
|
|
process_watched_movies()
|
|
elif selection == 3:
|
|
logging.info("Processing both watched shows and movies.")
|
|
process_watched_shows()
|
|
process_watched_movies()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Check that the user has provided the GDPR path
|
|
if os.path.isdir(config.gdpr_workspace_path):
|
|
start()
|
|
else:
|
|
logging.error(
|
|
f"Oops! The TV Time GDPR folder 'config.gdpr_workspace_path'"
|
|
" does not exist on the local system. Please check it, and try again."
|
|
)
|