mirror of
https://github.com/SinTan1729/TvTimeToTrakt.git
synced 2025-04-19 17:40:01 -05:00
Config to dataclass
This commit is contained in:
parent
35ae075c76
commit
5b31823c27
1 changed files with 45 additions and 54 deletions
|
@ -6,14 +6,14 @@ import os
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import trakt.core
|
import trakt.core
|
||||||
from tinydb import Query, TinyDB
|
from tinydb import Query, TinyDB
|
||||||
from trakt import init
|
from trakt import init
|
||||||
from trakt.tv import TVShow
|
|
||||||
from trakt.movies import Movie
|
from trakt.movies import Movie
|
||||||
|
from trakt.tv import TVShow
|
||||||
|
|
||||||
# Setup logger
|
# Setup logger
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
|
@ -38,48 +38,42 @@ class Expando(object):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Config:
|
||||||
|
trakt_username: str
|
||||||
|
client_id: str
|
||||||
|
client_secret: str
|
||||||
|
gdpr_workspace_path: str
|
||||||
|
|
||||||
|
|
||||||
def isAuthenticated():
|
def isAuthenticated():
|
||||||
with open("pytrakt.json") as f:
|
with open("pytrakt.json") as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
daysBeforeExpiration = (
|
daysBeforeExpiration = (
|
||||||
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
|
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
|
||||||
).days
|
).days
|
||||||
if daysBeforeExpiration < 1:
|
if daysBeforeExpiration < 1:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def getConfiguration():
|
def getConfiguration() -> Config:
|
||||||
configEx = Expando()
|
|
||||||
|
|
||||||
with open("config.json") as f:
|
with open("config.json") as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
|
|
||||||
configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"]
|
return Config(
|
||||||
configEx.CLIENT_ID = data["CLIENT_ID"]
|
data["TRAKT_USERNAME"],
|
||||||
configEx.CLIENT_SECRET = data["CLIENT_SECRET"]
|
data["CLIENT_ID"],
|
||||||
configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"]
|
data["CLIENT_SECRET"],
|
||||||
|
data["GDPR_WORKSPACE_PATH"],
|
||||||
CONFIG_SINGLETON = configEx
|
)
|
||||||
|
|
||||||
return CONFIG_SINGLETON
|
|
||||||
|
|
||||||
|
|
||||||
config = getConfiguration()
|
config = getConfiguration()
|
||||||
|
|
||||||
# Return the path to the CSV file contain the watched episode and movie data from TV Time
|
WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv"
|
||||||
|
FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv"
|
||||||
|
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
|
||||||
def getWatchedShowsPath():
|
|
||||||
return config.GDPR_WORKSPACE_PATH + "/seen_episode.csv"
|
|
||||||
|
|
||||||
|
|
||||||
def getFollowedShowsPath():
|
|
||||||
return config.GDPR_WORKSPACE_PATH + "/followed_tv_show.csv"
|
|
||||||
|
|
||||||
|
|
||||||
def getMoviesPath():
|
|
||||||
return config.GDPR_WORKSPACE_PATH + "/tracking-prod-records.csv"
|
|
||||||
|
|
||||||
|
|
||||||
def initTraktAuth():
|
def initTraktAuth():
|
||||||
|
@ -88,10 +82,10 @@ def initTraktAuth():
|
||||||
# Set the method of authentication
|
# Set the method of authentication
|
||||||
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
||||||
return init(
|
return init(
|
||||||
config.TRAKT_USERNAME,
|
config.trakt_username,
|
||||||
store=True,
|
store=True,
|
||||||
client_id=config.CLIENT_ID,
|
client_id=config.client_id,
|
||||||
client_secret=config.CLIENT_SECRET,
|
client_secret=config.client_secret,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -213,8 +207,7 @@ def getShowByName(name, seasonNo, episodeNo):
|
||||||
|
|
||||||
# Query the local database for existing selection
|
# Query the local database for existing selection
|
||||||
userMatchedQuery = Query()
|
userMatchedQuery = Query()
|
||||||
queryResult = userMatchedShowsTable.search(
|
queryResult = userMatchedShowsTable.search(userMatchedQuery.ShowName == name)
|
||||||
userMatchedQuery.ShowName == name)
|
|
||||||
|
|
||||||
# If the local database already contains an entry for a manual selection
|
# If the local database already contains an entry for a manual selection
|
||||||
# then don't bother prompting the user to select it again!
|
# then don't bother prompting the user to select it again!
|
||||||
|
@ -339,7 +332,7 @@ def processWatchedShows():
|
||||||
# Total amount of rows in the CSV file
|
# Total amount of rows in the CSV file
|
||||||
errorStreak = 0
|
errorStreak = 0
|
||||||
# Open the CSV file within the GDPR exported data
|
# Open the CSV file within the GDPR exported data
|
||||||
with open(getWatchedShowsPath(), newline="") as csvfile:
|
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
|
||||||
# Create the CSV reader, which will break up the fields using the delimiter ','
|
# Create the CSV reader, which will break up the fields using the delimiter ','
|
||||||
showsReader = csv.DictReader(csvfile, delimiter=",")
|
showsReader = csv.DictReader(csvfile, delimiter=",")
|
||||||
# Get the total amount of rows in the CSV file,
|
# Get the total amount of rows in the CSV file,
|
||||||
|
@ -400,7 +393,7 @@ def processWatchedShows():
|
||||||
break
|
break
|
||||||
# Show the progress of the import on-screen
|
# Show the progress of the import on-screen
|
||||||
logging.info(
|
logging.info(
|
||||||
f"({rowsCount+1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}"
|
f"({rowsCount + 1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}"
|
||||||
)
|
)
|
||||||
# Get the season from the Trakt API
|
# Get the season from the Trakt API
|
||||||
season = traktShowObj.seasons[
|
season = traktShowObj.seasons[
|
||||||
|
@ -412,8 +405,7 @@ def processWatchedShows():
|
||||||
episode.mark_as_seen(tvShowDateWatchedConverted)
|
episode.mark_as_seen(tvShowDateWatchedConverted)
|
||||||
# Add the episode to the local database as imported, so it can be skipped,
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
# if the process is repeated
|
# if the process is repeated
|
||||||
syncedEpisodesTable.insert(
|
syncedEpisodesTable.insert({"episodeId": tvShowEpisodeId})
|
||||||
{"episodeId": tvShowEpisodeId})
|
|
||||||
# Clear the error streak on completing the method without errors
|
# Clear the error streak on completing the method without errors
|
||||||
errorStreak = 0
|
errorStreak = 0
|
||||||
break
|
break
|
||||||
|
@ -530,8 +522,7 @@ def getMovieByName(name):
|
||||||
|
|
||||||
# Query the local database for existing selection
|
# Query the local database for existing selection
|
||||||
userMatchedQuery = Query()
|
userMatchedQuery = Query()
|
||||||
queryResult = userMatchedMoviesTable.search(
|
queryResult = userMatchedMoviesTable.search(userMatchedQuery.movie_name == name)
|
||||||
userMatchedQuery.movie_name == name)
|
|
||||||
|
|
||||||
# If the local database already contains an entry for a manual selection
|
# If the local database already contains an entry for a manual selection
|
||||||
# then don't bother prompting the user to select it again!
|
# then don't bother prompting the user to select it again!
|
||||||
|
@ -628,10 +619,10 @@ def processMovies():
|
||||||
# Total amount of rows in the CSV file
|
# Total amount of rows in the CSV file
|
||||||
errorStreak = 0
|
errorStreak = 0
|
||||||
# Open the CSV file within the GDPR exported data
|
# Open the CSV file within the GDPR exported data
|
||||||
with open(getMoviesPath(), newline="") as csvfile:
|
with open(MOVIES_PATH, newline="") as csvfile:
|
||||||
# Create the CSV reader, which will break up the fields using the delimiter ','
|
# Create the CSV reader, which will break up the fields using the delimiter ','
|
||||||
movieReaderTemp = csv.DictReader(csvfile, delimiter=",")
|
movieReaderTemp = csv.DictReader(csvfile, delimiter=",")
|
||||||
movieReader = filter(lambda p: '' != p['movie_name'], movieReaderTemp)
|
movieReader = filter(lambda p: "" != p["movie_name"], movieReaderTemp)
|
||||||
# First, list all movies with watched type so that watchlist entry for them is not created
|
# First, list all movies with watched type so that watchlist entry for them is not created
|
||||||
watchedList = []
|
watchedList = []
|
||||||
for row in movieReader:
|
for row in movieReader:
|
||||||
|
@ -663,14 +654,13 @@ def processMovies():
|
||||||
# process, and just save time overall without needing to create network requests
|
# process, and just save time overall without needing to create network requests
|
||||||
movieQuery = Query()
|
movieQuery = Query()
|
||||||
queryResult = syncedMoviesTable.search(
|
queryResult = syncedMoviesTable.search(
|
||||||
(movieQuery.movie_name == movieName) &
|
(movieQuery.movie_name == movieName) & (movieQuery.type == "watched")
|
||||||
(movieQuery.type == "watched")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
watchlistQuery = Query()
|
watchlistQuery = Query()
|
||||||
queryResultWatchlist = syncedMoviesTable.search(
|
queryResultWatchlist = syncedMoviesTable.search(
|
||||||
(watchlistQuery.movie_name == movieName) &
|
(watchlistQuery.movie_name == movieName)
|
||||||
(watchlistQuery.type == "watchlist")
|
& (watchlistQuery.type == "watchlist")
|
||||||
)
|
)
|
||||||
|
|
||||||
# If the query returned no results, then continue to import it into Trakt
|
# If the query returned no results, then continue to import it into Trakt
|
||||||
|
@ -680,7 +670,8 @@ def processMovies():
|
||||||
# If movie is watched but this is an entry for watchlist, then skip
|
# If movie is watched but this is an entry for watchlist, then skip
|
||||||
if movieName in watchedList and activityType != "watch":
|
if movieName in watchedList and activityType != "watch":
|
||||||
logging.info(
|
logging.info(
|
||||||
f"Skipping '{movieName}' to avoid redundant watchlist entry.")
|
f"Skipping '{movieName}' to avoid redundant watchlist entry."
|
||||||
|
)
|
||||||
break
|
break
|
||||||
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
||||||
# then give up, and move onto the next episode, but warn the user.
|
# then give up, and move onto the next episode, but warn the user.
|
||||||
|
@ -702,22 +693,23 @@ def processMovies():
|
||||||
break
|
break
|
||||||
# Show the progress of the import on-screen
|
# Show the progress of the import on-screen
|
||||||
logging.info(
|
logging.info(
|
||||||
f"({rowsCount+1}/{rowsTotal}) - Processing '{movieName}'"
|
f"({rowsCount + 1}/{rowsTotal}) - Processing '{movieName}'"
|
||||||
)
|
)
|
||||||
if activityType == "watch":
|
if activityType == "watch":
|
||||||
traktMovieObj.mark_as_seen(
|
traktMovieObj.mark_as_seen(movieDateWatchedConverted)
|
||||||
movieDateWatchedConverted)
|
|
||||||
# Add the episode to the local database as imported, so it can be skipped,
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
# if the process is repeated
|
# if the process is repeated
|
||||||
syncedMoviesTable.insert(
|
syncedMoviesTable.insert(
|
||||||
{"movie_name": movieName, "type": "watched"})
|
{"movie_name": movieName, "type": "watched"}
|
||||||
|
)
|
||||||
logging.info(f"Marked as seen")
|
logging.info(f"Marked as seen")
|
||||||
elif len(queryResultWatchlist) == 0:
|
elif len(queryResultWatchlist) == 0:
|
||||||
traktMovieObj.add_to_watchlist()
|
traktMovieObj.add_to_watchlist()
|
||||||
# Add the episode to the local database as imported, so it can be skipped,
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
# if the process is repeated
|
# if the process is repeated
|
||||||
syncedMoviesTable.insert(
|
syncedMoviesTable.insert(
|
||||||
{"movie_name": movieName, "type": "watchlist"})
|
{"movie_name": movieName, "type": "watchlist"}
|
||||||
|
)
|
||||||
logging.info(f"Added to watchlist")
|
logging.info(f"Added to watchlist")
|
||||||
else:
|
else:
|
||||||
logging.warning(f"Already in watchlist")
|
logging.warning(f"Already in watchlist")
|
||||||
|
@ -790,8 +782,7 @@ def start():
|
||||||
menuSelection = 3 if not menuSelection else int(menuSelection)
|
menuSelection = 3 if not menuSelection else int(menuSelection)
|
||||||
break
|
break
|
||||||
except ValueError:
|
except ValueError:
|
||||||
logging.warning(
|
logging.warning("Invalid input. Please enter a numerical number.")
|
||||||
"Invalid input. Please enter a numerical number.")
|
|
||||||
# Check if the input is valid
|
# Check if the input is valid
|
||||||
if not 1 <= menuSelection <= 4:
|
if not 1 <= menuSelection <= 4:
|
||||||
logging.warning("Sorry - that's an unknown menu selection")
|
logging.warning("Sorry - that's an unknown menu selection")
|
||||||
|
@ -829,12 +820,12 @@ if __name__ == "__main__":
|
||||||
# Check that the user has created the config file
|
# Check that the user has created the config file
|
||||||
if os.path.exists("config.json"):
|
if os.path.exists("config.json"):
|
||||||
# Check that the user has provided the GDPR path
|
# Check that the user has provided the GDPR path
|
||||||
if os.path.isdir(config.GDPR_WORKSPACE_PATH):
|
if os.path.isdir(config.gdpr_workspace_path):
|
||||||
start()
|
start()
|
||||||
else:
|
else:
|
||||||
logging.error(
|
logging.error(
|
||||||
"Oops! The TV Time GDPR folder '"
|
"Oops! The TV Time GDPR folder '"
|
||||||
+ config.GDPR_WORKSPACE_PATH
|
+ config.gdpr_workspace_path
|
||||||
+ "' does not exist on the local system. Please check it, and try again."
|
+ "' does not exist on the local system. Please check it, and try again."
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in a new issue