Move common logic to base Processor class

This commit is contained in:
Markus Nyman 2023-01-17 01:34:37 +02:00
parent fc13aa9a78
commit 52349668d7

View file

@ -13,7 +13,7 @@ from typing import Optional, TypeVar, Union, Any
import trakt.core import trakt.core
from tinydb import Query, TinyDB from tinydb import Query, TinyDB
from tinydb.table import Table from tinydb.table import Table, Document
from trakt import init from trakt import init
from trakt.movies import Movie from trakt.movies import Movie
from trakt.tv import TVShow from trakt.tv import TVShow
@ -97,7 +97,7 @@ def init_trakt_auth() -> bool:
TraktTVShow = TypeVar("TraktTVShow") TraktTVShow = TypeVar("TraktTVShow")
TraktMovie = TypeVar("TraktMovie") TraktMovie = TypeVar("TraktMovie")
SearchResult = Union[TraktTVShow, TraktMovie] TraktItem = Union[TraktTVShow, TraktMovie]
@dataclass @dataclass
@ -125,7 +125,7 @@ class Title:
self.without_year = title self.without_year = title
self.year = None self.year = None
def items_with_same_name(self, items: list[SearchResult]) -> list[SearchResult]: def items_with_same_name(self, items: list[TraktItem]) -> list[TraktItem]:
with_same_name = [] with_same_name = []
for item in items: for item in items:
@ -227,7 +227,7 @@ class Searcher(ABC):
self.items_with_same_name = None self.items_with_same_name = None
self._user_matched_table = user_matched_table self._user_matched_table = user_matched_table
def search(self, title: Title) -> Optional[SearchResult]: def search(self, title: Title) -> Optional[TraktItem]:
self.name = title.name self.name = title.name
# If the title contains a year, then replace the local variable with the stripped version. # If the title contains a year, then replace the local variable with the stripped version.
if title.year: if title.year:
@ -251,14 +251,14 @@ class Searcher(ABC):
self._handle_multiple_manually() self._handle_multiple_manually()
@abstractmethod @abstractmethod
def search_trakt(self, name: str) -> list[SearchResult]: def search_trakt(self, name: str) -> list[TraktItem]:
pass pass
@abstractmethod @abstractmethod
def _print_manual_selection(self): def _print_manual_selection(self):
pass pass
def _search_local(self) -> tuple[bool, SearchResult]: def _search_local(self) -> tuple[bool, TraktItem]:
user_matched_query = Query() user_matched_query = Query()
query_result = self._user_matched_table.search(user_matched_query.Name == self.name) query_result = self._user_matched_table.search(user_matched_query.Name == self.name)
# If the local database already contains an entry for a manual selection # If the local database already contains an entry for a manual selection
@ -274,7 +274,7 @@ class Searcher(ABC):
else: else:
return False, None return False, None
def _handle_multiple_manually(self) -> Optional[SearchResult]: def _handle_multiple_manually(self) -> Optional[TraktItem]:
self._print_manual_selection() self._print_manual_selection()
while True: while True:
try: try:
@ -315,7 +315,7 @@ class Searcher(ABC):
return selected_show return selected_show
def _check_single_result(self) -> Optional[SearchResult]: def _check_single_result(self) -> Optional[TraktItem]:
complete_match_names = [name_from_search for name_from_search in self.items_with_same_name if complete_match_names = [name_from_search for name_from_search in self.items_with_same_name if
name_from_search.title == self.name] name_from_search.title == self.name]
if len(complete_match_names) == 1: if len(complete_match_names) == 1:
@ -331,7 +331,7 @@ class TVShowSearcher(Searcher):
super().__init__(userMatchedShowsTable) super().__init__(userMatchedShowsTable)
self.tv_show = tv_show self.tv_show = tv_show
def search_trakt(self, name: str) -> list[SearchResult]: def search_trakt(self, name: str) -> list[TraktItem]:
return TVShow.search(name) return TVShow.search(name)
def _print_manual_selection(self) -> None: def _print_manual_selection(self) -> None:
@ -352,7 +352,7 @@ class MovieSearcher(Searcher):
def __init__(self): def __init__(self):
super().__init__(userMatchedMoviesTable) super().__init__(userMatchedMoviesTable)
def search_trakt(self, name: str) -> list[SearchResult]: def search_trakt(self, name: str) -> list[TraktItem]:
return Movie.search(name) return Movie.search(name)
def _print_manual_selection(self) -> None: def _print_manual_selection(self) -> None:
@ -368,28 +368,33 @@ class MovieSearcher(Searcher):
class Processor(ABC): class Processor(ABC):
@abstractmethod @abstractmethod
def process_item(self, tv_time_item: TVTimeItem, progress: float) -> None: def _get_synced_items(self, tv_time_item: TVTimeItem) -> list[Document]:
pass pass
@abstractmethod
def _log_already_imported(self, tv_time_item: TVTimeItem, progress: float) -> None:
pass
class TVShowProcessor(Processor): @abstractmethod
def __init__(self): def _should_continue(self, tv_time_item: TVTimeItem) -> bool:
super().__init__() pass
def process_item(self, tv_time_show: TVTimeTVShow, progress: float) -> None: @abstractmethod
def _search_trakt(self, tv_time_item: TVTimeItem) -> TraktItem:
pass
@abstractmethod
def _process(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: float) -> None:
pass
def process_item(self, tv_time_item: TVTimeItem, progress: float) -> None:
# Query the local database for previous entries indicating that # Query the local database for previous entries indicating that
# the item has already been imported in the past. Which will # the item has already been imported in the past. Which will
# ease pressure on Trakt's API server during a retry of the import # ease pressure on Trakt's API server during a retry of the import
# process, and just save time overall without needing to create network requests. # process, and just save time overall without needing to create network requests.
episode_completed_query = Query() synced_episodes = self._get_synced_items(tv_time_item)
synced_episodes = syncedEpisodesTable.search(episode_completed_query.episodeId == tv_time_show.episode_id)
if len(synced_episodes) != 0: if len(synced_episodes) != 0:
logging.info( self._log_already_imported(tv_time_item, progress)
f"({progress}) - Already imported,"
f" skipping \'{tv_time_show.name}\' Season {tv_time_show.season_number} /"
f" Episode {tv_time_show.episode_number}."
)
return return
# If the query returned no results, then continue to import it into Trakt # If the query returned no results, then continue to import it into Trakt
@ -401,29 +406,21 @@ class TVShowProcessor(Processor):
if error_streak > 10: if error_streak > 10:
logging.warning("An error occurred 10 times in a row... skipping episode...") logging.warning("An error occurred 10 times in a row... skipping episode...")
break break
if not self._should_continue():
break
try: try:
# Sleep for a second between each process, before going onto the next watched item. # Sleep for a second between each process, before going onto the next watched item.
# This is required to remain within the API rate limit, and use the API server fairly. # This is required to remain within the API rate limit, and use the API server fairly.
# Other developers share the service, for free - so be considerate of your usage. # Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_ITEMS_IN_SECONDS) time.sleep(DELAY_BETWEEN_ITEMS_IN_SECONDS)
trakt_show = TVShowSearcher(tv_time_show).search(Title(tv_time_show.name)) trakt_item = self._search_trakt(tv_time_item)
if not trakt_show: if not trakt_item:
break break
logging.info( self._process(tv_time_item, trakt_item, progress)
f"({progress}) - Processing '{tv_time_show.name}'"
f" Season {tv_time_show.season_number} /"
f" Episode {tv_time_show.episode_number}"
)
season = trakt_show.seasons[tv_time_show.parse_season_number(trakt_show)]
episode = season.episodes[int(tv_time_show.episode_number) - 1]
episode.mark_as_seen(tv_time_show.date_watched)
# Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated
syncedEpisodesTable.insert({"episodeId": tv_time_show.episode_id})
logging.info(f"'{tv_time_show.name}' marked as seen")
error_streak = 0 error_streak = 0
break break
@ -431,18 +428,10 @@ class TVShowProcessor(Processor):
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
except IndexError: except IndexError:
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"]["slug"] self._handle_index_error(tv_time_item, trakt_item, progress)
logging.warning(
f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number},"
f" Episode {tv_time_show.episode_number} does not exist in Trakt!"
f" (https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_time_show.season_number}/episodes/{tv_time_show.episode_number})"
)
break break
except trakt.core.errors.NotFoundException: except trakt.core.errors.NotFoundException:
logging.warning( self._handle_not_found_exception(tv_time_item, progress)
f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number},"
f" Episode {tv_time_show.episode_number} does not exist (search) in Trakt!"
)
break break
except trakt.core.errors.RateLimitException: except trakt.core.errors.RateLimitException:
logging.warning( logging.warning(
@ -458,7 +447,7 @@ class TVShowProcessor(Processor):
# instead of JSON # instead of JSON
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logging.warning( logging.warning(
f"({progress}) - A JSON decode error occurred whilst processing {tv_time_show.name}" f"({progress}) - A JSON decode error occurred whilst processing {tv_time_item.name}"
" This might occur when the server is down and has produced" " This might occur when the server is down and has produced"
" a HTML document instead of JSON. The script will wait 60 seconds before trying again." " a HTML document instead of JSON. The script will wait 60 seconds before trying again."
) )
@ -469,55 +458,99 @@ class TVShowProcessor(Processor):
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit("Cancel requested...") sys.exit("Cancel requested...")
@abstractmethod
def _handle_index_error(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: float) -> None:
pass
@abstractmethod
def _handle_not_found_exception(self, tv_time_item: TVTimeItem, progress: float) -> None:
pass
class TVShowProcessor(Processor):
def __init__(self):
super().__init__()
def _get_synced_items(self, tv_time_show: TVTimeTVShow) -> list[Document]:
episode_completed_query = Query()
return syncedEpisodesTable.search(episode_completed_query.episodeId == tv_time_show.episode_id)
def _log_already_imported(self, tv_time_show: TVTimeTVShow, progress: float) -> None:
logging.info(
f"({progress}) - Already imported,"
f" skipping \'{tv_time_show.name}\' Season {tv_time_show.season_number} /"
f" Episode {tv_time_show.episode_number}."
)
def _should_continue(self, tv_time_show: TVTimeTVShow) -> bool:
return True
def _search_trakt(self, tv_time_show: TVTimeTVShow) -> TraktTVShow:
return TVShowSearcher(tv_time_show).search_trakt(tv_time_show.name)
def _process(self, tv_time_show: TVTimeTVShow, trakt_show: TraktItem, progress: float) -> None:
logging.info(
f"({progress}) - Processing '{tv_time_show.name}'"
f" Season {tv_time_show.season_number} /"
f" Episode {tv_time_show.episode_number}"
)
season = trakt_show.seasons[tv_time_show.parse_season_number(trakt_show)]
episode = season.episodes[int(tv_time_show.episode_number) - 1]
episode.mark_as_seen(tv_time_show.date_watched)
# Add the episode to the local database as imported, so it can be skipped,
# if the process is repeated
syncedEpisodesTable.insert({"episodeId": tv_time_show.episode_id})
logging.info(f"'{tv_time_show.name}' marked as seen")
def _handle_index_error(self, tv_time_show: TVTimeTVShow, trakt_show: TraktTVShow, progress: float) -> None:
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"]["slug"]
logging.warning(
f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number},"
f" Episode {tv_time_show.episode_number} does not exist in Trakt!"
f" (https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_time_show.season_number}/episodes/{tv_time_show.episode_number})"
)
def _handle_not_found_exception(self, tv_time_show: TVTimeTVShow, progress: float) -> None:
logging.warning(
f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number},"
f" Episode {tv_time_show.episode_number} does not exist (search) in Trakt!"
)
class MovieProcessor(Processor): class MovieProcessor(Processor):
def __init__(self, watched_list: list): def __init__(self, watched_list: list):
super().__init__() super().__init__()
self._watched_list = watched_list self._watched_list = watched_list
def process_item(self, tv_time_movie: TVTimeMovie, progress: float) -> None: def _get_synced_items(self, tv_time_movie: TVTimeMovie) -> list[Document]:
# Query the local database for previous entries indicating that
# the episode has already been imported in the past. Which will
# ease pressure on Trakt's API server during a retry of the import
# process, and just save time overall without needing to create network requests.
movie_query = Query() movie_query = Query()
synced_movies = syncedMoviesTable.search( return syncedMoviesTable.search(
(movie_query.movie_name == tv_time_movie.name) & (movie_query.type == "watched") (movie_query.movie_name == tv_time_movie.name) & (movie_query.type == "watched")
) )
if len(synced_movies) != 0: def _log_already_imported(self, tv_time_movie: TVTimeMovie, progress: float) -> None:
logging.info(f"({progress}) - Already imported, skipping '{tv_time_movie.name}'.") logging.info(f"({progress}) - Already imported, skipping '{tv_time_movie.name}'.")
return
def _should_continue(self, tv_time_movie: TVTimeMovie) -> bool:
# If movie is watched but this is an entry for watchlist, then skip
if tv_time_movie.name in self._watched_list and tv_time_movie.activity_type != "watch":
logging.info(f"Skipping '{tv_time_movie.name}' to avoid redundant watchlist entry.")
return False
return True
def _search_trakt(self, tv_time_movie: TVTimeMovie) -> TraktMovie:
return MovieSearcher().search(Title(tv_time_movie.name))
def _process(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: float) -> None:
logging.info(f"({progress}) - Processing '{tv_time_movie.name}'")
watchlist_query = Query() watchlist_query = Query()
movies_in_watchlist = syncedMoviesTable.search( movies_in_watchlist = syncedMoviesTable.search(
(watchlist_query.movie_name == tv_time_movie.name) & (watchlist_query.type == "watchlist") (watchlist_query.movie_name == tv_time_movie.name) & (watchlist_query.type == "watchlist")
) )
# If the query returned no results, then continue to import it into Trakt
# Create a repeating loop, which will break on success, but repeats on failures
error_streak = 0
while True:
# If more than 10 errors occurred in one streak, whilst trying to import the item
# then give up, and move onto the next item, but warn the user.
if error_streak > 10:
logging.warning("An error occurred 10 times in a row... skipping episode...")
break
# If movie is watched but this is an entry for watchlist, then skip
if tv_time_movie.name in self._watched_list and tv_time_movie.activity_type != "watch":
logging.info(f"Skipping '{tv_time_movie.name}' to avoid redundant watchlist entry.")
break
try:
# Sleep for a second between each process, before going onto the next watched item.
# This is required to remain within the API rate limit, and use the API server fairly.
# Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_ITEMS_IN_SECONDS)
trakt_movie = MovieSearcher().search(Title(tv_time_movie.name))
if not trakt_movie:
break
logging.info(f"({progress}) - Processing '{tv_time_movie.name}'")
if tv_time_movie.activity_type == "watch": if tv_time_movie.activity_type == "watch":
trakt_movie.mark_as_seen(tv_time_movie.date_watched) trakt_movie.mark_as_seen(tv_time_movie.date_watched)
# Add the episode to the local database as imported, so it can be skipped, # Add the episode to the local database as imported, so it can be skipped,
@ -537,43 +570,15 @@ class MovieProcessor(Processor):
else: else:
logging.warning(f"{tv_time_movie.name} already in watchlist") logging.warning(f"{tv_time_movie.name} already in watchlist")
error_streak = 0 def _handle_index_error(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: float) -> None:
break
# Catch errors which occur because of an incorrect array index. This occurs when
# an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time.
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
except IndexError:
movie_slug = trakt_movie.to_json()["movies"][0]["ids"]["ids"]["slug"] movie_slug = trakt_movie.to_json()["movies"][0]["ids"]["ids"]["slug"]
logging.warning( logging.warning(
f"({progress}) - {tv_time_movie.name}" f"({progress}) - {tv_time_movie.name}"
f" does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)" f" does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)"
) )
break
except trakt.core.errors.NotFoundException:
logging.warning(f"({progress}) - {tv_time_movie.name} does not exist (search) in Trakt!")
break
except trakt.core.errors.RateLimitException:
logging.warning(
"The program is running too quickly and has hit Trakt's API rate limit!"
" Please increase the delay between"
" movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'."
" The program will now wait 60 seconds before"
" trying again."
)
time.sleep(60)
error_streak += 1
except json.decoder.JSONDecodeError:
logging.warning(
f"({progress}) - A JSON decode error occurred whilst processing {tv_time_movie.name}"
" This might occur when the server is down and has produced"
" a HTML document instead of JSON. The script will wait 60 seconds before trying again."
)
time.sleep(60) def _handle_not_found_exception(self, tv_time_movie: TVTimeMovie, progress: float) -> None:
error_streak += 1 logging.warning(f"({progress}) - {tv_time_movie.name} does not exist (search) in Trakt!")
# Catch a CTRL + C keyboard input, and exits the program
except KeyboardInterrupt:
sys.exit("Cancel requested...")
def process_watched_shows() -> None: def process_watched_shows() -> None: