Config to dataclass

This commit is contained in:
Markus Nyman 2023-01-12 14:28:59 +02:00
parent 7974a2c5d0
commit 7d8117afba

View file

@ -6,14 +6,14 @@ import os
import re
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import trakt.core
from tinydb import Query, TinyDB
from trakt import init
from trakt.tv import TVShow
from trakt.movies import Movie
from trakt.tv import TVShow
# Setup logger
logging.basicConfig(
@ -38,48 +38,42 @@ class Expando(object):
pass
@dataclass
class Config:
trakt_username: str
client_id: str
client_secret: str
gdpr_workspace_path: str
def isAuthenticated():
with open("pytrakt.json") as f:
data = json.load(f)
daysBeforeExpiration = (
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
).days
if daysBeforeExpiration < 1:
return False
return True
def getConfiguration():
configEx = Expando()
def getConfiguration() -> Config:
with open("config.json") as f:
data = json.load(f)
configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"]
configEx.CLIENT_ID = data["CLIENT_ID"]
configEx.CLIENT_SECRET = data["CLIENT_SECRET"]
configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"]
CONFIG_SINGLETON = configEx
return CONFIG_SINGLETON
return Config(
data["TRAKT_USERNAME"],
data["CLIENT_ID"],
data["CLIENT_SECRET"],
data["GDPR_WORKSPACE_PATH"],
)
config = getConfiguration()
# Return the path to the CSV file contain the watched episode and movie data from TV Time
def getWatchedShowsPath():
return config.GDPR_WORKSPACE_PATH + "/seen_episode.csv"
def getFollowedShowsPath():
return config.GDPR_WORKSPACE_PATH + "/followed_tv_show.csv"
def getMoviesPath():
return config.GDPR_WORKSPACE_PATH + "/tracking-prod-records.csv"
WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv"
FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv"
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
def initTraktAuth():
@ -88,10 +82,10 @@ def initTraktAuth():
# Set the method of authentication
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
return init(
config.TRAKT_USERNAME,
config.trakt_username,
store=True,
client_id=config.CLIENT_ID,
client_secret=config.CLIENT_SECRET,
client_id=config.client_id,
client_secret=config.client_secret,
)
@ -213,8 +207,7 @@ def getShowByName(name, seasonNo, episodeNo):
# Query the local database for existing selection
userMatchedQuery = Query()
queryResult = userMatchedShowsTable.search(
userMatchedQuery.ShowName == name)
queryResult = userMatchedShowsTable.search(userMatchedQuery.ShowName == name)
# If the local database already contains an entry for a manual selection
# then don't bother prompting the user to select it again!
@ -339,7 +332,7 @@ def processWatchedShows():
# Total amount of rows in the CSV file
errorStreak = 0
# Open the CSV file within the GDPR exported data
with open(getWatchedShowsPath(), newline="") as csvfile:
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
# Create the CSV reader, which will break up the fields using the delimiter ','
showsReader = csv.DictReader(csvfile, delimiter=",")
# Get the total amount of rows in the CSV file,
@ -400,7 +393,7 @@ def processWatchedShows():
break
# Show the progress of the import on-screen
logging.info(
f"({rowsCount+1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}"
f"({rowsCount + 1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}"
)
# Get the season from the Trakt API
season = traktShowObj.seasons[
@ -412,8 +405,7 @@ def processWatchedShows():
episode.mark_as_seen(tvShowDateWatchedConverted)
# Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated
syncedEpisodesTable.insert(
{"episodeId": tvShowEpisodeId})
syncedEpisodesTable.insert({"episodeId": tvShowEpisodeId})
# Clear the error streak on completing the method without errors
errorStreak = 0
break
@ -530,8 +522,7 @@ def getMovieByName(name):
# Query the local database for existing selection
userMatchedQuery = Query()
queryResult = userMatchedMoviesTable.search(
userMatchedQuery.movie_name == name)
queryResult = userMatchedMoviesTable.search(userMatchedQuery.movie_name == name)
# If the local database already contains an entry for a manual selection
# then don't bother prompting the user to select it again!
@ -628,10 +619,10 @@ def processMovies():
# Total amount of rows in the CSV file
errorStreak = 0
# Open the CSV file within the GDPR exported data
with open(getMoviesPath(), newline="") as csvfile:
with open(MOVIES_PATH, newline="") as csvfile:
# Create the CSV reader, which will break up the fields using the delimiter ','
movieReaderTemp = csv.DictReader(csvfile, delimiter=",")
movieReader = filter(lambda p: '' != p['movie_name'], movieReaderTemp)
movieReader = filter(lambda p: "" != p["movie_name"], movieReaderTemp)
# First, list all movies with watched type so that watchlist entry for them is not created
watchedList = []
for row in movieReader:
@ -663,14 +654,13 @@ def processMovies():
# process, and just save time overall without needing to create network requests
movieQuery = Query()
queryResult = syncedMoviesTable.search(
(movieQuery.movie_name == movieName) &
(movieQuery.type == "watched")
(movieQuery.movie_name == movieName) & (movieQuery.type == "watched")
)
watchlistQuery = Query()
queryResultWatchlist = syncedMoviesTable.search(
(watchlistQuery.movie_name == movieName) &
(watchlistQuery.type == "watchlist")
(watchlistQuery.movie_name == movieName)
& (watchlistQuery.type == "watchlist")
)
# If the query returned no results, then continue to import it into Trakt
@ -680,7 +670,8 @@ def processMovies():
# If movie is watched but this is an entry for watchlist, then skip
if movieName in watchedList and activityType != "watch":
logging.info(
f"Skipping '{movieName}' to avoid redundant watchlist entry.")
f"Skipping '{movieName}' to avoid redundant watchlist entry."
)
break
# If more than 10 errors occurred in one streak, whilst trying to import the episode
# then give up, and move onto the next episode, but warn the user.
@ -702,22 +693,23 @@ def processMovies():
break
# Show the progress of the import on-screen
logging.info(
f"({rowsCount+1}/{rowsTotal}) - Processing '{movieName}'"
f"({rowsCount + 1}/{rowsTotal}) - Processing '{movieName}'"
)
if activityType == "watch":
traktMovieObj.mark_as_seen(
movieDateWatchedConverted)
traktMovieObj.mark_as_seen(movieDateWatchedConverted)
# Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated
syncedMoviesTable.insert(
{"movie_name": movieName, "type": "watched"})
{"movie_name": movieName, "type": "watched"}
)
logging.info(f"Marked as seen")
elif len(queryResultWatchlist) == 0:
traktMovieObj.add_to_watchlist()
# Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated
syncedMoviesTable.insert(
{"movie_name": movieName, "type": "watchlist"})
{"movie_name": movieName, "type": "watchlist"}
)
logging.info(f"Added to watchlist")
else:
logging.warning(f"Already in watchlist")
@ -790,8 +782,7 @@ def start():
menuSelection = 3 if not menuSelection else int(menuSelection)
break
except ValueError:
logging.warning(
"Invalid input. Please enter a numerical number.")
logging.warning("Invalid input. Please enter a numerical number.")
# Check if the input is valid
if not 1 <= menuSelection <= 4:
logging.warning("Sorry - that's an unknown menu selection")
@ -829,12 +820,12 @@ if __name__ == "__main__":
# Check that the user has created the config file
if os.path.exists("config.json"):
# Check that the user has provided the GDPR path
if os.path.isdir(config.GDPR_WORKSPACE_PATH):
if os.path.isdir(config.gdpr_workspace_path):
start()
else:
logging.error(
"Oops! The TV Time GDPR folder '"
+ config.GDPR_WORKSPACE_PATH
+ config.gdpr_workspace_path
+ "' does not exist on the local system. Please check it, and try again."
)
else: