From ad00347714a6cea03175d1ef097b4eaf7a52e19b Mon Sep 17 00:00:00 2001 From: Markus Nyman Date: Tue, 17 Jan 2023 01:46:12 +0200 Subject: [PATCH] Split stuff to own files --- TimeToTrakt.py | 506 +------------------------------------------------ database.py | 8 + processor.py | 228 ++++++++++++++++++++++ searcher.py | 284 +++++++++++++++++++++++++++ 4 files changed, 523 insertions(+), 503 deletions(-) create mode 100644 database.py create mode 100644 processor.py create mode 100644 searcher.py diff --git a/TimeToTrakt.py b/TimeToTrakt.py index 2b49609..7699141 100644 --- a/TimeToTrakt.py +++ b/TimeToTrakt.py @@ -3,20 +3,14 @@ import csv import json import logging import os -import re -import sys -import time -from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime -from typing import Optional, TypeVar, Union, Any import trakt.core -from tinydb import Query, TinyDB -from tinydb.table import Table, Document from trakt import init -from trakt.movies import Movie -from trakt.tv import TVShow + +from processor import TVShowProcessor, MovieProcessor +from searcher import TVTimeTVShow, TVTimeMovie # Setup logger logging.basicConfig( @@ -29,13 +23,6 @@ logging.basicConfig( # Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting DELAY_BETWEEN_ITEMS_IN_SECONDS = 1 -# Create databases to keep track of completed processes -database = TinyDB("localStorage.json") -syncedEpisodesTable = database.table("SyncedEpisodes") -userMatchedShowsTable = database.table("TvTimeTraktUserMatched") -syncedMoviesTable = database.table("SyncedMovies") -userMatchedMoviesTable = database.table("TvTimeTraktUserMatchedMovies") - @dataclass class Config: @@ -94,493 +81,6 @@ def init_trakt_auth() -> bool: ) -TraktTVShow = TypeVar("TraktTVShow") -TraktMovie = TypeVar("TraktMovie") - -TraktItem = Union[TraktTVShow, TraktMovie] - - -@dataclass -class Title: - name: str - without_year: str - year: Optional[int] - - def 
__init__(self, title: str): - """ - Parse the title's name for year. - :param title: - """ - try: - self.name = title - # Use a regex expression to get the value within the brackets e.g. The Americans (2017) - year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title) - self.year = int(year_search.group(1)) - # Then, get the title without the year value included - self.without_year = title.split("(")[0].strip() - except Exception: - # If the above failed, then the title doesn't include a year - # so create the value with "defaults" - self.name = title - self.without_year = title - self.year = None - - def items_with_same_name(self, items: list[TraktItem]) -> list[TraktItem]: - with_same_name = [] - - for item in items: - if self.matches(item.title): - # If the title included the year of broadcast, then we can be more picky in the results - # to look for an item with a broadcast year that matches - if self.year: - # If the item title is a 1:1 match, with the same broadcast year, then bingo! - if (self.name == item.title) and (item.year == self.year): - # Clear previous results, and only use this one - with_same_name = [item] - break - - # Otherwise, only add the item if the broadcast year matches - if item.year == self.year: - with_same_name.append(item) - # If the item doesn't have the broadcast year, then add all the results - else: - with_same_name.append(item) - - return with_same_name - - def matches(self, other: str) -> bool: - """ - Shows in TV Time are often different to Trakt.TV - in order to improve results and automation, - calculate how many words are in the title, and return true if more than 50% of the title is a match, - It seems to improve automation, and reduce manual selection... - """ - - # If the name is a complete match, then don't bother comparing them! 
- if self.name == other: - return True - - # Go through each word of the TV Time title, and check if it's in the Trakt title - words_matched = [word for word in self.name.split() if word in other] - - # Then calculate what percentage of words matched - quotient = len(words_matched) / len(other.split()) - percentage = quotient * 100 - - # If more than 50% of words in the TV Time title exist in the Trakt title, - # then return the title as a possibility to use - return percentage > 50 - - -class TVTimeItem: - def __init__(self, name: str, updated_at: str): - self.name = name - # Get the date which the show was marked 'watched' in TV Time - # and parse the watched date value into a Python object - self.date_watched = datetime.strptime( - updated_at, "%Y-%m-%d %H:%M:%S" - ) - - -class TVTimeTVShow(TVTimeItem): - def __init__(self, row: Any): - super().__init__(row["tv_show_name"], row["updated_at"]) - self.episode_id = row["episode_id"] - self.season_number = row["episode_season_number"] - self.episode_number = row["episode_number"] - - def parse_season_number(self, trakt_show: TraktTVShow) -> int: - """ - Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then - subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes - a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since - this will match TV Time's existing value. - """ - - season_number = int(self.season_number) - # Gen get the Season Number from the first item in the array - first_season_no = trakt_show.seasons[0].number - - # If the season number is 0, then the Trakt show contains a "special" season - if first_season_no == 0: - # No need to modify the value, as the TV Time value will match Trakt - return season_number - # Otherwise, if the Trakt seasons start with no specials, then return the seasonNo, - # but subtracted by one (e.g. 
Season 1 in TV Time, will be 0) - else: - # Only subtract if the TV Time season number is greater than 0. - if season_number != 0: - return season_number - 1 - # Otherwise, the TV Time season is a special! Then you don't need to change the starting position - else: - return season_number - - -class TVTimeMovie(TVTimeItem): - def __init__(self, row: Any): - super().__init__(row["movie_name"], row["updated_at"]) - self.activity_type = row["type"] - - -class Searcher(ABC): - def __init__(self, user_matched_table: Table): - self.name = "" - self.items_with_same_name: Optional[TraktItem] = None - self._user_matched_table = user_matched_table - - def search(self, title: Title) -> Optional[TraktItem]: - self.name = title.name - # If the title contains a year, then replace the local variable with the stripped version. - if title.year: - self.name = title.without_year - self.items_with_same_name = title.items_with_same_name(self.search_trakt(self.name)) - - single_result = self._check_single_result() - if single_result: - return single_result - - # If the search contains multiple results, then we need to confirm with the user which show - # the script should use, or access the local database to see if the user has already provided - # a manual selection - - should_return, query_result = self._search_local() - if should_return: - return query_result - # If the user has not provided a manual selection already in the process - # then prompt the user to make a selection - else: - self._handle_multiple_manually() - - @abstractmethod - def search_trakt(self, name: str) -> list[TraktItem]: - pass - - @abstractmethod - def _print_manual_selection(self): - pass - - def _search_local(self) -> tuple[bool, TraktItem]: - user_matched_query = Query() - query_result = self._user_matched_table.search(user_matched_query.Name == self.name) - # If the local database already contains an entry for a manual selection - # then don't bother prompting the user to select it again! 
- if len(query_result) == 1: - first_match = query_result[0] - first_match_selected_index = int(first_match.get("UserSelectedIndex")) - skip_show = first_match.get("Skip") - if not skip_show: - return True, self.items_with_same_name[first_match_selected_index] - else: - return True, None - else: - return False, None - - def _handle_multiple_manually(self) -> Optional[TraktItem]: - self._print_manual_selection() - while True: - try: - # Get the user's selection, either a numerical input, or a string 'SKIP' value - index_selected = input( - "Please make a selection from above (or enter SKIP):" - ) - - if index_selected == "SKIP": - break - - index_selected = int(index_selected) - 1 - break - except KeyboardInterrupt: - sys.exit("Cancel requested...") - except Exception: - logging.error(f"Sorry! Please select a value between 0 to {len(self.items_with_same_name)}") - - # If the user entered 'SKIP', then exit from the loop with no selection, which - # will trigger the program to move onto the next episode - if index_selected == "SKIP": - # Record that the user has skipped the TV Show for import, so that - # manual input isn't required everytime - self._user_matched_table.insert( - {"Name": self.name, "UserSelectedIndex": 0, "Skip": True} - ) - return None - else: - selected_show = self.items_with_same_name[int(index_selected)] - - self._user_matched_table.insert( - { - "Name": self.name, - "UserSelectedIndex": index_selected, - "Skip": False, - } - ) - - return selected_show - - def _check_single_result(self) -> Optional[TraktItem]: - complete_match_names = [name_from_search for name_from_search in self.items_with_same_name if - name_from_search.title == self.name] - if len(complete_match_names) == 1: - return complete_match_names[0] - elif len(self.items_with_same_name) == 1: - return self.items_with_same_name[0] - elif len(self.items_with_same_name) < 1: - return None - - -class TVShowSearcher(Searcher): - def __init__(self, tv_show: TVTimeTVShow): - 
super().__init__(userMatchedShowsTable) - self.tv_show = tv_show - - def search_trakt(self, name: str) -> list[TraktItem]: - return TVShow.search(name) - - def _print_manual_selection(self) -> None: - print( - f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{self.name}'" - f" (Season {self.tv_show.season_number}, Episode {self.tv_show.episode_number}) has" - f" {len(self.items_with_same_name)} matching Trakt shows with the same name.\a" - ) - - for idx, item in enumerate(self.items_with_same_name): - print( - f"({idx + 1}) {item.title} - {item.year} - {len(item.seasons)}" - f" Season(s) - More Info: https://trakt.tv/{item.ext}" - ) - - -class MovieSearcher(Searcher): - def __init__(self): - super().__init__(userMatchedMoviesTable) - - def search_trakt(self, name: str) -> list[TraktItem]: - return Movie.search(name) - - def _print_manual_selection(self) -> None: - print( - f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{self.name}'" - f" has {len(self.items_with_same_name)}" - f" matching Trakt movies with the same name.\a" - ) - - for idx, item in enumerate(self.items_with_same_name): - print(f"({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}") - - -class Processor(ABC): - @abstractmethod - def _get_synced_items(self, tv_time_item: TVTimeItem) -> list[Document]: - pass - - @abstractmethod - def _log_already_imported(self, tv_time_item: TVTimeItem, progress: float) -> None: - pass - - @abstractmethod - def _should_continue(self, tv_time_item: TVTimeItem) -> bool: - pass - - @abstractmethod - def _search_trakt(self, tv_time_item: TVTimeItem) -> TraktItem: - pass - - @abstractmethod - def _process(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: float) -> None: - pass - - def process_item(self, tv_time_item: TVTimeItem, progress: float) -> None: - # Query the local database for previous entries indicating that - # the item has already been imported in the past. 
Which will - # ease pressure on Trakt's API server during a retry of the import - # process, and just save time overall without needing to create network requests. - synced_episodes = self._get_synced_items(tv_time_item) - if len(synced_episodes) != 0: - self._log_already_imported(tv_time_item, progress) - return - - # If the query returned no results, then continue to import it into Trakt - # Create a repeating loop, which will break on success, but repeats on failures - error_streak = 0 - while True: - # If more than 10 errors occurred in one streak, whilst trying to import the item - # then give up, and move onto the next item, but warn the user. - if error_streak > 10: - logging.warning("An error occurred 10 times in a row... skipping episode...") - break - - if not self._should_continue(): - break - - try: - # Sleep for a second between each process, before going onto the next watched item. - # This is required to remain within the API rate limit, and use the API server fairly. - # Other developers share the service, for free - so be considerate of your usage. - time.sleep(DELAY_BETWEEN_ITEMS_IN_SECONDS) - - trakt_item = self._search_trakt(tv_time_item) - if not trakt_item: - break - - self._process(tv_time_item, trakt_item, progress) - - error_streak = 0 - break - # Catch errors which occur because of an incorrect array index. This occurs when - # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. - # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. - except IndexError: - self._handle_index_error(tv_time_item, trakt_item, progress) - break - except trakt.core.errors.NotFoundException: - self._handle_not_found_exception(tv_time_item, progress) - break - except trakt.core.errors.RateLimitException: - logging.warning( - "The program is running too quickly and has hit Trakt's API rate limit!" 
- " Please increase the delay between" - " movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'." - " The program will now wait 60 seconds before" - " trying again." - ) - time.sleep(60) - error_streak += 1 - # Catch a JSON decode error - this can be raised when the API server is down and produces an HTML page, - # instead of JSON - except json.decoder.JSONDecodeError: - logging.warning( - f"({progress}) - A JSON decode error occurred whilst processing {tv_time_item.name}" - " This might occur when the server is down and has produced" - " a HTML document instead of JSON. The script will wait 60 seconds before trying again." - ) - - time.sleep(60) - error_streak += 1 - # Catch a CTRL + C keyboard input, and exits the program - except KeyboardInterrupt: - sys.exit("Cancel requested...") - - @abstractmethod - def _handle_index_error(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: float) -> None: - pass - - @abstractmethod - def _handle_not_found_exception(self, tv_time_item: TVTimeItem, progress: float) -> None: - pass - - -class TVShowProcessor(Processor): - def __init__(self): - super().__init__() - - def _get_synced_items(self, tv_time_show: TVTimeTVShow) -> list[Document]: - episode_completed_query = Query() - return syncedEpisodesTable.search(episode_completed_query.episodeId == tv_time_show.episode_id) - - def _log_already_imported(self, tv_time_show: TVTimeTVShow, progress: float) -> None: - logging.info( - f"({progress}) - Already imported," - f" skipping \'{tv_time_show.name}\' Season {tv_time_show.season_number} /" - f" Episode {tv_time_show.episode_number}." 
- ) - - def _should_continue(self, tv_time_show: TVTimeTVShow) -> bool: - return True - - def _search_trakt(self, tv_time_show: TVTimeTVShow) -> TraktTVShow: - return TVShowSearcher(tv_time_show).search_trakt(tv_time_show.name) - - def _process(self, tv_time_show: TVTimeTVShow, trakt_show: TraktItem, progress: float) -> None: - logging.info( - f"({progress}) - Processing '{tv_time_show.name}'" - f" Season {tv_time_show.season_number} /" - f" Episode {tv_time_show.episode_number}" - ) - - season = trakt_show.seasons[tv_time_show.parse_season_number(trakt_show)] - episode = season.episodes[int(tv_time_show.episode_number) - 1] - episode.mark_as_seen(tv_time_show.date_watched) - # Add the episode to the local database as imported, so it can be skipped, - # if the process is repeated - syncedEpisodesTable.insert({"episodeId": tv_time_show.episode_id}) - logging.info(f"'{tv_time_show.name}' marked as seen") - - def _handle_index_error(self, tv_time_show: TVTimeTVShow, trakt_show: TraktTVShow, progress: float) -> None: - tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"]["slug"] - logging.warning( - f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number}," - f" Episode {tv_time_show.episode_number} does not exist in Trakt!" - f" (https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_time_show.season_number}/episodes/{tv_time_show.episode_number})" - ) - - def _handle_not_found_exception(self, tv_time_show: TVTimeTVShow, progress: float) -> None: - logging.warning( - f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number}," - f" Episode {tv_time_show.episode_number} does not exist (search) in Trakt!" 
- ) - - -class MovieProcessor(Processor): - def __init__(self, watched_list: list): - super().__init__() - self._watched_list = watched_list - - def _get_synced_items(self, tv_time_movie: TVTimeMovie) -> list[Document]: - movie_query = Query() - return syncedMoviesTable.search( - (movie_query.movie_name == tv_time_movie.name) & (movie_query.type == "watched") - ) - - def _log_already_imported(self, tv_time_movie: TVTimeMovie, progress: float) -> None: - logging.info(f"({progress}) - Already imported, skipping '{tv_time_movie.name}'.") - - def _should_continue(self, tv_time_movie: TVTimeMovie) -> bool: - # If movie is watched but this is an entry for watchlist, then skip - if tv_time_movie.name in self._watched_list and tv_time_movie.activity_type != "watch": - logging.info(f"Skipping '{tv_time_movie.name}' to avoid redundant watchlist entry.") - return False - - return True - - def _search_trakt(self, tv_time_movie: TVTimeMovie) -> TraktMovie: - return MovieSearcher().search_trakt(tv_time_movie.name) - - def _process(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: float) -> None: - logging.info(f"({progress}) - Processing '{tv_time_movie.name}'") - - watchlist_query = Query() - movies_in_watchlist = syncedMoviesTable.search( - (watchlist_query.movie_name == tv_time_movie.name) & (watchlist_query.type == "watchlist") - ) - - if tv_time_movie.activity_type == "watch": - trakt_movie.mark_as_seen(tv_time_movie.date_watched) - # Add the episode to the local database as imported, so it can be skipped, - # if the process is repeated - syncedMoviesTable.insert( - {"movie_name": tv_time_movie.name, "type": "watched"} - ) - logging.info(f"'{tv_time_movie.name}' marked as seen") - elif len(movies_in_watchlist) == 0: - trakt_movie.add_to_watchlist() - # Add the episode to the local database as imported, so it can be skipped, - # if the process is repeated - syncedMoviesTable.insert( - {"movie_name": tv_time_movie.name, "type": "watchlist"} - ) - 
logging.info(f"'{tv_time_movie.name}' added to watchlist") - else: - logging.warning(f"{tv_time_movie.name} already in watchlist") - - def _handle_index_error(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: float) -> None: - movie_slug = trakt_movie.to_json()["movies"][0]["ids"]["ids"]["slug"] - logging.warning( - f"({progress}) - {tv_time_movie.name}" - f" does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)" - ) - - def _handle_not_found_exception(self, tv_time_movie: TVTimeMovie, progress: float) -> None: - logging.warning(f"({progress}) - {tv_time_movie.name} does not exist (search) in Trakt!") - - def process_watched_shows() -> None: with open(WATCHED_SHOWS_PATH, newline="") as csvfile: reader = csv.DictReader(csvfile, delimiter=",") diff --git a/database.py b/database.py new file mode 100644 index 0000000..14d25ab --- /dev/null +++ b/database.py @@ -0,0 +1,8 @@ +from tinydb import TinyDB + +# Create databases to keep track of completed processes +database = TinyDB("localStorage.json") +syncedEpisodesTable = database.table("SyncedEpisodes") +userMatchedShowsTable = database.table("TvTimeTraktUserMatched") +syncedMoviesTable = database.table("SyncedMovies") +userMatchedMoviesTable = database.table("TvTimeTraktUserMatchedMovies") diff --git a/processor.py b/processor.py new file mode 100644 index 0000000..db1ce68 --- /dev/null +++ b/processor.py @@ -0,0 +1,228 @@ +import json +import logging +import sys +import time +from abc import ABC, abstractmethod + +import trakt.core +from tinydb import Query +from tinydb.table import Document + +from database import syncedEpisodesTable, syncedMoviesTable +from searcher import TVShowSearcher, MovieSearcher, TraktTVShow, TraktMovie, TraktItem, TVTimeItem, TVTimeTVShow, \ + TVTimeMovie + + +class Processor(ABC): + @abstractmethod + def _get_synced_items(self, tv_time_item: TVTimeItem) -> list[Document]: + pass + + @abstractmethod + def _log_already_imported(self, tv_time_item: TVTimeItem, 
progress: float) -> None: + pass + + @abstractmethod + def _should_continue(self, tv_time_item: TVTimeItem) -> bool: + pass + + @abstractmethod + def _search_trakt(self, tv_time_item: TVTimeItem) -> TraktItem: + pass + + @abstractmethod + def _process(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: float) -> None: + pass + + def process_item(self, tv_time_item: TVTimeItem, progress: float, delay: int = 1) -> None: + # Query the local database for previous entries indicating that + # the item has already been imported in the past. Which will + # ease pressure on Trakt's API server during a retry of the import + # process, and just save time overall without needing to create network requests. + synced_episodes = self._get_synced_items(tv_time_item) + if len(synced_episodes) != 0: + self._log_already_imported(tv_time_item, progress) + return + + # If the query returned no results, then continue to import it into Trakt + # Create a repeating loop, which will break on success, but repeats on failures + error_streak = 0 + while True: + # If more than 10 errors occurred in one streak, whilst trying to import the item + # then give up, and move onto the next item, but warn the user. + if error_streak > 10: + logging.warning("An error occurred 10 times in a row... skipping episode...") + break + + if not self._should_continue(): + break + + try: + # Sleep for a second between each process, before going onto the next watched item. + # This is required to remain within the API rate limit, and use the API server fairly. + # Other developers share the service, for free - so be considerate of your usage. + time.sleep(delay) + + trakt_item = self._search_trakt(tv_time_item) + if not trakt_item: + break + + self._process(tv_time_item, trakt_item, progress) + + error_streak = 0 + break + # Catch errors which occur because of an incorrect array index. This occurs when + # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. 
+ # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. + except IndexError: + self._handle_index_error(tv_time_item, trakt_item, progress) + break + except trakt.core.errors.NotFoundException: + self._handle_not_found_exception(tv_time_item, progress) + break + except trakt.core.errors.RateLimitException: + logging.warning( + "The program is running too quickly and has hit Trakt's API rate limit!" + " Please increase the delay between" + " movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'." + " The program will now wait 60 seconds before" + " trying again." + ) + time.sleep(60) + error_streak += 1 + # Catch a JSON decode error - this can be raised when the API server is down and produces an HTML page, + # instead of JSON + except json.decoder.JSONDecodeError: + logging.warning( + f"({progress}) - A JSON decode error occurred whilst processing {tv_time_item.name}" + " This might occur when the server is down and has produced" + " a HTML document instead of JSON. The script will wait 60 seconds before trying again." 
+ ) + + time.sleep(60) + error_streak += 1 + # Catch a CTRL + C keyboard input, and exits the program + except KeyboardInterrupt: + sys.exit("Cancel requested...") + + @abstractmethod + def _handle_index_error(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: float) -> None: + pass + + @abstractmethod + def _handle_not_found_exception(self, tv_time_item: TVTimeItem, progress: float) -> None: + pass + + +class TVShowProcessor(Processor): + def __init__(self): + super().__init__() + + def _get_synced_items(self, tv_time_show: TVTimeTVShow) -> list[Document]: + episode_completed_query = Query() + return syncedEpisodesTable.search(episode_completed_query.episodeId == tv_time_show.episode_id) + + def _log_already_imported(self, tv_time_show: TVTimeTVShow, progress: float) -> None: + logging.info( + f"({progress}) - Already imported," + f" skipping \'{tv_time_show.name}\' Season {tv_time_show.season_number} /" + f" Episode {tv_time_show.episode_number}." + ) + + def _should_continue(self, tv_time_show: TVTimeTVShow) -> bool: + return True + + def _search_trakt(self, tv_time_show: TVTimeTVShow) -> TraktTVShow: + return TVShowSearcher(tv_time_show).search_trakt(tv_time_show.name) + + def _process(self, tv_time_show: TVTimeTVShow, trakt_show: TraktItem, progress: float) -> None: + logging.info( + f"({progress}) - Processing '{tv_time_show.name}'" + f" Season {tv_time_show.season_number} /" + f" Episode {tv_time_show.episode_number}" + ) + + season = trakt_show.seasons[tv_time_show.parse_season_number(trakt_show)] + episode = season.episodes[int(tv_time_show.episode_number) - 1] + episode.mark_as_seen(tv_time_show.date_watched) + # Add the episode to the local database as imported, so it can be skipped, + # if the process is repeated + syncedEpisodesTable.insert({"episodeId": tv_time_show.episode_id}) + logging.info(f"'{tv_time_show.name}' marked as seen") + + def _handle_index_error(self, tv_time_show: TVTimeTVShow, trakt_show: TraktTVShow, progress: float) 
-> None: + tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"]["slug"] + logging.warning( + f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number}," + f" Episode {tv_time_show.episode_number} does not exist in Trakt!" + f" (https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_time_show.season_number}/episodes/{tv_time_show.episode_number})" + ) + + def _handle_not_found_exception(self, tv_time_show: TVTimeTVShow, progress: float) -> None: + logging.warning( + f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number}," + f" Episode {tv_time_show.episode_number} does not exist (search) in Trakt!" + ) + + +class MovieProcessor(Processor): + def __init__(self, watched_list: list): + super().__init__() + self._watched_list = watched_list + + def _get_synced_items(self, tv_time_movie: TVTimeMovie) -> list[Document]: + movie_query = Query() + return syncedMoviesTable.search( + (movie_query.movie_name == tv_time_movie.name) & (movie_query.type == "watched") + ) + + def _log_already_imported(self, tv_time_movie: TVTimeMovie, progress: float) -> None: + logging.info(f"({progress}) - Already imported, skipping '{tv_time_movie.name}'.") + + def _should_continue(self, tv_time_movie: TVTimeMovie) -> bool: + # If movie is watched but this is an entry for watchlist, then skip + if tv_time_movie.name in self._watched_list and tv_time_movie.activity_type != "watch": + logging.info(f"Skipping '{tv_time_movie.name}' to avoid redundant watchlist entry.") + return False + + return True + + def _search_trakt(self, tv_time_movie: TVTimeMovie) -> TraktMovie: + return MovieSearcher().search_trakt(tv_time_movie.name) + + def _process(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: float) -> None: + logging.info(f"({progress}) - Processing '{tv_time_movie.name}'") + + watchlist_query = Query() + movies_in_watchlist = syncedMoviesTable.search( + (watchlist_query.movie_name == tv_time_movie.name) & (watchlist_query.type == 
"watchlist") + ) + + if tv_time_movie.activity_type == "watch": + trakt_movie.mark_as_seen(tv_time_movie.date_watched) + # Add the episode to the local database as imported, so it can be skipped, + # if the process is repeated + syncedMoviesTable.insert( + {"movie_name": tv_time_movie.name, "type": "watched"} + ) + logging.info(f"'{tv_time_movie.name}' marked as seen") + elif len(movies_in_watchlist) == 0: + trakt_movie.add_to_watchlist() + # Add the episode to the local database as imported, so it can be skipped, + # if the process is repeated + syncedMoviesTable.insert( + {"movie_name": tv_time_movie.name, "type": "watchlist"} + ) + logging.info(f"'{tv_time_movie.name}' added to watchlist") + else: + logging.warning(f"{tv_time_movie.name} already in watchlist") + + def _handle_index_error(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: float) -> None: + movie_slug = trakt_movie.to_json()["movies"][0]["ids"]["ids"]["slug"] + logging.warning( + f"({progress}) - {tv_time_movie.name}" + f" does not exist in Trakt! 
(https://trakt.tv/movies/{movie_slug}/)" + ) + + def _handle_not_found_exception(self, tv_time_movie: TVTimeMovie, progress: float) -> None: + logging.warning(f"({progress}) - {tv_time_movie.name} does not exist (search) in Trakt!")
diff --git a/searcher.py b/searcher.py
new file mode 100644
index 0000000..e9527e3
--- /dev/null
+++ b/searcher.py
@@ -0,0 +1,360 @@
+import logging
+import re
+import sys
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Optional, TypeVar, Union, Any
+
+from tinydb import Query
+from tinydb.table import Table
+from trakt.movies import Movie
+from trakt.tv import TVShow
+
+from database import userMatchedShowsTable, userMatchedMoviesTable
+
+TraktTVShow = TypeVar("TraktTVShow")
+TraktMovie = TypeVar("TraktMovie")
+TraktItem = Union[TraktTVShow, TraktMovie]
+
+
+@dataclass
+class Title:
+    """A title, kept as the full name, the name without the year, and the year (if any)."""
+
+    name: str
+    without_year: str
+    year: Optional[int]
+
+    def __init__(self, title: str):
+        """
+        Parse the title's name for year.
+
+        :param title: the full title, e.g. The Americans (2017)
+        """
+        # Start from the "defaults", which are kept if the title doesn't include a year
+        self.name = title
+        self.without_year = title
+        self.year = None
+
+        # Use a regex expression to get the value within the brackets e.g. The Americans (2017)
+        year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title)
+        if year_search is None:
+            return
+
+        try:
+            self.year = int(year_search.group(1))
+        except ValueError:
+            # The value within the brackets isn't a number, so it isn't a year
+            return
+
+        # Then, get the title without the year value included
+        self.without_year = title.split("(")[0].strip()
+
+    def items_with_same_name(self, items: list[TraktItem]) -> list[TraktItem]:
+        """
+        Keep the items whose title matches this title.
+
+        :param items: the items to pick from
+        :return: the matching items, also matched by year if this title includes one
+        """
+        with_same_name = []
+
+        for item in items:
+            if self.matches(item.title):
+                # If the title included the year of broadcast, then we can be more picky in the results
+                # to look for an item with a broadcast year that matches
+                if self.year:
+                    # If the item title is a 1:1 match, with the same broadcast year, then bingo!
+                    if (self.name == item.title) and (item.year == self.year):
+                        # Clear previous results, and only use this one
+                        with_same_name = [item]
+                        break
+
+                    # Otherwise, only add the item if the broadcast year matches
+                    if item.year == self.year:
+                        with_same_name.append(item)
+                # If the title doesn't have the broadcast year, then add all the results
+                else:
+                    with_same_name.append(item)
+
+        return with_same_name
+
+    def matches(self, other: str) -> bool:
+        """
+        Shows in TV Time are often different to Trakt.TV - in order to improve results and automation,
+        count how many words of this title are in the other title, and return true if that is more than
+        50% of the number of words in the other title.
+        It seems to improve automation, and reduce manual selection...
+
+        :param other: the title to compare with
+        :return: True for a complete match, or if more than 50% is a match
+        """
+
+        # If the name is a complete match, then don't bother comparing them!
+        if self.name == other:
+            return True
+
+        # A title without any words can't be a match (and there would be nothing to divide by)
+        other_words = other.split()
+        if not other_words:
+            return False
+
+        # Go through each word of the TV Time title, and check if it's in the Trakt title
+        words_matched = [word for word in self.name.split() if word in other]
+
+        # If the matched words are more than 50% of the number of words in the Trakt title,
+        # then return the title as a possibility to use
+        return 2 * len(words_matched) > len(other_words)
+
+
+class TVTimeItem:
+    """An item from TV Time: its name, and the date on which it was watched."""
+
+    def __init__(self, name: str, updated_at: str):
+        """
+        :param name: the name of the item
+        :param updated_at: the date watched, in the format YYYY-MM-DD HH:MM:SS
+        """
+        self.name = name
+        # Get the date which the show was marked 'watched' in TV Time
+        # and parse the watched date value into a Python object
+        self.date_watched = datetime.strptime(
+            updated_at, "%Y-%m-%d %H:%M:%S"
+        )
+
+
+class TVTimeTVShow(TVTimeItem):
+    """An episode of a TV show from TV Time."""
+
+    def __init__(self, row: Any):
+        """
+        :param row: the values of the episode, read by the keys tv_show_name, updated_at,
+            episode_id, episode_season_number and episode_number
+        """
+        super().__init__(row["tv_show_name"], row["updated_at"])
+        self.episode_id = row["episode_id"]
+        self.season_number = row["episode_season_number"]
+        self.episode_number = row["episode_number"]
+
+    def parse_season_number(self, trakt_show: TraktTVShow) -> int:
+        """
+        Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
+        subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
+        a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
+        this will match TV Time's existing value.
+
+        :param trakt_show: the Trakt show, of which the first season is checked
+        :return: the TV Time season number, subtracted by one where needed
+        """
+
+        season_number = int(self.season_number)
+        # Then get the Season Number from the first item in the array
+        first_season_no = trakt_show.seasons[0].number
+
+        # If the first season number is 0, then the Trakt show contains a "special" season, and the
+        # TV Time value will match Trakt. And if the TV Time season number is 0, then the TV Time
+        # season is a special! Then you don't need to change the starting position either
+        if first_season_no == 0 or season_number == 0:
+            return season_number
+
+        # Otherwise, if the Trakt seasons start with no specials, then return the season number,
+        # but subtracted by one (e.g. Season 1 in TV Time, will be 0)
+        return season_number - 1
+
+
+class TVTimeMovie(TVTimeItem):
+    """A movie from TV Time."""
+
+    def __init__(self, row: Any):
+        """
+        :param row: the values of the movie, read by the keys movie_name, updated_at and type
+        """
+        super().__init__(row["movie_name"], row["updated_at"])
+        self.activity_type = row["type"]
+
+
+class Searcher(ABC):
+    """Search Trakt for a title, and ask the user to make a selection if there are multiple results."""
+
+    def __init__(self, user_matched_table: Table):
+        """
+        :param user_matched_table: the table in which the manual selections of the user are kept
+        """
+        self.name = ""
+        self.items_with_same_name: Optional[list[TraktItem]] = None
+        self._user_matched_table = user_matched_table
+
+    def search(self, title: Title) -> Optional[TraktItem]:
+        """
+        Search Trakt for the title, and pick the item to use.
+
+        :param title: the title to search for
+        :return: the item to use, or None if there are no results or the user skipped the title
+        """
+        self.name = title.name
+        # If the title contains a year, then replace the local variable with the stripped version.
+        if title.year:
+            self.name = title.without_year
+        self.items_with_same_name = title.items_with_same_name(self.search_trakt(self.name))
+
+        # Without results there is nothing to select from, so don't prompt the user for it
+        if not self.items_with_same_name:
+            return None
+
+        single_result = self._check_single_result()
+        if single_result:
+            return single_result
+
+        # If the search contains multiple results, then we need to confirm with the user which show
+        # the script should use, or access the local database to see if the user has already provided
+        # a manual selection
+
+        should_return, query_result = self._search_local()
+        if should_return:
+            return query_result
+
+        # If the user has not provided a manual selection already in the process
+        # then prompt the user to make a selection, and return that selection
+        return self._handle_multiple_manually()
+
+    @abstractmethod
+    def search_trakt(self, name: str) -> list[TraktItem]:
+        """Search Trakt for the name, and return the results."""
+
+    @abstractmethod
+    def _print_manual_selection(self):
+        """Print the items from which the user can make a selection."""
+
+    def _search_local(self) -> tuple[bool, Optional[TraktItem]]:
+        """
+        Check the local database for a manual selection which the user has already provided.
+
+        :return: whether a selection was found, and the selected item (None if the title was skipped)
+        """
+        user_matched_query = Query()
+        query_result = self._user_matched_table.search(user_matched_query.Name == self.name)
+        # If the local database already contains an entry for a manual selection
+        # then don't bother prompting the user to select it again!
+        if len(query_result) != 1:
+            return False, None
+
+        first_match = query_result[0]
+        if first_match.get("Skip"):
+            return True, None
+
+        first_match_selected_index = int(first_match.get("UserSelectedIndex"))
+        # Only use the selection if it is (still) one of the results, otherwise prompt the user again
+        if 0 <= first_match_selected_index < len(self.items_with_same_name):
+            return True, self.items_with_same_name[first_match_selected_index]
+        return False, None
+
+    def _handle_multiple_manually(self) -> Optional[TraktItem]:
+        """
+        Prompt the user to make a selection from the results, and record it in the local database.
+
+        :return: the selected item, or None if the user entered SKIP
+        """
+        self._print_manual_selection()
+        while True:
+            try:
+                # Get the user's selection, either a numerical input, or a string 'SKIP' value
+                answer = input(
+                    "Please make a selection from above (or enter SKIP):"
+                )
+            except (KeyboardInterrupt, EOFError):
+                sys.exit("Cancel requested...")
+
+            # If the user entered 'SKIP', then exit from the loop with no selection, which
+            # will trigger the program to move onto the next episode
+            if answer.strip().upper() == "SKIP":
+                # Record that the user has skipped the TV Show for import, so that
+                # manual input isn't required everytime
+                self._record_selection(0, True)
+                return None
+
+            try:
+                # The items are printed from 1, but the indexing of the array starts from 0
+                index_selected = int(answer) - 1
+            except ValueError:
+                index_selected = -1
+
+            # Only accept the number of one of the printed items, otherwise prompt again
+            if 0 <= index_selected < len(self.items_with_same_name):
+                break
+            logging.error(f"Sorry! Please select a value between 1 and {len(self.items_with_same_name)}")
+
+        self._record_selection(index_selected, False)
+        return self.items_with_same_name[index_selected]
+
+    def _record_selection(self, index: int, skip: bool) -> None:
+        """Record the selection for the current name, in place of any entry recorded before."""
+        self._user_matched_table.remove(Query().Name == self.name)
+        self._user_matched_table.insert(
+            {"Name": self.name, "UserSelectedIndex": index, "Skip": skip}
+        )
+
+    def _check_single_result(self) -> Optional[TraktItem]:
+        """
+        Check whether the results come down to a single item.
+
+        :return: the only item with the searched name as its title, else the only result, else None
+        """
+        complete_match_names = [name_from_search for name_from_search in self.items_with_same_name if
+                                name_from_search.title == self.name]
+        if len(complete_match_names) == 1:
+            return complete_match_names[0]
+        if len(self.items_with_same_name) == 1:
+            return self.items_with_same_name[0]
+        return None
+
+
+class TVShowSearcher(Searcher):
+    """Searcher for TV shows, which keeps the manual selections in userMatchedShowsTable."""
+
+    def __init__(self, tv_show: TVTimeTVShow):
+        """
+        :param tv_show: the episode, of which the season and episode number are printed in the prompt
+        """
+        super().__init__(userMatchedShowsTable)
+        self.tv_show = tv_show
+
+    def search_trakt(self, name: str) -> list[TraktItem]:
+        """Search the TV shows of Trakt for the name."""
+        return TVShow.search(name)
+
+    def _print_manual_selection(self) -> None:
+        """Print the matching shows, numbered from 1."""
+        print(
+            f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{self.name}'"
+            f" (Season {self.tv_show.season_number}, Episode {self.tv_show.episode_number}) has"
+            f" {len(self.items_with_same_name)} matching Trakt shows with the same name.\a"
+        )
+
+        for idx, item in enumerate(self.items_with_same_name):
+            print(
+                f"({idx + 1}) {item.title} - {item.year} - {len(item.seasons)}"
+                f" Season(s) - More Info: https://trakt.tv/{item.ext}"
+            )
+
+
+class MovieSearcher(Searcher):
+    """Searcher for movies, which keeps the manual selections in userMatchedMoviesTable."""
+
+    def __init__(self):
+        super().__init__(userMatchedMoviesTable)
+
+    def search_trakt(self, name: str) -> list[TraktItem]:
+        """Search the movies of Trakt for the name."""
+        return Movie.search(name)
+
+    def _print_manual_selection(self) -> None:
+        """Print the matching movies, numbered from 1."""
+        print(
+            f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{self.name}'"
+            f" has {len(self.items_with_same_name)}"
+            f" matching Trakt movies with the same name.\a"
+        )
+
+        for idx, item in enumerate(self.items_with_same_name):
+            print(f"({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}")