Process TV Shows and Movies using Processor class

This commit is contained in:
Markus Nyman 2023-01-17 00:56:27 +02:00
parent e668a6dec5
commit fc13aa9a78

View file

@ -9,7 +9,7 @@ import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Optional, TypeVar, Union, Any, TextIO from typing import Optional, TypeVar, Union, Any
import trakt.core import trakt.core
from tinydb import Query, TinyDB from tinydb import Query, TinyDB
@ -27,7 +27,7 @@ logging.basicConfig(
# Adjust this value to increase/decrease your requests between episodes. # Adjust this value to increase/decrease your requests between episodes.
# Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting # Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
DELAY_BETWEEN_EPISODES_IN_SECONDS = 1 DELAY_BETWEEN_ITEMS_IN_SECONDS = 1
# Create databases to keep track of completed processes # Create databases to keep track of completed processes
database = TinyDB("localStorage.json") database = TinyDB("localStorage.json")
@ -85,7 +85,6 @@ MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
def init_trakt_auth() -> bool: def init_trakt_auth() -> bool:
if is_authenticated(): if is_authenticated():
return True return True
# Set the method of authentication
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
return init( return init(
config.trakt_username, config.trakt_username,
@ -95,10 +94,6 @@ def init_trakt_auth() -> bool:
) )
# With a given title, check if it contains a year (e.g Doctor Who (2005))
# and then return this value, with the title and year removed to improve
# the accuracy of Trakt results.
TraktTVShow = TypeVar("TraktTVShow") TraktTVShow = TypeVar("TraktTVShow")
TraktMovie = TypeVar("TraktMovie") TraktMovie = TypeVar("TraktMovie")
@ -117,18 +112,15 @@ class Title:
:param title: :param title:
""" """
try: try:
self.name = title
# Use a regex expression to get the value within the brackets e.g. The Americans (2017) # Use a regex expression to get the value within the brackets e.g. The Americans (2017)
year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title) year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title)
year_value = year_search.group(1) self.year = int(year_search.group(1))
# Then, get the title without the year value included # Then, get the title without the year value included
title_value = title.split("(")[0].strip() self.without_year = title.split("(")[0].strip()
# Put this together into an object
self.name = title
self.without_year = title_value
self.year = int(year_value)
except Exception: except Exception:
# If the above failed, then the title doesn't include a year # If the above failed, then the title doesn't include a year
# so return the object as is. # so create the value with "defaults"
self.name = title self.name = title
self.without_year = title self.without_year = title
self.year = None self.year = None
@ -167,16 +159,8 @@ class Title:
if self.name == other: if self.name == other:
return True return True
# Split the TvTime title
tv_time_title_split = self.name.split()
# Create an array of words which are found in the Trakt title
words_matched = []
# Go through each word of the TV Time title, and check if it's in the Trakt title # Go through each word of the TV Time title, and check if it's in the Trakt title
for word in tv_time_title_split: words_matched = [word for word in self.name.split() if word in other]
if word in other:
words_matched.append(word)
# Then calculate what percentage of words matched # Then calculate what percentage of words matched
quotient = len(words_matched) / len(other.split()) quotient = len(words_matched) / len(other.split())
@ -191,7 +175,7 @@ class TVTimeItem:
def __init__(self, name: str, updated_at: str): def __init__(self, name: str, updated_at: str):
self.name = name self.name = name
# Get the date which the show was marked 'watched' in TV Time # Get the date which the show was marked 'watched' in TV Time
# and parse the watched date value into a Python type # and parse the watched date value into a Python object
self.date_watched = datetime.strptime( self.date_watched = datetime.strptime(
updated_at, "%Y-%m-%d %H:%M:%S" updated_at, "%Y-%m-%d %H:%M:%S"
) )
@ -199,15 +183,37 @@ class TVTimeItem:
class TVTimeTVShow(TVTimeItem): class TVTimeTVShow(TVTimeItem):
def __init__(self, row: Any): def __init__(self, row: Any):
# Get the name of the item
super().__init__(row["tv_show_name"], row["updated_at"]) super().__init__(row["tv_show_name"], row["updated_at"])
# Get the TV Time Episode id
self.episode_id = row["episode_id"] self.episode_id = row["episode_id"]
# Get the TV Time Season Number
self.season_number = row["episode_season_number"] self.season_number = row["episode_season_number"]
# Get the TV Time Episode Number
self.episode_number = row["episode_number"] self.episode_number = row["episode_number"]
def parse_season_number(self, trakt_show: TraktTVShow) -> int:
"""
Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
this will match TV Time's existing value.
"""
season_number = int(self.season_number)
# Gen get the Season Number from the first item in the array
first_season_no = trakt_show.seasons[0].number
# If the season number is 0, then the Trakt show contains a "special" season
if first_season_no == 0:
# No need to modify the value, as the TV Time value will match Trakt
return season_number
# Otherwise, if the Trakt seasons start with no specials, then return the seasonNo,
# but subtracted by one (e.g. Season 1 in TV Time, will be 0)
else:
# Only subtract if the TV Time season number is greater than 0.
if season_number != 0:
return season_number - 1
# Otherwise, the TV Time season is a special! Then you don't need to change the starting position
else:
return season_number
class TVTimeMovie(TVTimeItem): class TVTimeMovie(TVTimeItem):
def __init__(self, row: Any): def __init__(self, row: Any):
@ -236,7 +242,6 @@ class Searcher(ABC):
# the script should use, or access the local database to see if the user has already provided # the script should use, or access the local database to see if the user has already provided
# a manual selection # a manual selection
# Query the local database for existing selection
should_return, query_result = self._search_local() should_return, query_result = self._search_local()
if should_return: if should_return:
return query_result return query_result
@ -259,19 +264,12 @@ class Searcher(ABC):
# If the local database already contains an entry for a manual selection # If the local database already contains an entry for a manual selection
# then don't bother prompting the user to select it again! # then don't bother prompting the user to select it again!
if len(query_result) == 1: if len(query_result) == 1:
# Get the first result from the query
first_match = query_result[0] first_match = query_result[0]
# Get the value contains the selection index
first_match_selected_index = int(first_match.get("UserSelectedIndex")) first_match_selected_index = int(first_match.get("UserSelectedIndex"))
# Check if the user previously requested to skip the show
skip_show = first_match.get("Skip") skip_show = first_match.get("Skip")
# If the user did not skip, but provided an index selection, get the
# matching show
if not skip_show: if not skip_show:
return True, self.items_with_same_name[first_match_selected_index] return True, self.items_with_same_name[first_match_selected_index]
else: else:
# Otherwise, return None, which will trigger the script to skip
# and move onto the next show
return True, None return True, None
else: else:
return False, None return False, None
@ -285,21 +283,15 @@ class Searcher(ABC):
"Please make a selection from above (or enter SKIP):" "Please make a selection from above (or enter SKIP):"
) )
# Exit the loop
if index_selected == "SKIP": if index_selected == "SKIP":
break break
# Since the value isn't 'skip', check that the result is numerical
index_selected = int(index_selected) - 1 index_selected = int(index_selected) - 1
break break
# Still allow the user to provide the exit input, and kill the program
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit("Cancel requested...") sys.exit("Cancel requested...")
# Otherwise, the user has entered an invalid value, warn the user to try again
except Exception: except Exception:
logging.error( logging.error(f"Sorry! Please select a value between 0 to {len(self.items_with_same_name)}")
f"Sorry! Please select a value between 0 to {len(self.items_with_same_name)}"
)
# If the user entered 'SKIP', then exit from the loop with no selection, which # If the user entered 'SKIP', then exit from the loop with no selection, which
# will trigger the program to move onto the next episode # will trigger the program to move onto the next episode
@ -309,9 +301,7 @@ class Searcher(ABC):
self._user_matched_table.insert( self._user_matched_table.insert(
{"Name": self.name, "UserSelectedIndex": 0, "Skip": True} {"Name": self.name, "UserSelectedIndex": 0, "Skip": True}
) )
return None return None
# Otherwise, return the selection which the user made from the list
else: else:
selected_show = self.items_with_same_name[int(index_selected)] selected_show = self.items_with_same_name[int(index_selected)]
@ -346,14 +336,12 @@ class TVShowSearcher(Searcher):
def _print_manual_selection(self) -> None: def _print_manual_selection(self) -> None:
print( print(
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{self.name}' (Season {self.tv_show.season_number}," f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{self.name}'"
f"Episode {self.tv_show.episode_number}) has {len(self.items_with_same_name)} matching Trakt shows with the same name.\a " f" (Season {self.tv_show.season_number}, Episode {self.tv_show.episode_number}) has"
f" {len(self.items_with_same_name)} matching Trakt shows with the same name.\a "
) )
# Output each show for manual selection
for idx, item in enumerate(self.items_with_same_name): for idx, item in enumerate(self.items_with_same_name):
# Display the show's title, broadcast year, amount of seasons and a link to the Trakt page.
# This will provide the user with enough information to make a selection.
print( print(
f"({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} " f"({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} "
f"Season(s) - More Info: https://trakt.tv/{item.ext}" f"Season(s) - More Info: https://trakt.tv/{item.ext}"
@ -369,459 +357,251 @@ class MovieSearcher(Searcher):
def _print_manual_selection(self) -> None: def _print_manual_selection(self) -> None:
print( print(
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{self.name}' has {len(self.items_with_same_name)} " f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{self.name}'"
f" has {len(self.items_with_same_name)}"
f" matching Trakt movies with the same name.\a" f" matching Trakt movies with the same name.\a"
) )
# Output each movie for manual selection
for idx, item in enumerate(self.items_with_same_name): for idx, item in enumerate(self.items_with_same_name):
# Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page. print(f"({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}")
# This will provide the user with enough information to make a selection.
print(
f" ({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}"
)
class Processor: class Processor(ABC):
def __init__(self, reader: csv.DictReader, rows_total: int): @abstractmethod
self._reader = reader def process_item(self, tv_time_item: TVTimeItem, progress: float) -> None:
self._rows_total = rows_total pass
def process_watched(self):
# Loop through each line/record of the CSV file
# Ignore the header row
next(self._reader, None)
for rowsCount, row in enumerate(self._reader):
tv_time_tv_show = TVTimeTVShow(row)
class TVShowProcessor(Processor):
def __init__(self):
super().__init__()
def process_item(self, tv_time_show: TVTimeTVShow, progress: float) -> None:
# Query the local database for previous entries indicating that # Query the local database for previous entries indicating that
# the episode has already been imported in the past. Which will # the item has already been imported in the past. Which will
# ease pressure on TV Time's API server during a retry of the import # ease pressure on Trakt's API server during a retry of the import
# process, and just save time overall without needing to create network requests # process, and just save time overall without needing to create network requests.
episode_completed_query = Query() episode_completed_query = Query()
query_result = syncedEpisodesTable.search( synced_episodes = syncedEpisodesTable.search(episode_completed_query.episodeId == tv_time_show.episode_id)
episode_completed_query.episodeId == tv_show_episode_id
if len(synced_episodes) != 0:
logging.info(
f"({progress}) - Already imported,"
f" skipping \'{tv_time_show.name}\' Season {tv_time_show.season_number} /"
f" Episode {tv_time_show.episode_number}."
) )
return
# If the query returned no results, then continue to import it into Trakt # If the query returned no results, then continue to import it into Trakt
if len(query_result) == 0:
# Create a repeating loop, which will break on success, but repeats on failures # Create a repeating loop, which will break on success, but repeats on failures
error_streak = 0 error_streak = 0
while True: while True:
# If more than 10 errors occurred in one streak, whilst trying to import the episode # If more than 10 errors occurred in one streak, whilst trying to import the item
# then give up, and move onto the next episode, but warn the user. # then give up, and move onto the next item, but warn the user.
if error_streak > 10: if error_streak > 10:
logging.warning( logging.warning("An error occurred 10 times in a row... skipping episode...")
"An error occurred 10 times in a row... skipping episode..."
)
break break
try: try:
# Sleep for a second between each process, before going onto the next watched episode. # Sleep for a second between each process, before going onto the next watched item.
# This is required to remain within the API rate limit, and use the API server fairly. # This is required to remain within the API rate limit, and use the API server fairly.
# Other developers share the service, for free - so be considerate of your usage. # Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) time.sleep(DELAY_BETWEEN_ITEMS_IN_SECONDS)
# Search Trakt for the TV show matching TV Time's title value
trakt_show = TVShowSearcher(tv_show_season_number,
tv_show_episode_number).search(Title(tv_show_name))
# If the method returned 'None', then this is an indication to skip the episode, and trakt_show = TVShowSearcher(tv_time_show).search(Title(tv_time_show.name))
# move onto the next one
if not trakt_show: if not trakt_show:
break break
# Show the progress of the import on-screen
logging.info( logging.info(
f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_number} /" f"({progress}) - Processing '{tv_time_show.name}'"
f"Episode {tv_show_episode_number}" f" Season {tv_time_show.season_number} /"
f" Episode {tv_time_show.episode_number}"
) )
# Get the season from the Trakt API
season = trakt_show.seasons[ season = trakt_show.seasons[tv_time_show.parse_season_number(trakt_show)]
parse_season_number(tv_show_season_number, trakt_show) episode = season.episodes[int(tv_time_show.episode_number) - 1]
] episode.mark_as_seen(tv_time_show.date_watched)
# Get the episode from the season
episode = season.episodes[int(tv_show_episode_number) - 1]
# Mark the episode as watched!
episode.mark_as_seen(tv_show_date_watched_converted)
# Add the episode to the local database as imported, so it can be skipped, # Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated # if the process is repeated
syncedEpisodesTable.insert({"episodeId": tv_show_episode_id}) syncedEpisodesTable.insert({"episodeId": tv_time_show.episode_id})
# Clear the error streak on completing the method without errors logging.info(f"'{tv_time_show.name}' marked as seen")
error_streak = 0 error_streak = 0
break break
# Catch errors which occur because of an incorrect array index. This occurs when # Catch errors which occur because of an incorrect array index. This occurs when
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
except IndexError: except IndexError:
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"][ tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"]["slug"]
"slug"
]
logging.warning( logging.warning(
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, " f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number},"
f"Episode {tv_show_episode_number} does not exist in Trakt! " f" Episode {tv_time_show.episode_number} does not exist in Trakt!"
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_number}/episodes/{tv_show_episode_number})" f" (https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_time_show.season_number}/episodes/{tv_time_show.episode_number})"
) )
break break
# Catch any errors which are raised because a show could not be found in Trakt except trakt.core.errors.NotFoundException:
except trakt.errors.NotFoundException:
logging.warning( logging.warning(
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, " f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number},"
f"Episode {tv_show_episode_number} does not exist (search) in Trakt!" f" Episode {tv_time_show.episode_number} does not exist (search) in Trakt!"
) )
break break
# Catch errors because of the program breaching the Trakt API rate limit except trakt.core.errors.RateLimitException:
except trakt.errors.RateLimitException:
logging.warning( logging.warning(
"The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between " "The program is running too quickly and has hit Trakt's API rate limit!"
+ "episdoes via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before " " Please increase the delay between"
+ "trying again." " movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'."
" The program will now wait 60 seconds before"
" trying again."
) )
time.sleep(60) time.sleep(60)
# Mark the exception in the error streak
error_streak += 1 error_streak += 1
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON # Catch a JSON decode error - this can be raised when the API server is down and produces an HTML page,
# instead of JSON
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logging.warning( logging.warning(
f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} " f"({progress}) - A JSON decode error occurred whilst processing {tv_time_show.name}"
+ f"Season {tv_show_season_number}, Episode {tv_show_episode_number}! This might occur when the server is down and has produced " " This might occur when the server is down and has produced"
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again." " a HTML document instead of JSON. The script will wait 60 seconds before trying again."
) )
# Wait 60 seconds
time.sleep(60) time.sleep(60)
# Mark the exception in the error streak
error_streak += 1 error_streak += 1
# Catch a CTRL + C keyboard input, and exits the program # Catch a CTRL + C keyboard input, and exits the program
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit("Cancel requested...") sys.exit("Cancel requested...")
# Skip the episode
else:
logging.info(
f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_number} / Episode {tv_show_episode_number}."
)
def parse_season_number(season_number, trakt_show_obj): class MovieProcessor(Processor):
""" def __init__(self, watched_list: list):
Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then super().__init__()
subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes self._watched_list = watched_list
a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
this will match TV Time's existing value.
"""
# Parse the season number into a numerical value
season_number = int(season_number)
# Then get the Season Number from the first item in the array
first_season_no = trakt_show_obj.seasons[0].number
# If the season number is 0, then the Trakt show contains a "special" season
if first_season_no == 0:
# No need to modify the value, as the TV Time value will match Trakt
return season_number
# Otherwise, if the Trakt seasons start with no specials, then return the seasonNo,
# but subtracted by one (e.g. Season 1 in TV Time, will be 0)
else:
# Only subtract if the TV Time season number is greater than 0.
if season_number != 0:
return season_number - 1
# Otherwise, the TV Time season is a special! Then you don't need to change the starting position
else:
return season_number
def process_watched_shows() -> None:
# Open the CSV file within the GDPR exported data
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
# Create the CSV reader, which will break up the fields using the delimiter ','
shows_reader = csv.DictReader(csvfile, delimiter=",")
# Get the total amount of rows in the CSV file,
rows_total = len(list(shows_reader))
# Move position to the beginning of the file
csvfile.seek(0, 0)
processor = Processor(shows_reader, rows_total)
processor.process_watched()
# Loop through each line/record of the CSV file
# Ignore the header row
next(shows_reader, None)
for rowsCount, row in enumerate(shows_reader):
# Get the name of the TV show
tv_show_name = row["tv_show_name"]
# Get the TV Time Episode id
tv_show_episode_id = row["episode_id"]
# Get the TV Time Season Number
tv_show_season_number = row["episode_season_number"]
# Get the TV Time Episode Number
tv_show_episode_number = row["episode_number"]
# Get the date which the show was marked 'watched' in TV Time
tv_show_date_watched = row["updated_at"]
# Parse the watched date value into a Python type
tv_show_date_watched_converted = datetime.strptime(
tv_show_date_watched, "%Y-%m-%d %H:%M:%S"
)
def process_item(self, tv_time_movie: TVTimeMovie, progress: float) -> None:
# Query the local database for previous entries indicating that # Query the local database for previous entries indicating that
# the episode has already been imported in the past. Which will # the episode has already been imported in the past. Which will
# ease pressure on TV Time's API server during a retry of the import # ease pressure on Trakt's API server during a retry of the import
# process, and just save time overall without needing to create network requests # process, and just save time overall without needing to create network requests.
episode_completed_query = Query()
query_result = syncedEpisodesTable.search(
episode_completed_query.episodeId == tv_show_episode_id
)
# If the query returned no results, then continue to import it into Trakt
if len(query_result) == 0:
# Create a repeating loop, which will break on success, but repeats on failures
error_streak = 0
while True:
# If more than 10 errors occurred in one streak, whilst trying to import the episode
# then give up, and move onto the next episode, but warn the user.
if error_streak > 10:
logging.warning(
"An error occurred 10 times in a row... skipping episode..."
)
break
try:
# Sleep for a second between each process, before going onto the next watched episode.
# This is required to remain within the API rate limit, and use the API server fairly.
# Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
# Search Trakt for the TV show matching TV Time's title value
trakt_show = TVShowSearcher(tv_show_season_number,
tv_show_episode_number).search(Title(tv_show_name))
# If the method returned 'None', then this is an indication to skip the episode, and
# move onto the next one
if not trakt_show:
break
# Show the progress of the import on-screen
logging.info(
f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_number} /"
f"Episode {tv_show_episode_number}"
)
# Get the season from the Trakt API
season = trakt_show.seasons[
parse_season_number(tv_show_season_number, trakt_show)
]
# Get the episode from the season
episode = season.episodes[int(tv_show_episode_number) - 1]
# Mark the episode as watched!
episode.mark_as_seen(tv_show_date_watched_converted)
# Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated
syncedEpisodesTable.insert({"episodeId": tv_show_episode_id})
# Clear the error streak on completing the method without errors
error_streak = 0
break
# Catch errors which occur because of an incorrect array index. This occurs when
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
except IndexError:
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"][
"slug"
]
logging.warning(
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
f"Episode {tv_show_episode_number} does not exist in Trakt! "
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_number}/episodes/{tv_show_episode_number})"
)
break
# Catch any errors which are raised because a show could not be found in Trakt
except trakt.errors.NotFoundException:
logging.warning(
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
f"Episode {tv_show_episode_number} does not exist (search) in Trakt!"
)
break
# Catch errors because of the program breaching the Trakt API rate limit
except trakt.errors.RateLimitException:
logging.warning(
"The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between "
+ "episdoes via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before "
+ "trying again."
)
time.sleep(60)
# Mark the exception in the error streak
error_streak += 1
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
except json.decoder.JSONDecodeError:
logging.warning(
f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} "
+ f"Season {tv_show_season_number}, Episode {tv_show_episode_number}! This might occur when the server is down and has produced "
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
)
# Wait 60 seconds
time.sleep(60)
# Mark the exception in the error streak
error_streak += 1
# Catch a CTRL + C keyboard input, and exits the program
except KeyboardInterrupt:
sys.exit("Cancel requested...")
# Skip the episode
else:
logging.info(
f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_number} / Episode {tv_show_episode_number}."
)
def process_movies():
# Total amount of rows which have been processed in the CSV file
# Total amount of rows in the CSV file
error_streak = 0
# Open the CSV file within the GDPR exported data
with open(MOVIES_PATH, newline="") as csvfile:
# Create the CSV reader, which will break up the fields using the delimiter ','
movie_reader_temp = csv.DictReader(csvfile, delimiter=",")
movie_reader = filter(lambda p: "" != p["movie_name"], movie_reader_temp)
# First, list all movies with watched type so that watchlist entry for them is not created
watched_list = []
for row in movie_reader:
if row["type"] == "watch":
watched_list.append(row["movie_name"])
# Move position to the beginning of the file
csvfile.seek(0, 0)
# Get the total amount of rows in the CSV file,
rows_total = len(list(movie_reader))
# Move position to the beginning of the file
csvfile.seek(0, 0)
# Loop through each line/record of the CSV file
# Ignore the header row
next(movie_reader, None)
for rows_count, row in enumerate(movie_reader):
# Get the name of the Movie
movie_name = row["movie_name"]
# Get the date which the movie was marked 'watched' in TV Time
activity_type = row["type"]
movie_date_watched = row["updated_at"]
# Parse the watched date value into a Python type
movie_date_watched_converted = datetime.strptime(
movie_date_watched, "%Y-%m-%d %H:%M:%S"
)
# Query the local database for previous entries indicating that
# the episode has already been imported in the past. Which will
# ease pressure on TV Time's API server during a retry of the import
# process, and just save time overall without needing to create network requests
movie_query = Query() movie_query = Query()
query_result = syncedMoviesTable.search( synced_movies = syncedMoviesTable.search(
(movie_query.movie_name == movie_name) & (movie_query.type == "watched") (movie_query.movie_name == tv_time_movie.name) & (movie_query.type == "watched")
) )
if len(synced_movies) != 0:
logging.info(f"({progress}) - Already imported, skipping '{tv_time_movie.name}'.")
return
watchlist_query = Query() watchlist_query = Query()
query_result_watchlist = syncedMoviesTable.search( movies_in_watchlist = syncedMoviesTable.search(
(watchlist_query.movie_name == movie_name) (watchlist_query.movie_name == tv_time_movie.name) & (watchlist_query.type == "watchlist")
& (watchlist_query.type == "watchlist")
) )
# If the query returned no results, then continue to import it into Trakt # If the query returned no results, then continue to import it into Trakt
if len(query_result) == 0:
# Create a repeating loop, which will break on success, but repeats on failures # Create a repeating loop, which will break on success, but repeats on failures
error_streak = 0
while True: while True:
# If movie is watched but this is an entry for watchlist, then skip # If more than 10 errors occurred in one streak, whilst trying to import the item
if movie_name in watched_list and activity_type != "watch": # then give up, and move onto the next item, but warn the user.
logging.info(
f"Skipping '{movie_name}' to avoid redundant watchlist entry."
)
break
# If more than 10 errors occurred in one streak, whilst trying to import the episode
# then give up, and move onto the next episode, but warn the user.
if error_streak > 10: if error_streak > 10:
logging.warning( logging.warning("An error occurred 10 times in a row... skipping episode...")
"An error occurred 10 times in a row... skipping episode..." break
) # If movie is watched but this is an entry for watchlist, then skip
if tv_time_movie.name in self._watched_list and tv_time_movie.activity_type != "watch":
logging.info(f"Skipping '{tv_time_movie.name}' to avoid redundant watchlist entry.")
break break
try: try:
# Sleep for a second between each process, before going onto the next watched episode. # Sleep for a second between each process, before going onto the next watched item.
# This is required to remain within the API rate limit, and use the API server fairly. # This is required to remain within the API rate limit, and use the API server fairly.
# Other developers share the service, for free - so be considerate of your usage. # Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) time.sleep(DELAY_BETWEEN_ITEMS_IN_SECONDS)
# Search Trakt for the Movie matching TV Time's title value trakt_movie = MovieSearcher().search(Title(tv_time_movie.name))
trakt_movie_obj = MovieSearcher().search(Title(movie_name)) if not trakt_movie:
# If the method returned 'None', then this is an indication to skip the episode, and
# move onto the next one
if trakt_movie_obj is None:
break break
# Show the progress of the import on-screen
logging.info( logging.info(f"({progress}) - Processing '{tv_time_movie.name}'")
f"({rows_count + 1}/{rows_total}) - Processing '{movie_name}'"
) if tv_time_movie.activity_type == "watch":
if activity_type == "watch": trakt_movie.mark_as_seen(tv_time_movie.date_watched)
trakt_movie_obj.mark_as_seen(movie_date_watched_converted)
# Add the episode to the local database as imported, so it can be skipped, # Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated # if the process is repeated
syncedMoviesTable.insert( syncedMoviesTable.insert(
{"movie_name": movie_name, "type": "watched"} {"movie_name": tv_time_movie.name, "type": "watched"}
) )
logging.info(f"Marked as seen") logging.info(f"'{tv_time_movie.name}' marked as seen")
elif len(query_result_watchlist) == 0: elif len(movies_in_watchlist) == 0:
trakt_movie_obj.add_to_watchlist() trakt_movie.add_to_watchlist()
# Add the episode to the local database as imported, so it can be skipped, # Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated # if the process is repeated
syncedMoviesTable.insert( syncedMoviesTable.insert(
{"movie_name": movie_name, "type": "watchlist"} {"movie_name": tv_time_movie.name, "type": "watchlist"}
) )
logging.info(f"Added to watchlist") logging.info(f"'{tv_time_movie.name}' added to watchlist")
else: else:
logging.warning(f"Already in watchlist") logging.warning(f"{tv_time_movie.name} already in watchlist")
# Clear the error streak on completing the method without errors
error_streak = 0 error_streak = 0
break break
# Catch errors which occur because of an incorrect array index. This occurs when # Catch errors which occur because of an incorrect array index. This occurs when
# an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time. # an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time.
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
except IndexError: except IndexError:
movie_slug = trakt_movie_obj.to_json()["movies"][0]["ids"]["ids"][ movie_slug = trakt_movie.to_json()["movies"][0]["ids"]["ids"]["slug"]
"slug"
]
logging.warning( logging.warning(
f"({rows_count}/{rows_total}) - {movie_name} " f"({progress}) - {tv_time_movie.name}"
f" does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)" f" does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)"
) )
break break
# Catch any errors which are raised because a movie could not be found in Trakt except trakt.core.errors.NotFoundException:
except trakt.errors.NotFoundException: logging.warning(f"({progress}) - {tv_time_movie.name} does not exist (search) in Trakt!")
logging.warning(
f"({rows_count}/{rows_total}) - {movie_name} does not exist (search) in Trakt!"
)
break break
# Catch errors because of the program breaching the Trakt API rate limit except trakt.core.errors.RateLimitException:
except trakt.errors.RateLimitException:
logging.warning( logging.warning(
"The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between " "The program is running too quickly and has hit Trakt's API rate limit!"
+ "movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before " " Please increase the delay between"
+ "trying again." " movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'."
" The program will now wait 60 seconds before"
" trying again."
) )
time.sleep(60) time.sleep(60)
# Mark the exception in the error streak
error_streak += 1 error_streak += 1
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logging.warning( logging.warning(
f"({rows_count}/{rows_total}) - A JSON decode error occuring whilst processing {movie_name} " f"({progress}) - A JSON decode error occurred whilst processing {tv_time_movie.name}"
+ f" This might occur when the server is down and has produced " " This might occur when the server is down and has produced"
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again." " a HTML document instead of JSON. The script will wait 60 seconds before trying again."
) )
# Wait 60 seconds
time.sleep(60) time.sleep(60)
# Mark the exception in the error streak
error_streak += 1 error_streak += 1
# Catch a CTRL + C keyboard input, and exits the program # Catch a CTRL + C keyboard input, and exits the program
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit("Cancel requested...") sys.exit("Cancel requested...")
# Skip the episode
else: def process_watched_shows() -> None:
logging.info( with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'." reader = csv.DictReader(csvfile, delimiter=",")
) total_rows = len(list(reader))
csvfile.seek(0, 0)
# Ignore the header row
next(reader, None)
for rows_count, row in enumerate(reader):
tv_time_show = TVTimeTVShow(row)
TVShowProcessor().process_item(tv_time_show, rows_count / total_rows)
def process_watched_movies() -> None:
with open(MOVIES_PATH, newline="") as csvfile:
reader = filter(lambda p: p["movie_name"] != "", csv.DictReader(csvfile, delimeter=""))
watched_list = [row["movie_name"] for row in reader if row["type"] == "watch"]
csvfile.seek(0, 0)
total_rows = len(list(reader))
csvfile.seek(0, 0)
# Ignore the header row
next(reader, None)
for rows_count, row in enumerate(reader):
movie = TVTimeMovie(row)
MovieProcessor(watched_list).process_item(movie, rows_count / total_rows)
def menu_selection() -> int: def menu_selection() -> int:
@ -860,23 +640,17 @@ def start():
"ERROR: Unable to complete authentication to Trakt - please try again." "ERROR: Unable to complete authentication to Trakt - please try again."
) )
# Start the process which is required
if selection == 1: if selection == 1:
# Invoke the method which will import episodes which have been watched
# from TV Time into Trakt
logging.info("Processing watched shows.") logging.info("Processing watched shows.")
process_watched_shows() process_watched_shows()
# TODO: Add support for followed shows # TODO: Add support for followed shows
elif selection == 2: elif selection == 2:
# Invoke the method which will import movies which have been watched
# from TV Time into Trakt
logging.info("Processing movies.") logging.info("Processing movies.")
process_movies() process_watched_movies()
elif selection == 3: elif selection == 3:
# Invoke both the episodes and movies import methods
logging.info("Processing both watched shows and movies.") logging.info("Processing both watched shows and movies.")
process_watched_shows() process_watched_shows()
process_movies() process_watched_movies()
if __name__ == "__main__": if __name__ == "__main__":
@ -885,7 +659,6 @@ if __name__ == "__main__":
start() start()
else: else:
logging.error( logging.error(
"Oops! The TV Time GDPR folder '" f"Oops! The TV Time GDPR folder 'config.gdpr_workspace_path'"
+ config.gdpr_workspace_path " does not exist on the local system. Please check it, and try again."
+ "' does not exist on the local system. Please check it, and try again."
) )