mirror of
https://github.com/SinTan1729/TvTimeToTrakt.git
synced 2025-04-19 17:40:01 -05:00
Simple refactor
This commit is contained in:
parent
86f63b0360
commit
a1276c6de6
1 changed files with 80 additions and 111 deletions
145
TimeToTrakt.py
145
TimeToTrakt.py
|
@ -43,15 +43,13 @@ class Config:
|
||||||
gdpr_workspace_path: str
|
gdpr_workspace_path: str
|
||||||
|
|
||||||
|
|
||||||
def is_authenticated():
|
def is_authenticated() -> bool:
|
||||||
with open("pytrakt.json") as f:
|
with open("pytrakt.json") as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
days_before_expiration = (
|
days_before_expiration = (
|
||||||
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
|
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
|
||||||
).days
|
).days
|
||||||
if days_before_expiration < 1:
|
return days_before_expiration >= 1
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def get_configuration() -> Config:
|
def get_configuration() -> Config:
|
||||||
|
@ -82,7 +80,7 @@ FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv"
|
||||||
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
|
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
|
||||||
|
|
||||||
|
|
||||||
def init_trakt_auth():
|
def init_trakt_auth() -> bool:
|
||||||
if is_authenticated():
|
if is_authenticated():
|
||||||
return True
|
return True
|
||||||
# Set the method of authentication
|
# Set the method of authentication
|
||||||
|
@ -133,7 +131,7 @@ def get_year_from_title(title) -> Title:
|
||||||
# It seems to improve automation, and reduce manual selection....
|
# It seems to improve automation, and reduce manual selection....
|
||||||
|
|
||||||
|
|
||||||
def check_title_name_match(tv_time_title, trakt_title):
|
def check_title_name_match(tv_time_title: str, trakt_title: str) -> bool:
|
||||||
# If the name is a complete match, then don't bother comparing them!
|
# If the name is a complete match, then don't bother comparing them!
|
||||||
if tv_time_title == trakt_title:
|
if tv_time_title == trakt_title:
|
||||||
return True
|
return True
|
||||||
|
@ -162,7 +160,7 @@ def check_title_name_match(tv_time_title, trakt_title):
|
||||||
# in Trakt.TV either by automation, or asking the user to confirm.
|
# in Trakt.TV either by automation, or asking the user to confirm.
|
||||||
|
|
||||||
|
|
||||||
def get_show_by_name(name, season_number, episode_number):
|
def get_show_by_name(name: str, season_number: str, episode_number: str):
|
||||||
# Parse the TV Show's name for year, if one is present in the string
|
# Parse the TV Show's name for year, if one is present in the string
|
||||||
title = get_year_from_title(name)
|
title = get_year_from_title(name)
|
||||||
|
|
||||||
|
@ -170,15 +168,12 @@ def get_show_by_name(name, season_number, episode_number):
|
||||||
if title.year:
|
if title.year:
|
||||||
name = title.without_year
|
name = title.without_year
|
||||||
|
|
||||||
# Request the Trakt API for search results, using the name
|
|
||||||
tv_search = TVShow.search(name)
|
|
||||||
|
|
||||||
# Create an array of shows which have been matched
|
# Create an array of shows which have been matched
|
||||||
shows_with_same_name = []
|
shows_with_same_name = []
|
||||||
|
|
||||||
# Go through each result from the search
|
# Go through each result from the search
|
||||||
for show in tv_search:
|
for show in TVShow.search(name):
|
||||||
# Check if the title is a match, based on our conditions (e.g over 50% of words match)
|
# Check if the title is a match, based on our conditions (e.g. over 50% of words match)
|
||||||
if check_title_name_match(name, show.title):
|
if check_title_name_match(name, show.title):
|
||||||
# If the title included the year of broadcast, then we can be more picky in the results
|
# If the title included the year of broadcast, then we can be more picky in the results
|
||||||
# to look for a show with a broadcast year that matches
|
# to look for a show with a broadcast year that matches
|
||||||
|
@ -196,21 +191,17 @@ def get_show_by_name(name, season_number, episode_number):
|
||||||
else:
|
else:
|
||||||
shows_with_same_name.append(show)
|
shows_with_same_name.append(show)
|
||||||
|
|
||||||
# Sweep through the results once more for 1:1 title name matches,
|
complete_match_names = [name_from_search for name_from_search in shows_with_same_name if name_from_search.title == name]
|
||||||
# then if the list contains one entry with a 1:1 match, then clear the array
|
|
||||||
# and only use this one!
|
|
||||||
complete_match_names = []
|
|
||||||
for nameFromSearch in shows_with_same_name:
|
|
||||||
if nameFromSearch.title == name:
|
|
||||||
complete_match_names.append(nameFromSearch)
|
|
||||||
|
|
||||||
if len(complete_match_names) == 1:
|
if len(complete_match_names) == 1:
|
||||||
shows_with_same_name = complete_match_names
|
return complete_match_names[0]
|
||||||
|
elif len(shows_with_same_name) == 1:
|
||||||
|
return shows_with_same_name[0]
|
||||||
|
elif len(shows_with_same_name) < 1:
|
||||||
|
return None
|
||||||
|
else:
|
||||||
# If the search contains multiple results, then we need to confirm with the user which show
|
# If the search contains multiple results, then we need to confirm with the user which show
|
||||||
# the script should use, or access the local database to see if the user has already provided
|
# the script should use, or access the local database to see if the user has already provided
|
||||||
# a manual selection
|
# a manual selection
|
||||||
if len(shows_with_same_name) > 1:
|
|
||||||
|
|
||||||
# Query the local database for existing selection
|
# Query the local database for existing selection
|
||||||
user_matched_query = Query()
|
user_matched_query = Query()
|
||||||
|
@ -257,14 +248,14 @@ def get_show_by_name(name, season_number, episode_number):
|
||||||
"Please make a selection from above (or enter SKIP):"
|
"Please make a selection from above (or enter SKIP):"
|
||||||
)
|
)
|
||||||
|
|
||||||
if index_selected != "SKIP":
|
# Exit the loop
|
||||||
|
if index_selected == "SKIP":
|
||||||
|
break
|
||||||
|
|
||||||
# Since the value isn't 'skip', check that the result is numerical
|
# Since the value isn't 'skip', check that the result is numerical
|
||||||
index_selected = int(index_selected) - 1
|
index_selected = int(index_selected) - 1
|
||||||
# Exit the selection loop
|
# Exit the selection loop
|
||||||
break
|
break
|
||||||
# Otherwise, exit the loop
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
# Still allow the user to provide the exit input, and kill the program
|
# Still allow the user to provide the exit input, and kill the program
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
sys.exit("Cancel requested...")
|
sys.exit("Cancel requested...")
|
||||||
|
@ -298,17 +289,9 @@ def get_show_by_name(name, season_number, episode_number):
|
||||||
|
|
||||||
return selected_show
|
return selected_show
|
||||||
|
|
||||||
else:
|
|
||||||
if len(shows_with_same_name) > 0:
|
|
||||||
# If the search returned only one result, then awesome!
|
|
||||||
# Return the show, so the import automation can continue.
|
|
||||||
return shows_with_same_name[0]
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
|
||||||
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g Season 1 in Index 0), then
|
# subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
|
||||||
# subtract the TV Time numerical value by 1 so it starts from 0 as well. However, when a TV series includes
|
|
||||||
# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
|
# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
|
||||||
# this will match TV Time's existing value.
|
# this will match TV Time's existing value.
|
||||||
|
|
||||||
|
@ -335,9 +318,7 @@ def parse_season_number(season_number, trakt_show_obj):
|
||||||
return season_number
|
return season_number
|
||||||
|
|
||||||
|
|
||||||
def process_watched_shows():
|
def process_watched_shows() -> None:
|
||||||
# Total amount of rows in the CSV file
|
|
||||||
error_streak = 0
|
|
||||||
# Open the CSV file within the GDPR exported data
|
# Open the CSV file within the GDPR exported data
|
||||||
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
|
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
|
||||||
# Create the CSV reader, which will break up the fields using the delimiter ','
|
# Create the CSV reader, which will break up the fields using the delimiter ','
|
||||||
|
@ -355,9 +336,9 @@ def process_watched_shows():
|
||||||
# Get the TV Time Episode id
|
# Get the TV Time Episode id
|
||||||
tv_show_episode_id = row["episode_id"]
|
tv_show_episode_id = row["episode_id"]
|
||||||
# Get the TV Time Season Number
|
# Get the TV Time Season Number
|
||||||
tv_show_season_no = row["episode_season_number"]
|
tv_show_season_number = row["episode_season_number"]
|
||||||
# Get the TV Time Episode Number
|
# Get the TV Time Episode Number
|
||||||
tv_show_episode_no = row["episode_number"]
|
tv_show_episode_number = row["episode_number"]
|
||||||
# Get the date which the show was marked 'watched' in TV Time
|
# Get the date which the show was marked 'watched' in TV Time
|
||||||
tv_show_date_watched = row["updated_at"]
|
tv_show_date_watched = row["updated_at"]
|
||||||
# Parse the watched date value into a Python type
|
# Parse the watched date value into a Python type
|
||||||
|
@ -377,6 +358,7 @@ def process_watched_shows():
|
||||||
# If the query returned no results, then continue to import it into Trakt
|
# If the query returned no results, then continue to import it into Trakt
|
||||||
if len(query_result) == 0:
|
if len(query_result) == 0:
|
||||||
# Create a repeating loop, which will break on success, but repeats on failures
|
# Create a repeating loop, which will break on success, but repeats on failures
|
||||||
|
error_streak = 0
|
||||||
while True:
|
while True:
|
||||||
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
||||||
# then give up, and move onto the next episode, but warn the user.
|
# then give up, and move onto the next episode, but warn the user.
|
||||||
|
@ -391,24 +373,24 @@ def process_watched_shows():
|
||||||
# Other developers share the service, for free - so be considerate of your usage.
|
# Other developers share the service, for free - so be considerate of your usage.
|
||||||
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
|
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
|
||||||
# Search Trakt for the TV show matching TV Time's title value
|
# Search Trakt for the TV show matching TV Time's title value
|
||||||
trakt_show_obj = get_show_by_name(
|
trakt_show = get_show_by_name(
|
||||||
tv_show_name, tv_show_season_no, tv_show_episode_no
|
tv_show_name, tv_show_season_number, tv_show_episode_number
|
||||||
)
|
)
|
||||||
# If the method returned 'None', then this is an indication to skip the episode, and
|
# If the method returned 'None', then this is an indication to skip the episode, and
|
||||||
# move onto the next one
|
# move onto the next one
|
||||||
if trakt_show_obj is None:
|
if not trakt_show:
|
||||||
break
|
break
|
||||||
# Show the progress of the import on-screen
|
# Show the progress of the import on-screen
|
||||||
logging.info(
|
logging.info(
|
||||||
f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_no} /"
|
f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_number} /"
|
||||||
f"Episode {tv_show_episode_no}"
|
f"Episode {tv_show_episode_number}"
|
||||||
)
|
)
|
||||||
# Get the season from the Trakt API
|
# Get the season from the Trakt API
|
||||||
season = trakt_show_obj.seasons[
|
season = trakt_show.seasons[
|
||||||
parse_season_number(tv_show_season_no, trakt_show_obj)
|
parse_season_number(tv_show_season_number, trakt_show)
|
||||||
]
|
]
|
||||||
# Get the episode from the season
|
# Get the episode from the season
|
||||||
episode = season.episodes[int(tv_show_episode_no) - 1]
|
episode = season.episodes[int(tv_show_episode_number) - 1]
|
||||||
# Mark the episode as watched!
|
# Mark the episode as watched!
|
||||||
episode.mark_as_seen(tv_show_date_watched_converted)
|
episode.mark_as_seen(tv_show_date_watched_converted)
|
||||||
# Add the episode to the local database as imported, so it can be skipped,
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
|
@ -421,20 +403,20 @@ def process_watched_shows():
|
||||||
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
|
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
|
||||||
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
||||||
except IndexError:
|
except IndexError:
|
||||||
tv_show_slug = trakt_show_obj.to_json()["shows"][0]["ids"]["ids"][
|
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"][
|
||||||
"slug"
|
"slug"
|
||||||
]
|
]
|
||||||
logging.warning(
|
logging.warning(
|
||||||
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, "
|
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
|
||||||
f"Episode {tv_show_episode_no} does not exist in Trakt! "
|
f"Episode {tv_show_episode_number} does not exist in Trakt! "
|
||||||
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_no}/episodes/{tv_show_episode_no})"
|
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_number}/episodes/{tv_show_episode_number})"
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
# Catch any errors which are raised because a show could not be found in Trakt
|
# Catch any errors which are raised because a show could not be found in Trakt
|
||||||
except trakt.errors.NotFoundException:
|
except trakt.errors.NotFoundException:
|
||||||
logging.warning(
|
logging.warning(
|
||||||
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, "
|
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
|
||||||
f"Episode {tv_show_episode_no} does not exist (search) in Trakt!"
|
f"Episode {tv_show_episode_number} does not exist (search) in Trakt!"
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
# Catch errors because of the program breaching the Trakt API rate limit
|
# Catch errors because of the program breaching the Trakt API rate limit
|
||||||
|
@ -452,7 +434,7 @@ def process_watched_shows():
|
||||||
except json.decoder.JSONDecodeError:
|
except json.decoder.JSONDecodeError:
|
||||||
logging.warning(
|
logging.warning(
|
||||||
f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} "
|
f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} "
|
||||||
+ f"Season {tv_show_season_no}, Episode {tv_show_episode_no}! This might occur when the server is down and has produced "
|
+ f"Season {tv_show_season_number}, Episode {tv_show_episode_number}! This might occur when the server is down and has produced "
|
||||||
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -467,7 +449,7 @@ def process_watched_shows():
|
||||||
# Skip the episode
|
# Skip the episode
|
||||||
else:
|
else:
|
||||||
logging.info(
|
logging.info(
|
||||||
f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_no} / Episode {tv_show_episode_no}."
|
f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_number} / Episode {tv_show_episode_number}."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -475,7 +457,7 @@ def process_watched_shows():
|
||||||
# in Trakt.TV either by automation, or asking the user to confirm.
|
# in Trakt.TV either by automation, or asking the user to confirm.
|
||||||
|
|
||||||
|
|
||||||
def get_movie_by_name(name):
|
def get_movie_by_name(name: str):
|
||||||
# Parse the Movie's name for year, if one is present in the string
|
# Parse the Movie's name for year, if one is present in the string
|
||||||
title = get_year_from_title(name)
|
title = get_year_from_title(name)
|
||||||
|
|
||||||
|
@ -483,15 +465,12 @@ def get_movie_by_name(name):
|
||||||
if title.year:
|
if title.year:
|
||||||
name = title.without_year
|
name = title.without_year
|
||||||
|
|
||||||
# Request the Trakt API for search results, using the name
|
|
||||||
movie_search = Movie.search(name)
|
|
||||||
|
|
||||||
# Create an array of movies which have been matched
|
# Create an array of movies which have been matched
|
||||||
movies_with_same_name = []
|
movies_with_same_name = []
|
||||||
|
|
||||||
# Go through each result from the search
|
# Go through each result from the search
|
||||||
for movie in movie_search:
|
for movie in Movie.search(name):
|
||||||
# Check if the title is a match, based on our conditions (e.g over 50% of words match)
|
# Check if the title is a match, based on our conditions (e.g. over 50% of words match)
|
||||||
if check_title_name_match(name, movie.title):
|
if check_title_name_match(name, movie.title):
|
||||||
# If the title included the year of broadcast, then we can be more picky in the results
|
# If the title included the year of broadcast, then we can be more picky in the results
|
||||||
# to look for a movie with a broadcast year that matches
|
# to look for a movie with a broadcast year that matches
|
||||||
|
@ -509,21 +488,17 @@ def get_movie_by_name(name):
|
||||||
else:
|
else:
|
||||||
movies_with_same_name.append(movie)
|
movies_with_same_name.append(movie)
|
||||||
|
|
||||||
# Sweep through the results once more for 1:1 title name matches,
|
complete_match_names = [name_from_search for name_from_search in movies_with_same_name if name_from_search.title == name]
|
||||||
# then if the list contains one entry with a 1:1 match, then clear the array
|
|
||||||
# and only use this one!
|
|
||||||
complete_match_names = []
|
|
||||||
for nameFromSearch in movies_with_same_name:
|
|
||||||
if nameFromSearch.title == name:
|
|
||||||
complete_match_names.append(nameFromSearch)
|
|
||||||
|
|
||||||
if len(complete_match_names) == 1:
|
if len(complete_match_names) == 1:
|
||||||
movies_with_same_name = complete_match_names
|
return complete_match_names[0]
|
||||||
|
elif len(movies_with_same_name) == 1:
|
||||||
|
return movies_with_same_name[0]
|
||||||
|
elif len(movies_with_same_name) < 1:
|
||||||
|
return None
|
||||||
|
else:
|
||||||
# If the search contains multiple results, then we need to confirm with the user which movie
|
# If the search contains multiple results, then we need to confirm with the user which movie
|
||||||
# the script should use, or access the local database to see if the user has already provided
|
# the script should use, or access the local database to see if the user has already provided
|
||||||
# a manual selection
|
# a manual selection
|
||||||
if len(movies_with_same_name) > 1:
|
|
||||||
|
|
||||||
# Query the local database for existing selection
|
# Query the local database for existing selection
|
||||||
user_matched_query = Query()
|
user_matched_query = Query()
|
||||||
|
@ -610,14 +585,6 @@ def get_movie_by_name(name):
|
||||||
|
|
||||||
return selected_movie
|
return selected_movie
|
||||||
|
|
||||||
else:
|
|
||||||
if len(movies_with_same_name) > 0:
|
|
||||||
# If the search returned only one result, then awesome!
|
|
||||||
# Return the movie, so the import automation can continue.
|
|
||||||
return movies_with_same_name[0]
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def process_movies():
|
def process_movies():
|
||||||
# Total amount of rows which have been processed in the CSV file
|
# Total amount of rows which have been processed in the CSV file
|
||||||
|
@ -773,6 +740,7 @@ def process_movies():
|
||||||
f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'."
|
f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def menu_selection() -> int:
|
def menu_selection() -> int:
|
||||||
# Display a menu selection
|
# Display a menu selection
|
||||||
print(">> What do you want to do?")
|
print(">> What do you want to do?")
|
||||||
|
@ -802,8 +770,13 @@ def menu_selection() -> int:
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
selection = menu_selection()
|
selection = menu_selection()
|
||||||
|
|
||||||
# Create the initial authentication with Trakt, before starting the process
|
# Create the initial authentication with Trakt, before starting the process
|
||||||
if init_trakt_auth():
|
if not init_trakt_auth():
|
||||||
|
logging.error(
|
||||||
|
"ERROR: Unable to complete authentication to Trakt - please try again."
|
||||||
|
)
|
||||||
|
|
||||||
# Start the process which is required
|
# Start the process which is required
|
||||||
if selection == 1:
|
if selection == 1:
|
||||||
# Invoke the method which will import episodes which have been watched
|
# Invoke the method which will import episodes which have been watched
|
||||||
|
@ -821,10 +794,6 @@ def start():
|
||||||
logging.info("Processing both watched shows and movies.")
|
logging.info("Processing both watched shows and movies.")
|
||||||
process_watched_shows()
|
process_watched_shows()
|
||||||
process_movies()
|
process_movies()
|
||||||
else:
|
|
||||||
logging.error(
|
|
||||||
"ERROR: Unable to complete authentication to Trakt - please try again."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
Loading…
Reference in a new issue