Naming to snake_case

This commit is contained in:
Markus Nyman 2023-01-12 14:49:00 +02:00
parent 0ab9772b33
commit e9c3d99c53

View file

@ -46,18 +46,18 @@ class Config:
gdpr_workspace_path: str gdpr_workspace_path: str
def isAuthenticated(): def is_authenticated():
with open("pytrakt.json") as f: with open("pytrakt.json") as f:
data = json.load(f) data = json.load(f)
daysBeforeExpiration = ( days_before_expiration = (
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now() datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
).days ).days
if daysBeforeExpiration < 1: if days_before_expiration < 1:
return False return False
return True return True
def getConfiguration() -> Config: def get_configuration() -> Config:
try: try:
with open("config.json") as f: with open("config.json") as f:
data = json.load(f) data = json.load(f)
@ -78,15 +78,15 @@ def getConfiguration() -> Config:
) )
config = getConfiguration() config = get_configuration()
WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv" WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv"
FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv" FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv"
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv" MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
def initTraktAuth(): def init_trakt_auth():
if isAuthenticated(): if is_authenticated():
return True return True
# Set the method of authentication # Set the method of authentication
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
@ -103,18 +103,18 @@ def initTraktAuth():
# the accuracy of Trakt results. # the accuracy of Trakt results.
def getYearFromTitle(title): def get_year_from_title(title):
ex = Expando() ex = Expando()
try: try:
# Use a regex expression to get the value within the brackets e.g The Americans (2017) # Use a regex expression to get the value within the brackets e.g The Americans (2017)
yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title) year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title)
yearValue = yearSearch.group(1) year_value = year_search.group(1)
# Then, get the title without the year value included # Then, get the title without the year value included
titleValue = title.split("(")[0].strip() title_value = title.split("(")[0].strip()
# Put this together into an object # Put this together into an object
ex.titleWithoutYear = titleValue ex.titleWithoutYear = title_value
ex.yearValue = int(yearValue) ex.yearValue = int(year_value)
return ex return ex
except Exception: except Exception:
# If the above failed, then the title doesn't include a year # If the above failed, then the title doesn't include a year
@ -129,24 +129,24 @@ def getYearFromTitle(title):
# It seems to improve automation, and reduce manual selection.... # It seems to improve automation, and reduce manual selection....
def checkTitleNameMatch(tvTimeTitle, traktTitle): def check_title_name_match(tv_time_title, trakt_title):
# If the name is a complete match, then don't bother comparing them! # If the name is a complete match, then don't bother comparing them!
if tvTimeTitle == traktTitle: if tv_time_title == trakt_title:
return True return True
# Split the TvTime title # Split the TvTime title
tvTimeTitleSplit = tvTimeTitle.split() tv_time_title_split = tv_time_title.split()
# Create an array of words which are found in the Trakt title # Create an array of words which are found in the Trakt title
wordsMatched = [] words_matched = []
# Go through each word of the TV Time title, and check if it's in the Trakt title # Go through each word of the TV Time title, and check if it's in the Trakt title
for word in tvTimeTitleSplit: for word in tv_time_title_split:
if word in traktTitle: if word in trakt_title:
wordsMatched.append(word) words_matched.append(word)
# Then calculate what percentage of words matched # Then calculate what percentage of words matched
quotient = len(wordsMatched) / len(traktTitle.split()) quotient = len(words_matched) / len(trakt_title.split())
percentage = quotient * 100 percentage = quotient * 100
# If more than 50% of words in the TV Time title exist in the Trakt title, # If more than 50% of words in the TV Time title exist in the Trakt title,
@ -158,79 +158,78 @@ def checkTitleNameMatch(tvTimeTitle, traktTitle):
# in Trakt.TV either by automation, or asking the user to confirm. # in Trakt.TV either by automation, or asking the user to confirm.
def getShowByName(name, seasonNo, episodeNo): def get_show_by_name(name, season_number, episode_number):
# Parse the TV Show's name for year, if one is present in the string # Parse the TV Show's name for year, if one is present in the string
titleObj = getYearFromTitle(name) title_obj = get_year_from_title(name)
# Create a boolean to indicate if the title contains a year, # Create a boolean to indicate if the title contains a year,
# this is used later on to improve the accuracy of picking # this is used later on to improve the accuracy of picking
# from search results # from search results
doesTitleIncludeYear = titleObj.yearValue != -1 does_title_include_year = title_obj.yearValue != -1
# If the title contains a year, then replace the local variable with the stripped version # If the title contains a year, then replace the local variable with the stripped version
if doesTitleIncludeYear: if does_title_include_year:
name = titleObj.titleWithoutYear name = title_obj.titleWithoutYear
# Request the Trakt API for search results, using the name # Request the Trakt API for search results, using the name
tvSearch = TVShow.search(name) tv_search = TVShow.search(name)
# Create an array of shows which have been matched # Create an array of shows which have been matched
showsWithSameName = [] shows_with_same_name = []
# Go through each result from the search # Go through each result from the search
for show in tvSearch: for show in tv_search:
# Check if the title is a match, based on our conditions (e.g over 50% of words match) # Check if the title is a match, based on our conditions (e.g over 50% of words match)
if checkTitleNameMatch(name, show.title): if check_title_name_match(name, show.title):
# If the title included the year of broadcast, then we can be more picky in the results # If the title included the year of broadcast, then we can be more picky in the results
# to look for a show with a broadcast year that matches # to look for a show with a broadcast year that matches
if doesTitleIncludeYear: if does_title_include_year:
# If the show title is a 1:1 match, with the same broadcast year, then bingo! # If the show title is a 1:1 match, with the same broadcast year, then bingo!
if (name == show.title) and (show.year == titleObj.yearValue): if (name == show.title) and (show.year == title_obj.yearValue):
# Clear previous results, and only use this one # Clear previous results, and only use this one
showsWithSameName = [] shows_with_same_name = [show]
showsWithSameName.append(show)
break break
# Otherwise, only add the show if the broadcast year matches # Otherwise, only add the show if the broadcast year matches
if show.year == titleObj.yearValue: if show.year == title_obj.yearValue:
showsWithSameName.append(show) shows_with_same_name.append(show)
# If the program doesn't have the broadcast year, then add all the results # If the program doesn't have the broadcast year, then add all the results
else: else:
showsWithSameName.append(show) shows_with_same_name.append(show)
# Sweep through the results once more for 1:1 title name matches, # Sweep through the results once more for 1:1 title name matches,
# then if the list contains one entry with a 1:1 match, then clear the array # then if the list contains one entry with a 1:1 match, then clear the array
# and only use this one! # and only use this one!
completeMatchNames = [] complete_match_names = []
for nameFromSearch in showsWithSameName: for nameFromSearch in shows_with_same_name:
if nameFromSearch.title == name: if nameFromSearch.title == name:
completeMatchNames.append(nameFromSearch) complete_match_names.append(nameFromSearch)
if len(completeMatchNames) == 1: if len(complete_match_names) == 1:
showsWithSameName = completeMatchNames shows_with_same_name = complete_match_names
# If the search contains multiple results, then we need to confirm with the user which show # If the search contains multiple results, then we need to confirm with the user which show
# the script should use, or access the local database to see if the user has already provided # the script should use, or access the local database to see if the user has already provided
# a manual selection # a manual selection
if len(showsWithSameName) > 1: if len(shows_with_same_name) > 1:
# Query the local database for existing selection # Query the local database for existing selection
userMatchedQuery = Query() user_matched_query = Query()
queryResult = userMatchedShowsTable.search(userMatchedQuery.ShowName == name) query_result = userMatchedShowsTable.search(user_matched_query.ShowName == name)
# If the local database already contains an entry for a manual selection # If the local database already contains an entry for a manual selection
# then don't bother prompting the user to select it again! # then don't bother prompting the user to select it again!
if len(queryResult) == 1: if len(query_result) == 1:
# Get the first result from the query # Get the first result from the query
firstMatch = queryResult[0] first_match = query_result[0]
# Get the value contains the selection index # Get the value contains the selection index
firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex")) first_match_selected_index = int(first_match.get("UserSelectedIndex"))
# Check if the user previously requested to skip the show # Check if the user previously requested to skip the show
skipShow = firstMatch.get("SkipShow") skip_show = first_match.get("SkipShow")
# If the user did not skip, but provided an index selection, get the # If the user did not skip, but provided an index selection, get the
# matching show # matching show
if not skipShow: if not skip_show:
return showsWithSameName[firstMatchSelectedIndex] return shows_with_same_name[first_match_selected_index]
# Otherwise, return None, which will trigger the script to skip # Otherwise, return None, which will trigger the script to skip
# and move onto the next show # and move onto the next show
else: else:
@ -239,27 +238,29 @@ def getShowByName(name, seasonNo, episodeNo):
# then prompt the user to make a selection # then prompt the user to make a selection
else: else:
print( print(
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name.\a" f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {season_number},"
f"Episode {episode_number}) has {len(shows_with_same_name)} matching Trakt shows with the same name.\a "
) )
# Output each show for manual selection # Output each show for manual selection
for idx, item in enumerate(showsWithSameName): for idx, item in enumerate(shows_with_same_name):
# Display the show's title, broadcast year, amount of seasons and a link to the Trakt page. # Display the show's title, broadcast year, amount of seasons and a link to the Trakt page.
# This will provide the user with enough information to make a selection. # This will provide the user with enough information to make a selection.
print( print(
f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}" f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} "
f"Season(s) - More Info: https://trakt.tv/{item.ext}"
) )
while True: while True:
try: try:
# Get the user's selection, either a numerical input, or a string 'SKIP' value # Get the user's selection, either a numerical input, or a string 'SKIP' value
indexSelected = input( index_selected = input(
"Please make a selection from above (or enter SKIP):" "Please make a selection from above (or enter SKIP):"
) )
if indexSelected != "SKIP": if index_selected != "SKIP":
# Since the value isn't 'skip', check that the result is numerical # Since the value isn't 'skip', check that the result is numerical
indexSelected = int(indexSelected) - 1 index_selected = int(index_selected) - 1
# Exit the selection loop # Exit the selection loop
break break
# Otherwise, exit the loop # Otherwise, exit the loop
@ -271,12 +272,12 @@ def getShowByName(name, seasonNo, episodeNo):
# Otherwise, the user has entered an invalid value, warn the user to try again # Otherwise, the user has entered an invalid value, warn the user to try again
except Exception: except Exception:
logging.error( logging.error(
f"Sorry! Please select a value between 0 to {len(showsWithSameName)}" f"Sorry! Please select a value between 0 to {len(shows_with_same_name)}"
) )
# If the user entered 'SKIP', then exit from the loop with no selection, which # If the user entered 'SKIP', then exit from the loop with no selection, which
# will trigger the program to move onto the next episode # will trigger the program to move onto the next episode
if indexSelected == "SKIP": if index_selected == "SKIP":
# Record that the user has skipped the TV Show for import, so that # Record that the user has skipped the TV Show for import, so that
# manual input isn't required everytime # manual input isn't required everytime
userMatchedShowsTable.insert( userMatchedShowsTable.insert(
@ -286,23 +287,23 @@ def getShowByName(name, seasonNo, episodeNo):
return None return None
# Otherwise, return the selection which the user made from the list # Otherwise, return the selection which the user made from the list
else: else:
selectedShow = showsWithSameName[int(indexSelected)] selected_show = shows_with_same_name[int(index_selected)]
userMatchedShowsTable.insert( userMatchedShowsTable.insert(
{ {
"ShowName": name, "ShowName": name,
"UserSelectedIndex": indexSelected, "UserSelectedIndex": index_selected,
"SkipShow": False, "SkipShow": False,
} }
) )
return selectedShow return selected_show
else: else:
if len(showsWithSameName) > 0: if len(shows_with_same_name) > 0:
# If the search returned only one result, then awesome! # If the search returned only one result, then awesome!
# Return the show, so the import automation can continue. # Return the show, so the import automation can continue.
return showsWithSameName[0] return shows_with_same_name[0]
else: else:
return None return None
@ -313,76 +314,74 @@ def getShowByName(name, seasonNo, episodeNo):
# this will match TV Time's existing value. # this will match TV Time's existing value.
def parseSeasonNo(seasonNo, traktShowObj): def parse_season_number(season_number, trakt_show_obj):
# Parse the season number into a numerical value # Parse the season number into a numerical value
seasonNo = int(seasonNo) season_number = int(season_number)
# Then get the Season Number from the first item in the array # Then get the Season Number from the first item in the array
firstSeasonNo = traktShowObj.seasons[0].number first_season_no = trakt_show_obj.seasons[0].number
# If the season number is 0, then the Trakt show contains a "special" season # If the season number is 0, then the Trakt show contains a "special" season
if firstSeasonNo == 0: if first_season_no == 0:
# No need to modify the value, as the TV Time value will match Trakt # No need to modify the value, as the TV Time value will match Trakt
return seasonNo return season_number
# Otherwise, if the Trakt seasons start with no specials, then return the seasonNo, # Otherwise, if the Trakt seasons start with no specials, then return the seasonNo,
# but subtracted by one (e.g Season 1 in TV Time, will be 0) # but subtracted by one (e.g Season 1 in TV Time, will be 0)
else: else:
# Only subtract if the TV Time season number is greater than 0. # Only subtract if the TV Time season number is greater than 0.
if seasonNo != 0: if season_number != 0:
return seasonNo - 1 return season_number - 1
# Otherwise, the TV Time season is a special! Then you don't need to change the starting position # Otherwise, the TV Time season is a special! Then you don't need to change the starting position
else: else:
return seasonNo return season_number
def processWatchedShows(): def process_watched_shows():
# Total amount of rows which have been processed in the CSV file
rowsCount = 0
# Total amount of rows in the CSV file # Total amount of rows in the CSV file
errorStreak = 0 error_streak = 0
# Open the CSV file within the GDPR exported data # Open the CSV file within the GDPR exported data
with open(WATCHED_SHOWS_PATH, newline="") as csvfile: with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
# Create the CSV reader, which will break up the fields using the delimiter ',' # Create the CSV reader, which will break up the fields using the delimiter ','
showsReader = csv.DictReader(csvfile, delimiter=",") shows_reader = csv.DictReader(csvfile, delimiter=",")
# Get the total amount of rows in the CSV file, # Get the total amount of rows in the CSV file,
rowsTotal = len(list(showsReader)) rows_total = len(list(shows_reader))
# Move position to the beginning of the file # Move position to the beginning of the file
csvfile.seek(0, 0) csvfile.seek(0, 0)
# Loop through each line/record of the CSV file # Loop through each line/record of the CSV file
# Ignore the header row # Ignore the header row
next(showsReader, None) next(shows_reader, None)
for rowsCount, row in enumerate(showsReader): for rowsCount, row in enumerate(shows_reader):
# Get the name of the TV show # Get the name of the TV show
tvShowName = row["tv_show_name"] tv_show_name = row["tv_show_name"]
# Get the TV Time Episode Id # Get the TV Time Episode id
tvShowEpisodeId = row["episode_id"] tv_show_episode_id = row["episode_id"]
# Get the TV Time Season Number # Get the TV Time Season Number
tvShowSeasonNo = row["episode_season_number"] tv_show_season_no = row["episode_season_number"]
# Get the TV Time Episode Number # Get the TV Time Episode Number
tvShowEpisodeNo = row["episode_number"] tv_show_episode_no = row["episode_number"]
# Get the date which the show was marked 'watched' in TV Time # Get the date which the show was marked 'watched' in TV Time
tvShowDateWatched = row["updated_at"] tv_show_date_watched = row["updated_at"]
# Parse the watched date value into a Python type # Parse the watched date value into a Python type
tvShowDateWatchedConverted = datetime.strptime( tv_show_date_watched_converted = datetime.strptime(
tvShowDateWatched, "%Y-%m-%d %H:%M:%S" tv_show_date_watched, "%Y-%m-%d %H:%M:%S"
) )
# Query the local database for previous entries indicating that # Query the local database for previous entries indicating that
# the episode has already been imported in the past. Which will # the episode has already been imported in the past. Which will
# ease pressure on TV Time's API server during a retry of the import # ease pressure on TV Time's API server during a retry of the import
# process, and just save time overall without needing to create network requests # process, and just save time overall without needing to create network requests
episodeCompletedQuery = Query() episode_completed_query = Query()
queryResult = syncedEpisodesTable.search( query_result = syncedEpisodesTable.search(
episodeCompletedQuery.episodeId == tvShowEpisodeId episode_completed_query.episodeId == tv_show_episode_id
) )
# If the query returned no results, then continue to import it into Trakt # If the query returned no results, then continue to import it into Trakt
if len(queryResult) == 0: if len(query_result) == 0:
# Create a repeating loop, which will break on success, but repeats on failures # Create a repeating loop, which will break on success, but repeats on failures
while True: while True:
# If more than 10 errors occurred in one streak, whilst trying to import the episode # If more than 10 errors occurred in one streak, whilst trying to import the episode
# then give up, and move onto the next episode, but warn the user. # then give up, and move onto the next episode, but warn the user.
if errorStreak > 10: if error_streak > 10:
logging.warning( logging.warning(
"An error occurred 10 times in a row... skipping episode..." "An error occurred 10 times in a row... skipping episode..."
) )
@ -393,46 +392,50 @@ def processWatchedShows():
# Other developers share the service, for free - so be considerate of your usage. # Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
# Search Trakt for the TV show matching TV Time's title value # Search Trakt for the TV show matching TV Time's title value
traktShowObj = getShowByName( trakt_show_obj = get_show_by_name(
tvShowName, tvShowSeasonNo, tvShowEpisodeNo tv_show_name, tv_show_season_no, tv_show_episode_no
) )
# If the method returned 'None', then this is an indication to skip the episode, and # If the method returned 'None', then this is an indication to skip the episode, and
# move onto the next one # move onto the next one
if traktShowObj is None: if trakt_show_obj is None:
break break
# Show the progress of the import on-screen # Show the progress of the import on-screen
logging.info( logging.info(
f"({rowsCount + 1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}" f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_no} /"
f"Episode {tv_show_episode_no}"
) )
# Get the season from the Trakt API # Get the season from the Trakt API
season = traktShowObj.seasons[ season = trakt_show_obj.seasons[
parseSeasonNo(tvShowSeasonNo, traktShowObj) parse_season_number(tv_show_season_no, trakt_show_obj)
] ]
# Get the episode from the season # Get the episode from the season
episode = season.episodes[int(tvShowEpisodeNo) - 1] episode = season.episodes[int(tv_show_episode_no) - 1]
# Mark the episode as watched! # Mark the episode as watched!
episode.mark_as_seen(tvShowDateWatchedConverted) episode.mark_as_seen(tv_show_date_watched_converted)
# Add the episode to the local database as imported, so it can be skipped, # Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated # if the process is repeated
syncedEpisodesTable.insert({"episodeId": tvShowEpisodeId}) syncedEpisodesTable.insert({"episodeId": tv_show_episode_id})
# Clear the error streak on completing the method without errors # Clear the error streak on completing the method without errors
errorStreak = 0 error_streak = 0
break break
# Catch errors which occur because of an incorrect array index. This occurs when # Catch errors which occur because of an incorrect array index. This occurs when
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
except IndexError: except IndexError:
tvShowSlug = traktShowObj.to_json()["shows"][0]["ids"]["ids"][ tv_show_slug = trakt_show_obj.to_json()["shows"][0]["ids"]["ids"][
"slug" "slug"
] ]
logging.warning( logging.warning(
f"({rowsCount}/{rowsTotal}) - {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist in Trakt! (https://trakt.tv/shows/{tvShowSlug}/seasons/{tvShowSeasonNo}/episodes/{tvShowEpisodeNo})" f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, "
f"Episode {tv_show_episode_no} does not exist in Trakt! "
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_no}/episodes/{tv_show_episode_no})"
) )
break break
# Catch any errors which are raised because a show could not be found in Trakt # Catch any errors which are raised because a show could not be found in Trakt
except trakt.errors.NotFoundException: except trakt.errors.NotFoundException:
logging.warning( logging.warning(
f"({rowsCount}/{rowsTotal}) - {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (search) in Trakt!" f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, "
f"Episode {tv_show_episode_no} does not exist (search) in Trakt!"
) )
break break
# Catch errors because of the program breaching the Trakt API rate limit # Catch errors because of the program breaching the Trakt API rate limit
@ -445,12 +448,12 @@ def processWatchedShows():
time.sleep(60) time.sleep(60)
# Mark the exception in the error streak # Mark the exception in the error streak
errorStreak += 1 error_streak += 1
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON # Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logging.warning( logging.warning(
f"({rowsCount}/{rowsTotal}) - A JSON decode error occuring whilst processing {tvShowName} " f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} "
+ f"Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo}! This might occur when the server is down and has produced " + f"Season {tv_show_season_no}, Episode {tv_show_episode_no}! This might occur when the server is down and has produced "
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again." + "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
) )
@ -458,14 +461,14 @@ def processWatchedShows():
time.sleep(60) time.sleep(60)
# Mark the exception in the error streak # Mark the exception in the error streak
errorStreak += 1 error_streak += 1
# Catch a CTRL + C keyboard input, and exits the program # Catch a CTRL + C keyboard input, and exits the program
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit("Cancel requested...") sys.exit("Cancel requested...")
# Skip the episode # Skip the episode
else: else:
logging.info( logging.info(
f"({rowsCount}/{rowsTotal}) - Already imported, skipping '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}." f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_no} / Episode {tv_show_episode_no}."
) )
@ -473,79 +476,78 @@ def processWatchedShows():
# in Trakt.TV either by automation, or asking the user to confirm. # in Trakt.TV either by automation, or asking the user to confirm.
def getMovieByName(name): def get_movie_by_name(name):
# Parse the Movie's name for year, if one is present in the string # Parse the Movie's name for year, if one is present in the string
titleObj = getYearFromTitle(name) title_obj = get_year_from_title(name)
# Create a boolean to indicate if the title contains a year, # Create a boolean to indicate if the title contains a year,
# this is used later on to improve the accuracy of picking # this is used later on to improve the accuracy of picking
# from search results # from search results
doesTitleIncludeYear = titleObj.yearValue != -1 does_title_include_year = title_obj.yearValue != -1
# If the title contains a year, then replace the local variable with the stripped version # If the title contains a year, then replace the local variable with the stripped version
if doesTitleIncludeYear: if does_title_include_year:
name = titleObj.titleWithoutYear name = title_obj.titleWithoutYear
# Request the Trakt API for search results, using the name # Request the Trakt API for search results, using the name
movieSearch = Movie.search(name) movie_search = Movie.search(name)
# Create an array of movies which have been matched # Create an array of movies which have been matched
moviesWithSameName = [] movies_with_same_name = []
# Go through each result from the search # Go through each result from the search
for movie in movieSearch: for movie in movie_search:
# Check if the title is a match, based on our conditions (e.g over 50% of words match) # Check if the title is a match, based on our conditions (e.g over 50% of words match)
if checkTitleNameMatch(name, movie.title): if check_title_name_match(name, movie.title):
# If the title included the year of broadcast, then we can be more picky in the results # If the title included the year of broadcast, then we can be more picky in the results
# to look for a movie with a broadcast year that matches # to look for a movie with a broadcast year that matches
if doesTitleIncludeYear: if does_title_include_year:
# If the movie title is a 1:1 match, with the same broadcast year, then bingo! # If the movie title is a 1:1 match, with the same broadcast year, then bingo!
if (name == movie.title) and (movie.year == titleObj.yearValue): if (name == movie.title) and (movie.year == title_obj.yearValue):
# Clear previous results, and only use this one # Clear previous results, and only use this one
moviesWithSameName = [] movies_with_same_name = [movie]
moviesWithSameName.append(movie)
break break
# Otherwise, only add the movie if the broadcast year matches # Otherwise, only add the movie if the broadcast year matches
if movie.year == titleObj.yearValue: if movie.year == title_obj.yearValue:
moviesWithSameName.append(movie) movies_with_same_name.append(movie)
# If the program doesn't have the broadcast year, then add all the results # If the program doesn't have the broadcast year, then add all the results
else: else:
moviesWithSameName.append(movie) movies_with_same_name.append(movie)
# Sweep through the results once more for 1:1 title name matches, # Sweep through the results once more for 1:1 title name matches,
# then if the list contains one entry with a 1:1 match, then clear the array # then if the list contains one entry with a 1:1 match, then clear the array
# and only use this one! # and only use this one!
completeMatchNames = [] complete_match_names = []
for nameFromSearch in moviesWithSameName: for nameFromSearch in movies_with_same_name:
if nameFromSearch.title == name: if nameFromSearch.title == name:
completeMatchNames.append(nameFromSearch) complete_match_names.append(nameFromSearch)
if len(completeMatchNames) == 1: if len(complete_match_names) == 1:
moviesWithSameName = completeMatchNames movies_with_same_name = complete_match_names
# If the search contains multiple results, then we need to confirm with the user which movie # If the search contains multiple results, then we need to confirm with the user which movie
# the script should use, or access the local database to see if the user has already provided # the script should use, or access the local database to see if the user has already provided
# a manual selection # a manual selection
if len(moviesWithSameName) > 1: if len(movies_with_same_name) > 1:
# Query the local database for existing selection # Query the local database for existing selection
userMatchedQuery = Query() user_matched_query = Query()
queryResult = userMatchedMoviesTable.search(userMatchedQuery.movie_name == name) query_result = userMatchedMoviesTable.search(user_matched_query.movie_name == name)
# If the local database already contains an entry for a manual selection # If the local database already contains an entry for a manual selection
# then don't bother prompting the user to select it again! # then don't bother prompting the user to select it again!
if len(queryResult) == 1: if len(query_result) == 1:
# Get the first result from the query # Get the first result from the query
firstMatch = queryResult[0] first_match = query_result[0]
# Get the value contains the selection index # Get the value contains the selection index
firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex")) first_match_selected_index = int(first_match.get("UserSelectedIndex"))
# Check if the user previously requested to skip the movie # Check if the user previously requested to skip the movie
skipMovie = firstMatch.get("SkipMovie") skip_movie = first_match.get("SkipMovie")
# If the user did not skip, but provided an index selection, get the # If the user did not skip, but provided an index selection, get the
# matching movie # matching movie
if not skipMovie: if not skip_movie:
return moviesWithSameName[firstMatchSelectedIndex] return movies_with_same_name[first_match_selected_index]
# Otherwise, return None, which will trigger the script to skip # Otherwise, return None, which will trigger the script to skip
# and move onto the next movie # and move onto the next movie
else: else:
@ -554,11 +556,12 @@ def getMovieByName(name):
# then prompt the user to make a selection # then prompt the user to make a selection
else: else:
print( print(
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(moviesWithSameName)} matching Trakt movies with the same name.\a" f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(movies_with_same_name)} "
f"matching Trakt movies with the same name.\a"
) )
# Output each movie for manual selection # Output each movie for manual selection
for idx, item in enumerate(moviesWithSameName): for idx, item in enumerate(movies_with_same_name):
# Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page. # Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page.
# This will provide the user with enough information to make a selection. # This will provide the user with enough information to make a selection.
print( print(
@ -568,13 +571,13 @@ def getMovieByName(name):
while True: while True:
try: try:
# Get the user's selection, either a numerical input, or a string 'SKIP' value # Get the user's selection, either a numerical input, or a string 'SKIP' value
indexSelected = input( index_selected = input(
"Please make a selection from above (or enter SKIP):" "Please make a selection from above (or enter SKIP):"
) )
if indexSelected != "SKIP": if index_selected != "SKIP":
# Since the value isn't 'skip', check that the result is numerical # Since the value isn't 'skip', check that the result is numerical
indexSelected = int(indexSelected) - 1 index_selected = int(index_selected) - 1
# Exit the selection loop # Exit the selection loop
break break
# Otherwise, exit the loop # Otherwise, exit the loop
@ -586,12 +589,12 @@ def getMovieByName(name):
# Otherwise, the user has entered an invalid value, warn the user to try again # Otherwise, the user has entered an invalid value, warn the user to try again
except Exception: except Exception:
logging.error( logging.error(
f"Sorry! Please select a value between 0 to {len(moviesWithSameName)}" f"Sorry! Please select a value between 0 to {len(movies_with_same_name)}"
) )
# If the user entered 'SKIP', then exit from the loop with no selection, which # If the user entered 'SKIP', then exit from the loop with no selection, which
# will trigger the program to move onto the next episode # will trigger the program to move onto the next episode
if indexSelected == "SKIP": if index_selected == "SKIP":
# Record that the user has skipped the Movie for import, so that # Record that the user has skipped the Movie for import, so that
# manual input isn't required everytime # manual input isn't required everytime
userMatchedMoviesTable.insert( userMatchedMoviesTable.insert(
@ -601,90 +604,89 @@ def getMovieByName(name):
return None return None
# Otherwise, return the selection which the user made from the list # Otherwise, return the selection which the user made from the list
else: else:
selectedMovie = moviesWithSameName[int(indexSelected)] selected_movie = movies_with_same_name[int(index_selected)]
userMatchedMoviesTable.insert( userMatchedMoviesTable.insert(
{ {
"movie_name": name, "movie_name": name,
"UserSelectedIndex": indexSelected, "UserSelectedIndex": index_selected,
"SkipMovie": False, "SkipMovie": False,
} }
) )
return selectedMovie return selected_movie
else: else:
if len(moviesWithSameName) > 0: if len(movies_with_same_name) > 0:
# If the search returned only one result, then awesome! # If the search returned only one result, then awesome!
# Return the movie, so the import automation can continue. # Return the movie, so the import automation can continue.
return moviesWithSameName[0] return movies_with_same_name[0]
else: else:
return None return None
def processMovies(): def process_movies():
# Total amount of rows which have been processed in the CSV file # Total amount of rows which have been processed in the CSV file
rowsCount = 0
# Total amount of rows in the CSV file # Total amount of rows in the CSV file
errorStreak = 0 error_streak = 0
# Open the CSV file within the GDPR exported data # Open the CSV file within the GDPR exported data
with open(MOVIES_PATH, newline="") as csvfile: with open(MOVIES_PATH, newline="") as csvfile:
# Create the CSV reader, which will break up the fields using the delimiter ',' # Create the CSV reader, which will break up the fields using the delimiter ','
movieReaderTemp = csv.DictReader(csvfile, delimiter=",") movie_reader_temp = csv.DictReader(csvfile, delimiter=",")
movieReader = filter(lambda p: "" != p["movie_name"], movieReaderTemp) movie_reader = filter(lambda p: "" != p["movie_name"], movie_reader_temp)
# First, list all movies with watched type so that watchlist entry for them is not created # First, list all movies with watched type so that watchlist entry for them is not created
watchedList = [] watched_list = []
for row in movieReader: for row in movie_reader:
if row["type"] == "watch": if row["type"] == "watch":
watchedList.append(row["movie_name"]) watched_list.append(row["movie_name"])
# Move position to the beginning of the file # Move position to the beginning of the file
csvfile.seek(0, 0) csvfile.seek(0, 0)
# Get the total amount of rows in the CSV file, # Get the total amount of rows in the CSV file,
rowsTotal = len(list(movieReader)) rows_total = len(list(movie_reader))
# Move position to the beginning of the file # Move position to the beginning of the file
csvfile.seek(0, 0) csvfile.seek(0, 0)
# Loop through each line/record of the CSV file # Loop through each line/record of the CSV file
# Ignore the header row # Ignore the header row
next(movieReader, None) next(movie_reader, None)
for rowsCount, row in enumerate(movieReader): for rows_count, row in enumerate(movie_reader):
# Get the name of the Movie # Get the name of the Movie
movieName = row["movie_name"] movie_name = row["movie_name"]
# Get the date which the movie was marked 'watched' in TV Time # Get the date which the movie was marked 'watched' in TV Time
activityType = row["type"] activity_type = row["type"]
movieDateWatched = row["updated_at"] movie_date_watched = row["updated_at"]
# Parse the watched date value into a Python type # Parse the watched date value into a Python type
movieDateWatchedConverted = datetime.strptime( movie_date_watched_converted = datetime.strptime(
movieDateWatched, "%Y-%m-%d %H:%M:%S" movie_date_watched, "%Y-%m-%d %H:%M:%S"
) )
# Query the local database for previous entries indicating that # Query the local database for previous entries indicating that
# the episode has already been imported in the past. Which will # the episode has already been imported in the past. Which will
# ease pressure on TV Time's API server during a retry of the import # ease pressure on TV Time's API server during a retry of the import
# process, and just save time overall without needing to create network requests # process, and just save time overall without needing to create network requests
movieQuery = Query() movie_query = Query()
queryResult = syncedMoviesTable.search( query_result = syncedMoviesTable.search(
(movieQuery.movie_name == movieName) & (movieQuery.type == "watched") (movie_query.movie_name == movie_name) & (movie_query.type == "watched")
) )
watchlistQuery = Query() watchlist_query = Query()
queryResultWatchlist = syncedMoviesTable.search( query_result_watchlist = syncedMoviesTable.search(
(watchlistQuery.movie_name == movieName) (watchlist_query.movie_name == movie_name)
& (watchlistQuery.type == "watchlist") & (watchlist_query.type == "watchlist")
) )
# If the query returned no results, then continue to import it into Trakt # If the query returned no results, then continue to import it into Trakt
if len(queryResult) == 0: if len(query_result) == 0:
# Create a repeating loop, which will break on success, but repeats on failures # Create a repeating loop, which will break on success, but repeats on failures
while True: while True:
# If movie is watched but this is an entry for watchlist, then skip # If movie is watched but this is an entry for watchlist, then skip
if movieName in watchedList and activityType != "watch": if movie_name in watched_list and activity_type != "watch":
logging.info( logging.info(
f"Skipping '{movieName}' to avoid redundant watchlist entry." f"Skipping '{movie_name}' to avoid redundant watchlist entry."
) )
break break
# If more than 10 errors occurred in one streak, whilst trying to import the episode # If more than 10 errors occurred in one streak, whilst trying to import the episode
# then give up, and move onto the next episode, but warn the user. # then give up, and move onto the next episode, but warn the user.
if errorStreak > 10: if error_streak > 10:
logging.warning( logging.warning(
"An error occurred 10 times in a row... skipping episode..." "An error occurred 10 times in a row... skipping episode..."
) )
@ -695,51 +697,52 @@ def processMovies():
# Other developers share the service, for free - so be considerate of your usage. # Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
# Search Trakt for the Movie matching TV Time's title value # Search Trakt for the Movie matching TV Time's title value
traktMovieObj = getMovieByName(movieName) trakt_movie_obj = get_movie_by_name(movie_name)
# If the method returned 'None', then this is an indication to skip the episode, and # If the method returned 'None', then this is an indication to skip the episode, and
# move onto the next one # move onto the next one
if traktMovieObj is None: if trakt_movie_obj is None:
break break
# Show the progress of the import on-screen # Show the progress of the import on-screen
logging.info( logging.info(
f"({rowsCount + 1}/{rowsTotal}) - Processing '{movieName}'" f"({rows_count + 1}/{rows_total}) - Processing '{movie_name}'"
) )
if activityType == "watch": if activity_type == "watch":
traktMovieObj.mark_as_seen(movieDateWatchedConverted) trakt_movie_obj.mark_as_seen(movie_date_watched_converted)
# Add the episode to the local database as imported, so it can be skipped, # Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated # if the process is repeated
syncedMoviesTable.insert( syncedMoviesTable.insert(
{"movie_name": movieName, "type": "watched"} {"movie_name": movie_name, "type": "watched"}
) )
logging.info(f"Marked as seen") logging.info(f"Marked as seen")
elif len(queryResultWatchlist) == 0: elif len(query_result_watchlist) == 0:
traktMovieObj.add_to_watchlist() trakt_movie_obj.add_to_watchlist()
# Add the episode to the local database as imported, so it can be skipped, # Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated # if the process is repeated
syncedMoviesTable.insert( syncedMoviesTable.insert(
{"movie_name": movieName, "type": "watchlist"} {"movie_name": movie_name, "type": "watchlist"}
) )
logging.info(f"Added to watchlist") logging.info(f"Added to watchlist")
else: else:
logging.warning(f"Already in watchlist") logging.warning(f"Already in watchlist")
# Clear the error streak on completing the method without errors # Clear the error streak on completing the method without errors
errorStreak = 0 error_streak = 0
break break
# Catch errors which occur because of an incorrect array index. This occurs when # Catch errors which occur because of an incorrect array index. This occurs when
# an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time. # an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time.
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
except IndexError: except IndexError:
movieSlug = traktMovieObj.to_json()["movies"][0]["ids"]["ids"][ movie_slug = trakt_movie_obj.to_json()["movies"][0]["ids"]["ids"][
"slug" "slug"
] ]
logging.warning( logging.warning(
f"({rowsCount}/{rowsTotal}) - {movieName} does not exist in Trakt! (https://trakt.tv/movies/{movieSlug}/)" f"({rows_count}/{rows_total}) - {movie_name} "
f"does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)"
) )
break break
# Catch any errors which are raised because a movie could not be found in Trakt # Catch any errors which are raised because a movie could not be found in Trakt
except trakt.errors.NotFoundException: except trakt.errors.NotFoundException:
logging.warning( logging.warning(
f"({rowsCount}/{rowsTotal}) - {movieName} does not exist (search) in Trakt!" f"({rows_count}/{rows_total}) - {movie_name} does not exist (search) in Trakt!"
) )
break break
# Catch errors because of the program breaching the Trakt API rate limit # Catch errors because of the program breaching the Trakt API rate limit
@ -752,11 +755,11 @@ def processMovies():
time.sleep(60) time.sleep(60)
# Mark the exception in the error streak # Mark the exception in the error streak
errorStreak += 1 error_streak += 1
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON # Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logging.warning( logging.warning(
f"({rowsCount}/{rowsTotal}) - A JSON decode error occuring whilst processing {movieName} " f"({rows_count}/{rows_total}) - A JSON decode error occuring whilst processing {movie_name} "
+ f" This might occur when the server is down and has produced " + f" This might occur when the server is down and has produced "
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again." + "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
) )
@ -765,7 +768,7 @@ def processMovies():
time.sleep(60) time.sleep(60)
# Mark the exception in the error streak # Mark the exception in the error streak
errorStreak += 1 error_streak += 1
# Catch a CTRL + C keyboard input, and exits the program # Catch a CTRL + C keyboard input, and exits the program
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit("Cancel requested...") sys.exit("Cancel requested...")
@ -773,7 +776,7 @@ def processMovies():
# Skip the episode # Skip the episode
else: else:
logging.info( logging.info(
f"({rowsCount}/{rowsTotal}) - Already imported, skipping '{movieName}'." f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'."
) )
@ -787,38 +790,38 @@ def start():
while True: while True:
try: try:
menuSelection = input("Enter your menu selection: ") menu_selection = input("Enter your menu selection: ")
menuSelection = 3 if not menuSelection else int(menuSelection) menu_selection = 3 if not menu_selection else int(menu_selection)
break break
except ValueError: except ValueError:
logging.warning("Invalid input. Please enter a numerical number.") logging.warning("Invalid input. Please enter a numerical number.")
# Check if the input is valid # Check if the input is valid
if not 1 <= menuSelection <= 4: if not 1 <= menu_selection <= 4:
logging.warning("Sorry - that's an unknown menu selection") logging.warning("Sorry - that's an unknown menu selection")
exit() exit()
# Exit if the 4th option was chosen # Exit if the 4th option was chosen
if menuSelection == 4: if menu_selection == 4:
logging.info("Exiting as per user's selection.") logging.info("Exiting as per user's selection.")
exit() exit()
# Create the initial authentication with Trakt, before starting the process # Create the initial authentication with Trakt, before starting the process
if initTraktAuth(): if init_trakt_auth():
# Start the process which is required # Start the process which is required
if menuSelection == 1: if menu_selection == 1:
# Invoke the method which will import episodes which have been watched # Invoke the method which will import episodes which have been watched
# from TV Time into Trakt # from TV Time into Trakt
logging.info("Processing watched shows.") logging.info("Processing watched shows.")
processWatchedShows() process_watched_shows()
# TODO: Add support for followed shows # TODO: Add support for followed shows
elif menuSelection == 2: elif menu_selection == 2:
# Invoke the method which will import movies which have been watched # Invoke the method which will import movies which have been watched
# from TV Time into Trakt # from TV Time into Trakt
logging.info("Processing movies.") logging.info("Processing movies.")
processMovies() process_movies()
elif menuSelection == 3: elif menu_selection == 3:
# Invoke both the episodes and movies import methods # Invoke both the episodes and movies import methods
logging.info("Processing both watched shows and movies.") logging.info("Processing both watched shows and movies.")
processWatchedShows() process_watched_shows()
processMovies() process_movies()
else: else:
logging.error( logging.error(
"ERROR: Unable to complete authentication to Trakt - please try again." "ERROR: Unable to complete authentication to Trakt - please try again."