TvTimeToTrakt/TimeToTrakt.py
Markus Nyman 26cd599e45
Movie support and refactoring (#26)
* Initial working

* Cleanup

* Merged scripts

* Present menu before authentication. add entries

* Just use one database

* Add bell on manual input prompt (suggested by @WeirdAlex03)

* Config to dataclass

* Prompt config if it doesn't exist

* Naming to snake_case

* Remove use of Exodus class

* Remove old Title fields

* Specify TV Shows and Movies as default action

* Extract menu selection to own function

* Fix movie query

* Simple refactor

* Extract getting same name items to common function

* Remove unnecessary param

* Fix TinyDB movie name

* WIP: refactor

* Introduce common get_item function

* Extract finding single result to common function

* Implement general Searcher to search and handle search results

* WIP: Processor class

* Remove redundant dataclass annotation

* Add TVTimeShow class

* Make Searcher abstract

* Process TV Shows and Movies using Processor class

* Move common logic to base Processor class

* Small cleanup

* Split stuff to own files

* Fix error

* Fix search bug

* Improve logging

* Change is not to is None

* Handle general exception when processing

* Fix grammar

* Fix typo

* Fix case where nothing is found

* Read release date where possible

* Progress to percentage

Co-authored-by: SinTan1729 <sayantan.santra689@gmail.com>
2023-01-17 11:12:52 +00:00

169 lines
5.1 KiB
Python

#!/usr/bin/env python3
import csv
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime
import trakt.core
from trakt import init
from processor import TVShowProcessor, MovieProcessor
from searcher import TVTimeTVShow, TVTimeMovie
# Setup logger
logging.basicConfig(
format="%(asctime)s [%(levelname)7s] :: %(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)
# Adjust this value to increase/decrease your requests between episodes.
# Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
DELAY_BETWEEN_ITEMS_IN_SECONDS = 1
@dataclass
class Config:
trakt_username: str
client_id: str
client_secret: str
gdpr_workspace_path: str
def is_authenticated() -> bool:
with open("pytrakt.json") as f:
data = json.load(f)
days_before_expiration = (
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
).days
return days_before_expiration >= 1
def get_configuration() -> Config:
try:
with open("config.json") as f:
data = json.load(f)
return Config(
data["TRAKT_USERNAME"],
data["CLIENT_ID"],
data["CLIENT_SECRET"],
data["GDPR_WORKSPACE_PATH"],
)
except FileNotFoundError:
logging.info("config.json not found prompting user for input")
return Config(
input("Enter your Trakt.tv username: "),
input("Enter you Client id: "),
input("Enter your Client secret: "),
input("Enter your GDPR workspace path: ")
)
config = get_configuration()
WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv"
FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv"
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
def init_trakt_auth() -> bool:
if is_authenticated():
return True
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
return init(
config.trakt_username,
store=True,
client_id=config.client_id,
client_secret=config.client_secret,
)
def process_watched_shows() -> None:
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
reader = csv.DictReader(csvfile, delimiter=",")
total_rows = len(list(reader))
csvfile.seek(0, 0)
# Ignore the header row
next(reader, None)
for rows_count, row in enumerate(reader):
tv_time_show = TVTimeTVShow(row)
TVShowProcessor().process_item(tv_time_show, "{:.2f}%".format(rows_count / total_rows * 100))
def process_watched_movies() -> None:
with open(MOVIES_PATH, newline="") as csvfile:
reader = filter(lambda p: p["movie_name"] != "", csv.DictReader(csvfile, delimiter=","))
watched_list = [row["movie_name"] for row in reader if row["type"] == "watch"]
csvfile.seek(0, 0)
total_rows = len(list(reader))
csvfile.seek(0, 0)
# Ignore the header row
next(reader, None)
for rows_count, row in enumerate(reader):
movie = TVTimeMovie(row)
MovieProcessor(watched_list).process_item(movie, "{:.2f}%".format(rows_count / total_rows * 100))
def menu_selection() -> int:
# Display a menu selection
print(">> What do you want to do?")
print(" 1) Import Watch History for TV Shows from TV Time")
print(" 2) Import Watched Movies from TV Time")
print(" 3) Do both 1 and 2 (default)")
print(" 4) Exit")
while True:
try:
selection = input("Enter your menu selection: ")
selection = 3 if not selection else int(selection)
break
except ValueError:
logging.warning("Invalid input. Please enter a numerical number.")
# Check if the input is valid
if not 1 <= selection <= 4:
logging.warning("Sorry - that's an unknown menu selection")
exit()
# Exit if the 4th option was chosen
if selection == 4:
logging.info("Exiting as per user's selection.")
exit()
return selection
def start():
selection = menu_selection()
# Create the initial authentication with Trakt, before starting the process
if not init_trakt_auth():
logging.error(
"ERROR: Unable to complete authentication to Trakt - please try again."
)
if selection == 1:
logging.info("Processing watched shows.")
process_watched_shows()
# TODO: Add support for followed shows
elif selection == 2:
logging.info("Processing movies.")
process_watched_movies()
elif selection == 3:
logging.info("Processing both watched shows and movies.")
process_watched_shows()
process_watched_movies()
if __name__ == "__main__":
# Check that the user has provided the GDPR path
if os.path.isdir(config.gdpr_workspace_path):
start()
else:
logging.error(
f"Oops! The TV Time GDPR folder 'config.gdpr_workspace_path'"
" does not exist on the local system. Please check it, and try again."
)